sessions.rb 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354
  1. require 'json'
  2. require 'rss'
  3. require 'session_helper'
  # File-system backed registry of websocket client sessions.
  #
  # Layout below @path (as used by the methods in this module):
  #   <client_id>/session   Marshal dump of { :user => { :id }, :meta => {...} }
  #   <client_id>/send-*    JSON messages queued for that client
  #   spool/*               JSON envelopes { msg, timestamp } kept for replay
  module Sessions

    # get application root directory
    @root = Dir.pwd.to_s
    if (@root.empty? || @root == '/') && defined?(Rails)
      # Rails.root is a Pathname; Pathname#+ with an absolute argument would
      # discard the root, so convert to a String before concatenating.
      @root = Rails.root.to_s
    end

    # get working directories
    @path = @root + '/tmp/websocket'
    @pid  = @root + '/tmp/pids/sessionworker.pid'

    # create global vars for threads
    @@user_threads   = {}
    @@client_threads = {}

    # spooled messages older than this many seconds (48h) are deleted
    SPOOL_MAX_AGE = 2 * 86400

    # Create (or overwrite) the session file of a client.
    #
    # client_id - identifier of the client; used as directory name
    # session   - Hash-like object; only session['id'] (the user id) is read
    # meta      - Hash stored with the session; :last_ping is set on it
    #             (the caller's Hash is modified in place)
    #
    # If session['id'] is set, a 'ws:login' message is queued for the client.
    def self.create(client_id, session, meta)
      path = @path + '/' + client_id.to_s
      FileUtils.mkpath path
      meta[:last_ping] = Time.new.to_i.to_s
      data = {
        :user => { :id => session['id'] },
        :meta => meta,
      }
      File.open(path + '/session', 'wb') { |file| file.write Marshal.dump(data) }

      # send update to browser
      if session['id']
        self.send(client_id, {
          :event => 'ws:login',
          :data  => { :success => true },
        })
      end
    end

    # Store a message in the spool directory together with the current time.
    #
    # msg - message to spool; spool_list parses it with JSON.parse, so it is
    #       expected to be a JSON document encoded as String
    def self.spool_create(msg)
      path = @path + '/spool/'
      FileUtils.mkpath path
      filename = Time.new.to_f.to_s + '-' + rand(99999).to_s
      data = {
        :msg       => msg,
        :timestamp => Time.now.to_i,
      }
      File.open(path + filename, 'wb') { |file| file.write data.to_json }
    end

    # Collect spooled messages relevant for one user.
    #
    # timestamp       - only messages spooled after this epoch time are
    #                   returned; nil returns all messages
    # current_user_id - user id matched against message['recipient']['user_id']
    #
    # Returns an Array of { :type => 'direct' | 'broadcast', :message => Hash }.
    # Spool files older than SPOOL_MAX_AGE are deleted; unreadable files and
    # unparseable messages are reported and skipped.
    def self.spool_list(timestamp, current_user_id)
      path = @path + '/spool/'
      FileUtils.mkpath path

      data      = []
      to_delete = []
      entries   = Dir.entries(path).reject { |entry| entry == '.' || entry == '..' }

      entries.sort.each do |entry|
        filename = path + entry
        next if !File.exist?(filename)

        begin
          spool = JSON.parse(File.binread(filename))
        rescue => e
          log_error "can't read spool file: #{ filename }, #{ e.inspect }"
          next
        end
        spooled_at = spool['timestamp'].to_i

        # ignore message older then 48h (checked before parsing the message,
        # so that expired entries are removed even if their payload is broken)
        if spooled_at + SPOOL_MAX_AGE < Time.now.to_i
          to_delete.push filename
          next
        end

        begin
          message_parsed = JSON.parse(spool['msg'])
          raise TypeError, 'message is not a JSON object' if !message_parsed.is_a?(Hash)
        rescue => e
          log_error "can't parse spool message: #{ spool['msg'] }, #{ e.inspect }"
          next
        end

        # add spool attribute to push spool info to clients
        message_parsed['spool'] = true

        # only send not already known messages
        next if timestamp && timestamp >= spooled_at

        recipient_ids = message_parsed['recipient'] && message_parsed['recipient']['user_id']
        if recipient_ids
          # spool to recipient list
          recipient_ids.each do |user_id|
            next if current_user_id != user_id
            data.push({ :type => 'direct', :message => message_parsed })
          end
        else
          # spool to every client
          data.push({ :type => 'broadcast', :message => message_parsed })
        end
      end

      to_delete.each { |file| File.delete(file) if File.exist?(file) }
      data
    end

    # Returns a Hash { client_id => session data } of all readable sessions.
    def self.list
      session_list = {}
      self.sessions.each do |client_id|
        data = self.get(client_id)
        session_list[client_id] = data if data
      end
      session_list
    end

    # Update :last_ping of a session to the current time.
    #
    # Returns true, or nil if the session can not be read.
    def self.touch(client_id)
      data = self.get(client_id)
      return if !data
      data[:meta][:last_ping] = Time.new.to_i.to_s
      File.open(@path + '/' + client_id.to_s + '/session', 'wb') { |file|
        file.write Marshal.dump(data)
      }
      true
    end

    # Read the session data of a client.
    #
    # Returns the stored Hash, or nil if there is no session file. A session
    # file which can not be read or unmarshalled is deleted and nil is
    # returned.
    def self.get(client_id)
      session_file = @path + '/' + client_id.to_s + '/session'
      return if !File.exist? session_file
      begin
        raw = File.open(session_file, 'rb') { |file|
          # the lock is released when the block closes the file
          file.flock(File::LOCK_EX)
          file.read
        }
        # NOTE: Marshal.load is only safe for data written by this module
        # (see create/touch); never point @path at untrusted files.
        Marshal.load(raw)
      rescue => e
        File.delete(session_file) if File.exist?(session_file)
        puts "Error reading '#{session_file}':"
        puts e.inspect
        nil
      end
    end

    # Queue a message for a client.
    #
    # client_id - identifier of the receiving client
    # data      - object responding to to_json
    #
    # The message is written to a temporary 'a-send-*' file and renamed
    # afterwards, so that queue (which only picks up 'send*' entries) never
    # sees a half written message.
    #
    # Returns true on success, false if the client directory does not exist.
    def self.send(client_id, data)
      path = @path + '/' + client_id.to_s + '/'
      return false if !File.directory? path

      # find a free file name, append a counter on collision
      base     = 'send-' + Time.new.to_f.to_s
      filename = base
      count    = 0
      while File.exist?(path + filename)
        count += 1
        filename = base + '-' + count.to_s
      end

      tmp_file = path + 'a-' + filename
      File.open(tmp_file, 'wb') { |file|
        file.flock(File::LOCK_EX)
        file.write data.to_json
        file.flock(File::LOCK_UN)
      }
      return false if !File.exist?(tmp_file)
      FileUtils.mv(tmp_file, path + filename)
      true
    end

    # Endless loop which starts one worker thread per user and one client
    # thread per client session. This method never returns.
    def self.jobs

      # just make sure that spool path exists
      FileUtils.mkpath @path if !File.exist?(@path)

      Thread.abort_on_exception = true
      loop do
        self.sessions.each do |client_id|

          # connection already open
          next if @@client_threads[client_id]

          # get current user
          session_data = Sessions.get(client_id)
          next if !session_data || !session_data[:user] || !session_data[:user][:id]

          # NOTE(review): if User is an ActiveRecord model, find raises for
          # unknown ids instead of returning nil - confirm and handle if needed.
          user = User.find(session_data[:user][:id])
          next if !user
          user_id = user.id

          # start user thread
          start_user_thread = false
          if !@@user_threads[user_id]
            @@user_threads[user_id] = Thread.new {
              thread_worker(user_id, 0)
              @@user_threads[user_id] = nil
              puts "close user (#{user_id}) thread"
            }
            start_user_thread = true
          end

          # wait with client thread until user thread has done some little work
          sleep 0.5 if start_user_thread

          # start client thread
          if !@@client_threads[client_id]
            @@client_threads[client_id] = Thread.new {
              thread_client(client_id, 0)
              @@client_threads[client_id] = nil
              puts "close client (#{client_id}) thread"
            }
          end
        end

        # system settings
        sleep 0.5
      end
    end

    # Run Sessions::Worker for a user. On error: wait 5 seconds, reconnect
    # the database and try again.
    #
    # user_id - id passed to Sessions::Worker.new
    # count   - number of failed tries so far
    #
    # Raises RuntimeError after the 10th failed try.
    def self.thread_worker(user_id, count)
      puts "LOOP WORKER #{user_id} - #{count}"
      begin
        Sessions::Worker.new(user_id)
      rescue => e
        puts "thread_worker exited with error #{ e.inspect }"
        sleep 5
        reconnect_database
        ct = count + 1
        if ct < 10
          thread_worker(user_id, ct)
        else
          raise "STOP thread_worker for user #{user_id} after 10 tries"
        end
      end
      puts "/LOOP WORKER #{user_id} - #{count}"
    end

    # Run Sessions::Client for a client. On error: wait 5 seconds, reconnect
    # the database and try again.
    #
    # client_id - id passed to Sessions::Client.new
    # count     - number of failed tries so far
    #
    # Raises RuntimeError after the 10th failed try.
    def self.thread_client(client_id, count)
      puts "LOOP #{client_id} - #{count}"
      begin
        Sessions::Client.new(client_id)
      rescue => e
        puts "thread_client exited with error #{ e.inspect }"
        sleep 5
        reconnect_database
        ct = count + 1
        if ct < 10
          thread_client(client_id, ct)
        else
          raise "STOP thread_client for client #{client_id} after 10 tries"
        end
      end
      puts "/LOOP #{client_id} - #{count}"
    end

    # Returns the ids (directory names) of all client sessions below @path.
    def self.sessions
      path = @path + '/'

      # just make sure that spool path exists
      FileUtils.mkpath path if !File.exist?(path)

      Dir.entries(path).reject { |entry|
        entry == '.' || entry == '..' || entry == 'spool'
      }
    end

    # Fetch and remove all queued messages of a client.
    #
    # Returns an Array of parsed messages, ordered by file name.
    def self.queue(client_id)
      path = @path + '/' + client_id.to_s + '/'
      queued = Dir.entries(path).sort.select { |entry| entry.start_with?('send') }
      queued.map { |entry| Sessions.queue_file(path, entry) }
    end

    # Read, delete and parse one queued message file.
    #
    # path     - client directory, with trailing slash
    # filename - name of the queued file inside of path
    #
    # The file is renamed (prefix 'a-') before it is read, so it is no
    # longer matched by queue.
    def self.queue_file(path, filename)
      file_old = path + filename
      file_new = path + 'a-' + filename
      FileUtils.mv(file_old, file_new)
      all = File.open(file_new, 'rb') { |file| file.read }
      File.delete(file_new)
      JSON.parse(all)
    end

    # Queue a message for every client with a readable session.
    #
    # Returns true.
    def self.broadcast(data)

      # list all current clients
      self.list.each_key { |local_client_id|
        Sessions.send(local_client_id, data)
      }
      true
    end

    # Remove the session directory of a client, including queued messages.
    def self.destroy(client_id)
      path = @path + '/' + client_id.to_s
      FileUtils.rm_rf path
    end

    # Misspelled name of destroy, kept for existing callers.
    def self.destory(client_id)
      destroy(client_id)
    end

    # Re-establish the ActiveRecord connection; failures are only reported.
    def self.reconnect_database
      ActiveRecord::Base.connection.reconnect!
    rescue => e
      puts "Can't reconnect to database #{ e.inspect }"
    end

    # Report an error. The original code called a `log` helper which is not
    # defined in this file; it is used when available, otherwise the message
    # is printed like all other diagnostics of this module.
    def self.log_error(message)
      if respond_to?(:log, true)
        log 'error', message
      else
        puts message
      end
    end

    private_class_method :reconnect_database, :log_error
  end