sessions.rb 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569
  1. require 'json'
  2. require 'session_helper'
  3. module Sessions
  # Resolve the application root directory. The current working directory is
  # preferred; Rails.root is only used when the working directory is unusable.
  @root = Dir.pwd.to_s
  if @root.empty? || @root == '/'
    # Rails.root is a Pathname, and Pathname#+ drops the receiver when the
    # argument is an absolute path - convert to String so the concatenations
    # below really end up below the application root.
    @root = Rails.root.to_s
  end

  # working directories: session/message spool and pid file of the worker
  @path = @root + '/tmp/websocket'
  @pid  = @root + '/tmp/pids/sessionworker.pid'

  # registry of client threads, keyed by client_id (filled by Sessions.jobs)
  @@client_threads = {}
  14. =begin
  15. start new session
  16. Sessions.create( client_id, session_data, { :type => 'websocket' } )
  17. returns
  18. true|false
  19. =end
  # Start a new session: writes <spool path>/<client_id>/session and, if the
  # session carries a user id, queues a 'ws:login' event for the client.
  #
  # client_id - identifier of the client, used as directory name
  # session   - hash-like object with the user id under :id or 'id' (may be nil)
  # meta      - hash with connection meta data, e.g. { :type => 'websocket' }
  #             (may be nil); :last_ping is added to the stored copy
  #
  # Returns true once the session file has been written.
  def self.create( client_id, session, meta )
    # accept symbol and string keys, so the stored id and the login event
    # below cannot disagree for plain (non-indifferent) hashes
    user_id = session && ( session[:id] || session['id'] )

    user = {}
    user[:id] = user_id if user_id

    # work on a copy - the hash of the caller stays untouched
    meta_data = ( meta || {} ).merge( :last_ping => Time.new.to_i.to_s )

    path = @path + '/' + client_id.to_s
    FileUtils.mkpath path
    File.open( path + '/session', 'wb' ) { |file|
      file.write Marshal.dump( :user => user, :meta => meta_data )
    }

    # send update to browser
    if user_id
      self.send( client_id, {
        :event => 'ws:login',
        :data  => { :success => true },
      })
    end
    true
  end
  43. =begin
  44. list of all sessions
  45. client_ids = Sessions.sessions
  46. returns
  47. ['4711', '4712']
  48. =end
  # List the client ids of all sessions, i.e. the names of all entries below
  # the spool path except the shared 'spool' directory.
  #
  # Returns an Array of Strings (order as delivered by the file system).
  def self.sessions
    path = @path + '/'

    # just make sure that spool path exists - mkpath does nothing for an
    # existing directory, so no File::exists? check (removed in Ruby 3.2)
    FileUtils.mkpath path

    Dir.entries( path ).reject { |entry|
      entry == '.' || entry == '..' || entry == 'spool'
    }
  end
  62. =begin
  63. check if a session exists for the given client_id
  64. Sessions.session_exists?(client_id)
  65. returns
  66. true|false
  67. =end
  # Tell whether a session is listed for the given client.
  #
  # client_id - identifier of the client (compared as String)
  #
  # Returns true or false.
  def self.session_exists?(client_id)
    wanted = client_id.to_s
    self.sessions.any? { |known| known == wanted }
  end
  72. =begin
  73. list of all session with data
  74. client_ids_with_data = Sessions.list
  75. returns
  76. {
  77. '4711' => {
  78. :user => {
  79. :id => 123,
  80. },
  81. :meta => {
  82. :type => 'websocket',
  83. :last_ping => time_of_last_ping,
  84. }
  85. },
  86. '4712' => {
  87. :user => {
  88. :id => 124,
  89. },
  90. :meta => {
  91. :type => 'ajax',
  92. :last_ping => time_of_last_ping,
  93. }
  94. },
  95. }
  96. =end
  # Collect the stored data of all sessions.
  #
  # Returns a Hash mapping client_id => session data; clients for which
  # Sessions.get delivers nothing are left out.
  def self.list
    self.sessions.each_with_object({}) { |client_id, sessions_by_client|
      session = self.get(client_id)
      sessions_by_client[client_id] = session if session
    }
  end
  107. =begin
  108. destroy session
  109. Sessions.destory(client_id)
  110. returns
  111. true|false
  112. =end
  # Remove the directory of a client together with everything stored in it
  # (session file and queued messages). Unknown clients are ignored.
  #
  # client_id - identifier of the client
  def self.destory( client_id )
    FileUtils.rm_rf File.join( @path, client_id.to_s )
  end
  117. =begin
  118. destroy idle session
  119. list_of_client_ids = Sessions.destory_idle_sessions
  120. returns
  121. ['4711', '4712']
  122. =end
  # Destroy every session whose last ping is older than the given idle time.
  #
  # idle_time_in_min - allowed idle time in minutes (default: 4)
  #
  # Returns an Array with the client ids of the destroyed sessions.
  def self.destory_idle_sessions(idle_time_in_min = 4)
    idle_seconds = 60 * idle_time_in_min
    Sessions.list.each_with_object([]) { |(client_id, client), closed|
      expires_at = client[:meta][:last_ping].to_i + idle_seconds
      next unless expires_at < Time.now.to_i
      closed.push client_id
      Sessions.destory( client_id )
    }
  end
  134. =begin
  135. touch session
  136. Sessions.touch(client_id)
  137. returns
  138. true|false
  139. =end
  # Refresh the :last_ping time stamp of a session and write it back.
  #
  # client_id - identifier of the client
  #
  # Returns true if the session was updated, false if no session data
  # could be loaded for the client.
  def self.touch( client_id )
    session = self.get(client_id)
    return false unless session

    session[:meta][:last_ping] = Time.new.to_i.to_s
    session_file = File.join( @path, client_id.to_s, 'session' )
    File.open( session_file, 'wb' ) { |file| file.write Marshal.dump(session) }
    true
  end
  150. =begin
  151. get session data
  152. data = Sessions.get(client_id)
  153. returns
  154. {
  155. :user => {
  156. :id => 123,
  157. },
  158. :meta => {
  159. :type => 'websocket',
  160. :last_ping => time_of_last_ping,
  161. }
  162. }
  163. =end
  # Load the stored data of a session.
  #
  # client_id - identifier of the client
  #
  # Returns the unmarshalled session data ({ :user => ..., :meta => ... }) or
  # nil if there is no session file or it can not be read. An unreadable
  # (e.g. corrupt) session file is deleted.
  def self.get( client_id )
    session_file = @path + '/' + client_id.to_s + '/session'
    return if !File.exist? session_file

    begin
      raw = File.open( session_file, 'rb' ) { |file|
        # a shared lock is sufficient for reading, it is released on close
        file.flock( File::LOCK_SH )
        file.read
      }
      # NOTE: Marshal.load is only safe as long as nothing but this module
      # writes the session files
      Marshal.load( raw )
    rescue Errno::ENOENT
      # session got destroyed between the existence check and the open
      nil
    rescue StandardError => e
      # StandardError only - signals and SystemExit must not be swallowed
      File.delete(session_file) if File.exist?(session_file)
      puts "Error reading '#{session_file}':"
      puts e.inspect
      nil
    end
  end
  183. =begin
  184. send message to client
  185. Sessions.send(client_id_of_recipient, data)
  186. returns
  187. true|false
  188. =end
  # Queue a message for a client. The message is stored as JSON in a file
  # named 'send-<timestamp>' inside the directory of the client.
  #
  # client_id - identifier of the recipient client
  # data      - message, anything responding to to_json
  #
  # Returns true if the message was queued, false if the client has no
  # directory (no session) or the message file could not be created.
  def self.send( client_id, data )
    path = @path + '/' + client_id.to_s + '/'
    return false if !File.directory? path

    # find an unused file name - the counter suffix is only needed if
    # several messages are created with the very same time stamp
    base     = 'send-' + Time.new.to_f.to_s
    filename = base
    count    = 0
    while File.exist?( path + filename )
      count += 1
      filename = base + '-' + count.to_s
    end

    # write to an 'a-' prefixed file first and rename it afterwards;
    # Sessions.queue only picks up files starting with 'send'
    tmp_file = path + 'a-' + filename
    File.open( tmp_file, 'wb' ) { |file|
      file.flock( File::LOCK_EX )
      file.write data.to_json
      # lock and handle are released when the block is left
    }
    return false if !File.exist?( tmp_file )

    FileUtils.mv( tmp_file, path + filename )
    true
  end
  213. =begin
  214. send message to recipient client
  215. Sessions.send_to(user_id, data)
  216. returns
  217. true|false
  218. =end
  # Queue a message for every client whose session belongs to the given user.
  #
  # user_id - id of the recipient user (compared via to_i)
  # data    - message, handed over to Sessions.send
  #
  # Returns true, regardless of how many clients matched.
  def self.send_to( user_id, data )
    # walk through all current clients
    self.sessions.each do |client_id|
      session = Sessions.get(client_id)
      user    = session && session[:user]
      next unless user && user[:id]
      Sessions.send( client_id, data ) if user[:id].to_i == user_id.to_i
    end
    true
  end
  232. =begin
  233. send message to all client
  234. Sessions.broadcast(data)
  235. returns
  236. true|false
  237. =end
  # Queue a message for every client currently listed by Sessions.sessions.
  #
  # data - message, handed over to Sessions.send for each client
  #
  # Returns true.
  def self.broadcast( data )
    self.sessions.each { |client_id| Sessions.send( client_id, data ) }
    true
  end
  246. =begin
  247. get messages for client
  248. messages = Sessions.queue(client_id_of_recipient)
  249. returns
  250. [
  251. {
  252. key1 => 'some data of message 1',
  253. key2 => 'some data of message 1',
  254. },
  255. {
  256. key1 => 'some data of message 2',
  257. key2 => 'some data of message 2',
  258. },
  259. ]
  260. =end
  # Fetch (and thereby consume) all queued messages of a client, ordered by
  # file name.
  #
  # client_id - identifier of the recipient client
  #
  # Returns an Array with the results of Sessions.queue_file_read for every
  # 'send...' file of the client; an empty Array if the client has no
  # directory.
  def self.queue( client_id )
    path = @path + '/' + client_id.to_s + '/'

    # the session may have been destroyed in the meantime
    return [] if !File.directory? path

    # only files starting with 'send' are complete messages
    pending = Dir.entries( path ).select { |entry| entry =~ /\Asend/ }
    pending.sort.map { |entry| Sessions.queue_file_read( path, entry ) }
  end
  # Read and remove a single queued message file.
  #
  # path     - directory of the client, including the trailing '/'
  # filename - name of the message file inside that directory
  #
  # The file is renamed to 'a-<filename>' before it is read and is deleted
  # before the content gets parsed.
  #
  # Returns the parsed JSON content of the file.
  def self.queue_file_read( path, filename )
    claimed = path + 'a-' + filename
    FileUtils.mv( path + filename, claimed )
    raw = File.binread( claimed )
    File.delete( claimed )
    JSON.parse( raw )
  end
  # Remove the shared 'spool' directory below the spool path together with
  # all spooled messages. A missing directory is ignored.
  def self.spool_cleanup
    FileUtils.rm_rf File.join( @path, 'spool/' )
  end
  # Store a message in the shared spool directory. The file is named
  # '<time as float>-<random number>' and contains the message together with
  # the current time as JSON: { msg: ..., timestamp: ... }.
  #
  # msg - the message to spool
  #
  # Returns the result of the write (number of bytes written).
  def self.spool_create( msg )
    spool_dir = @path + '/spool/'
    FileUtils.mkpath spool_dir

    name    = Time.new.to_f.to_s + '-' + rand(99999).to_s
    payload = { :msg => msg, :timestamp => Time.now.to_i }.to_json
    File.open( spool_dir + '/' + name, 'wb' ) { |io| io.write payload }
  end
  # Collect the spooled messages which are relevant for a user.
  #
  # timestamp       - only messages spooled after this time (epoch seconds)
  #                   are returned; nil returns all messages
  # current_user_id - id of the user the messages are collected for
  #
  # Messages older than 48h are deleted from the spool. Spool files which can
  # not be parsed are reported on STDOUT and skipped.
  #
  # Returns an Array of { :type => 'direct'|'broadcast', :message => Hash },
  # ordered by spool file name. Every message gets the attribute
  # 'spool' => true.
  def self.spool_list( timestamp, current_user_id )
    path = @path + '/spool/'
    FileUtils.mkpath path

    data      = []
    to_delete = []
    Dir.entries( path ).sort.each { |entry|
      next if entry == '.' || entry == '..'
      filename = path + entry
      next if !File.file?( filename )

      # read the envelope ({ msg, timestamp }) of the spool file
      begin
        spool = JSON.parse( File.binread( filename ) )
        raise TypeError, 'spool entry is no hash' if !spool.is_a?(Hash)
      rescue => e
        puts "can't parse spool file: #{ filename }, #{ e.inspect }"
        next
      end
      spool_time = spool['timestamp'].to_i

      # ignore message older then 48h
      if spool_time + (2 * 86400) < Time.now.to_i
        to_delete.push filename
        next
      end

      # the message itself is stored as JSON string inside of the envelope
      begin
        message_parsed = JSON.parse( spool['msg'] )
        raise TypeError, 'spool message is no hash' if !message_parsed.is_a?(Hash)
      rescue => e
        puts "can't parse spool message: #{ spool['msg'].inspect }, #{ e.inspect }"
        next
      end

      # add spool attribute to push spool info to clients
      message_parsed['spool'] = true

      # only send messages which are not already known
      next unless !timestamp || timestamp < spool_time

      recipient = message_parsed['recipient']
      if recipient && recipient['user_id']
        # spool to recipient list
        recipient['user_id'].each { |user_id|
          next if current_user_id != user_id
          data.push( :type => 'direct', :message => message_parsed )
        }
      else
        # spool to every client
        data.push( :type => 'broadcast', :message => message_parsed )
      end
    }

    # a file may already have been removed by another process
    to_delete.each { |file|
      File.delete(file) if File.exist?(file)
    }
    data
  end
  # Worker loop: starts one thread (running Sessions.thread_client) for
  # every session which belongs to a user and has no thread yet.
  #
  # This method never returns. It sets Thread.abort_on_exception, i.e. an
  # exception escaping from a client thread terminates the whole process.
  def self.jobs
    # just make sure that spool path exists (mkpath does nothing for an
    # existing directory; File::exists? was removed in Ruby 3.2)
    FileUtils.mkpath @path

    Thread.abort_on_exception = true
    while true
      client_ids = self.sessions
      client_ids.each { |client_id|

        # connection already open, ignore
        next if @@client_threads[client_id]

        # get current user
        session_data = Sessions.get( client_id )
        next if !session_data
        next if !session_data[:user]
        next if !session_data[:user][:id]
        # NOTE(review): if User.find is ActiveRecord's finder it raises for
        # unknown ids instead of returning nil, so the guard below would
        # never trigger - confirm
        user = User.find( session_data[:user][:id] )
        next if !user

        # start client thread
        @@client_threads[client_id] = Thread.new {
          begin
            thread_client(client_id)
          ensure
            # free the slot even if the client thread dies with an error
            @@client_threads[client_id] = nil
          end
          puts "close client (#{client_id}) thread"
        }
        sleep 0.5
      }

      # system settings
      sleep 0.5
    end
  end
  396. =begin
  397. check if thread for client_id is running
  398. Sessions.thread_client_exists?(client_id)
  399. returns
  400. thread
  401. =end
  # Look up the thread registry for a client.
  #
  # client_id - identifier of the client
  #
  # Returns the registry entry of the client (truthy while Sessions.jobs has
  # a thread registered for it) or nil.
  def self.thread_client_exists?(client_id)
    registered = @@client_threads[client_id]
    registered
  end
  405. =begin
  406. start client for browser
  407. Sessions.thread_client(client_id)
  408. returns
  409. thread
  410. =end
  # Run the client for a browser session and restart it after errors.
  #
  # client_id    - identifier of the client
  # try_count    - number of failed runs so far (used by the restarts)
  # try_run_time - time of the last failure (used by the restarts)
  #
  # After an error the method waits 10 seconds, releases the database
  # connection and calls itself again. The failure counter starts over if the
  # previous failure is more than 5 minutes ago.
  #
  # Returns nil. Raises a RuntimeError once the counter reaches 10 tries.
  def self.thread_client(client_id, try_count = 0, try_run_time = Time.now)
    puts "LOOP #{client_id} - #{try_count}"
    begin
      Sessions::Client.new(client_id)
    rescue => e
      puts "thread_client #{client_id} exited with error #{ e.inspect }"
      puts e.backtrace.join("\n ")
      sleep 10
      begin
        ActiveRecord::Base.connection_pool.release_connection
      rescue => e
        puts "Can't reconnect to database #{ e.inspect }"
      end

      try_run_max = 10

      # count this failure, but start over if the last one is too old
      try_count = try_run_time + ( 60 * 5 ) < Time.now ? 0 : try_count + 1
      try_run_time = Time.now

      # give up after too many tries, otherwise restart the job
      unless try_run_max > try_count
        raise "STOP thread_client for client #{client_id} after #{try_run_max} tries"
      end
      thread_client(client_id, try_count, try_run_time)
    end
    puts "/LOOP #{client_id} - #{try_count}"
  end
  442. end