12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394 |
- package pb
- import (
- "context"
- "fmt"
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "google.golang.org/grpc"
- "io"
- "time"
- )
- type ProcessMetadataFunc func(resp *filer_pb.SubscribeMetadataResponse) error
- func FollowMetadata(filerAddress string, grpcDialOption grpc.DialOption,
- clientName string, pathPrefix string, lastTsNs int64, selfSignature int32,
- processEventFn ProcessMetadataFunc, fatalOnError bool) error {
- err := WithFilerClient(filerAddress, grpcDialOption, makeFunc(
- clientName, pathPrefix, lastTsNs, selfSignature, processEventFn, fatalOnError))
- if err != nil {
- return fmt.Errorf("subscribing filer meta change: %v", err)
- }
- return err
- }
- func WithFilerClientFollowMetadata(filerClient filer_pb.FilerClient,
- clientName string, pathPrefix string, lastTsNs int64, selfSignature int32,
- processEventFn ProcessMetadataFunc, fatalOnError bool) error {
- err := filerClient.WithFilerClient(makeFunc(
- clientName, pathPrefix, lastTsNs, selfSignature, processEventFn, fatalOnError))
- if err != nil {
- return fmt.Errorf("subscribing filer meta change: %v", err)
- }
- return nil
- }
- func makeFunc(clientName string, pathPrefix string, lastTsNs int64, selfSignature int32,
- processEventFn ProcessMetadataFunc, fatalOnError bool) func(client filer_pb.SeaweedFilerClient) error {
- return func(client filer_pb.SeaweedFilerClient) error {
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
- stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
- ClientName: clientName,
- PathPrefix: pathPrefix,
- SinceNs: lastTsNs,
- Signature: selfSignature,
- })
- if err != nil {
- return fmt.Errorf("subscribe: %v", err)
- }
- for {
- resp, listenErr := stream.Recv()
- if listenErr == io.EOF {
- return nil
- }
- if listenErr != nil {
- return listenErr
- }
- if err := processEventFn(resp); err != nil {
- if fatalOnError {
- glog.Fatalf("process %v: %v", resp, err)
- } else {
- glog.Errorf("process %v: %v", resp, err)
- }
- }
- lastTsNs = resp.TsNs
- }
- }
- }
- func AddOffsetFunc(processEventFn ProcessMetadataFunc, offsetInterval time.Duration, offsetFunc func(counter int64, offset int64) error) ProcessMetadataFunc {
- var counter int64
- var lastWriteTime time.Time
- return func(resp *filer_pb.SubscribeMetadataResponse) error {
- if err := processEventFn(resp); err != nil {
- return err
- }
- counter++
- if lastWriteTime.Add(offsetInterval).Before(time.Now()) {
- counter = 0
- lastWriteTime = time.Now()
- if err := offsetFunc(counter, resp.TsNs); err != nil {
- return err
- }
- }
- return nil
- }
- }
|