Browse Source

YQ Connector: DescribeTable omits columns with unsupported types

Представим, что есть таблица, состоящая из двух колонок, тип одной из которых не поддерживается в YQL. Раньше попытка любого чтения из такой таблицы возвращала ошибку ("тип не поддерживается").

Теперь такая ошибка будет возвращаться при `SELECT unsupported_column FROM table`. При `SELECT *` будут возвращены все колонки, кроме неподдерживаемых.
vitalyisaev 1 year ago
parent
commit
f01adafb13

+ 19 - 0
library/go/test/yatest/env_test.go

@@ -0,0 +1,19 @@
+package yatest
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestContextParameters(t *testing.T) {
+	val, ok := BuildFlag("AUTOCHECK")
+	if ok {
+		assert.Equal(t, "yes", val)
+	} else {
+		_, ok = BuildFlag("TESTS_REQUESTED")
+		assert.Equal(t, true, ok)
+	}
+
+	assert.Equal(t, "library/go/test/yatest/gotest", ProjectPath())
+}

+ 3 - 0
library/go/test/yatest/gotest/ya.make

@@ -0,0 +1,3 @@
+GO_TEST_FOR(library/go/test/yatest)
+
+END()

+ 8 - 0
tools/go_test_miner/gotest/ya.make

@@ -0,0 +1,8 @@
+GO_TEST_FOR(tools/go_test_miner)
+
+IF (GO_VET == "yes" OR GO_VET == "on")
+    SET_APPEND(GO_VET_FLAGS -tests=false)
+ENDIF()
+
+END()
+

+ 22 - 0
tools/go_test_miner/main_test.go

@@ -0,0 +1,22 @@
+package main
+
+import (
+	"testing"
+)
+
+func TestOk(t *testing.T) {
+}
+
+func Test1(a *testing.T) {
+}
+
+func Test_Function(tt *testing.T) {
+}
+
+func Test(t *testing.T) {
+}
+
+//nolint:tests
+func Testfail(t *testing.T) {
+	panic("Not a test function!")
+}

+ 4 - 1
ydb/library/yql/providers/generic/connector/app/server/postgresql/type_mapper.go

@@ -76,7 +76,10 @@ func (tm typeMapper) YDBTypeToAcceptor(ydbType *Ydb.Type) (any, error) {
 			return nil, fmt.Errorf("make acceptor from optional YDB type: %w", err)
 		}
 	default:
-		return nil, fmt.Errorf("only primitive types are supported, got '%v' instead", ydbType)
+		return nil, fmt.Errorf(
+			"only primitive types are supported, got '%v' instead: %w",
+			ydbType,
+			utils.ErrDataTypeNotSupported)
 	}
 
 	return acceptor, nil

+ 9 - 11
ydb/library/yql/providers/generic/connector/app/server/rdbms/handler.go

@@ -53,37 +53,35 @@ func (h *handlerImpl[CONN]) DescribeTable(
 		return nil, fmt.Errorf("query builder error: %w", err)
 	}
 
-	// logger.Debug("execute query", log.String("query", query))
 	defer func() { utils.LogCloserError(logger, rows, "close rows") }()
 
 	var (
 		columnName string
 		typeName   string
-		schema     api_service_protos.TSchema
 	)
 
+	sb := &schemaBuilder{typeMapper: h.typeMapper}
+
 	for rows.Next() {
 		if err := rows.Scan(&columnName, &typeName); err != nil {
 			return nil, fmt.Errorf("rows scan: %w", err)
 		}
 
-		column, err := h.typeMapper.SQLTypeToYDBColumn(columnName, typeName)
-		if err != nil {
-			return nil, fmt.Errorf("sql type to ydb column (%s, %s): %w", columnName, typeName, err)
+		if err := sb.addColumn(columnName, typeName); err != nil {
+			return nil, fmt.Errorf("add column to schema builder: %w", err)
 		}
-
-		schema.Columns = append(schema.Columns, column)
 	}
 
 	if err := rows.Err(); err != nil {
-		return nil, fmt.Errorf("rows error: %w", err)
+		return nil, fmt.Errorf("rows iteration: %w", err)
 	}
 
-	if len(schema.Columns) == 0 {
-		return nil, utils.ErrTableDoesNotExist
+	schema, err := sb.build(logger)
+	if err != nil {
+		return nil, fmt.Errorf("build schema: %w", err)
 	}
 
-	return &api_service_protos.TDescribeTableResponse{Schema: &schema}, nil
+	return &api_service_protos.TDescribeTableResponse{Schema: schema}, nil
 }
 
 func (h *handlerImpl[CONN]) ReadSplit(

+ 67 - 0
ydb/library/yql/providers/generic/connector/app/server/rdbms/schema_builder.go

@@ -0,0 +1,67 @@
+package rdbms
+
+import (
+	"errors"
+	"fmt"
+
+	"github.com/ydb-platform/ydb-go-genproto/protos/Ydb"
+	"github.com/ydb-platform/ydb/library/go/core/log"
+	"github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/app/server/utils"
+	api_service_protos "github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/libgo/service/protos"
+)
+
+type schemaItem struct {
+	columnName string
+	columnType string
+	ydbColumn  *Ydb.Column
+}
+
+type schemaBuilder struct {
+	typeMapper utils.TypeMapper
+	items      []*schemaItem
+}
+
+func (sb *schemaBuilder) addColumn(columnName, columnType string) error {
+	item := &schemaItem{
+		columnName: columnName,
+		columnType: columnType,
+	}
+
+	var err error
+	item.ydbColumn, err = sb.typeMapper.SQLTypeToYDBColumn(columnName, columnType)
+
+	if err != nil && !errors.Is(err, utils.ErrDataTypeNotSupported) {
+		return fmt.Errorf("sql type to ydb column (%s, %s): %w", columnName, columnType, err)
+	}
+
+	sb.items = append(sb.items, item)
+	return nil
+}
+
+func (sb *schemaBuilder) build(logger log.Logger) (*api_service_protos.TSchema, error) {
+	if len(sb.items) == 0 {
+		return nil, utils.ErrTableDoesNotExist
+	}
+
+	var (
+		schema      api_service_protos.TSchema
+		unsupported []string
+	)
+
+	for _, item := range sb.items {
+		if item.ydbColumn == nil {
+			unsupported = append(unsupported, fmt.Sprintf("%s %s", item.columnName, item.columnType))
+		} else {
+			schema.Columns = append(schema.Columns, item.ydbColumn)
+		}
+	}
+
+	if len(unsupported) > 0 {
+		logger.Warn(
+			"the table schema was reduced because some column types are unsupported",
+			log.Strings("unsupported columns", unsupported),
+		)
+	}
+
+	return &schema, nil
+}

+ 98 - 0
ydb/library/yql/providers/generic/connector/app/server/rdbms/schema_builder_test.go

@@ -0,0 +1,98 @@
+package rdbms
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/require"
+	"github.com/ydb-platform/ydb-go-genproto/protos/Ydb"
+	"github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/app/server/clickhouse"
+	"github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/app/server/postgresql"
+	"github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/app/server/utils"
+	"google.golang.org/protobuf/proto"
+)
+
+func TestSchemaBuilder(t *testing.T) {
+	t.Run("ClickHouse", func(t *testing.T) {
+		sb := &schemaBuilder{
+			typeMapper: clickhouse.NewTypeMapper(),
+		}
+
+		require.NoError(t, sb.addColumn("col1", "Int32"))  // supported
+		require.NoError(t, sb.addColumn("col2", "String")) // supported
+		require.NoError(t, sb.addColumn("col3", "UUID"))   // yet unsupported
+
+		logger := utils.NewTestLogger(t)
+		schema, err := sb.build(logger)
+		require.NoError(t, err)
+		require.NotNil(t, schema)
+
+		require.Len(t, schema.Columns, 2)
+
+		require.Equal(t, schema.Columns[0].Name, "col1")
+		require.True(
+			t,
+			proto.Equal(schema.Columns[0].Type, &Ydb.Type{Type: &Ydb.Type_TypeId{TypeId: Ydb.Type_INT32}}),
+			schema.Columns[0].Type)
+
+		require.Equal(t, schema.Columns[1].Name, "col2")
+		require.True(
+			t,
+			proto.Equal(schema.Columns[1].Type, &Ydb.Type{Type: &Ydb.Type_TypeId{TypeId: Ydb.Type_STRING}}),
+			schema.Columns[1].Type)
+	})
+
+	t.Run("PostgreSQL", func(t *testing.T) {
+		sb := &schemaBuilder{
+			typeMapper: postgresql.NewTypeMapper(),
+		}
+
+		require.NoError(t, sb.addColumn("col1", "bigint")) // supported
+		require.NoError(t, sb.addColumn("col2", "text"))   // supported
+		require.NoError(t, sb.addColumn("col3", "time"))   // yet unsupported
+
+		logger := utils.NewTestLogger(t)
+		schema, err := sb.build(logger)
+		require.NoError(t, err)
+		require.NotNil(t, schema)
+
+		require.Len(t, schema.Columns, 2)
+
+		require.Equal(t, schema.Columns[0].Name, "col1")
+		require.True(
+			t,
+			proto.Equal(
+				schema.Columns[0].Type,
+				&Ydb.Type{Type: &Ydb.Type_OptionalType{OptionalType: &Ydb.OptionalType{Item: &Ydb.Type{Type: &Ydb.Type_TypeId{TypeId: Ydb.Type_INT64}}}}},
+			),
+			schema.Columns[0].Type)
+
+		require.Equal(t, schema.Columns[1].Name, "col2")
+		require.True(
+			t,
+			proto.Equal(
+				schema.Columns[1].Type,
+				&Ydb.Type{Type: &Ydb.Type_OptionalType{OptionalType: &Ydb.OptionalType{Item: &Ydb.Type{Type: &Ydb.Type_TypeId{TypeId: Ydb.Type_UTF8}}}}},
+			),
+			schema.Columns[1].Type)
+	})
+
+	t.Run("NonExistingTable", func(t *testing.T) {
+		sb := &schemaBuilder{}
+		schema, err := sb.build(utils.NewTestLogger(t))
+		require.ErrorIs(t, err, utils.ErrTableDoesNotExist)
+		require.Nil(t, schema)
+	})
+
+	t.Run("EmptyTable", func(t *testing.T) {
+		sb := &schemaBuilder{
+			typeMapper: clickhouse.NewTypeMapper(),
+		}
+
+		require.NoError(t, sb.addColumn("col1", "UUID")) // yet unsupported
+
+		schema, err := sb.build(utils.NewTestLogger(t))
+		require.NoError(t, err)
+		require.NotNil(t, schema)
+		require.Len(t, schema.Columns, 0)
+	})
+}

+ 5 - 0
ydb/library/yql/providers/generic/connector/app/server/rdbms/ut/ya.make

@@ -0,0 +1,5 @@
+GO_TEST_FOR(ydb/library/yql/providers/generic/connector/app/server/rdbms)
+
+SIZE(SMALL)
+
+END()

+ 7 - 0
ydb/library/yql/providers/generic/connector/app/server/rdbms/ya.make

@@ -3,6 +3,13 @@ GO_LIBRARY()
 SRCS(
     handler.go
     handler_factory.go
+    schema_builder.go
+)
+
+GO_TEST_SRCS(
+    schema_builder_test.go
 )
 
 END()
+
+RECURSE_FOR_TESTS(ut)

Some files were not shown because too many files changed in this diff