123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330 |
- #!/usr/bin/env ruby
- # Copyright (C) 2012-2013 Zammad Foundation, http://zammad-foundation.org/
- $LOAD_PATH << './lib'
- require 'rubygems'
- require 'eventmachine'
- require 'em-websocket'
- require 'json'
- require 'fileutils'
- require 'sessions'
- require 'optparse'
- require 'daemons'
- # Look for -o with argument, and -I and -D boolean arguments
- @options = {
- :p => 6042,
- :b => '0.0.0.0',
- :s => false,
- :v => false,
- :d => false,
- :k => '/path/to/server.key',
- :c => '/path/to/server.crt',
- :i => Dir.pwd.to_s + '/tmp/pids/websocket.pid'
- }
- tls_options = {}
- OptionParser.new do |opts|
- opts.banner = "Usage: websocket-server.rb start|stop [options]"
- opts.on("-d", "--daemon", "start as daemon") do |d|
- @options[:d] = d
- end
- opts.on("-v", "--verbose", "enable debug messages") do |d|
- @options[:v] = d
- end
- opts.on("-p", "--port [OPT]", "port of websocket server") do |p|
- @options[:p] = p
- end
- opts.on("-b", "--bind [OPT]", "bind address") do |b|
- @options[:b] = b
- end
- opts.on("-s", "--secure", "enable secure connections") do |s|
- @options[:s] = s
- end
- opts.on("-i", "--pid [OPT]", "pid, default is tmp/pids/websocket.pid") do |i|
- @options[:i] = i
- end
- opts.on("-k", "--private-key [OPT]", "/path/to/server.key for secure connections") do |k|
- tls_options[:private_key_file] = k
- end
- opts.on("-c", "--certificate [OPT]", "/path/to/server.crt for secure connections") do |c|
- tls_options[:cert_chain_file] = c
- end
- end.parse!
- if ARGV[0] != 'start' && ARGV[0] != 'stop'
- puts "Usage: #{File.basename(__FILE__)} start|stop [options]"
- exit;
- end
- if ARGV[0] == 'stop'
- puts "Stopping websocket server"
- # read pid
- pid =File.open( @options[:i].to_s ).read
- pid.gsub!(/\r|\n/, "")
- # kill
- Process.kill( 9, pid.to_i )
- exit
- end
- puts "Starting websocket server on #{ @options[:b] }:#{ @options[:p] } (secure:#{ @options[:s].to_s },pid:#{@options[:i].to_s})"
- if ARGV[0] == 'start' && @options[:d]
- Daemons.daemonize
- # create pid file
- $daemon_pid = File.new( @options[:i].to_s,"w" )
- $daemon_pid.sync = true
- $daemon_pid.puts(Process.pid.to_s)
- $daemon_pid.close
- end
- @clients = {}
- EventMachine.run {
- EventMachine::WebSocket.start( :host => @options[:b], :port => @options[:p], :secure => @options[:s], :tls_options => tls_options ) do |ws|
- # register client connection
- ws.onopen {
- client_id = ws.object_id.to_s
- log 'notice', 'Client connected.', client_id
- Sessions.create( client_id, {}, { :type => 'websocket' } )
- if !@clients.include? client_id
- @clients[client_id] = {
- :websocket => ws,
- :last_ping => Time.new,
- :error_count => 0,
- }
- end
- }
- # unregister client connection
- ws.onclose {
- client_id = ws.object_id.to_s
- log 'notice', 'Client disconnected.', client_id
- # removed from current client list
- if @clients.include? client_id
- @clients.delete client_id
- end
- Sessions.destory( client_id )
- }
- # manage messages
- ws.onmessage { |msg|
- client_id = ws.object_id.to_s
- log 'debug', "received message: #{ msg } ", client_id
- begin
- data = JSON.parse(msg)
- rescue => e
- log 'error', "can't parse message: #{ msg }, #{ e.inspect }", client_id
- next
- end
- # check if connection already exists
- next if !@clients[client_id]
- # spool messages for new connects
- if data['spool']
- Sessions.spool_create(msg)
- end
- # get spool messages and send them to new client connection
- if data['action'] == 'spool'
- # error handling
- if data['timestamp']
- log 'notice', "request spool data > '#{Time.at(data['timestamp']).to_s}'", client_id
- else
- log 'notice', "request spool with init data", client_id
- end
- if @clients[client_id] && @clients[client_id][:session] && @clients[client_id][:session]['id']
- spool = Sessions.spool_list( data['timestamp'], @clients[client_id][:session]['id'] )
- spool.each { |item|
- # create new msg to push to client
- msg = JSON.generate( item[:message] )
- if item[:type] == 'direct'
- log 'notice', "send spool to (user_id=#{ @clients[client_id][:session]['id'] })", client_id
- @clients[client_id][:websocket].send( "[#{ msg }]" )
- else
- log 'notice', "send spool", client_id
- @clients[client_id][:websocket].send( "[#{ msg }]" )
- end
- }
- else
- log 'error', "can't send spool, session not authenticated", client_id
- end
- # send spool:sent event to client
- log 'notice', "send spool:sent event", client_id
- @clients[client_id][:websocket].send( '[{"event":"spool:sent","data":{"timestamp":' + Time.now.utc.to_i.to_s + '}}]' )
- end
- # get session
- if data['action'] == 'login'
- @clients[client_id][:session] = data['session']
- Sessions.create( client_id, data['session'], { :type => 'websocket' } )
- # remember ping, send pong back
- elsif data['action'] == 'ping'
- Sessions.touch(client_id)
- @clients[client_id][:last_ping] = Time.now
- @clients[client_id][:websocket].send( '[{"action":"pong"}]' )
- # broadcast
- elsif data['action'] == 'broadcast'
- # list all current clients
- client_list = Sessions.list
- client_list.each {|local_client_id, local_client|
- if local_client_id != client_id
- # broadcast to recipient list
- if data['recipient']
- if data['recipient'].class != Hash
- log 'error', "recipient attribute isn't a hash '#{ data['recipient'].inspect }'"
- else
- if !data['recipient'].has_key?('user_id')
- log 'error', "need recipient.user_id attribute '#{ data['recipient'].inspect }'"
- else
- if data['recipient']['user_id'].class != Array
- log 'error', "recipient.user_id attribute isn't an array '#{ data['recipient']['user_id'].inspect }'"
- else
- data['recipient']['user_id'].each { |user_id|
- if local_client[:user][:id].to_i == user_id.to_i
- log 'notice', "send broadcast from (#{client_id.to_s}) to (user_id=#{user_id})", local_client_id
- if local_client[:meta][:type] == 'websocket' && @clients[ local_client_id ]
- @clients[ local_client_id ][:websocket].send( "[#{msg}]" )
- else
- Sessions.send( local_client_id, data )
- end
- end
- }
- end
- end
- end
- # broadcast every client
- else
- log 'notice', "send broadcast from (#{client_id.to_s})", local_client_id
- if local_client[:meta][:type] == 'websocket' && @clients[ local_client_id ]
- @clients[ local_client_id ][:websocket].send( "[#{msg}]" )
- else
- Sessions.send( local_client_id, data )
- end
- end
- else
- log 'notice', "do not send broadcast to it self", client_id
- end
- }
- end
- }
- end
- # check unused connections
- EventMachine.add_timer(0.5) {
- check_unused_connections
- }
- # check open unused connections, kick all connection without activitie in the last 2 minutes
- EventMachine.add_periodic_timer(120) {
- check_unused_connections
- }
- EventMachine.add_periodic_timer(20) {
- # websocket
- log 'notice', "Status: websocket clients: #{ @clients.size }"
- @clients.each { |client_id, client|
- log 'notice', 'working...', client_id
- }
- # ajax
- client_list = Sessions.list
- clients = 0
- client_list.each {|client_id, client|
- next if client[:meta][:type] == 'websocket'
- clients = clients + 1
- }
- log 'notice', "Status: ajax clients: #{ clients }"
- client_list.each {|client_id, client|
- next if client[:meta][:type] == 'websocket'
- log 'notice', 'working...', client_id
- }
- }
- EventMachine.add_periodic_timer(0.4) {
- next if @clients.size == 0
- log 'debug', "checking for data to send..."
- @clients.each { |client_id, client|
- next if client[:disconnect]
- log 'debug', 'checking for data...', client_id
- begin
- queue = Sessions.queue( client_id )
- if queue && queue[0]
- # log "send " + queue.inspect, client_id
- log 'notice', "send data to client", client_id
- client[:websocket].send( queue.to_json )
- end
- rescue => e
- log 'error', 'problem:' + e.inspect, client_id
- # disconnect client
- client[:error_count] += 1
- if client[:error_count] > 20
- if @clients.include? client_id
- @clients.delete client_id
- end
- end
- end
- }
- }
- def check_unused_connections
- log 'notice', "check unused idle connections..."
- idle_time_in_min = 4
- # web sockets
- @clients.each { |client_id, client|
- if ( client[:last_ping] + ( 60 * idle_time_in_min ) ) < Time.now
- log 'notice', "closing idle websocket connection", client_id
- # remember to not use this connection anymore
- client[:disconnect] = true
- # try to close regular
- client[:websocket].close_websocket
- # delete session from client list
- sleep 0.3
- @clients.delete(client_id)
- end
- }
- # close unused sessions
- clients = Sessions.destory_idle_sessions(idle_time_in_min)
- clients.each { |client_id|
- log 'notice', "closing idle connection", client_id
- }
- end
- def log( level, data, client_id = '-' )
- if !@options[:v]
- return if level == 'debug'
- end
- puts "#{Time.now}:client(#{ client_id }) #{ data }"
- # puts "#{Time.now}:#{ level }:client(#{ client_id }) #{ data }"
- end
- }
|