# process_sessions_jobs.rb
# Copyright (C) 2012-2024 Zammad Foundation, https://zammad-foundation.org/

  class BackgroundServices
    class Service
      # Background service that runs one thread per client session.
      #
      # #launch polls for client session ids and starts a thread (running
      # Sessions.thread_client) for every id that has no thread yet. Running
      # threads are tracked in #client_threads, keyed by client id, and are
      # removed from there again as soon as they end.
      class ProcessSessionsJobs < Service
        # @return [Concurrent::Hash] session threads, keyed by client id
        attr_reader :client_threads

        # NOTE(review): presumably the maximum number of workers the
        # BackgroundServices manager may run for this service - confirm against
        # the caller, the value is not used inside this class.
        #
        # @return [Integer]
        def self.max_workers
          16
        end

        # Logs and cleans up whatever Sessions::Node.stats still reports.
        # Does nothing when the stats are blank.
        #
        # @return [void]
        def self.pre_run
          previous_nodes_sessions = Sessions::Node.stats
          return if previous_nodes_sessions.blank?

          Rails.logger.info { "Cleaning up previous Sessions::Node sessions: #{previous_nodes_sessions}" }
          Sessions::Node.cleanup
        end

        # Sets up the thread registry and forwards all arguments unchanged to
        # Service#initialize.
        def initialize(...)
          @client_threads = Concurrent::Hash.new
          super
        end

        # Main loop of the service. Until BackgroundServices.shutdown_requested
        # is set it repeatedly starts threads for new client sessions, pausing
        # one second between rounds. After the loop it waits for all tracked
        # session threads to finish.
        #
        # @return [void]
        def launch
          loop do
            break if BackgroundServices.shutdown_requested

            # Drop cached query results so that every round sees fresh data.
            ActiveRecord::Base.clear_query_caches_for_current_thread

            start_threads_for_new_client_sessions

            sleep 1
          end

          # Thread#join re-raises the exception a thread died with; threads
          # remove themselves from client_threads when they end (see
          # #start_client_session_thread), so only live threads are expected here.
          client_threads.values.compact.each(&:join)
        end

        private

        # Returns the client ids this process is responsible for.
        #
        # Without a fork_id all sessions are returned. With a fork_id the node
        # is (re-)registered on every call and only the sessions reported by
        # Sessions::Node.sessions_by for that fork_id are returned.
        #
        # NOTE(review): fork_id is not defined in this class - presumably it is
        # provided by Service; only #each is called on the returned collection.
        def fetch_client_ids
          return Sessions.sessions if !fork_id

          Sessions::Node.register(fork_id)
          Sessions::Node.sessions_by(fork_id)
        end

        # Starts a thread for every fetched client id that has no thread yet and
        # belongs to an existing user, waiting one second after each start.
        #
        # @return [void]
        def start_threads_for_new_client_sessions
          fetch_client_ids.each do |client_id|
            # Every started thread costs a one second pause, so a large batch of
            # new sessions would otherwise delay a requested shutdown by that
            # many seconds. Stop spawning as soon as a shutdown is requested.
            break if BackgroundServices.shutdown_requested

            # connection already open, ignore
            next if client_threads[client_id]

            # check current user
            next if !valid_client_session?(client_id)

            start_client_session_thread(client_id)
            sleep 1
          end
        end

        # Checks whether the stored session of the given client references an
        # existing user.
        #
        # @param client_id the id of the client session to check
        # @return [Boolean] false if the session is missing, has no user id or
        #   the user does not exist
        def valid_client_session?(client_id)
          session_user_id = Sessions.get(client_id)&.dig(:user, 'id')
          return false if session_user_id.blank?

          User.exists?(session_user_id)
        end

        # Starts and registers the thread that serves the given client session.
        #
        # The registry entry is removed in an `ensure` block: if
        # Sessions.thread_client raised without it, the dead thread would stay
        # in client_threads forever, the client would never get a new thread
        # (see the "connection already open" check) and the final join in
        # #launch would re-raise the stale exception. The exception itself is
        # not swallowed and still terminates the thread.
        #
        # @param client_id the id of the client session to serve
        # @return [Thread] the started thread
        def start_client_session_thread(client_id)
          client_threads[client_id] = Thread.new do
            Rails.application.executor.wrap do
              Sessions.thread_client(client_id, 0, Time.now.utc, fork_id)
            ensure
              client_threads.delete(client_id)
              Rails.logger.info { "Closing session client (#{client_id}) thread" }
            end
          end
        end
      end
    end
  end