websocket_server.rb 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227
  1. class WebsocketServer
  2. cattr_reader :clients, :options
  3. def self.run(options)
  4. @options = options
  5. @clients = {}
  6. Rails.configuration.interface = 'websocket'
  7. EventMachine.run do
  8. EventMachine::WebSocket.start( host: @options[:b], port: @options[:p], secure: @options[:s], tls_options: @options[:tls_options] ) do |ws|
  9. # register client connection
  10. ws.onopen do |handshake|
  11. WebsocketServer.onopen(ws, handshake)
  12. end
  13. # unregister client connection
  14. ws.onclose do
  15. WebsocketServer.onclose(ws)
  16. end
  17. # manage messages
  18. ws.onmessage do |msg|
  19. WebsocketServer.onmessage(ws, msg)
  20. end
  21. end
  22. # check unused connections
  23. EventMachine.add_timer(0.5) do
  24. WebsocketServer.check_unused_connections
  25. end
  26. # check open unused connections, kick all connection without activitie in the last 2 minutes
  27. EventMachine.add_periodic_timer(120) do
  28. WebsocketServer.check_unused_connections
  29. end
  30. EventMachine.add_periodic_timer(20) do
  31. WebsocketServer.log_status
  32. end
  33. EventMachine.add_periodic_timer(0.4) do
  34. WebsocketServer.send_to_client
  35. end
  36. end
  37. end
  38. def self.onopen(websocket, handshake)
  39. headers = handshake.headers
  40. remote_ip = get_remote_ip(headers)
  41. client_id = websocket.object_id.to_s
  42. log 'notice', 'Client connected.', client_id
  43. Sessions.create( client_id, {}, { type: 'websocket' } )
  44. return if @clients.include? client_id
  45. @clients[client_id] = {
  46. websocket: websocket,
  47. last_ping: Time.now.utc.to_i,
  48. error_count: 0,
  49. headers: headers,
  50. remote_ip: remote_ip,
  51. }
  52. end
  53. def self.onclose(websocket)
  54. client_id = websocket.object_id.to_s
  55. log 'notice', 'Client disconnected.', client_id
  56. # removed from current client list
  57. if @clients.include? client_id
  58. @clients.delete client_id
  59. end
  60. Sessions.destroy(client_id)
  61. end
  62. def self.onmessage(websocket, msg)
  63. client_id = websocket.object_id.to_s
  64. log 'debug', "received: #{msg} ", client_id
  65. begin
  66. data = JSON.parse(msg)
  67. rescue => e
  68. log 'error', "can't parse message: #{msg}, #{e.inspect}", client_id
  69. return
  70. end
  71. # check if connection not already exists
  72. return if !@clients[client_id]
  73. Sessions.touch(client_id) # rubocop:disable Rails/SkipsModelValidations
  74. @clients[client_id][:last_ping] = Time.now.utc.to_i
  75. # spool messages for new connects
  76. if data['spool']
  77. Sessions.spool_create(data)
  78. end
  79. if data['event']
  80. log 'debug', "execute event '#{data['event']}'", client_id
  81. message = Sessions::Event.run(
  82. event: data['event'],
  83. payload: data,
  84. session: @clients[client_id][:session],
  85. remote_ip: @clients[client_id][:remote_ip],
  86. client_id: client_id,
  87. clients: @clients,
  88. options: @options,
  89. )
  90. if message
  91. websocket_send(client_id, message)
  92. end
  93. else
  94. log 'error', "unknown message '#{data.inspect}'", client_id
  95. end
  96. end
  97. def self.get_remote_ip(headers)
  98. return headers['X-Forwarded-For'] if headers && headers['X-Forwarded-For']
  99. nil
  100. end
  101. def self.websocket_send(client_id, data)
  102. msg = if data.class != Array
  103. "[#{data.to_json}]"
  104. else
  105. data.to_json
  106. end
  107. log 'debug', "send #{msg}", client_id
  108. if !@clients[client_id]
  109. log 'error', "no such @clients for #{client_id}", client_id
  110. return
  111. end
  112. @clients[client_id][:websocket].send(msg)
  113. end
  114. def self.check_unused_connections
  115. log 'notice', 'check unused idle connections...'
  116. idle_time_in_sec = 4 * 60
  117. # close unused web socket sessions
  118. @clients.each do |client_id, client|
  119. next if ( client[:last_ping].to_i + idle_time_in_sec ) >= Time.now.utc.to_i
  120. log 'notice', 'closing idle websocket connection', client_id
  121. # remember to not use this connection anymore
  122. client[:disconnect] = true
  123. # try to close regular
  124. client[:websocket].close_websocket
  125. # delete session from client list
  126. sleep 0.3
  127. @clients.delete(client_id)
  128. end
  129. # close unused ajax long polling sessions
  130. clients = Sessions.destroy_idle_sessions(idle_time_in_sec)
  131. clients.each do |client_id|
  132. log 'notice', 'closing idle long polling connection', client_id
  133. end
  134. end
  135. def self.send_to_client
  136. return if @clients.size.zero?
  137. #log 'debug', 'checking for data to send...'
  138. @clients.each do |client_id, client|
  139. next if client[:disconnect]
  140. log 'debug', 'checking for data...', client_id
  141. begin
  142. queue = Sessions.queue(client_id)
  143. next if queue.blank?
  144. log 'notice', 'send data to client', client_id
  145. websocket_send(client_id, queue)
  146. rescue => e
  147. log 'error', 'problem:' + e.inspect, client_id
  148. # disconnect client
  149. client[:error_count] += 1
  150. if client[:error_count] > 20
  151. if @clients.include? client_id
  152. @clients.delete client_id
  153. end
  154. end
  155. end
  156. end
  157. end
  158. def self.log_status
  159. # websocket
  160. log 'notice', "Status: websocket clients: #{@clients.size}"
  161. @clients.each_key do |client_id|
  162. log 'notice', 'working...', client_id
  163. end
  164. # ajax
  165. client_list = Sessions.list
  166. clients = 0
  167. client_list.each_value do |client|
  168. next if client[:meta][:type] == 'websocket'
  169. clients = clients + 1
  170. end
  171. log 'notice', "Status: ajax clients: #{clients}"
  172. client_list.each do |client_id, client|
  173. next if client[:meta][:type] == 'websocket'
  174. log 'notice', 'working...', client_id
  175. end
  176. end
  177. def self.log(level, data, client_id = '-')
  178. if !@options[:v]
  179. return if level == 'debug'
  180. end
  181. puts "#{Time.now.utc.iso8601}:client(#{client_id}) #{data}" # rubocop:disable Rails/Output
  182. #puts "#{Time.now.utc.iso8601}:#{ level }:client(#{ client_id }) #{ data }"
  183. end
  184. end