to_parquet_levels.go 2.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758
  1. package schema
  2. import (
  3. "fmt"
  4. "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
  5. )
  6. type ParquetLevels struct {
  7. startColumnIndex int
  8. endColumnIndex int
  9. definitionDepth int
  10. levels map[string]*ParquetLevels
  11. }
  12. func ToParquetLevels(recordType *schema_pb.RecordType) (*ParquetLevels, error) {
  13. return toRecordTypeLevels(recordType, 0, 0)
  14. }
  15. func toFieldTypeLevels(fieldType *schema_pb.Type, startColumnIndex, definitionDepth int) (*ParquetLevels, error) {
  16. switch fieldType.Kind.(type) {
  17. case *schema_pb.Type_ScalarType:
  18. return toFieldTypeScalarLevels(fieldType.GetScalarType(), startColumnIndex, definitionDepth)
  19. case *schema_pb.Type_RecordType:
  20. return toRecordTypeLevels(fieldType.GetRecordType(), startColumnIndex, definitionDepth)
  21. case *schema_pb.Type_ListType:
  22. return toFieldTypeListLevels(fieldType.GetListType(), startColumnIndex, definitionDepth)
  23. }
  24. return nil, fmt.Errorf("unknown field type: %T", fieldType.Kind)
  25. }
  26. func toFieldTypeListLevels(listType *schema_pb.ListType, startColumnIndex, definitionDepth int) (*ParquetLevels, error) {
  27. return toFieldTypeLevels(listType.ElementType, startColumnIndex, definitionDepth)
  28. }
  29. func toFieldTypeScalarLevels(scalarType schema_pb.ScalarType, startColumnIndex, definitionDepth int) (*ParquetLevels, error) {
  30. return &ParquetLevels{
  31. startColumnIndex: startColumnIndex,
  32. endColumnIndex: startColumnIndex + 1,
  33. definitionDepth: definitionDepth,
  34. }, nil
  35. }
  36. func toRecordTypeLevels(recordType *schema_pb.RecordType, startColumnIndex, definitionDepth int) (*ParquetLevels, error) {
  37. recordTypeLevels := &ParquetLevels{
  38. startColumnIndex: startColumnIndex,
  39. definitionDepth: definitionDepth,
  40. levels: make(map[string]*ParquetLevels),
  41. }
  42. for _, field := range recordType.Fields {
  43. fieldTypeLevels, err := toFieldTypeLevels(field.Type, startColumnIndex, definitionDepth+1)
  44. if err != nil {
  45. return nil, err
  46. }
  47. recordTypeLevels.levels[field.Name] = fieldTypeLevels
  48. startColumnIndex = fieldTypeLevels.endColumnIndex
  49. }
  50. recordTypeLevels.endColumnIndex = startColumnIndex
  51. return recordTypeLevels, nil
  52. }