sessions.rb 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667
  1. require 'json'
  2. require 'session_helper'
  3. module Sessions
  4. # get application root directory
  5. @root = Dir.pwd.to_s
  6. if !@root || @root.empty? || @root == '/'
  7. @root = Rails.root
  8. end
  9. # get working directories
  10. @path = "#{@root}/tmp/websocket_#{Rails.env}"
  11. # create global vars for threads
  12. @@client_threads = {} # rubocop:disable Style/ClassVars
  13. =begin
  14. start new session
  15. Sessions.create(client_id, session_data, { type: 'websocket' })
  16. returns
  17. true|false
  18. =end
  19. def self.create(client_id, session, meta)
  20. path = "#{@path}/#{client_id}"
  21. path_tmp = "#{@path}/tmp/#{client_id}"
  22. session_file = "#{path_tmp}/session"
  23. # collect session data
  24. meta[:last_ping] = Time.now.utc.to_i
  25. data = {
  26. user: session,
  27. meta: meta,
  28. }
  29. content = data.to_json
  30. # store session data in session file
  31. FileUtils.mkpath path_tmp
  32. File.open(session_file, 'wb') { |file|
  33. file.write content
  34. }
  35. # destory old session if needed
  36. if File.exist?(path)
  37. Sessions.destory(client_id)
  38. end
  39. # move to destination directory
  40. FileUtils.mv(path_tmp, path)
  41. # send update to browser
  42. if session && session['id']
  43. send(
  44. client_id,
  45. {
  46. event: 'ws:login',
  47. data: { success: true },
  48. }
  49. )
  50. end
  51. end
  52. =begin
  53. list of all session
  54. client_ids = Sessions.sessions
  55. returns
  56. ['4711', '4712']
  57. =end
  58. def self.sessions
  59. path = "#{@path}/"
  60. # just make sure that spool path exists
  61. if !File.exist?(path)
  62. FileUtils.mkpath path
  63. end
  64. data = []
  65. Dir.foreach(path) do |entry|
  66. next if entry == '.'
  67. next if entry == '..'
  68. next if entry == 'tmp'
  69. next if entry == 'spool'
  70. data.push entry.to_s
  71. end
  72. data
  73. end
  74. =begin
  75. list of all session
  76. Sessions.session_exists?(client_id)
  77. returns
  78. true|false
  79. =end
  80. def self.session_exists?(client_id)
  81. client_ids = sessions
  82. client_ids.include? client_id.to_s
  83. end
  84. =begin
  85. list of all session with data
  86. client_ids_with_data = Sessions.list
  87. returns
  88. {
  89. '4711' => {
  90. user: {
  91. 'id' => 123,
  92. },
  93. meta: {
  94. type: 'websocket',
  95. last_ping: time_of_last_ping,
  96. }
  97. },
  98. '4712' => {
  99. user: {
  100. 'id' => 124,
  101. },
  102. meta: {
  103. type: 'ajax',
  104. last_ping: time_of_last_ping,
  105. }
  106. },
  107. }
  108. =end
  109. def self.list
  110. client_ids = sessions
  111. session_list = {}
  112. client_ids.each { |client_id|
  113. data = get(client_id)
  114. next if !data
  115. session_list[client_id] = data
  116. }
  117. session_list
  118. end
  119. =begin
  120. destroy session
  121. Sessions.destory(client_id)
  122. returns
  123. true|false
  124. =end
  125. def self.destory(client_id)
  126. path = "#{@path}/#{client_id}"
  127. FileUtils.rm_rf path
  128. end
  129. =begin
  130. destroy idle session
  131. list_of_client_ids = Sessions.destory_idle_sessions
  132. returns
  133. ['4711', '4712']
  134. =end
  135. def self.destory_idle_sessions(idle_time_in_sec = 240)
  136. list_of_closed_sessions = []
  137. clients = Sessions.list
  138. clients.each { |client_id, client|
  139. if !client[:meta] || !client[:meta][:last_ping] || ( client[:meta][:last_ping].to_i + idle_time_in_sec ) < Time.now.utc.to_i
  140. list_of_closed_sessions.push client_id
  141. Sessions.destory(client_id)
  142. end
  143. }
  144. list_of_closed_sessions
  145. end
  146. =begin
  147. touch session
  148. Sessions.touch(client_id)
  149. returns
  150. true|false
  151. =end
  152. def self.touch(client_id)
  153. data = get(client_id)
  154. return false if !data
  155. path = "#{@path}/#{client_id}"
  156. data[:meta][:last_ping] = Time.now.utc.to_i
  157. content = data.to_json
  158. File.open("#{path}/session", 'wb' ) { |file|
  159. file.write content
  160. }
  161. true
  162. end
  163. =begin
  164. get session data
  165. data = Sessions.get(client_id)
  166. returns
  167. {
  168. user: {
  169. 'id' => 123,
  170. },
  171. meta: {
  172. type: 'websocket',
  173. last_ping: time_of_last_ping,
  174. }
  175. }
  176. =end
  177. def self.get(client_id)
  178. session_dir = "#{@path}/#{client_id}"
  179. session_file = "#{session_dir}/session"
  180. data = nil
  181. # if no session dir exists, session got destoried
  182. if !File.exist? session_dir
  183. destory(client_id)
  184. log('debug', "missing session directory for '#{client_id}', remove session.")
  185. return
  186. end
  187. # if only session file is missing, then it's an error behavior
  188. if !File.exist? session_file
  189. destory(client_id)
  190. log('error', "missing session file for '#{client_id}', remove session.")
  191. return
  192. end
  193. begin
  194. File.open(session_file, 'rb') { |file|
  195. file.flock(File::LOCK_EX)
  196. all = file.read
  197. file.flock(File::LOCK_UN)
  198. data_json = JSON.parse(all)
  199. if data_json
  200. data = symbolize_keys(data_json)
  201. data[:user] = data_json['user'] # for compat. reasons
  202. end
  203. }
  204. rescue => e
  205. log('error', e.inspect)
  206. destory(client_id)
  207. log('error', "error in reading/parsing session file '#{session_file}', remove session.")
  208. return
  209. end
  210. data
  211. end
  212. =begin
  213. send message to client
  214. Sessions.send(client_id_of_recipient, data)
  215. returns
  216. true|false
  217. =end
  218. def self.send(client_id, data)
  219. path = "#{@path}/#{client_id}/"
  220. filename = "send-#{Time.now.utc.to_f}"
  221. check = true
  222. count = 0
  223. while check
  224. if File.exist?(path + filename)
  225. count += 1
  226. filename = "#{filename}-#{count}"
  227. else
  228. check = false
  229. end
  230. end
  231. return false if !File.directory? path
  232. File.open(path + 'a-' + filename, 'wb') { |file|
  233. file.flock(File::LOCK_EX)
  234. file.write data.to_json
  235. file.flock(File::LOCK_UN)
  236. file.close
  237. }
  238. return false if !File.exist?(path + 'a-' + filename)
  239. FileUtils.mv(path + 'a-' + filename, path + filename)
  240. true
  241. end
  242. =begin
  243. send message to recipient client
  244. Sessions.send_to(user_id, data)
  245. returns
  246. true|false
  247. =end
  248. def self.send_to(user_id, data)
  249. # list all current clients
  250. client_list = sessions
  251. client_list.each {|client_id|
  252. session = Sessions.get(client_id)
  253. next if !session
  254. next if !session[:user]
  255. next if !session[:user]['id']
  256. next if session[:user]['id'].to_i != user_id.to_i
  257. Sessions.send(client_id, data)
  258. }
  259. true
  260. end
  261. =begin
  262. send message to all authenticated client
  263. Sessions.broadcast(data)
  264. returns
  265. [array_with_client_ids_of_recipients]
  266. broadcase also to not authenticated client
  267. Sessions.broadcast(data, 'public') # public|authenticated
  268. broadcase also not to sender
  269. Sessions.broadcast(data, 'public', sender_user_id)
  270. =end
  271. def self.broadcast(data, recipient = 'authenticated', sender_user_id = nil)
  272. # list all current clients
  273. recipients = []
  274. client_list = sessions
  275. client_list.each {|client_id|
  276. session = Sessions.get(client_id)
  277. next if !session
  278. if recipient != 'public'
  279. next if !session[:user]
  280. next if !session[:user]['id']
  281. end
  282. if sender_user_id
  283. next if session[:user] && session[:user]['id'] && session[:user]['id'].to_i == sender_user_id.to_i
  284. end
  285. Sessions.send(client_id, data)
  286. recipients.push client_id
  287. }
  288. recipients
  289. end
  290. =begin
  291. get messages for client
  292. messages = Sessions.queue(client_id_of_recipient)
  293. returns
  294. [
  295. {
  296. key1 => 'some data of message 1',
  297. key2 => 'some data of message 1',
  298. },
  299. {
  300. key1 => 'some data of message 2',
  301. key2 => 'some data of message 2',
  302. },
  303. ]
  304. =end
  305. def self.queue(client_id)
  306. path = "#{@path}/#{client_id}/"
  307. data = []
  308. files = []
  309. Dir.foreach(path) {|entry|
  310. next if entry == '.'
  311. next if entry == '..'
  312. files.push entry
  313. }
  314. files.sort.each {|entry|
  315. filename = "#{path}/#{entry}"
  316. if /^send/ =~ entry
  317. data.push Sessions.queue_file_read(path, entry)
  318. end
  319. }
  320. data
  321. end
  322. def self.queue_file_read(path, filename)
  323. file_old = "#{path}#{filename}"
  324. file_new = "#{path}a-#{filename}"
  325. FileUtils.mv(file_old, file_new)
  326. all = ''
  327. File.open(file_new, 'rb') { |file|
  328. all = file.read
  329. }
  330. File.delete(file_new)
  331. JSON.parse(all)
  332. end
  333. def self.cleanup
  334. path = "#{@path}/spool/"
  335. FileUtils.rm_rf path
  336. path = "#{@path}/tmp/"
  337. FileUtils.rm_rf path
  338. end
  339. def self.spool_create(data)
  340. msg = JSON.generate(data)
  341. path = "#{@path}/spool/"
  342. FileUtils.mkpath path
  343. file_path = "#{path}/#{Time.now.utc.to_f}-#{rand(99_999)}"
  344. File.open(file_path, 'wb') { |file|
  345. data = {
  346. msg: msg,
  347. timestamp: Time.now.utc.to_i,
  348. }
  349. file.write data.to_json
  350. }
  351. end
  352. def self.spool_list(timestamp, current_user_id)
  353. path = "#{@path}/spool/"
  354. FileUtils.mkpath path
  355. data = []
  356. to_delete = []
  357. files = []
  358. Dir.foreach(path) {|entry|
  359. next if entry == '.'
  360. next if entry == '..'
  361. files.push entry
  362. }
  363. files.sort.each {|entry|
  364. filename = "#{path}/#{entry}"
  365. next if !File.exist?(filename)
  366. File.open(filename, 'rb') { |file|
  367. all = file.read
  368. spool = JSON.parse(all)
  369. begin
  370. message_parsed = JSON.parse(spool['msg'])
  371. rescue => e
  372. log('error', "can't parse spool message: #{message}, #{e.inspect}")
  373. next
  374. end
  375. # ignore message older then 48h
  376. if spool['timestamp'] + (2 * 86_400) < Time.now.utc.to_i
  377. to_delete.push "#{path}/#{entry}"
  378. next
  379. end
  380. # add spool attribute to push spool info to clients
  381. message_parsed['spool'] = true
  382. # only send not already now messages
  383. if !timestamp || timestamp < spool['timestamp']
  384. # spool to recipient list
  385. if message_parsed['recipient'] && message_parsed['recipient']['user_id']
  386. message_parsed['recipient']['user_id'].each { |user_id|
  387. next if current_user_id != user_id
  388. message = message_parsed
  389. if message_parsed['event'] == 'broadcast'
  390. message = message_parsed['data']
  391. end
  392. item = {
  393. type: 'direct',
  394. message: message,
  395. }
  396. data.push item
  397. }
  398. # spool to every client
  399. else
  400. message = message_parsed
  401. if message_parsed['event'] == 'broadcast'
  402. message = message_parsed['data']
  403. end
  404. item = {
  405. type: 'broadcast',
  406. message: message,
  407. }
  408. data.push item
  409. end
  410. end
  411. }
  412. }
  413. to_delete.each {|file|
  414. File.delete(file)
  415. }
  416. data
  417. end
  418. def self.jobs
  419. # just make sure that spool path exists
  420. if !File.exist?(@path)
  421. FileUtils.mkpath @path
  422. end
  423. Thread.abort_on_exception = true
  424. loop do
  425. client_ids = sessions
  426. client_ids.each { |client_id|
  427. # connection already open, ignore
  428. next if @@client_threads[client_id]
  429. # get current user
  430. session_data = Sessions.get(client_id)
  431. next if !session_data
  432. next if !session_data[:user]
  433. next if !session_data[:user]['id']
  434. user = User.lookup( id: session_data[:user]['id'] )
  435. next if !user
  436. # start client thread
  437. next if @@client_threads[client_id]
  438. @@client_threads[client_id] = true
  439. @@client_threads[client_id] = Thread.new {
  440. thread_client(client_id)
  441. @@client_threads[client_id] = nil
  442. log('debug', "close client (#{client_id}) thread")
  443. ActiveRecord::Base.connection.close
  444. }
  445. sleep 0.5
  446. }
  447. # system settings
  448. sleep 0.5
  449. end
  450. end
  451. =begin
  452. check if thread for client_id is running
  453. Sessions.thread_client_exists?(client_id)
  454. returns
  455. thread
  456. =end
  457. def self.thread_client_exists?(client_id)
  458. @@client_threads[client_id]
  459. end
  460. =begin
  461. start client for browser
  462. Sessions.thread_client(client_id)
  463. returns
  464. thread
  465. =end
  466. def self.thread_client(client_id, try_count = 0, try_run_time = Time.now.utc)
  467. log('debug', "LOOP #{client_id} - #{try_count}")
  468. begin
  469. Sessions::Client.new(client_id)
  470. rescue => e
  471. log('error', "thread_client #{client_id} exited with error #{e.inspect}")
  472. log('error', e.backtrace.join("\n ") )
  473. sleep 10
  474. begin
  475. ActiveRecord::Base.connection_pool.release_connection
  476. rescue => e
  477. log('error', "Can't reconnect to database #{e.inspect}")
  478. end
  479. try_run_max = 10
  480. try_count += 1
  481. # reset error counter if to old
  482. if try_run_time + ( 60 * 5 ) < Time.now.utc
  483. try_count = 0
  484. end
  485. try_run_time = Time.now.utc
  486. # restart job again
  487. if try_run_max > try_count
  488. thread_client(client_id, try_count, try_run_time)
  489. else
  490. raise "STOP thread_client for client #{client_id} after #{try_run_max} tries"
  491. end
  492. end
  493. log('debug', "/LOOP #{client_id} - #{try_count}")
  494. end
  495. def self.symbolize_keys(hash)
  496. hash.each_with_object({}) {|(key, value), result|
  497. new_key = case key
  498. when String then key.to_sym
  499. else key
  500. end
  501. new_value = case value
  502. when Hash then symbolize_keys(value)
  503. else value
  504. end
  505. result[new_key] = new_value
  506. }
  507. end
  508. # we use it in rails and non rails context
  509. def self.log(level, message)
  510. if defined?(Rails)
  511. if level == 'debug'
  512. Rails.logger.debug message
  513. elsif level == 'notice'
  514. Rails.logger.notice message
  515. else
  516. Rails.logger.error message
  517. end
  518. return
  519. end
  520. puts "#{Time.now.utc.iso8601}:#{level} #{message}" # rubocop:disable Rails/Output
  521. end
  522. end