sessions.rb 16 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816
  1. # Copyright (C) 2012-2021 Zammad Foundation, http://zammad-foundation.org/
  2. module Sessions
  # Application root: the current working directory, unless that is blank or
  # the filesystem root, in which case Rails.root is used instead.
  @root = Dir.pwd.to_s
  @root = Rails.root if @root.blank? || @root == '/'

  # Base directory for all session, queue and spool files of this environment.
  @path = "#{@root}/tmp/websocket_#{Rails.env}"

  # Registry of client threads, keyed by client_id (filled by Sessions.jobs).
  @@client_threads = {} # rubocop:disable Style/ClassVars
  12. =begin
  13. start new session
  14. Sessions.create(client_id, session_data, { type: 'websocket' })
  15. returns
  16. true|false
  17. =end
  # Start a new session for client_id.
  #
  # The payload ({ user: session, meta: meta } as JSON) is written into a
  # staging directory below "<path>/tmp" and then moved to its final location,
  # replacing any session that already exists for this client_id.
  # meta[:last_ping] is set to the current UTC epoch (the passed hash is
  # modified). If the session carries a user id, a 'ws:login' event is queued.
  #
  # Returns nil when no login event is queued, otherwise the result of .send.
  def self.create(client_id, session, meta)
    target_dir  = "#{@path}/#{client_id}"
    staging_dir = "#{@path}/tmp/#{client_id}"

    # collect session data
    meta[:last_ping] = Time.now.utc.to_i
    payload = { user: session, meta: meta }.to_json

    # write the session file into the staging directory
    FileUtils.mkpath staging_dir
    File.open("#{staging_dir}/session", 'wb') { |file| file.write payload }

    # replace an already existing session and move the new one into place
    Sessions.destroy(client_id) if File.exist?(target_dir)
    FileUtils.mv(staging_dir, target_dir)

    # only sessions with a user get the login confirmation
    return if !session || session['id'].blank?

    login_event = { event: 'ws:login', data: { success: true } }
    send(client_id, login_event)
  end
  50. =begin
  51. list of all session
  52. client_ids = Sessions.sessions
  53. returns
  54. ['4711', '4712']
  55. =end
  # List the client_ids of all sessions found in the session directory.
  #
  # Creates the directory if it does not exist yet. The entries '.', '..',
  # 'tmp' and 'spool' are not sessions and are left out.
  #
  # Returns an Array of client_id strings.
  def self.sessions
    base_dir = "#{@path}/"
    FileUtils.mkpath base_dir unless File.exist?(base_dir)

    reserved = %w[. .. tmp spool]
    Dir.foreach(base_dir).reject { |entry| reserved.include?(entry) }
  end
=begin
check if a session (directory and session file) exists for a client
  Sessions.session_exists?(client_id)
returns
  true|false
=end
  # Check whether both the session directory and its session file exist.
  #
  # Returns true or false.
  def self.session_exists?(client_id)
    session_dir = "#{@path}/#{client_id}"
    File.exist?(session_dir) && File.exist?("#{session_dir}/session")
  end
  85. =begin
  86. list of all session with data
  87. client_ids_with_data = Sessions.list
  88. returns
  89. {
  90. '4711' => {
  91. user: {
  92. 'id' => 123,
  93. },
  94. meta: {
  95. type: 'websocket',
  96. last_ping: time_of_last_ping,
  97. }
  98. },
  99. '4712' => {
  100. user: {
  101. 'id' => 124,
  102. },
  103. meta: {
  104. type: 'ajax',
  105. last_ping: time_of_last_ping,
  106. }
  107. },
  108. }
  109. =end
  # Collect the data of all readable sessions.
  #
  # Sessions for which .get returns nothing are left out.
  #
  # Returns a Hash of client_id => session data.
  def self.list
    sessions.each_with_object({}) do |client_id, session_list|
      session_data = get(client_id)
      session_list[client_id] = session_data if session_data
    end
  end
  120. =begin
  121. destroy session
  122. Sessions.destroy(client_id)
  123. returns
  124. true|false
  125. =end
  # Remove the session directory of client_id including everything in it.
  #
  # Returns whatever FileUtils.rm_rf returns.
  def self.destroy(client_id)
    FileUtils.rm_rf "#{@path}/#{client_id}"
  end
  130. =begin
  131. destroy idle session
  132. list_of_client_ids = Sessions.destroy_idle_sessions
  133. returns
  134. ['4711', '4712']
  135. =end
  # Destroy every session without meta data, without a last ping, or whose
  # last ping is more than idle_time_in_sec seconds ago.
  #
  # Returns an Array with the client_ids of the destroyed sessions.
  def self.destroy_idle_sessions(idle_time_in_sec = 240)
    closed_sessions = []
    Sessions.list.each do |client_id, client|
      last_ping = client[:meta] && client[:meta][:last_ping]

      # keep sessions that pinged recently enough
      next if last_ping && (last_ping.to_i + idle_time_in_sec) >= Time.now.utc.to_i

      closed_sessions.push client_id
      Sessions.destroy(client_id)
    end
    closed_sessions
  end
  147. =begin
  148. touch session
  149. Sessions.touch(client_id)
  150. returns
  151. true|false
  152. =end
  # Refresh the last_ping timestamp of a session.
  #
  # Reads the session via .get, sets meta.last_ping to the current UTC epoch
  # and writes the session file back.
  #
  # The file is opened WITHOUT truncation and only rewritten after the
  # exclusive lock is held. Opening with 'wb' would empty the file before the
  # lock is taken, so a concurrent reader holding a shared lock could see an
  # empty file, fail to parse it and destroy the session.
  #
  # Returns false if the session could not be read, true otherwise.
  def self.touch(client_id)
    data = get(client_id)
    return false if !data

    data[:meta][:last_ping] = Time.now.utc.to_i
    content = data.to_json

    File.open("#{@path}/#{client_id}/session", File::RDWR | File::CREAT | File::BINARY) do |file|
      file.flock(File::LOCK_EX)
      file.rewind
      file.write content
      file.flush
      # drop leftover bytes in case the previous content was longer
      file.truncate(file.pos)
      file.flock(File::LOCK_UN)
    end
    true
  end
  165. =begin
  166. get session data
  167. data = Sessions.get(client_id)
  168. returns
  169. {
  170. user: {
  171. 'id' => 123,
  172. },
  173. meta: {
  174. type: 'websocket',
  175. last_ping: time_of_last_ping,
  176. }
  177. }
  178. =end
  # Read and parse the session file of client_id.
  #
  # The session is destroyed and nil is returned when the session directory
  # or the session file is missing, or when reading/parsing the file fails.
  #
  # Returns the session data with symbolized keys (Hash values are symbolized
  # recursively); :user keeps its original string keys for compatibility.
  def self.get(client_id)
    session_dir  = "#{@path}/#{client_id}"
    session_file = "#{session_dir}/session"

    # no session directory means the session got destroyed
    unless File.exist?(session_dir)
      destroy(client_id)
      log('debug', "missing session directory #{session_dir} for '#{client_id}', remove session.")
      return
    end

    # a directory without session file is an error state
    unless File.exist?(session_file)
      destroy(client_id)
      log('error', "missing session file for '#{client_id}', remove session.")
      return
    end

    session = nil
    begin
      raw = File.open(session_file, 'rb') do |file|
        file.flock(File::LOCK_SH)
        content = file.read
        file.flock(File::LOCK_UN)
        content
      end

      parsed = JSON.parse(raw)
      if parsed
        session = symbolize_keys(parsed)
        session[:user] = parsed['user'] # for compat. reasons
      end
    rescue => e
      log('error', e.inspect)
      destroy(client_id)
      log('error', "error in reading/parsing session file '#{session_file}', remove session.")
      return
    end
    session
  end
  214. =begin
  215. send message to client
  216. Sessions.send(client_id_of_recipient, data)
  217. returns
  218. true|false
  219. =end
  # Queue a message for a client by writing it as JSON into a new "send-*"
  # file inside the client's session directory.
  #
  # Returns false if the session directory does not exist or the file could
  # not be written, true otherwise.
  def self.send(client_id, data)
    client_dir = "#{@path}/#{client_id}/"
    base_name  = "send-#{Time.now.utc.to_f}"

    # find a file name that is not in use yet
    location = "#{client_dir}#{base_name}"
    suffix = 0
    while File.exist?(location)
      suffix += 1
      location = "#{client_dir}#{base_name}-#{suffix}"
    end

    return false if !File.directory? client_dir

    begin
      File.open(location, 'wb') do |file|
        file.flock(File::LOCK_EX)
        file.write data.to_json
        file.flock(File::LOCK_UN)
        file.close
      end
    rescue => e
      log('error', e.inspect)
      log('error', "error in writing message file '#{location}'")
      return false
    end

    true
  end
  249. =begin
  250. send message to recipient client
  251. Sessions.send_to(user_id, data)
  252. e. g.
  253. Sessions.send_to(user_id, {
  254. event: 'session:takeover',
  255. data: {
  256. taskbar_id: 12312
  257. },
  258. })
  259. returns
  260. true|false
  261. =end
  # Queue a message for every session that belongs to the given user.
  #
  # User ids are compared after converting both sides with to_i.
  #
  # Returns true.
  def self.send_to(user_id, data)
    sessions.each do |client_id|
      session  = Sessions.get(client_id)
      owner_id = session && session[:user] && session[:user]['id']

      next unless owner_id
      next unless owner_id.to_i == user_id.to_i

      Sessions.send(client_id, data)
    end
    true
  end
=begin
send message to all authenticated clients
  Sessions.broadcast(data)
returns
  [array_with_client_ids_of_recipients]
broadcast also to clients which are not authenticated
  Sessions.broadcast(data, 'public') # public|authenticated
broadcast, but skip the sessions of the sender
  Sessions.broadcast(data, 'public', sender_user_id)
=end
  # Queue a message for all sessions.
  #
  # Unless recipient is 'public', sessions without a user id are skipped.
  # If sender_user_id is given, sessions of that user are skipped as well.
  #
  # Returns an Array with the client_ids the message was queued for.
  def self.broadcast(data, recipient = 'authenticated', sender_user_id = nil)
    sessions.each_with_object([]) do |client_id, recipients|
      session = Sessions.get(client_id)
      next if !session

      user = session[:user]

      # authenticated only: the session needs a user with an id
      next if recipient != 'public' && (user.blank? || user['id'].blank?)

      # never send the message back to the sender
      next if sender_user_id && user && user['id'] && user['id'].to_i == sender_user_id.to_i

      Sessions.send(client_id, data)
      recipients.push client_id
    end
  end
  302. =begin
  303. get messages for client
  304. messages = Sessions.queue(client_id_of_recipient)
  305. returns
  306. [
  307. {
  308. key1 => 'some data of message 1',
  309. key2 => 'some data of message 1',
  310. },
  311. {
  312. key1 => 'some data of message 2',
  313. key2 => 'some data of message 2',
  314. },
  315. ]
  316. =end
  # Fetch all queued messages of a client.
  #
  # Every "send*" file in the client's session directory is read in sorted
  # file name order via .queue_file_read (which deletes the file). Files
  # without a usable message are left out of the result.
  #
  # Returns an Array of parsed messages.
  def self.queue(client_id)
    client_dir = "#{@path}/#{client_id}/"

    queued_files = Dir.foreach(client_dir).select { |entry| entry.start_with?('send') }.sort
    messages = queued_files.map { |entry| Sessions.queue_file_read(client_dir, entry) }
    messages.select { |message| message }
  end
  # Read a single queued message file, delete it and parse its content.
  #
  # The file is deleted before parsing, so a broken message is dropped.
  #
  # Returns the parsed JSON, or nil if the file was blank or not parsable.
  def self.queue_file_read(path, filename)
    location = "#{path}#{filename}"

    raw = File.open(location, 'rb') do |file|
      file.flock(File::LOCK_EX)
      content = file.read
      file.flock(File::LOCK_UN)
      content
    end
    File.delete(location)

    return if raw.blank?

    begin
      JSON.parse(raw)
    rescue => e
      log('error', "can't parse queue message: #{raw}, #{e.inspect}")
      nil
    end
  end
  351. =begin
  352. remove all session and spool messages
  353. Sessions.cleanup
  354. =end
  # Remove the whole session directory (sessions, queues and spool).
  #
  # Returns true, also when there was nothing to remove.
  def self.cleanup
    FileUtils.rm_rf @path if File.exist?(@path)
    true
  end
  360. =begin
  361. create spool messages
  362. Sessions.spool_create(some: 'data')
  363. =end
  # Store a message in the spool directory.
  #
  # The message is serialized to JSON and wrapped together with the current
  # UTC epoch as { msg:, timestamp: } in a new spool file.
  def self.spool_create(data)
    encoded = JSON.generate(data)

    spool_dir = "#{@path}/spool/"
    FileUtils.mkpath spool_dir

    envelope = { msg: encoded, timestamp: Time.now.utc.to_i }

    # time based file name plus random suffix
    target = "#{spool_dir}/#{Time.now.utc.to_f}-#{rand(99_999)}"
    File.open(target, 'wb') do |file|
      file.flock(File::LOCK_EX)
      file.write envelope.to_json
      file.flock(File::LOCK_UN)
    end
  end
=begin
get spool messages which are younger than the given timestamp
  Sessions.spool_list(younger_than, for_user_id)
=end
  # Collect spool messages for a user.
  #
  # Spool files are processed in sorted file name order. Files which can't be
  # parsed or which are older than 48h are deleted. Messages are only
  # returned if no timestamp is given or if they are newer than timestamp.
  # Messages with a recipient user_id list are returned as type 'direct'
  # (once per list entry equal to current_user_id), all others as type
  # 'broadcast'. For messages with event 'broadcast' only the 'data' part is
  # returned.
  #
  # Returns an Array of { type:, message: } hashes.
  def self.spool_list(timestamp, current_user_id)
    spool_dir = "#{@path}/spool/"
    FileUtils.mkpath spool_dir

    items = []
    outdated_files = []

    entries = Dir.foreach(spool_dir).reject { |entry| ['.', '..'].include?(entry) }
    entries.sort.each do |entry|
      filename = "#{spool_dir}/#{entry}"
      next if !File.exist?(filename)

      File.open(filename, 'rb') do |file|
        file.flock(File::LOCK_SH)
        raw = file.read
        file.flock(File::LOCK_UN)

        begin
          spool = JSON.parse(raw)
          parsed = JSON.parse(spool['msg'])
        rescue => e
          log('error', "can't parse spool message: #{raw}, #{e.inspect}")
          outdated_files.push filename
          next
        end

        # ignore and remove messages older than 48h
        if spool['timestamp'] + (2 * 86_400) < Time.now.utc.to_i
          outdated_files.push filename
          next
        end

        # add spool attribute to push spool info to clients
        parsed['spool'] = true

        # only deliver messages newer than the requested timestamp
        deliverable = !timestamp || timestamp < spool['timestamp']
        next unless deliverable

        payload = parsed['event'] == 'broadcast' ? parsed['data'] : parsed
        recipient_ids = parsed['recipient'] && parsed['recipient']['user_id']

        if recipient_ids
          # spool to recipient list
          recipient_ids.each do |user_id|
            next if current_user_id != user_id

            items.push({ type: 'direct', message: payload })
          end
        else
          # spool to every client
          items.push({ type: 'broadcast', message: payload })
        end
      end
    end

    outdated_files.each { |outdated| File.delete(outdated) }
    items
  end
  453. =begin
  454. delete spool messages
  455. Sessions.spool_delete
  456. =end
  # Remove the spool directory including all spooled messages.
  def self.spool_delete
    FileUtils.rm_rf "#{@path}/spool/"
  end
  # Main loop of the session job runner.
  #
  # Without node_id and with ENV['ZAMMAD_SESSION_JOBS_CONCURRENT'] > 0 this
  # process acts as dispatcher: it forks that many worker processes (each one
  # calls Sessions.jobs with its own node id) and then keeps assigning
  # client_ids unknown to Sessions::Node.stats via
  # Sessions::Node.session_assigne.
  #
  # Otherwise (worker node or single process mode) it starts one thread per
  # session that has a user which can be looked up, and runs thread_client in
  # that thread.
  #
  # Both modes end in a `loop do` without break, so this method only ends by
  # an exception or signal.
  def self.jobs(node_id = nil)
    # just make sure that spool path exists
    if !File.exist?(@path)
      FileUtils.mkpath(@path)
    end
    # dispatch sessions
    if node_id.blank? && ENV['ZAMMAD_SESSION_JOBS_CONCURRENT'].to_i.positive?
      # remove node data which is still present from a previous run
      previous_nodes_sessions = Sessions::Node.stats
      if previous_nodes_sessions.present?
        log('info', "Cleaning up previous Sessions::Node sessions: #{previous_nodes_sessions}")
        Sessions::Node.cleanup
      end
      dispatcher_pid = Process.pid
      node_count = ENV['ZAMMAD_SESSION_JOBS_CONCURRENT'].to_i
      node_pids = []
      # fork one worker process per node, node ids are 1..node_count
      (1..node_count).each do |worker_node_id|
        node_pids << fork do
          title = "Zammad Session Jobs Node ##{worker_node_id}: dispatch_pid:#{dispatcher_pid} -> worker_pid:#{Process.pid}"
          $PROGRAM_NAME = title
          log('info', "#{title} started.")
          ::Sessions.jobs(worker_node_id)
          # NOTE(review): only reached if the jobs call above returns
          sleep node_count
        rescue Interrupt
          nil
        end
      end
      # on SIGTERM: send TERM to all workers, wait for them, then raise
      Signal.trap 'SIGTERM' do
        node_pids.each do |node_pid|
          Process.kill 'TERM', node_pid
        end
        Process.waitall
        raise SignalException, 'SIGTERM'
      end
      # dispatch client_ids to nodes
      loop do
        # current assignments as reported by the nodes
        nodes_stats = Sessions::Node.stats
        client_ids = sessions
        client_ids.each do |client_id|
          # skip client_ids which are already listed in the node stats
          next if nodes_stats[client_id]
          # assign to node
          Sessions::Node.session_assigne(client_id)
          sleep 1
        end
        sleep 1
      end
    end
    Thread.abort_on_exception = true
    loop do
      if node_id
        # register node
        Sessions::Node.register(node_id)
        # watch for assigned sessions
        client_ids = Sessions::Node.sessions_by(node_id)
      else
        client_ids = sessions
      end
      client_ids.each do |client_id|
        # connection already open, ignore
        next if @@client_threads[client_id]
        # get current user
        session_data = Sessions.get(client_id)
        next if session_data.blank?
        next if session_data[:user].blank?
        next if session_data[:user]['id'].blank?
        user = User.lookup(id: session_data[:user]['id'])
        next if user.blank?
        # start client thread
        next if @@client_threads[client_id].present?
        # placeholder entry, replaced by the thread object right below
        @@client_threads[client_id] = true
        @@client_threads[client_id] = Thread.new do
          thread_client(client_id, 0, Time.now.utc, node_id)
          # thread_client returned: clear the registry entry for this client
          @@client_threads[client_id] = nil
          log('debug', "close client (#{client_id}) thread")
          # close the database connection if it is owned by this thread
          if ActiveRecord::Base.connection.owner == Thread.current
            ActiveRecord::Base.connection.close
          end
        end
        sleep 1
      end
      sleep 1
    end
  end
  545. =begin
  546. check if thread for client_id is running
  547. Sessions.thread_client_exists?(client_id)
  548. returns
  549. thread
  550. =end
  # Look up the registry entry of the client thread for client_id.
  #
  # Returns the stored entry, or nil if there is none.
  def self.thread_client_exists?(client_id)
    registry = @@client_threads
    registry[client_id]
  end
  554. =begin
  555. start client for browser
  556. Sessions.thread_client(client_id)
  557. returns
  558. thread
  559. =end
  # Run Sessions::Client for client_id and restart it after errors.
  #
  # client_id    - id of the client session to serve
  # try_count    - number of failed runs so far
  # try_run_time - time of the last failed run; if it is more than five
  #                minutes ago, the error counter starts again at 0
  # node_id      - id of the worker node (nil in single process mode)
  #
  # After every failed run the error is logged, the method sleeps 10 seconds,
  # releases the database connection of the current thread and tries again.
  # A run which finishes without error ends the method normally, also if it
  # was a retry. Retries are done in a loop, so long running clients with
  # occasional errors do not build up a call stack.
  #
  # Raises RuntimeError once the client failed try_run_max times in a row.
  def self.thread_client(client_id, try_count = 0, try_run_time = Time.now.utc, node_id)
    try_run_max = 10

    loop do
      log('debug', "LOOP #{node_id}.#{client_id} - #{try_count}")
      begin
        Sessions::Client.new(client_id, node_id)
        # client finished without error, no retry needed
        break
      rescue => e
        log('error', "thread_client #{client_id} exited with error #{e.inspect}")
        log('error', e.backtrace.join("\n ") )
        sleep 10
        begin
          ActiveRecord::Base.connection_pool.release_connection
        rescue => release_error
          log('error', "Can't reconnect to database #{release_error.inspect}")
        end

        try_count += 1

        # reset error counter if the last error is too old
        if try_run_time + ( 60 * 5 ) < Time.now.utc
          try_count = 0
        end
        try_run_time = Time.now.utc

        # give up only if all retries are used
        if try_run_max <= try_count
          raise "STOP thread_client for client #{node_id}.#{client_id} after #{try_run_max} tries"
        end
      end
    end

    log('debug', "/LOOP #{node_id}.#{client_id} - #{try_count}")
  end
  # Build a copy of hash in which all String keys are converted to Symbols.
  #
  # Values which are Hashes are converted recursively; all other values
  # (including Arrays and their content) and non String keys stay as they are.
  #
  # Returns the new Hash.
  def self.symbolize_keys(hash)
    pairs = hash.map do |key, value|
      [
        key.is_a?(String) ? key.to_sym : key,
        value.is_a?(Hash) ? symbolize_keys(value) : value,
      ]
    end
    pairs.to_h
  end
  601. # we use it in rails and non rails context
  # Write a log message, usable in rails and non rails context.
  #
  # With Rails the message goes to Rails.logger ('debug' and 'info' use the
  # matching level, everything else is logged as error). Without Rails a line
  # with UTC timestamp, level and message is printed to stdout.
  #
  # Returns nil.
  def self.log(level, message)
    return puts("#{Time.now.utc.iso8601}:#{level} #{message}") unless defined?(Rails) # rubocop:disable Rails/Output

    logger = Rails.logger
    case level
    when 'debug' then logger.debug { message }
    when 'info'  then logger.info message
    else              logger.error message
    end
    nil
  end
  616. end