websocket-server.rb 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316
  1. #!/usr/bin/env ruby
  2. # Copyright (C) 2012-2016 Zammad Foundation, http://zammad-foundation.org/
  3. $LOAD_PATH << './lib'
  4. require 'rubygems'
  5. # load rails env
  6. dir = File.expand_path(File.join(File.dirname(__FILE__), '..'))
  7. Dir.chdir dir
  8. RAILS_ENV = ENV['RAILS_ENV'] || 'development'
  9. require 'rails/all'
  10. require 'bundler'
  11. require File.join(dir, 'config', 'environment')
  12. require 'eventmachine'
  13. require 'em-websocket'
  14. require 'json'
  15. require 'fileutils'
  16. require 'optparse'
  17. require 'daemons'
  18. require 'sessions'
  # Snapshots every File object that is currently open into @files_to_reopen,
  # so that after_fork can re-open them once the process has been daemonized.
  def before_fork
    @files_to_reopen = []
    ObjectSpace.each_object(File) do |handle|
      next if handle.closed?
      @files_to_reopen.push(handle)
    end
  end
  # Restores the process environment after daemonizing.
  #
  # dir - application root; becomes the working directory and is the base
  #       for the log/ directory the standard streams are redirected into.
  #
  # Every handle remembered in @files_to_reopen is re-opened by path in
  # append mode with syncing enabled; $stdout and $stderr are then pointed
  # at their log files, opened with mode 'w'.
  def after_fork(dir)
    Dir.chdir(dir)

    # Re-open file handles
    @files_to_reopen.each do |handle|
      handle.reopen(handle.path, 'a+')
      handle.sync = true
    end

    $stdout.reopen("#{dir}/log/websocket-server_out.log", 'w')
    $stderr.reopen("#{dir}/log/websocket-server_err.log", 'w')
  end
  36. before_fork
  # Default settings; the command line switches parsed below override them.
  #   p: port, b: bind address, s: secure (TLS) connections, v: verbose,
  #   d: run as daemon, i: pid file.
  # k/c only hold placeholder paths: the -k/-c switches store the real key and
  # certificate paths in tls_options, not in @options.
  @options = {
    p: 6042,
    b: '0.0.0.0',
    s: false,
    v: false,
    d: false,
    k: '/path/to/server.key',
    c: '/path/to/server.crt',
    i: Dir.pwd.to_s + '/tmp/pids/websocket.pid'
  }
  tls_options = {}

  # Builds the command line parser.
  #
  # options     - Hash receiving :d, :v, :p, :b, :s and :i.
  # tls_options - Hash receiving :private_key_file and :cert_chain_file.
  #
  # Switches declared with "[OPT]" take an optional value, so their block is
  # called with nil when the value is left out. In that case the current
  # setting is kept instead of being overwritten with nil. The port is
  # converted to an Integer by OptionParser, matching the type of the default.
  #
  # Returns the configured OptionParser (nothing has been parsed yet).
  def websocket_option_parser(options, tls_options)
    OptionParser.new do |opts|
      opts.banner = 'Usage: websocket-server.rb start|stop [options]'

      opts.on('-d', '--daemon', 'start as daemon') do |flag|
        options[:d] = flag
      end
      opts.on('-v', '--verbose', 'enable debug messages') do |flag|
        options[:v] = flag
      end
      opts.on('-p', '--port [OPT]', OptionParser::DecimalInteger, 'port of websocket server') do |port|
        options[:p] = port if port
      end
      opts.on('-b', '--bind [OPT]', 'bind address') do |address|
        options[:b] = address if address
      end
      opts.on('-s', '--secure', 'enable secure connections') do |flag|
        options[:s] = flag
      end
      opts.on('-i', '--pid [OPT]', 'pid, default is tmp/pids/websocket.pid') do |pid_file|
        options[:i] = pid_file if pid_file
      end
      opts.on('-k', '--private-key [OPT]', '/path/to/server.key for secure connections') do |key_file|
        tls_options[:private_key_file] = key_file if key_file
      end
      opts.on('-c', '--certificate [OPT]', '/path/to/server.crt for secure connections') do |cert_file|
        tls_options[:cert_chain_file] = cert_file if cert_file
      end
    end
  end

  # Consumes the switches from ARGV; the start|stop command stays in ARGV[0].
  websocket_option_parser(@options, tls_options).parse!
  # Only the two sub-commands "start" and "stop" are supported.
  unless %w[start stop].include?(ARGV[0])
    puts "Usage: #{File.basename(__FILE__)} start|stop [options]"
    exit
  end

  # "stop": kill the process whose pid is stored in the pid file.
  if ARGV[0] == 'stop'
    pid_file = @options[:i].to_s
    puts "Stopping websocket server (pid:#{pid_file})"

    # read pid (File.read closes the handle again)
    begin
      raw_pid = File.read(pid_file).strip
    rescue Errno::ENOENT
      abort "No pid file found at #{pid_file}, is the websocket server running?"
    end

    # Only a positive integer is acceptable: an empty or garbled file would
    # become pid 0, and Process.kill with pid 0 signals the whole process
    # group of the caller.
    abort "Invalid pid '#{raw_pid}' in #{pid_file}" unless raw_pid =~ /\A[1-9]\d*\z/

    # kill
    begin
      Process.kill(9, raw_pid.to_i)
    rescue Errno::ESRCH
      warn "No process with pid #{raw_pid} found, removing stale pid file"
    end

    # SIGKILL gives the server no chance to clean up, so drop the pid file
    # here; otherwise a later "stop" could hit an unrelated process that
    # reuses the pid.
    File.delete(pid_file) if File.exist?(pid_file)
    exit
  end
  # "start -d": detach from the terminal, restore the file handles and record
  # the pid of the daemonized process in the pid file.
  if ARGV[0] == 'start' && @options[:d]
    puts "Starting websocket server on #{@options[:b]}:#{@options[:p]} (secure:#{@options[:s]},pid:#{@options[:i]})"

    Daemons.daemonize
    after_fork(dir)

    # create pid file
    File.open(@options[:i].to_s, 'w') do |pid_file|
      pid_file.sync = true
      pid_file.puts(Process.pid.to_s)
    end
  end
  99. @clients = {}
  100. Rails.configuration.interface = 'websocket'
  101. EventMachine.run do
  102. EventMachine::WebSocket.start( host: @options[:b], port: @options[:p], secure: @options[:s], tls_options: tls_options ) do |ws|
  # register client connection: create a session for the socket and, unless
  # the id is already known, remember the socket in @clients
  ws.onopen do |handshake|
    headers   = handshake.headers
    remote_ip = get_remote_ip(headers)
    client_id = ws.object_id.to_s # the socket's object id is the client id

    log 'notice', 'Client connected.', client_id
    Sessions.create(client_id, {}, { type: 'websocket' })

    unless @clients.key?(client_id)
      @clients[client_id] = {
        websocket: ws,
        last_ping: Time.now.utc.to_i,
        error_count: 0,
        headers: headers,
        remote_ip: remote_ip,
      }
    end
  end
  # unregister client connection: forget the socket and destroy its session
  ws.onclose do
    client_id = ws.object_id.to_s
    log 'notice', 'Client disconnected.', client_id

    # remove from current client list (no-op when the id is not registered)
    @clients.delete(client_id)

    Sessions.destroy(client_id)
  end
  # manage messages: parse the JSON payload, refresh the client's activity
  # timestamp, spool the message if requested and dispatch its event
  ws.onmessage do |msg|
    client_id = ws.object_id.to_s
    log 'debug', "received: #{msg} ", client_id

    begin
      data = JSON.parse(msg)
    rescue => e
      log 'error', "can't parse message: #{msg}, #{e.inspect}", client_id
      next
    end

    # Valid JSON is not necessarily an object: an array, number, string or
    # null would make the data['...'] lookups below raise or misbehave.
    # Nothing in this handler rescues that, so one malformed client message
    # could escape the handler (NOTE(review): em-websocket appears to
    # re-raise callback errors when no onerror handler is set - confirm).
    unless data.is_a?(Hash)
      log 'error', "unknown message '#{data.inspect}'", client_id
      next
    end

    # ignore messages of connections which are not registered (anymore)
    client = @clients[client_id]
    next if !client

    # any valid message counts as a sign of life
    Sessions.touch(client_id)
    client[:last_ping] = Time.now.utc.to_i

    # spool messages for new connects
    Sessions.spool_create(data) if data['spool']

    if data['event']
      log 'debug', "execute event '#{data['event']}'", client_id
      message = Sessions::Event.run(
        event: data['event'],
        payload: data,
        session: client[:session],
        remote_ip: client[:remote_ip],
        client_id: client_id,
        clients: @clients,
        options: @options,
      )
      # the event handler may return a reply for the sending client
      websocket_send(client_id, message) if message
    else
      log 'error', "unknown message '#{data.inspect}'", client_id
    end
  end
  166. end
  # check unused connections once, shortly after the reactor has started
  EventMachine.add_timer(0.5) { check_unused_connections }

  # ...and then again every 120 seconds; which connections count as idle is
  # decided by check_unused_connections itself
  EventMachine.add_periodic_timer(120) { check_unused_connections }
  # every 20 seconds: log how many websocket and non-websocket (ajax)
  # clients are known, followed by one line per client
  EventMachine.add_periodic_timer(20) do
    # websocket
    log 'notice', "Status: websocket clients: #{@clients.size}"
    @clients.each_key { |client_id| log 'notice', 'working...', client_id }

    # ajax: every session whose meta type is not 'websocket'
    ajax_client_ids = []
    Sessions.list.each do |client_id, client|
      ajax_client_ids << client_id unless client[:meta][:type] == 'websocket'
    end

    log 'notice', "Status: ajax clients: #{ajax_client_ids.size}"
    ajax_client_ids.each { |client_id| log 'notice', 'working...', client_id }
  end
  # every 0.4 seconds: push queued session data to each connected client
  EventMachine.add_periodic_timer(0.4) do
    next if @clients.empty?

    @clients.each do |client_id, client|
      # skip connections which are marked as disconnected
      next if client[:disconnect]

      log 'debug', 'checking for data...', client_id
      begin
        pending = Sessions.queue(client_id)
        next if pending.blank?

        log 'notice', 'send data to client', client_id
        websocket_send(client_id, pending)
      rescue => e
        log 'error', "problem:#{e.inspect}", client_id

        # count the failure and drop the client once it failed more than 20 times
        client[:error_count] += 1
        @clients.delete(client_id) if client[:error_count] > 20
      end
    end
  end
  # Returns the value of the X-Forwarded-For header, or nil when there are
  # no headers or the header is not set.
  def get_remote_ip(headers)
    return nil unless headers

    forwarded_for = headers['X-Forwarded-For']
    forwarded_for || nil
  end
  # Sends data as JSON to the websocket registered for client_id.
  #
  # client_id - key into @clients.
  # data      - payload; an Array is serialized as is, anything else is
  #             wrapped so the client always receives a JSON array.
  #
  # Logs an error and returns nil when the client is not registered.
  def websocket_send(client_id, data)
    encoded = data.to_json
    msg = data.class == Array ? encoded : "[#{encoded}]"
    log 'debug', "send #{msg}", client_id

    client = @clients[client_id]
    unless client
      log 'error', "no such @clients for #{client_id}", client_id
      return
    end

    client[:websocket].send(msg)
  end
  # Closes websocket connections without activity in the last 4 minutes and
  # lets Sessions destroy long polling sessions idle for the same time.
  #
  # The idle clients are collected first and handled afterwards, so @clients
  # is not modified while it is being iterated. There is deliberately no
  # sleep between closing and forgetting a client: this method is called
  # from reactor timers, so sleeping would only stall the event loop for
  # every idle client.
  #
  # Returns whatever Sessions.destroy_idle_sessions returned (presumably the
  # list of destroyed client ids - confirm against Sessions).
  def check_unused_connections
    log 'notice', 'check unused idle connections...'
    idle_time_in_sec = 4 * 60
    oldest_allowed_ping = Time.now.utc.to_i - idle_time_in_sec

    # close unused web socket sessions
    idle_clients = @clients.select do |_client_id, client|
      client[:last_ping].to_i < oldest_allowed_ping
    end
    idle_clients.each do |client_id, client|
      log 'notice', 'closing idle websocket connection', client_id

      # remember to not use this connection anymore
      client[:disconnect] = true

      # try to close regular
      client[:websocket].close_websocket

      # delete session from client list
      @clients.delete(client_id)
    end

    # close unused ajax long polling sessions
    clients = Sessions.destroy_idle_sessions(idle_time_in_sec)
    clients.each do |client_id|
      log 'notice', 'closing idle long polling connection', client_id
    end
  end
  # Prints a timestamped log line for the given client to stdout.
  #
  # level     - only 'debug' is special: such messages are dropped unless
  #             the verbose option (@options[:v]) is set.
  # data      - message text.
  # client_id - id shown in the line, '-' when not client related.
  def log(level, data, client_id = '-')
    debug_muted = !@options[:v]
    return if debug_muted && level == 'debug'

    puts "#{Time.now.utc.iso8601}:client(#{client_id}) #{data}"
  end
  262. end