Browse Source

Maintenance: Improve scheduler shutdown handling

Martin Gruner 3 months ago
parent
commit
b5141f6670

+ 12 - 0
config/initializers/delayed_worker_stop.rb

@@ -0,0 +1,12 @@
+# Copyright (C) 2012-2024 Zammad Foundation, https://zammad-foundation.org/
+
+require 'delayed/worker'
+
+# Monkey patch for early exit during work_off, used by BackgroundServices::Service::ProcessDelayedJobs
+module Delayed
+  class Worker
+    def stop?
+      BackgroundServices.shutdown_requested
+    end
+  end
+end

+ 10 - 0
db/migrate/20241206194620_drop_sessions_jobs_scheduler.rb

@@ -0,0 +1,10 @@
+# Copyright (C) 2012-2024 Zammad Foundation, https://zammad-foundation.org/
+
+class DropSessionsJobsScheduler < ActiveRecord::Migration[7.2]
+  def change
+    # return if it's a new setup
+    return if !Setting.exists?(name: 'system_init_done')
+
+    Scheduler.where(method: 'Sessions.jobs').destroy_all
+  end
+end

+ 0 - 9
db/seeds/schedulers.rb

@@ -39,15 +39,6 @@ Scheduler.create_if_not_exists(
   updated_by_id: 1,
   created_by_id: 1,
 )
-Scheduler.create_if_not_exists(
-  name:          __("Generate 'Session' data."),
-  method:        'Sessions.jobs',
-  period:        60.seconds,
-  prio:          1,
-  active:        true,
-  updated_by_id: 1,
-  created_by_id: 1,
-)
 Scheduler.create_if_not_exists(
   name:          __('Execute planned jobs.'),
   method:        'Job.run',

+ 23 - 27
i18n/zammad.pot

@@ -2790,39 +2790,39 @@ msgstr ""
 msgid "City"
 msgstr ""
 
-#: db/seeds/schedulers.rb:178
+#: db/seeds/schedulers.rb:169
 msgid "Clean up 'Cti::Log'."
 msgstr ""
 
-#: db/seeds/schedulers.rb:187
+#: db/seeds/schedulers.rb:178
 msgid "Clean up 'DataPrivacyTask'."
 msgstr ""
 
-#: db/seeds/schedulers.rb:169
+#: db/seeds/schedulers.rb:160
 msgid "Clean up 'HttpLog'."
 msgstr ""
 
-#: db/seeds/schedulers.rb:124
+#: db/seeds/schedulers.rb:115
 msgid "Clean up ActiveJob locks."
 msgstr ""
 
-#: db/seeds/schedulers.rb:233
+#: db/seeds/schedulers.rb:224
 msgid "Clean up cache."
 msgstr ""
 
-#: db/seeds/schedulers.rb:115
+#: db/seeds/schedulers.rb:106
 msgid "Clean up closed sessions."
 msgstr ""
 
-#: db/seeds/schedulers.rb:133
+#: db/seeds/schedulers.rb:124
 msgid "Clean up dead sessions."
 msgstr ""
 
-#: db/seeds/schedulers.rb:61
+#: db/seeds/schedulers.rb:52
 msgid "Clean up expired sessions."
 msgstr ""
 
-#: db/seeds/schedulers.rb:268
+#: db/seeds/schedulers.rb:259
 msgid "Clean up mobile taskbars."
 msgstr ""
 
@@ -2923,7 +2923,7 @@ msgstr ""
 msgid "Close In Min"
 msgstr ""
 
-#: db/seeds/schedulers.rb:106
+#: db/seeds/schedulers.rb:97
 msgid "Close chat sessions where participants are offline."
 msgstr ""
 
@@ -4751,31 +4751,31 @@ msgstr ""
 msgid "Delete immediately"
 msgstr ""
 
-#: db/seeds/schedulers.rb:196
+#: db/seeds/schedulers.rb:187
 msgid "Delete obsolete classic IMAP backup."
 msgstr ""
 
-#: db/seeds/schedulers.rb:79
+#: db/seeds/schedulers.rb:70
 msgid "Delete old 'RecentView' entries."
 msgstr ""
 
-#: db/seeds/schedulers.rb:70
+#: db/seeds/schedulers.rb:61
 msgid "Delete old activity stream entries."
 msgstr ""
 
-#: db/seeds/schedulers.rb:88
+#: db/seeds/schedulers.rb:79
 msgid "Delete old online notification entries."
 msgstr ""
 
-#: db/seeds/schedulers.rb:160
+#: db/seeds/schedulers.rb:151
 msgid "Delete old stats store entries."
 msgstr ""
 
-#: db/seeds/schedulers.rb:97
+#: db/seeds/schedulers.rb:88
 msgid "Delete old token entries."
 msgstr ""
 
-#: db/seeds/schedulers.rb:224
+#: db/seeds/schedulers.rb:215
 msgid "Delete old upload cache entries."
 msgstr ""
 
@@ -6058,11 +6058,11 @@ msgstr ""
 msgid "Execute configured macros"
 msgstr ""
 
-#: db/seeds/schedulers.rb:205
+#: db/seeds/schedulers.rb:196
 msgid "Execute import jobs."
 msgstr ""
 
-#: db/seeds/schedulers.rb:52
+#: db/seeds/schedulers.rb:43
 msgid "Execute planned jobs."
 msgstr ""
 
@@ -6611,10 +6611,6 @@ msgstr ""
 msgid "General communication error, maybe internet is not available!"
 msgstr ""
 
-#: db/seeds/schedulers.rb:43
-msgid "Generate 'Session' data."
-msgstr ""
-
 #: app/assets/javascripts/app/controllers/api.coffee:180
 msgid "Generate Access Token for |%s|"
 msgstr ""
@@ -6636,7 +6632,7 @@ msgstr ""
 msgid "Generate recovery codes"
 msgstr ""
 
-#: db/seeds/schedulers.rb:151
+#: db/seeds/schedulers.rb:142
 msgid "Generate user-based stats."
 msgstr ""
 
@@ -7000,7 +6996,7 @@ msgstr ""
 msgid "HTTP type"
 msgstr ""
 
-#: db/seeds/schedulers.rb:214
+#: db/seeds/schedulers.rb:205
 msgid "Handle data privacy tasks."
 msgstr ""
 
@@ -13808,7 +13804,7 @@ msgstr ""
 msgid "Switch to new layout"
 msgstr ""
 
-#: db/seeds/schedulers.rb:142
+#: db/seeds/schedulers.rb:133
 msgid "Sync calendars with iCal feeds."
 msgstr ""
 
@@ -16821,7 +16817,7 @@ msgstr ""
 msgid "Update as closed"
 msgstr ""
 
-#: db/seeds/schedulers.rb:259
+#: db/seeds/schedulers.rb:250
 msgid "Update exchange oauth 2 token."
 msgstr ""
 

+ 58 - 18
lib/background_services.rb

@@ -6,14 +6,23 @@ class BackgroundServices
     BackgroundServices::Service.descendants
   end
 
-  attr_reader :config
+  # Waiting time before processes get killed.
+  SHUTDOWN_GRACE_PERIOD = 30.seconds
+
+  class_attribute :shutdown_requested
+
+  attr_accessor :threads, :child_pids
+  attr_reader   :config
 
   def initialize(config)
     @config = Array(config)
+    @child_pids = []
+    @threads    = []
+    install_signal_trap
   end
 
   def run
-    Rails.logger.debug 'Starting BackgroundServices...'
+    Rails.logger.info 'Starting BackgroundServices...'
 
     # Fork before starting the threads in the main process to ensure a consistent state
     #   and minimal memory overhead (see also #5420).
@@ -23,22 +32,51 @@ class BackgroundServices
         run_service service_config
       end
 
-    Process.waitall
-
-    loop do
-      sleep 1
-    end
-  rescue Interrupt
-    nil
+    child_pids.each { |pid| Process.waitpid(pid) }
+    threads.each(&:join)
   ensure
-    Rails.logger.debug('Stopping BackgroundServices.')
+    Rails.logger.info('Stopping BackgroundServices.')
   end
 
   private
 
+  def install_signal_trap
+    Signal.trap('TERM') { handle_signal('TERM') }
+    Signal.trap('INT')  { handle_signal('INT')  }
+  end
+
+  # Handles a received TERM/INT signal: flags the shutdown, forwards the signal to all
+  #   known child processes and starts a watchdog thread that enforces termination.
+  # @param signal [String] name of the received signal ('TERM' or 'INT')
+  def handle_signal(signal)
+    # Write operations cannot be handled in a signal handler, use a thread for that.
+    #   This thread is not waited for via `join`, so that the main process should end
+    #   somewhere during the sleep statement if it is able to shut down cleanly.
+    #   If it doesn't, it will send KILL signals to all child processes and the main process
+    #   to enforce the termination.
+    Thread.new do
+      Rails.logger.info { "BackgroundServices shutdown requested via #{signal} signal for process #{Process.pid}" }
+
+      sleep SHUTDOWN_GRACE_PERIOD
+
+      Rails.logger.error { "BackgroundServices did not shutdown cleanly after #{SHUTDOWN_GRACE_PERIOD}s, forcing termination" }
+      child_pids.each do |pid|
+        Process.kill('KILL', pid)
+      rescue Errno::ESRCH, RangeError
+        # A child that terminated already must not abort this thread,
+        #   otherwise the remaining children and the main process would never be killed.
+      end
+      Process.kill('KILL', Process.pid)
+    end
+
+    self.class.shutdown_requested = true
+    child_pids.each do |pid|
+      Process.kill(signal, pid)
+    rescue Errno::ESRCH, RangeError
+      # Don't fail if processes terminated already.
+    end
+  end
+
   def run_service(service_config)
     if !service_config.enabled?
-      Rails.logger.debug { "Skipping disabled service #{service_config.service.service_name}." }
+      Rails.logger.info { "Skipping disabled service #{service_config.service.service_name}." }
+      return
+    end
+
+    if service_config.service.skip?(manager: self)
+      Rails.logger.info { "Skipping service #{service_config.service.service_name}." }
       return
     end
 
@@ -46,17 +84,19 @@ class BackgroundServices
 
     case service_config.start_as
     when :fork
-      start_as_forks(service_config.service, service_config.workers)
+      child_pids.push(*start_as_forks(service_config.service, service_config.workers))
     when :thread
-      start_as_thread(service_config.service)
+      threads.push start_as_thread(service_config.service)
     end
   end
 
   def start_as_forks(service, forks)
-    (1..forks).map do
+    (1..forks).map do |i|
       Process.fork do
-        Rails.logger.debug { "Starting process ##{Process.pid} for service #{service.service_name}." }
-        service.new.run
+        Process.setproctitle("#{$PROGRAM_NAME} #{service.service_name}##{i}")
+        install_signal_trap
+        Rails.logger.info { "Starting process ##{Process.pid} for service #{service.service_name}." }
+        service.new(manager: self, fork_id: i).run
       rescue Interrupt
         nil
       end
@@ -67,8 +107,8 @@ class BackgroundServices
     Thread.new do
       Thread.current.abort_on_exception = true
 
-      Rails.logger.debug { "Starting thread for service #{service.service_name} in the main process." }
-      service.new.run
+      Rails.logger.info { "Starting thread for service #{service.service_name} in the main process." }
+      service.new(manager: self).run
     # BackgroundServices rspec test is using Timeout.timeout to stop background services.
     # It was fine for a long time, but started throwing following error in Rails 7.2.
     # This seems to affect that test case only.

+ 16 - 0
lib/background_services/concerns/has_interruptible_sleep.rb

@@ -0,0 +1,16 @@
+# Copyright (C) 2012-2024 Zammad Foundation, https://zammad-foundation.org/
+
+module BackgroundServices::Concerns::HasInterruptibleSleep
+  extend ActiveSupport::Concern
+
+  # Sleep in short intervals so that we can handle TERM/INT signals in a timely manner.
+  # @param interval [Integer] number of seconds to sleep for
+  def interruptible_sleep(interval)
+    interval.times do
+      break if BackgroundServices.shutdown_requested
+
+      sleep 1
+    end
+  end
+
+end

+ 18 - 6
lib/background_services/service.rb

@@ -2,8 +2,11 @@
 
 # Base class for background services
 class BackgroundServices::Service
+  include BackgroundServices::Concerns::HasInterruptibleSleep
   include Mixin::RequiredSubPaths
 
+  attr_reader :fork_id, :manager
+
   def self.service_name
     name.demodulize
   end
@@ -13,6 +16,10 @@ class BackgroundServices::Service
     1
   end
 
+  def self.skip?(manager:)
+    false
+  end
+
   # Use this method to prepare for a service task.
   # This would be called only once regardless of how many workers would start.
   def self.pre_run
@@ -21,6 +28,17 @@ class BackgroundServices::Service
     end
   end
 
+  def self.run_in_service_context(&)
+    Rails.application.executor.wrap do
+      ApplicationHandleInfo.use('scheduler', &)
+    end
+  end
+
+  def initialize(manager:, fork_id: nil)
+    @fork_id = fork_id
+    @manager = manager
+  end
+
   # Use this method to run a background service.
   def run
     self.class.run_in_service_context do
@@ -28,12 +46,6 @@ class BackgroundServices::Service
     end
   end
 
-  def self.run_in_service_context(&)
-    Rails.application.executor.wrap do
-      ApplicationHandleInfo.use('scheduler', &)
-    end
-  end
-
   protected
 
   # Override this method in service classes.

+ 46 - 0
lib/background_services/service/manage_sessions_jobs.rb

@@ -0,0 +1,46 @@
+# Copyright (C) 2012-2024 Zammad Foundation, https://zammad-foundation.org/
+
+class BackgroundServices
+  class Service
+    class ManageSessionsJobs < Service
+      def self.max_workers
+        1
+      end
+
+      # This service is only needed when we have forked ProcessSessionsJobs running,
+      #   to coordinate the sessions between the different processes.
+      def self.skip?(manager:)
+        session_jobs_config = manager.config.find { |elem| elem.service == BackgroundServices::Service::ProcessSessionsJobs }
+
+        !session_jobs_config || session_jobs_config.disabled || session_jobs_config.start_as != :fork
+      end
+
+      def launch
+        loop do
+          break if BackgroundServices.shutdown_requested
+
+          single_run
+
+          sleep 1
+        end
+      end
+
+      private
+
+      def single_run
+        nodes_stats = Sessions::Node.stats
+
+        Sessions
+          .sessions
+          .each do |client_id|
+            # skip sessions that are already listed in the node stats
+            next if nodes_stats[client_id]
+
+            # assign to node
+            Sessions::Node.session_assigne(client_id)
+            sleep 1
+          end
+      end
+    end
+  end
+end

+ 5 - 1
lib/background_services/service/process_delayed_jobs.rb

@@ -18,10 +18,14 @@ class BackgroundServices
 
       def launch
         loop do
+          break if BackgroundServices.shutdown_requested
+
           result = nil
 
           realtime = Benchmark.realtime do
             Rails.logger.debug { "*** worker thread, #{::Delayed::Job.count} in queue" }
+            # ::Delayed::Worker#stop? is monkey patched by config/initializers/delayed_worker_stop.rb
+            #   to ensure an early exit even during work_off().
             result = ::Delayed::Worker.new.work_off
           end
 
@@ -35,7 +39,7 @@ class BackgroundServices
         count = result.sum
 
         if count.zero?
-          sleep SLEEP_IF_EMPTY
+          interruptible_sleep SLEEP_IF_EMPTY
           Rails.logger.debug { '*** worker thread loop' }
         else
           Rails.logger.debug { format("*** #{count} jobs processed at %<jps>.4f j/s, %<failed>d failed ...\n", jps: count / realtime, failed: result.last) }

+ 12 - 4
lib/background_services/service/process_scheduled_jobs.rb

@@ -8,27 +8,35 @@ class BackgroundServices
 
       attr_reader :jobs_started
 
-      def initialize
-        super
+      def initialize(...)
         @jobs_started = Concurrent::Hash.new
+        super
       end
 
       def launch
         loop do
+          break if BackgroundServices.shutdown_requested
+
           Rails.logger.info 'ProcessScheduledJobs running...'
 
           run_jobs
 
-          sleep SLEEP_AFTER_LOOP
+          interruptible_sleep SLEEP_AFTER_LOOP
         end
+
+        # Wait for threads to finish for a clean shutdown.
+        jobs_started.each(&:join)
       end
 
       private
 
       def run_jobs
         scope.each do |job|
+          break if BackgroundServices.shutdown_requested
+
           result = Manager.new(job, jobs_started).run
-          sleep SLEEP_AFTER_JOB_START if result
+
+          interruptible_sleep SLEEP_AFTER_JOB_START if result
         end
       end
 

Some files were not shown because too many files changed in this diff