read_parquet_to_log.go 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162
  1. package logstore
  2. import (
  3. "encoding/binary"
  4. "fmt"
  5. "github.com/parquet-go/parquet-go"
  6. "github.com/seaweedfs/seaweedfs/weed/filer"
  7. "github.com/seaweedfs/seaweedfs/weed/mq/schema"
  8. "github.com/seaweedfs/seaweedfs/weed/mq/topic"
  9. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  10. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  11. "github.com/seaweedfs/seaweedfs/weed/util/chunk_cache"
  12. "github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
  13. "google.golang.org/protobuf/proto"
  14. "io"
  15. "math"
  16. "strings"
  17. )
  18. var (
  19. chunkCache = chunk_cache.NewChunkCacheInMemory(256) // 256 entries, 8MB max per entry
  20. )
  21. func GenParquetReadFunc(filerClient filer_pb.FilerClient, t topic.Topic, p topic.Partition) log_buffer.LogReadFromDiskFuncType {
  22. partitionDir := topic.PartitionDir(t, p)
  23. lookupFileIdFn := filer.LookupFn(filerClient)
  24. // read topic conf from filer
  25. var topicConf *mq_pb.ConfigureTopicResponse
  26. var err error
  27. if err := filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  28. topicConf, err = t.ReadConfFile(client)
  29. return err
  30. }); err != nil {
  31. return nil
  32. }
  33. recordType := topicConf.GetRecordType()
  34. recordType = schema.NewRecordTypeBuilder(recordType).
  35. WithField(SW_COLUMN_NAME_TS, schema.TypeInt64).
  36. WithField(SW_COLUMN_NAME_KEY, schema.TypeBytes).
  37. RecordTypeEnd()
  38. parquetSchema, err := schema.ToParquetSchema(t.Name, recordType)
  39. if err != nil {
  40. return nil
  41. }
  42. parquetLevels, err := schema.ToParquetLevels(recordType)
  43. if err != nil {
  44. return nil
  45. }
  46. // eachFileFn reads a parquet file and calls eachLogEntryFn for each log entry
  47. eachFileFn := func(entry *filer_pb.Entry, eachLogEntryFn log_buffer.EachLogEntryFuncType, starTsNs, stopTsNs int64) (processedTsNs int64, err error) {
  48. // create readerAt for the parquet file
  49. fileSize := filer.FileSize(entry)
  50. visibleIntervals, _ := filer.NonOverlappingVisibleIntervals(lookupFileIdFn, entry.Chunks, 0, int64(fileSize))
  51. chunkViews := filer.ViewFromVisibleIntervals(visibleIntervals, 0, int64(fileSize))
  52. readerCache := filer.NewReaderCache(32, chunkCache, lookupFileIdFn)
  53. readerAt := filer.NewChunkReaderAtFromClient(readerCache, chunkViews, int64(fileSize))
  54. // create parquet reader
  55. parquetReader := parquet.NewReader(readerAt, parquetSchema)
  56. rows := make([]parquet.Row, 128)
  57. for {
  58. rowCount, readErr := parquetReader.ReadRows(rows)
  59. for i := 0; i < rowCount; i++ {
  60. row := rows[i]
  61. // convert parquet row to schema_pb.RecordValue
  62. recordValue, err := schema.ToRecordValue(recordType, parquetLevels, row)
  63. if err != nil {
  64. return processedTsNs, fmt.Errorf("ToRecordValue failed: %v", err)
  65. }
  66. processedTsNs = recordValue.Fields[SW_COLUMN_NAME_TS].GetInt64Value()
  67. if processedTsNs < starTsNs {
  68. continue
  69. }
  70. if stopTsNs != 0 && processedTsNs >= stopTsNs {
  71. return processedTsNs, nil
  72. }
  73. data, marshalErr := proto.Marshal(recordValue)
  74. if marshalErr != nil {
  75. return processedTsNs, fmt.Errorf("marshal record value: %v", marshalErr)
  76. }
  77. logEntry := &filer_pb.LogEntry{
  78. Key: recordValue.Fields[SW_COLUMN_NAME_KEY].GetBytesValue(),
  79. TsNs: processedTsNs,
  80. Data: data,
  81. }
  82. // fmt.Printf(" parquet entry %s ts %v\n", string(logEntry.Key), time.Unix(0, logEntry.TsNs).UTC())
  83. if _, err = eachLogEntryFn(logEntry); err != nil {
  84. return processedTsNs, fmt.Errorf("process log entry %v: %v", logEntry, err)
  85. }
  86. }
  87. if readErr != nil {
  88. if readErr == io.EOF {
  89. return processedTsNs, nil
  90. }
  91. return processedTsNs, readErr
  92. }
  93. }
  94. return
  95. }
  96. return func(startPosition log_buffer.MessagePosition, stopTsNs int64, eachLogEntryFn log_buffer.EachLogEntryFuncType) (lastReadPosition log_buffer.MessagePosition, isDone bool, err error) {
  97. startFileName := startPosition.UTC().Format(topic.TIME_FORMAT)
  98. startTsNs := startPosition.Time.UnixNano()
  99. var processedTsNs int64
  100. err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  101. return filer_pb.SeaweedList(client, partitionDir, "", func(entry *filer_pb.Entry, isLast bool) error {
  102. if entry.IsDirectory {
  103. return nil
  104. }
  105. if !strings.HasSuffix(entry.Name, ".parquet") {
  106. return nil
  107. }
  108. if len(entry.Extended) == 0 {
  109. return nil
  110. }
  111. // read minTs from the parquet file
  112. minTsBytes := entry.Extended["min"]
  113. if len(minTsBytes) != 8 {
  114. return nil
  115. }
  116. minTsNs := int64(binary.BigEndian.Uint64(minTsBytes))
  117. // read max ts
  118. maxTsBytes := entry.Extended["max"]
  119. if len(maxTsBytes) != 8 {
  120. return nil
  121. }
  122. maxTsNs := int64(binary.BigEndian.Uint64(maxTsBytes))
  123. if stopTsNs != 0 && stopTsNs <= minTsNs {
  124. isDone = true
  125. return nil
  126. }
  127. if maxTsNs < startTsNs {
  128. return nil
  129. }
  130. if processedTsNs, err = eachFileFn(entry, eachLogEntryFn, startTsNs, stopTsNs); err != nil {
  131. return err
  132. }
  133. return nil
  134. }, startFileName, true, math.MaxInt32)
  135. })
  136. lastReadPosition = log_buffer.NewMessagePosition(processedTsNs, -2)
  137. return
  138. }
  139. }