watcher.go 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191
  1. //go:build !nowatcher
  2. package watcher
  3. // #cgo LDFLAGS: -lwatcher-c -lstdc++
  4. // #include <stdint.h>
  5. // #include <stdlib.h>
  6. // #include "watcher.h"
  7. import "C"
  8. import (
  9. "errors"
  10. "runtime/cgo"
  11. "strings"
  12. "sync"
  13. "sync/atomic"
  14. "time"
  15. "unsafe"
  16. "go.uber.org/zap"
  17. )
  18. type watcher struct {
  19. sessions []C.uintptr_t
  20. callback func()
  21. trigger chan struct{}
  22. stop chan struct{}
  23. }
  24. // duration to wait before triggering a reload after a file change
  25. const debounceDuration = 150 * time.Millisecond
  26. // times to retry watching if the watcher was closed prematurely
  27. const maxFailureCount = 5
  28. const failureResetDuration = 5 * time.Second
  29. var failureMu = sync.Mutex{}
  30. var watcherIsActive = atomic.Bool{}
  31. var (
  32. // the currently active file watcher
  33. activeWatcher *watcher
  34. // after stopping the watcher we will wait for eventual reloads to finish
  35. reloadWaitGroup sync.WaitGroup
  36. // we are passing the logger from the main package to the watcher
  37. logger *zap.Logger
  38. AlreadyStartedError = errors.New("the watcher is already running")
  39. UnableToStartWatching = errors.New("unable to start the watcher")
  40. )
  41. func InitWatcher(filePatterns []string, callback func(), zapLogger *zap.Logger) error {
  42. if len(filePatterns) == 0 {
  43. return nil
  44. }
  45. if watcherIsActive.Load() {
  46. return AlreadyStartedError
  47. }
  48. watcherIsActive.Store(true)
  49. logger = zapLogger
  50. activeWatcher = &watcher{callback: callback}
  51. err := activeWatcher.startWatching(filePatterns)
  52. if err != nil {
  53. return err
  54. }
  55. reloadWaitGroup = sync.WaitGroup{}
  56. return nil
  57. }
  58. func DrainWatcher() {
  59. if !watcherIsActive.Load() {
  60. return
  61. }
  62. watcherIsActive.Store(false)
  63. logger.Debug("stopping watcher")
  64. activeWatcher.stopWatching()
  65. reloadWaitGroup.Wait()
  66. activeWatcher = nil
  67. }
  68. // TODO: how to test this?
  69. func retryWatching(watchPattern *watchPattern) {
  70. failureMu.Lock()
  71. defer failureMu.Unlock()
  72. if watchPattern.failureCount >= maxFailureCount {
  73. return
  74. }
  75. logger.Info("watcher was closed prematurely, retrying...", zap.String("dir", watchPattern.dir))
  76. watchPattern.failureCount++
  77. session, err := startSession(watchPattern)
  78. if err != nil {
  79. activeWatcher.sessions = append(activeWatcher.sessions, session)
  80. }
  81. // reset the failure-count if the watcher hasn't reached max failures after 5 seconds
  82. go func() {
  83. time.Sleep(failureResetDuration * time.Second)
  84. failureMu.Lock()
  85. if watchPattern.failureCount < maxFailureCount {
  86. watchPattern.failureCount = 0
  87. }
  88. failureMu.Unlock()
  89. }()
  90. }
  91. func (w *watcher) startWatching(filePatterns []string) error {
  92. w.trigger = make(chan struct{})
  93. w.stop = make(chan struct{})
  94. w.sessions = make([]C.uintptr_t, len(filePatterns))
  95. watchPatterns, err := parseFilePatterns(filePatterns)
  96. if err != nil {
  97. return err
  98. }
  99. for i, watchPattern := range watchPatterns {
  100. watchPattern.trigger = w.trigger
  101. session, err := startSession(watchPattern)
  102. if err != nil {
  103. return err
  104. }
  105. w.sessions[i] = session
  106. }
  107. go listenForFileEvents(w.trigger, w.stop)
  108. return nil
  109. }
  110. func (w *watcher) stopWatching() {
  111. close(w.stop)
  112. for _, session := range w.sessions {
  113. stopSession(session)
  114. }
  115. }
  116. func startSession(w *watchPattern) (C.uintptr_t, error) {
  117. handle := cgo.NewHandle(w)
  118. cDir := C.CString(w.dir)
  119. defer C.free(unsafe.Pointer(cDir))
  120. watchSession := C.start_new_watcher(cDir, C.uintptr_t(handle))
  121. if watchSession != 0 {
  122. logger.Debug("watching", zap.String("dir", w.dir), zap.Strings("patterns", w.patterns))
  123. return watchSession, nil
  124. }
  125. logger.Error("couldn't start watching", zap.String("dir", w.dir))
  126. return watchSession, UnableToStartWatching
  127. }
  128. func stopSession(session C.uintptr_t) {
  129. success := C.stop_watcher(session)
  130. if success == 0 {
  131. logger.Warn("couldn't close the watcher")
  132. }
  133. }
  134. //export go_handle_file_watcher_event
  135. func go_handle_file_watcher_event(path *C.char, eventType C.int, pathType C.int, handle C.uintptr_t) {
  136. watchPattern := cgo.Handle(handle).Value().(*watchPattern)
  137. goPath := C.GoString(path)
  138. if watchPattern.allowReload(goPath, int(eventType), int(pathType)) {
  139. watchPattern.trigger <- struct{}{}
  140. }
  141. // If the watcher prematurely sends the die@ event, retry watching
  142. if pathType == 4 && strings.HasPrefix(goPath, "e/self/die@") && watcherIsActive.Load() {
  143. retryWatching(watchPattern)
  144. }
  145. }
  146. func listenForFileEvents(triggerWatcher chan struct{}, stopWatcher chan struct{}) {
  147. timer := time.NewTimer(debounceDuration)
  148. timer.Stop()
  149. defer timer.Stop()
  150. for {
  151. select {
  152. case <-stopWatcher:
  153. break
  154. case <-triggerWatcher:
  155. timer.Reset(debounceDuration)
  156. case <-timer.C:
  157. timer.Stop()
  158. scheduleReload()
  159. }
  160. }
  161. }
  162. func scheduleReload() {
  163. logger.Info("filesystem change detected")
  164. reloadWaitGroup.Add(1)
  165. activeWatcher.callback()
  166. reloadWaitGroup.Done()
  167. }