volume_grpc_tail.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137
  1. package weed_server
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/seaweedfs/seaweedfs/weed/pb"
  6. "time"
  7. "github.com/seaweedfs/seaweedfs/weed/glog"
  8. "github.com/seaweedfs/seaweedfs/weed/operation"
  9. "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
  10. "github.com/seaweedfs/seaweedfs/weed/storage"
  11. "github.com/seaweedfs/seaweedfs/weed/storage/needle"
  12. "github.com/seaweedfs/seaweedfs/weed/storage/super_block"
  13. )
  14. func (vs *VolumeServer) VolumeTailSender(req *volume_server_pb.VolumeTailSenderRequest, stream volume_server_pb.VolumeServer_VolumeTailSenderServer) error {
  15. v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
  16. if v == nil {
  17. return fmt.Errorf("not found volume id %d", req.VolumeId)
  18. }
  19. defer glog.V(1).Infof("tailing volume %d finished", v.Id)
  20. lastTimestampNs := req.SinceNs
  21. drainingSeconds := req.IdleTimeoutSeconds
  22. for {
  23. lastProcessedTimestampNs, err := sendNeedlesSince(stream, v, lastTimestampNs)
  24. if err != nil {
  25. glog.Infof("sendNeedlesSince: %v", err)
  26. return fmt.Errorf("streamFollow: %v", err)
  27. }
  28. time.Sleep(2 * time.Second)
  29. if req.IdleTimeoutSeconds == 0 {
  30. lastTimestampNs = lastProcessedTimestampNs
  31. continue
  32. }
  33. if lastProcessedTimestampNs == lastTimestampNs {
  34. drainingSeconds--
  35. if drainingSeconds <= 0 {
  36. return nil
  37. }
  38. glog.V(1).Infof("tailing volume %d drains requests with %d seconds remaining", v.Id, drainingSeconds)
  39. } else {
  40. lastTimestampNs = lastProcessedTimestampNs
  41. drainingSeconds = req.IdleTimeoutSeconds
  42. glog.V(1).Infof("tailing volume %d resets draining wait time to %d seconds", v.Id, drainingSeconds)
  43. }
  44. }
  45. }
  46. func sendNeedlesSince(stream volume_server_pb.VolumeServer_VolumeTailSenderServer, v *storage.Volume, lastTimestampNs uint64) (lastProcessedTimestampNs uint64, err error) {
  47. foundOffset, isLastOne, err := v.BinarySearchByAppendAtNs(lastTimestampNs)
  48. if err != nil {
  49. return 0, fmt.Errorf("fail to locate by appendAtNs %d: %s", lastTimestampNs, err)
  50. }
  51. // log.Printf("reading ts %d offset %d isLast %v", lastTimestampNs, foundOffset, isLastOne)
  52. if isLastOne {
  53. // need to heart beat to the client to ensure the connection health
  54. sendErr := stream.Send(&volume_server_pb.VolumeTailSenderResponse{IsLastChunk: true})
  55. return lastTimestampNs, sendErr
  56. }
  57. scanner := &VolumeFileScanner4Tailing{
  58. stream: stream,
  59. }
  60. err = storage.ScanVolumeFileFrom(v.Version(), v.DataBackend, foundOffset.ToActualOffset(), scanner)
  61. return scanner.lastProcessedTimestampNs, err
  62. }
  63. func (vs *VolumeServer) VolumeTailReceiver(ctx context.Context, req *volume_server_pb.VolumeTailReceiverRequest) (*volume_server_pb.VolumeTailReceiverResponse, error) {
  64. resp := &volume_server_pb.VolumeTailReceiverResponse{}
  65. v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
  66. if v == nil {
  67. return resp, fmt.Errorf("receiver not found volume id %d", req.VolumeId)
  68. }
  69. defer glog.V(1).Infof("receive tailing volume %d finished", v.Id)
  70. return resp, operation.TailVolumeFromSource(pb.ServerAddress(req.SourceVolumeServer), vs.grpcDialOption, v.Id, req.SinceNs, int(req.IdleTimeoutSeconds), func(n *needle.Needle) error {
  71. _, err := vs.store.WriteVolumeNeedle(v.Id, n, false, false)
  72. return err
  73. })
  74. }
  75. // generate the volume idx
  76. type VolumeFileScanner4Tailing struct {
  77. stream volume_server_pb.VolumeServer_VolumeTailSenderServer
  78. lastProcessedTimestampNs uint64
  79. }
  80. func (scanner *VolumeFileScanner4Tailing) VisitSuperBlock(superBlock super_block.SuperBlock) error {
  81. return nil
  82. }
  83. func (scanner *VolumeFileScanner4Tailing) ReadNeedleBody() bool {
  84. return true
  85. }
  86. func (scanner *VolumeFileScanner4Tailing) VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error {
  87. isLastChunk := false
  88. // need to send body by chunks
  89. for i := 0; i < len(needleBody); i += BufferSizeLimit {
  90. stopOffset := i + BufferSizeLimit
  91. if stopOffset >= len(needleBody) {
  92. isLastChunk = true
  93. stopOffset = len(needleBody)
  94. }
  95. sendErr := scanner.stream.Send(&volume_server_pb.VolumeTailSenderResponse{
  96. NeedleHeader: needleHeader,
  97. NeedleBody: needleBody[i:stopOffset],
  98. IsLastChunk: isLastChunk,
  99. })
  100. if sendErr != nil {
  101. return sendErr
  102. }
  103. }
  104. scanner.lastProcessedTimestampNs = n.AppendAtNs
  105. return nil
  106. }