broker_topic_partition_read_write.go 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178
  1. package broker
  2. import (
  3. "fmt"
  4. "github.com/seaweedfs/seaweedfs/weed/filer"
  5. "github.com/seaweedfs/seaweedfs/weed/glog"
  6. "github.com/seaweedfs/seaweedfs/weed/mq/topic"
  7. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  8. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  9. "github.com/seaweedfs/seaweedfs/weed/util"
  10. "github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
  11. "google.golang.org/protobuf/proto"
  12. "math"
  13. "sync/atomic"
  14. "time"
  15. )
  16. func (b *MessageQueueBroker) genLogFlushFunc(t topic.Topic, partition *mq_pb.Partition) log_buffer.LogFlushFuncType {
  17. topicDir := fmt.Sprintf("%s/%s/%s", filer.TopicsDir, t.Namespace, t.Name)
  18. partitionGeneration := time.Unix(0, partition.UnixTimeNs).UTC().Format(topic.TIME_FORMAT)
  19. partitionDir := fmt.Sprintf("%s/%s/%04d-%04d", topicDir, partitionGeneration, partition.RangeStart, partition.RangeStop)
  20. return func(logBuffer *log_buffer.LogBuffer, startTime, stopTime time.Time, buf []byte) {
  21. if len(buf) == 0 {
  22. return
  23. }
  24. startTime, stopTime = startTime.UTC(), stopTime.UTC()
  25. targetFile := fmt.Sprintf("%s/%s", partitionDir, startTime.Format(topic.TIME_FORMAT))
  26. // TODO append block with more metadata
  27. for {
  28. if err := b.appendToFile(targetFile, buf); err != nil {
  29. glog.V(0).Infof("metadata log write failed %s: %v", targetFile, err)
  30. time.Sleep(737 * time.Millisecond)
  31. } else {
  32. break
  33. }
  34. }
  35. atomic.StoreInt64(&logBuffer.LastFlushTsNs, stopTime.UnixNano())
  36. b.accessLock.Lock()
  37. defer b.accessLock.Unlock()
  38. p := topic.FromPbPartition(partition)
  39. if localPartition := b.localTopicManager.GetLocalPartition(t, p); localPartition != nil {
  40. localPartition.NotifyLogFlushed(logBuffer.LastFlushTsNs)
  41. }
  42. println("flushing at", logBuffer.LastFlushTsNs, "to", targetFile, "size", len(buf))
  43. }
  44. }
  45. func (b *MessageQueueBroker) genLogOnDiskReadFunc(t topic.Topic, partition *mq_pb.Partition) log_buffer.LogReadFromDiskFuncType {
  46. topicDir := fmt.Sprintf("%s/%s/%s", filer.TopicsDir, t.Namespace, t.Name)
  47. partitionGeneration := time.Unix(0, partition.UnixTimeNs).UTC().Format(topic.TIME_FORMAT)
  48. partitionDir := fmt.Sprintf("%s/%s/%04d-%04d", topicDir, partitionGeneration, partition.RangeStart, partition.RangeStop)
  49. lookupFileIdFn := func(fileId string) (targetUrls []string, err error) {
  50. return b.MasterClient.LookupFileId(fileId)
  51. }
  52. eachChunkFn := func(buf []byte, eachLogEntryFn log_buffer.EachLogEntryFuncType, starTsNs, stopTsNs int64) (processedTsNs int64, err error) {
  53. for pos := 0; pos+4 < len(buf); {
  54. size := util.BytesToUint32(buf[pos : pos+4])
  55. if pos+4+int(size) > len(buf) {
  56. err = fmt.Errorf("LogOnDiskReadFunc: read [%d,%d) from [0,%d)", pos, pos+int(size)+4, len(buf))
  57. return
  58. }
  59. entryData := buf[pos+4 : pos+4+int(size)]
  60. logEntry := &filer_pb.LogEntry{}
  61. if err = proto.Unmarshal(entryData, logEntry); err != nil {
  62. pos += 4 + int(size)
  63. err = fmt.Errorf("unexpected unmarshal mq_pb.Message: %v", err)
  64. return
  65. }
  66. if logEntry.TsNs < starTsNs {
  67. pos += 4 + int(size)
  68. continue
  69. }
  70. if stopTsNs != 0 && logEntry.TsNs > stopTsNs {
  71. println("stopTsNs", stopTsNs, "logEntry.TsNs", logEntry.TsNs)
  72. return
  73. }
  74. if _, err = eachLogEntryFn(logEntry); err != nil {
  75. err = fmt.Errorf("process log entry %v: %v", logEntry, err)
  76. return
  77. }
  78. processedTsNs = logEntry.TsNs
  79. pos += 4 + int(size)
  80. }
  81. return
  82. }
  83. eachFileFn := func(entry *filer_pb.Entry, eachLogEntryFn log_buffer.EachLogEntryFuncType, starTsNs, stopTsNs int64) (processedTsNs int64, err error) {
  84. if len(entry.Content) > 0 {
  85. glog.Warningf("this should not happen. unexpected content in %s/%s", partitionDir, entry.Name)
  86. return
  87. }
  88. var urlStrings []string
  89. for _, chunk := range entry.Chunks {
  90. if chunk.Size == 0 {
  91. continue
  92. }
  93. if chunk.IsChunkManifest {
  94. glog.Warningf("this should not happen. unexpected chunk manifest in %s/%s", partitionDir, entry.Name)
  95. return
  96. }
  97. urlStrings, err = lookupFileIdFn(chunk.FileId)
  98. if err != nil {
  99. err = fmt.Errorf("lookup %s: %v", chunk.FileId, err)
  100. return
  101. }
  102. if len(urlStrings) == 0 {
  103. err = fmt.Errorf("no url found for %s", chunk.FileId)
  104. return
  105. }
  106. // try one of the urlString until util.Get(urlString) succeeds
  107. var processed bool
  108. for _, urlString := range urlStrings {
  109. // TODO optimization opportunity: reuse the buffer
  110. var data []byte
  111. if data, _, err = util.Get(urlString); err == nil {
  112. processed = true
  113. if processedTsNs, err = eachChunkFn(data, eachLogEntryFn, starTsNs, stopTsNs); err != nil {
  114. return
  115. }
  116. break
  117. }
  118. }
  119. if !processed {
  120. err = fmt.Errorf("no data processed for %s %s", entry.Name, chunk.FileId)
  121. return
  122. }
  123. }
  124. return
  125. }
  126. return func(startPosition log_buffer.MessagePosition, stopTsNs int64, eachLogEntryFn log_buffer.EachLogEntryFuncType) (lastReadPosition log_buffer.MessagePosition, isDone bool, err error) {
  127. startFileName := startPosition.UTC().Format(topic.TIME_FORMAT)
  128. startTsNs := startPosition.Time.UnixNano()
  129. stopTime := time.Unix(0, stopTsNs)
  130. var processedTsNs int64
  131. err = b.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  132. return filer_pb.SeaweedList(client, partitionDir, "", func(entry *filer_pb.Entry, isLast bool) error {
  133. if entry.IsDirectory {
  134. return nil
  135. }
  136. if stopTsNs != 0 && entry.Name > stopTime.UTC().Format(topic.TIME_FORMAT) {
  137. isDone = true
  138. return nil
  139. }
  140. if entry.Name < startPosition.UTC().Format(topic.TIME_FORMAT) {
  141. return nil
  142. }
  143. if processedTsNs, err = eachFileFn(entry, eachLogEntryFn, startTsNs, stopTsNs); err != nil {
  144. return err
  145. }
  146. return nil
  147. }, startFileName, true, math.MaxInt32)
  148. })
  149. lastReadPosition = log_buffer.NewMessagePosition(processedTsNs, -2)
  150. return
  151. }
  152. }