sessions.rb 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310
  1. require 'json'
  2. require 'rss'
  3. require 'session_helper'
  4. module Sessions
  5. # get application root directory
  6. @root = Dir.pwd.to_s
  7. if !@root || @root.empty? || @root == '/'
  8. @root = Rails.root
  9. end
  10. # get working directories
  11. @path = @root + '/tmp/websocket'
  12. @pid = @root + '/tmp/pids/sessionworker.pid'
  13. # create global vars for threads
  14. @@user_threads = {}
  15. @@client_threads = {}
  16. def self.create( client_id, session, meta )
  17. path = @path + '/' + client_id.to_s
  18. FileUtils.mkpath path
  19. meta[:last_ping] = Time.new.to_i.to_s
  20. File.open( path + '/session', 'wb' ) { |file|
  21. data = {
  22. :user => {
  23. :id => session['id'],
  24. },
  25. :meta => meta,
  26. }
  27. # puts 'CREATE' + Marshal.dump(data)
  28. file.write Marshal.dump(data)
  29. }
  30. # send update to browser
  31. if session['id']
  32. self.send( client_id, {
  33. :event => 'ws:login',
  34. :data => { :success => true },
  35. })
  36. end
  37. end
  38. def self.spool_create( msg )
  39. path = @path + '/spool/'
  40. FileUtils.mkpath path
  41. file = Time.new.to_f.to_s + '-' + rand(99999).to_s
  42. File.open( path + '/' + file , 'wb' ) { |file|
  43. data = {
  44. :msg => msg,
  45. :timestamp => Time.now.to_i,
  46. }
  47. # puts 'CREATE' + Marshal.dump(data)
  48. file.write data.to_json
  49. }
  50. end
  51. def self.spool_list( timestamp, current_user_id )
  52. path = @path + '/spool/'
  53. FileUtils.mkpath path
  54. data = []
  55. to_delete = []
  56. files = []
  57. Dir.foreach( path ) {|entry|
  58. next if entry == '.' || entry == '..'
  59. files.push entry
  60. }
  61. files.sort.each {|entry|
  62. filename = path + '/' + entry
  63. next if !File::exists?( filename )
  64. File.open( filename, 'rb' ) { |file|
  65. all = file.read
  66. spool = JSON.parse( all )
  67. begin
  68. message_parsed = JSON.parse( spool['msg'] )
  69. rescue => e
  70. log 'error', "can't parse spool message: #{ message }, #{ e.inspect }"
  71. next
  72. end
  73. # ignore message older then 48h
  74. if spool['timestamp'] + (2 * 86400) < Time.now.to_i
  75. to_delete.push path + '/' + entry
  76. next
  77. end
  78. # add spool attribute to push spool info to clients
  79. message_parsed['spool'] = true
  80. # only send not already now messages
  81. if !timestamp || timestamp < spool['timestamp']
  82. # spool to recipient list
  83. if message_parsed['recipient'] && message_parsed['recipient']['user_id']
  84. message_parsed['recipient']['user_id'].each { |user_id|
  85. if current_user_id == user_id
  86. item = {
  87. :type => 'direct',
  88. :message => message_parsed,
  89. }
  90. data.push item
  91. end
  92. }
  93. # spool to every client
  94. else
  95. item = {
  96. :type => 'broadcast',
  97. :message => message_parsed,
  98. }
  99. data.push item
  100. end
  101. end
  102. }
  103. }
  104. to_delete.each {|file|
  105. File.delete(file)
  106. }
  107. return data
  108. end
  109. def self.list
  110. client_ids = self.sessions
  111. session_list = {}
  112. client_ids.each { |client_id|
  113. data = self.get(client_id)
  114. next if !data
  115. session_list[client_id] = data
  116. }
  117. return session_list
  118. end
  119. def self.touch( client_id )
  120. data = self.get(client_id)
  121. return if !data
  122. path = @path + '/' + client_id.to_s
  123. data[:meta][:last_ping] = Time.new.to_i.to_s
  124. File.open( path + '/session', 'wb' ) { |file|
  125. file.write Marshal.dump(data)
  126. }
  127. return true
  128. end
  129. def self.get( client_id )
  130. session_file = @path + '/' + client_id.to_s + '/session'
  131. data = nil
  132. return if !File.exist? session_file
  133. begin
  134. File.open( session_file, 'rb' ) { |file|
  135. file.flock( File::LOCK_EX )
  136. all = file.read
  137. file.flock( File::LOCK_UN )
  138. data = Marshal.load( all )
  139. }
  140. rescue Exception => e
  141. File.delete(session_file)
  142. puts "Error reading '#{session_file}':"
  143. puts e.inspect
  144. return
  145. end
  146. return data
  147. end
  148. def self.send( client_id, data )
  149. path = @path + '/' + client_id.to_s + '/'
  150. filename = 'send-' + Time.new().to_f.to_s# + '-' + rand(99999999).to_s
  151. check = true
  152. count = 0
  153. while check
  154. if File::exists?( path + filename )
  155. count += 1
  156. filename = filename + '-' + count
  157. # filename = filename + '-' + rand(99999).to_s
  158. # filename = filename + '-' + rand(99999).to_s
  159. else
  160. check = false
  161. end
  162. end
  163. return false if !File.directory? path
  164. File.open( path + 'a-' + filename, 'wb' ) { |file|
  165. file.flock( File::LOCK_EX )
  166. file.write data.to_json
  167. file.flock( File::LOCK_UN )
  168. file.close
  169. }
  170. return false if !File.exists?( path + 'a-' + filename )
  171. FileUtils.mv( path + 'a-' + filename, path + filename )
  172. return true
  173. end
  174. def self.jobs
  175. # just make sure that spool path exists
  176. if !File::exists?( @path )
  177. FileUtils.mkpath @path
  178. end
  179. Thread.abort_on_exception = true
  180. while true
  181. client_ids = self.sessions
  182. client_ids.each { |client_id|
  183. # connection already open
  184. next if @@client_threads[client_id]
  185. # get current user
  186. session_data = Sessions.get( client_id )
  187. next if !session_data
  188. next if !session_data[:user]
  189. next if !session_data[:user][:id]
  190. user = User.find( session_data[:user][:id] )
  191. next if !user
  192. # start user thread
  193. start_user_thread = false
  194. if !@@user_threads[user.id]
  195. start_user_thread = true
  196. @@user_threads[user.id] = Thread.new {
  197. Sessions::Worker.new(user.id)
  198. @@user_threads[user.id] = nil
  199. puts "close user(#{user.id}) thread"
  200. # raise "Exception from thread"
  201. }
  202. end
  203. # wait with client thread unil user thread has done some little work
  204. if start_user_thread
  205. sleep 0.5
  206. end
  207. # start client thread
  208. if !@@client_threads[client_id]
  209. @@client_threads[client_id] = Thread.new {
  210. Sessions::Client.new(client_id)
  211. @@client_threads[client_id] = nil
  212. puts "close client(#{client_id}) thread"
  213. # raise "Exception from thread"
  214. }
  215. end
  216. }
  217. # system settings
  218. sleep 0.5
  219. end
  220. end
  221. def self.sessions
  222. path = @path + '/'
  223. # just make sure that spool path exists
  224. if !File::exists?( path )
  225. FileUtils.mkpath path
  226. end
  227. data = []
  228. Dir.foreach( path ) do |entry|
  229. next if entry == '.' || entry == '..' || entry == 'spool'
  230. data.push entry.to_s
  231. end
  232. return data
  233. end
  234. def self.queue( client_id )
  235. path = @path + '/' + client_id.to_s + '/'
  236. data = []
  237. files = []
  238. Dir.foreach( path ) {|entry|
  239. next if entry == '.' || entry == '..'
  240. files.push entry
  241. }
  242. files.sort.each {|entry|
  243. filename = path + '/' + entry
  244. if /^send/.match( entry )
  245. data.push Sessions.queue_file( path, entry )
  246. end
  247. }
  248. return data
  249. end
  250. def self.queue_file( path, filename )
  251. file_old = path + filename
  252. file_new = path + 'a-' + filename
  253. FileUtils.mv( file_old, file_new )
  254. data = nil
  255. all = ''
  256. File.open( file_new, 'rb' ) { |file|
  257. all = file.read
  258. }
  259. File.delete( file_new )
  260. data = JSON.parse( all )
  261. return data
  262. end
  263. def self.broadcast( data )
  264. # list all current clients
  265. client_list = self.list
  266. client_list.each {|local_client_id, local_client|
  267. Sessions.send( local_client_id, data )
  268. }
  269. return true
  270. end
  271. def self.destory( client_id )
  272. path = @path + '/' + client_id.to_s
  273. FileUtils.rm_rf path
  274. end
  275. end