websocket-server.rb 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329
  1. #!/usr/bin/env ruby
  2. # Copyright (C) 2012-2016 Zammad Foundation, http://zammad-foundation.org/
  3. begin
  4. load File.expand_path('../bin/spring', __dir__)
  5. rescue LoadError => e
  6. raise unless e.message.include?('spring')
  7. end
  8. dir = File.expand_path(File.join(File.dirname(__FILE__), '..'))
  9. Dir.chdir dir
  10. require 'bundler'
  11. require File.join(dir, 'config', 'environment')
  12. require 'eventmachine'
  13. require 'em-websocket'
  14. require 'json'
  15. require 'fileutils'
  16. require 'optparse'
  17. require 'daemons'
  18. def before_fork
  19. # remember open file handles
  20. @files_to_reopen = []
  21. ObjectSpace.each_object(File) do |file|
  22. @files_to_reopen << file if !file.closed?
  23. end
  24. end
  25. def after_fork(dir)
  26. Dir.chdir dir
  27. # Re-open file handles
  28. @files_to_reopen.each do |file|
  29. file.reopen file.path, 'a+'
  30. file.sync = true
  31. end
  32. # Spring redirects STDOUT and STDERR to /dev/null
  33. # before we get here. This causes the `reopen` lines
  34. # below to fail because the handles are already
  35. # opened for write
  36. if defined?(Spring)
  37. $stdout.close
  38. $stderr.close
  39. end
  40. $stdout.reopen("#{dir}/log/websocket-server_out.log", 'w').sync = true
  41. $stderr.reopen("#{dir}/log/websocket-server_err.log", 'w').sync = true
  42. end
  43. before_fork
  44. # Look for -o with argument, and -I and -D boolean arguments
  45. @options = {
  46. p: 6042,
  47. b: '0.0.0.0',
  48. s: false,
  49. v: false,
  50. d: false,
  51. k: '/path/to/server.key',
  52. c: '/path/to/server.crt',
  53. i: "#{dir}/tmp/pids/websocket.pid"
  54. }
  55. tls_options = {}
  56. OptionParser.new do |opts|
  57. opts.banner = 'Usage: websocket-server.rb start|stop [options]'
  58. opts.on('-d', '--daemon', 'start as daemon') do |d|
  59. @options[:d] = d
  60. end
  61. opts.on('-v', '--verbose', 'enable debug messages') do |d|
  62. @options[:v] = d
  63. end
  64. opts.on('-p', '--port [OPT]', 'port of websocket server') do |p|
  65. @options[:p] = p
  66. end
  67. opts.on('-b', '--bind [OPT]', 'bind address') do |b|
  68. @options[:b] = IPAddr.new(b).to_s
  69. end
  70. opts.on('-s', '--secure', 'enable secure connections') do |s|
  71. @options[:s] = s
  72. end
  73. opts.on('-i', '--pid [OPT]', 'pid, default is tmp/pids/websocket.pid') do |i|
  74. @options[:i] = i
  75. end
  76. opts.on('-k', '--private-key [OPT]', '/path/to/server.key for secure connections') do |k|
  77. tls_options[:private_key_file] = k
  78. end
  79. opts.on('-c', '--certificate [OPT]', '/path/to/server.crt for secure connections') do |c|
  80. tls_options[:cert_chain_file] = c
  81. end
  82. end.parse!
  83. if ARGV[0] != 'start' && ARGV[0] != 'stop'
  84. puts "Usage: #{File.basename(__FILE__)} start|stop [options]"
  85. exit
  86. end
  87. if ARGV[0] == 'stop'
  88. pid = File.read(@options[:i]).to_i
  89. puts "Stopping websocket server (pid: #{pid})"
  90. # IMPORTANT: Use SIGTERM (15), not SIGKILL (9)
  91. # Daemons.rb cleans up the PID file automatically on termination;
  92. # SIGKILL ends the process immediately and bypasses cleanup.
  93. # See https://major.io/2010/03/18/sigterm-vs-sigkill/ for more.
  94. Process.kill(:SIGTERM, pid)
  95. exit
  96. end
  97. if ARGV[0] == 'start' && @options[:d]
  98. puts "Starting websocket server on #{@options[:b]}:#{@options[:p]} (secure: #{@options[:s]}, pidfile: #{@options[:i]})"
  99. # Use Daemons.rb's built-in facility for generating PID files
  100. Daemons.daemonize(
  101. app_name: File.basename(@options[:i], '.pid'),
  102. dir_mode: :normal,
  103. dir: File.dirname(@options[:i])
  104. )
  105. after_fork(dir)
  106. end
  107. @clients = {}
  108. Rails.configuration.interface = 'websocket'
  109. EventMachine.run do
  110. EventMachine::WebSocket.start( host: @options[:b], port: @options[:p], secure: @options[:s], tls_options: tls_options ) do |ws|
  111. # register client connection
  112. ws.onopen do |handshake|
  113. headers = handshake.headers
  114. remote_ip = get_remote_ip(headers)
  115. client_id = ws.object_id.to_s
  116. log 'notice', 'Client connected.', client_id
  117. Sessions.create( client_id, {}, { type: 'websocket' } )
  118. if !@clients.include? client_id
  119. @clients[client_id] = {
  120. websocket: ws,
  121. last_ping: Time.now.utc.to_i,
  122. error_count: 0,
  123. headers: headers,
  124. remote_ip: remote_ip,
  125. }
  126. end
  127. end
  128. # unregister client connection
  129. ws.onclose do
  130. client_id = ws.object_id.to_s
  131. log 'notice', 'Client disconnected.', client_id
  132. # removed from current client list
  133. if @clients.include? client_id
  134. @clients.delete client_id
  135. end
  136. Sessions.destroy(client_id)
  137. end
  138. # manage messages
  139. ws.onmessage do |msg|
  140. client_id = ws.object_id.to_s
  141. log 'debug', "received: #{msg} ", client_id
  142. begin
  143. data = JSON.parse(msg)
  144. rescue => e
  145. log 'error', "can't parse message: #{msg}, #{e.inspect}", client_id
  146. next
  147. end
  148. # check if connection not already exists
  149. next if !@clients[client_id]
  150. Sessions.touch(client_id) # rubocop:disable Rails/SkipsModelValidations
  151. @clients[client_id][:last_ping] = Time.now.utc.to_i
  152. # spool messages for new connects
  153. if data['spool']
  154. Sessions.spool_create(data)
  155. end
  156. if data['event']
  157. log 'debug', "execute event '#{data['event']}'", client_id
  158. message = Sessions::Event.run(
  159. event: data['event'],
  160. payload: data,
  161. session: @clients[client_id][:session],
  162. remote_ip: @clients[client_id][:remote_ip],
  163. client_id: client_id,
  164. clients: @clients,
  165. options: @options,
  166. )
  167. if message
  168. websocket_send(client_id, message)
  169. end
  170. else
  171. log 'error', "unknown message '#{data.inspect}'", client_id
  172. end
  173. end
  174. end
  175. # check unused connections
  176. EventMachine.add_timer(0.5) do
  177. check_unused_connections
  178. end
  179. # check open unused connections, kick all connection without activitie in the last 2 minutes
  180. EventMachine.add_periodic_timer(120) do
  181. check_unused_connections
  182. end
  183. EventMachine.add_periodic_timer(20) do
  184. # websocket
  185. log 'notice', "Status: websocket clients: #{@clients.size}"
  186. @clients.each_key do |client_id|
  187. log 'notice', 'working...', client_id
  188. end
  189. # ajax
  190. client_list = Sessions.list
  191. clients = 0
  192. client_list.each_value do |client|
  193. next if client[:meta][:type] == 'websocket'
  194. clients = clients + 1
  195. end
  196. log 'notice', "Status: ajax clients: #{clients}"
  197. client_list.each do |client_id, client|
  198. next if client[:meta][:type] == 'websocket'
  199. log 'notice', 'working...', client_id
  200. end
  201. end
  202. EventMachine.add_periodic_timer(0.4) do
  203. next if @clients.size.zero?
  204. #log 'debug', 'checking for data to send...'
  205. @clients.each do |client_id, client|
  206. next if client[:disconnect]
  207. log 'debug', 'checking for data...', client_id
  208. begin
  209. queue = Sessions.queue(client_id)
  210. next if queue.blank?
  211. log 'notice', 'send data to client', client_id
  212. websocket_send(client_id, queue)
  213. rescue => e
  214. log 'error', 'problem:' + e.inspect, client_id
  215. # disconnect client
  216. client[:error_count] += 1
  217. if client[:error_count] > 20
  218. if @clients.include? client_id
  219. @clients.delete client_id
  220. end
  221. end
  222. end
  223. end
  224. end
  225. def get_remote_ip(headers)
  226. return headers['X-Forwarded-For'] if headers && headers['X-Forwarded-For']
  227. nil
  228. end
  229. def websocket_send(client_id, data)
  230. msg = if data.class != Array
  231. "[#{data.to_json}]"
  232. else
  233. data.to_json
  234. end
  235. log 'debug', "send #{msg}", client_id
  236. if !@clients[client_id]
  237. log 'error', "no such @clients for #{client_id}", client_id
  238. return
  239. end
  240. @clients[client_id][:websocket].send(msg)
  241. end
  242. def check_unused_connections
  243. log 'notice', 'check unused idle connections...'
  244. idle_time_in_sec = 4 * 60
  245. # close unused web socket sessions
  246. @clients.each do |client_id, client|
  247. next if ( client[:last_ping].to_i + idle_time_in_sec ) >= Time.now.utc.to_i
  248. log 'notice', 'closing idle websocket connection', client_id
  249. # remember to not use this connection anymore
  250. client[:disconnect] = true
  251. # try to close regular
  252. client[:websocket].close_websocket
  253. # delete session from client list
  254. sleep 0.3
  255. @clients.delete(client_id)
  256. end
  257. # close unused ajax long polling sessions
  258. clients = Sessions.destroy_idle_sessions(idle_time_in_sec)
  259. clients.each do |client_id|
  260. log 'notice', 'closing idle long polling connection', client_id
  261. end
  262. end
  263. def log(level, data, client_id = '-')
  264. if !@options[:v]
  265. return if level == 'debug'
  266. end
  267. puts "#{Time.now.utc.iso8601}:client(#{client_id}) #{data}"
  268. #puts "#{Time.now.utc.iso8601}:#{ level }:client(#{ client_id }) #{ data }"
  269. end
  270. end