chain.go 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596
  1. package cron
  2. import (
  3. "errors"
  4. "fmt"
  5. "runtime"
  6. "sync"
  7. "time"
  8. )
  9. // JobWrapper decorates the given Job with some behavior.
  10. type JobWrapper func(Job) Job
  11. // Chain is a sequence of JobWrappers that decorates submitted jobs with
  12. // cross-cutting behaviors like logging or synchronization.
  13. type Chain struct {
  14. wrappers []JobWrapper
  15. }
  16. // NewChain returns a Chain consisting of the given JobWrappers.
  17. func NewChain(c ...JobWrapper) Chain {
  18. return Chain{c}
  19. }
  20. // Then decorates the given job with all JobWrappers in the chain.
  21. //
  22. // This:
  23. //
  24. // NewChain(m1, m2, m3).Then(job)
  25. //
  26. // is equivalent to:
  27. //
  28. // m1(m2(m3(job)))
  29. func (c Chain) Then(j Job) Job {
  30. for i := range c.wrappers {
  31. j = c.wrappers[len(c.wrappers)-i-1](j)
  32. }
  33. return j
  34. }
  35. // Recover panics in wrapped jobs and log them with the provided logger.
  36. func Recover(logger Logger) JobWrapper {
  37. return func(j Job) Job {
  38. return FuncJob(func() {
  39. defer func() {
  40. if r := recover(); r != nil {
  41. const size = 64 << 10
  42. buf := make([]byte, size)
  43. buf = buf[:runtime.Stack(buf, false)]
  44. err, ok := r.(error)
  45. if !ok {
  46. err = errors.New("panic: " + fmt.Sprint(r))
  47. }
  48. logger.Error(err, "panic", "stack", "...\n"+string(buf))
  49. }
  50. }()
  51. j.Run()
  52. })
  53. }
  54. }
  55. // DelayIfStillRunning serializes jobs, delaying subsequent runs until the
  56. // previous one is complete. Jobs running after a delay of more than a minute
  57. // have the delay logged at Info.
  58. func DelayIfStillRunning(logger Logger) JobWrapper {
  59. return func(j Job) Job {
  60. var mu sync.Mutex
  61. return FuncJob(func() {
  62. start := time.Now()
  63. mu.Lock()
  64. defer mu.Unlock()
  65. if dur := time.Since(start); dur > time.Minute {
  66. logger.Info("delay", "duration", dur)
  67. }
  68. j.Run()
  69. })
  70. }
  71. }
  72. // SkipIfStillRunning skips an invocation of the Job if a previous invocation is
  73. // still running. It logs skips to the given logger at Info level.
  74. func SkipIfStillRunning(logger Logger) JobWrapper {
  75. return func(j Job) Job {
  76. var ch = make(chan struct{}, 1)
  77. ch <- struct{}{}
  78. return FuncJob(func() {
  79. select {
  80. case v := <-ch:
  81. defer func() { ch <- v }()
  82. j.Run()
  83. default:
  84. logger.Info("skip")
  85. }
  86. })
  87. }
  88. }