volume_grpc_copy_incremental.go 1.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566
  1. package weed_server
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
  7. "github.com/seaweedfs/seaweedfs/weed/storage/backend"
  8. "github.com/seaweedfs/seaweedfs/weed/storage/needle"
  9. )
  10. func (vs *VolumeServer) VolumeIncrementalCopy(req *volume_server_pb.VolumeIncrementalCopyRequest, stream volume_server_pb.VolumeServer_VolumeIncrementalCopyServer) error {
  11. v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
  12. if v == nil {
  13. return fmt.Errorf("not found volume id %d", req.VolumeId)
  14. }
  15. stopOffset, _, _ := v.FileStat()
  16. foundOffset, isLastOne, err := v.BinarySearchByAppendAtNs(req.SinceNs)
  17. if err != nil {
  18. return fmt.Errorf("fail to locate by appendAtNs %d: %s", req.SinceNs, err)
  19. }
  20. if isLastOne {
  21. return nil
  22. }
  23. startOffset := foundOffset.ToActualOffset()
  24. buf := make([]byte, 1024*1024*2)
  25. return sendFileContent(v.DataBackend, buf, startOffset, int64(stopOffset), stream)
  26. }
  27. func (vs *VolumeServer) VolumeSyncStatus(ctx context.Context, req *volume_server_pb.VolumeSyncStatusRequest) (*volume_server_pb.VolumeSyncStatusResponse, error) {
  28. v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
  29. if v == nil {
  30. return nil, fmt.Errorf("not found volume id %d", req.VolumeId)
  31. }
  32. resp := v.GetVolumeSyncStatus()
  33. return resp, nil
  34. }
  35. func sendFileContent(datBackend backend.BackendStorageFile, buf []byte, startOffset, stopOffset int64, stream volume_server_pb.VolumeServer_VolumeIncrementalCopyServer) error {
  36. var blockSizeLimit = int64(len(buf))
  37. for i := int64(0); i < stopOffset-startOffset; i += blockSizeLimit {
  38. n, readErr := datBackend.ReadAt(buf, startOffset+i)
  39. if readErr == nil || readErr == io.EOF {
  40. resp := &volume_server_pb.VolumeIncrementalCopyResponse{}
  41. resp.FileContent = buf[:int64(n)]
  42. sendErr := stream.Send(resp)
  43. if sendErr != nil {
  44. return sendErr
  45. }
  46. } else {
  47. return readErr
  48. }
  49. }
  50. return nil
  51. }