has_active_job_lock.rb 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147
  1. # Copyright (C) 2012-2024 Zammad Foundation, https://zammad-foundation.org/
  2. module HasActiveJobLock
  3. class LockKeyNotGeneratable < StandardError; end
  4. extend ActiveSupport::Concern
  5. included do
  6. before_enqueue do |job| # rubocop:disable Style/SymbolProc
  7. job.ensure_active_job_lock_for_enqueue!
  8. end
  9. around_perform do |job, block|
  10. # do not perform job if lock key cannot be generated anymore
  11. raise LockKeyNotGeneratable if job.safe_lock_key.nil?
  12. job.mark_active_job_lock_as_started
  13. block.call
  14. ensure
  15. job.release_active_job_lock!
  16. end
  17. end
  18. # Defines the lock key for the current job to prevent execution of jobs with the same key.
  19. # This is by default the name of the ActiveJob class.
  20. # If you're in the situation where you need to have a lock_key based on
  21. # the given arguments you can overwrite this method in your job and access
  22. # them via `arguments`. See ActiveJob::Core for more (e.g. queue).
  23. #
  24. # @example
  25. # # default
  26. # job = UniqueActiveJob.new
  27. # job.lock_key
  28. # # => "UniqueActiveJob"
  29. #
  30. # @example
  31. # # with lock_key: "#{self.class.name}/#{arguments[0]}/#{arguments[1]}"
  32. # job = SearchIndexJob.new('User', 42)
  33. # job.lock_key
  34. # # => "SearchIndexJob/User/42"
  35. #
  36. # return [String]
  37. def lock_key
  38. self.class.name
  39. end
  40. # Caches lock key for the duration of the job'
  41. # Silences errors thrown when generating lock key
  42. #
  43. # return [String]
  44. def safe_lock_key
  45. @safe_lock_key ||= lock_key
  46. rescue
  47. nil
  48. end
  49. def mark_active_job_lock_as_started
  50. release_active_job_lock_cache
  51. in_active_job_lock_transaction do
  52. # a perform_now job doesn't require any locking
  53. return if active_job_lock.blank?
  54. return if !active_job_lock.of?(self)
  55. # a perform_later job started to perform and will be marked as such
  56. active_job_lock.touch # rubocop:disable Rails/SkipsModelValidations
  57. end
  58. end
  59. def ensure_active_job_lock_for_enqueue!
  60. release_active_job_lock_cache
  61. throw :abort if safe_lock_key.nil?
  62. in_active_job_lock_transaction do
  63. return if active_job_lock_for_enqueue!.present?
  64. ActiveJobLock.create!(
  65. lock_key: safe_lock_key,
  66. active_job_id: job_id,
  67. )
  68. end
  69. end
  70. def release_active_job_lock!
  71. # nothing to release if lock key cannot be generated anymore
  72. return if safe_lock_key.nil?
  73. # only delete lock if the current job is the one holding the lock
  74. # perform_now jobs or perform_later jobs for which follow-up jobs were enqueued
  75. # don't need to remove any locks
  76. lock = ActiveJobLock.lock.find_by(lock_key: safe_lock_key, active_job_id: job_id)
  77. if !lock
  78. logger.debug { "Found no ActiveJobLock for #{self.class.name} (Job ID: #{job_id}) with key '#{safe_lock_key}'." }
  79. return
  80. end
  81. logger.debug { "Deleting ActiveJobLock for #{self.class.name} (Job ID: #{job_id}) with key '#{safe_lock_key}'." }
  82. lock.destroy!
  83. end
  84. private
  85. def in_active_job_lock_transaction(&)
  86. # re-use active DB transaction if present
  87. return yield if ActiveRecord::Base.connection.open_transactions.nonzero?
  88. # start own serializable DB transaction to prevent race conditions on DB level
  89. ActiveJobLock.transaction(isolation: :serializable, &)
  90. rescue ActiveRecord::SerializationFailure => e
  91. # PostgeSQL prevents locking on records that are already locked
  92. # for UPDATE in Serializable Isolation Level transactions,
  93. # but it's safe to retry as described in the docs:
  94. # https://www.postgresql.org/docs/10/transaction-iso.html
  95. e.message.include?('PG::TRSerializationFailure') ? retry : raise
  96. rescue ActiveRecord::Deadlocked => e
  97. # MySQL handles lock race condition differently and raises a Deadlock exception:
  98. # Mysql2::Error: Deadlock found when trying to get lock; try restarting transaction
  99. e.message.include?('Mysql2::Error: Deadlock found when trying to get lock') ? retry : raise
  100. rescue ActiveRecord::RecordNotUnique
  101. existing_active_job_lock!
  102. end
  103. def active_job_lock_for_enqueue!
  104. return if active_job_lock.blank?
  105. # don't enqueue perform_later jobs if a job with the same
  106. # lock key exists that hasn't started to perform yet
  107. existing_active_job_lock! if active_job_lock.peform_pending?
  108. active_job_lock.tap { |lock| lock.transfer_to(self) }
  109. end
  110. def active_job_lock
  111. @active_job_lock ||= ActiveJobLock.lock.find_by(lock_key: safe_lock_key)
  112. end
  113. def release_active_job_lock_cache
  114. @active_job_lock = nil
  115. end
  116. def existing_active_job_lock!
  117. throw :abort
  118. end
  119. end