filer_pb_tail.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113
  1. package pb
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/seaweedfs/seaweedfs/weed/glog"
  6. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  7. "github.com/seaweedfs/seaweedfs/weed/util"
  8. "google.golang.org/grpc"
  9. "io"
  10. "time"
  11. )
// EventErrorType selects how a metadata subscription reacts when the
// event-processing callback returns an error.
type EventErrorType int

const (
	// TrivialOnError logs the processing error and moves on to the next event.
	TrivialOnError EventErrorType = iota
	// FatalOnError logs the processing error with glog.Fatalf.
	FatalOnError
	// RetryForeverOnError re-invokes the callback on the same event through
	// util.RetryForever, logging every failed attempt.
	RetryForeverOnError
)
// ProcessMetadataFunc handles a single metadata event received from a filer
// subscription stream. A non-nil error is handled by the subscriber according
// to its EventErrorType.
type ProcessMetadataFunc func(resp *filer_pb.SubscribeMetadataResponse) error
  19. func FollowMetadata(filerAddress ServerAddress, grpcDialOption grpc.DialOption, clientName string, clientId int32, clientEpoch int32,
  20. pathPrefix string, additionalPathPrefixes []string, lastTsNs int64, untilTsNs int64, selfSignature int32,
  21. processEventFn ProcessMetadataFunc, eventErrorType EventErrorType) error {
  22. err := WithFilerClient(true, clientId, filerAddress, grpcDialOption, makeSubscribeMetadataFunc(clientName, clientId, clientEpoch, pathPrefix, additionalPathPrefixes, nil, &lastTsNs, untilTsNs, selfSignature, processEventFn, eventErrorType))
  23. if err != nil {
  24. return fmt.Errorf("subscribing filer meta change: %v", err)
  25. }
  26. return err
  27. }
  28. func WithFilerClientFollowMetadata(filerClient filer_pb.FilerClient, clientName string, clientId int32, clientEpoch int32, pathPrefix string, directoriesToWatch []string, lastTsNs *int64, untilTsNs int64, selfSignature int32, processEventFn ProcessMetadataFunc, eventErrorType EventErrorType) error {
  29. err := filerClient.WithFilerClient(true, makeSubscribeMetadataFunc(clientName, clientId, clientEpoch, pathPrefix, nil, directoriesToWatch, lastTsNs, untilTsNs, selfSignature, processEventFn, eventErrorType))
  30. if err != nil {
  31. return fmt.Errorf("subscribing filer meta change: %v", err)
  32. }
  33. return nil
  34. }
  35. func makeSubscribeMetadataFunc(clientName string, clientId int32, clientEpoch int32, pathPrefix string, additionalPathPrefixes []string, directoriesToWatch []string, lastTsNs *int64, untilTsNs int64, selfSignature int32, processEventFn ProcessMetadataFunc, eventErrorType EventErrorType) func(client filer_pb.SeaweedFilerClient) error {
  36. return func(client filer_pb.SeaweedFilerClient) error {
  37. ctx, cancel := context.WithCancel(context.Background())
  38. defer cancel()
  39. stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
  40. ClientName: clientName,
  41. PathPrefix: pathPrefix,
  42. PathPrefixes: additionalPathPrefixes,
  43. Directories: directoriesToWatch,
  44. SinceNs: *lastTsNs,
  45. Signature: selfSignature,
  46. ClientId: clientId,
  47. ClientEpoch: clientEpoch,
  48. UntilNs: untilTsNs,
  49. })
  50. if err != nil {
  51. return fmt.Errorf("subscribe: %v", err)
  52. }
  53. for {
  54. resp, listenErr := stream.Recv()
  55. if listenErr == io.EOF {
  56. return nil
  57. }
  58. if listenErr != nil {
  59. return listenErr
  60. }
  61. if err := processEventFn(resp); err != nil {
  62. switch eventErrorType {
  63. case TrivialOnError:
  64. glog.Errorf("process %v: %v", resp, err)
  65. case FatalOnError:
  66. glog.Fatalf("process %v: %v", resp, err)
  67. case RetryForeverOnError:
  68. util.RetryForever("followMetaUpdates", func() error {
  69. return processEventFn(resp)
  70. }, func(err error) bool {
  71. glog.Errorf("process %v: %v", resp, err)
  72. return true
  73. })
  74. default:
  75. glog.Errorf("process %v: %v", resp, err)
  76. }
  77. }
  78. *lastTsNs = resp.TsNs
  79. }
  80. }
  81. }
  82. func AddOffsetFunc(processEventFn ProcessMetadataFunc, offsetInterval time.Duration, offsetFunc func(counter int64, offset int64) error) ProcessMetadataFunc {
  83. var counter int64
  84. var lastWriteTime time.Time
  85. return func(resp *filer_pb.SubscribeMetadataResponse) error {
  86. if err := processEventFn(resp); err != nil {
  87. return err
  88. }
  89. counter++
  90. if lastWriteTime.Add(offsetInterval).Before(time.Now()) {
  91. lastWriteTime = time.Now()
  92. if err := offsetFunc(counter, resp.TsNs); err != nil {
  93. return err
  94. }
  95. counter = 0
  96. }
  97. return nil
  98. }
  99. }