sessions.rb 14 KB


  1. require 'json'
  2. require 'session_helper'
  3. module Sessions
  4. # get application root directory
  5. @root = Dir.pwd.to_s
  6. if !@root || @root.empty? || @root == '/'
  7. @root = Rails.root
  8. end
  9. # get working directories
  10. @path = "#{@root}/tmp/websocket_#{Rails.env}"
  11. # create global vars for threads
  12. @@client_threads = {} # rubocop:disable Style/ClassVars
  13. =begin
  14. start new session
  15. Sessions.create(client_id, session_data, { type: 'websocket' })
  16. returns
  17. true|false
  18. =end
  19. def self.create(client_id, session, meta)
  20. path = "#{@path}/#{client_id}"
  21. path_tmp = "#{@path}/tmp/#{client_id}"
  22. session_file = "#{path_tmp}/session"
  23. # collect session data
  24. meta[:last_ping] = Time.now.utc.to_i
  25. data = {
  26. user: session,
  27. meta: meta,
  28. }
  29. content = data.to_json
  30. # store session data in session file
  31. FileUtils.mkpath path_tmp
  32. File.open(session_file, 'wb') { |file|
  33. file.write content
  34. }
  35. # destroy old session if needed
  36. if File.exist?(path)
  37. Sessions.destroy(client_id)
  38. end
  39. # move to destination directory
  40. FileUtils.mv(path_tmp, path)
  41. # send update to browser
  42. if session && session['id']
  43. send(
  44. client_id,
  45. {
  46. event: 'ws:login',
  47. data: { success: true },
  48. }
  49. )
  50. end
  51. end
  52. =begin
  53. list of all session
  54. client_ids = Sessions.sessions
  55. returns
  56. ['4711', '4712']
  57. =end
  58. def self.sessions
  59. path = "#{@path}/"
  60. # just make sure that spool path exists
  61. if !File.exist?(path)
  62. FileUtils.mkpath path
  63. end
  64. data = []
  65. Dir.foreach(path) do |entry|
  66. next if entry == '.'
  67. next if entry == '..'
  68. next if entry == 'tmp'
  69. next if entry == 'spool'
  70. data.push entry.to_s
  71. end
  72. data
  73. end
  74. =begin
  75. list of all session
  76. Sessions.session_exists?(client_id)
  77. returns
  78. true|false
  79. =end
  80. def self.session_exists?(client_id)
  81. client_ids = sessions
  82. client_ids.include? client_id.to_s
  83. end
  84. =begin
  85. list of all session with data
  86. client_ids_with_data = Sessions.list
  87. returns
  88. {
  89. '4711' => {
  90. user: {
  91. 'id' => 123,
  92. },
  93. meta: {
  94. type: 'websocket',
  95. last_ping: time_of_last_ping,
  96. }
  97. },
  98. '4712' => {
  99. user: {
  100. 'id' => 124,
  101. },
  102. meta: {
  103. type: 'ajax',
  104. last_ping: time_of_last_ping,
  105. }
  106. },
  107. }
  108. =end
  109. def self.list
  110. client_ids = sessions
  111. session_list = {}
  112. client_ids.each { |client_id|
  113. data = get(client_id)
  114. next if !data
  115. session_list[client_id] = data
  116. }
  117. session_list
  118. end
  119. =begin
  120. destroy session
  121. Sessions.destroy(client_id)
  122. returns
  123. true|false
  124. =end
  125. def self.destroy(client_id)
  126. path = "#{@path}/#{client_id}"
  127. FileUtils.rm_rf path
  128. end
  129. =begin
  130. destroy idle session
  131. list_of_client_ids = Sessions.destroy_idle_sessions
  132. returns
  133. ['4711', '4712']
  134. =end
  135. def self.destroy_idle_sessions(idle_time_in_sec = 240)
  136. list_of_closed_sessions = []
  137. clients = Sessions.list
  138. clients.each { |client_id, client|
  139. if !client[:meta] || !client[:meta][:last_ping] || ( client[:meta][:last_ping].to_i + idle_time_in_sec ) < Time.now.utc.to_i
  140. list_of_closed_sessions.push client_id
  141. Sessions.destroy(client_id)
  142. end
  143. }
  144. list_of_closed_sessions
  145. end
  146. =begin
  147. touch session
  148. Sessions.touch(client_id)
  149. returns
  150. true|false
  151. =end
  152. def self.touch(client_id)
  153. data = get(client_id)
  154. return false if !data
  155. path = "#{@path}/#{client_id}"
  156. data[:meta][:last_ping] = Time.now.utc.to_i
  157. content = data.to_json
  158. File.open("#{path}/session", 'wb' ) { |file|
  159. file.write content
  160. }
  161. true
  162. end
  163. =begin
  164. get session data
  165. data = Sessions.get(client_id)
  166. returns
  167. {
  168. user: {
  169. 'id' => 123,
  170. },
  171. meta: {
  172. type: 'websocket',
  173. last_ping: time_of_last_ping,
  174. }
  175. }
  176. =end
  177. def self.get(client_id)
  178. session_dir = "#{@path}/#{client_id}"
  179. session_file = "#{session_dir}/session"
  180. data = nil
  181. # if no session dir exists, session got destoried
  182. if !File.exist? session_dir
  183. destroy(client_id)
  184. log('debug', "missing session directory for '#{client_id}', remove session.")
  185. return
  186. end
  187. # if only session file is missing, then it's an error behavior
  188. if !File.exist? session_file
  189. destroy(client_id)
  190. log('error', "missing session file for '#{client_id}', remove session.")
  191. return
  192. end
  193. begin
  194. File.open(session_file, 'rb') { |file|
  195. file.flock(File::LOCK_EX)
  196. all = file.read
  197. file.flock(File::LOCK_UN)
  198. data_json = JSON.parse(all)
  199. if data_json
  200. data = symbolize_keys(data_json)
  201. data[:user] = data_json['user'] # for compat. reasons
  202. end
  203. }
  204. rescue => e
  205. log('error', e.inspect)
  206. destroy(client_id)
  207. log('error', "error in reading/parsing session file '#{session_file}', remove session.")
  208. return
  209. end
  210. data
  211. end
  212. =begin
  213. send message to client
  214. Sessions.send(client_id_of_recipient, data)
  215. returns
  216. true|false
  217. =end
  218. def self.send(client_id, data)
  219. path = "#{@path}/#{client_id}/"
  220. filename = "send-#{Time.now.utc.to_f}"
  221. location = "#{path}#{filename}"
  222. check = true
  223. count = 0
  224. while check
  225. if File.exist?(location)
  226. count += 1
  227. location = "#{path}#{filename}-#{count}"
  228. else
  229. check = false
  230. end
  231. end
  232. return false if !File.directory? path
  233. begin
  234. File.open(location, 'wb') { |file|
  235. file.flock(File::LOCK_EX)
  236. file.write data.to_json
  237. file.flock(File::LOCK_UN)
  238. file.close
  239. }
  240. rescue => e
  241. log('error', e.inspect)
  242. log('error', "error in writing message file '#{location}'")
  243. return false
  244. end
  245. true
  246. end
  247. =begin
  248. send message to recipient client
  249. Sessions.send_to(user_id, data)
  250. returns
  251. true|false
  252. =end
  253. def self.send_to(user_id, data)
  254. # list all current clients
  255. client_list = sessions
  256. client_list.each { |client_id|
  257. session = Sessions.get(client_id)
  258. next if !session
  259. next if !session[:user]
  260. next if !session[:user]['id']
  261. next if session[:user]['id'].to_i != user_id.to_i
  262. Sessions.send(client_id, data)
  263. }
  264. true
  265. end
  266. =begin
  267. send message to all authenticated client
  268. Sessions.broadcast(data)
  269. returns
  270. [array_with_client_ids_of_recipients]
  271. broadcase also to not authenticated client
  272. Sessions.broadcast(data, 'public') # public|authenticated
  273. broadcase also not to sender
  274. Sessions.broadcast(data, 'public', sender_user_id)
  275. =end
  276. def self.broadcast(data, recipient = 'authenticated', sender_user_id = nil)
  277. # list all current clients
  278. recipients = []
  279. client_list = sessions
  280. client_list.each { |client_id|
  281. session = Sessions.get(client_id)
  282. next if !session
  283. if recipient != 'public'
  284. next if !session[:user]
  285. next if !session[:user]['id']
  286. end
  287. if sender_user_id
  288. next if session[:user] && session[:user]['id'] && session[:user]['id'].to_i == sender_user_id.to_i
  289. end
  290. Sessions.send(client_id, data)
  291. recipients.push client_id
  292. }
  293. recipients
  294. end
  295. =begin
  296. get messages for client
  297. messages = Sessions.queue(client_id_of_recipient)
  298. returns
  299. [
  300. {
  301. key1 => 'some data of message 1',
  302. key2 => 'some data of message 1',
  303. },
  304. {
  305. key1 => 'some data of message 2',
  306. key2 => 'some data of message 2',
  307. },
  308. ]
  309. =end
  310. def self.queue(client_id)
  311. path = "#{@path}/#{client_id}/"
  312. data = []
  313. files = []
  314. Dir.foreach(path) { |entry|
  315. next if entry == '.'
  316. next if entry == '..'
  317. files.push entry
  318. }
  319. files.sort.each { |entry|
  320. filename = "#{path}/#{entry}"
  321. next if entry !~ /^send/
  322. message = Sessions.queue_file_read(path, entry)
  323. next if !message
  324. data.push message
  325. }
  326. data
  327. end
  328. def self.queue_file_read(path, filename)
  329. file_old = "#{path}#{filename}"
  330. file_new = "#{path}a-#{filename}"
  331. FileUtils.mv(file_old, file_new)
  332. message = ''
  333. File.open(file_new, 'rb') { |file|
  334. message = file.read
  335. }
  336. File.delete(file_new)
  337. begin
  338. return JSON.parse(message)
  339. rescue => e
  340. log('error', "can't parse queue message: #{message}, #{e.inspect}")
  341. return
  342. end
  343. end
  344. def self.cleanup
  345. path = "#{@path}/spool/"
  346. FileUtils.rm_rf path
  347. path = "#{@path}/tmp/"
  348. FileUtils.rm_rf path
  349. end
  350. def self.spool_create(data)
  351. msg = JSON.generate(data)
  352. path = "#{@path}/spool/"
  353. FileUtils.mkpath path
  354. file_path = "#{path}/#{Time.now.utc.to_f}-#{rand(99_999)}"
  355. File.open(file_path, 'wb') { |file|
  356. data = {
  357. msg: msg,
  358. timestamp: Time.now.utc.to_i,
  359. }
  360. file.write data.to_json
  361. }
  362. end
  363. def self.spool_list(timestamp, current_user_id)
  364. path = "#{@path}/spool/"
  365. FileUtils.mkpath path
  366. data = []
  367. to_delete = []
  368. files = []
  369. Dir.foreach(path) { |entry|
  370. next if entry == '.'
  371. next if entry == '..'
  372. files.push entry
  373. }
  374. files.sort.each { |entry|
  375. filename = "#{path}/#{entry}"
  376. next if !File.exist?(filename)
  377. File.open(filename, 'rb') { |file|
  378. message = file.read
  379. begin
  380. spool = JSON.parse(message)
  381. message_parsed = JSON.parse(spool['msg'])
  382. rescue => e
  383. log('error', "can't parse spool message: #{message}, #{e.inspect}")
  384. to_delete.push "#{path}/#{entry}"
  385. next
  386. end
  387. # ignore message older then 48h
  388. if spool['timestamp'] + (2 * 86_400) < Time.now.utc.to_i
  389. to_delete.push "#{path}/#{entry}"
  390. next
  391. end
  392. # add spool attribute to push spool info to clients
  393. message_parsed['spool'] = true
  394. # only send not already now messages
  395. if !timestamp || timestamp < spool['timestamp']
  396. # spool to recipient list
  397. if message_parsed['recipient'] && message_parsed['recipient']['user_id']
  398. message_parsed['recipient']['user_id'].each { |user_id|
  399. next if current_user_id != user_id
  400. message = message_parsed
  401. if message_parsed['event'] == 'broadcast'
  402. message = message_parsed['data']
  403. end
  404. item = {
  405. type: 'direct',
  406. message: message,
  407. }
  408. data.push item
  409. }
  410. # spool to every client
  411. else
  412. message = message_parsed
  413. if message_parsed['event'] == 'broadcast'
  414. message = message_parsed['data']
  415. end
  416. item = {
  417. type: 'broadcast',
  418. message: message,
  419. }
  420. data.push item
  421. end
  422. end
  423. }
  424. }
  425. to_delete.each { |file|
  426. File.delete(file)
  427. }
  428. data
  429. end
  430. def self.jobs
  431. # just make sure that spool path exists
  432. if !File.exist?(@path)
  433. FileUtils.mkpath @path
  434. end
  435. Thread.abort_on_exception = true
  436. loop do
  437. client_ids = sessions
  438. client_ids.each { |client_id|
  439. # connection already open, ignore
  440. next if @@client_threads[client_id]
  441. # get current user
  442. session_data = Sessions.get(client_id)
  443. next if !session_data
  444. next if !session_data[:user]
  445. next if !session_data[:user]['id']
  446. user = User.lookup( id: session_data[:user]['id'] )
  447. next if !user
  448. # start client thread
  449. next if @@client_threads[client_id]
  450. @@client_threads[client_id] = true
  451. @@client_threads[client_id] = Thread.new {
  452. thread_client(client_id)
  453. @@client_threads[client_id] = nil
  454. log('debug', "close client (#{client_id}) thread")
  455. ActiveRecord::Base.connection.close
  456. }
  457. sleep 0.5
  458. }
  459. # system settings
  460. sleep 0.5
  461. end
  462. end
  463. =begin
  464. check if thread for client_id is running
  465. Sessions.thread_client_exists?(client_id)
  466. returns
  467. thread
  468. =end
  469. def self.thread_client_exists?(client_id)
  470. @@client_threads[client_id]
  471. end
  472. =begin
  473. start client for browser
  474. Sessions.thread_client(client_id)
  475. returns
  476. thread
  477. =end
  478. def self.thread_client(client_id, try_count = 0, try_run_time = Time.now.utc)
  479. log('debug', "LOOP #{client_id} - #{try_count}")
  480. begin
  481. Sessions::Client.new(client_id)
  482. rescue => e
  483. log('error', "thread_client #{client_id} exited with error #{e.inspect}")
  484. log('error', e.backtrace.join("\n ") )
  485. sleep 10
  486. begin
  487. ActiveRecord::Base.connection_pool.release_connection
  488. rescue => e
  489. log('error', "Can't reconnect to database #{e.inspect}")
  490. end
  491. try_run_max = 10
  492. try_count += 1
  493. # reset error counter if to old
  494. if try_run_time + ( 60 * 5 ) < Time.now.utc
  495. try_count = 0
  496. end
  497. try_run_time = Time.now.utc
  498. # restart job again
  499. if try_run_max > try_count
  500. thread_client(client_id, try_count, try_run_time)
  501. else
  502. raise "STOP thread_client for client #{client_id} after #{try_run_max} tries"
  503. end
  504. end
  505. log('debug', "/LOOP #{client_id} - #{try_count}")
  506. end
  507. def self.symbolize_keys(hash)
  508. hash.each_with_object({}) { |(key, value), result|
  509. new_key = case key
  510. when String then key.to_sym
  511. else key
  512. end
  513. new_value = case value
  514. when Hash then symbolize_keys(value)
  515. else value
  516. end
  517. result[new_key] = new_value
  518. }
  519. end
  520. # we use it in rails and non rails context
  521. def self.log(level, message)
  522. if defined?(Rails)
  523. if level == 'debug'
  524. Rails.logger.debug message
  525. elsif level == 'notice'
  526. Rails.logger.notice message
  527. else
  528. Rails.logger.error message
  529. end
  530. return
  531. end
  532. puts "#{Time.now.utc.iso8601}:#{level} #{message}" # rubocop:disable Rails/Output
  533. end
  534. end