filer_pb_tail.go 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126
  1. package pb
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/seaweedfs/seaweedfs/weed/glog"
  6. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  7. "github.com/seaweedfs/seaweedfs/weed/util"
  8. "google.golang.org/grpc"
  9. "io"
  10. "time"
  11. )
  12. type EventErrorType int
  13. const (
  14. TrivialOnError EventErrorType = iota
  15. FatalOnError
  16. RetryForeverOnError
  17. )
  18. // MetadataFollowOption is used to control the behavior of the metadata following
  19. // process. Part of it is used as a cursor to resume the following process.
  20. type MetadataFollowOption struct {
  21. ClientName string
  22. ClientId int32
  23. ClientEpoch int32
  24. SelfSignature int32
  25. PathPrefix string
  26. AdditionalPathPrefixes []string
  27. DirectoriesToWatch []string
  28. StartTsNs int64
  29. StopTsNs int64
  30. EventErrorType EventErrorType
  31. }
  32. type ProcessMetadataFunc func(resp *filer_pb.SubscribeMetadataResponse) error
  33. func FollowMetadata(filerAddress ServerAddress, grpcDialOption grpc.DialOption, option *MetadataFollowOption, processEventFn ProcessMetadataFunc) error {
  34. err := WithFilerClient(true, option.SelfSignature, filerAddress, grpcDialOption, makeSubscribeMetadataFunc(option, processEventFn))
  35. if err != nil {
  36. return fmt.Errorf("subscribing filer meta change: %v", err)
  37. }
  38. return err
  39. }
  40. func WithFilerClientFollowMetadata(filerClient filer_pb.FilerClient, option *MetadataFollowOption, processEventFn ProcessMetadataFunc) error {
  41. err := filerClient.WithFilerClient(true, makeSubscribeMetadataFunc(option, processEventFn))
  42. if err != nil {
  43. return fmt.Errorf("subscribing filer meta change: %v", err)
  44. }
  45. return nil
  46. }
  47. func makeSubscribeMetadataFunc(option *MetadataFollowOption, processEventFn ProcessMetadataFunc) func(client filer_pb.SeaweedFilerClient) error {
  48. return func(client filer_pb.SeaweedFilerClient) error {
  49. ctx, cancel := context.WithCancel(context.Background())
  50. defer cancel()
  51. stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
  52. ClientName: option.ClientName,
  53. PathPrefix: option.PathPrefix,
  54. PathPrefixes: option.AdditionalPathPrefixes,
  55. Directories: option.DirectoriesToWatch,
  56. SinceNs: option.StartTsNs,
  57. Signature: option.SelfSignature,
  58. ClientId: option.ClientId,
  59. ClientEpoch: option.ClientEpoch,
  60. UntilNs: option.StopTsNs,
  61. })
  62. if err != nil {
  63. return fmt.Errorf("subscribe: %v", err)
  64. }
  65. for {
  66. resp, listenErr := stream.Recv()
  67. if listenErr == io.EOF {
  68. return nil
  69. }
  70. if listenErr != nil {
  71. return listenErr
  72. }
  73. if err := processEventFn(resp); err != nil {
  74. switch option.EventErrorType {
  75. case TrivialOnError:
  76. glog.Errorf("process %v: %v", resp, err)
  77. case FatalOnError:
  78. glog.Fatalf("process %v: %v", resp, err)
  79. case RetryForeverOnError:
  80. util.RetryUntil("followMetaUpdates", func() error {
  81. return processEventFn(resp)
  82. }, func(err error) bool {
  83. glog.Errorf("process %v: %v", resp, err)
  84. return true
  85. })
  86. default:
  87. glog.Errorf("process %v: %v", resp, err)
  88. }
  89. }
  90. option.StartTsNs = resp.TsNs
  91. }
  92. }
  93. }
  94. func AddOffsetFunc(processEventFn ProcessMetadataFunc, offsetInterval time.Duration, offsetFunc func(counter int64, offset int64) error) ProcessMetadataFunc {
  95. var counter int64
  96. var lastWriteTime time.Time
  97. return func(resp *filer_pb.SubscribeMetadataResponse) error {
  98. if err := processEventFn(resp); err != nil {
  99. return err
  100. }
  101. counter++
  102. if lastWriteTime.Add(offsetInterval).Before(time.Now()) {
  103. lastWriteTime = time.Now()
  104. if err := offsetFunc(counter, resp.TsNs); err != nil {
  105. return err
  106. }
  107. counter = 0
  108. }
  109. return nil
  110. }
  111. }