# has_active_job_lock.rb (3.4 KB)

  1. module HasActiveJobLock
  2. extend ActiveSupport::Concern
  3. included do
  4. before_enqueue do |job| # rubocop:disable Style/SymbolProc
  5. job.ensure_active_job_lock_for_enqueue!
  6. end
  7. around_perform do |job, block|
  8. job.mark_active_job_lock_as_started
  9. block.call
  10. ensure
  11. job.release_active_job_lock!
  12. end
  13. end
  14. # Defines the lock key for the current job to prevent execution of jobs with the same key.
  15. # This is by default the name of the ActiveJob class.
  16. # If you're in the situation where you need to have a lock_key based on
  17. # the given arguments you can overwrite this method in your job and access
  18. # them via `arguments`. See ActiveJob::Core for more (e.g. queue).
  19. #
  20. # @example
  21. # # default
  22. # job = UniqueActiveJob.new
  23. # job.lock_key
  24. # # => "UniqueActiveJob"
  25. #
  26. # @example
  27. # # with lock_key: "#{self.class.name}/#{arguments[0]}/#{arguments[1]}"
  28. # job = SearchIndexJob.new('User', 42)
  29. # job.lock_key
  30. # # => "SearchIndexJob/User/42"
  31. #
  32. # return [String]
  33. def lock_key
  34. self.class.name
  35. end
  36. def mark_active_job_lock_as_started
  37. release_active_job_lock_cache
  38. in_active_job_lock_transaction do
  39. # a perform_now job doesn't require any locking
  40. return if active_job_lock.blank?
  41. return if !active_job_lock.of?(self)
  42. # a perform_later job started to perform and will be marked as such
  43. active_job_lock.touch # rubocop:disable Rails/SkipsModelValidations
  44. end
  45. end
  46. def ensure_active_job_lock_for_enqueue!
  47. release_active_job_lock_cache
  48. in_active_job_lock_transaction do
  49. return if active_job_lock_for_enqueue!.present?
  50. ActiveJobLock.create!(
  51. lock_key: lock_key,
  52. active_job_id: job_id,
  53. )
  54. end
  55. end
  56. def release_active_job_lock!
  57. # only delete lock if the current job is the one holding the lock
  58. # perform_now jobs or perform_later jobs for which follow-up jobs were enqueued
  59. # don't need to remove any locks
  60. lock = ActiveJobLock.lock.find_by(lock_key: lock_key, active_job_id: job_id)
  61. if !lock
  62. logger.debug { "Found no ActiveJobLock for #{self.class.name} (Job ID: #{job_id}) with key '#{lock_key}'." }
  63. return
  64. end
  65. logger.debug { "Deleting ActiveJobLock for #{self.class.name} (Job ID: #{job_id}) with key '#{lock_key}'." }
  66. lock.destroy!
  67. end
  68. private
  69. def in_active_job_lock_transaction
  70. # re-use active DB transaction if present
  71. return yield if ActiveRecord::Base.connection.open_transactions.nonzero?
  72. # start own serializable DB transaction to prevent race conditions on DB level
  73. ActiveJobLock.transaction(isolation: :serializable) do
  74. yield
  75. end
  76. rescue ActiveRecord::RecordNotUnique
  77. existing_active_job_lock!
  78. end
  79. def active_job_lock_for_enqueue!
  80. return if active_job_lock.blank?
  81. # don't enqueue perform_later jobs if a job with the same
  82. # lock key exists that hasn't started to perform yet
  83. existing_active_job_lock! if active_job_lock.peform_pending?
  84. active_job_lock.tap { |lock| lock.transfer_to(self) }
  85. end
  86. def active_job_lock
  87. @active_job_lock ||= ActiveJobLock.lock.find_by(lock_key: lock_key)
  88. end
  89. def release_active_job_lock_cache
  90. @active_job_lock = nil
  91. end
  92. def existing_active_job_lock!
  93. logger.info "Won't enqueue #{self.class.name} (Job ID: #{job_id}) because of already existing job with lock key '#{lock_key}'."
  94. throw :abort
  95. end
  96. end