1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889 |
- package operation
- import (
- "context"
- "fmt"
- "github.com/seaweedfs/seaweedfs/weed/pb"
- "io"
- "google.golang.org/grpc"
- "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
- "github.com/seaweedfs/seaweedfs/weed/storage/needle"
- )
- func TailVolume(masterFn GetMasterFn, grpcDialOption grpc.DialOption, vid needle.VolumeId, sinceNs uint64, timeoutSeconds int, fn func(n *needle.Needle) error) error {
- // find volume location, replication, ttl info
- lookup, err := LookupVolumeId(masterFn, grpcDialOption, vid.String())
- if err != nil {
- return fmt.Errorf("look up volume %d: %v", vid, err)
- }
- if len(lookup.Locations) == 0 {
- return fmt.Errorf("unable to locate volume %d", vid)
- }
- volumeServer := lookup.Locations[0].ServerAddress()
- return TailVolumeFromSource(volumeServer, grpcDialOption, vid, sinceNs, timeoutSeconds, fn)
- }
- func TailVolumeFromSource(volumeServer pb.ServerAddress, grpcDialOption grpc.DialOption, vid needle.VolumeId, sinceNs uint64, idleTimeoutSeconds int, fn func(n *needle.Needle) error) error {
- return WithVolumeServerClient(true, volumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
- stream, err := client.VolumeTailSender(ctx, &volume_server_pb.VolumeTailSenderRequest{
- VolumeId: uint32(vid),
- SinceNs: sinceNs,
- IdleTimeoutSeconds: uint32(idleTimeoutSeconds),
- })
- if err != nil {
- return err
- }
- for {
- resp, recvErr := stream.Recv()
- if recvErr != nil {
- if recvErr == io.EOF {
- break
- } else {
- return recvErr
- }
- }
- needleHeader := resp.NeedleHeader
- needleBody := resp.NeedleBody
- if len(needleHeader) == 0 {
- continue
- }
- for !resp.IsLastChunk {
- resp, recvErr = stream.Recv()
- if recvErr != nil {
- if recvErr == io.EOF {
- break
- } else {
- return recvErr
- }
- }
- needleBody = append(needleBody, resp.NeedleBody...)
- }
- n := new(needle.Needle)
- n.ParseNeedleHeader(needleHeader)
- err = n.ReadNeedleBodyBytes(needleBody, needle.CurrentVersion)
- if err != nil {
- return err
- }
- err = fn(n)
- if err != nil {
- return err
- }
- }
- return nil
- })
- }
|