merged_read.go 1.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041
  1. package logstore
  2. import (
  3. "github.com/seaweedfs/seaweedfs/weed/mq/topic"
  4. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  5. "github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
  6. )
  7. func GenMergedReadFunc(filerClient filer_pb.FilerClient, t topic.Topic, p topic.Partition) log_buffer.LogReadFromDiskFuncType {
  8. fromParquetFn := GenParquetReadFunc(filerClient, t, p)
  9. readLogDirectFn := GenLogOnDiskReadFunc(filerClient, t, p)
  10. return mergeReadFuncs(fromParquetFn, readLogDirectFn)
  11. }
  12. func mergeReadFuncs(fromParquetFn, readLogDirectFn log_buffer.LogReadFromDiskFuncType) log_buffer.LogReadFromDiskFuncType {
  13. var exhaustedParquet bool
  14. var lastProcessedPosition log_buffer.MessagePosition
  15. return func(startPosition log_buffer.MessagePosition, stopTsNs int64, eachLogEntryFn log_buffer.EachLogEntryFuncType) (lastReadPosition log_buffer.MessagePosition, isDone bool, err error) {
  16. if !exhaustedParquet {
  17. // glog.V(4).Infof("reading from parquet startPosition: %v\n", startPosition.UTC())
  18. lastReadPosition, isDone, err = fromParquetFn(startPosition, stopTsNs, eachLogEntryFn)
  19. // glog.V(4).Infof("read from parquet: %v %v %v %v\n", startPosition, lastReadPosition, isDone, err)
  20. if isDone {
  21. isDone = false
  22. }
  23. if err != nil {
  24. return
  25. }
  26. lastProcessedPosition = lastReadPosition
  27. }
  28. exhaustedParquet = true
  29. if startPosition.Before(lastProcessedPosition.Time) {
  30. startPosition = lastProcessedPosition
  31. }
  32. // glog.V(4).Infof("reading from direct log startPosition: %v\n", startPosition.UTC())
  33. lastReadPosition, isDone, err = readLogDirectFn(startPosition, stopTsNs, eachLogEntryFn)
  34. return
  35. }
  36. }