123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310 |
require 'fileutils'
require 'json'
require 'rss'
require 'session_helper'
# File-system backed session store and message queue for websocket clients.
#
# Directory layout below @path, as produced by the methods in this module:
#   <client_id>/session     Marshal dump of { :user => { :id => ... }, :meta => ... }
#   <client_id>/send-<ts>   one JSON encoded message queued for that client
#   <client_id>/a-send-<ts> a queue file that is currently being written or read
#   spool/<ts>-<rand>       JSON envelope { "msg", "timestamp" } kept for replay
module Sessions

  # get application root directory
  @root = Dir.pwd.to_s
  if (@root.empty? || @root == '/') && defined?(Rails)
    # Rails.root is a Pathname; Pathname#+ with an absolute argument returns
    # that argument, so it has to be converted to a String before appending.
    @root = Rails.root.to_s
  end

  # get working directories
  @path = @root + '/tmp/websocket'
  @pid  = @root + '/tmp/pids/sessionworker.pid'

  # create global vars for threads
  @@user_threads   = {}
  @@client_threads = {}

  # Creates the client directory and writes its session file.
  #
  # client_id - identifier used as directory name below @path
  # session   - Hash like object; only session['id'] is read
  # meta      - Hash stored with the session; :last_ping is set on it in place
  #
  # Returns the result of Sessions.send for the 'ws:login' event when
  # session['id'] is set, nil otherwise.
  def self.create(client_id, session, meta)
    FileUtils.mkpath "#{@path}/#{client_id}"
    meta[:last_ping] = Time.now.to_i.to_s
    data = {
      :user => { :id => session['id'] },
      :meta => meta,
    }
    write_session(client_id, data)

    # send update to browser
    return unless session['id']
    self.send(client_id, { :event => 'ws:login', :data => { :success => true } })
  end

  # Stores msg together with the current unix timestamp as a JSON file in the
  # spool directory. spool_list parses msg as JSON, so it is expected to be a
  # JSON encoded String.
  def self.spool_create(msg)
    path = @path + '/spool/'
    FileUtils.mkpath path
    filename = Time.now.to_f.to_s + '-' + rand(99999).to_s
    data = {
      :msg       => msg,
      :timestamp => Time.now.to_i,
    }
    File.open(path + filename, 'wb') { |file| file.write data.to_json }
  end

  # Collects spooled messages for one user.
  #
  # timestamp       - only entries with a newer spool timestamp are returned;
  #                   nil returns all of them
  # current_user_id - compared against message['recipient']['user_id']
  #
  # Returns an Array of { :type => 'direct' | 'broadcast', :message => Hash },
  # in file name order. Entries older than 48h are deleted. Entries that can
  # not be parsed are reported on stdout and skipped.
  def self.spool_list(timestamp, current_user_id)
    path = @path + '/spool/'
    FileUtils.mkpath path
    data      = []
    to_delete = []

    (Dir.entries(path) - ['.', '..']).sort.each do |entry|
      filename = path + entry
      next unless File.exist?(filename)

      begin
        spool = JSON.parse(File.binread(filename))

        # ignore messages older than 48h; checked before the payload is
        # parsed so that broken entries expire as well
        if spool['timestamp'] + (2 * 86400) < Time.now.to_i
          to_delete.push filename
          next
        end

        message_parsed = JSON.parse(spool['msg'])
      rescue StandardError => e
        puts "can't parse spool message in '#{filename}': #{e.inspect}"
        next
      end

      # add spool attribute to push spool info to clients
      message_parsed['spool'] = true

      # only send messages the client does not know yet
      next unless !timestamp || timestamp < spool['timestamp']

      if message_parsed['recipient'] && message_parsed['recipient']['user_id']
        # spool to recipient list
        message_parsed['recipient']['user_id'].each do |user_id|
          next unless current_user_id == user_id
          data.push(:type => 'direct', :message => message_parsed)
        end
      else
        # spool to every client
        data.push(:type => 'broadcast', :message => message_parsed)
      end
    end

    to_delete.each { |file| File.delete(file) }
    data
  end

  # Returns a Hash of client_id => session data for every client directory
  # whose session file can be read.
  def self.list
    session_list = {}
    sessions.each do |client_id|
      data = get(client_id)
      session_list[client_id] = data if data
    end
    session_list
  end

  # Updates :last_ping of an existing session to the current time.
  #
  # Returns true, or nil if the session can not be read.
  def self.touch(client_id)
    data = get(client_id)
    return unless data
    data[:meta][:last_ping] = Time.now.to_i.to_s
    write_session(client_id, data)
    true
  end

  # Reads the session data of a client.
  #
  # Returns the unmarshalled data, or nil if there is no session file. If the
  # file can not be read or unmarshalled it is removed, the error is printed
  # and nil is returned.
  def self.get(client_id)
    session_file = "#{@path}/#{client_id}/session"
    return unless File.exist?(session_file)

    begin
      raw = File.open(session_file, 'rb') do |file|
        # the lock is released when the block closes the file
        file.flock(File::LOCK_EX)
        file.read
      end
      # NOTE(review): Marshal.load must only ever see files written by this
      # module; it is not safe on data an attacker can influence.
      Marshal.load(raw)
    rescue StandardError => e
      # rm_f does not raise if the file disappeared in the meantime
      FileUtils.rm_f(session_file)
      puts "Error reading '#{session_file}':"
      puts e.inspect
      nil
    end
  end

  # Queues data (serialized with to_json) for a client.
  #
  # The message is written to an 'a-' prefixed file first and then renamed,
  # so Sessions.queue (which only picks up 'send*' files) never sees a
  # partially written message.
  #
  # Returns true on success, false if the client directory does not exist.
  def self.send(client_id, data)
    path = "#{@path}/#{client_id}/"
    return false unless File.directory?(path)

    filename = 'send-' + Time.now.to_f.to_s
    count    = 0
    while File.exist?(path + filename)
      count += 1
      # every new name extends the previous one, so it sorts after it and
      # Sessions.queue keeps the order in which the messages were sent
      filename = filename + '-' + count.to_s
    end

    tmp_file = path + 'a-' + filename
    File.open(tmp_file, 'wb') do |file|
      file.flock(File::LOCK_EX)
      file.write data.to_json
    end
    return false unless File.exist?(tmp_file)
    FileUtils.mv(tmp_file, path + filename)
    true
  end

  # Endless loop which starts one Sessions::Worker thread per user and one
  # Sessions::Client thread per client directory that has a session with a
  # user id. Does not return.
  def self.jobs

    # just make sure that spool path exists
    FileUtils.mkpath @path unless File.exist?(@path)

    Thread.abort_on_exception = true
    loop do
      sessions.each do |client_id|

        # connection already open
        next if @@client_threads[client_id]

        # get current user
        session_data = get(client_id)
        next unless session_data && session_data[:user] && session_data[:user][:id]
        # NOTE(review): User is defined outside this file; if it is an
        # ActiveRecord model, find raises for unknown ids instead of
        # returning nil - confirm how stale sessions should be handled.
        user = User.find(session_data[:user][:id])
        next unless user

        # start user thread
        unless @@user_threads[user.id]
          @@user_threads[user.id] = Thread.new {
            Sessions::Worker.new(user.id)
            @@user_threads[user.id] = nil
            puts "close user(#{user.id}) thread"
          }

          # wait with client thread until user thread has done some little work
          sleep 0.5
        end

        # start client thread
        unless @@client_threads[client_id]
          @@client_threads[client_id] = Thread.new {
            Sessions::Client.new(client_id)
            @@client_threads[client_id] = nil
            puts "close client(#{client_id}) thread"
          }
        end
      end

      # system settings
      sleep 0.5
    end
  end

  # Returns the names of all entries below @path except 'spool', i. e. the
  # known client ids, as an Array of Strings.
  def self.sessions
    path = @path + '/'

    # just make sure that spool path exists
    FileUtils.mkpath path unless File.exist?(path)

    Dir.entries(path).reject { |entry| ['.', '..', 'spool'].include?(entry) }
  end

  # Fetches and removes all queued messages of a client.
  #
  # Returns the parsed messages in file name order, or an empty Array if the
  # client directory does not exist.
  def self.queue(client_id)
    path = "#{@path}/#{client_id}/"
    return [] unless File.directory?(path)

    queued = Dir.entries(path).select { |entry| entry.start_with?('send') }
    queued.sort.map { |entry| queue_file(path, entry) }
  end

  # Reads one queue file, deletes it and returns its parsed JSON content.
  # The file is renamed to 'a-<filename>' first so that it is not picked up
  # a second time while it is being read.
  #
  # path     - client directory including the trailing '/'
  # filename - name of the queue file inside path
  def self.queue_file(path, filename)
    file_old = path + filename
    file_new = path + 'a-' + filename
    FileUtils.mv(file_old, file_new)
    all = File.binread(file_new)
    File.delete(file_new)
    JSON.parse(all)
  end

  # Queues data for every client that has a readable session. Always
  # returns true.
  def self.broadcast(data)
    list.each_key { |client_id| self.send(client_id, data) }
    true
  end

  # Removes the client directory including session and queued messages.
  # A blank client_id is ignored because it would resolve to @path itself
  # and remove the data of every client.
  def self.destroy(client_id)
    return if client_id.to_s.empty?
    FileUtils.rm_rf "#{@path}/#{client_id}"
  end

  # Misspelled name of Sessions.destroy, kept for existing callers.
  def self.destory(client_id)
    destroy(client_id)
  end

  # Writes the Marshal dump of data to the session file of client_id.
  # Opening with 'wb' would truncate the file before a lock can be taken and
  # Sessions.get could read (and then delete) an empty file, so the file is
  # truncated only after the exclusive lock is held.
  def self.write_session(client_id, data)
    payload = Marshal.dump(data)
    File.open("#{@path}/#{client_id}/session", File::WRONLY | File::CREAT) do |file|
      file.binmode
      file.flock(File::LOCK_EX)
      file.truncate(0)
      file.write payload
    end
  end
  private_class_method :write_session
end
|