# scheduler.rb
# Copyright (C) 2012-2013 Zammad Foundation, http://zammad-foundation.org/
  2. class Scheduler < ApplicationModel
  3. def self.run( worker, worker_count )
  4. Thread.abort_on_exception = true
  5. jobs_started = {}
  6. while true
  7. puts "Scheduler running (worker #{worker} of #{worker_count})..."
  8. # read/load jobs and check if it is alredy started
  9. jobs = Scheduler.where( 'active = ? AND prio = ?', true, worker )
  10. jobs.each {|job|
  11. next if jobs_started[ job.id ]
  12. jobs_started[ job.id ] = true
  13. self.start_job( job, worker, worker_count )
  14. }
  15. sleep 45
  16. end
  17. end
  18. def self.start_job( job, worker, worker_count )
  19. puts "started job thread for '#{job.name}' (#{job.method})..."
  20. sleep 4
  21. Thread.new {
  22. if job.period
  23. while true
  24. self._start_job( job, worker, worker_count )
  25. job = Scheduler.where( :id => job.id ).first
  26. # exit is job got deleted
  27. break if !job
  28. # exit if job is not active anymore
  29. break if !job.active
  30. # exit if there is no loop period defined
  31. break if !job.period
  32. # wait until next run
  33. sleep job.period
  34. end
  35. else
  36. self._start_job( job, worker, worker_count )
  37. end
  38. # raise "Exception from thread"
  39. job.pid = ''
  40. job.save
  41. puts " ...stopped thread for '#{job.method}'"
  42. }
  43. end
  44. def self._start_job( job, worker, worker_count )
  45. puts "execute #{job.method} (worker #{worker} of #{worker_count})..."
  46. job.last_run = Time.now
  47. job.pid = Thread.current.object_id
  48. job.save
  49. eval job.method()
  50. end
  51. end