// to_parquet_value.go (3.4 KB)

  1. package schema
  2. import (
  3. "fmt"
  4. parquet "github.com/parquet-go/parquet-go"
  5. "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
  6. )
  7. func rowBuilderVisit(rowBuilder *parquet.RowBuilder, fieldType *schema_pb.Type, levels *ParquetLevels, fieldValue *schema_pb.Value) (err error) {
  8. switch fieldType.Kind.(type) {
  9. case *schema_pb.Type_ScalarType:
  10. var parquetValue parquet.Value
  11. parquetValue, err = toParquetValue(fieldValue)
  12. if err != nil {
  13. return
  14. }
  15. rowBuilder.Add(levels.startColumnIndex, parquetValue)
  16. // fmt.Printf("rowBuilder.Add %d %v\n", columnIndex, parquetValue)
  17. case *schema_pb.Type_ListType:
  18. rowBuilder.Next(levels.startColumnIndex)
  19. // fmt.Printf("rowBuilder.Next %d\n", columnIndex)
  20. elementType := fieldType.GetListType().ElementType
  21. for _, value := range fieldValue.GetListValue().Values {
  22. if err = rowBuilderVisit(rowBuilder, elementType, levels, value); err != nil {
  23. return
  24. }
  25. }
  26. }
  27. return
  28. }
  29. func AddRecordValue(rowBuilder *parquet.RowBuilder, recordType *schema_pb.RecordType, parquetLevels *ParquetLevels, recordValue *schema_pb.RecordValue) error {
  30. visitor := func(fieldType *schema_pb.Type, levels *ParquetLevels, fieldValue *schema_pb.Value) (err error) {
  31. return rowBuilderVisit(rowBuilder, fieldType, levels, fieldValue)
  32. }
  33. fieldType := &schema_pb.Type{Kind: &schema_pb.Type_RecordType{RecordType: recordType}}
  34. fieldValue := &schema_pb.Value{Kind: &schema_pb.Value_RecordValue{RecordValue: recordValue}}
  35. return doVisitValue(fieldType, parquetLevels, fieldValue, visitor)
  36. }
  37. // typeValueVisitor is a function that is called for each value in a schema_pb.Value
  38. // Find the column index.
  39. // intended to be used in RowBuilder.Add(columnIndex, value)
  40. type typeValueVisitor func(fieldType *schema_pb.Type, levels *ParquetLevels, fieldValue *schema_pb.Value) (err error)
  41. // endIndex is exclusive
  42. // same logic as RowBuilder.configure in row_builder.go
  43. func doVisitValue(fieldType *schema_pb.Type, levels *ParquetLevels, fieldValue *schema_pb.Value, visitor typeValueVisitor) (err error) {
  44. switch fieldType.Kind.(type) {
  45. case *schema_pb.Type_ScalarType:
  46. return visitor(fieldType, levels, fieldValue)
  47. case *schema_pb.Type_ListType:
  48. return visitor(fieldType, levels, fieldValue)
  49. case *schema_pb.Type_RecordType:
  50. for _, field := range fieldType.GetRecordType().Fields {
  51. fieldValue, found := fieldValue.GetRecordValue().Fields[field.Name]
  52. if !found {
  53. // TODO check this if no such field found
  54. continue
  55. }
  56. fieldLevels := levels.levels[field.Name]
  57. err = doVisitValue(field.Type, fieldLevels, fieldValue, visitor)
  58. if err != nil {
  59. return
  60. }
  61. }
  62. return
  63. }
  64. return
  65. }
  66. func toParquetValue(value *schema_pb.Value) (parquet.Value, error) {
  67. switch value.Kind.(type) {
  68. case *schema_pb.Value_BoolValue:
  69. return parquet.BooleanValue(value.GetBoolValue()), nil
  70. case *schema_pb.Value_Int32Value:
  71. return parquet.Int32Value(value.GetInt32Value()), nil
  72. case *schema_pb.Value_Int64Value:
  73. return parquet.Int64Value(value.GetInt64Value()), nil
  74. case *schema_pb.Value_FloatValue:
  75. return parquet.FloatValue(value.GetFloatValue()), nil
  76. case *schema_pb.Value_DoubleValue:
  77. return parquet.DoubleValue(value.GetDoubleValue()), nil
  78. case *schema_pb.Value_BytesValue:
  79. return parquet.ByteArrayValue(value.GetBytesValue()), nil
  80. case *schema_pb.Value_StringValue:
  81. return parquet.ByteArrayValue([]byte(value.GetStringValue())), nil
  82. default:
  83. return parquet.NullValue(), fmt.Errorf("unknown value type: %T", value.Kind)
  84. }
  85. }