websocket-server.rb 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294
  1. #!/usr/bin/env ruby
  2. # Copyright (C) 2012-2013 Zammad Foundation, http://zammad-foundation.org/
  3. $LOAD_PATH << './lib'
  4. require 'rubygems'
  5. require 'eventmachine'
  6. require 'em-websocket'
  7. require 'json'
  8. require 'fileutils'
  9. require 'optparse'
  10. require 'daemons'
  11. # load rails env
  12. dir = File.expand_path(File.join(File.dirname(__FILE__), '..'))
  13. Dir.chdir dir
  14. RAILS_ENV = ENV['RAILS_ENV'] || 'development'
  15. require File.join(dir, 'config', 'environment')
  16. require 'sessions'
  # Command line defaults; the switches parsed below overwrite them.
  #   p: port, b: bind address, s: secure (TLS) on/off, v: verbose logging,
  #   d: run as daemon, i: pid file path.
  #   k / c hold placeholder key / certificate paths; this script itself never
  #   reads them - the -k / -c switches fill tls_options instead.
  @options = {
    p: 6042,
    b: '0.0.0.0',
    s: false,
    v: false,
    d: false,
    k: '/path/to/server.key',
    c: '/path/to/server.crt',
    i: Dir.pwd.to_s + '/tmp/pids/websocket.pid'
  }

  # TLS settings handed to the websocket server; stays empty unless -k / -c carry a value.
  tls_options = {}

  # Every "[OPT]" switch takes an optional value, so its handler receives nil
  # when the switch is given bare (e.g. "-p" followed by another switch).
  # Such nils are ignored so that the defaults above survive.
  option_parser = OptionParser.new do |opts|
    opts.banner = 'Usage: websocket-server.rb start|stop [options]'

    opts.on('-d', '--daemon', 'start as daemon') do |daemon|
      @options[:d] = daemon
    end
    opts.on('-v', '--verbose', 'enable debug messages') do |verbose|
      @options[:v] = verbose
    end
    opts.on('-p', '--port [OPT]', 'port of websocket server') do |port|
      @options[:p] = port if port
    end
    opts.on('-b', '--bind [OPT]', 'bind address') do |bind|
      @options[:b] = bind if bind
    end
    opts.on('-s', '--secure', 'enable secure connections') do |secure|
      @options[:s] = secure
    end
    opts.on('-i', '--pid [OPT]', 'pid, default is tmp/pids/websocket.pid') do |pid_file|
      @options[:i] = pid_file if pid_file
    end
    opts.on('-k', '--private-key [OPT]', '/path/to/server.key for secure connections') do |key_file|
      tls_options[:private_key_file] = key_file if key_file
    end
    opts.on('-c', '--certificate [OPT]', '/path/to/server.crt for secure connections') do |cert_file|
      tls_options[:cert_chain_file] = cert_file if cert_file
    end
  end

  # Consumes the recognised switches from ARGV; the start|stop action stays behind in ARGV[0].
  option_parser.parse!
  # Only the "start" and "stop" actions are supported; anything else prints the usage text.
  unless %w[start stop].include?(ARGV[0])
    puts "Usage: #{File.basename(__FILE__)} start|stop [options]"
    exit
  end

  # "stop": read the pid recorded in the pid file and kill that process.
  if ARGV[0] == 'stop'
    pid_file = @options[:i].to_s
    puts "Stopping websocket server (pid:#{@options[:i]})"

    # File.read closes the file handle again; a missing pid file ends with a
    # readable message instead of a backtrace.
    begin
      pid = File.read(pid_file).strip.to_i
    rescue Errno::ENOENT
      abort "Unable to stop websocket server: pid file #{pid_file} not found"
    end

    # to_i yields 0 for an empty or garbled file, and signalling pid 0 would hit
    # every process in the caller's own process group - refuse to do that.
    abort "Unable to stop websocket server: no valid pid in #{pid_file}" unless pid > 0

    # Signal 9 (SIGKILL), as before; a pid that no longer exists is reported, not raised.
    begin
      Process.kill(9, pid)
    rescue Errno::ESRCH
      abort "Unable to stop websocket server: no process with pid #{pid}"
    end
    exit
  end
  # "start" with -d: detach from the terminal, send stdout/stderr to log files
  # below <dir>/log and record the daemon's pid in the pid file.
  if ARGV[0] == 'start' && @options[:d]
    puts "Starting websocket server on #{@options[:b]}:#{@options[:p]} (secure:#{@options[:s]},pid:#{@options[:i]})"

    Daemons.daemonize
    # go back to the application directory after daemonizing
    Dir.chdir dir

    name = 'websocket-server'
    $stdout.reopen("#{dir}/log/#{name}_out.log", 'w')
    $stderr.reopen("#{dir}/log/#{name}_err.log", 'w')

    # create pid file
    File.open(@options[:i].to_s, 'w') do |pid_file|
      pid_file.sync = true
      pid_file.puts(Process.pid.to_s)
    end
  end

  # Connected websocket clients, keyed by client id (filled by the connection callbacks).
  @clients = {}

  Rails.configuration.interface = 'websocket'
  84. EventMachine.run {
  # Start the websocket listener with the parsed host/port/TLS settings and
  # register the per-connection callbacks. The client id of a connection is the
  # object id of its websocket object, as a string.
  EventMachine::WebSocket.start(host: @options[:b], port: @options[:p], secure: @options[:s], tls_options: tls_options) do |ws|

    # register client connection
    ws.onopen do |handshake|
      headers   = handshake.headers
      remote_ip = get_remote_ip(headers)
      client_id = ws.object_id.to_s
      log 'notice', 'Client connected.', client_id

      Sessions.create(client_id, {}, { type: 'websocket' })

      # keep an already registered entry untouched
      unless @clients.key?(client_id)
        @clients[client_id] = {
          websocket: ws,
          last_ping: Time.now.utc.to_i,
          error_count: 0,
          headers: headers,
          remote_ip: remote_ip,
        }
      end
    end

    # unregister client connection
    ws.onclose do
      client_id = ws.object_id.to_s
      log 'notice', 'Client disconnected.', client_id

      # remove from current client list (no-op if it is not listed)
      @clients.delete(client_id)

      Sessions.destory(client_id)
    end

    # manage messages
    ws.onmessage do |msg|
      client_id = ws.object_id.to_s
      log 'debug', "received: #{msg} ", client_id

      begin
        data = JSON.parse(msg)
      rescue => e
        log 'error', "can't parse message: #{msg}, #{e.inspect}", client_id
        next
      end

      # ignore messages of connections which are not registered (anymore)
      client = @clients[client_id]
      next if !client

      # every parsable message counts as activity
      Sessions.touch(client_id)
      client[:last_ping] = Time.now.utc.to_i

      # JSON.parse also returns non-object values (e.g. an Array for "[1]").
      # Looking up 'spool' / 'event' on those raises inside this handler, and
      # this script registers no ws.onerror callback to deal with it.
      # NOTE(review): em-websocket appears to re-raise such handler errors into
      # the reactor when no onerror callback exists - confirm.
      unless data.is_a?(Hash)
        log 'error', "unknown message '#{data.inspect}'", client_id
        next
      end

      # spool messages for new connects
      Sessions.spool_create(data) if data['spool']

      if data['event']
        log 'debug', "execute event '#{data['event']}'", client_id
        message = Sessions::Event.run(
          event: data['event'],
          payload: data,
          session: client[:session],
          remote_ip: client[:remote_ip],
          client_id: client_id,
          clients: @clients,
          options: @options,
        )
        # only events which produce an answer get a reply
        websocket_send(client_id, message) if message
      else
        log 'error', "unknown message '#{data.inspect}'", client_id
      end
    end
  end
  # sweep for idle connections once, half a second after the reactor started ...
  EventMachine.add_timer(0.5) { check_unused_connections }

  # ... and again every 120 seconds; check_unused_connections itself decides
  # which connections count as idle (4 * 60 seconds without activity)
  EventMachine.add_periodic_timer(120) { check_unused_connections }
  # every 20 seconds: log how many websocket and non-websocket (ajax) clients
  # are known, followed by one line per client
  EventMachine.add_periodic_timer(20) do
    # websocket
    log 'notice', "Status: websocket clients: #{@clients.size}"
    @clients.each_key do |client_id|
      log 'notice', 'working...', client_id
    end

    # ajax: every session whose meta type is not 'websocket'
    ajax_clients = Sessions.list.reject do |_client_id, client|
      client[:meta][:type] == 'websocket'
    end
    log 'notice', "Status: ajax clients: #{ajax_clients.size}"
    ajax_clients.each do |client_id, _client|
      log 'notice', 'working...', client_id
    end
  end
  # every 0.4 seconds: push queued session data to each connected websocket client
  EventMachine.add_periodic_timer(0.4) do
    next if @clients.empty?

    @clients.each do |client_id, client|
      # connections flagged for disconnect get nothing anymore
      next if client[:disconnect]

      log 'debug', 'checking for data...', client_id
      begin
        queue = Sessions.queue(client_id)
        if queue && queue[0]
          log 'notice', 'send data to client', client_id
          websocket_send(client_id, queue)
        end
      rescue => e
        log 'error', "problem:#{e.inspect}", client_id

        # count the failure; drop the client once it failed more than 20 times
        client[:error_count] += 1
        @clients.delete(client_id) if client[:error_count] > 20
      end
    end
  end
  # Returns the value of the 'X-Forwarded-For' entry of the given headers,
  # or nil if there are no headers or the entry is not set.
  def get_remote_ip(headers)
    return nil unless headers

    forwarded_for = headers['X-Forwarded-For']
    forwarded_for || nil
  end
  # Sends data as JSON to the websocket of the given client.
  # Anything which is not exactly an Array gets wrapped into a JSON array, so
  # the client always receives a list. Unknown client ids are logged as error
  # and nothing is sent (returns nil in that case).
  def websocket_send(client_id, data)
    msg = data.instance_of?(Array) ? data.to_json : "[#{data.to_json}]"
    log 'debug', "send #{msg}", client_id

    client = @clients[client_id]
    unless client
      log 'error', "no such @clients for #{client_id}", client_id
      return
    end

    client[:websocket].send(msg)
  end
  # Closes and forgets every websocket client without activity in the last
  # 4 * 60 seconds and lets Sessions remove idle long polling sessions using
  # the same limit. Returns whatever Sessions.destory_idle_sessions returned.
  def check_unused_connections
    log 'notice', 'check unused idle connections...'
    idle_time_in_sec = 4 * 60

    # close unused web socket sessions
    @clients.each do |client_id, client|
      idle_since = client[:last_ping].to_i + idle_time_in_sec
      next unless idle_since < Time.now.utc.to_i

      log 'notice', 'closing idle websocket connection', client_id

      # remember to not use this connection anymore
      client[:disconnect] = true

      # try to close regular
      client[:websocket].close_websocket

      # short pause, then delete session from client list
      sleep 0.3
      @clients.delete(client_id)
    end

    # close unused ajax long polling sessions
    Sessions.destory_idle_sessions(idle_time_in_sec).each do |client_id|
      log 'notice', 'closing idle long polling connection', client_id
    end
  end
  # Prints a timestamped (UTC, ISO 8601) log line for the given client id to stdout.
  # 'debug' messages are dropped unless the verbose option (:v) is set; the
  # level itself is not part of the printed line.
  def log(level, data, client_id = '-')
    suppressed = !@options[:v] && level == 'debug'
    return if suppressed

    timestamp = Time.now.utc.iso8601
    puts "#{timestamp}:client(#{client_id}) #{data}"
  end
  246. }