sessions.rb 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814
  1. module Sessions
  2. # get application root directory
  3. @root = Dir.pwd.to_s
  4. if @root.blank? || @root == '/'
  5. @root = Rails.root
  6. end
  7. # get working directories
  8. @path = "#{@root}/tmp/websocket_#{Rails.env}"
  9. # create global vars for threads
  10. @@client_threads = {} # rubocop:disable Style/ClassVars
  11. =begin
  12. start new session
  13. Sessions.create(client_id, session_data, { type: 'websocket' })
  14. returns
  15. true|false
  16. =end
  17. def self.create(client_id, session, meta)
  18. path = "#{@path}/#{client_id}"
  19. path_tmp = "#{@path}/tmp/#{client_id}"
  20. session_file = "#{path_tmp}/session"
  21. # collect session data
  22. meta[:last_ping] = Time.now.utc.to_i
  23. data = {
  24. user: session,
  25. meta: meta,
  26. }
  27. content = data.to_json
  28. # store session data in session file
  29. FileUtils.mkpath path_tmp
  30. File.open(session_file, 'wb') do |file|
  31. file.write content
  32. end
  33. # destroy old session if needed
  34. if File.exist?(path)
  35. Sessions.destroy(client_id)
  36. end
  37. # move to destination directory
  38. FileUtils.mv(path_tmp, path)
  39. # send update to browser
  40. return if !session || session['id'].blank?
  41. send(
  42. client_id,
  43. {
  44. event: 'ws:login',
  45. data: { success: true },
  46. }
  47. )
  48. end
  49. =begin
  50. list of all session
  51. client_ids = Sessions.sessions
  52. returns
  53. ['4711', '4712']
  54. =end
  55. def self.sessions
  56. path = "#{@path}/"
  57. # just make sure that spool path exists
  58. if !File.exist?(path)
  59. FileUtils.mkpath path
  60. end
  61. data = []
  62. Dir.foreach(path) do |entry|
  63. next if entry == '.'
  64. next if entry == '..'
  65. next if entry == 'tmp'
  66. next if entry == 'spool'
  67. data.push entry.to_s
  68. end
  69. data
  70. end
  71. =begin
  72. list of all session
  73. Sessions.session_exists?(client_id)
  74. returns
  75. true|false
  76. =end
  77. def self.session_exists?(client_id)
  78. session_dir = "#{@path}/#{client_id}"
  79. return false if !File.exist?(session_dir)
  80. session_file = "#{session_dir}/session"
  81. return false if !File.exist?(session_file)
  82. true
  83. end
  84. =begin
  85. list of all session with data
  86. client_ids_with_data = Sessions.list
  87. returns
  88. {
  89. '4711' => {
  90. user: {
  91. 'id' => 123,
  92. },
  93. meta: {
  94. type: 'websocket',
  95. last_ping: time_of_last_ping,
  96. }
  97. },
  98. '4712' => {
  99. user: {
  100. 'id' => 124,
  101. },
  102. meta: {
  103. type: 'ajax',
  104. last_ping: time_of_last_ping,
  105. }
  106. },
  107. }
  108. =end
  109. def self.list
  110. client_ids = sessions
  111. session_list = {}
  112. client_ids.each do |client_id|
  113. data = get(client_id)
  114. next if !data
  115. session_list[client_id] = data
  116. end
  117. session_list
  118. end
  119. =begin
  120. destroy session
  121. Sessions.destroy(client_id)
  122. returns
  123. true|false
  124. =end
  125. def self.destroy(client_id)
  126. path = "#{@path}/#{client_id}"
  127. FileUtils.rm_rf path
  128. end
  129. =begin
  130. destroy idle session
  131. list_of_client_ids = Sessions.destroy_idle_sessions
  132. returns
  133. ['4711', '4712']
  134. =end
  135. def self.destroy_idle_sessions(idle_time_in_sec = 240)
  136. list_of_closed_sessions = []
  137. clients = Sessions.list
  138. clients.each do |client_id, client|
  139. if !client[:meta] || !client[:meta][:last_ping] || ( client[:meta][:last_ping].to_i + idle_time_in_sec ) < Time.now.utc.to_i
  140. list_of_closed_sessions.push client_id
  141. Sessions.destroy(client_id)
  142. end
  143. end
  144. list_of_closed_sessions
  145. end
  146. =begin
  147. touch session
  148. Sessions.touch(client_id)
  149. returns
  150. true|false
  151. =end
  152. def self.touch(client_id)
  153. data = get(client_id)
  154. return false if !data
  155. path = "#{@path}/#{client_id}"
  156. data[:meta][:last_ping] = Time.now.utc.to_i
  157. File.open("#{path}/session", 'wb' ) do |file|
  158. file.flock(File::LOCK_EX)
  159. file.write data.to_json
  160. file.flock(File::LOCK_UN)
  161. end
  162. true
  163. end
  164. =begin
  165. get session data
  166. data = Sessions.get(client_id)
  167. returns
  168. {
  169. user: {
  170. 'id' => 123,
  171. },
  172. meta: {
  173. type: 'websocket',
  174. last_ping: time_of_last_ping,
  175. }
  176. }
  177. =end
  178. def self.get(client_id)
  179. session_dir = "#{@path}/#{client_id}"
  180. session_file = "#{session_dir}/session"
  181. data = nil
  182. # if no session dir exists, session got destoried
  183. if !File.exist?(session_dir)
  184. destroy(client_id)
  185. log('debug', "missing session directory #{session_dir} for '#{client_id}', remove session.")
  186. return
  187. end
  188. # if only session file is missing, then it's an error behavior
  189. if !File.exist?(session_file)
  190. destroy(client_id)
  191. log('error', "missing session file for '#{client_id}', remove session.")
  192. return
  193. end
  194. begin
  195. File.open(session_file, 'rb') do |file|
  196. file.flock(File::LOCK_SH)
  197. all = file.read
  198. file.flock(File::LOCK_UN)
  199. data_json = JSON.parse(all)
  200. if data_json
  201. data = symbolize_keys(data_json)
  202. data[:user] = data_json['user'] # for compat. reasons
  203. end
  204. end
  205. rescue => e
  206. log('error', e.inspect)
  207. destroy(client_id)
  208. log('error', "error in reading/parsing session file '#{session_file}', remove session.")
  209. return
  210. end
  211. data
  212. end
  213. =begin
  214. send message to client
  215. Sessions.send(client_id_of_recipient, data)
  216. returns
  217. true|false
  218. =end
  219. def self.send(client_id, data)
  220. path = "#{@path}/#{client_id}/"
  221. filename = "send-#{Time.now.utc.to_f}"
  222. location = "#{path}#{filename}"
  223. check = true
  224. count = 0
  225. while check
  226. if File.exist?(location)
  227. count += 1
  228. location = "#{path}#{filename}-#{count}"
  229. else
  230. check = false
  231. end
  232. end
  233. return false if !File.directory? path
  234. begin
  235. File.open(location, 'wb') do |file|
  236. file.flock(File::LOCK_EX)
  237. file.write data.to_json
  238. file.flock(File::LOCK_UN)
  239. file.close
  240. end
  241. rescue => e
  242. log('error', e.inspect)
  243. log('error', "error in writing message file '#{location}'")
  244. return false
  245. end
  246. true
  247. end
  248. =begin
  249. send message to recipient client
  250. Sessions.send_to(user_id, data)
  251. e. g.
  252. Sessions.send_to(user_id, {
  253. event: 'session:takeover',
  254. data: {
  255. taskbar_id: 12312
  256. },
  257. })
  258. returns
  259. true|false
  260. =end
  261. def self.send_to(user_id, data)
  262. # list all current clients
  263. client_list = sessions
  264. client_list.each do |client_id|
  265. session = Sessions.get(client_id)
  266. next if !session
  267. next if !session[:user]
  268. next if !session[:user]['id']
  269. next if session[:user]['id'].to_i != user_id.to_i
  270. Sessions.send(client_id, data)
  271. end
  272. true
  273. end
  274. =begin
  275. send message to all authenticated client
  276. Sessions.broadcast(data)
  277. returns
  278. [array_with_client_ids_of_recipients]
  279. broadcase also to not authenticated client
  280. Sessions.broadcast(data, 'public') # public|authenticated
  281. broadcase also not to sender
  282. Sessions.broadcast(data, 'public', sender_user_id)
  283. =end
  284. def self.broadcast(data, recipient = 'authenticated', sender_user_id = nil)
  285. # list all current clients
  286. recipients = []
  287. client_list = sessions
  288. client_list.each do |client_id|
  289. session = Sessions.get(client_id)
  290. next if !session
  291. if recipient != 'public'
  292. next if session[:user].blank?
  293. next if session[:user]['id'].blank?
  294. end
  295. next if sender_user_id && session[:user] && session[:user]['id'] && session[:user]['id'].to_i == sender_user_id.to_i
  296. Sessions.send(client_id, data)
  297. recipients.push client_id
  298. end
  299. recipients
  300. end
  301. =begin
  302. get messages for client
  303. messages = Sessions.queue(client_id_of_recipient)
  304. returns
  305. [
  306. {
  307. key1 => 'some data of message 1',
  308. key2 => 'some data of message 1',
  309. },
  310. {
  311. key1 => 'some data of message 2',
  312. key2 => 'some data of message 2',
  313. },
  314. ]
  315. =end
  316. def self.queue(client_id)
  317. path = "#{@path}/#{client_id}/"
  318. data = []
  319. files = []
  320. Dir.foreach(path) do |entry|
  321. next if entry == '.'
  322. next if entry == '..'
  323. files.push entry
  324. end
  325. files.sort.each do |entry|
  326. next if !entry.start_with?('send')
  327. message = Sessions.queue_file_read(path, entry)
  328. next if !message
  329. data.push message
  330. end
  331. data
  332. end
  333. def self.queue_file_read(path, filename)
  334. location = "#{path}#{filename}"
  335. message = ''
  336. File.open(location, 'rb') do |file|
  337. file.flock(File::LOCK_EX)
  338. message = file.read
  339. file.flock(File::LOCK_UN)
  340. end
  341. File.delete(location)
  342. return if message.blank?
  343. begin
  344. JSON.parse(message)
  345. rescue => e
  346. log('error', "can't parse queue message: #{message}, #{e.inspect}")
  347. nil
  348. end
  349. end
  350. =begin
  351. remove all session and spool messages
  352. Sessions.cleanup
  353. =end
  354. def self.cleanup
  355. return true if !File.exist?(@path)
  356. FileUtils.rm_rf @path
  357. true
  358. end
  359. =begin
  360. create spool messages
  361. Sessions.spool_create(some: 'data')
  362. =end
  363. def self.spool_create(data)
  364. msg = JSON.generate(data)
  365. path = "#{@path}/spool/"
  366. FileUtils.mkpath path
  367. data = {
  368. msg: msg,
  369. timestamp: Time.now.utc.to_i,
  370. }
  371. file_path = "#{path}/#{Time.now.utc.to_f}-#{rand(99_999)}"
  372. File.open(file_path, 'wb') do |file|
  373. file.flock(File::LOCK_EX)
  374. file.write data.to_json
  375. file.flock(File::LOCK_UN)
  376. end
  377. end
  378. =begin
  379. get spool messages
  380. Sessions.spool_list(junger_then, for_user_id)
  381. =end
  382. def self.spool_list(timestamp, current_user_id)
  383. path = "#{@path}/spool/"
  384. FileUtils.mkpath path
  385. data = []
  386. to_delete = []
  387. files = []
  388. Dir.foreach(path) do |entry|
  389. next if entry == '.'
  390. next if entry == '..'
  391. files.push entry
  392. end
  393. files.sort.each do |entry|
  394. filename = "#{path}/#{entry}"
  395. next if !File.exist?(filename)
  396. File.open(filename, 'rb') do |file|
  397. file.flock(File::LOCK_SH)
  398. message = file.read
  399. file.flock(File::LOCK_UN)
  400. message_parsed = {}
  401. begin
  402. spool = JSON.parse(message)
  403. message_parsed = JSON.parse(spool['msg'])
  404. rescue => e
  405. log('error', "can't parse spool message: #{message}, #{e.inspect}")
  406. to_delete.push "#{path}/#{entry}"
  407. next
  408. end
  409. # ignore message older then 48h
  410. if spool['timestamp'] + (2 * 86_400) < Time.now.utc.to_i
  411. to_delete.push "#{path}/#{entry}"
  412. next
  413. end
  414. # add spool attribute to push spool info to clients
  415. message_parsed['spool'] = true
  416. # only send not already older messages
  417. if !timestamp || timestamp < spool['timestamp']
  418. # spool to recipient list
  419. if message_parsed['recipient'] && message_parsed['recipient']['user_id']
  420. message_parsed['recipient']['user_id'].each do |user_id|
  421. next if current_user_id != user_id
  422. message = message_parsed
  423. if message_parsed['event'] == 'broadcast'
  424. message = message_parsed['data']
  425. end
  426. item = {
  427. type: 'direct',
  428. message: message,
  429. }
  430. data.push item
  431. end
  432. # spool to every client
  433. else
  434. message = message_parsed
  435. if message_parsed['event'] == 'broadcast'
  436. message = message_parsed['data']
  437. end
  438. item = {
  439. type: 'broadcast',
  440. message: message,
  441. }
  442. data.push item
  443. end
  444. end
  445. end
  446. end
  447. to_delete.each do |file|
  448. File.delete(file)
  449. end
  450. data
  451. end
  452. =begin
  453. delete spool messages
  454. Sessions.spool_delete
  455. =end
  456. def self.spool_delete
  457. path = "#{@path}/spool/"
  458. FileUtils.rm_rf path
  459. end
  460. def self.jobs(node_id = nil)
  461. # just make sure that spool path exists
  462. if !File.exist?(@path)
  463. FileUtils.mkpath(@path)
  464. end
  465. # dispatch sessions
  466. if node_id.blank? && ENV['ZAMMAD_SESSION_JOBS_CONCURRENT'].to_i.positive?
  467. previous_nodes_sessions = Sessions::Node.stats
  468. if previous_nodes_sessions.present?
  469. log('info', "Cleaning up previous Sessions::Node sessions: #{previous_nodes_sessions}")
  470. Sessions::Node.cleanup
  471. end
  472. dispatcher_pid = Process.pid
  473. node_count = ENV['ZAMMAD_SESSION_JOBS_CONCURRENT'].to_i
  474. node_pids = []
  475. (1..node_count).each do |worker_node_id|
  476. node_pids << fork do
  477. title = "Zammad Session Jobs Node ##{worker_node_id}: dispatch_pid:#{dispatcher_pid} -> worker_pid:#{Process.pid}"
  478. $PROGRAM_NAME = title
  479. log('info', "#{title} started.")
  480. ::Sessions.jobs(worker_node_id)
  481. sleep node_count
  482. rescue Interrupt
  483. nil
  484. end
  485. end
  486. Signal.trap 'SIGTERM' do
  487. node_pids.each do |node_pid|
  488. Process.kill 'TERM', node_pid
  489. end
  490. Process.waitall
  491. raise SignalException, 'SIGTERM'
  492. end
  493. # dispatch client_ids to nodes
  494. loop do
  495. # nodes
  496. nodes_stats = Sessions::Node.stats
  497. client_ids = sessions
  498. client_ids.each do |client_id|
  499. # ask nodes for nodes
  500. next if nodes_stats[client_id]
  501. # assign to node
  502. Sessions::Node.session_assigne(client_id)
  503. sleep 1
  504. end
  505. sleep 1
  506. end
  507. end
  508. Thread.abort_on_exception = true
  509. loop do
  510. if node_id
  511. # register node
  512. Sessions::Node.register(node_id)
  513. # watch for assigned sessions
  514. client_ids = Sessions::Node.sessions_by(node_id)
  515. else
  516. client_ids = sessions
  517. end
  518. client_ids.each do |client_id|
  519. # connection already open, ignore
  520. next if @@client_threads[client_id]
  521. # get current user
  522. session_data = Sessions.get(client_id)
  523. next if session_data.blank?
  524. next if session_data[:user].blank?
  525. next if session_data[:user]['id'].blank?
  526. user = User.lookup(id: session_data[:user]['id'])
  527. next if user.blank?
  528. # start client thread
  529. next if @@client_threads[client_id].present?
  530. @@client_threads[client_id] = true
  531. @@client_threads[client_id] = Thread.new do
  532. thread_client(client_id, 0, Time.now.utc, node_id)
  533. @@client_threads[client_id] = nil
  534. log('debug', "close client (#{client_id}) thread")
  535. if ActiveRecord::Base.connection.owner == Thread.current
  536. ActiveRecord::Base.connection.close
  537. end
  538. end
  539. sleep 1
  540. end
  541. sleep 1
  542. end
  543. end
  544. =begin
  545. check if thread for client_id is running
  546. Sessions.thread_client_exists?(client_id)
  547. returns
  548. thread
  549. =end
  550. def self.thread_client_exists?(client_id)
  551. @@client_threads[client_id]
  552. end
  553. =begin
  554. start client for browser
  555. Sessions.thread_client(client_id)
  556. returns
  557. thread
  558. =end
  559. def self.thread_client(client_id, try_count = 0, try_run_time = Time.now.utc, node_id)
  560. log('debug', "LOOP #{node_id}.#{client_id} - #{try_count}")
  561. begin
  562. Sessions::Client.new(client_id, node_id)
  563. rescue => e
  564. log('error', "thread_client #{client_id} exited with error #{e.inspect}")
  565. log('error', e.backtrace.join("\n ") )
  566. sleep 10
  567. begin
  568. ActiveRecord::Base.connection_pool.release_connection
  569. rescue => e
  570. log('error', "Can't reconnect to database #{e.inspect}")
  571. end
  572. try_run_max = 10
  573. try_count += 1
  574. # reset error counter if to old
  575. if try_run_time + ( 60 * 5 ) < Time.now.utc
  576. try_count = 0
  577. end
  578. try_run_time = Time.now.utc
  579. # restart job again
  580. if try_run_max > try_count
  581. thread_client(client_id, try_count, try_run_time, node_id)
  582. end
  583. raise "STOP thread_client for client #{node_id}.#{client_id} after #{try_run_max} tries"
  584. end
  585. log('debug', "/LOOP #{node_id}.#{client_id} - #{try_count}")
  586. end
  587. def self.symbolize_keys(hash)
  588. hash.each_with_object({}) do |(key, value), result|
  589. new_key = case key
  590. when String then key.to_sym
  591. else key
  592. end
  593. new_value = case value
  594. when Hash then symbolize_keys(value)
  595. else value
  596. end
  597. result[new_key] = new_value
  598. end
  599. end
  600. # we use it in rails and non rails context
  601. def self.log(level, message)
  602. if defined?(Rails)
  603. case level
  604. when 'debug'
  605. Rails.logger.debug { message }
  606. when 'info'
  607. Rails.logger.info message
  608. else
  609. Rails.logger.error message
  610. end
  611. return
  612. end
  613. puts "#{Time.now.utc.iso8601}:#{level} #{message}" # rubocop:disable Rails/Output
  614. end
  615. end