scheduler.rb 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. # Copyright (C) 2012-2014 Zammad Foundation, http://zammad-foundation.org/
  2. class Scheduler < ApplicationModel
  3. def self.run( runner, runner_count )
  4. Thread.abort_on_exception = true
  5. jobs_started = {}
  6. while true
  7. puts "Scheduler running (runner #{runner} of #{runner_count})..."
  8. # read/load jobs and check if it is alredy started
  9. jobs = Scheduler.where( 'active = ? AND prio = ?', true, runner )
  10. jobs.each {|job|
  11. next if jobs_started[ job.id ]
  12. jobs_started[ job.id ] = true
  13. self.start_job( job, runner, runner_count )
  14. }
  15. sleep 45
  16. end
  17. end
  18. def self.start_job( job, runner, runner_count )
  19. puts "started job thread for '#{job.name}' (#{job.method})..."
  20. sleep 4
  21. Thread.new {
  22. if job.period
  23. while true
  24. self._start_job( job, runner, runner_count )
  25. job = Scheduler.where( :id => job.id ).first
  26. # exit is job got deleted
  27. break if !job
  28. # exit if job is not active anymore
  29. break if !job.active
  30. # exit if there is no loop period defined
  31. break if !job.period
  32. # wait until next run
  33. sleep job.period
  34. end
  35. else
  36. self._start_job( job, runner, runner_count )
  37. end
  38. # raise "Exception from thread"
  39. job.pid = ''
  40. job.save
  41. puts " ...stopped thread for '#{job.method}'"
  42. }
  43. end
  44. def self._start_job( job, runner, runner_count )
  45. puts "execute #{job.method} (runner #{runner} of #{runner_count})..."
  46. job.last_run = Time.now
  47. job.pid = Thread.current.object_id
  48. job.save
  49. eval job.method()
  50. end
  51. def self.worker
  52. wait = 10
  53. puts "*** Starting worker #{Delayed::Job.to_s}"
  54. loop do
  55. result = nil
  56. realtime = Benchmark.realtime do
  57. result = Delayed::Worker.new.work_off
  58. end
  59. count = result.sum
  60. break if $exit
  61. if count.zero?
  62. sleep(wait)
  63. puts "*** worker loop"
  64. else
  65. printf "*** #{count} jobs processed at %.4f j/s, %d failed ...\n" % [count / realtime, result.last]
  66. end
  67. end
  68. end
  69. def self.check( name, time_warning = 10, time_critical = 20 )
  70. time_warning_time = Time.now - time_warning.minutes
  71. time_critical_time = Time.now - time_critical.minutes
  72. scheduler = Scheduler.where( :name => name ).first
  73. if !scheduler
  74. puts "CRITICAL - no such scheduler jobs '#{name}'"
  75. return true
  76. end
  77. #puts "S " + scheduler.inspect
  78. if !scheduler.last_run
  79. puts "CRITICAL - scheduler jobs never started '#{name}'"
  80. exit 2
  81. end
  82. if scheduler.last_run < time_critical_time
  83. puts "CRITICAL - scheduler jobs was not running in last '#{time_critical.to_s}' minutes - last run at '#{scheduler.last_run.to_s}' '#{name}'"
  84. exit 2
  85. end
  86. if scheduler.last_run < time_warning_time
  87. puts "CRITICAL - scheduler jobs was not running in last '#{time_warning.to_s}' minutes - last run at '#{scheduler.last_run.to_s}' '#{name}'"
  88. exit 2
  89. end
  90. puts "ok - scheduler jobs was running at '#{scheduler.last_run.to_s}' '#{name}'"
  91. exit 0
  92. end
  93. end