sessions.rb 13 KB


require 'fileutils'
require 'json'
require 'time'
require 'session_helper'
  3. module Sessions
  # Application root: the current working directory, unless that is unusable
  # (empty or the filesystem root), in which case Rails.root is used instead.
  @root = Dir.pwd.to_s
  @root = Rails.root if @root.empty? || @root == '/'

  # Base directory below which the session, tmp and spool directories live.
  @path = "#{@root}/tmp/websocket"

  # Registry of per-client worker threads, keyed by client id.
  @@client_threads = {} # rubocop:disable Style/ClassVars
  # Start a new session for a client.
  #
  #   Sessions.create(client_id, session_data, { type: 'websocket' })
  #
  # The session file is first written below "<path>/tmp/<client_id>"; that
  # directory is then moved to "<path>/<client_id>", after an already existing
  # session of the same client has been destroyed. meta[:last_ping] is set to
  # the current UTC time, i.e. the passed meta hash is modified.
  #
  # Returns the result of Sessions.send (true|false) when the session data
  # carries an 'id', otherwise nil.
  def self.create(client_id, session, meta)
    target_dir  = "#{@path}/#{client_id}"
    staging_dir = "#{@path}/tmp/#{client_id}"

    # collect session data
    meta[:last_ping] = Time.now.utc.to_i
    payload = { user: session, meta: meta }.to_json

    # store session data in the staging directory
    FileUtils.mkpath staging_dir
    File.open("#{staging_dir}/session", 'wb') { |file| file.write payload }

    # destroy old session if needed
    Sessions.destory(client_id) if File.exist?(target_dir)

    # move to destination directory
    FileUtils.mv(staging_dir, target_dir)

    # send update to browser, only when the session data carries an id
    return unless session && session['id']

    send(client_id, { event: 'ws:login', data: { success: true } })
  end
  # List the client ids of all sessions.
  #
  #   client_ids = Sessions.sessions
  #
  # Returns e.g. ['4711', '4712']: every entry of the base directory except
  # '.', '..' and the internal 'tmp' and 'spool' directories. The base
  # directory is created when it does not exist yet.
  def self.sessions
    base_dir = "#{@path}/"

    # just make sure that the base path exists
    FileUtils.mkpath base_dir unless File.exist?(base_dir)

    reserved = ['.', '..', 'tmp', 'spool']
    Dir.entries(base_dir).reject { |entry| reserved.include?(entry) }
  end
  # Check whether a session exists for the given client id.
  #
  #   Sessions.session_exists?(client_id)
  #
  # The client id is compared in its string form against Sessions.sessions.
  #
  # Returns true|false
  def self.session_exists?(client_id)
    sessions.include?(client_id.to_s)
  end
  # List all sessions together with their data.
  #
  #   client_ids_with_data = Sessions.list
  #
  # Returns a hash keyed by client id; sessions for which Sessions.get
  # returns nothing are left out:
  #
  #   {
  #     '4711' => {
  #       user: {
  #         'id' => 123,
  #       },
  #       meta: {
  #         type: 'websocket',
  #         last_ping: time_of_last_ping,
  #       }
  #     },
  #     '4712' => {
  #       user: {
  #         'id' => 124,
  #       },
  #       meta: {
  #         type: 'ajax',
  #         last_ping: time_of_last_ping,
  #       }
  #     },
  #   }
  def self.list
    sessions.each_with_object({}) do |client_id, session_list|
      data = get(client_id)
      session_list[client_id] = data if data
    end
  end
  # Destroy a session by removing its directory "<path>/<client_id>".
  #
  #   Sessions.destory(client_id)
  #
  # A blank client id (nil or '') as well as '.' and '..' are rejected: they
  # would resolve to the session store itself (or its parent directory) and
  # remove every session instead of a single one.
  #
  # Returns false when the client id is rejected, otherwise the (truthy)
  # result of FileUtils.rm_rf. Removing a session that does not exist is not
  # an error.
  def self.destory(client_id)
    id = client_id.to_s
    return false if id.empty? || id == '.' || id == '..'

    FileUtils.rm_rf "#{@path}/#{id}"
  end
  # Destroy all idle sessions.
  #
  #   list_of_client_ids = Sessions.destory_idle_sessions
  #
  # A session counts as idle when it has no meta data, no last_ping, or when
  # its last_ping is more than idle_time_in_sec seconds in the past.
  #
  # Returns the client ids of the destroyed sessions, e.g. ['4711', '4712'].
  def self.destory_idle_sessions(idle_time_in_sec = 240)
    closed_client_ids = []
    Sessions.list.each do |client_id, client|
      last_ping = client[:meta] && client[:meta][:last_ping]

      # keep sessions which pinged recently enough
      next if last_ping && (last_ping.to_i + idle_time_in_sec) >= Time.now.utc.to_i

      closed_client_ids.push client_id
      Sessions.destory(client_id)
    end
    closed_client_ids
  end
  # Touch a session, i.e. set its meta[:last_ping] to the current UTC time.
  #
  #   Sessions.touch(client_id)
  #
  # The updated session is written to a temporary file next to the session
  # file and then renamed over it. Rewriting the session file in place would
  # truncate it first, so a concurrent Sessions.get could read an empty file,
  # fail to parse it and destroy the session.
  #
  # Returns false when the session can not be loaded, otherwise true.
  def self.touch(client_id)
    data = get(client_id)
    return false if !data

    # a stored session without meta data must not crash the update
    data[:meta] ||= {}
    data[:meta][:last_ping] = Time.now.utc.to_i

    session_file = "#{@path}/#{client_id}/session"
    tmp_file = "#{session_file}.#{Process.pid}.#{Thread.current.object_id}.tmp"
    File.open(tmp_file, 'wb') { |file| file.write data.to_json }

    # rename replaces the session file in one step
    File.rename(tmp_file, session_file)
    true
  end
  # Get the data of a session.
  #
  #   data = Sessions.get(client_id)
  #
  # Returns the parsed session file with symbolized keys; the :user value is
  # kept exactly as parsed (string keys):
  #
  #   {
  #     user: {
  #       'id' => 123,
  #     },
  #     meta: {
  #       type: 'websocket',
  #       last_ping: time_of_last_ping,
  #     }
  #   }
  #
  # Returns nil, and destroys the session, when the session directory or the
  # session file is missing or when the file can not be read or parsed.
  def self.get(client_id)
    session_dir = "#{@path}/#{client_id}"
    session_file = "#{session_dir}/session"

    # if no session dir exists, the session got destroyed
    unless File.exist?(session_dir)
      destory(client_id)
      log('debug', "missing session directory for '#{client_id}', remove session.")
      return
    end

    # if only the session file is missing, then it's an error behavior
    unless File.exist?(session_file)
      destory(client_id)
      log('error', "missing session file for '#{client_id}', remove session.")
      return
    end

    session = nil
    begin
      # read the file content while holding a lock on it
      raw = File.open(session_file, 'rb') do |file|
        file.flock(File::LOCK_EX)
        content = file.read
        file.flock(File::LOCK_UN)
        content
      end

      parsed = JSON.parse(raw)
      if parsed
        session = symbolize_keys(parsed)
        session[:user] = parsed['user'] # for compat. reasons
      end
    rescue => e
      log('error', e.inspect)
      destory(client_id)
      log('error', "error in reading/parsing session file '#{session_file}', remove session.")
      return
    end
    session
  end
  # Send a message to a client by queueing it as a file in the session
  # directory of the recipient.
  #
  #   Sessions.send(client_id_of_recipient, data)
  #
  # The message is written as JSON to a file named "a-send-<timestamp>" and
  # afterwards moved to "send-<timestamp>".
  #
  # Returns false when the session directory of the client does not exist or
  # the message file could not be written, otherwise true.
  def self.send(client_id, data)
    path = "#{@path}/#{client_id}/"

    # find a message file name which is not in use yet
    filename = "send-#{Time.now.utc.to_f}"
    count = 0
    while File.exist?(path + filename)
      count += 1
      filename = "#{filename}-#{count}"
    end

    return false if !File.directory? path

    # write below a temporary name first, the final name appears by the move
    tmp_file = "#{path}a-#{filename}"
    File.open(tmp_file, 'wb') do |file|
      file.flock(File::LOCK_EX)
      file.write data.to_json
      file.flock(File::LOCK_UN)
    end
    return false if !File.exist?(tmp_file)

    FileUtils.mv(tmp_file, path + filename)
    true
  end
  # Send a message to every client session of one user.
  #
  #   Sessions.send_to(user_id, data)
  #
  # Sessions without user data or without a user id are skipped; user ids are
  # compared as integers.
  #
  # Always returns true.
  def self.send_to(user_id, data)
    # walk through all current clients
    sessions.each do |client_id|
      session = Sessions.get(client_id)
      session_user_id = session && session[:user] && session[:user]['id']
      next if !session_user_id
      next if session_user_id.to_i != user_id.to_i

      Sessions.send(client_id, data)
    end
    true
  end
  # Send a message to all authenticated clients, i.e. to every session whose
  # user data carries an id.
  #
  #   Sessions.broadcast(data)
  #
  # Always returns true.
  def self.broadcast(data)
    # walk through all current clients
    sessions.each do |client_id|
      session = Sessions.get(client_id)
      next unless session && session[:user] && session[:user]['id']

      Sessions.send(client_id, data)
    end
    true
  end
  # Get (and thereby consume) the queued messages of a client.
  #
  #   messages = Sessions.queue(client_id_of_recipient)
  #
  # All files of the session directory matching /^send/ are read in sorted
  # file name order via Sessions.queue_file_read.
  #
  # Returns the parsed messages, e.g.
  #
  #   [
  #     {
  #       key1 => 'some data of message 1',
  #       key2 => 'some data of message 1',
  #     },
  #     {
  #       key1 => 'some data of message 2',
  #       key2 => 'some data of message 2',
  #     },
  #   ]
  def self.queue(client_id)
    path = "#{@path}/#{client_id}/"
    message_files = Dir.entries(path).select { |entry| entry =~ /^send/ }
    message_files.sort.map { |entry| Sessions.queue_file_read(path, entry) }
  end
  # Read one queued message file and remove it.
  #
  # The file "<path><filename>" is first renamed to "<path>a-<filename>",
  # then read and deleted; its content is parsed as JSON afterwards. path is
  # used as a plain prefix, so it has to end with a path separator.
  #
  # Returns the parsed message.
  def self.queue_file_read(path, filename)
    claimed_file = "#{path}a-#{filename}"
    FileUtils.mv("#{path}#{filename}", claimed_file)

    content = File.open(claimed_file, 'rb') { |file| file.read }
    File.delete(claimed_file)
    JSON.parse(content)
  end
  # Remove the spool directory and the tmp directory below the base path,
  # including everything inside them. Session directories are not touched.
  def self.cleanup
    FileUtils.rm_rf "#{@path}/spool/"
    FileUtils.rm_rf "#{@path}/tmp/"
  end
  # Store a message in the spool directory.
  #
  # The message is serialized to JSON and wrapped, together with the current
  # UTC timestamp, into a spool file named "<time as float>-<random number>".
  #
  # Returns the result of the file write.
  def self.spool_create(data)
    msg = JSON.generate(data)

    spool_dir = "#{@path}/spool/"
    FileUtils.mkpath spool_dir

    file_path = "#{spool_dir}/#{Time.now.utc.to_f}-#{rand(99_999)}"
    envelope = {
      msg: msg,
      timestamp: Time.now.utc.to_i,
    }
    File.open(file_path, 'wb') { |file| file.write envelope.to_json }
  end
  # List the spooled messages which are relevant for one user.
  #
  #   Sessions.spool_list(timestamp, current_user_id)
  #
  # timestamp       - only messages spooled after this time are returned;
  #                   nil returns all of them
  # current_user_id - messages with a recipient list are only returned when
  #                   this id is part of the list
  #
  # Spool files are processed in sorted file name order. Files which can not
  # be parsed are logged and skipped, so that a single broken file does not
  # abort the whole listing. Messages older than 48h are deleted from the
  # spool and not returned.
  #
  # Returns a list of items like { type: 'direct'|'broadcast', message: ... }.
  # For messages of event 'broadcast' the message is the 'data' part of the
  # spooled message, for all other events the whole spooled message.
  def self.spool_list(timestamp, current_user_id)
    path = "#{@path}/spool/"
    FileUtils.mkpath path

    data = []
    to_delete = []
    now = Time.now.utc.to_i

    entries = Dir.entries(path).reject { |entry| entry == '.' || entry == '..' }
    entries.sort.each do |entry|
      filename = "#{path}/#{entry}"
      next if !File.exist?(filename)

      raw = File.open(filename, 'rb') { |file| file.read }

      # a broken (e.g. half written) spool file must not abort the listing
      begin
        spool = JSON.parse(raw)
        message_raw = spool['msg']
      rescue => e
        log('error', "can't parse spool file: #{filename}, #{e.inspect}")
        next
      end

      begin
        message_parsed = JSON.parse(message_raw)
      rescue => e
        log('error', "can't parse spool message: #{message_raw}, #{e.inspect}")
        next
      end

      # ignore (and remove) messages older than 48h
      if spool['timestamp'] + (2 * 86_400) < now
        to_delete.push filename
        next
      end

      # add spool attribute to push spool info to clients
      message_parsed['spool'] = true

      # only deliver messages which are not already known
      already_known = timestamp && !(timestamp < spool['timestamp'])
      next if already_known

      # broadcast events only carry their data part to the client
      message = message_parsed['event'] == 'broadcast' ? message_parsed['data'] : message_parsed

      if message_parsed['recipient'] && message_parsed['recipient']['user_id']
        # spool to recipient list
        message_parsed['recipient']['user_id'].each do |user_id|
          next if current_user_id != user_id

          data.push(type: 'direct', message: message)
        end
      else
        # spool to every client
        data.push(type: 'broadcast', message: message)
      end
    end

    to_delete.each { |file| File.delete(file) }
    data
  end
  # Endless scheduler loop: scans the session list and starts one worker
  # thread (running thread_client) per client whose session belongs to an
  # existing user. Started threads are tracked in @@client_threads, keyed by
  # client id. The method never returns on its own.
  def self.jobs
    # just make sure that the base path exists
    if !File.exist?(@path)
      FileUtils.mkpath @path
    end
    # an unhandled exception in any thread aborts the whole process
    Thread.abort_on_exception = true
    loop do
      client_ids = sessions
      client_ids.each { |client_id|
        # a thread is already registered for this client, ignore
        next if @@client_threads[client_id]
        # get current user, sessions without a resolvable user get no thread
        session_data = Sessions.get(client_id)
        next if !session_data
        next if !session_data[:user]
        next if !session_data[:user]['id']
        user = User.lookup( id: session_data[:user]['id'] )
        next if !user
        # start client thread
        next if @@client_threads[client_id]
        # placeholder entry until the thread object itself is stored
        @@client_threads[client_id] = true
        @@client_threads[client_id] = Thread.new {
          thread_client(client_id)
          # thread_client returned: clear the registry entry again
          @@client_threads[client_id] = nil
          log('debug', "close client (#{client_id}) thread")
          ActiveRecord::Base.connection.close
        }
        # pause after each started thread
        sleep 0.5
      }
      # pause before the next scan of the session list
      sleep 0.5
    end
  end
  # Check if a worker thread is registered for a client.
  #
  #   Sessions.thread_client_exists?(client_id)
  #
  # Returns the entry stored in @@client_threads for client_id, which is nil
  # when nothing is registered. Sessions.jobs stores the Thread object there
  # (and briefly the placeholder true while the thread is being created).
  def self.thread_client_exists?(client_id)
    @@client_threads[client_id]
  end
  # Run the client for a browser session, restarting it after errors.
  #
  #   Sessions.thread_client(client_id)
  #
  # client_id    - id of the client to run Sessions::Client for
  # try_count    - number of failed tries so far (used by the recursive restart)
  # try_run_time - time of the previous try (used by the recursive restart)
  #
  # When Sessions::Client.new raises, the error is logged and, after a pause
  # of 10 seconds, the method calls itself again. After 10 failed tries a
  # RuntimeError is raised; the try counter starts over when the previous try
  # is more than 5 minutes ago.
  #
  # Returns the result of the final log call.
  # NOTE(review): the previous documentation stated "returns thread", which
  # the code does not do - confirm that no caller relies on it.
  def self.thread_client(client_id, try_count = 0, try_run_time = Time.now.utc)
    log('debug', "LOOP #{client_id} - #{try_count}")
    begin
      Sessions::Client.new(client_id)
    rescue => e
      log('error', "thread_client #{client_id} exited with error #{e.inspect}")
      log('error', e.backtrace.join("\n ") )
      # wait before the restart
      sleep 10
      begin
        ActiveRecord::Base.connection_pool.release_connection
      rescue => e
        # note: this rebinds e, the original error was already logged above
        log('error', "Can't reconnect to database #{e.inspect}")
      end
      try_run_max = 10
      try_count += 1
      # reset error counter if the previous try is too old
      if try_run_time + ( 60 * 5 ) < Time.now.utc
        try_count = 0
      end
      try_run_time = Time.now.utc
      # restart job again, the restart runs nested inside this rescue
      if try_run_max > try_count
        thread_client(client_id, try_count, try_run_time)
      else
        raise "STOP thread_client for client #{client_id} after #{try_run_max} tries"
      end
    end
    log('debug', "/LOOP #{client_id} - #{try_count}")
  end
  # Build a copy of a hash in which all String keys are converted to symbols.
  #
  # Nested hashes are converted recursively; all other values (including
  # hashes inside of arrays) and all non String keys are taken over as they
  # are. The passed hash is not modified.
  def self.symbolize_keys(hash)
    result = {}
    hash.each do |key, value|
      symbolized_key = key.is_a?(String) ? key.to_sym : key
      result[symbolized_key] = value.is_a?(Hash) ? symbolize_keys(value) : value
    end
    result
  end
  # Write a log message. Usable in Rails and in non Rails context.
  #
  #   Sessions.log('error', 'some message')
  #
  # With Rails loaded the message goes to Rails.logger:
  # * 'debug'  - Rails.logger.debug
  # * 'notice' - Rails.logger.notice when the logger implements it, otherwise
  #              Rails.logger.info (the standard Ruby Logger has no #notice,
  #              calling it raises NoMethodError)
  # * anything else - Rails.logger.error
  #
  # Without Rails the message is printed to STDOUT, prefixed with the current
  # UTC time in ISO 8601 format and the level.
  #
  # Returns nil.
  def self.log(level, message)
    if defined?(Rails)
      logger = Rails.logger
      case level
      when 'debug'
        logger.debug message
      when 'notice'
        if logger.respond_to?(:notice)
          logger.notice message
        else
          logger.info message
        end
      else
        logger.error message
      end
      return
    end
    puts "#{Time.now.utc.iso8601}:#{level} #{message}" # rubocop:disable Rails/Output
  end
  511. end