sessions.rb 15 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726
  1. require 'json'
  2. require 'session_helper'
  3. module Sessions
  # resolve the application root: use the current working directory and
  # fall back to Rails.root when it is blank or the filesystem root
  @root = Dir.pwd.to_s
  @root = Rails.root if @root.blank? || @root == '/'

  # base directory of the store; each client session gets its own
  # sub-directory below it
  @path = "#{@root}/tmp/websocket_#{Rails.env}"

  # registry of client threads, keyed by client_id
  @@client_threads = {} # rubocop:disable Style/ClassVars
  13. =begin
  14. start new session
  15. Sessions.create(client_id, session_data, { type: 'websocket' })
  16. returns
  17. true|false
  18. =end
  # Persists a new session for +client_id+ and notifies the client.
  #
  # The session file is first written below the "tmp" staging directory and
  # the directory is then moved into place; an existing session directory of
  # the same client_id is destroyed before the move.
  # +meta+ is modified in place: :last_ping is set to the current UTC epoch.
  #
  # Returns nil when +session+ is missing or has a blank 'id', otherwise the
  # result of sending the 'ws:login' event to the client.
  def self.create(client_id, session, meta)
    target_dir  = "#{@path}/#{client_id}"
    staging_dir = "#{@path}/tmp/#{client_id}"

    # collect and serialize session data
    meta[:last_ping] = Time.now.utc.to_i
    payload = { user: session, meta: meta }.to_json

    # write the session file into the staging directory
    FileUtils.mkpath staging_dir
    File.open("#{staging_dir}/session", 'wb') { |file| file.write payload }

    # replace a previous session of the same client
    Sessions.destroy(client_id) if File.exist?(target_dir)
    FileUtils.mv(staging_dir, target_dir)

    # only sessions with a user id get the login confirmation
    return if !session || session['id'].blank?

    login_event = { event: 'ws:login', data: { success: true } }
    send(client_id, login_event)
  end
=begin
list of all sessions
client_ids = Sessions.sessions
returns
['4711', '4712']
=end
  # Returns the client_ids of all stored sessions, i.e. every entry of the
  # store directory except '.', '..' and the reserved 'tmp' and 'spool'
  # directories. The store directory is created when it does not exist yet.
  def self.sessions
    path = "#{@path}/"

    # just make sure that the store path exists
    FileUtils.mkpath path if !File.exist?(path)

    reserved = ['.', '..', 'tmp', 'spool']
    Dir.foreach(path).reject { |entry| reserved.include?(entry) }.map(&:to_s)
  end
=begin
check if a session exists
Sessions.session_exists?(client_id)
returns
true|false
=end
  # True when both the session directory of +client_id+ and the "session"
  # file inside it exist, false otherwise.
  def self.session_exists?(client_id)
    session_dir = "#{@path}/#{client_id}"
    File.exist?(session_dir) && File.exist?("#{session_dir}/session")
  end
  86. =begin
  87. list of all session with data
  88. client_ids_with_data = Sessions.list
  89. returns
  90. {
  91. '4711' => {
  92. user: {
  93. 'id' => 123,
  94. },
  95. meta: {
  96. type: 'websocket',
  97. last_ping: time_of_last_ping,
  98. }
  99. },
  100. '4712' => {
  101. user: {
  102. 'id' => 124,
  103. },
  104. meta: {
  105. type: 'ajax',
  106. last_ping: time_of_last_ping,
  107. }
  108. },
  109. }
  110. =end
  # Returns a hash of client_id => session data for every session that can
  # be loaded; sessions for which `get` returns nothing are left out.
  def self.list
    sessions.each_with_object({}) do |client_id, session_list|
      data = get(client_id)
      session_list[client_id] = data if data
    end
  end
  121. =begin
  122. destroy session
  123. Sessions.destroy(client_id)
  124. returns
  125. true|false
  126. =end
  # Removes the session directory of +client_id+ including all queued
  # message files in it. Returns whatever FileUtils.rm_rf returns.
  def self.destroy(client_id)
    session_dir = "#{@path}/#{client_id}"
    FileUtils.rm_rf(session_dir)
  end
  131. =begin
  132. destroy idle session
  133. list_of_client_ids = Sessions.destroy_idle_sessions
  134. returns
  135. ['4711', '4712']
  136. =end
  # Destroys every session that has no :meta, no :last_ping, or whose last
  # ping is more than +idle_time_in_sec+ seconds in the past.
  #
  # Returns the client_ids of the destroyed sessions.
  def self.destroy_idle_sessions(idle_time_in_sec = 240)
    Sessions.list.each_with_object([]) do |(client_id, client), list_of_closed_sessions|
      meta = client[:meta]
      last_ping = meta && meta[:last_ping]
      expired = !last_ping || (last_ping.to_i + idle_time_in_sec) < Time.now.utc.to_i
      next if !expired

      list_of_closed_sessions.push client_id
      Sessions.destroy(client_id)
    end
  end
  148. =begin
  149. touch session
  150. Sessions.touch(client_id)
  151. returns
  152. true|false
  153. =end
  # Updates :last_ping of the session of +client_id+ to the current UTC epoch
  # and writes the session data back to its session file.
  #
  # Returns false when the session cannot be loaded, true otherwise.
  def self.touch(client_id)
    data = get(client_id)
    return false if !data

    data[:meta][:last_ping] = Time.now.utc.to_i
    content = data.to_json

    # Open WITHOUT truncating: mode 'wb' would empty the file before the
    # exclusive lock is held, so a concurrent reader could see an empty
    # session file. The old content is replaced only while the lock is held.
    File.open("#{@path}/#{client_id}/session", File::RDWR | File::CREAT | File::BINARY) do |file|
      file.flock(File::LOCK_EX)
      file.rewind
      file.write content
      # push the buffered data to the file before the lock is released
      file.flush
      file.truncate(file.pos)
      file.flock(File::LOCK_UN)
    end
    true
  end
  166. =begin
  167. get session data
  168. data = Sessions.get(client_id)
  169. returns
  170. {
  171. user: {
  172. 'id' => 123,
  173. },
  174. meta: {
  175. type: 'websocket',
  176. last_ping: time_of_last_ping,
  177. }
  178. }
  179. =end
  # Loads the session data of +client_id+ from its session file.
  #
  # Returns the parsed data with symbolized keys (the :user value keeps its
  # original string keys), or nil when the session directory or file is
  # missing, or when the file cannot be read or parsed. In all of those
  # cases the session is destroyed.
  def self.get(client_id)
    session_dir  = "#{@path}/#{client_id}"
    session_file = "#{session_dir}/session"

    # if no session dir exists, the session got destroyed
    if !File.exist?(session_dir)
      destroy(client_id)
      log('debug', "missing session directory #{session_dir} for '#{client_id}', remove session.")
      return
    end

    # a directory without session file is an error condition
    if !File.exist?(session_file)
      destroy(client_id)
      log('error', "missing session file for '#{client_id}', remove session.")
      return
    end

    begin
      # read under a shared lock, parse after the lock has been released
      raw = File.open(session_file, 'rb') do |file|
        file.flock(File::LOCK_SH)
        content = file.read
        file.flock(File::LOCK_UN)
        content
      end
      parsed = JSON.parse(raw)
      return if !parsed

      data = symbolize_keys(parsed)
      data[:user] = parsed['user'] # for compat. reasons
      data
    rescue => e
      log('error', e.inspect)
      destroy(client_id)
      log('error', "error in reading/parsing session file '#{session_file}', remove session.")
      return
    end
  end
  215. =begin
  216. send message to client
  217. Sessions.send(client_id_of_recipient, data)
  218. returns
  219. true|false
  220. =end
  # Queues +data+ (serialized as JSON) as a new "send-<timestamp>" message
  # file in the session directory of +client_id+.
  #
  # Returns false when the session directory does not exist or the message
  # cannot be written, true otherwise.
  def self.send(client_id, data)
    path = "#{@path}/#{client_id}/"
    return false if !File.directory?(path)

    filename = "send-#{Time.now.utc.to_f}"
    location = "#{path}#{filename}"
    count = 0
    begin
      # serialize first, so a failing to_json does not leave an empty file
      content = data.to_json

      # EXCL makes "name is unused" and "create the file" one atomic step;
      # a name collision raises Errno::EEXIST instead of overwriting the
      # message of another writer
      File.open(location, File::WRONLY | File::CREAT | File::EXCL | File::BINARY) do |file|
        file.flock(File::LOCK_EX)
        file.write content
        # push the buffered data to the file before the lock is released,
        # otherwise a reader could lock and consume a still empty file
        file.flush
        file.flock(File::LOCK_UN)
      end
    rescue Errno::EEXIST
      # name already taken, try the next numbered suffix
      count += 1
      location = "#{path}#{filename}-#{count}"
      retry
    rescue => e
      log('error', e.inspect)
      log('error', "error in writing message file '#{location}'")
      return false
    end
    true
  end
  250. =begin
  251. send message to recipient client
  252. Sessions.send_to(user_id, data)
  253. returns
  254. true|false
  255. =end
  # Sends +data+ to every session whose user id equals +user_id+ (both
  # sides are compared as integers). Sessions without user data are skipped.
  #
  # Always returns true.
  def self.send_to(user_id, data)
    sessions.each do |client_id|
      session = Sessions.get(client_id)
      session_user_id = session && session[:user] && session[:user]['id']
      next if !session_user_id
      next if session_user_id.to_i != user_id.to_i

      Sessions.send(client_id, data)
    end
    true
  end
=begin
send message to all authenticated clients
Sessions.broadcast(data)
returns
[array_with_client_ids_of_recipients]
broadcast also to not authenticated clients
Sessions.broadcast(data, 'public') # public|authenticated
broadcast, but not to the sender
Sessions.broadcast(data, 'public', sender_user_id)
=end
  # Sends +data+ to all sessions and returns the client_ids it was sent to.
  #
  # Unless +recipient+ is 'public', sessions without a user id are skipped.
  # When +sender_user_id+ is given, sessions of that user are skipped as
  # well (ids are compared as integers).
  def self.broadcast(data, recipient = 'authenticated', sender_user_id = nil)
    authenticated_only = recipient != 'public'

    sessions.each_with_object([]) do |client_id, recipients|
      session = Sessions.get(client_id)
      next if !session
      next if authenticated_only && (session[:user].blank? || session[:user]['id'].blank?)

      if sender_user_id
        user = session[:user]
        next if user && user['id'] && user['id'].to_i == sender_user_id.to_i
      end

      Sessions.send(client_id, data)
      recipients.push client_id
    end
  end
  298. =begin
  299. get messages for client
  300. messages = Sessions.queue(client_id_of_recipient)
  301. returns
  302. [
  303. {
  304. key1 => 'some data of message 1',
  305. key2 => 'some data of message 1',
  306. },
  307. {
  308. key1 => 'some data of message 2',
  309. key2 => 'some data of message 2',
  310. },
  311. ]
  312. =end
  # Fetches all queued messages of +client_id+.
  #
  # Every file in the session directory whose name matches /^send/ is
  # handed to Sessions.queue_file_read in sorted name order; files for
  # which nothing is returned are skipped.
  #
  # Returns the array of read messages.
  def self.queue(client_id)
    path = "#{@path}/#{client_id}/"

    # '.', '..' and the session file never match the pattern
    message_files = Dir.entries(path).select { |entry| entry =~ /^send/ }

    message_files.sort.each_with_object([]) do |entry, data|
      message = Sessions.queue_file_read(path, entry)
      data.push message if message
    end
  end
  # Reads the message file +filename+ below +path+ (path is expected to end
  # with a slash), deletes the file and returns the parsed JSON content.
  #
  # Returns nil when the file is blank or its content is not valid JSON.
  def self.queue_file_read(path, filename)
    location = "#{path}#{filename}"

    message = File.open(location, 'rb') do |file|
      file.flock(File::LOCK_EX)
      content = file.read
      file.flock(File::LOCK_UN)
      content
    end

    # the message is consumed, even if it turns out to be unusable
    File.delete(location)
    return if message.blank?

    begin
      JSON.parse(message)
    rescue => e
      log('error', "can't parse queue message: #{message}, #{e.inspect}")
      nil
    end
  end
  348. =begin
  349. remove all session and spool messages
  350. Sessions.cleanup
  351. =end
  # Removes the whole store directory (sessions, queued messages and spool)
  # if it exists. Always returns true.
  def self.cleanup
    FileUtils.rm_rf @path if File.exist?(@path)
    true
  end
  # Stores +data+ as a new spool file below "<store>/spool/".
  #
  # The file holds a JSON object with the keys "msg" (+data+ serialized as
  # a JSON string) and "timestamp" (current UTC epoch). The file name is
  # built from the current time plus a random number.
  def self.spool_create(data)
    msg = JSON.generate(data)
    path = "#{@path}/spool/"
    FileUtils.mkpath path

    content = {
      msg: msg,
      timestamp: Time.now.utc.to_i,
    }.to_json

    file_path = "#{path}/#{Time.now.utc.to_f}-#{rand(99_999)}"
    File.open(file_path, 'wb') do |file|
      file.flock(File::LOCK_EX)
      file.write content
      # push the buffered data to the file before the lock is released,
      # otherwise a reader could lock and read a still empty file
      file.flush
      file.flock(File::LOCK_UN)
    end
  end
  # Collects the spooled messages relevant for +current_user_id+.
  #
  # Spool files are processed in sorted name order. Files that cannot be
  # parsed or are older than 48h are deleted. A message is only returned
  # when +timestamp+ is nil or older than the spool timestamp. Messages
  # with a recipient user_id list are returned as type 'direct' once per
  # matching entry of that list, all others as type 'broadcast'. For
  # messages with event 'broadcast' only the 'data' part is returned.
  #
  # Returns an array of { type:, message: } hashes.
  def self.spool_list(timestamp, current_user_id)
    path = "#{@path}/spool/"
    FileUtils.mkpath path

    data = []
    to_delete = []
    entries = Dir.entries(path).reject { |entry| entry == '.' || entry == '..' }

    entries.sort.each do |entry|
      filename = "#{path}/#{entry}"
      next if !File.exist?(filename)

      raw = File.open(filename, 'rb') do |file|
        file.flock(File::LOCK_SH)
        content = file.read
        file.flock(File::LOCK_UN)
        content
      end

      begin
        spool = JSON.parse(raw)
        message_parsed = JSON.parse(spool['msg'])
      rescue => e
        log('error', "can't parse spool message: #{raw}, #{e.inspect}")
        to_delete.push filename
        next
      end

      # ignore messages older than 48h
      if spool['timestamp'] + (2 * 86_400) < Time.now.utc.to_i
        to_delete.push filename
        next
      end

      # add spool attribute to push spool info to clients
      message_parsed['spool'] = true

      # only return messages the client does not know yet
      not_yet_known = !timestamp || timestamp < spool['timestamp']
      next if !not_yet_known

      # broadcast events are delivered without their envelope
      message = message_parsed['event'] == 'broadcast' ? message_parsed['data'] : message_parsed

      recipient = message_parsed['recipient']
      if recipient && recipient['user_id']
        # spool to recipient list
        recipient['user_id'].each do |user_id|
          next if current_user_id != user_id

          data.push({ type: 'direct', message: message })
        end
      else
        # spool to every client
        data.push({ type: 'broadcast', message: message })
      end
    end

    to_delete.each { |file| File.delete(file) }
    data
  end
  # Scheduler loop; both loops below run forever, so this never returns.
  #
  # With node_id == 0 it acts as dispatcher: every session without an entry
  # in Sessions::Node.stats is assigned via Sessions::Node.session_assigne.
  # Otherwise it starts one client thread per session whose user can be
  # looked up - for the sessions of the given node, or for all sessions of
  # the store when node_id is nil.
  def self.jobs(node_id = nil)
    # just make sure that the store path exists
    FileUtils.mkpath @path if !File.exist?(@path)

    # dispatch sessions to nodes
    if node_id&.zero?
      loop do
        nodes_stats = Sessions::Node.stats
        sessions.each do |client_id|
          # already known by a node
          next if nodes_stats[client_id]

          # assign to a node
          Sessions::Node.session_assigne(client_id)
          sleep 1
        end
        sleep 1
      end
    end

    Thread.abort_on_exception = true
    loop do
      client_ids = if node_id
                     # register node and watch for assigned sessions
                     Sessions::Node.register(node_id)
                     Sessions::Node.sessions_by(node_id)
                   else
                     sessions
                   end

      client_ids.each do |client_id|
        # connection already open, ignore
        next if @@client_threads[client_id]

        # only sessions of existing users get a client thread
        session_data = Sessions.get(client_id)
        next if session_data.blank?
        next if session_data[:user].blank?
        next if session_data[:user]['id'].blank?

        user = User.lookup(id: session_data[:user]['id'])
        next if user.blank?

        # start client thread; the entry is set to true first and then
        # replaced by the thread object
        next if @@client_threads[client_id].present?

        @@client_threads[client_id] = true
        @@client_threads[client_id] = Thread.new do
          thread_client(client_id, 0, Time.now.utc, node_id)
          @@client_threads[client_id] = nil
          log('debug', "close client (#{client_id}) thread")
          ActiveRecord::Base.connection.close if ActiveRecord::Base.connection.owner == Thread.current
        end
        sleep 1
      end
      sleep 1
    end
  end
  498. =begin
  499. check if thread for client_id is running
  500. Sessions.thread_client_exists?(client_id)
  501. returns
  502. thread
  503. =end
  # Returns the thread registry entry of +client_id+: the client thread,
  # true while the thread is being started, or nil when there is none.
  def self.thread_client_exists?(client_id)
    registry = @@client_threads
    registry[client_id]
  end
  507. =begin
  508. start client for browser
  509. Sessions.thread_client(client_id)
  510. returns
  511. thread
  512. =end
  # Runs Sessions::Client for +client_id+ and restarts it after an error.
  #
  # After every failure it waits 10 seconds, releases the database
  # connection of the current thread and counts the failure in +try_count+.
  # The counter is reset when the previous failure (+try_run_time+) is more
  # than 5 minutes ago.
  #
  # Returns normally as soon as a run of the client finishes without error.
  # Raises a RuntimeError ("STOP thread_client ...") only when the failure
  # counter reaches 10; a run that succeeds after earlier failures does not
  # raise.
  def self.thread_client(client_id, try_count = 0, try_run_time = Time.now.utc, node_id)
    try_run_max = 10

    loop do
      log('debug', "LOOP #{node_id}.#{client_id} - #{try_count}")
      begin
        Sessions::Client.new(client_id, node_id)
        break
      rescue => e
        log('error', "thread_client #{client_id} exited with error #{e.inspect}")
        log('error', e.backtrace.join("\n ") )
        sleep 10

        begin
          ActiveRecord::Base.connection_pool.release_connection
        rescue => release_error
          log('error', "Can't reconnect to database #{release_error.inspect}")
        end

        try_count += 1

        # reset error counter if the last failure is too old
        try_count = 0 if try_run_time + ( 60 * 5 ) < Time.now.utc
        try_run_time = Time.now.utc

        # restart the client as long as tries are left
        next if try_run_max > try_count

        raise "STOP thread_client for client #{node_id}.#{client_id} after #{try_run_max} tries"
      end
    end

    log('debug', "/LOOP #{node_id}.#{client_id} - #{try_count}")
  end
  # Returns a copy of +hash+ with all String keys converted to symbols,
  # descending into nested hashes. Other key types and non-hash values
  # (including hashes inside arrays) are taken over unchanged.
  def self.symbolize_keys(hash)
    pairs = hash.map do |key, value|
      new_key = key.is_a?(String) ? key.to_sym : key
      new_value = value.is_a?(Hash) ? symbolize_keys(value) : value
      [new_key, new_value]
    end
    pairs.to_h
  end
  554. # we use it in rails and non rails context
  # Logs +message+ with the given +level+ ('debug', 'notice', anything else
  # is logged as error).
  #
  # Uses Rails.logger when Rails is defined, otherwise prints a line with
  # an ISO 8601 UTC timestamp to stdout. Returns nil.
  def self.log(level, message)
    if defined?(Rails)
      case level
      when 'debug'
        Rails.logger.debug { message }
      when 'notice'
        Rails.logger.notice message
      else
        Rails.logger.error message
      end
      return
    end

    # strftime is part of core Time; Time#iso8601 needs `require 'time'` on
    # Ruby < 3.4, which is not guaranteed to be loaded outside of Rails
    timestamp = Time.now.utc.strftime('%Y-%m-%dT%H:%M:%SZ')
    puts "#{timestamp}:#{level} #{message}" # rubocop:disable Rails/Output
  end
  568. end