123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194 |
# Persisted record of a single import run.
#
# The +name+ attribute holds the class name of the import backend; it is
# resolved with +constantize+ and instantiated with this record whenever the
# job is started or asked whether it should be rescheduled.
class ImportJob < ApplicationModel
  extend ::Mixin::StartFinishLogger

  store :payload
  store :result

  # Jobs are ordered by start time, then id, both descending.
  default_scope { order(started_at: :desc, id: :desc) }

  # Non dry-run jobs that have a start time but no finish time.
  scope :running, -> { where(finished_at: nil, dry_run: false).where.not(started_at: nil) }

  # Runs the import backend named by +name+.
  #
  # Sets +started_at+ and saves before the backend is instantiated. Any
  # StandardError raised while running is logged and its message is stored
  # under +result[:error]+ instead of being propagated. +finished_at+ is set
  # and the record is saved in every case.
  def start
    self.started_at = Time.zone.now
    save

    backend = name.constantize.new(self)
    backend.start
  rescue => e
    Rails.logger.error "ImportJob '#{name}' failed: #{e.message}"
    Rails.logger.error e

    # Make sure there is a Hash to store the error message in.
    self.result = {} if !result.is_a?(Hash)
    result[:error] = e.message
  ensure
    self.finished_at = Time.zone.now
    save
  end

  # Asks the backend whether this job should be rescheduled.
  #
  # @param delayed_job [Object] passed through unchanged to the backend's
  #   +reschedule?+ method
  # @return [false] if the job already has a +finished_at+ or the backend
  #   instance does not respond to +reschedule?+
  # @return [Object] otherwise, whatever the backend's +reschedule?+ returns
  def reschedule?(delayed_job)
    return false if finished_at.present?

    backend = name.constantize.new(self)
    return false if !backend.respond_to?(:reschedule?)

    backend.reschedule?(delayed_job)
  end

  # Creates a dry-run job from +params+ and runs it.
  #
  # Nothing happens (and nil is returned) if an unfinished dry-run job with the
  # same name already exists. The given +params+ are not modified.
  #
  # @param params [Hash] attributes for the new job; the optional +:delay+
  #   entry (default: true) is not passed to +create+ and selects between
  #   enqueueing via AsyncImportJob and starting the job directly
  def self.dry_run(params)
    return if exists?(name: params[:name], dry_run: true, finished_at: nil)

    # Build a fresh attribute hash so the caller's params stay untouched.
    attributes = params.except(:delay).merge(dry_run: true)
    job = create(attributes)

    if params.fetch(:delay, true)
      AsyncImportJob.perform_later(job)
    else
      job.start
    end
  end

  # Queues a job for each registered backend (see .queue_registered) and then
  # starts all pending jobs (see .start).
  def self.start_registered
    queue_registered
    start
  end

  # Starts every non dry-run job that has no +started_at+ yet.
  def self.start
    where(started_at: nil, dry_run: false).each(&:start)
  end

  # Creates a job for each backend returned by .backends, skipping backends
  # that are not queueable and backends that already have an unfinished job.
  def self.queue_registered
    backends.each do |backend|
      next if !backend.constantize.queueable?

      # Only one unfinished job per backend.
      next if ImportJob.exists?(name: backend, finished_at: nil)

      ImportJob.create(name: backend)
    end
  end

  # Checks whether +backend+ resolves to a constant.
  #
  # @param backend [String] class name of the backend
  # @return [Boolean] false if +constantize+ raises NameError, true otherwise
  def self.backend_valid?(backend)
    backend.constantize
    true
  rescue NameError
    false
  end

  # Lists the backend class names from the 'import_backends' setting that
  # resolve to a constant and report themselves as active. Invalid entries
  # are logged as errors and skipped.
  #
  # @return [Array] the selected backend names; empty if the setting is nil
  def self.backends
    configured = Setting.get('import_backends')
    return [] if configured.nil?

    selected = configured.select do |backend|
      if !backend_valid?(backend)
        logger.error "Invalid import backend '#{backend}'"
        next
      end

      backend.constantize.active?
    end

    selected || []
  end

  # Marks running jobs that were last updated before +after+ as finished
  # with an error describing the interruption.
  #
  # @param after [Object] used as the exclusive upper bound for +updated_at+
  #   and as the +finished_at+ value written to each matching job
  def self.cleanup_import_jobs(after)
    log_start_finish(:info, "Cleanup of left over import jobs #{after}") do
      error = __('Interrupted by a restart of the background worker process. Please restart manually or wait until the next execution time.').freeze

      # Jobs updated at or after the given time are left alone.
      running.where(updated_at: ...after).each do |job|
        job.update!(
          finished_at: after,
          result:      {
            error: error
          }
        )
      end
    end
  end
end
|