filer_pb_tail.go 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129
  1. package pb
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/seaweedfs/seaweedfs/weed/glog"
  6. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  7. "github.com/seaweedfs/seaweedfs/weed/util"
  8. "google.golang.org/grpc"
  9. "io"
  10. "time"
  11. )
  12. type EventErrorType int
  13. const (
  14. TrivialOnError EventErrorType = iota
  15. FatalOnError
  16. RetryForeverOnError
  17. DontLogError
  18. )
  19. // MetadataFollowOption is used to control the behavior of the metadata following
  20. // process. Part of it is used as a cursor to resume the following process.
  21. type MetadataFollowOption struct {
  22. ClientName string
  23. ClientId int32
  24. ClientEpoch int32
  25. SelfSignature int32
  26. PathPrefix string
  27. AdditionalPathPrefixes []string
  28. DirectoriesToWatch []string
  29. StartTsNs int64
  30. StopTsNs int64
  31. EventErrorType EventErrorType
  32. }
  33. type ProcessMetadataFunc func(resp *filer_pb.SubscribeMetadataResponse) error
  34. func FollowMetadata(filerAddress ServerAddress, grpcDialOption grpc.DialOption, option *MetadataFollowOption, processEventFn ProcessMetadataFunc) error {
  35. err := WithFilerClient(true, option.SelfSignature, filerAddress, grpcDialOption, makeSubscribeMetadataFunc(option, processEventFn))
  36. if err != nil {
  37. return fmt.Errorf("subscribing filer meta change: %v", err)
  38. }
  39. return err
  40. }
  41. func WithFilerClientFollowMetadata(filerClient filer_pb.FilerClient, option *MetadataFollowOption, processEventFn ProcessMetadataFunc) error {
  42. err := filerClient.WithFilerClient(true, makeSubscribeMetadataFunc(option, processEventFn))
  43. if err != nil {
  44. return fmt.Errorf("subscribing filer meta change: %v", err)
  45. }
  46. return nil
  47. }
  48. func makeSubscribeMetadataFunc(option *MetadataFollowOption, processEventFn ProcessMetadataFunc) func(client filer_pb.SeaweedFilerClient) error {
  49. return func(client filer_pb.SeaweedFilerClient) error {
  50. ctx, cancel := context.WithCancel(context.Background())
  51. defer cancel()
  52. stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
  53. ClientName: option.ClientName,
  54. PathPrefix: option.PathPrefix,
  55. PathPrefixes: option.AdditionalPathPrefixes,
  56. Directories: option.DirectoriesToWatch,
  57. SinceNs: option.StartTsNs,
  58. Signature: option.SelfSignature,
  59. ClientId: option.ClientId,
  60. ClientEpoch: option.ClientEpoch,
  61. UntilNs: option.StopTsNs,
  62. })
  63. if err != nil {
  64. return fmt.Errorf("subscribe: %v", err)
  65. }
  66. for {
  67. resp, listenErr := stream.Recv()
  68. if listenErr == io.EOF {
  69. return nil
  70. }
  71. if listenErr != nil {
  72. return listenErr
  73. }
  74. if err := processEventFn(resp); err != nil {
  75. switch option.EventErrorType {
  76. case TrivialOnError:
  77. glog.Errorf("process %v: %v", resp, err)
  78. case FatalOnError:
  79. glog.Fatalf("process %v: %v", resp, err)
  80. case RetryForeverOnError:
  81. util.RetryUntil("followMetaUpdates", func() error {
  82. return processEventFn(resp)
  83. }, func(err error) bool {
  84. glog.Errorf("process %v: %v", resp, err)
  85. return true
  86. })
  87. case DontLogError:
  88. // pass
  89. default:
  90. glog.Errorf("process %v: %v", resp, err)
  91. }
  92. }
  93. option.StartTsNs = resp.TsNs
  94. }
  95. }
  96. }
  97. func AddOffsetFunc(processEventFn ProcessMetadataFunc, offsetInterval time.Duration, offsetFunc func(counter int64, offset int64) error) ProcessMetadataFunc {
  98. var counter int64
  99. var lastWriteTime time.Time
  100. return func(resp *filer_pb.SubscribeMetadataResponse) error {
  101. if err := processEventFn(resp); err != nil {
  102. return err
  103. }
  104. counter++
  105. if lastWriteTime.Add(offsetInterval).Before(time.Now()) {
  106. lastWriteTime = time.Now()
  107. if err := offsetFunc(counter, resp.TsNs); err != nil {
  108. return err
  109. }
  110. counter = 0
  111. }
  112. return nil
  113. }
  114. }