# has_active_job_lock.rb
  # Copyright (C) 2012-2022 Zammad Foundation, https://zammad-foundation.org/
  # Concern for ActiveJob classes that prevents jobs sharing the same
  # `lock_key` from piling up in the queue. Lock state is kept in
  # ActiveJobLock records, one per lock key.
  module HasActiveJobLock
    extend ActiveSupport::Concern

    included do
      # Create or take over the lock before the job is enqueued. The enqueue is
      # aborted (via `throw :abort`) when the existing lock is still pending.
      before_enqueue do |job| # rubocop:disable Style/SymbolProc
        job.ensure_active_job_lock_for_enqueue!
      end

      # Mark the lock as started before performing and always release it
      # afterwards, even if the job raises.
      around_perform do |job, block|
        job.mark_active_job_lock_as_started

        block.call
      ensure
        job.release_active_job_lock!
      end
    end

    # Defines the lock key for the current job to prevent execution of jobs with the same key.
    # This is by default the name of the ActiveJob class.
    # If you're in the situation where you need to have a lock_key based on
    # the given arguments you can overwrite this method in your job and access
    # them via `arguments`. See ActiveJob::Core for more (e.g. queue).
    #
    # @example
    #   # default
    #   job = UniqueActiveJob.new
    #   job.lock_key
    #   # => "UniqueActiveJob"
    #
    # @example
    #   # with lock_key: "#{self.class.name}/#{arguments[0]}/#{arguments[1]}"
    #   job = SearchIndexJob.new('User', 42)
    #   job.lock_key
    #   # => "SearchIndexJob/User/42"
    #
    # @return [String]
    def lock_key
      self.class.name
    end

    # Touches the ActiveJobLock of this job so it is recorded as having started
    # to perform. Does nothing if no lock exists for the lock key or if the
    # lock is not `of?` this job.
    #
    # @return [Boolean, nil] result of `touch`, or nil if nothing was touched
    def mark_active_job_lock_as_started
      release_active_job_lock_cache

      in_active_job_lock_transaction do
        # `next` (not `return`) leaves the block normally so the surrounding
        # transaction is committed instead of being exited non-locally.

        # a perform_now job doesn't require any locking
        next if active_job_lock.blank?
        next if !active_job_lock.of?(self)

        # a perform_later job started to perform and will be marked as such
        active_job_lock.touch # rubocop:disable Rails/SkipsModelValidations
      end
    end

    # Makes sure this job holds the ActiveJobLock for its lock key before it is
    # enqueued: an existing lock is transferred to this job, otherwise a new
    # lock is created. Aborts the enqueue (`throw :abort`) if the existing lock
    # is still pending.
    def ensure_active_job_lock_for_enqueue!
      release_active_job_lock_cache

      in_active_job_lock_transaction do
        # `next` (not `return`) so that the lock transfer performed by
        # `active_job_lock_for_enqueue!` is committed with the transaction.
        next if active_job_lock_for_enqueue!.present?

        ActiveJobLock.create!(
          lock_key:      lock_key,
          active_job_id: job_id,
        )
      end
    end

    # Destroys the ActiveJobLock held by this job, if there is one.
    #
    # @raise [ActiveRecord::RecordNotDestroyed] if `destroy!` fails
    def release_active_job_lock!
      # only delete lock if the current job is the one holding the lock
      # perform_now jobs or perform_later jobs for which follow-up jobs were enqueued
      # don't need to remove any locks
      lock = ActiveJobLock.lock.find_by(lock_key: lock_key, active_job_id: job_id)

      if !lock
        logger.debug { "Found no ActiveJobLock for #{self.class.name} (Job ID: #{job_id}) with key '#{lock_key}'." }
        return
      end

      logger.debug { "Deleting ActiveJobLock for #{self.class.name} (Job ID: #{job_id}) with key '#{lock_key}'." }
      lock.destroy!
    end

    private

    # Runs the given block inside a DB transaction. An already open transaction
    # is re-used, otherwise a new serializable transaction is started.
    # Serialization failures (PostgreSQL) and deadlocks (MySQL) are retried;
    # a unique constraint violation aborts the callback chain.
    #
    # NOTE(review): the retries are unbounded - confirm this is intended.
    def in_active_job_lock_transaction(&block)
      # re-use active DB transaction if present
      return yield if ActiveRecord::Base.connection.open_transactions.nonzero?

      # start own serializable DB transaction to prevent race conditions on DB level
      ActiveJobLock.transaction(isolation: :serializable, &block)
    rescue ActiveRecord::SerializationFailure => e
      # PostgreSQL prevents locking on records that are already locked
      # for UPDATE in Serializable Isolation Level transactions,
      # but it's safe to retry as described in the docs:
      # https://www.postgresql.org/docs/10/transaction-iso.html
      raise if !e.message.include?('PG::TRSerializationFailure')

      # drop the lock record memoized in the failed attempt so that the retry
      # reads (and locks) it again
      release_active_job_lock_cache
      retry
    rescue ActiveRecord::Deadlocked => e
      # MySQL handles lock race condition differently and raises a Deadlock exception:
      # Mysql2::Error: Deadlock found when trying to get lock; try restarting transaction
      raise if !e.message.include?('Mysql2::Error: Deadlock found when trying to get lock')

      # drop the lock record memoized in the failed attempt so that the retry
      # reads (and locks) it again
      release_active_job_lock_cache
      retry
    rescue ActiveRecord::RecordNotUnique
      existing_active_job_lock!
    end

    # Returns the existing lock for the lock key after transferring it to this
    # job, or nil if there is no lock yet. Aborts the callback chain if the
    # existing lock is still pending.
    #
    # @return [ActiveJobLock, nil]
    def active_job_lock_for_enqueue!
      return if active_job_lock.blank?

      # don't enqueue perform_later jobs if a job with the same
      # lock key exists that hasn't started to perform yet
      # NOTE(review): `peform_pending?` (sic) is presumably the spelling used
      # by the ActiveJobLock model - keep both in sync when renaming.
      existing_active_job_lock! if active_job_lock.peform_pending?

      active_job_lock.tap { |lock| lock.transfer_to(self) }
    end

    # Memoized ActiveJobLock record for the lock key, loaded via `lock`
    # (row-level locking read).
    #
    # @return [ActiveJobLock, nil]
    def active_job_lock
      @active_job_lock ||= ActiveJobLock.lock.find_by(lock_key: lock_key)
    end

    # Clears the memoized lock record so the next access reads it again.
    def release_active_job_lock_cache
      @active_job_lock = nil
    end

    # Halts the callback chain; used from `before_enqueue` this prevents the
    # job from being enqueued.
    def existing_active_job_lock!
      throw :abort
    end
  end