chain_test.go 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239
  1. //nolint:all
  2. package cron
  3. import (
  4. "io"
  5. "log"
  6. "reflect"
  7. "sync"
  8. "testing"
  9. "time"
  10. )
  11. func appendingJob(slice *[]int, value int) Job {
  12. var m sync.Mutex
  13. return FuncJob(func() {
  14. m.Lock()
  15. *slice = append(*slice, value)
  16. m.Unlock()
  17. })
  18. }
  19. func appendingWrapper(slice *[]int, value int) JobWrapper {
  20. return func(j Job) Job {
  21. return FuncJob(func() {
  22. appendingJob(slice, value).Run()
  23. j.Run()
  24. })
  25. }
  26. }
  27. func TestChain(t *testing.T) {
  28. var nums []int
  29. var (
  30. append1 = appendingWrapper(&nums, 1)
  31. append2 = appendingWrapper(&nums, 2)
  32. append3 = appendingWrapper(&nums, 3)
  33. append4 = appendingJob(&nums, 4)
  34. )
  35. NewChain(append1, append2, append3).Then(append4).Run()
  36. if !reflect.DeepEqual(nums, []int{1, 2, 3, 4}) {
  37. t.Error("unexpected order of calls:", nums)
  38. }
  39. }
  40. func TestChainRecover(t *testing.T) {
  41. panickingJob := FuncJob(func() {
  42. panic("panickingJob panics")
  43. })
  44. t.Run("panic exits job by default", func(*testing.T) {
  45. defer func() {
  46. if err := recover(); err == nil {
  47. t.Errorf("panic expected, but none received")
  48. }
  49. }()
  50. NewChain().Then(panickingJob).
  51. Run()
  52. })
  53. t.Run("Recovering JobWrapper recovers", func(*testing.T) {
  54. NewChain(Recover(PrintfLogger(log.New(io.Discard, "", 0)))).
  55. Then(panickingJob).
  56. Run()
  57. })
  58. t.Run("composed with the *IfStillRunning wrappers", func(*testing.T) {
  59. NewChain(Recover(PrintfLogger(log.New(io.Discard, "", 0)))).
  60. Then(panickingJob).
  61. Run()
  62. })
  63. }
  64. type countJob struct {
  65. m sync.Mutex
  66. started int
  67. done int
  68. delay time.Duration
  69. }
  70. func (j *countJob) Run() {
  71. j.m.Lock()
  72. j.started++
  73. j.m.Unlock()
  74. time.Sleep(j.delay)
  75. j.m.Lock()
  76. j.done++
  77. j.m.Unlock()
  78. }
  79. func (j *countJob) Started() int {
  80. defer j.m.Unlock()
  81. j.m.Lock()
  82. return j.started
  83. }
  84. func (j *countJob) Done() int {
  85. defer j.m.Unlock()
  86. j.m.Lock()
  87. return j.done
  88. }
  89. func TestChainDelayIfStillRunning(t *testing.T) {
  90. t.Run("runs immediately", func(*testing.T) {
  91. var j countJob
  92. wrappedJob := NewChain(DelayIfStillRunning(DiscardLogger)).Then(&j)
  93. go wrappedJob.Run()
  94. time.Sleep(2 * time.Millisecond) // Give the job 2ms to complete.
  95. if c := j.Done(); c != 1 {
  96. t.Errorf("expected job run once, immediately, got %d", c)
  97. }
  98. })
  99. t.Run("second run immediate if first done", func(*testing.T) {
  100. var j countJob
  101. wrappedJob := NewChain(DelayIfStillRunning(DiscardLogger)).Then(&j)
  102. go func() {
  103. go wrappedJob.Run()
  104. time.Sleep(time.Millisecond)
  105. go wrappedJob.Run()
  106. }()
  107. time.Sleep(3 * time.Millisecond) // Give both jobs 3ms to complete.
  108. if c := j.Done(); c != 2 {
  109. t.Errorf("expected job run twice, immediately, got %d", c)
  110. }
  111. })
  112. t.Run("second run delayed if first not done", func(*testing.T) {
  113. var j countJob
  114. j.delay = 10 * time.Millisecond
  115. wrappedJob := NewChain(DelayIfStillRunning(DiscardLogger)).Then(&j)
  116. go func() {
  117. go wrappedJob.Run()
  118. time.Sleep(time.Millisecond)
  119. go wrappedJob.Run()
  120. }()
  121. // After 5ms, the first job is still in progress, and the second job was
  122. // run but should be waiting for it to finish.
  123. time.Sleep(5 * time.Millisecond)
  124. started, done := j.Started(), j.Done()
  125. if started != 1 || done != 0 {
  126. t.Error("expected first job started, but not finished, got", started, done)
  127. }
  128. // Verify that the second job completes.
  129. time.Sleep(25 * time.Millisecond)
  130. started, done = j.Started(), j.Done()
  131. if started != 2 || done != 2 {
  132. t.Error("expected both jobs done, got", started, done)
  133. }
  134. })
  135. }
  136. func TestChainSkipIfStillRunning(t *testing.T) {
  137. t.Run("runs immediately", func(*testing.T) {
  138. var j countJob
  139. wrappedJob := NewChain(SkipIfStillRunning(DiscardLogger)).Then(&j)
  140. go wrappedJob.Run()
  141. time.Sleep(2 * time.Millisecond) // Give the job 2ms to complete.
  142. if c := j.Done(); c != 1 {
  143. t.Errorf("expected job run once, immediately, got %d", c)
  144. }
  145. })
  146. t.Run("second run immediate if first done", func(*testing.T) {
  147. var j countJob
  148. wrappedJob := NewChain(SkipIfStillRunning(DiscardLogger)).Then(&j)
  149. go func() {
  150. go wrappedJob.Run()
  151. time.Sleep(time.Millisecond)
  152. go wrappedJob.Run()
  153. }()
  154. time.Sleep(3 * time.Millisecond) // Give both jobs 3ms to complete.
  155. if c := j.Done(); c != 2 {
  156. t.Errorf("expected job run twice, immediately, got %d", c)
  157. }
  158. })
  159. t.Run("second run skipped if first not done", func(*testing.T) {
  160. var j countJob
  161. j.delay = 10 * time.Millisecond
  162. wrappedJob := NewChain(SkipIfStillRunning(DiscardLogger)).Then(&j)
  163. go func() {
  164. go wrappedJob.Run()
  165. time.Sleep(time.Millisecond)
  166. go wrappedJob.Run()
  167. }()
  168. // After 5ms, the first job is still in progress, and the second job was
  169. // aleady skipped.
  170. time.Sleep(5 * time.Millisecond)
  171. started, done := j.Started(), j.Done()
  172. if started != 1 || done != 0 {
  173. t.Error("expected first job started, but not finished, got", started, done)
  174. }
  175. // Verify that the first job completes and second does not run.
  176. time.Sleep(25 * time.Millisecond)
  177. started, done = j.Started(), j.Done()
  178. if started != 1 || done != 1 {
  179. t.Error("expected second job skipped, got", started, done)
  180. }
  181. })
  182. t.Run("skip 10 jobs on rapid fire", func(*testing.T) {
  183. var j countJob
  184. j.delay = 10 * time.Millisecond
  185. wrappedJob := NewChain(SkipIfStillRunning(DiscardLogger)).Then(&j)
  186. for i := 0; i < 11; i++ {
  187. go wrappedJob.Run()
  188. }
  189. time.Sleep(200 * time.Millisecond)
  190. done := j.Done()
  191. if done != 1 {
  192. t.Error("expected 1 jobs executed, 10 jobs dropped, got", done)
  193. }
  194. })
  195. t.Run("different jobs independent", func(*testing.T) {
  196. var j1, j2 countJob
  197. j1.delay = 10 * time.Millisecond
  198. j2.delay = 10 * time.Millisecond
  199. chain := NewChain(SkipIfStillRunning(DiscardLogger))
  200. wrappedJob1 := chain.Then(&j1)
  201. wrappedJob2 := chain.Then(&j2)
  202. for i := 0; i < 11; i++ {
  203. go wrappedJob1.Run()
  204. go wrappedJob2.Run()
  205. }
  206. time.Sleep(100 * time.Millisecond)
  207. var (
  208. done1 = j1.Done()
  209. done2 = j2.Done()
  210. )
  211. if done1 != 1 || done2 != 1 {
  212. t.Error("expected both jobs executed once, got", done1, "and", done2)
  213. }
  214. })
  215. }