filer_notify_read.go 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352
  1. package filer
  2. import (
  3. "container/heap"
  4. "context"
  5. "fmt"
  6. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  7. "github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
  8. "github.com/seaweedfs/seaweedfs/weed/wdclient"
  9. "google.golang.org/protobuf/proto"
  10. "io"
  11. "math"
  12. "strings"
  13. "time"
  14. "github.com/seaweedfs/seaweedfs/weed/glog"
  15. "github.com/seaweedfs/seaweedfs/weed/util"
  16. )
  17. type LogFileEntry struct {
  18. TsNs int64
  19. FileEntry *Entry
  20. }
  21. func (f *Filer) collectPersistedLogBuffer(startPosition log_buffer.MessagePosition, stopTsNs int64) (v *OrderedLogVisitor, err error) {
  22. if stopTsNs != 0 && startPosition.Time.UnixNano() > stopTsNs {
  23. return nil, io.EOF
  24. }
  25. startDate := fmt.Sprintf("%04d-%02d-%02d", startPosition.Year(), startPosition.Month(), startPosition.Day())
  26. dayEntries, _, listDayErr := f.ListDirectoryEntries(context.Background(), SystemLogDir, startDate, true, math.MaxInt32, "", "", "")
  27. if listDayErr != nil {
  28. return nil, fmt.Errorf("fail to list log by day: %v", listDayErr)
  29. }
  30. return NewOrderedLogVisitor(f, startPosition, stopTsNs, dayEntries)
  31. }
  32. // ----------
  33. type LogEntryItem struct {
  34. Entry *filer_pb.LogEntry
  35. filer string
  36. }
  37. // LogEntryItemPriorityQueue a priority queue for LogEntry
  38. type LogEntryItemPriorityQueue []*LogEntryItem
  39. func (pq LogEntryItemPriorityQueue) Len() int { return len(pq) }
  40. func (pq LogEntryItemPriorityQueue) Less(i, j int) bool {
  41. return pq[i].Entry.TsNs < pq[j].Entry.TsNs
  42. }
  43. func (pq LogEntryItemPriorityQueue) Swap(i, j int) { pq[i], pq[j] = pq[j], pq[i] }
  44. func (pq *LogEntryItemPriorityQueue) Push(x any) {
  45. item := x.(*LogEntryItem)
  46. *pq = append(*pq, item)
  47. }
  48. func (pq *LogEntryItemPriorityQueue) Pop() any {
  49. n := len(*pq)
  50. item := (*pq)[n-1]
  51. *pq = (*pq)[:n-1]
  52. return item
  53. }
  54. // ----------
  55. type OrderedLogVisitor struct {
  56. perFilerIteratorMap map[string]*LogFileQueueIterator
  57. pq *LogEntryItemPriorityQueue
  58. logFileEntryCollector *LogFileEntryCollector
  59. }
  60. func NewOrderedLogVisitor(f *Filer, startPosition log_buffer.MessagePosition, stopTsNs int64, dayEntries []*Entry) (*OrderedLogVisitor, error) {
  61. perFilerQueueMap := make(map[string]*LogFileQueueIterator)
  62. // initialize the priority queue
  63. pq := &LogEntryItemPriorityQueue{}
  64. heap.Init(pq)
  65. t := &OrderedLogVisitor{
  66. perFilerIteratorMap: perFilerQueueMap,
  67. pq: pq,
  68. logFileEntryCollector: NewLogFileEntryCollector(f, startPosition, stopTsNs, dayEntries),
  69. }
  70. if err := t.logFileEntryCollector.collectMore(t); err != nil && err != io.EOF {
  71. return nil, err
  72. }
  73. return t, nil
  74. }
  75. func (o *OrderedLogVisitor) GetNext() (logEntry *filer_pb.LogEntry, err error) {
  76. if o.pq.Len() == 0 {
  77. return nil, io.EOF
  78. }
  79. item := heap.Pop(o.pq).(*LogEntryItem)
  80. filerId := item.filer
  81. // fill the pq with the next log entry from the same filer
  82. it := o.perFilerIteratorMap[filerId]
  83. next, nextErr := it.getNext(o)
  84. if nextErr != nil {
  85. if nextErr == io.EOF {
  86. // do nothing since the filer has no more log entries
  87. }else {
  88. return nil, fmt.Errorf("failed to get next log entry: %v", nextErr)
  89. }
  90. } else {
  91. heap.Push(o.pq, &LogEntryItem{
  92. Entry: next,
  93. filer: filerId,
  94. })
  95. }
  96. return item.Entry, nil
  97. }
  98. func getFilerId(name string) string {
  99. idx := strings.LastIndex(name, ".")
  100. if idx < 0 {
  101. return ""
  102. }
  103. return name[idx+1:]
  104. }
  105. // ----------
  106. type LogFileEntryCollector struct {
  107. f *Filer
  108. startTsNs int64
  109. stopTsNs int64
  110. dayEntryQueue *util.Queue[*Entry]
  111. startDate string
  112. startHourMinute string
  113. stopDate string
  114. stopHourMinute string
  115. }
  116. func NewLogFileEntryCollector(f *Filer, startPosition log_buffer.MessagePosition, stopTsNs int64, dayEntries []*Entry) *LogFileEntryCollector {
  117. dayEntryQueue := util.NewQueue[*Entry]()
  118. for _, dayEntry := range dayEntries {
  119. dayEntryQueue.Enqueue(dayEntry)
  120. // println("enqueue day entry", dayEntry.Name())
  121. }
  122. startDate := fmt.Sprintf("%04d-%02d-%02d", startPosition.Year(), startPosition.Month(), startPosition.Day())
  123. startHourMinute := fmt.Sprintf("%02d-%02d", startPosition.Hour(), startPosition.Minute())
  124. var stopDate, stopHourMinute string
  125. if stopTsNs != 0 {
  126. stopTime := time.Unix(0, stopTsNs+24*60*60*int64(time.Nanosecond)).UTC()
  127. stopDate = fmt.Sprintf("%04d-%02d-%02d", stopTime.Year(), stopTime.Month(), stopTime.Day())
  128. stopHourMinute = fmt.Sprintf("%02d-%02d", stopTime.Hour(), stopTime.Minute())
  129. }
  130. return &LogFileEntryCollector{
  131. f: f,
  132. startTsNs: startPosition.UnixNano(),
  133. stopTsNs: stopTsNs,
  134. dayEntryQueue: dayEntryQueue,
  135. startDate: startDate,
  136. startHourMinute: startHourMinute,
  137. stopDate: stopDate,
  138. stopHourMinute: stopHourMinute,
  139. }
  140. }
  141. func (c *LogFileEntryCollector) hasMore() bool {
  142. return c.dayEntryQueue.Len() > 0
  143. }
  144. func (c *LogFileEntryCollector) collectMore(v *OrderedLogVisitor) (err error) {
  145. dayEntry := c.dayEntryQueue.Dequeue()
  146. if dayEntry == nil {
  147. return io.EOF
  148. }
  149. // println("dequeue day entry", dayEntry.Name())
  150. if c.stopDate != "" {
  151. if strings.Compare(dayEntry.Name(), c.stopDate) > 0 {
  152. return io.EOF
  153. }
  154. }
  155. hourMinuteEntries, _, listHourMinuteErr := c.f.ListDirectoryEntries(context.Background(), util.NewFullPath(SystemLogDir, dayEntry.Name()), "", false, math.MaxInt32, "", "", "")
  156. if listHourMinuteErr != nil {
  157. return fmt.Errorf("fail to list log %s by day: %v", dayEntry.Name(), listHourMinuteErr)
  158. }
  159. freshFilerIds := make(map[string]string)
  160. for _, hourMinuteEntry := range hourMinuteEntries {
  161. // println("checking hh-mm", hourMinuteEntry.FullPath)
  162. hourMinute := util.FileNameBase(hourMinuteEntry.Name())
  163. if dayEntry.Name() == c.startDate {
  164. if strings.Compare(hourMinute, c.startHourMinute) < 0 {
  165. continue
  166. }
  167. }
  168. if dayEntry.Name() == c.stopDate {
  169. if strings.Compare(hourMinute, c.stopHourMinute) > 0 {
  170. break
  171. }
  172. }
  173. tsMinute := fmt.Sprintf("%s-%s", dayEntry.Name(), hourMinute)
  174. // println(" enqueue", tsMinute)
  175. t, parseErr := time.Parse("2006-01-02-15-04", tsMinute)
  176. if parseErr != nil {
  177. glog.Errorf("failed to parse %s: %v", tsMinute, parseErr)
  178. continue
  179. }
  180. filerId := getFilerId(hourMinuteEntry.Name())
  181. iter, found := v.perFilerIteratorMap[filerId]
  182. if !found {
  183. iter = newLogFileQueueIterator(c.f.MasterClient, util.NewQueue[*LogFileEntry](), c.startTsNs, c.stopTsNs)
  184. v.perFilerIteratorMap[filerId] = iter
  185. freshFilerIds[filerId] = hourMinuteEntry.Name()
  186. }
  187. iter.q.Enqueue(&LogFileEntry{
  188. TsNs: t.UnixNano(),
  189. FileEntry: hourMinuteEntry,
  190. })
  191. }
  192. // fill the pq with the next log entry if it is a new filer
  193. for filerId, entryName := range freshFilerIds {
  194. iter, found := v.perFilerIteratorMap[filerId]
  195. if !found {
  196. glog.Errorf("Unexpected! failed to find iterator for filer %s", filerId)
  197. continue
  198. }
  199. next, nextErr := iter.getNext(v)
  200. if nextErr != nil {
  201. if nextErr == io.EOF {
  202. // do nothing since the filer has no more log entries
  203. }else {
  204. return fmt.Errorf("failed to get next log entry for %v: %v", entryName, err)
  205. }
  206. } else {
  207. heap.Push(v.pq, &LogEntryItem{
  208. Entry: next,
  209. filer: filerId,
  210. })
  211. }
  212. }
  213. return nil
  214. }
  215. // ----------
  216. type LogFileQueueIterator struct {
  217. q *util.Queue[*LogFileEntry]
  218. masterClient *wdclient.MasterClient
  219. startTsNs int64
  220. stopTsNs int64
  221. currentFileIterator *LogFileIterator
  222. }
  223. func newLogFileQueueIterator(masterClient *wdclient.MasterClient, q *util.Queue[*LogFileEntry], startTsNs, stopTsNs int64) *LogFileQueueIterator {
  224. return &LogFileQueueIterator{
  225. q: q,
  226. masterClient: masterClient,
  227. startTsNs: startTsNs,
  228. stopTsNs: stopTsNs,
  229. }
  230. }
  231. // getNext will return io.EOF when done
  232. func (iter *LogFileQueueIterator) getNext(v *OrderedLogVisitor) (logEntry *filer_pb.LogEntry, err error) {
  233. for {
  234. if iter.currentFileIterator != nil {
  235. logEntry, err = iter.currentFileIterator.getNext()
  236. if err != io.EOF {
  237. return
  238. }
  239. }
  240. // now either iter.currentFileIterator is nil or err is io.EOF
  241. if iter.q.Len() == 0 {
  242. return nil, io.EOF
  243. }
  244. t := iter.q.Dequeue()
  245. if t == nil {
  246. continue
  247. }
  248. // skip the file if it is after the stopTsNs
  249. if iter.stopTsNs != 0 && t.TsNs > iter.stopTsNs {
  250. return nil, io.EOF
  251. }
  252. next := iter.q.Peek()
  253. if next == nil {
  254. if collectErr := v.logFileEntryCollector.collectMore(v); collectErr != nil && collectErr != io.EOF {
  255. return nil, collectErr
  256. }
  257. }
  258. // skip the file if the next entry is before the startTsNs
  259. if next != nil && next.TsNs <= iter.startTsNs {
  260. continue
  261. }
  262. iter.currentFileIterator = newLogFileIterator(iter.masterClient, t.FileEntry, iter.startTsNs, iter.stopTsNs)
  263. }
  264. }
  265. // ----------
  266. type LogFileIterator struct {
  267. r io.Reader
  268. sizeBuf []byte
  269. startTsNs int64
  270. stopTsNs int64
  271. }
  272. func newLogFileIterator(masterClient *wdclient.MasterClient, fileEntry *Entry, startTsNs, stopTsNs int64) *LogFileIterator {
  273. return &LogFileIterator{
  274. r: NewChunkStreamReaderFromFiler(masterClient, fileEntry.Chunks),
  275. sizeBuf: make([]byte, 4),
  276. startTsNs: startTsNs,
  277. stopTsNs: stopTsNs,
  278. }
  279. }
  280. // getNext will return io.EOF when done
  281. func (iter *LogFileIterator) getNext() (logEntry *filer_pb.LogEntry, err error) {
  282. var n int
  283. for {
  284. n, err = iter.r.Read(iter.sizeBuf)
  285. if err != nil {
  286. return
  287. }
  288. if n != 4 {
  289. return nil, fmt.Errorf("size %d bytes, expected 4 bytes", n)
  290. }
  291. size := util.BytesToUint32(iter.sizeBuf)
  292. // println("entry size", size)
  293. entryData := make([]byte, size)
  294. n, err = iter.r.Read(entryData)
  295. if err != nil {
  296. return
  297. }
  298. if n != int(size) {
  299. return nil, fmt.Errorf("entry data %d bytes, expected %d bytes", n, size)
  300. }
  301. logEntry = &filer_pb.LogEntry{}
  302. if err = proto.Unmarshal(entryData, logEntry); err != nil {
  303. return
  304. }
  305. if logEntry.TsNs <= iter.startTsNs {
  306. continue
  307. }
  308. if iter.stopTsNs != 0 && logEntry.TsNs > iter.stopTsNs {
  309. return nil, io.EOF
  310. }
  311. return
  312. }
  313. }