  # Copyright (C) 2012-2024 Zammad Foundation, https://zammad-foundation.org/
  # Persistent record of a single run of an import backend (e.g. 'Import::Ldap').
  #
  # The `name` attribute holds the class name of the backend, which is resolved
  # via `constantize` and instantiated with the ImportJob record itself.
  class ImportJob < ApplicationModel
    extend ::Mixin::StartFinishLogger

    store :payload
    store :result

    # Newest jobs first.
    default_scope { order(started_at: :desc, id: :desc) }

    # Jobs that were started, are not finished yet and are no dry runs.
    scope :running, -> { where(finished_at: nil, dry_run: false).where.not(started_at: nil) }

    # Starts the import backend class based on the name attribute.
    # The backend class is initialized with the current instance.
    # Records the start time before the backend runs and always records the
    # finish time afterwards. Exceptions are logged and their message is stored
    # in `result[:error]` instead of being re-raised.
    #
    # @example
    #   import = ImportJob.new(name: 'Import::Ldap', payload: LdapSource.first.preferences)
    #   import.start
    #
    # @return [Object] the backend's `start` result, or the error message if an
    #   exception was rescued; callers should not rely on this value
    def start
      self.started_at = Time.zone.now
      save

      name.constantize.new(self).start
    rescue => e
      Rails.logger.error "ImportJob '#{name}' failed: #{e.message}"
      Rails.logger.error e

      # make sure there is a Hash to store the error message in
      self.result = {} if !result.is_a?(Hash)
      result[:error] = e.message
    ensure
      self.finished_at = Time.zone.now
      save
    end

    # Gets called when the background worker gets (re-)started and this job was still
    # in the queue. If `finished_at` is blank the call is piped through to
    # the ImportJob backend which has to decide how to go from here. The delayed
    # job will get destroyed if `reschedule?` is not implemented by the
    # ImportJob backend instance.
    #
    # @see BackgroundServices::Service::ProcessDelayedJobs::CleanupAction.cleanup_delayed_jobs
    #
    # @param delayed_job the delayed job entry, handed through to the backend unchanged
    #
    # @example
    #   import.reschedule?(delayed_job)
    #
    # @return [Boolean] whether the ImportJob should get rescheduled (true) or destroyed (false)
    def reschedule?(delayed_job)
      return false if finished_at.present?

      instance = name.constantize.new(self)
      return false if !instance.respond_to?(:reschedule?)

      instance.reschedule?(delayed_job)
    end

    # Convenience wrapper around the start method for starting (delayed) dry runs.
    # Only one unfinished dry run per backend is possible at the same time; if one
    # already exists nothing is created.
    #
    # The given params Hash is not modified.
    #
    # @param [Hash] params the params used to initialize the ImportJob instance.
    # @option params [Boolean] :delay Defines if job should get executed delayed. Default is true.
    #
    # @example
    #   ImportJob.dry_run(name: 'Import::Ldap', payload: LdapSource.first.preferences, delay: false)
    #
    # @return [nil] if an unfinished dry run for the backend already exists; otherwise
    #   the result of enqueuing or starting the job, which callers should not rely on
    def self.dry_run(params)
      return if exists?(name: params[:name], dry_run: true, finished_at: nil)

      # Build the attributes on a copy so the caller's Hash stays untouched
      # (and frozen Hashes are accepted).
      job = create(params.except(:delay).merge(dry_run: true))

      if params.fetch(:delay, true)
        AsyncImportJob.perform_later(job)
      else
        job.start
      end
    end

    # Queues and starts all import backends as import jobs.
    #
    # @example
    #   ImportJob.start_registered
    #
    # @return [void]
    def self.start_registered
      queue_registered
      start
    end

    # Starts all import jobs that have not started yet and are no dry runs.
    #
    # @example
    #   ImportJob.start
    #
    # @return [void]
    def self.start
      where(started_at: nil, dry_run: false).each(&:start)
    end

    # Queues all configured import backends from Setting 'import_backends' as import jobs
    # that are not yet queued. Backends which are not #queueable? are skipped.
    #
    # @example
    #   ImportJob.queue_registered
    #
    # @return [void]
    def self.queue_registered
      backends.each do |backend|

        # skip backends that are not "ready" yet
        next if !backend.constantize.queueable?

        # skip if an unfinished entry already exists
        next if ImportJob.exists?(name: backend, finished_at: nil)

        ImportJob.create(name: backend)
      end
    end

    # Checks if the given import backend is valid, i.e. whether its name
    # resolves to a constant.
    #
    # @param [String] backend the class name of the import backend
    #
    # @example
    #   ImportJob.backend_valid?('Import::Ldap')
    #   # => true
    #
    # @return [Boolean]
    def self.backend_valid?(backend)
      backend.constantize
      true
    rescue NameError
      false
    end

    # Returns a list of valid and active import backends taken from
    # Setting 'import_backends'. Invalid entries are logged and skipped.
    #
    # @example
    #   ImportJob.backends
    #   # => ['Import::Ldap', 'Import::Exchange', ...]
    #
    # @return [Array<String>] empty if the Setting is not set
    def self.backends
      Setting.get('import_backends')&.select do |backend|
        if !backend_valid?(backend)
          logger.error "Invalid import backend '#{backend}'"
          next false
        end

        # skip deactivated backends
        backend.constantize.active?
      end || []
    end

    # Checks for killed import jobs and marks them as finished and adds a note.
    #
    # @param [ActiveSupport::TimeWithZone] after the time the cleanup was started
    #
    # @example
    #   ImportJob.cleanup_import_jobs(Time.zone.now)
    #
    # @return [void]
    def self.cleanup_import_jobs(after)
      log_start_finish(:info, "Cleanup of left over import jobs #{after}") do
        error = __('Interrupted by a restart of the background worker process. Please restart manually or wait until the next execution time.').freeze

        # we need to exclude jobs that were updated at or since we started
        # cleaning up (via the #reschedule? call) because they might
        # have been started `.delay`-ed and are flagged for restart
        ImportJob.running.where('updated_at < ?', after).each do |job|
          job.update!(
            finished_at: after,
            result:      {
              error: error
            }
          )
        end
      end
    end
  end