sessions.rb 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593
  require 'fileutils'
  require 'json'
  require 'session_helper'
  # File based session store and message queue for browser clients.
  #
  # Every client gets a directory below @path. That directory contains a
  # 'session' file (JSON with the keys 'user' and 'meta') and one 'send-*' file
  # per queued message. Messages for clients which are not connected yet are
  # kept in the 'spool' directory below @path.
  module Sessions

    # get application root directory
    @root = Dir.pwd.to_s
    if (@root.empty? || @root == '/') && defined?(Rails) && Rails.root
      # Rails.root is a Pathname. Pathname#+ with an absolute argument like
      # '/tmp/websocket' would drop the root, so work with a String instead.
      @root = Rails.root.to_s
    end

    # get working directories
    @path = File.join(@root, 'tmp', 'websocket')
    @pid  = File.join(@root, 'tmp', 'pids', 'sessionworker.pid')

    # create global vars for threads (client_id => Thread)
    @@client_threads = {}

    # Start a new session.
    #
    #   Sessions.create( client_id, session_data, { :type => 'websocket' } )
    #
    # The given meta hash is not modified, a copy with the current time as
    # :last_ping is stored. If the session data contains an 'id', a 'ws:login'
    # event is queued for the client.
    #
    # returns
    #
    #   true|false (false if client_id can not be used as session name)
    def self.create(client_id, session, meta)
      path = session_dir(client_id)
      return false unless path

      FileUtils.mkpath path

      # work on a copy, the hash of the caller stays untouched
      meta = meta ? meta.dup : {}
      meta[:last_ping] = Time.now.to_i.to_s
      session_write(path, { user: session, meta: meta })

      # send update to browser
      if session && session['id']
        Sessions.send(client_id, { event: 'ws:login', data: { success: true } })
      end
      true
    end

    # List the client ids of all sessions.
    #
    #   client_ids = Sessions.sessions
    #
    # returns
    #
    #   ['4711', '4712']
    def self.sessions
      # just make sure that the session path exists
      FileUtils.mkpath @path unless File.exist?(@path)

      Dir.entries(@path).reject { |entry| %w[. .. spool].include?(entry) }
    end

    # Check if a session for the given client id exists.
    #
    #   Sessions.session_exists?(client_id)
    #
    # returns
    #
    #   true|false
    def self.session_exists?(client_id)
      sessions.include? client_id.to_s
    end

    # List all sessions with their data. Sessions which can not be read are
    # skipped (and removed by Sessions.get).
    #
    #   client_ids_with_data = Sessions.list
    #
    # returns
    #
    #   {
    #     '4711' => {
    #       :user => {
    #         'id' => 123,
    #       },
    #       :meta => {
    #         :type      => 'websocket',
    #         :last_ping => time_of_last_ping,
    #       }
    #     },
    #     '4712' => {
    #       :user => {
    #         'id' => 124,
    #       },
    #       :meta => {
    #         :type      => 'ajax',
    #         :last_ping => time_of_last_ping,
    #       }
    #     },
    #   }
    def self.list
      sessions.each_with_object({}) do |client_id, session_list|
        data = get(client_id)
        session_list[client_id] = data if data
      end
    end

    # Destroy a session (removes the whole session directory).
    #
    #   Sessions.destory(client_id)
    #
    # returns
    #
    #   true|false (false if client_id can not be used as session name, e. g.
    #   if it is blank - nothing is removed in this case)
    def self.destory(client_id)
      path = session_dir(client_id)
      return false unless path

      FileUtils.rm_rf path
      true
    end

    # Destroy all sessions without a ping within the last idle_time_in_sec
    # seconds (and sessions without any ping information).
    #
    #   list_of_client_ids = Sessions.destory_idle_sessions
    #
    # returns
    #
    #   ['4711', '4712']
    def self.destory_idle_sessions(idle_time_in_sec = 240)
      list_of_closed_sessions = []
      list.each do |client_id, client|
        meta = client[:meta]
        next if meta && meta[:last_ping] && (meta[:last_ping].to_i + idle_time_in_sec) >= Time.now.to_i

        list_of_closed_sessions.push client_id
        destory(client_id)
      end
      list_of_closed_sessions
    end

    # Touch a session (store the current time as last ping).
    #
    #   Sessions.touch(client_id)
    #
    # returns
    #
    #   true|false (false if the session does not exist or is not readable)
    def self.touch(client_id)
      data = get(client_id)
      return false unless data

      # a session file without meta information is still a valid session
      data[:meta] = {} unless data[:meta].is_a?(Hash)
      data[:meta][:last_ping] = Time.now.to_i.to_s
      session_write(session_dir(client_id), data)
      true
    end

    # Get session data. A session with a missing or unreadable session file
    # is removed.
    #
    #   data = Sessions.get(client_id)
    #
    # returns
    #
    #   {
    #     :user => {
    #       'id' => 123,
    #     },
    #     :meta => {
    #       :type      => 'websocket',
    #       :last_ping => time_of_last_ping,
    #     }
    #   }
    #
    # or nil if there is no usable session.
    def self.get(client_id)
      session_path = session_dir(client_id)

      # a blank client id would point to the root of all sessions, never
      # continue (and never clean up) in this case
      return nil unless session_path

      session_file = File.join(session_path, 'session')
      unless File.exist? session_file
        destory(client_id)
        puts "ERROR: missing session file for '#{client_id.to_s}', remove session."
        return nil
      end

      parsed = begin
        raw = File.open(session_file, 'rb') do |file|
          # shared lock is enough for reading, it is released on close
          file.flock(File::LOCK_SH)
          file.read
        end
        JSON.parse(raw)
      rescue StandardError => e
        puts e.inspect
        nil
      end

      unless parsed.is_a?(Hash)
        destory(client_id)
        puts "ERROR: reading session file '#{session_file}', remove session."
        return nil
      end

      data = symbolize_keys(parsed)
      data[:user] = parsed['user'] # for compat. reasons
      data
    end

    # Send a message to a client (adds it to the queue of the client).
    #
    #   Sessions.send(client_id_of_recipient, data)
    #
    # returns
    #
    #   true|false (false if there is no session directory for the client)
    def self.send(client_id, data)
      path = session_dir(client_id)
      return false unless path && File.directory?(path)

      # find a file name which is not in use
      base     = "send-#{Time.now.to_f}"
      filename = base
      count    = 0
      while File.exist?(File.join(path, filename))
        count += 1
        filename = "#{base}-#{count}"
      end

      # write to a temp. file first, Sessions.queue only picks up 'send*'
      # files, so a message is never read before it is written completely
      tmp_file = File.join(path, "a-#{filename}")
      File.open(tmp_file, 'wb') do |file|
        file.flock(File::LOCK_EX)
        file.write data.to_json
      end
      return false unless File.exist?(tmp_file)

      FileUtils.mv(tmp_file, File.join(path, filename))
      true
    end

    # Send a message to all clients of a user.
    #
    #   Sessions.send_to(user_id, data)
    #
    # returns
    #
    #   true
    def self.send_to(user_id, data)
      sessions.each do |client_id|
        session = get(client_id)
        next unless session && session[:user] && session[:user]['id']
        next if session[:user]['id'].to_i != user_id.to_i

        Sessions.send(client_id, data)
      end
      true
    end

    # Send a message to all clients.
    #
    #   Sessions.broadcast(data)
    #
    # returns
    #
    #   true
    def self.broadcast(data)
      sessions.each { |client_id| Sessions.send(client_id, data) }
      true
    end

    # Get (and remove) the queued messages of a client, sorted by file name.
    #
    #   messages = Sessions.queue(client_id_of_recipient)
    #
    # returns
    #
    #   [
    #     {
    #       key1 => 'some data of message 1',
    #       key2 => 'some data of message 1',
    #     },
    #     {
    #       key1 => 'some data of message 2',
    #       key2 => 'some data of message 2',
    #     },
    #   ]
    #
    # An empty list is returned if there is no session directory.
    def self.queue(client_id)
      path = session_dir(client_id)
      return [] unless path && File.directory?(path)

      Dir.entries(path).sort.each_with_object([]) do |entry, messages|
        next unless entry.start_with?('send')

        messages.push queue_file_read("#{path}/", entry)
      end
    end

    # Read one queued message and remove its file.
    #
    #   message = Sessions.queue_file_read(path_with_trailing_slash, filename)
    #
    # The file is renamed before it is read, so it is not picked up a second
    # time by Sessions.queue. Raises if the content is no valid JSON (the
    # file is already removed at that point).
    #
    # returns
    #
    #   parsed message
    def self.queue_file_read(path, filename)
      file_old = path + filename
      file_new = path + 'a-' + filename
      FileUtils.mv(file_old, file_new)
      all = File.open(file_new, 'rb') { |file| file.read }
      File.delete(file_new)
      JSON.parse(all)
    end

    # Remove all spooled messages.
    #
    #   Sessions.spool_cleanup
    def self.spool_cleanup
      FileUtils.rm_rf File.join(@path, 'spool')
    end

    # Add a message to the spool, stored together with the current time.
    #
    #   Sessions.spool_create(message_as_json_string)
    def self.spool_create(msg)
      path = File.join(@path, 'spool')
      FileUtils.mkpath path
      filename = "#{Time.now.to_f}-#{rand(99_999)}"
      File.open(File.join(path, filename), 'wb') do |file|
        file.write({ msg: msg, timestamp: Time.now.to_i }.to_json)
      end
    end

    # Get the spooled messages for a user which are newer than timestamp
    # (all messages if timestamp is nil). Messages older than 48h are removed
    # from the spool, unreadable spool entries are skipped.
    #
    #   messages = Sessions.spool_list(timestamp, current_user_id)
    #
    # returns
    #
    #   [
    #     { :type => 'direct',    :message => parsed_message },
    #     { :type => 'broadcast', :message => parsed_message },
    #   ]
    def self.spool_list(timestamp, current_user_id)
      path = File.join(@path, 'spool')
      FileUtils.mkpath path
      data      = []
      to_delete = []

      Dir.entries(path).sort.each do |entry|
        next if entry == '.' || entry == '..'

        filename = File.join(path, entry)
        begin
          spool = JSON.parse(File.open(filename, 'rb') { |file| file.read })
        rescue Errno::ENOENT
          # removed in the meantime (e. g. by spool_cleanup)
          next
        rescue JSON::ParserError => e
          puts "ERROR: can't parse spool file '#{filename}': #{e.inspect}"
          next
        end
        next unless spool.is_a?(Hash)

        spool_time = spool['timestamp']
        spool_time = 0 unless spool_time.is_a?(Numeric)

        # ignore message older then 48h, check this before parsing the message
        # so also broken old messages get removed
        if spool_time + (2 * 86_400) < Time.now.to_i
          to_delete.push filename
          next
        end

        begin
          message_parsed = JSON.parse(spool['msg'])
        rescue StandardError => e
          puts "ERROR: can't parse spool message: #{spool['msg']}, #{e.inspect}"
          next
        end
        next unless message_parsed.is_a?(Hash)

        # add spool attribute to push spool info to clients
        message_parsed['spool'] = true

        # only send not already known messages
        next if timestamp && timestamp >= spool_time

        recipient = message_parsed['recipient']
        if recipient && recipient['user_id']

          # spool to recipient list
          Array(recipient['user_id']).each do |user_id|
            next unless current_user_id == user_id

            data.push({ type: 'direct', message: message_parsed })
          end
        else

          # spool to every client
          data.push({ type: 'broadcast', message: message_parsed })
        end
      end

      to_delete.each { |file| File.delete(file) if File.exist?(file) }
      data
    end

    # Worker loop, starts one thread per session with a known user. This
    # method never returns.
    #
    #   Sessions.jobs
    def self.jobs
      # just make sure that the session path exists
      FileUtils.mkpath @path unless File.exist?(@path)

      Thread.abort_on_exception = true
      loop do
        sessions.each do |client_id|

          # connection already open, ignore
          next if @@client_threads[client_id]

          # get current user
          session_data = get(client_id)
          next unless session_data && session_data[:user] && session_data[:user]['id']

          user = User.lookup(id: session_data[:user]['id'])
          next unless user

          # start client thread
          @@client_threads[client_id] = true
          @@client_threads[client_id] = Thread.new do
            begin
              thread_client(client_id)
            ensure
              # always drop the entry (also on errors), so the list does not
              # grow with every client which was connected once
              @@client_threads.delete(client_id)
            end
            puts "close client (#{client_id}) thread"
            ActiveRecord::Base.connection.close
          end
          sleep 0.5
        end

        # system settings
        sleep 0.5
      end
    end

    # Check if a thread for client_id is running.
    #
    #   Sessions.thread_client_exists?(client_id)
    #
    # returns
    #
    #   thread (or nil)
    def self.thread_client_exists?(client_id)
      @@client_threads[client_id]
    end

    # Start client for browser. If the client exits with an error it is
    # started again after 10 seconds, up to 10 times. The error counter is
    # reset if the previous error is older than 5 minutes.
    #
    #   Sessions.thread_client(client_id)
    #
    # Raises a RuntimeError if the client failed too often.
    def self.thread_client(client_id, try_count = 0, try_run_time = Time.now)
      try_run_max = 10

      # retry in a loop (no recursion), so the stack does not grow with every
      # restart of a long running client
      loop do
        puts "LOOP #{client_id} - #{try_count}"
        begin
          Sessions::Client.new(client_id)
          break
        rescue => e
          puts "thread_client #{client_id} exited with error #{e.inspect}"
          puts e.backtrace.join("\n ")
          sleep 10
          begin
            ActiveRecord::Base.connection_pool.release_connection
          rescue => db_error
            puts "Can't reconnect to database #{db_error.inspect}"
          end

          try_count += 1

          # reset error counter if to old
          try_count = 0 if try_run_time + (60 * 5) < Time.now
          try_run_time = Time.now

          # restart job again (next loop run) or give up
          if try_count >= try_run_max
            raise "STOP thread_client for client #{client_id} after #{try_run_max} tries"
          end
        end
      end
      puts "/LOOP #{client_id} - #{try_count}"
    end

    # Convert the String keys of a hash (and of all nested hashes) to Symbols.
    # Hashes inside of arrays are not converted.
    #
    #   Sessions.symbolize_keys({ 'a' => { 'b' => 1 } })
    #
    # returns
    #
    #   { :a => { :b => 1 } }
    def self.symbolize_keys(hash)
      hash.each_with_object({}) do |(key, value), result|
        new_key = key.is_a?(String) ? key.to_sym : key
        result[new_key] = value.is_a?(Hash) ? symbolize_keys(value) : value
      end
    end

    # Directory of a session or nil if client_id can not be used as name of a
    # directory below @path (blank, '.', '..' or containing a path separator).
    def self.session_dir(client_id)
      id = client_id.to_s
      return nil if id.empty? || id == '.' || id == '..'
      return nil if id.include?('/')
      return nil if File::ALT_SEPARATOR && id.include?(File::ALT_SEPARATOR)

      File.join(@path, id)
    end
    private_class_method :session_dir

    # Write data as JSON into the session file of the given session directory.
    def self.session_write(path, data)
      File.open(File.join(path, 'session'), 'wb') { |file| file.write data.to_json }
    end
    private_class_method :session_write

    # correctly spelled names, the old ones are kept for existing callers
    class << self
      alias destroy destory
      alias destroy_idle_sessions destory_idle_sessions
    end
  end