scheduler.rb 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292
# Copyright (C) 2012-2016 Zammad Foundation, http://zammad-foundation.org/
  2. class Scheduler < ApplicationModel
  3. # rubocop:disable Style/ClassVars
  4. @@jobs_started = {}
  5. # rubocop:enable Style/ClassVars
  6. # start threads
  7. def self.threads
  8. Thread.abort_on_exception = true
  9. # reconnect in case db connection is lost
  10. begin
  11. ActiveRecord::Base.connection.reconnect!
  12. rescue => e
  13. logger.error "Can't reconnect to database #{e.inspect}"
  14. end
  15. # cleanup old background jobs
  16. cleanup
  17. # start worker for background jobs
  18. worker
  19. # start loop to execute scheduler jobs
  20. loop do
  21. logger.info 'Scheduler running...'
  22. # reconnect in case db connection is lost
  23. begin
  24. ActiveRecord::Base.connection.reconnect!
  25. rescue => e
  26. logger.error "Can't reconnect to database #{e.inspect}"
  27. end
  28. # read/load jobs and check if it is alredy started
  29. jobs = Scheduler.where('active = ?', true).order('prio ASC')
  30. jobs.each do |job|
  31. # ignore job is still running
  32. next if @@jobs_started[ job.id ]
  33. # check job.last_run
  34. next if job.last_run && job.period && job.last_run > (Time.zone.now - job.period)
  35. # run job as own thread
  36. @@jobs_started[ job.id ] = true
  37. start_job(job)
  38. sleep 10
  39. end
  40. sleep 60
  41. end
  42. end
  43. # Checks all delayed jobs that are locked and cleans them up.
  44. # Should only get called when the Scheduler gets started.
  45. #
  46. # @see Scheduler#cleanup_delayed
  47. #
  48. # @param [Boolean] force forces the cleanup if not called in Scheduler starting context.
  49. #
  50. # @example
  51. # Scheduler.cleanup
  52. #
  53. # @raise [RuntimeError] If called without force and not when Scheduler gets started.
  54. #
  55. # return [nil]
  56. def self.cleanup(force: false)
  57. if !force && caller_locations.first.label != 'threads'
  58. raise 'This method should only get called when Scheduler.threads are initialized. Use `force: true` to start anyway.'
  59. end
  60. Delayed::Job.all.each do |job|
  61. cleanup_delayed(job)
  62. end
  63. end
  64. # Checks if the given job can be rescheduled or destroys it. Logs the action as warn.
  65. # Works only for locked jobs. Jobs that are not locked are ignored and
  66. # should get destroyed directly.
  67. # Checks the delayed job object for a method called .reschedule?. The memthod is called
  68. # with the delayed job as a parameter. The result value is expected as a Boolean. If the
  69. # result is true the lock gets removed and the delayed job gets rescheduled. If the return
  70. # value is false it will get destroyed which is the default behaviour.
  71. #
  72. # @param [Delayed::Job] job the job that should get checked for destroying/rescheduling.
  73. #
  74. # @example
  75. # Scheduler.cleanup_delayed(job)
  76. #
  77. # return [nil]
  78. def self.cleanup_delayed(job)
  79. return if job.locked_at.blank?
  80. job_name = job.name
  81. payload_object = job.payload_object
  82. reschedule = false
  83. if payload_object.present?
  84. if payload_object.respond_to?(:object)
  85. object = payload_object.object
  86. if object.respond_to?(:id)
  87. job_name += " (id: #{object.id})"
  88. end
  89. if object.respond_to?(:reschedule?) && object.reschedule?(job)
  90. reschedule = true
  91. end
  92. end
  93. if payload_object.respond_to?(:args)
  94. job_name += " - ARGS: #{payload_object.args.inspect}"
  95. end
  96. end
  97. if reschedule
  98. action = 'Rescheduling'
  99. job.unlock
  100. job.save
  101. else
  102. action = 'Destroyed'
  103. job.destroy
  104. end
  105. Rails.logger.warn "#{action} locked delayed job: #{job_name}"
  106. end
  107. def self.start_job(job)
  108. Thread.new do
  109. ApplicationHandleInfo.current = 'scheduler'
  110. logger.info "Started job thread for '#{job.name}' (#{job.method})..."
  111. # start loop for periods under 5 minutes
  112. if job.period && job.period <= 300
  113. loop do
  114. _start_job(job)
  115. job = Scheduler.lookup(id: job.id)
  116. # exit is job got deleted
  117. break if !job
  118. # exit if job is not active anymore
  119. break if !job.active
  120. # exit if there is no loop period defined
  121. break if !job.period
  122. # wait until next run
  123. sleep job.period
  124. end
  125. else
  126. _start_job(job)
  127. end
  128. job.pid = ''
  129. job.save
  130. logger.info " ...stopped thread for '#{job.method}'"
  131. ActiveRecord::Base.connection.close
  132. # release thread lock
  133. @@jobs_started[ job.id ] = false
  134. end
  135. end
  136. def self._start_job(job, try_count = 0, try_run_time = Time.zone.now)
  137. job.update!(
  138. last_run: Time.zone.now,
  139. pid: Thread.current.object_id,
  140. status: 'ok',
  141. error_message: '',
  142. )
  143. logger.info "execute #{job.method} (try_count #{try_count})..."
  144. eval job.method() # rubocop:disable Lint/Eval
  145. rescue => e
  146. logger.error "execute #{job.method} (try_count #{try_count}) exited with error #{e.inspect}"
  147. # reconnect in case db connection is lost
  148. begin
  149. ActiveRecord::Base.connection.reconnect!
  150. rescue => e
  151. logger.error "Can't reconnect to database #{e.inspect}"
  152. end
  153. try_run_max = 10
  154. try_count += 1
  155. # reset error counter if to old
  156. if try_run_time + (60 * 5) < Time.zone.now
  157. try_count = 0
  158. end
  159. try_run_time = Time.zone.now
  160. # restart job again
  161. if try_run_max > try_count
  162. _start_job(job, try_count, try_run_time)
  163. else
  164. @@jobs_started[ job.id ] = false
  165. error = "Failed to run #{job.method} after #{try_count} tries #{e.inspect}"
  166. logger.error error
  167. job.update!(
  168. error_message: error,
  169. status: 'error',
  170. active: false,
  171. )
  172. end
  173. end
  174. def self.worker(foreground = false)
  175. # used for tests
  176. if foreground
  177. original_interface_handle = ApplicationHandleInfo.current
  178. ApplicationHandleInfo.current = 'scheduler'
  179. original_user_id = UserInfo.current_user_id
  180. UserInfo.current_user_id = nil
  181. loop do
  182. success, failure = Delayed::Worker.new.work_off
  183. if failure.nonzero?
  184. raise "ERROR: #{failure} failed background jobs: #{Delayed::Job.where('last_error IS NOT NULL').inspect}"
  185. end
  186. break if success.zero?
  187. end
  188. UserInfo.current_user_id = original_user_id
  189. ApplicationHandleInfo.current = original_interface_handle
  190. return
  191. end
  192. # used for production
  193. wait = 8
  194. Thread.new do
  195. sleep wait
  196. logger.info "Starting worker thread #{Delayed::Job}"
  197. loop do
  198. ApplicationHandleInfo.current = 'scheduler'
  199. result = nil
  200. realtime = Benchmark.realtime do
  201. logger.debug "*** worker thread, #{Delayed::Job.all.count} in queue"
  202. result = Delayed::Worker.new.work_off
  203. end
  204. count = result.sum
  205. if count.zero?
  206. sleep wait
  207. logger.debug '*** worker thread loop'
  208. else
  209. format "*** #{count} jobs processed at %.4f j/s, %d failed ...\n", count / realtime, result.last
  210. end
  211. end
  212. logger.info ' ...stopped worker thread'
  213. ActiveRecord::Base.connection.close
  214. end
  215. end
  216. # This function returns a list of failed jobs
  217. #
  218. # @example
  219. # Scheduler.failed_jobs
  220. #
  221. # return [Array]
  222. def self.failed_jobs
  223. where(status: 'error', active: false)
  224. end
  225. # This function restarts failed jobs to retry them
  226. #
  227. # @example
  228. # Scheduler.restart_failed_jobs
  229. #
  230. # return [true]
  231. def self.restart_failed_jobs
  232. failed_jobs.each do |job|
  233. job.update!(active: true)
  234. end
  235. true
  236. end
  237. end