filer_pb_tail.go 2.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394
  1. package pb
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/chrislusf/seaweedfs/weed/glog"
  6. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  7. "google.golang.org/grpc"
  8. "io"
  9. "time"
  10. )
  // ProcessMetadataFunc is the callback signature used to handle a single
  // metadata subscription response. A non-nil return value reports that the
  // event could not be processed.
  11. type ProcessMetadataFunc func(resp *filer_pb.SubscribeMetadataResponse) error
  12. func FollowMetadata(filerAddress string, grpcDialOption grpc.DialOption,
  13. clientName string, pathPrefix string, lastTsNs int64, selfSignature int32,
  14. processEventFn ProcessMetadataFunc, fatalOnError bool) error {
  15. err := WithFilerClient(filerAddress, grpcDialOption, makeFunc(
  16. clientName, pathPrefix, lastTsNs, selfSignature, processEventFn, fatalOnError))
  17. if err != nil {
  18. return fmt.Errorf("subscribing filer meta change: %v", err)
  19. }
  20. return err
  21. }
  22. func WithFilerClientFollowMetadata(filerClient filer_pb.FilerClient,
  23. clientName string, pathPrefix string, lastTsNs int64, selfSignature int32,
  24. processEventFn ProcessMetadataFunc, fatalOnError bool) error {
  25. err := filerClient.WithFilerClient(makeFunc(
  26. clientName, pathPrefix, lastTsNs, selfSignature, processEventFn, fatalOnError))
  27. if err != nil {
  28. return fmt.Errorf("subscribing filer meta change: %v", err)
  29. }
  30. return nil
  31. }
  32. func makeFunc(clientName string, pathPrefix string, lastTsNs int64, selfSignature int32,
  33. processEventFn ProcessMetadataFunc, fatalOnError bool) func(client filer_pb.SeaweedFilerClient) error {
  34. return func(client filer_pb.SeaweedFilerClient) error {
  35. ctx, cancel := context.WithCancel(context.Background())
  36. defer cancel()
  37. stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
  38. ClientName: clientName,
  39. PathPrefix: pathPrefix,
  40. SinceNs: lastTsNs,
  41. Signature: selfSignature,
  42. })
  43. if err != nil {
  44. return fmt.Errorf("subscribe: %v", err)
  45. }
  46. for {
  47. resp, listenErr := stream.Recv()
  48. if listenErr == io.EOF {
  49. return nil
  50. }
  51. if listenErr != nil {
  52. return listenErr
  53. }
  54. if err := processEventFn(resp); err != nil {
  55. if fatalOnError {
  56. glog.Fatalf("process %v: %v", resp, err)
  57. } else {
  58. glog.Errorf("process %v: %v", resp, err)
  59. }
  60. }
  61. lastTsNs = resp.TsNs
  62. }
  63. }
  64. }
  65. func AddOffsetFunc(processEventFn ProcessMetadataFunc, offsetInterval time.Duration, offsetFunc func(counter int64, offset int64) error) ProcessMetadataFunc {
  66. var counter int64
  67. var lastWriteTime time.Time
  68. return func(resp *filer_pb.SubscribeMetadataResponse) error {
  69. if err := processEventFn(resp); err != nil {
  70. return err
  71. }
  72. counter++
  73. if lastWriteTime.Add(offsetInterval).Before(time.Now()) {
  74. counter = 0
  75. lastWriteTime = time.Now()
  76. if err := offsetFunc(counter, resp.TsNs); err != nil {
  77. return err
  78. }
  79. }
  80. return nil
  81. }
  82. }