filer_notify_read.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366
  1. package filer
  2. import (
  3. "container/heap"
  4. "context"
  5. "fmt"
  6. "io"
  7. "math"
  8. "strings"
  9. "time"
  10. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  11. "github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
  12. "github.com/seaweedfs/seaweedfs/weed/wdclient"
  13. "google.golang.org/protobuf/proto"
  14. "github.com/seaweedfs/seaweedfs/weed/glog"
  15. "github.com/seaweedfs/seaweedfs/weed/util"
  16. )
  17. type LogFileEntry struct {
  18. TsNs int64
  19. FileEntry *Entry
  20. }
  21. func (f *Filer) collectPersistedLogBuffer(startPosition log_buffer.MessagePosition, stopTsNs int64) (v *OrderedLogVisitor, err error) {
  22. if stopTsNs != 0 && startPosition.Time.UnixNano() > stopTsNs {
  23. return nil, io.EOF
  24. }
  25. startDate := fmt.Sprintf("%04d-%02d-%02d", startPosition.Year(), startPosition.Month(), startPosition.Day())
  26. dayEntries, _, listDayErr := f.ListDirectoryEntries(context.Background(), SystemLogDir, startDate, true, math.MaxInt32, "", "", "")
  27. if listDayErr != nil {
  28. return nil, fmt.Errorf("fail to list log by day: %v", listDayErr)
  29. }
  30. return NewOrderedLogVisitor(f, startPosition, stopTsNs, dayEntries)
  31. }
  32. func (f *Filer) HasPersistedLogFiles(startPosition log_buffer.MessagePosition) (bool, error) {
  33. startDate := fmt.Sprintf("%04d-%02d-%02d", startPosition.Year(), startPosition.Month(), startPosition.Day())
  34. dayEntries, _, listDayErr := f.ListDirectoryEntries(context.Background(), SystemLogDir, startDate, true, 1, "", "", "")
  35. if listDayErr != nil {
  36. return false, fmt.Errorf("fail to list log by day: %v", listDayErr)
  37. }
  38. if len(dayEntries) == 0 {
  39. return false, nil
  40. }
  41. return true, nil
  42. }
  43. // ----------
  44. type LogEntryItem struct {
  45. Entry *filer_pb.LogEntry
  46. filer string
  47. }
  48. // LogEntryItemPriorityQueue a priority queue for LogEntry
  49. type LogEntryItemPriorityQueue []*LogEntryItem
  50. func (pq LogEntryItemPriorityQueue) Len() int { return len(pq) }
  51. func (pq LogEntryItemPriorityQueue) Less(i, j int) bool {
  52. return pq[i].Entry.TsNs < pq[j].Entry.TsNs
  53. }
  54. func (pq LogEntryItemPriorityQueue) Swap(i, j int) { pq[i], pq[j] = pq[j], pq[i] }
  55. func (pq *LogEntryItemPriorityQueue) Push(x any) {
  56. item := x.(*LogEntryItem)
  57. *pq = append(*pq, item)
  58. }
  59. func (pq *LogEntryItemPriorityQueue) Pop() any {
  60. n := len(*pq)
  61. item := (*pq)[n-1]
  62. *pq = (*pq)[:n-1]
  63. return item
  64. }
  65. // ----------
  66. type OrderedLogVisitor struct {
  67. perFilerIteratorMap map[string]*LogFileQueueIterator
  68. pq *LogEntryItemPriorityQueue
  69. logFileEntryCollector *LogFileEntryCollector
  70. }
  71. func NewOrderedLogVisitor(f *Filer, startPosition log_buffer.MessagePosition, stopTsNs int64, dayEntries []*Entry) (*OrderedLogVisitor, error) {
  72. perFilerQueueMap := make(map[string]*LogFileQueueIterator)
  73. // initialize the priority queue
  74. pq := &LogEntryItemPriorityQueue{}
  75. heap.Init(pq)
  76. t := &OrderedLogVisitor{
  77. perFilerIteratorMap: perFilerQueueMap,
  78. pq: pq,
  79. logFileEntryCollector: NewLogFileEntryCollector(f, startPosition, stopTsNs, dayEntries),
  80. }
  81. if err := t.logFileEntryCollector.collectMore(t); err != nil && err != io.EOF {
  82. return nil, err
  83. }
  84. return t, nil
  85. }
  86. func (o *OrderedLogVisitor) GetNext() (logEntry *filer_pb.LogEntry, err error) {
  87. if o.pq.Len() == 0 {
  88. return nil, io.EOF
  89. }
  90. item := heap.Pop(o.pq).(*LogEntryItem)
  91. filerId := item.filer
  92. // fill the pq with the next log entry from the same filer
  93. it := o.perFilerIteratorMap[filerId]
  94. next, nextErr := it.getNext(o)
  95. if nextErr != nil {
  96. if nextErr == io.EOF {
  97. // do nothing since the filer has no more log entries
  98. } else {
  99. return nil, fmt.Errorf("failed to get next log entry: %v", nextErr)
  100. }
  101. } else {
  102. heap.Push(o.pq, &LogEntryItem{
  103. Entry: next,
  104. filer: filerId,
  105. })
  106. }
  107. return item.Entry, nil
  108. }
  109. func getFilerId(name string) string {
  110. idx := strings.LastIndex(name, ".")
  111. if idx < 0 {
  112. return ""
  113. }
  114. return name[idx+1:]
  115. }
  116. // ----------
  117. type LogFileEntryCollector struct {
  118. f *Filer
  119. startTsNs int64
  120. stopTsNs int64
  121. dayEntryQueue *util.Queue[*Entry]
  122. startDate string
  123. startHourMinute string
  124. stopDate string
  125. stopHourMinute string
  126. }
  127. func NewLogFileEntryCollector(f *Filer, startPosition log_buffer.MessagePosition, stopTsNs int64, dayEntries []*Entry) *LogFileEntryCollector {
  128. dayEntryQueue := util.NewQueue[*Entry]()
  129. for _, dayEntry := range dayEntries {
  130. dayEntryQueue.Enqueue(dayEntry)
  131. // println("enqueue day entry", dayEntry.Name())
  132. }
  133. startDate := fmt.Sprintf("%04d-%02d-%02d", startPosition.Year(), startPosition.Month(), startPosition.Day())
  134. startHourMinute := fmt.Sprintf("%02d-%02d", startPosition.Hour(), startPosition.Minute())
  135. var stopDate, stopHourMinute string
  136. if stopTsNs != 0 {
  137. stopTime := time.Unix(0, stopTsNs+24*60*60*int64(time.Nanosecond)).UTC()
  138. stopDate = fmt.Sprintf("%04d-%02d-%02d", stopTime.Year(), stopTime.Month(), stopTime.Day())
  139. stopHourMinute = fmt.Sprintf("%02d-%02d", stopTime.Hour(), stopTime.Minute())
  140. }
  141. return &LogFileEntryCollector{
  142. f: f,
  143. startTsNs: startPosition.UnixNano(),
  144. stopTsNs: stopTsNs,
  145. dayEntryQueue: dayEntryQueue,
  146. startDate: startDate,
  147. startHourMinute: startHourMinute,
  148. stopDate: stopDate,
  149. stopHourMinute: stopHourMinute,
  150. }
  151. }
  152. func (c *LogFileEntryCollector) hasMore() bool {
  153. return c.dayEntryQueue.Len() > 0
  154. }
  155. func (c *LogFileEntryCollector) collectMore(v *OrderedLogVisitor) (err error) {
  156. dayEntry := c.dayEntryQueue.Dequeue()
  157. if dayEntry == nil {
  158. return io.EOF
  159. }
  160. // println("dequeue day entry", dayEntry.Name())
  161. if c.stopDate != "" {
  162. if strings.Compare(dayEntry.Name(), c.stopDate) > 0 {
  163. return io.EOF
  164. }
  165. }
  166. hourMinuteEntries, _, listHourMinuteErr := c.f.ListDirectoryEntries(context.Background(), util.NewFullPath(SystemLogDir, dayEntry.Name()), "", false, math.MaxInt32, "", "", "")
  167. if listHourMinuteErr != nil {
  168. return fmt.Errorf("fail to list log %s by day: %v", dayEntry.Name(), listHourMinuteErr)
  169. }
  170. freshFilerIds := make(map[string]string)
  171. for _, hourMinuteEntry := range hourMinuteEntries {
  172. // println("checking hh-mm", hourMinuteEntry.FullPath)
  173. hourMinute := util.FileNameBase(hourMinuteEntry.Name())
  174. if dayEntry.Name() == c.startDate {
  175. if strings.Compare(hourMinute, c.startHourMinute) < 0 {
  176. continue
  177. }
  178. }
  179. if dayEntry.Name() == c.stopDate {
  180. if strings.Compare(hourMinute, c.stopHourMinute) > 0 {
  181. break
  182. }
  183. }
  184. tsMinute := fmt.Sprintf("%s-%s", dayEntry.Name(), hourMinute)
  185. // println(" enqueue", tsMinute)
  186. t, parseErr := time.Parse("2006-01-02-15-04", tsMinute)
  187. if parseErr != nil {
  188. glog.Errorf("failed to parse %s: %v", tsMinute, parseErr)
  189. continue
  190. }
  191. filerId := getFilerId(hourMinuteEntry.Name())
  192. iter, found := v.perFilerIteratorMap[filerId]
  193. if !found {
  194. iter = newLogFileQueueIterator(c.f.MasterClient, util.NewQueue[*LogFileEntry](), c.startTsNs, c.stopTsNs)
  195. v.perFilerIteratorMap[filerId] = iter
  196. freshFilerIds[filerId] = hourMinuteEntry.Name()
  197. }
  198. iter.q.Enqueue(&LogFileEntry{
  199. TsNs: t.UnixNano(),
  200. FileEntry: hourMinuteEntry,
  201. })
  202. }
  203. // fill the pq with the next log entry if it is a new filer
  204. for filerId, entryName := range freshFilerIds {
  205. iter, found := v.perFilerIteratorMap[filerId]
  206. if !found {
  207. glog.Errorf("Unexpected! failed to find iterator for filer %s", filerId)
  208. continue
  209. }
  210. next, nextErr := iter.getNext(v)
  211. if nextErr != nil {
  212. if nextErr == io.EOF {
  213. // do nothing since the filer has no more log entries
  214. } else {
  215. return fmt.Errorf("failed to get next log entry for %v: %v", entryName, err)
  216. }
  217. } else {
  218. heap.Push(v.pq, &LogEntryItem{
  219. Entry: next,
  220. filer: filerId,
  221. })
  222. }
  223. }
  224. return nil
  225. }
  226. // ----------
  227. type LogFileQueueIterator struct {
  228. q *util.Queue[*LogFileEntry]
  229. masterClient *wdclient.MasterClient
  230. startTsNs int64
  231. stopTsNs int64
  232. currentFileIterator *LogFileIterator
  233. }
  234. func newLogFileQueueIterator(masterClient *wdclient.MasterClient, q *util.Queue[*LogFileEntry], startTsNs, stopTsNs int64) *LogFileQueueIterator {
  235. return &LogFileQueueIterator{
  236. q: q,
  237. masterClient: masterClient,
  238. startTsNs: startTsNs,
  239. stopTsNs: stopTsNs,
  240. }
  241. }
  242. // getNext will return io.EOF when done
  243. func (iter *LogFileQueueIterator) getNext(v *OrderedLogVisitor) (logEntry *filer_pb.LogEntry, err error) {
  244. for {
  245. if iter.currentFileIterator != nil {
  246. logEntry, err = iter.currentFileIterator.getNext()
  247. if err != io.EOF {
  248. return
  249. }
  250. }
  251. // now either iter.currentFileIterator is nil or err is io.EOF
  252. if iter.q.Len() == 0 {
  253. return nil, io.EOF
  254. }
  255. t := iter.q.Dequeue()
  256. if t == nil {
  257. continue
  258. }
  259. // skip the file if it is after the stopTsNs
  260. if iter.stopTsNs != 0 && t.TsNs > iter.stopTsNs {
  261. return nil, io.EOF
  262. }
  263. next := iter.q.Peek()
  264. if next == nil {
  265. if collectErr := v.logFileEntryCollector.collectMore(v); collectErr != nil && collectErr != io.EOF {
  266. return nil, collectErr
  267. }
  268. }
  269. // skip the file if the next entry is before the startTsNs
  270. if next != nil && next.TsNs <= iter.startTsNs {
  271. continue
  272. }
  273. iter.currentFileIterator = newLogFileIterator(iter.masterClient, t.FileEntry, iter.startTsNs, iter.stopTsNs)
  274. }
  275. }
  276. // ----------
  277. type LogFileIterator struct {
  278. r io.Reader
  279. sizeBuf []byte
  280. startTsNs int64
  281. stopTsNs int64
  282. }
  283. func newLogFileIterator(masterClient *wdclient.MasterClient, fileEntry *Entry, startTsNs, stopTsNs int64) *LogFileIterator {
  284. return &LogFileIterator{
  285. r: NewChunkStreamReaderFromFiler(masterClient, fileEntry.Chunks),
  286. sizeBuf: make([]byte, 4),
  287. startTsNs: startTsNs,
  288. stopTsNs: stopTsNs,
  289. }
  290. }
  291. // getNext will return io.EOF when done
  292. func (iter *LogFileIterator) getNext() (logEntry *filer_pb.LogEntry, err error) {
  293. var n int
  294. for {
  295. n, err = iter.r.Read(iter.sizeBuf)
  296. if err != nil {
  297. return
  298. }
  299. if n != 4 {
  300. return nil, fmt.Errorf("size %d bytes, expected 4 bytes", n)
  301. }
  302. size := util.BytesToUint32(iter.sizeBuf)
  303. // println("entry size", size)
  304. entryData := make([]byte, size)
  305. n, err = iter.r.Read(entryData)
  306. if err != nil {
  307. return
  308. }
  309. if n != int(size) {
  310. return nil, fmt.Errorf("entry data %d bytes, expected %d bytes", n, size)
  311. }
  312. logEntry = &filer_pb.LogEntry{}
  313. if err = proto.Unmarshal(entryData, logEntry); err != nil {
  314. return
  315. }
  316. if logEntry.TsNs <= iter.startTsNs {
  317. continue
  318. }
  319. if iter.stopTsNs != 0 && logEntry.TsNs > iter.stopTsNs {
  320. return nil, io.EOF
  321. }
  322. return
  323. }
  324. }