volume_grpc_tail.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136
  1. package weed_server
  2. import (
  3. "context"
  4. "fmt"
  5. "time"
  6. "github.com/chrislusf/seaweedfs/weed/util/log"
  7. "github.com/chrislusf/seaweedfs/weed/operation"
  8. "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
  9. "github.com/chrislusf/seaweedfs/weed/storage"
  10. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  11. "github.com/chrislusf/seaweedfs/weed/storage/super_block"
  12. )
  13. func (vs *VolumeServer) VolumeTailSender(req *volume_server_pb.VolumeTailSenderRequest, stream volume_server_pb.VolumeServer_VolumeTailSenderServer) error {
  14. v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
  15. if v == nil {
  16. return fmt.Errorf("not found volume id %d", req.VolumeId)
  17. }
  18. defer log.Debugf("tailing volume %d finished", v.Id)
  19. lastTimestampNs := req.SinceNs
  20. drainingSeconds := req.IdleTimeoutSeconds
  21. for {
  22. lastProcessedTimestampNs, err := sendNeedlesSince(stream, v, lastTimestampNs)
  23. if err != nil {
  24. log.Infof("sendNeedlesSince: %v", err)
  25. return fmt.Errorf("streamFollow: %v", err)
  26. }
  27. time.Sleep(2 * time.Second)
  28. if req.IdleTimeoutSeconds == 0 {
  29. lastTimestampNs = lastProcessedTimestampNs
  30. continue
  31. }
  32. if lastProcessedTimestampNs == lastTimestampNs {
  33. drainingSeconds--
  34. if drainingSeconds <= 0 {
  35. return nil
  36. }
  37. log.Debugf("tailing volume %d drains requests with %d seconds remaining", v.Id, drainingSeconds)
  38. } else {
  39. lastTimestampNs = lastProcessedTimestampNs
  40. drainingSeconds = req.IdleTimeoutSeconds
  41. log.Debugf("tailing volume %d resets draining wait time to %d seconds", v.Id, drainingSeconds)
  42. }
  43. }
  44. }
  45. func sendNeedlesSince(stream volume_server_pb.VolumeServer_VolumeTailSenderServer, v *storage.Volume, lastTimestampNs uint64) (lastProcessedTimestampNs uint64, err error) {
  46. foundOffset, isLastOne, err := v.BinarySearchByAppendAtNs(lastTimestampNs)
  47. if err != nil {
  48. return 0, fmt.Errorf("fail to locate by appendAtNs %d: %s", lastTimestampNs, err)
  49. }
  50. // log.Printf("reading ts %d offset %d isLast %v", lastTimestampNs, foundOffset, isLastOne)
  51. if isLastOne {
  52. // need to heart beat to the client to ensure the connection health
  53. sendErr := stream.Send(&volume_server_pb.VolumeTailSenderResponse{IsLastChunk: true})
  54. return lastTimestampNs, sendErr
  55. }
  56. scanner := &VolumeFileScanner4Tailing{
  57. stream: stream,
  58. }
  59. err = storage.ScanVolumeFileFrom(v.Version(), v.DataBackend, foundOffset.ToAcutalOffset(), scanner)
  60. return scanner.lastProcessedTimestampNs, err
  61. }
  62. func (vs *VolumeServer) VolumeTailReceiver(ctx context.Context, req *volume_server_pb.VolumeTailReceiverRequest) (*volume_server_pb.VolumeTailReceiverResponse, error) {
  63. resp := &volume_server_pb.VolumeTailReceiverResponse{}
  64. v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
  65. if v == nil {
  66. return resp, fmt.Errorf("receiver not found volume id %d", req.VolumeId)
  67. }
  68. defer log.Debugf("receive tailing volume %d finished", v.Id)
  69. return resp, operation.TailVolumeFromSource(req.SourceVolumeServer, vs.grpcDialOption, v.Id, req.SinceNs, int(req.IdleTimeoutSeconds), func(n *needle.Needle) error {
  70. _, err := vs.store.WriteVolumeNeedle(v.Id, n, false)
  71. return err
  72. })
  73. }
  74. // generate the volume idx
  75. type VolumeFileScanner4Tailing struct {
  76. stream volume_server_pb.VolumeServer_VolumeTailSenderServer
  77. lastProcessedTimestampNs uint64
  78. }
  79. func (scanner *VolumeFileScanner4Tailing) VisitSuperBlock(superBlock super_block.SuperBlock) error {
  80. return nil
  81. }
  82. func (scanner *VolumeFileScanner4Tailing) ReadNeedleBody() bool {
  83. return true
  84. }
  85. func (scanner *VolumeFileScanner4Tailing) VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error {
  86. isLastChunk := false
  87. // need to send body by chunks
  88. for i := 0; i < len(needleBody); i += BufferSizeLimit {
  89. stopOffset := i + BufferSizeLimit
  90. if stopOffset >= len(needleBody) {
  91. isLastChunk = true
  92. stopOffset = len(needleBody)
  93. }
  94. sendErr := scanner.stream.Send(&volume_server_pb.VolumeTailSenderResponse{
  95. NeedleHeader: needleHeader,
  96. NeedleBody: needleBody[i:stopOffset],
  97. IsLastChunk: isLastChunk,
  98. })
  99. if sendErr != nil {
  100. return sendErr
  101. }
  102. }
  103. scanner.lastProcessedTimestampNs = n.AppendAtNs
  104. return nil
  105. }