log_to_parquet.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454
  1. package logstore
  2. import (
  3. "encoding/binary"
  4. "fmt"
  5. "github.com/parquet-go/parquet-go"
  6. "github.com/parquet-go/parquet-go/compress/zstd"
  7. "github.com/seaweedfs/seaweedfs/weed/filer"
  8. "github.com/seaweedfs/seaweedfs/weed/mq/schema"
  9. "github.com/seaweedfs/seaweedfs/weed/mq/topic"
  10. "github.com/seaweedfs/seaweedfs/weed/operation"
  11. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  12. "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
  13. "github.com/seaweedfs/seaweedfs/weed/util"
  14. util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
  15. "github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
  16. "google.golang.org/protobuf/proto"
  17. "io"
  18. "os"
  19. "strings"
  20. "time"
  21. )
  // Names of the system fields that compaction injects into every record
  // before the record is converted into a parquet row.
  const (
  	// SW_COLUMN_NAME_TS names the field that carries the log entry's TsNs.
  	SW_COLUMN_NAME_TS = "_ts_ns"
  	// SW_COLUMN_NAME_KEY names the field that carries the log entry's key bytes.
  	SW_COLUMN_NAME_KEY = "_key"
  )
  26. func CompactTopicPartitions(filerClient filer_pb.FilerClient, t topic.Topic, timeAgo time.Duration, recordType *schema_pb.RecordType, preference *operation.StoragePreference) error {
  27. // list the topic partition versions
  28. topicVersions, err := collectTopicVersions(filerClient, t, timeAgo)
  29. if err != nil {
  30. return fmt.Errorf("list topic files: %v", err)
  31. }
  32. // compact the partitions
  33. for _, topicVersion := range topicVersions {
  34. partitions, err := collectTopicVersionsPartitions(filerClient, t, topicVersion)
  35. if err != nil {
  36. return fmt.Errorf("list partitions %s/%s/%s: %v", t.Namespace, t.Name, topicVersion, err)
  37. }
  38. for _, partition := range partitions {
  39. err := compactTopicPartition(filerClient, t, timeAgo, recordType, partition, preference)
  40. if err != nil {
  41. return fmt.Errorf("compact partition %s/%s/%s/%s: %v", t.Namespace, t.Name, topicVersion, partition, err)
  42. }
  43. }
  44. }
  45. return nil
  46. }
  47. func collectTopicVersions(filerClient filer_pb.FilerClient, t topic.Topic, timeAgo time.Duration) (partitionVersions []time.Time, err error) {
  48. err = filer_pb.ReadDirAllEntries(filerClient, util.FullPath(t.Dir()), "", func(entry *filer_pb.Entry, isLast bool) error {
  49. t, err := topic.ParseTopicVersion(entry.Name)
  50. if err != nil {
  51. // skip non-partition directories
  52. return nil
  53. }
  54. if t.Unix() < time.Now().Unix()-int64(timeAgo/time.Second) {
  55. partitionVersions = append(partitionVersions, t)
  56. }
  57. return nil
  58. })
  59. return
  60. }
  61. func collectTopicVersionsPartitions(filerClient filer_pb.FilerClient, t topic.Topic, topicVersion time.Time) (partitions []topic.Partition, err error) {
  62. version := topicVersion.Format(topic.PartitionGenerationFormat)
  63. err = filer_pb.ReadDirAllEntries(filerClient, util.FullPath(t.Dir()).Child(version), "", func(entry *filer_pb.Entry, isLast bool) error {
  64. if !entry.IsDirectory {
  65. return nil
  66. }
  67. start, stop := topic.ParsePartitionBoundary(entry.Name)
  68. if start != stop {
  69. partitions = append(partitions, topic.Partition{
  70. RangeStart: start,
  71. RangeStop: stop,
  72. RingSize: topic.PartitionCount,
  73. UnixTimeNs: topicVersion.UnixNano(),
  74. })
  75. }
  76. return nil
  77. })
  78. return
  79. }
  80. func compactTopicPartition(filerClient filer_pb.FilerClient, t topic.Topic, timeAgo time.Duration, recordType *schema_pb.RecordType, partition topic.Partition, preference *operation.StoragePreference) error {
  81. partitionDir := topic.PartitionDir(t, partition)
  82. // compact the partition directory
  83. return compactTopicPartitionDir(filerClient, t.Name, partitionDir, timeAgo, recordType, preference)
  84. }
  85. func compactTopicPartitionDir(filerClient filer_pb.FilerClient, topicName, partitionDir string, timeAgo time.Duration, recordType *schema_pb.RecordType, preference *operation.StoragePreference) error {
  86. // read all existing parquet files
  87. minTsNs, maxTsNs, err := readAllParquetFiles(filerClient, partitionDir)
  88. if err != nil {
  89. return err
  90. }
  91. // read all log files
  92. logFiles, err := readAllLogFiles(filerClient, partitionDir, timeAgo, minTsNs, maxTsNs)
  93. if err != nil {
  94. return err
  95. }
  96. if len(logFiles) == 0 {
  97. return nil
  98. }
  99. // divide log files into groups of 128MB
  100. logFileGroups := groupFilesBySize(logFiles, 128*1024*1024)
  101. // write to parquet file
  102. parquetLevels, err := schema.ToParquetLevels(recordType)
  103. if err != nil {
  104. return fmt.Errorf("ToParquetLevels failed %+v: %v", recordType, err)
  105. }
  106. // create a parquet schema
  107. parquetSchema, err := schema.ToParquetSchema(topicName, recordType)
  108. if err != nil {
  109. return fmt.Errorf("ToParquetSchema failed: %v", err)
  110. }
  111. // TODO parallelize the writing
  112. for _, logFileGroup := range logFileGroups {
  113. if err = writeLogFilesToParquet(filerClient, partitionDir, recordType, logFileGroup, parquetSchema, parquetLevels, preference); err != nil {
  114. return err
  115. }
  116. }
  117. return nil
  118. }
  119. func groupFilesBySize(logFiles []*filer_pb.Entry, maxGroupSize int64) (logFileGroups [][]*filer_pb.Entry) {
  120. var logFileGroup []*filer_pb.Entry
  121. var groupSize int64
  122. for _, logFile := range logFiles {
  123. if groupSize+int64(logFile.Attributes.FileSize) > maxGroupSize {
  124. logFileGroups = append(logFileGroups, logFileGroup)
  125. logFileGroup = nil
  126. groupSize = 0
  127. }
  128. logFileGroup = append(logFileGroup, logFile)
  129. groupSize += int64(logFile.Attributes.FileSize)
  130. }
  131. if len(logFileGroup) > 0 {
  132. logFileGroups = append(logFileGroups, logFileGroup)
  133. }
  134. return
  135. }
  136. func readAllLogFiles(filerClient filer_pb.FilerClient, partitionDir string, timeAgo time.Duration, minTsNs, maxTsNs int64) (logFiles []*filer_pb.Entry, err error) {
  137. err = filer_pb.ReadDirAllEntries(filerClient, util.FullPath(partitionDir), "", func(entry *filer_pb.Entry, isLast bool) error {
  138. if strings.HasSuffix(entry.Name, ".parquet") {
  139. return nil
  140. }
  141. if entry.Attributes.Crtime > time.Now().Unix()-int64(timeAgo/time.Second) {
  142. return nil
  143. }
  144. logTime, err := time.Parse(topic.TIME_FORMAT, entry.Name)
  145. if err != nil {
  146. // glog.Warningf("parse log time %s: %v", entry.Name, err)
  147. return nil
  148. }
  149. if maxTsNs > 0 && logTime.UnixNano() <= maxTsNs {
  150. return nil
  151. }
  152. logFiles = append(logFiles, entry)
  153. return nil
  154. })
  155. return
  156. }
  157. func readAllParquetFiles(filerClient filer_pb.FilerClient, partitionDir string) (minTsNs, maxTsNs int64, err error) {
  158. err = filer_pb.ReadDirAllEntries(filerClient, util.FullPath(partitionDir), "", func(entry *filer_pb.Entry, isLast bool) error {
  159. if !strings.HasSuffix(entry.Name, ".parquet") {
  160. return nil
  161. }
  162. if len(entry.Extended) == 0 {
  163. return nil
  164. }
  165. // read min ts
  166. minTsBytes := entry.Extended["min"]
  167. if len(minTsBytes) != 8 {
  168. return nil
  169. }
  170. minTs := int64(binary.BigEndian.Uint64(minTsBytes))
  171. if minTsNs == 0 || minTs < minTsNs {
  172. minTsNs = minTs
  173. }
  174. // read max ts
  175. maxTsBytes := entry.Extended["max"]
  176. if len(maxTsBytes) != 8 {
  177. return nil
  178. }
  179. maxTs := int64(binary.BigEndian.Uint64(maxTsBytes))
  180. if maxTsNs == 0 || maxTs > maxTsNs {
  181. maxTsNs = maxTs
  182. }
  183. return nil
  184. })
  185. return
  186. }
  187. func writeLogFilesToParquet(filerClient filer_pb.FilerClient, partitionDir string, recordType *schema_pb.RecordType, logFileGroups []*filer_pb.Entry, parquetSchema *parquet.Schema, parquetLevels *schema.ParquetLevels, preference *operation.StoragePreference) (err error) {
  188. tempFile, err := os.CreateTemp(".", "t*.parquet")
  189. if err != nil {
  190. return fmt.Errorf("create temp file: %v", err)
  191. }
  192. defer func() {
  193. tempFile.Close()
  194. os.Remove(tempFile.Name())
  195. }()
  196. writer := parquet.NewWriter(tempFile, parquetSchema, parquet.Compression(&zstd.Codec{Level: zstd.DefaultLevel}))
  197. rowBuilder := parquet.NewRowBuilder(parquetSchema)
  198. var startTsNs, stopTsNs int64
  199. for _, logFile := range logFileGroups {
  200. fmt.Printf("compact %s/%s ", partitionDir, logFile.Name)
  201. var rows []parquet.Row
  202. if err := iterateLogEntries(filerClient, logFile, func(entry *filer_pb.LogEntry) error {
  203. if startTsNs == 0 {
  204. startTsNs = entry.TsNs
  205. }
  206. stopTsNs = entry.TsNs
  207. if len(entry.Key) == 0 {
  208. return nil
  209. }
  210. // write to parquet file
  211. rowBuilder.Reset()
  212. record := &schema_pb.RecordValue{}
  213. if err := proto.Unmarshal(entry.Data, record); err != nil {
  214. return fmt.Errorf("unmarshal record value: %v", err)
  215. }
  216. record.Fields[SW_COLUMN_NAME_TS] = &schema_pb.Value{
  217. Kind: &schema_pb.Value_Int64Value{
  218. Int64Value: entry.TsNs,
  219. },
  220. }
  221. record.Fields[SW_COLUMN_NAME_KEY] = &schema_pb.Value{
  222. Kind: &schema_pb.Value_BytesValue{
  223. BytesValue: entry.Key,
  224. },
  225. }
  226. if err := schema.AddRecordValue(rowBuilder, recordType, parquetLevels, record); err != nil {
  227. return fmt.Errorf("add record value: %v", err)
  228. }
  229. rows = append(rows, rowBuilder.Row())
  230. return nil
  231. }); err != nil {
  232. return fmt.Errorf("iterate log entry %v/%v: %v", partitionDir, logFile.Name, err)
  233. }
  234. fmt.Printf("processed %d rows\n", len(rows))
  235. if _, err := writer.WriteRows(rows); err != nil {
  236. return fmt.Errorf("write rows: %v", err)
  237. }
  238. }
  239. if err := writer.Close(); err != nil {
  240. return fmt.Errorf("close writer: %v", err)
  241. }
  242. // write to parquet file to partitionDir
  243. parquetFileName := fmt.Sprintf("%s.parquet", time.Unix(0, startTsNs).UTC().Format("2006-01-02-15-04-05"))
  244. if err := saveParquetFileToPartitionDir(filerClient, tempFile, partitionDir, parquetFileName, preference, startTsNs, stopTsNs); err != nil {
  245. return fmt.Errorf("save parquet file %s: %v", parquetFileName, err)
  246. }
  247. return nil
  248. }
  249. func saveParquetFileToPartitionDir(filerClient filer_pb.FilerClient, sourceFile *os.File, partitionDir, parquetFileName string, preference *operation.StoragePreference, startTsNs, stopTsNs int64) error {
  250. uploader, err := operation.NewUploader()
  251. if err != nil {
  252. return fmt.Errorf("new uploader: %v", err)
  253. }
  254. // get file size
  255. fileInfo, err := sourceFile.Stat()
  256. if err != nil {
  257. return fmt.Errorf("stat source file: %v", err)
  258. }
  259. // upload file in chunks
  260. chunkSize := int64(4 * 1024 * 1024)
  261. chunkCount := (fileInfo.Size() + chunkSize - 1) / chunkSize
  262. entry := &filer_pb.Entry{
  263. Name: parquetFileName,
  264. Attributes: &filer_pb.FuseAttributes{
  265. Crtime: time.Now().Unix(),
  266. Mtime: time.Now().Unix(),
  267. FileMode: uint32(os.FileMode(0644)),
  268. FileSize: uint64(fileInfo.Size()),
  269. Mime: "application/vnd.apache.parquet",
  270. },
  271. }
  272. entry.Extended = make(map[string][]byte)
  273. minTsBytes := make([]byte, 8)
  274. binary.BigEndian.PutUint64(minTsBytes, uint64(startTsNs))
  275. entry.Extended["min"] = minTsBytes
  276. maxTsBytes := make([]byte, 8)
  277. binary.BigEndian.PutUint64(maxTsBytes, uint64(stopTsNs))
  278. entry.Extended["max"] = maxTsBytes
  279. for i := int64(0); i < chunkCount; i++ {
  280. fileId, uploadResult, err, _ := uploader.UploadWithRetry(
  281. filerClient,
  282. &filer_pb.AssignVolumeRequest{
  283. Count: 1,
  284. Replication: preference.Replication,
  285. Collection: preference.Collection,
  286. TtlSec: 0, // TODO set ttl
  287. DiskType: preference.DiskType,
  288. Path: partitionDir + "/" + parquetFileName,
  289. },
  290. &operation.UploadOption{
  291. Filename: parquetFileName,
  292. Cipher: false,
  293. IsInputCompressed: false,
  294. MimeType: "application/vnd.apache.parquet",
  295. PairMap: nil,
  296. },
  297. func(host, fileId string) string {
  298. return fmt.Sprintf("http://%s/%s", host, fileId)
  299. },
  300. io.NewSectionReader(sourceFile, i*chunkSize, chunkSize),
  301. )
  302. if err != nil {
  303. return fmt.Errorf("upload chunk %d: %v", i, err)
  304. }
  305. if uploadResult.Error != "" {
  306. return fmt.Errorf("upload result: %v", uploadResult.Error)
  307. }
  308. entry.Chunks = append(entry.Chunks, uploadResult.ToPbFileChunk(fileId, i*chunkSize, time.Now().UnixNano()))
  309. }
  310. // write the entry to partitionDir
  311. if err := filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  312. return filer_pb.CreateEntry(client, &filer_pb.CreateEntryRequest{
  313. Directory: partitionDir,
  314. Entry: entry,
  315. })
  316. }); err != nil {
  317. return fmt.Errorf("create entry: %v", err)
  318. }
  319. fmt.Printf("saved to %s/%s\n", partitionDir, parquetFileName)
  320. return nil
  321. }
  322. func iterateLogEntries(filerClient filer_pb.FilerClient, logFile *filer_pb.Entry, eachLogEntryFn func(entry *filer_pb.LogEntry) error) error {
  323. lookupFn := filer.LookupFn(filerClient)
  324. _, err := eachFile(logFile, lookupFn, func(logEntry *filer_pb.LogEntry) (isDone bool, err error) {
  325. if err := eachLogEntryFn(logEntry); err != nil {
  326. return true, err
  327. }
  328. return false, nil
  329. })
  330. return err
  331. }
  332. func eachFile(entry *filer_pb.Entry, lookupFileIdFn func(fileId string) (targetUrls []string, err error), eachLogEntryFn log_buffer.EachLogEntryFuncType) (processedTsNs int64, err error) {
  333. if len(entry.Content) > 0 {
  334. // skip .offset files
  335. return
  336. }
  337. var urlStrings []string
  338. for _, chunk := range entry.Chunks {
  339. if chunk.Size == 0 {
  340. continue
  341. }
  342. if chunk.IsChunkManifest {
  343. fmt.Printf("this should not happen. unexpected chunk manifest in %s", entry.Name)
  344. return
  345. }
  346. urlStrings, err = lookupFileIdFn(chunk.FileId)
  347. if err != nil {
  348. err = fmt.Errorf("lookup %s: %v", chunk.FileId, err)
  349. return
  350. }
  351. if len(urlStrings) == 0 {
  352. err = fmt.Errorf("no url found for %s", chunk.FileId)
  353. return
  354. }
  355. // try one of the urlString until util.Get(urlString) succeeds
  356. var processed bool
  357. for _, urlString := range urlStrings {
  358. var data []byte
  359. if data, _, err = util_http.Get(urlString); err == nil {
  360. processed = true
  361. if processedTsNs, err = eachChunk(data, eachLogEntryFn); err != nil {
  362. return
  363. }
  364. break
  365. }
  366. }
  367. if !processed {
  368. err = fmt.Errorf("no data processed for %s %s", entry.Name, chunk.FileId)
  369. return
  370. }
  371. }
  372. return
  373. }
  374. func eachChunk(buf []byte, eachLogEntryFn log_buffer.EachLogEntryFuncType) (processedTsNs int64, err error) {
  375. for pos := 0; pos+4 < len(buf); {
  376. size := util.BytesToUint32(buf[pos : pos+4])
  377. if pos+4+int(size) > len(buf) {
  378. err = fmt.Errorf("reach each log chunk: read [%d,%d) from [0,%d)", pos, pos+int(size)+4, len(buf))
  379. return
  380. }
  381. entryData := buf[pos+4 : pos+4+int(size)]
  382. logEntry := &filer_pb.LogEntry{}
  383. if err = proto.Unmarshal(entryData, logEntry); err != nil {
  384. pos += 4 + int(size)
  385. err = fmt.Errorf("unexpected unmarshal mq_pb.Message: %v", err)
  386. return
  387. }
  388. if _, err = eachLogEntryFn(logEntry); err != nil {
  389. err = fmt.Errorf("process log entry %v: %v", logEntry, err)
  390. return
  391. }
  392. processedTsNs = logEntry.TsNs
  393. pos += 4 + int(size)
  394. }
  395. return
  396. }