websocket-server.rb 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374
  1. #!/usr/bin/env ruby
  2. # Copyright (C) 2012-2013 Zammad Foundation, http://zammad-foundation.org/
  3. $LOAD_PATH << './lib'
  4. require 'rubygems'
  5. require 'eventmachine'
  6. require 'em-websocket'
  7. require 'json'
  8. require 'fileutils'
  9. require 'sessions'
  10. require 'optparse'
  11. require 'daemons'
  12. # load rails env
  13. dir = File.expand_path(File.join(File.dirname(__FILE__), '..'))
  14. Dir.chdir dir
  15. RAILS_ENV = ENV['RAILS_ENV'] || 'development'
  16. require File.join(dir, 'config', 'environment')
  # Command line defaults. The short option letter doubles as the hash key:
  #   :p port, :b bind address, :s TLS on/off, :v verbose, :d daemonize,
  #   :i pid file location.
  # :k and :c only hold placeholder paths; nothing in this script reads them,
  # the -k/-c switches fill tls_options instead.
  @options = {
    p: 6042,
    b: '0.0.0.0',
    s: false,
    v: false,
    d: false,
    k: '/path/to/server.key',
    c: '/path/to/server.crt',
    i: Dir.pwd.to_s + '/tmp/pids/websocket.pid'
  }

  # Passed through to the websocket server as its TLS settings.
  tls_options = {}

  # Switches that carry a value declare it as a mandatory argument, so a bare
  # "-p" is rejected by the parser instead of silently replacing the default
  # with nil. The port is coerced to an Integer while parsing.
  option_parser = OptionParser.new do |opts|
    opts.banner = 'Usage: websocket-server.rb start|stop [options]'

    opts.on('-d', '--daemon', 'start as daemon') do |daemon|
      @options[:d] = daemon
    end
    opts.on('-v', '--verbose', 'enable debug messages') do |verbose|
      @options[:v] = verbose
    end
    opts.on('-p', '--port PORT', Integer, 'port of websocket server') do |port|
      @options[:p] = port
    end
    opts.on('-b', '--bind ADDRESS', 'bind address') do |address|
      @options[:b] = address
    end
    opts.on('-s', '--secure', 'enable secure connections') do |secure|
      @options[:s] = secure
    end
    opts.on('-i', '--pid FILE', 'pid, default is tmp/pids/websocket.pid') do |pid_file|
      @options[:i] = pid_file
    end
    opts.on('-k', '--private-key FILE', '/path/to/server.key for secure connections') do |key_file|
      tls_options[:private_key_file] = key_file
    end
    opts.on('-c', '--certificate FILE', '/path/to/server.crt for secure connections') do |cert_file|
      tls_options[:cert_chain_file] = cert_file
    end
  end

  # Consumes the recognised switches from ARGV, leaving the start|stop command.
  option_parser.parse!
  # Dispatch on the sub command, i.e. the first argument left in ARGV.
  command = ARGV[0]
  unless %w[start stop].include?(command)
    puts "Usage: #{File.basename(__FILE__)} start|stop [options]"
    exit
  end

  # stop: read the pid written by a daemonized start and kill that process
  if command == 'stop'
    pid_file = @options[:i].to_s
    puts "Stopping websocket server (pid:#{@options[:i]})"

    # File.read closes the handle again; a missing file is reported instead
    # of surfacing as a backtrace.
    begin
      pid = File.read(pid_file).strip.to_i
    rescue Errno::ENOENT
      abort "Can't stop websocket server, pid file #{pid_file} not found"
    end

    # An empty or non-numeric pid file converts to 0, and signalling pid 0
    # addresses the caller's own process group - never pass that on to kill.
    abort "Can't stop websocket server, no valid pid in #{pid_file}" if pid <= 0

    begin
      Process.kill(9, pid)
    rescue Errno::ESRCH
      abort "Can't stop websocket server, no process with pid #{pid}"
    end
    exit
  end

  # start as daemon: detach first, then record the pid of the detached process
  if command == 'start' && @options[:d]
    puts "Starting websocket server on #{@options[:b]}:#{@options[:p]} (secure:#{@options[:s]},pid:#{@options[:i]})"
    Daemons.daemonize

    # block form flushes and closes the pid file even if the write fails
    File.open(@options[:i].to_s, 'w') do |pid_io|
      pid_io.puts(Process.pid.to_s)
    end
  end
  78. @clients = {}
  79. EventMachine.run {
  # Start the websocket listener and wire up the per-connection callbacks.
  # Every connection is tracked in @clients under the String form of the
  # connection's object_id.
  EventMachine::WebSocket.start( host: @options[:b], port: @options[:p], secure: @options[:s], tls_options: tls_options ) do |ws|

    # register client connection
    ws.onopen {
      client_id = ws.object_id.to_s
      log 'notice', 'Client connected.', client_id
      Sessions.create( client_id, {}, { type: 'websocket' } )
      @clients[client_id] ||= {
        websocket: ws,
        last_ping: Time.now.utc.to_i,
        error_count: 0,
      }
    }

    # unregister client connection
    ws.onclose {
      client_id = ws.object_id.to_s
      log 'notice', 'Client disconnected.', client_id

      # remove from current client list (no-op if already removed)
      @clients.delete client_id
      Sessions.destory( client_id )
    }

    # manage messages
    ws.onmessage { |msg|
      client_id = ws.object_id.to_s
      log 'debug', "received: #{msg} ", client_id

      begin
        data = JSON.parse(msg)
      rescue => e
        log 'error', "can't parse message: #{msg}, #{e.inspect}", client_id
        next
      end

      # Valid JSON is not necessarily an object: an array, number or null would
      # raise on the data['...'] lookups below.
      # NOTE(review): em-websocket appears to re-raise handler exceptions when
      # no onerror callback is registered - confirm; if so such a message could
      # take the whole server down.
      if !data.is_a?(Hash)
        log 'error', "invalid message, need a JSON object: #{msg}", client_id
        next
      end

      # ignore messages of connections which are not registered (anymore)
      next if !@clients[client_id]

      # spool messages for new connects
      Sessions.spool_create(msg) if data['spool']

      # get spool messages and send them to new client connection
      if data['action'] == 'spool'
        timestamp = data['timestamp']

        # Time.at raises on anything that is not a number
        if timestamp && !timestamp.is_a?(Numeric)
          log 'error', "invalid spool timestamp '#{timestamp.inspect}'", client_id
          next
        end

        if timestamp
          log 'notice', "request spool data > '#{Time.at(timestamp).utc.iso8601}'", client_id
        else
          log 'notice', 'request spool with init data', client_id
        end

        # spool is only delivered to connections which have a user id attached
        client_session = @clients[client_id][:session]
        spool_user_id  = client_session && client_session['id']
        if spool_user_id
          Sessions.spool_list( timestamp, spool_user_id ).each { |item|
            if item[:type] == 'direct'
              log 'notice', "send spool to (user_id=#{spool_user_id})", client_id
            else
              log 'notice', 'send spool', client_id
            end
            websocket_send(client_id, item[:message])
          }
        else
          log 'error', "can't send spool, session not authenticated", client_id
        end

        # send spool:sent event to client
        log 'notice', 'send spool:sent event', client_id
        message = {
          event: 'spool:sent',
          data: {
            timestamp: Time.now.utc.to_i,
          },
        }
        websocket_send(client_id, message)
      end

      # get session
      if data['action'] == 'login'

        # look up the stored session to learn the user_id
        session = nil
        if data['session_id']
          ActiveRecord::Base.establish_connection
          session = ActiveRecord::SessionStore::Session.find_by( session_id: data['session_id'] )
          ActiveRecord::Base.remove_connection
        end

        new_session_data = {}
        if session && session.data && session.data['user_id']
          new_session_data = { 'id' => session.data['user_id'] }
        end

        @clients[client_id][:session] = new_session_data
        Sessions.create( client_id, new_session_data, { type: 'websocket' } )

      # remember ping, send pong back
      elsif data['action'] == 'ping'
        Sessions.touch(client_id)
        @clients[client_id][:last_ping] = Time.now.utc.to_i
        message = {
          action: 'pong',
        }
        websocket_send(client_id, message)

      # broadcast
      elsif data['action'] == 'broadcast'

        # Validate the optional recipient list once per message instead of once
        # per connected session; an invalid list means nothing gets delivered.
        recipient          = data['recipient']
        recipient_user_ids = nil
        if recipient
          if !recipient.is_a?(Hash)
            log 'error', "recipient attribute isn't a hash '#{recipient.inspect}'"
            next
          end
          if !recipient.key?('user_id')
            log 'error', "need recipient.user_id attribute '#{recipient.inspect}'"
            next
          end
          if !recipient['user_id'].is_a?(Array)
            log 'error', "recipient.user_id attribute isn't an array '#{recipient['user_id'].inspect}'"
            next
          end
          recipient_user_ids = recipient['user_id']
        end

        # push directly if the session is a websocket held by this process,
        # otherwise hand the message over to Sessions
        deliver = lambda { |target_id, target|
          if target[:meta][:type] == 'websocket' && @clients[ target_id ]
            websocket_send(target_id, data)
          else
            Sessions.send(target_id, data)
          end
        }

        # list all current clients
        Sessions.list.each { |local_client_id, local_client|
          if local_client_id == client_id
            log 'notice', 'do not send broadcast to it self', client_id
            next
          end

          # broadcast to recipient list
          if recipient_user_ids
            recipient_user_ids.each { |user_id|
              next if local_client[:user]['id'].to_i != user_id.to_i
              log 'notice', "send broadcast from (#{client_id}) to (user_id=#{user_id})", local_client_id
              deliver.call(local_client_id, local_client)
            }

          # broadcast every client
          else
            log 'notice', "send broadcast from (#{client_id})", local_client_id
            deliver.call(local_client_id, local_client)
          end
        }
      end
    }
  end
  # run a first idle sweep shortly after the reactor has come up
  EventMachine.add_timer(0.5) do
    check_unused_connections
  end

  # repeat the idle sweep every 120 seconds; how long a connection may stay
  # silent is decided inside check_unused_connections
  EventMachine.add_periodic_timer(120) do
    check_unused_connections
  end

  # status report every 20 seconds
  EventMachine.add_periodic_timer(20) do

    # websocket
    log 'notice', "Status: websocket clients: #{@clients.size}"
    @clients.each_key do |client_id|
      log 'notice', 'working...', client_id
    end

    # ajax, i.e. every listed session which is not of type websocket
    ajax_clients = Sessions.list.reject do |_client_id, client|
      client[:meta][:type] == 'websocket'
    end
    log 'notice', "Status: ajax clients: #{ajax_clients.size}"
    ajax_clients.each do |client_id, _client|
      log 'notice', 'working...', client_id
    end
  end

  # every 0.4 seconds: hand queued session data to the websocket clients
  EventMachine.add_periodic_timer(0.4) do
    next if @clients.empty?

    @clients.each do |client_id, client|

      # connection is already flagged for closing
      next if client[:disconnect]

      log 'debug', 'checking for data...', client_id
      begin
        pending = Sessions.queue( client_id )
        if pending && pending[0]
          log 'notice', 'send data to client', client_id
          websocket_send(client_id, pending)
        end
      rescue => e
        log 'error', "problem:#{e.inspect}", client_id

        # drop the client from the list once it has failed more than 20 times
        client[:error_count] += 1
        @clients.delete(client_id) if client[:error_count] > 20
      end
    end
  end
  # Serialize +data+ to JSON and push it to one connected websocket client.
  #
  # client_id - key of the connection in @clients
  # data      - payload; an Array (including Array subclasses) is sent as a
  #             JSON array as is, anything else is wrapped into a one-element
  #             JSON array
  #
  # Returns nil without sending if client_id is not in @clients, otherwise
  # whatever the connection's #send returns.
  def websocket_send(client_id, data)
    # is_a? instead of an exact class comparison, so Array subclasses are not
    # wrapped a second time
    msg = data.is_a?(Array) ? data.to_json : "[#{data.to_json}]"
    log 'debug', "send #{msg}", client_id

    client = @clients[client_id]
    if !client
      log 'error', "no such @clients for #{client_id}", client_id
      return
    end
    client[:websocket].send(msg)
  end
  # Close websocket connections that have not pinged within the idle window
  # and let Sessions expire idle long polling sessions.
  #
  # idle_time_in_sec - seconds a connection may stay silent, default 4 minutes
  #
  # Returns the list handed back by Sessions.destory_idle_sessions.
  def check_unused_connections(idle_time_in_sec = 4 * 60)
    log 'notice', 'check unused idle connections...'

    # close unused web socket sessions; select builds a separate hash first,
    # so @clients is not modified while it is being walked
    oldest_allowed_ping = Time.now.utc.to_i - idle_time_in_sec
    idle_clients = @clients.select { |_client_id, client|
      client[:last_ping].to_i < oldest_allowed_ping
    }
    idle_clients.each { |client_id, client|
      log 'notice', 'closing idle websocket connection', client_id

      # remember to not use this connection anymore
      client[:disconnect] = true

      # try to close regular
      client[:websocket].close_websocket

      # Delete session from client list. No sleep in between: this method is
      # run from EventMachine timers, and sleeping there stalls every other
      # connection for the duration.
      @clients.delete(client_id)
    }

    # close unused ajax long polling sessions
    closed_sessions = Sessions.destory_idle_sessions(idle_time_in_sec)
    closed_sessions.each { |client_id|
      log 'notice', 'closing idle long polling connection', client_id
    }
  end
  # Print one log line to stdout, prefixed with the current UTC time in
  # ISO 8601 format and the client id.
  #
  # level     - 'debug' lines are dropped unless @options[:v] is set; the
  #             level itself is not part of the printed line
  # data      - text to print
  # client_id - connection the line belongs to, '-' if none
  def log( level, data, client_id = '-' )
    verbose = @options[:v]
    return if level == 'debug' && !verbose

    timestamp = Time.now.utc.iso8601
    puts "#{timestamp}:client(#{client_id}) #{data}"
  end
  313. }