websocket-server.rb 10.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333
  1. #!/usr/bin/env ruby
  2. # Copyright (C) 2012-2013 Zammad Foundation, http://zammad-foundation.org/
  3. $LOAD_PATH << './lib'
  4. require 'rubygems'
  5. require 'eventmachine'
  6. require 'em-websocket'
  7. require 'json'
  8. require 'fileutils'
  9. require 'sessions'
  10. require 'optparse'
  11. require 'daemons'
  # Command line handling for the websocket server.
  #
  # Usage: websocket-server.rb start|stop [options]
  #
  # Default settings, keyed by the short switch that overrides them:
  # :p port, :b bind address, :s secure, :v verbose, :d daemon, :i pid file.
  # NOTE(review): :k and :c are placeholders that nothing below reads - the
  # -k / -c switches write straight into tls_options instead. Confirm intent.
  @options = {
    :p => 6042,
    :b => '0.0.0.0',
    :s => false,
    :v => false,
    :d => false,
    :k => '/path/to/server.key',
    :c => '/path/to/server.crt',
    :i => Dir.pwd.to_s + '/tmp/pids/websocket.pid'
  }

  # TLS settings handed to EventMachine::WebSocket.start (filled by -k / -c only).
  tls_options = {}

  # Switches declared with "[OPT]" take an optional value: when the switch is
  # given without one the block receives nil, so the current value is kept
  # instead of being overwritten with nil.
  OptionParser.new do |opts|
    opts.banner = "Usage: websocket-server.rb start|stop [options]"
    opts.on("-d", "--daemon", "start as daemon") do |flag|
      @options[:d] = flag
    end
    opts.on("-v", "--verbose", "enable debug messages") do |flag|
      @options[:v] = flag
    end
    opts.on("-p", "--port [OPT]", "port of websocket server") do |port|
      @options[:p] = port if port
    end
    opts.on("-b", "--bind [OPT]", "bind address") do |address|
      @options[:b] = address if address
    end
    opts.on("-s", "--secure", "enable secure connections") do |flag|
      @options[:s] = flag
    end
    opts.on("-i", "--pid [OPT]", "pid, default is tmp/pids/websocket.pid") do |path|
      @options[:i] = path if path
    end
    opts.on("-k", "--private-key [OPT]", "/path/to/server.key for secure connections") do |path|
      tls_options[:private_key_file] = path if path
    end
    opts.on("-c", "--certificate [OPT]", "/path/to/server.crt for secure connections") do |path|
      tls_options[:cert_chain_file] = path if path
    end
  end.parse!

  # parse! removed the switches from ARGV, the command is what is left over
  command = ARGV[0]
  if command != 'start' && command != 'stop'
    puts "Usage: #{File.basename(__FILE__)} start|stop [options]"
    exit
  end

  if command == 'stop'
    pid_file = @options[:i].to_s
    puts "Stopping websocket server (pid:#{pid_file})"

    # File.read closes the file again; to_i ignores the trailing newline
    begin
      pid = File.read(pid_file).to_i
    rescue SystemCallError => e
      abort "Can't read pid file #{pid_file}: #{e.message}"
    end

    # an empty or garbled pid file yields 0, and signalling pid 0 would hit
    # the whole process group of this script
    abort "No valid pid found in #{pid_file}" unless pid > 0

    begin
      Process.kill( 9, pid )
    rescue Errno::ESRCH
      abort "No process with pid #{pid} running"
    end
    exit
  end

  puts "Starting websocket server on #{ @options[:b] }:#{ @options[:p] } (secure:#{ @options[:s].to_s },pid:#{@options[:i].to_s})"

  if @options[:d]
    Daemons.daemonize

    # create pid file, written after daemonizing so it holds the daemon's pid
    File.open( @options[:i].to_s, "w" ) do |pid_file|
      pid_file.puts(Process.pid.to_s)
    end
  end

  # connected websocket clients, keyed by client id
  @clients = {}
  74. EventMachine.run {
  # Accept websocket connections and register the per-connection callbacks.
  #
  # Every connection is tracked in @clients under the string form of the
  # connection's object_id. An entry holds the socket (:websocket), the time
  # of the last ping (:last_ping), an error counter (:error_count) and, after
  # a 'login' message, the session sent by the client (:session).
  EventMachine::WebSocket.start( :host => @options[:b], :port => @options[:p], :secure => @options[:s], :tls_options => tls_options ) do |ws|

    # register client connection
    ws.onopen {
      client_id = ws.object_id.to_s
      log 'notice', 'Client connected.', client_id
      Sessions.create( client_id, {}, { :type => 'websocket' } )
      @clients[client_id] ||= {
        :websocket   => ws,
        :last_ping   => Time.new,
        :error_count => 0,
      }
    }

    # unregister client connection
    ws.onclose {
      client_id = ws.object_id.to_s
      log 'notice', 'Client disconnected.', client_id

      # removed from current client list
      @clients.delete client_id

      # NOTE(review): 'destory' is the method name this file has always
      # called - presumably the Sessions library spells it that way; confirm
      # before renaming.
      Sessions.destory( client_id )
    }

    # manage messages
    ws.onmessage { |msg|
      client_id = ws.object_id.to_s
      log 'debug', "received message: #{ msg } ", client_id
      begin
        data = JSON.parse(msg)
      rescue => e
        log 'error', "can't parse message: #{ msg }, #{ e.inspect }", client_id
        next
      end

      # valid JSON that is not an object (array, number, ...) has no keys to
      # dispatch on; reject it here instead of raising on data['...'] below
      unless data.is_a?(Hash)
        log 'error', "can't handle message, not a JSON object: #{ msg }", client_id
        next
      end

      # check if connection already exists
      client = @clients[client_id]
      next if !client

      # spool messages for new connects
      Sessions.spool_create(msg) if data['spool']

      # get spool messages and send them to new client connection
      if data['action'] == 'spool'
        if data['timestamp']
          log 'notice', "request spool data > '#{Time.at(data['timestamp']).to_s}'", client_id
        else
          log 'notice', "request spool with init data", client_id
        end

        if client[:session] && client[:session]['id']
          session_user_id = client[:session]['id']
          Sessions.spool_list( data['timestamp'], session_user_id ).each { |item|

            # create new msg to push to client (kept apart from the incoming msg)
            spool_msg = JSON.generate( item[:message] )
            if item[:type] == 'direct'
              log 'notice', "send spool to (user_id=#{ session_user_id })", client_id
            else
              log 'notice', "send spool", client_id
            end
            client[:websocket].send( "[#{ spool_msg }]" )
          }
        else
          log 'error', "can't send spool, session not authenticated", client_id
        end

        # send spool:sent event to client
        log 'notice', "send spool:sent event", client_id
        client[:websocket].send( '[{"event":"spool:sent","data":{"timestamp":' + Time.now.utc.to_i.to_s + '}}]' )
      end

      case data['action']

      # get session
      when 'login'
        client[:session] = data['session']
        Sessions.create( client_id, data['session'], { :type => 'websocket' } )

      # remember ping, send pong back
      when 'ping'
        client[:last_ping] = Time.now
        client[:websocket].send( '[{"action":"pong"}]' )

      # broadcast
      when 'broadcast'
        recipient = data['recipient']

        # push the raw message to a locally connected websocket, otherwise
        # hand the parsed message over to Sessions for delivery
        deliver = lambda { |target_id, target|
          if target[:meta][:type] == 'websocket' && @clients[ target_id ]
            @clients[ target_id ][:websocket].send( "[#{msg}]" )
          else
            Sessions.send( target_id, data )
          end
        }

        # list all current clients
        Sessions.list.each { |local_client_id, local_client|
          if local_client_id == client_id
            log 'notice', "do not send broadcast to it self", client_id
            next
          end

          if !recipient
            # broadcast every client
            log 'notice', "send broadcast from (#{client_id.to_s})", local_client_id
            deliver.call( local_client_id, local_client )
          elsif !recipient.is_a?(Hash)
            log 'error', "recipient attribute isn't a hash '#{ recipient.inspect }'"
          elsif !recipient.has_key?('user_id')
            log 'error', "need recipient.user_id attribute '#{ recipient.inspect }'"
          elsif !recipient['user_id'].is_a?(Array)
            log 'error', "recipient.user_id attribute isn't an array '#{ recipient['user_id'].inspect }'"
          else
            # broadcast to recipient list
            recipient['user_id'].each { |user_id|
              next if local_client[:user][:id].to_i != user_id.to_i
              log 'notice', "send broadcast from (#{client_id.to_s}) to (user_id=#{user_id})", local_client_id
              deliver.call( local_client_id, local_client )
            }
          end
        }
      end
    }
  end
  # run a first idle connection check shortly after the reactor has started
  EventMachine.add_timer(0.5) { check_unused_connections }

  # repeat the idle connection check every 120 seconds
  EventMachine.add_periodic_timer(120) { check_unused_connections }

  # status report every 20 seconds: one summary line plus one line per client
  EventMachine.add_periodic_timer(20) {

    # websocket
    log 'notice', "Status: websocket clients: #{ @clients.size }"
    @clients.each { |client_id, _client| log 'notice', 'working...', client_id }

    # ajax, every session that is not of type websocket
    ajax_clients = Sessions.list.reject { |_client_id, client| client[:meta][:type] == 'websocket' }
    log 'notice', "Status: ajax clients: #{ ajax_clients.size }"
    ajax_clients.each { |client_id, _client| log 'notice', 'working...', client_id }
  }

  # every 0.4 seconds push queued session data to the connected websockets
  EventMachine.add_periodic_timer(0.4) {
    next if @clients.size == 0
    log 'debug', "checking for data to send..."
    @clients.each { |client_id, client|

      # connection is flagged as closing, do not use it anymore
      next if client[:disconnect]
      log 'debug', 'checking for data...', client_id
      begin
        pending = Sessions.queue( client_id )
        next unless pending && pending[0]
        log 'notice', "send data to client", client_id
        client[:websocket].send( pending.to_json )
      rescue => e
        log 'error', "problem:#{ e.inspect }", client_id

        # forget the client after more than 20 failed attempts
        client[:error_count] += 1
        @clients.delete client_id if client[:error_count] > 20
      end
    }
  }
  # Close and forget connections that have been idle for more than
  # idle_time_in_min minutes.
  #
  # Websocket clients (entries of @clients) are judged by their :last_ping
  # time: idle ones are flagged with :disconnect, asked to close and removed
  # from @clients. All other sessions reported by Sessions.list are judged by
  # their [:meta][:last_ping] value and removed via Sessions.destory.
  #
  # This runs from EventMachine timers, i.e. inside the reactor thread, so it
  # must not block: sleeping here would stall every connection and would not
  # give the close any time to happen either.
  def check_unused_connections
    log 'notice', "check unused idle connections..."
    idle_time_in_min = 4
    idle_seconds = 60 * idle_time_in_min

    # web sockets - pick the idle ones first so that @clients is not changed
    # while it is being iterated
    now = Time.now
    idle_clients = @clients.select { |_client_id, client|
      ( client[:last_ping] + idle_seconds ) < now
    }
    idle_clients.each { |client_id, client|
      log 'notice', "closing idle websocket connection", client_id

      # remember to not use this connection anymore
      client[:disconnect] = true

      # try to close regular
      client[:websocket].close_websocket

      # delete session from client list
      @clients.delete(client_id)
    }

    # ajax
    Sessions.list.each { |client_id, client|
      next if client[:meta][:type] == 'websocket'
      next if ( client[:meta][:last_ping].to_i + idle_seconds ) >= Time.now.to_i
      log 'notice', "closing idle ajax connection", client_id
      Sessions.destory( client_id )
    }
  end
  # Write one timestamped line for the given client to stdout.
  #
  # level     - severity as a string; 'debug' lines are only printed when the
  #             verbose option (@options[:v]) is set, the level itself is not
  #             part of the output
  # data      - the message text
  # client_id - id of the client the message is about, '-' if there is none
  def log( level, data, client_id = '-' )
    return if !@options[:v] && level == 'debug'
    line = "#{Time.now}:client(#{ client_id }) #{ data }"
    puts line
  end
  282. }