websocket_server.rb 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243
  1. # Copyright (C) 2012-2025 Zammad Foundation, https://zammad-foundation.org/
  2. class WebsocketServer
  3. cattr_reader :clients, :options
  4. def self.run(options)
  5. @options = options
  6. @clients = {}
  7. # By default, we are only logging errors to STDOUT.
  8. # To turn on some more logging to get some insights, please, provide one of the following parameters:
  9. # -n | --info => info
  10. # -v | --verbose => debug
  11. Rails.configuration.interface = 'websocket'
  12. AppVersion.start_maintenance_thread(process_name: 'websocket-server')
  13. Zammad::ProcessDebug.install_thread_status_handler
  14. EventMachine.run do
  15. EventMachine::WebSocket.start(host: @options[:b], port: @options[:p], secure: @options[:s], tls_options: @options[:tls_options]) do |ws|
  16. # register client connection
  17. ws.onopen do |handshake|
  18. WebsocketServer.onopen(ws, handshake)
  19. end
  20. # unregister client connection
  21. ws.onclose do
  22. WebsocketServer.onclose(ws)
  23. end
  24. # manage messages
  25. ws.onmessage do |msg|
  26. WebsocketServer.onmessage(ws, msg)
  27. end
  28. end
  29. # check unused connections
  30. EventMachine.add_timer(0.5) do
  31. WebsocketServer.check_unused_connections
  32. end
  33. # check open unused connections, kick all connection without activity in the last 2 minutes
  34. EventMachine.add_periodic_timer(120) do
  35. WebsocketServer.check_unused_connections
  36. end
  37. EventMachine.add_periodic_timer(20) do
  38. WebsocketServer.log_status
  39. end
  40. EventMachine.add_periodic_timer(0.4) do
  41. WebsocketServer.send_to_client
  42. end
  43. end
  44. end
  45. def self.onopen(websocket, handshake)
  46. headers = handshake.headers
  47. client_id = websocket.object_id.to_s
  48. log 'info', 'Client connected.', client_id
  49. Sessions.create(client_id, {}, { type: 'websocket' })
  50. return if @clients.include? client_id
  51. @clients[client_id] = {
  52. websocket: websocket,
  53. last_ping: Time.now.utc.to_i,
  54. error_count: 0,
  55. headers: headers,
  56. }
  57. end
  58. def self.onclose(websocket)
  59. client_id = websocket.object_id.to_s
  60. log 'info', 'Client disconnected.', client_id
  61. # removed from current client list
  62. if @clients.include? client_id
  63. @clients.delete client_id
  64. end
  65. Sessions.destroy(client_id)
  66. end
  67. def self.onmessage(websocket, msg)
  68. client_id = websocket.object_id.to_s
  69. log 'info', "receiving #{msg.to_s.bytesize} bytes", client_id
  70. log 'debug', "received: #{msg}", client_id
  71. begin
  72. log 'info', 'start: parse message to JSON', client_id
  73. data = JSON.parse(msg)
  74. log 'info', 'end: parse message to JSON', client_id
  75. rescue => e
  76. log 'error', "can't parse message: #{msg}, #{e.inspect}", client_id
  77. return
  78. end
  79. # check if connection not already exists
  80. return if !@clients[client_id]
  81. Sessions.touch(client_id) # rubocop:disable Rails/SkipsModelValidations
  82. @clients[client_id][:last_ping] = Time.now.utc.to_i
  83. if data['event']
  84. log 'info', "start: execute event '#{data['event']}'", client_id
  85. message = Sessions::Event.run(
  86. event: data['event'],
  87. payload: data,
  88. session: @clients[client_id][:session],
  89. headers: @clients[client_id][:headers],
  90. client_id: client_id,
  91. clients: @clients,
  92. options: @options,
  93. )
  94. log 'info', "end: execute event '#{data['event']}'", client_id
  95. if message
  96. websocket_send(client_id, message)
  97. end
  98. else
  99. log 'error', "unknown message '#{data.inspect}'", client_id
  100. end
  101. end
  102. def self.websocket_send(client_id, data)
  103. msg = if data.instance_of?(Array)
  104. data.to_json
  105. else
  106. "[#{data.to_json}]"
  107. end
  108. log 'info', "sending #{msg.to_s.bytesize} bytes", client_id
  109. log 'debug', "send: #{msg}", client_id
  110. if !@clients[client_id]
  111. log 'error', "no such @clients for #{client_id}", client_id
  112. return
  113. end
  114. @clients[client_id][:websocket].send(msg)
  115. end
  116. def self.check_unused_connections
  117. log 'info', 'check unused idle connections...'
  118. idle_time_in_sec = 4 * 60
  119. # close unused web socket sessions
  120. @clients.each do |client_id, client|
  121. next if (client[:last_ping].to_i + idle_time_in_sec) >= Time.now.utc.to_i
  122. log 'info', 'closing idle websocket connection', client_id
  123. # remember to not use this connection anymore
  124. client[:disconnect] = true
  125. # try to close regular
  126. client[:websocket].close_websocket
  127. # delete session from client list
  128. sleep 0.3
  129. @clients.delete(client_id)
  130. end
  131. # close unused ajax long polling sessions
  132. clients = Sessions.destroy_idle_sessions(idle_time_in_sec)
  133. clients.each do |client_id|
  134. log 'info', 'closing idle long polling connection', client_id
  135. end
  136. end
  137. def self.send_to_client
  138. return if @clients.empty?
  139. # log 'debug', 'checking for data to send...'
  140. @clients.each do |client_id, client|
  141. next if client[:disconnect]
  142. log 'debug', 'checking for data...', client_id
  143. begin
  144. queue = Sessions.queue(client_id)
  145. next if queue.blank?
  146. websocket_send(client_id, queue)
  147. rescue => e
  148. log 'error', "problem:#{e.inspect}", client_id
  149. # disconnect client
  150. client[:error_count] += 1
  151. if client[:error_count] > 20 && @clients.include?(client_id)
  152. @clients.delete client_id
  153. end
  154. end
  155. end
  156. end
  157. def self.log_status
  158. # websocket
  159. log 'info', "Status: websocket clients: #{@clients.size}"
  160. @clients.each_key do |client_id|
  161. log 'info', 'working...', client_id
  162. end
  163. # ajax
  164. client_list = Sessions.list
  165. clients = 0
  166. client_list.each_value do |client|
  167. next if client[:meta][:type] == 'websocket'
  168. clients += 1
  169. end
  170. log 'info', "Status: ajax clients: #{clients}"
  171. client_list.each do |client_id, client|
  172. next if client[:meta][:type] == 'websocket'
  173. log 'info', 'working...', client_id
  174. end
  175. end
  176. def self.log(level, data, client_id = '-')
  177. skip_log = true
  178. case level
  179. when 'error'
  180. skip_log = false
  181. when 'debug'
  182. if @options[:v]
  183. skip_log = false
  184. end
  185. when 'info'
  186. if @options[:n] || @options[:v]
  187. skip_log = false
  188. end
  189. end
  190. return if skip_log
  191. client_id_str = client_id.eql?('-') ? '' : "##{client_id}"
  192. # same format as in log/production.log
  193. puts "#{level.to_s.first.upcase}, [#{Time.now.utc.strftime('%FT%T.%6N')}#{client_id_str}] #{level.upcase} -- : #{data}" # rubocop:disable Rails/Output
  194. end
  195. end