websocket_server.rb 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220
  1. # Copyright (C) 2012-2022 Zammad Foundation, https://zammad-foundation.org/
  2. class WebsocketServer
  3. cattr_reader :clients, :options
  4. def self.run(options)
  5. @options = options
  6. @clients = {}
  7. Rails.configuration.interface = 'websocket'
  8. EventMachine.run do
  9. EventMachine::WebSocket.start(host: @options[:b], port: @options[:p], secure: @options[:s], tls_options: @options[:tls_options]) do |ws|
  10. # register client connection
  11. ws.onopen do |handshake|
  12. WebsocketServer.onopen(ws, handshake)
  13. end
  14. # unregister client connection
  15. ws.onclose do
  16. WebsocketServer.onclose(ws)
  17. end
  18. # manage messages
  19. ws.onmessage do |msg|
  20. WebsocketServer.onmessage(ws, msg)
  21. end
  22. end
  23. # check unused connections
  24. EventMachine.add_timer(0.5) do
  25. WebsocketServer.check_unused_connections
  26. end
  27. # check open unused connections, kick all connection without activity in the last 2 minutes
  28. EventMachine.add_periodic_timer(120) do
  29. WebsocketServer.check_unused_connections
  30. end
  31. EventMachine.add_periodic_timer(20) do
  32. WebsocketServer.log_status
  33. end
  34. EventMachine.add_periodic_timer(0.4) do
  35. WebsocketServer.send_to_client
  36. end
  37. end
  38. end
  39. def self.onopen(websocket, handshake)
  40. headers = handshake.headers
  41. client_id = websocket.object_id.to_s
  42. log 'info', 'Client connected.', client_id
  43. Sessions.create(client_id, {}, { type: 'websocket' })
  44. return if @clients.include? client_id
  45. @clients[client_id] = {
  46. websocket: websocket,
  47. last_ping: Time.now.utc.to_i,
  48. error_count: 0,
  49. headers: headers,
  50. }
  51. end
  52. def self.onclose(websocket)
  53. client_id = websocket.object_id.to_s
  54. log 'info', 'Client disconnected.', client_id
  55. # removed from current client list
  56. if @clients.include? client_id
  57. @clients.delete client_id
  58. end
  59. Sessions.destroy(client_id)
  60. end
  61. def self.onmessage(websocket, msg)
  62. client_id = websocket.object_id.to_s
  63. log 'debug', "received: #{msg} ", client_id
  64. begin
  65. data = JSON.parse(msg)
  66. rescue => e
  67. log 'error', "can't parse message: #{msg}, #{e.inspect}", client_id
  68. return
  69. end
  70. # check if connection not already exists
  71. return if !@clients[client_id]
  72. Sessions.touch(client_id) # rubocop:disable Rails/SkipsModelValidations
  73. @clients[client_id][:last_ping] = Time.now.utc.to_i
  74. # spool messages for new connects
  75. if data['spool']
  76. Sessions.spool_create(data)
  77. end
  78. if data['event']
  79. log 'debug', "execute event '#{data['event']}'", client_id
  80. message = Sessions::Event.run(
  81. event: data['event'],
  82. payload: data,
  83. session: @clients[client_id][:session],
  84. headers: @clients[client_id][:headers],
  85. client_id: client_id,
  86. clients: @clients,
  87. options: @options,
  88. )
  89. if message
  90. websocket_send(client_id, message)
  91. end
  92. else
  93. log 'error', "unknown message '#{data.inspect}'", client_id
  94. end
  95. ensure
  96. ActiveSupport::CurrentAttributes.clear_all
  97. end
  98. def self.websocket_send(client_id, data)
  99. msg = if data.instance_of?(Array)
  100. data.to_json
  101. else
  102. "[#{data.to_json}]"
  103. end
  104. log 'debug', "send #{msg}", client_id
  105. if !@clients[client_id]
  106. log 'error', "no such @clients for #{client_id}", client_id
  107. return
  108. end
  109. @clients[client_id][:websocket].send(msg)
  110. end
  111. def self.check_unused_connections
  112. log 'info', 'check unused idle connections...'
  113. idle_time_in_sec = 4 * 60
  114. # close unused web socket sessions
  115. @clients.each do |client_id, client|
  116. next if (client[:last_ping].to_i + idle_time_in_sec) >= Time.now.utc.to_i
  117. log 'info', 'closing idle websocket connection', client_id
  118. # remember to not use this connection anymore
  119. client[:disconnect] = true
  120. # try to close regular
  121. client[:websocket].close_websocket
  122. # delete session from client list
  123. sleep 0.3
  124. @clients.delete(client_id)
  125. end
  126. # close unused ajax long polling sessions
  127. clients = Sessions.destroy_idle_sessions(idle_time_in_sec)
  128. clients.each do |client_id|
  129. log 'info', 'closing idle long polling connection', client_id
  130. end
  131. end
  132. def self.send_to_client
  133. return if @clients.size.zero?
  134. # log 'debug', 'checking for data to send...'
  135. @clients.each do |client_id, client|
  136. next if client[:disconnect]
  137. log 'debug', 'checking for data...', client_id
  138. begin
  139. queue = Sessions.queue(client_id)
  140. next if queue.blank?
  141. log 'info', 'send data to client', client_id
  142. websocket_send(client_id, queue)
  143. rescue => e
  144. log 'error', "problem:#{e.inspect}", client_id
  145. # disconnect client
  146. client[:error_count] += 1
  147. if client[:error_count] > 20 && @clients.include?(client_id)
  148. @clients.delete client_id
  149. end
  150. end
  151. end
  152. end
  153. def self.log_status
  154. # websocket
  155. log 'info', "Status: websocket clients: #{@clients.size}"
  156. @clients.each_key do |client_id|
  157. log 'info', 'working...', client_id
  158. end
  159. # ajax
  160. client_list = Sessions.list
  161. clients = 0
  162. client_list.each_value do |client|
  163. next if client[:meta][:type] == 'websocket'
  164. clients += 1
  165. end
  166. log 'info', "Status: ajax clients: #{clients}"
  167. client_list.each do |client_id, client|
  168. next if client[:meta][:type] == 'websocket'
  169. log 'info', 'working...', client_id
  170. end
  171. end
  172. def self.log(level, data, client_id = '-')
  173. return if !@options[:v] && level == 'debug'
  174. puts "#{Time.now.utc.iso8601}:client(#{client_id}) #{data}" # rubocop:disable Rails/Output
  175. # puts "#{Time.now.utc.iso8601}:#{ level }:client(#{ client_id }) #{ data }"
  176. end
  177. end