package filer import ( "container/heap" "context" "fmt" "io" "math" "strings" "time" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/util/log_buffer" "github.com/seaweedfs/seaweedfs/weed/wdclient" "google.golang.org/protobuf/proto" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/util" ) type LogFileEntry struct { TsNs int64 FileEntry *Entry } func (f *Filer) collectPersistedLogBuffer(startPosition log_buffer.MessagePosition, stopTsNs int64) (v *OrderedLogVisitor, err error) { if stopTsNs != 0 && startPosition.Time.UnixNano() > stopTsNs { return nil, io.EOF } startDate := fmt.Sprintf("%04d-%02d-%02d", startPosition.Year(), startPosition.Month(), startPosition.Day()) dayEntries, _, listDayErr := f.ListDirectoryEntries(context.Background(), SystemLogDir, startDate, true, math.MaxInt32, "", "", "") if listDayErr != nil { return nil, fmt.Errorf("fail to list log by day: %v", listDayErr) } return NewOrderedLogVisitor(f, startPosition, stopTsNs, dayEntries) } func (f *Filer) HasPersistedLogFiles(startPosition log_buffer.MessagePosition) (bool, error) { startDate := fmt.Sprintf("%04d-%02d-%02d", startPosition.Year(), startPosition.Month(), startPosition.Day()) dayEntries, _, listDayErr := f.ListDirectoryEntries(context.Background(), SystemLogDir, startDate, true, 1, "", "", "") if listDayErr != nil { return false, fmt.Errorf("fail to list log by day: %v", listDayErr) } if len(dayEntries) == 0 { return false, nil } return true, nil } // ---------- type LogEntryItem struct { Entry *filer_pb.LogEntry filer string } // LogEntryItemPriorityQueue a priority queue for LogEntry type LogEntryItemPriorityQueue []*LogEntryItem func (pq LogEntryItemPriorityQueue) Len() int { return len(pq) } func (pq LogEntryItemPriorityQueue) Less(i, j int) bool { return pq[i].Entry.TsNs < pq[j].Entry.TsNs } func (pq LogEntryItemPriorityQueue) Swap(i, j int) { pq[i], pq[j] = pq[j], pq[i] } func (pq *LogEntryItemPriorityQueue) Push(x any) { item := x.(*LogEntryItem) *pq = append(*pq, item) } func (pq *LogEntryItemPriorityQueue) Pop() any { n := len(*pq) item := (*pq)[n-1] *pq = (*pq)[:n-1] return item } // ---------- type OrderedLogVisitor struct { perFilerIteratorMap map[string]*LogFileQueueIterator pq *LogEntryItemPriorityQueue logFileEntryCollector *LogFileEntryCollector } func NewOrderedLogVisitor(f *Filer, startPosition log_buffer.MessagePosition, stopTsNs int64, dayEntries []*Entry) (*OrderedLogVisitor, error) { perFilerQueueMap := make(map[string]*LogFileQueueIterator) // initialize the priority queue pq := &LogEntryItemPriorityQueue{} heap.Init(pq) t := &OrderedLogVisitor{ perFilerIteratorMap: perFilerQueueMap, pq: pq, logFileEntryCollector: NewLogFileEntryCollector(f, startPosition, stopTsNs, dayEntries), } if err := t.logFileEntryCollector.collectMore(t); err != nil && err != io.EOF { return nil, err } return t, nil } func (o *OrderedLogVisitor) GetNext() (logEntry *filer_pb.LogEntry, err error) { if o.pq.Len() == 0 { return nil, io.EOF } item := heap.Pop(o.pq).(*LogEntryItem) filerId := item.filer // fill the pq with the next log entry from the same filer it := o.perFilerIteratorMap[filerId] next, nextErr := it.getNext(o) if nextErr != nil { if nextErr == io.EOF { // do nothing since the filer has no more log entries } else { return nil, fmt.Errorf("failed to get next log entry: %v", nextErr) } } else { heap.Push(o.pq, &LogEntryItem{ Entry: next, filer: filerId, }) } return item.Entry, nil } func getFilerId(name string) string { idx := strings.LastIndex(name, ".") if idx < 0 { return "" } return name[idx+1:] } // ---------- type LogFileEntryCollector struct { f *Filer startTsNs int64 stopTsNs int64 dayEntryQueue *util.Queue[*Entry] startDate string startHourMinute string stopDate string stopHourMinute string } func NewLogFileEntryCollector(f *Filer, startPosition log_buffer.MessagePosition, stopTsNs int64, dayEntries []*Entry) *LogFileEntryCollector { dayEntryQueue := util.NewQueue[*Entry]() for _, dayEntry := range dayEntries { dayEntryQueue.Enqueue(dayEntry) // println("enqueue day entry", dayEntry.Name()) } startDate := fmt.Sprintf("%04d-%02d-%02d", startPosition.Year(), startPosition.Month(), startPosition.Day()) startHourMinute := fmt.Sprintf("%02d-%02d", startPosition.Hour(), startPosition.Minute()) var stopDate, stopHourMinute string if stopTsNs != 0 { stopTime := time.Unix(0, stopTsNs+24*60*60*int64(time.Nanosecond)).UTC() stopDate = fmt.Sprintf("%04d-%02d-%02d", stopTime.Year(), stopTime.Month(), stopTime.Day()) stopHourMinute = fmt.Sprintf("%02d-%02d", stopTime.Hour(), stopTime.Minute()) } return &LogFileEntryCollector{ f: f, startTsNs: startPosition.UnixNano(), stopTsNs: stopTsNs, dayEntryQueue: dayEntryQueue, startDate: startDate, startHourMinute: startHourMinute, stopDate: stopDate, stopHourMinute: stopHourMinute, } } func (c *LogFileEntryCollector) hasMore() bool { return c.dayEntryQueue.Len() > 0 } func (c *LogFileEntryCollector) collectMore(v *OrderedLogVisitor) (err error) { dayEntry := c.dayEntryQueue.Dequeue() if dayEntry == nil { return io.EOF } // println("dequeue day entry", dayEntry.Name()) if c.stopDate != "" { if strings.Compare(dayEntry.Name(), c.stopDate) > 0 { return io.EOF } } hourMinuteEntries, _, listHourMinuteErr := c.f.ListDirectoryEntries(context.Background(), util.NewFullPath(SystemLogDir, dayEntry.Name()), "", false, math.MaxInt32, "", "", "") if listHourMinuteErr != nil { return fmt.Errorf("fail to list log %s by day: %v", dayEntry.Name(), listHourMinuteErr) } freshFilerIds := make(map[string]string) for _, hourMinuteEntry := range hourMinuteEntries { // println("checking hh-mm", hourMinuteEntry.FullPath) hourMinute := util.FileNameBase(hourMinuteEntry.Name()) if dayEntry.Name() == c.startDate { if strings.Compare(hourMinute, c.startHourMinute) < 0 { continue } } if dayEntry.Name() == c.stopDate { if strings.Compare(hourMinute, c.stopHourMinute) > 0 { break } } tsMinute := fmt.Sprintf("%s-%s", dayEntry.Name(), hourMinute) // println(" enqueue", tsMinute) t, parseErr := time.Parse("2006-01-02-15-04", tsMinute) if parseErr != nil { glog.Errorf("failed to parse %s: %v", tsMinute, parseErr) continue } filerId := getFilerId(hourMinuteEntry.Name()) iter, found := v.perFilerIteratorMap[filerId] if !found { iter = newLogFileQueueIterator(c.f.MasterClient, util.NewQueue[*LogFileEntry](), c.startTsNs, c.stopTsNs) v.perFilerIteratorMap[filerId] = iter freshFilerIds[filerId] = hourMinuteEntry.Name() } iter.q.Enqueue(&LogFileEntry{ TsNs: t.UnixNano(), FileEntry: hourMinuteEntry, }) } // fill the pq with the next log entry if it is a new filer for filerId, entryName := range freshFilerIds { iter, found := v.perFilerIteratorMap[filerId] if !found { glog.Errorf("Unexpected! failed to find iterator for filer %s", filerId) continue } next, nextErr := iter.getNext(v) if nextErr != nil { if nextErr == io.EOF { // do nothing since the filer has no more log entries } else { return fmt.Errorf("failed to get next log entry for %v: %v", entryName, err) } } else { heap.Push(v.pq, &LogEntryItem{ Entry: next, filer: filerId, }) } } return nil } // ---------- type LogFileQueueIterator struct { q *util.Queue[*LogFileEntry] masterClient *wdclient.MasterClient startTsNs int64 stopTsNs int64 currentFileIterator *LogFileIterator } func newLogFileQueueIterator(masterClient *wdclient.MasterClient, q *util.Queue[*LogFileEntry], startTsNs, stopTsNs int64) *LogFileQueueIterator { return &LogFileQueueIterator{ q: q, masterClient: masterClient, startTsNs: startTsNs, stopTsNs: stopTsNs, } } // getNext will return io.EOF when done func (iter *LogFileQueueIterator) getNext(v *OrderedLogVisitor) (logEntry *filer_pb.LogEntry, err error) { for { if iter.currentFileIterator != nil { logEntry, err = iter.currentFileIterator.getNext() if err != io.EOF { return } } // now either iter.currentFileIterator is nil or err is io.EOF if iter.q.Len() == 0 { return nil, io.EOF } t := iter.q.Dequeue() if t == nil { continue } // skip the file if it is after the stopTsNs if iter.stopTsNs != 0 && t.TsNs > iter.stopTsNs { return nil, io.EOF } next := iter.q.Peek() if next == nil { if collectErr := v.logFileEntryCollector.collectMore(v); collectErr != nil && collectErr != io.EOF { return nil, collectErr } } // skip the file if the next entry is before the startTsNs if next != nil && next.TsNs <= iter.startTsNs { continue } iter.currentFileIterator = newLogFileIterator(iter.masterClient, t.FileEntry, iter.startTsNs, iter.stopTsNs) } } // ---------- type LogFileIterator struct { r io.Reader sizeBuf []byte startTsNs int64 stopTsNs int64 } func newLogFileIterator(masterClient *wdclient.MasterClient, fileEntry *Entry, startTsNs, stopTsNs int64) *LogFileIterator { return &LogFileIterator{ r: NewChunkStreamReaderFromFiler(masterClient, fileEntry.Chunks), sizeBuf: make([]byte, 4), startTsNs: startTsNs, stopTsNs: stopTsNs, } } // getNext will return io.EOF when done func (iter *LogFileIterator) getNext() (logEntry *filer_pb.LogEntry, err error) { var n int for { n, err = iter.r.Read(iter.sizeBuf) if err != nil { return } if n != 4 { return nil, fmt.Errorf("size %d bytes, expected 4 bytes", n) } size := util.BytesToUint32(iter.sizeBuf) // println("entry size", size) entryData := make([]byte, size) n, err = iter.r.Read(entryData) if err != nil { return } if n != int(size) { return nil, fmt.Errorf("entry data %d bytes, expected %d bytes", n, size) } logEntry = &filer_pb.LogEntry{} if err = proto.Unmarshal(entryData, logEntry); err != nil { return } if logEntry.TsNs <= iter.startTsNs { continue } if iter.stopTsNs != 0 && logEntry.TsNs > iter.stopTsNs { return nil, io.EOF } return } }