scheduler.rb 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172
  1. # Copyright (C) 2012-2016 Zammad Foundation, http://zammad-foundation.org/
  2. class Scheduler < ApplicationModel
  3. # rubocop:disable Style/ClassVars
  4. @@jobs_started = {}
  5. # rubocop:enable Style/ClassVars
  6. # start threads
  7. def self.threads
  8. Thread.abort_on_exception = true
  9. # start worker for background jobs
  10. worker
  11. # start loop to execute scheduler jobs
  12. loop do
  13. logger.info 'Scheduler running...'
  14. # reconnect in case db connection is lost
  15. begin
  16. ActiveRecord::Base.connection.reconnect!
  17. rescue => e
  18. logger.error "Can't reconnect to database #{e.inspect}"
  19. end
  20. # read/load jobs and check if it is alredy started
  21. jobs = Scheduler.where('active = ?', true).order('prio ASC')
  22. jobs.each { |job|
  23. # ignore job is still running
  24. next if @@jobs_started[ job.id ]
  25. # check job.last_run
  26. next if job.last_run && job.period && job.last_run > (Time.zone.now - job.period)
  27. # run job as own thread
  28. @@jobs_started[ job.id ] = true
  29. start_job(job)
  30. sleep 10
  31. }
  32. sleep 60
  33. end
  34. end
  35. def self.start_job(job)
  36. Thread.new {
  37. ApplicationHandleInfo.current = 'scheduler'
  38. logger.info "Started job thread for '#{job.name}' (#{job.method})..."
  39. # start loop for periods under 5 minutes
  40. if job.period && job.period <= 300
  41. loop do
  42. _start_job(job)
  43. job = Scheduler.lookup(id: job.id)
  44. # exit is job got deleted
  45. break if !job
  46. # exit if job is not active anymore
  47. break if !job.active
  48. # exit if there is no loop period defined
  49. break if !job.period
  50. # wait until next run
  51. sleep job.period
  52. end
  53. else
  54. _start_job(job)
  55. end
  56. job.pid = ''
  57. job.save
  58. logger.info " ...stopped thread for '#{job.method}'"
  59. ActiveRecord::Base.connection.close
  60. # release thread lock
  61. @@jobs_started[ job.id ] = false
  62. }
  63. end
  64. def self._start_job(job, try_count = 0, try_run_time = Time.zone.now)
  65. job.last_run = Time.zone.now
  66. job.pid = Thread.current.object_id
  67. job.save
  68. logger.info "execute #{job.method} (try_count #{try_count})..."
  69. eval job.method() # rubocop:disable Lint/Eval
  70. rescue => e
  71. logger.error "execute #{job.method} (try_count #{try_count}) exited with error #{e.inspect}"
  72. # reconnect in case db connection is lost
  73. begin
  74. ActiveRecord::Base.connection.reconnect!
  75. rescue => e
  76. logger.error "Can't reconnect to database #{e.inspect}"
  77. end
  78. try_run_max = 10
  79. try_count += 1
  80. # reset error counter if to old
  81. if try_run_time + (60 * 5) < Time.zone.now
  82. try_count = 0
  83. end
  84. try_run_time = Time.zone.now
  85. # restart job again
  86. if try_run_max > try_count
  87. _start_job(job, try_count, try_run_time)
  88. else
  89. raise "STOP thread for #{job.method} after #{try_count} tries"
  90. end
  91. end
  92. def self.worker(foreground = false)
  93. # used for tests
  94. if foreground
  95. original_interface_handle = ApplicationHandleInfo.current
  96. ApplicationHandleInfo.current = 'scheduler'
  97. original_user_id = UserInfo.current_user_id
  98. UserInfo.current_user_id = nil
  99. loop do
  100. success, failure = Delayed::Worker.new.work_off
  101. if failure.nonzero?
  102. raise "ERROR: #{failure} failed background jobs: #{Delayed::Job.where('last_error IS NOT NULL').inspect}"
  103. end
  104. break if success.zero?
  105. end
  106. UserInfo.current_user_id = original_user_id
  107. ApplicationHandleInfo.current = original_interface_handle
  108. return
  109. end
  110. # used for production
  111. wait = 8
  112. Thread.new {
  113. sleep wait
  114. logger.info "Starting worker thread #{Delayed::Job}"
  115. loop do
  116. ApplicationHandleInfo.current = 'scheduler'
  117. result = nil
  118. realtime = Benchmark.realtime do
  119. logger.debug "*** worker thread, #{Delayed::Job.all.count} in queue"
  120. result = Delayed::Worker.new.work_off
  121. end
  122. count = result.sum
  123. if count.zero?
  124. sleep wait
  125. logger.debug '*** worker thread loop'
  126. else
  127. format "*** #{count} jobs processed at %.4f j/s, %d failed ...\n", count / realtime, result.last
  128. end
  129. end
  130. logger.info ' ...stopped worker thread'
  131. ActiveRecord::Base.connection.close
  132. }
  133. end
  134. end