sessions.rb 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572
  1. require 'json'
  2. require 'session_helper'
  3. module Sessions
  4. # get application root directory
  5. @root = Dir.pwd.to_s
  6. if !@root || @root.empty? || @root == '/'
  7. @root = Rails.root
  8. end
  9. # get working directories
  10. @path = @root + '/tmp/websocket'
  11. @pid = @root + '/tmp/pids/sessionworker.pid'
  12. # create global vars for threads
  13. @@client_threads = {}
  14. =begin
  15. start new session
  16. Sessions.create( client_id, session_data, { :type => 'websocket' } )
  17. returns
  18. true|false
  19. =end
  20. def self.create( client_id, session, meta )
  21. path = @path + '/' + client_id.to_s
  22. FileUtils.mkpath path
  23. meta[:last_ping] = Time.new.to_i.to_s
  24. File.open( path + '/session', 'wb' ) { |file|
  25. data = {
  26. :user => {
  27. :id => session['id'],
  28. },
  29. :meta => meta,
  30. }
  31. file.write Marshal.dump(data)
  32. }
  33. # send update to browser
  34. if session['id']
  35. self.send( client_id, {
  36. :event => 'ws:login',
  37. :data => { :success => true },
  38. })
  39. end
  40. end
  41. =begin
  42. list of all session
  43. client_ids = Sessions.sessions
  44. returns
  45. ['4711', '4712']
  46. =end
  47. def self.sessions
  48. path = @path + '/'
  49. # just make sure that spool path exists
  50. if !File::exists?( path )
  51. FileUtils.mkpath path
  52. end
  53. data = []
  54. Dir.foreach( path ) do |entry|
  55. next if entry == '.' || entry == '..' || entry == 'spool'
  56. data.push entry.to_s
  57. end
  58. data
  59. end
  60. =begin
  61. list of all session
  62. Sessions.session_exists?(client_id)
  63. returns
  64. true|false
  65. =end
  66. def self.session_exists?(client_id)
  67. client_ids = self.sessions
  68. client_ids.include? client_id.to_s
  69. end
  70. =begin
  71. list of all session with data
  72. client_ids_with_data = Sessions.list
  73. returns
  74. {
  75. '4711' => {
  76. :user => {
  77. :id => 123,
  78. },
  79. :meta => {
  80. :type => 'websocket',
  81. :last_ping => time_of_last_ping,
  82. }
  83. },
  84. '4712' => {
  85. :user => {
  86. :id => 124,
  87. },
  88. :meta => {
  89. :type => 'ajax',
  90. :last_ping => time_of_last_ping,
  91. }
  92. },
  93. }
  94. =end
  95. def self.list
  96. client_ids = self.sessions
  97. session_list = {}
  98. client_ids.each { |client_id|
  99. data = self.get(client_id)
  100. next if !data
  101. session_list[client_id] = data
  102. }
  103. session_list
  104. end
  105. =begin
  106. destroy session
  107. Sessions.destory(client_id)
  108. returns
  109. true|false
  110. =end
  111. def self.destory( client_id )
  112. path = @path + '/' + client_id.to_s
  113. FileUtils.rm_rf path
  114. end
  115. =begin
  116. destroy idle session
  117. list_of_client_ids = Sessions.destory_idle_sessions
  118. returns
  119. ['4711', '4712']
  120. =end
  121. def self.destory_idle_sessions(idle_time_in_min = 4)
  122. list_of_closed_sessions = []
  123. clients = Sessions.list
  124. clients.each { |client_id, client|
  125. if !client[:meta] || !client[:meta][:last_ping] || ( client[:meta][:last_ping].to_i + ( 60 * idle_time_in_min ) ) < Time.now.to_i
  126. list_of_closed_sessions.push client_id
  127. Sessions.destory( client_id )
  128. end
  129. }
  130. list_of_closed_sessions
  131. end
  132. =begin
  133. touch session
  134. Sessions.touch(client_id)
  135. returns
  136. true|false
  137. =end
  138. def self.touch( client_id )
  139. data = self.get(client_id)
  140. return false if !data
  141. path = @path + '/' + client_id.to_s
  142. data[:meta][:last_ping] = Time.new.to_i.to_s
  143. File.open( path + '/session', 'wb' ) { |file|
  144. file.write Marshal.dump(data)
  145. }
  146. true
  147. end
  148. =begin
  149. get session data
  150. data = Sessions.get(client_id)
  151. returns
  152. {
  153. :user => {
  154. :id => 123,
  155. },
  156. :meta => {
  157. :type => 'websocket',
  158. :last_ping => time_of_last_ping,
  159. }
  160. }
  161. =end
  162. def self.get( client_id )
  163. session_dir = @path + '/' + client_id.to_s
  164. session_file = session_dir + '/session'
  165. data = nil
  166. if !File.exist? session_file
  167. self.destory(client_id)
  168. puts "ERROR: missing session file for '#{client_id.to_s}', remove session."
  169. return
  170. end
  171. begin
  172. File.open( session_file, 'rb' ) { |file|
  173. file.flock( File::LOCK_EX )
  174. all = file.read
  175. file.flock( File::LOCK_UN )
  176. data = Marshal.load( all )
  177. }
  178. rescue Exception => e
  179. puts e.inspect
  180. self.destory(client_id)
  181. puts "ERROR: reading session file '#{session_file}', remove session."
  182. return
  183. end
  184. data
  185. end
  186. =begin
  187. send message to client
  188. Sessions.send(client_id_of_recipient, data)
  189. returns
  190. true|false
  191. =end
  192. def self.send( client_id, data )
  193. path = @path + '/' + client_id.to_s + '/'
  194. filename = 'send-' + Time.new().to_f.to_s# + '-' + rand(99999999).to_s
  195. check = true
  196. count = 0
  197. while check
  198. if File::exists?( path + filename )
  199. count += 1
  200. filename = filename + '-' + count
  201. else
  202. check = false
  203. end
  204. end
  205. return false if !File.directory? path
  206. File.open( path + 'a-' + filename, 'wb' ) { |file|
  207. file.flock( File::LOCK_EX )
  208. file.write data.to_json
  209. file.flock( File::LOCK_UN )
  210. file.close
  211. }
  212. return false if !File.exists?( path + 'a-' + filename )
  213. FileUtils.mv( path + 'a-' + filename, path + filename )
  214. true
  215. end
  216. =begin
  217. send message to recipient client
  218. Sessions.send_to(user_id, data)
  219. returns
  220. true|false
  221. =end
  222. def self.send_to( user_id, data )
  223. # list all current clients
  224. client_list = self.sessions
  225. client_list.each {|client_id|
  226. session = Sessions.get(client_id)
  227. next if !session
  228. next if !session[:user]
  229. next if !session[:user][:id]
  230. next if session[:user][:id].to_i != user_id.to_i
  231. Sessions.send( client_id, data )
  232. }
  233. true
  234. end
  235. =begin
  236. send message to all client
  237. Sessions.broadcast(data)
  238. returns
  239. true|false
  240. =end
  241. def self.broadcast( data )
  242. # list all current clients
  243. client_list = self.sessions
  244. client_list.each {|client_id|
  245. Sessions.send( client_id, data )
  246. }
  247. true
  248. end
  249. =begin
  250. get messages for client
  251. messages = Sessions.queue(client_id_of_recipient)
  252. returns
  253. [
  254. {
  255. key1 => 'some data of message 1',
  256. key2 => 'some data of message 1',
  257. },
  258. {
  259. key1 => 'some data of message 2',
  260. key2 => 'some data of message 2',
  261. },
  262. ]
  263. =end
  264. def self.queue( client_id )
  265. path = @path + '/' + client_id.to_s + '/'
  266. data = []
  267. files = []
  268. Dir.foreach( path ) {|entry|
  269. next if entry == '.' || entry == '..'
  270. files.push entry
  271. }
  272. files.sort.each {|entry|
  273. filename = path + '/' + entry
  274. if /^send/.match( entry )
  275. data.push Sessions.queue_file_read( path, entry )
  276. end
  277. }
  278. data
  279. end
  280. def self.queue_file_read( path, filename )
  281. file_old = path + filename
  282. file_new = path + 'a-' + filename
  283. FileUtils.mv( file_old, file_new )
  284. data = nil
  285. all = ''
  286. File.open( file_new, 'rb' ) { |file|
  287. all = file.read
  288. }
  289. File.delete( file_new )
  290. JSON.parse( all )
  291. end
  292. def self.spool_cleanup
  293. path = @path + '/spool/'
  294. FileUtils.rm_rf path
  295. end
  296. def self.spool_create( msg )
  297. path = @path + '/spool/'
  298. FileUtils.mkpath path
  299. file = Time.new.to_f.to_s + '-' + rand(99999).to_s
  300. File.open( path + '/' + file , 'wb' ) { |file|
  301. data = {
  302. :msg => msg,
  303. :timestamp => Time.now.to_i,
  304. }
  305. file.write data.to_json
  306. }
  307. end
  308. def self.spool_list( timestamp, current_user_id )
  309. path = @path + '/spool/'
  310. FileUtils.mkpath path
  311. data = []
  312. to_delete = []
  313. files = []
  314. Dir.foreach( path ) {|entry|
  315. next if entry == '.' || entry == '..'
  316. files.push entry
  317. }
  318. files.sort.each {|entry|
  319. filename = path + '/' + entry
  320. next if !File::exists?( filename )
  321. File.open( filename, 'rb' ) { |file|
  322. all = file.read
  323. spool = JSON.parse( all )
  324. begin
  325. message_parsed = JSON.parse( spool['msg'] )
  326. rescue => e
  327. log 'error', "can't parse spool message: #{ message }, #{ e.inspect }"
  328. next
  329. end
  330. # ignore message older then 48h
  331. if spool['timestamp'] + (2 * 86400) < Time.now.to_i
  332. to_delete.push path + '/' + entry
  333. next
  334. end
  335. # add spool attribute to push spool info to clients
  336. message_parsed['spool'] = true
  337. # only send not already now messages
  338. if !timestamp || timestamp < spool['timestamp']
  339. # spool to recipient list
  340. if message_parsed['recipient'] && message_parsed['recipient']['user_id']
  341. message_parsed['recipient']['user_id'].each { |user_id|
  342. if current_user_id == user_id
  343. item = {
  344. :type => 'direct',
  345. :message => message_parsed,
  346. }
  347. data.push item
  348. end
  349. }
  350. # spool to every client
  351. else
  352. item = {
  353. :type => 'broadcast',
  354. :message => message_parsed,
  355. }
  356. data.push item
  357. end
  358. end
  359. }
  360. }
  361. to_delete.each {|file|
  362. File.delete(file)
  363. }
  364. return data
  365. end
  366. def self.jobs
  367. # just make sure that spool path exists
  368. if !File::exists?( @path )
  369. FileUtils.mkpath @path
  370. end
  371. Thread.abort_on_exception = true
  372. while true
  373. client_ids = self.sessions
  374. client_ids.each { |client_id|
  375. # connection already open, ignore
  376. next if @@client_threads[client_id]
  377. # get current user
  378. session_data = Sessions.get( client_id )
  379. next if !session_data
  380. next if !session_data[:user]
  381. next if !session_data[:user][:id]
  382. user = User.find( session_data[:user][:id] )
  383. next if !user
  384. # start client thread
  385. if !@@client_threads[client_id]
  386. @@client_threads[client_id] = true
  387. @@client_threads[client_id] = Thread.new {
  388. thread_client(client_id)
  389. @@client_threads[client_id] = nil
  390. puts "close client (#{client_id}) thread"
  391. }
  392. sleep 0.5
  393. end
  394. }
  395. # system settings
  396. sleep 0.5
  397. end
  398. end
  399. =begin
  400. check if thread for client_id is running
  401. Sessions.thread_client_exists?(client_id)
  402. returns
  403. thread
  404. =end
  405. def self.thread_client_exists?(client_id)
  406. @@client_threads[client_id]
  407. end
  408. =begin
  409. start client for browser
  410. Sessions.thread_client(client_id)
  411. returns
  412. thread
  413. =end
  414. def self.thread_client(client_id, try_count = 0, try_run_time = Time.now)
  415. puts "LOOP #{client_id} - #{try_count}"
  416. begin
  417. Sessions::Client.new(client_id)
  418. rescue => e
  419. puts "thread_client #{client_id} exited with error #{ e.inspect }"
  420. puts e.backtrace.join("\n ")
  421. sleep 10
  422. begin
  423. # ActiveRecord::Base.remove_connection
  424. # ActiveRecord::Base.connection_pool.reap
  425. ActiveRecord::Base.connection_pool.release_connection
  426. rescue => e
  427. puts "Can't reconnect to database #{ e.inspect }"
  428. end
  429. try_run_max = 10
  430. try_count += 1
  431. # reset error counter if to old
  432. if try_run_time + ( 60 * 5 ) < Time.now
  433. try_count = 0
  434. end
  435. try_run_time = Time.now
  436. # restart job again
  437. if try_run_max > try_count
  438. thread_client(client_id, try_count, try_run_time)
  439. else
  440. raise "STOP thread_client for client #{client_id} after #{try_run_max} tries"
  441. end
  442. end
  443. puts "/LOOP #{client_id} - #{try_count}"
  444. end
  445. end