watcher.go 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148
  1. //go:build watcher
  2. package watcher
  3. // #cgo LDFLAGS: -lwatcher -lstdc++
  4. // #cgo CFLAGS: -Wall -Werror
  5. // #include <stdint.h>
  6. // #include <stdlib.h>
  7. // #include "watcher.h"
  8. import "C"
  9. import (
  10. "errors"
  11. "runtime/cgo"
  12. "sync"
  13. "time"
  14. "unsafe"
  15. "go.uber.org/zap"
  16. )
  17. type watcher struct {
  18. sessions []C.uintptr_t
  19. callback func()
  20. trigger chan struct{}
  21. stop chan struct{}
  22. }
  23. // duration to wait before triggering a reload after a file change
  24. const debounceDuration = 150 * time.Millisecond
  25. var (
  26. // the currently active file watcher
  27. activeWatcher *watcher
  28. // after stopping the watcher we will wait for eventual reloads to finish
  29. reloadWaitGroup sync.WaitGroup
  30. // we are passing the logger from the main package to the watcher
  31. logger *zap.Logger
  32. AlreadyStartedError = errors.New("the watcher is already running")
  33. UnableToStartWatching = errors.New("unable to start the watcher")
  34. )
  35. func InitWatcher(filePatterns []string, callback func(), zapLogger *zap.Logger) error {
  36. if len(filePatterns) == 0 {
  37. return nil
  38. }
  39. if activeWatcher != nil {
  40. return AlreadyStartedError
  41. }
  42. logger = zapLogger
  43. activeWatcher = &watcher{callback: callback}
  44. err := activeWatcher.startWatching(filePatterns)
  45. if err != nil {
  46. return err
  47. }
  48. reloadWaitGroup = sync.WaitGroup{}
  49. return nil
  50. }
  51. func DrainWatcher() {
  52. if activeWatcher == nil {
  53. return
  54. }
  55. logger.Debug("stopping watcher")
  56. activeWatcher.stopWatching()
  57. reloadWaitGroup.Wait()
  58. activeWatcher = nil
  59. }
  60. func (w *watcher) startWatching(filePatterns []string) error {
  61. w.trigger = make(chan struct{})
  62. w.stop = make(chan struct{})
  63. w.sessions = make([]C.uintptr_t, len(filePatterns))
  64. watchPatterns, err := parseFilePatterns(filePatterns)
  65. if err != nil {
  66. return err
  67. }
  68. for i, watchPattern := range watchPatterns {
  69. watchPattern.trigger = w.trigger
  70. session, err := startSession(watchPattern)
  71. if err != nil {
  72. return err
  73. }
  74. w.sessions[i] = session
  75. }
  76. go listenForFileEvents(w.trigger, w.stop)
  77. return nil
  78. }
  79. func (w *watcher) stopWatching() {
  80. close(w.stop)
  81. for _, session := range w.sessions {
  82. stopSession(session)
  83. }
  84. }
  85. func startSession(w *watchPattern) (C.uintptr_t, error) {
  86. handle := cgo.NewHandle(w)
  87. cDir := C.CString(w.dir)
  88. defer C.free(unsafe.Pointer(cDir))
  89. watchSession := C.start_new_watcher(cDir, C.uintptr_t(handle))
  90. if watchSession != 0 {
  91. logger.Debug("watching", zap.String("dir", w.dir), zap.Strings("patterns", w.patterns))
  92. return watchSession, nil
  93. }
  94. logger.Error("couldn't start watching", zap.String("dir", w.dir))
  95. return watchSession, UnableToStartWatching
  96. }
  97. func stopSession(session C.uintptr_t) {
  98. success := C.stop_watcher(session)
  99. if success == 0 {
  100. logger.Warn("couldn't close the watcher")
  101. }
  102. }
  103. //export go_handle_file_watcher_event
  104. func go_handle_file_watcher_event(path *C.char, eventType C.int, pathType C.int, handle C.uintptr_t) {
  105. watchPattern := cgo.Handle(handle).Value().(*watchPattern)
  106. if watchPattern.allowReload(C.GoString(path), int(eventType), int(pathType)) {
  107. watchPattern.trigger <- struct{}{}
  108. }
  109. }
  110. func listenForFileEvents(triggerWatcher chan struct{}, stopWatcher chan struct{}) {
  111. timer := time.NewTimer(debounceDuration)
  112. timer.Stop()
  113. defer timer.Stop()
  114. for {
  115. select {
  116. case <-stopWatcher:
  117. break
  118. case <-triggerWatcher:
  119. timer.Reset(debounceDuration)
  120. case <-timer.C:
  121. timer.Stop()
  122. scheduleReload()
  123. }
  124. }
  125. }
  126. func scheduleReload() {
  127. logger.Info("filesystem change detected")
  128. reloadWaitGroup.Add(1)
  129. activeWatcher.callback()
  130. reloadWaitGroup.Done()
  131. }