log_read.go 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293
  1. package log_buffer
import (
	"bytes"
	"errors"
	"fmt"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/util"
	"google.golang.org/protobuf/proto"
)
  11. var (
  12. ResumeError = fmt.Errorf("resume")
  13. ResumeFromDiskError = fmt.Errorf("resumeFromDisk")
  14. )
  15. func (logBuffer *LogBuffer) LoopProcessLogData(readerName string, startReadTime time.Time, stopTsNs int64,
  16. waitForDataFn func() bool, eachLogDataFn func(logEntry *filer_pb.LogEntry) error) (lastReadTime time.Time, isDone bool, err error) {
  17. // loop through all messages
  18. var bytesBuf *bytes.Buffer
  19. lastReadTime = startReadTime
  20. defer func() {
  21. if bytesBuf != nil {
  22. logBuffer.ReleaseMemory(bytesBuf)
  23. }
  24. }()
  25. for {
  26. if bytesBuf != nil {
  27. logBuffer.ReleaseMemory(bytesBuf)
  28. }
  29. bytesBuf, err = logBuffer.ReadFromBuffer(lastReadTime)
  30. if err == ResumeFromDiskError {
  31. time.Sleep(1127 * time.Millisecond)
  32. return lastReadTime, isDone, ResumeFromDiskError
  33. }
  34. // glog.V(4).Infof("%s ReadFromBuffer by %v", readerName, lastReadTime)
  35. if bytesBuf == nil {
  36. if stopTsNs != 0 {
  37. isDone = true
  38. return
  39. }
  40. if waitForDataFn() {
  41. continue
  42. } else {
  43. return
  44. }
  45. }
  46. buf := bytesBuf.Bytes()
  47. // fmt.Printf("ReadFromBuffer %s by %v size %d\n", readerName, lastReadTime, len(buf))
  48. batchSize := 0
  49. for pos := 0; pos+4 < len(buf); {
  50. size := util.BytesToUint32(buf[pos : pos+4])
  51. if pos+4+int(size) > len(buf) {
  52. err = ResumeError
  53. glog.Errorf("LoopProcessLogData: %s read buffer %v read %d [%d,%d) from [0,%d)", readerName, lastReadTime, batchSize, pos, pos+int(size)+4, len(buf))
  54. return
  55. }
  56. entryData := buf[pos+4 : pos+4+int(size)]
  57. logEntry := &filer_pb.LogEntry{}
  58. if err = proto.Unmarshal(entryData, logEntry); err != nil {
  59. glog.Errorf("unexpected unmarshal mq_pb.Message: %v", err)
  60. pos += 4 + int(size)
  61. continue
  62. }
  63. if stopTsNs != 0 && logEntry.TsNs > stopTsNs {
  64. isDone = true
  65. return
  66. }
  67. lastReadTime = time.Unix(0, logEntry.TsNs)
  68. if err = eachLogDataFn(logEntry); err != nil {
  69. return
  70. }
  71. pos += 4 + int(size)
  72. batchSize++
  73. }
  74. // glog.V(4).Infof("%s sent messages ts[%+v,%+v] size %d\n", readerName, startReadTime, lastReadTime, batchSize)
  75. }
  76. }