filer_pb_tail.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596
  1. package pb
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/chrislusf/seaweedfs/weed/glog"
  6. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  7. "google.golang.org/grpc"
  8. "io"
  9. "time"
  10. )
  11. type ProcessMetadataFunc func(resp *filer_pb.SubscribeMetadataResponse) error
  12. func FollowMetadata(filerAddress ServerAddress, grpcDialOption grpc.DialOption, clientName string, clientId int32,
  13. pathPrefix string, additionalPathPrefixes []string, lastTsNs int64, selfSignature int32,
  14. processEventFn ProcessMetadataFunc, fatalOnError bool) error {
  15. err := WithFilerClient(true, filerAddress, grpcDialOption, makeFunc(clientName, clientId,
  16. pathPrefix, additionalPathPrefixes, &lastTsNs, selfSignature, processEventFn, fatalOnError))
  17. if err != nil {
  18. return fmt.Errorf("subscribing filer meta change: %v", err)
  19. }
  20. return err
  21. }
  22. func WithFilerClientFollowMetadata(filerClient filer_pb.FilerClient,
  23. clientName string, clientId int32, pathPrefix string, lastTsNs *int64, selfSignature int32,
  24. processEventFn ProcessMetadataFunc, fatalOnError bool) error {
  25. err := filerClient.WithFilerClient(true, makeFunc(clientName, clientId,
  26. pathPrefix, nil, lastTsNs, selfSignature, processEventFn, fatalOnError))
  27. if err != nil {
  28. return fmt.Errorf("subscribing filer meta change: %v", err)
  29. }
  30. return nil
  31. }
  32. func makeFunc(clientName string, clientId int32, pathPrefix string, additionalPathPrefixes []string, lastTsNs *int64, selfSignature int32,
  33. processEventFn ProcessMetadataFunc, fatalOnError bool) func(client filer_pb.SeaweedFilerClient) error {
  34. return func(client filer_pb.SeaweedFilerClient) error {
  35. ctx, cancel := context.WithCancel(context.Background())
  36. defer cancel()
  37. stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
  38. ClientName: clientName,
  39. PathPrefix: pathPrefix,
  40. PathPrefixes: additionalPathPrefixes,
  41. SinceNs: *lastTsNs,
  42. Signature: selfSignature,
  43. ClientId: clientId,
  44. })
  45. if err != nil {
  46. return fmt.Errorf("subscribe: %v", err)
  47. }
  48. for {
  49. resp, listenErr := stream.Recv()
  50. if listenErr == io.EOF {
  51. return nil
  52. }
  53. if listenErr != nil {
  54. return listenErr
  55. }
  56. if err := processEventFn(resp); err != nil {
  57. if fatalOnError {
  58. glog.Fatalf("process %v: %v", resp, err)
  59. } else {
  60. glog.Errorf("process %v: %v", resp, err)
  61. }
  62. }
  63. *lastTsNs = resp.TsNs
  64. }
  65. }
  66. }
  67. func AddOffsetFunc(processEventFn ProcessMetadataFunc, offsetInterval time.Duration, offsetFunc func(counter int64, offset int64) error) ProcessMetadataFunc {
  68. var counter int64
  69. var lastWriteTime time.Time
  70. return func(resp *filer_pb.SubscribeMetadataResponse) error {
  71. if err := processEventFn(resp); err != nil {
  72. return err
  73. }
  74. counter++
  75. if lastWriteTime.Add(offsetInterval).Before(time.Now()) {
  76. lastWriteTime = time.Now()
  77. if err := offsetFunc(counter, resp.TsNs); err != nil {
  78. return err
  79. }
  80. counter = 0
  81. }
  82. return nil
  83. }
  84. }