scheduler.rb 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175
  # Copyright (C) 2012-2014 Zammad Foundation, http://zammad-foundation.org/
  # rubocop:disable Rails/Output
  # Runs the periodic jobs stored in the scheduler table, one thread per job,
  # plus a single worker thread that processes the Delayed::Job queue.
  class Scheduler < ApplicationModel

    # Job ids that currently own a job thread: job.id => true while the thread
    # is running, false once it has finished.
    # rubocop:disable Style/ClassVars
    @@jobs_started = {}
    # rubocop:enable Style/ClassVars

    # Main loop of the scheduler process.
    #
    # Starts the background worker thread once, then loops forever: reconnects
    # to the database, starts a thread for every active job that is due and not
    # already running, and sleeps 60 seconds before the next pass.
    # This method never returns.
    def self.threads
      # an exception in any job/worker thread takes the whole process down
      Thread.abort_on_exception = true

      # start worker for background jobs
      worker

      # start loop to execute scheduler jobs
      loop do
        logger.info 'Scheduler running...'

        # reconnect in case db connection is lost
        begin
          ActiveRecord::Base.connection.reconnect!
        rescue => e
          logger.error "Can't reconnect to database #{ e.inspect }"
        end

        # read/load jobs and check if they are already started
        Scheduler.where(active: true).each do |job|

          # ignore job if it is still running
          next if @@jobs_started[job.id]

          # skip job if its last run is more recent than one period ago
          next if job.last_run && job.period && job.last_run > (Time.zone.now - job.period)

          # run job as own thread
          @@jobs_started[job.id] = true
          start_job(job)
        end

        sleep 60
      end
    end

    # Starts a thread that executes +job+.
    #
    # Jobs with a period of at most 300 seconds are re-executed inside the same
    # thread (sleeping job.period seconds between runs) until the job record is
    # deleted, deactivated or loses its period. All other jobs are executed
    # once. When the thread ends, the database connection is closed and the
    # entry in @@jobs_started is released, also if the job raised.
    #
    # Blocks the caller for 4 seconds before the thread is created.
    #
    # Returns the created Thread.
    def self.start_job(job)
      sleep 4

      # Remember the id up front: the record may be deleted while the thread is
      # running, and the lock has to be released for this id in any case.
      job_id = job.id

      Thread.new do
        begin
          logger.info "Started job thread for '#{job.name}' (#{job.method})..."

          deleted = false

          # start loop for periods of up to 5 minutes
          if job.period && job.period <= 300
            loop do
              _start_job(job)
              reloaded = Scheduler.lookup(id: job_id)

              # exit if job got deleted (keep the last known record for logging)
              unless reloaded
                deleted = true
                break
              end
              job = reloaded

              # exit if job is not active anymore
              break unless job.active

              # exit if there is no loop period defined
              break unless job.period

              # wait until next run
              sleep job.period
            end
          else
            _start_job(job)
          end

          # a deleted job has no row left whose pid could be reset
          unless deleted
            job.pid = ''
            job.save
          end
          logger.info " ...stopped thread for '#{job.method}'"
        ensure
          ActiveRecord::Base.connection.close

          # release thread lock
          @@jobs_started[job_id] = false
        end
      end
    end

    # Executes +job+ once, retrying when the execution raises.
    #
    # Before each attempt it sleeps 5 seconds, then stores the current time in
    # job.last_run and the current thread's object_id in job.pid. On an error
    # the database connection is re-established and the job is tried again.
    #
    # job          - the scheduler record to execute
    # try_count    - number of failed attempts counted so far
    # try_run_time - time of the previous failure; if it is more than 5 minutes
    #                old when a new failure happens, try_count restarts at 0
    #
    # Returns the value of the evaluated job.method string.
    # Raises RuntimeError once try_count reaches 10.
    def self._start_job(job, try_count = 0, try_run_time = Time.zone.now)
      try_run_max = 10

      begin
        sleep 5
        job.last_run = Time.zone.now
        job.pid      = Thread.current.object_id
        job.save
        logger.info "execute #{job.method} (try_count #{try_count})..."

        # NOTE(review): job.method is a string read from the database and is
        # evaluated as Ruby code - anyone able to write scheduler rows can run
        # arbitrary code. Kept as is, callers rely on it.
        eval job.method # rubocop:disable Lint/Eval
      rescue => e
        logger.error "execute #{job.method} (try_count #{try_count}) exited with error #{ e.inspect }"

        # reconnect in case db connection is lost
        begin
          ActiveRecord::Base.connection.reconnect!
        rescue => reconnect_error
          logger.error "Can't reconnect to database #{ reconnect_error.inspect }"
        end

        try_count += 1

        # reset error counter if the previous failure is too old
        try_count = 0 if try_run_time + (60 * 5) < Time.zone.now
        try_run_time = Time.zone.now

        # give up after too many failures in a row
        raise "STOP thread for #{job.method} after #{try_count} tries" if try_count >= try_run_max

        # restart job again (re-runs the begin block without growing the stack)
        retry
      end
    end

    # Starts the thread that works off the Delayed::Job queue.
    #
    # The thread waits 8 seconds, then repeatedly calls
    # Delayed::Worker#work_off. When a pass handled no jobs it sleeps another
    # 8 seconds, otherwise it logs how many jobs were processed and how fast.
    #
    # Returns the created Thread.
    def self.worker
      wait = 8

      Thread.new do
        sleep wait

        logger.info "Starting worker thread #{Delayed::Job}"

        loop do
          result   = nil
          realtime = Benchmark.realtime do
            result = Delayed::Worker.new.work_off
          end

          # result.last is reported as the number of failed jobs below
          count = result.sum

          if count.zero?
            sleep wait
            logger.debug '*** worker thread loop'
          else
            logger.info format("*** #{count} jobs processed at %.4f j/s, %d failed ...", count / realtime, result.last)
          end
        end

        # NOTE(review): the loop above has no break, so the two lines below
        # look unreachable - confirm before relying on them for cleanup.
        logger.info ' ...stopped worker thread'
        ActiveRecord::Base.connection.close
      end
    end

    # Monitoring check for the scheduler job called +name+.
    #
    # Prints a one-line status and terminates the process with an exit code:
    #
    #   0 - job ran within the last time_warning minutes
    #   1 - last run is older than time_warning minutes
    #   2 - last run is older than time_critical minutes, or the job never ran
    #
    # name          - name of the scheduler job to look up
    # time_warning  - warning threshold in minutes
    # time_critical - critical threshold in minutes
    #
    # If no job with that name exists, a CRITICAL line is printed and true is
    # returned without exiting.
    def self.check(name, time_warning = 10, time_critical = 20)
      now                = Time.zone.now
      time_warning_time  = now - time_warning.minutes
      time_critical_time = now - time_critical.minutes

      scheduler = Scheduler.find_by(name: name)
      unless scheduler
        # NOTE(review): unlike the other CRITICAL cases this one does not exit;
        # kept because callers may depend on the return value.
        puts "CRITICAL - no such scheduler jobs '#{name}'"
        return true
      end

      logger.debug scheduler.inspect

      unless scheduler.last_run
        puts "CRITICAL - scheduler jobs never started '#{name}'"
        exit 2
      end

      if scheduler.last_run < time_critical_time
        puts "CRITICAL - scheduler jobs was not running in last '#{time_critical}' minutes - last run at '#{scheduler.last_run}' '#{name}'"
        exit 2
      end

      if scheduler.last_run < time_warning_time
        puts "WARNING - scheduler jobs was not running in last '#{time_warning}' minutes - last run at '#{scheduler.last_run}' '#{name}'"
        exit 1
      end

      puts "ok - scheduler jobs was running at '#{scheduler.last_run}' '#{name}'"
      exit 0
    end
  end