to_parquet_schema.go 2.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576
  1. package schema
  2. import (
  3. "fmt"
  4. parquet "github.com/parquet-go/parquet-go"
  5. "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
  6. )
  7. func ToParquetSchema(topicName string, recordType *schema_pb.RecordType) (*parquet.Schema, error) {
  8. rootNode, err := toParquetFieldTypeRecord(recordType)
  9. if err != nil {
  10. return nil, fmt.Errorf("failed to convert record type to parquet schema: %v", err)
  11. }
  12. // Fields are sorted by name, so the value should be sorted also
  13. // the sorting is inside parquet.`func (g Group) Fields() []Field`
  14. return parquet.NewSchema(topicName, rootNode), nil
  15. }
  16. func toParquetFieldType(fieldType *schema_pb.Type) (dataType parquet.Node, err error) {
  17. switch fieldType.Kind.(type) {
  18. case *schema_pb.Type_ScalarType:
  19. dataType, err = toParquetFieldTypeScalar(fieldType.GetScalarType())
  20. dataType = parquet.Optional(dataType)
  21. case *schema_pb.Type_RecordType:
  22. dataType, err = toParquetFieldTypeRecord(fieldType.GetRecordType())
  23. dataType = parquet.Optional(dataType)
  24. case *schema_pb.Type_ListType:
  25. dataType, err = toParquetFieldTypeList(fieldType.GetListType())
  26. default:
  27. return nil, fmt.Errorf("unknown field type: %T", fieldType.Kind)
  28. }
  29. return dataType, err
  30. }
  31. func toParquetFieldTypeList(listType *schema_pb.ListType) (parquet.Node, error) {
  32. elementType, err := toParquetFieldType(listType.ElementType)
  33. if err != nil {
  34. return nil, err
  35. }
  36. return parquet.Repeated(elementType), nil
  37. }
  38. func toParquetFieldTypeScalar(scalarType schema_pb.ScalarType) (parquet.Node, error) {
  39. switch scalarType {
  40. case schema_pb.ScalarType_BOOL:
  41. return parquet.Leaf(parquet.BooleanType), nil
  42. case schema_pb.ScalarType_INT32:
  43. return parquet.Leaf(parquet.Int32Type), nil
  44. case schema_pb.ScalarType_INT64:
  45. return parquet.Leaf(parquet.Int64Type), nil
  46. case schema_pb.ScalarType_FLOAT:
  47. return parquet.Leaf(parquet.FloatType), nil
  48. case schema_pb.ScalarType_DOUBLE:
  49. return parquet.Leaf(parquet.DoubleType), nil
  50. case schema_pb.ScalarType_BYTES:
  51. return parquet.Leaf(parquet.ByteArrayType), nil
  52. case schema_pb.ScalarType_STRING:
  53. return parquet.Leaf(parquet.ByteArrayType), nil
  54. default:
  55. return nil, fmt.Errorf("unknown scalar type: %v", scalarType)
  56. }
  57. }
  58. func toParquetFieldTypeRecord(recordType *schema_pb.RecordType) (parquet.Node, error) {
  59. recordNode := parquet.Group{}
  60. for _, field := range recordType.Fields {
  61. parquetFieldType, err := toParquetFieldType(field.Type)
  62. if err != nil {
  63. return nil, err
  64. }
  65. recordNode[field.Name] = parquetFieldType
  66. }
  67. return recordNode, nil
  68. }