import_job.rb 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192
  1. # Copyright (C) 2012-2023 Zammad Foundation, https://zammad-foundation.org/
  2. class ImportJob < ApplicationModel
  3. extend ::Mixin::StartFinishLogger
  4. store :payload
  5. store :result
  6. scope :running, -> { where(finished_at: nil, dry_run: false).where.not(started_at: nil) }
  # Runs the import backend whose class name is stored in the `name` attribute.
  # The backend class is instantiated with this ImportJob and its #start is called.
  # `started_at` is persisted before the backend runs and `finished_at` is
  # persisted afterwards in every case. If the backend raises, the error
  # is logged and its message is stored in `result[:error]`.
  #
  # @example
  #   import = ImportJob.new(name: 'Import::Ldap', payload: LdapSource.first.preferences)
  #   import.start
  #
  # @return [Object] the value of the backend's #start, or the error message if it raised
  def start
    self.started_at = Time.zone.now
    save

    name.constantize.new(self).start
  rescue => e
    Rails.logger.error "ImportJob '#{name}' failed: #{e.message}"
    Rails.logger.error e

    # rubocop:disable Style/RedundantSelf
    # the result may not be a Hash (yet), make sure the error can be stored
    self.result = {} if !self.result.is_a?(Hash)
    self.result[:error] = e.message
    # rubocop:enable Style/RedundantSelf
  ensure
    # runs on success and on failure, so the job never stays flagged as running
    self.finished_at = Time.zone.now
    save
  end
  # Gets called when the background worker gets (re-)started and this job was
  # still in the queue. A job that already has a `finished_at` is never
  # rescheduled. Otherwise the decision is delegated to the ImportJob backend:
  # a backend instance is created and asked via #reschedule?. If the backend
  # instance does not respond to #reschedule? the answer is false.
  #
  # @see BackgroundServices::Service::ProcessDelayedJobs::CleanupAction.cleanup_delayed_jobs
  #
  # @param delayed_job the delayed job entry, handed through to the backend
  #
  # @example
  #   import.reschedule?(delayed_job)
  #
  # @return [Boolean] whether the ImportJob should get rescheduled (true) or destroyed (false)
  def reschedule?(delayed_job)
    # finished jobs never need another run
    return false if finished_at.present?

    backend = name.constantize.new(self)
    backend.respond_to?(:reschedule?) && backend.reschedule?(delayed_job)
  end
  # Convenience wrapper around the start method for starting (delayed) dry runs.
  # Creates an ImportJob flagged as `dry_run` from the given params and either
  # enqueues it via AsyncImportJob or starts it right away.
  # Only one running or pending dry run per backend is possible at the same time:
  # if an unfinished dry run with the same name exists nothing is created.
  #
  # The given params Hash is not modified.
  #
  # @param [Hash] params the params used to initialize the ImportJob instance.
  # @option params [Boolean] :delay Defines if job should get executed delayed. Default is true.
  #
  # @example
  #   import = ImportJob.dry_run(name: 'Import::Ldap', payload: LdapSource.first.preferences, delay: false)
  #
  # @return [nil] if an unfinished dry run for the backend already exists,
  #   otherwise the value of AsyncImportJob.perform_later (delayed) or of the job's #start
  def self.dry_run(params)
    return if exists?(name: params[:name], dry_run: true, finished_at: nil)

    # :delay only controls how the job is run and is no ImportJob attribute.
    # Build the attributes on a copy so the caller's Hash is left untouched.
    job = create(params.except(:delay).merge(dry_run: true))

    if params.fetch(:delay, true)
      AsyncImportJob.perform_later(job)
    else
      job.start
    end
  end
  # Queues an import job for every registered import backend (see .queue_registered)
  # and afterwards runs all import jobs that have not been started yet (see .start).
  #
  # @example
  #   ImportJob.start_registered
  #
  # @return whatever .start returns
  def self.start_registered
    queue_registered

    start
  end
  # Runs #start on every import job that has no `started_at` yet
  # and is not flagged as a dry run.
  #
  # @example
  #   ImportJob.start
  #
  # @return the collection of jobs that were looked up
  def self.start
    pending = where(started_at: nil, dry_run: false)
    pending.each(&:start)
  end
  # Creates an import job for every backend returned by .backends that
  # has no unfinished import job yet. Backends which are not .queueable? are skipped.
  #
  # @example
  #   ImportJob.queue_registered
  #
  # @return the list of backends that was iterated
  def self.queue_registered
    backends.each do |backend|
      # backends that are not "ready" yet are skipped
      ready = backend.constantize.queueable?
      next if !ready

      # an unfinished job for this backend already exists, don't queue a second one
      already_queued = ImportJob.exists?(name: backend, finished_at: nil)
      next if already_queued

      ImportJob.create(name: backend)
    end
  end
  # Checks if the given import backend name can be resolved to a constant.
  # Only a NameError raised while resolving counts as "invalid",
  # any other error is passed on to the caller.
  #
  # @param backend [String] the backend class name
  #
  # @example
  #   ImportJob.backend_valid?('Import::Ldap')
  #   # => true
  #
  # @return [Boolean]
  def self.backend_valid?(backend)
    backend.constantize
  rescue NameError
    false
  else
    # resolving worked without an error
    true
  end
  # Returns the import backends from Setting 'import_backends' that are
  # valid (see .backend_valid?) and active. Invalid entries are logged
  # as errors. If the Setting is not set an empty list is returned.
  #
  # @example
  #   ImportJob.backends
  #   # => ['Import::Ldap', 'Import::Exchange', ...]
  #
  # @return [Array<String>]
  def self.backends
    configured = Setting.get('import_backends')
    return [] if configured.nil?

    configured.select do |backend|
      if backend_valid?(backend)
        # deactivated backends are left out
        backend.constantize.active?
      else
        logger.error "Invalid import backend '#{backend}'"
        false
      end
    end
  end
  # Checks for killed import jobs, marks them as finished and stores a note
  # about the interruption as their result.
  #
  # @param [ActiveSupport::TimeWithZone] after the time the cleanup was started
  #
  # @example
  #   ImportJob.cleanup_import_jobs(Time.zone.now)
  #
  # @return whatever log_start_finish returns
  def self.cleanup_import_jobs(after)
    log_start_finish(:info, "Cleanup of left over import jobs #{after}") do
      note = __('Interrupted by a restart of the background worker process. Please restart manually or wait until the next execution time.').freeze

      # Jobs that were updated at or after the start of the cleanup
      # (via the #reschedule? call) have to be excluded because they might
      # have been started `.delay`-ed and are flagged for restart.
      interrupted = ImportJob.running.where('updated_at < ?', after)

      interrupted.each do |job|
        job.update!(finished_at: after, result: { error: note })
      end
    end
  end
  165. end