Browse Source

YQL for create, alter and drop transfer from topic to table
commit_hash:09502f46a7ee665609d2c4ba8d9e0aa421720cdb

tesseract 1 month ago
parent
commit
e677409ecb

+ 57 - 3
yql/essentials/providers/common/provider/yql_provider.cpp

@@ -604,6 +604,60 @@ TWriteReplicationSettings ParseWriteReplicationSettings(TExprList node, TExprCon
     return ret;
 }
 
+TWriteTransferSettings ParseWriteTransferSettings(TExprList node, TExprContext& ctx) {
+    TMaybeNode<TCoAtom> mode;
+    TMaybeNode<TCoAtom> source;
+    TMaybeNode<TCoAtom> target;
+    TMaybeNode<TCoAtom> transformLambda;
+    TVector<TCoNameValueTuple> settings;
+    TVector<TCoNameValueTuple> other;
+
+    for (auto child : node) {
+        if (auto maybeTuple = child.Maybe<TCoNameValueTuple>()) {
+            auto tuple = maybeTuple.Cast();
+            auto name = tuple.Name().Value();
+
+            if (name == "mode") {
+                YQL_ENSURE(tuple.Value().Maybe<TCoAtom>());
+                mode = tuple.Value().Cast<TCoAtom>();
+            } else if (name == "source") {
+                YQL_ENSURE(tuple.Value().Maybe<TCoAtom>());
+                source = tuple.Value().Cast<TCoAtom>();
+            } else if (name == "target") {
+                YQL_ENSURE(tuple.Value().Maybe<TCoAtom>());
+                target = tuple.Value().Cast<TCoAtom>();
+            } else if (name == "transformLambda") {
+                YQL_ENSURE(tuple.Value().Maybe<TCoAtom>());
+                transformLambda = tuple.Value().Cast<TCoAtom>();
+            } else if (name == "settings") {
+                YQL_ENSURE(tuple.Value().Maybe<TCoNameValueTupleList>());
+                for (const auto& item : tuple.Value().Cast<TCoNameValueTupleList>()) {
+                    settings.push_back(item);
+                }
+            } else {
+                other.push_back(tuple);
+            }
+        }
+    }
+
+    const auto& builtSettings = Build<TCoNameValueTupleList>(ctx, node.Pos())
+        .Add(settings)
+        .Done();
+
+    const auto& builtOther = Build<TCoNameValueTupleList>(ctx, node.Pos())
+        .Add(other)
+        .Done();
+
+    TWriteTransferSettings ret(builtOther);
+    ret.Mode = mode;
+    ret.Source = source;
+    ret.Target = target;
+    ret.TransformLambda = transformLambda;
+    ret.TransferSettings = builtSettings;
+
+    return ret;
+}
+
 TWriteRoleSettings ParseWriteRoleSettings(TExprList node, TExprContext& ctx) {
     TMaybeNode<TCoAtom> mode;
     TVector<TCoAtom> roles;
@@ -858,7 +912,7 @@ bool FillUsedFilesImpl(
     if (node.GetTypeAnn()) {
         usedPgExtensions |= node.GetTypeAnn()->GetUsedPgExtensions();
     }
-    
+
     if (node.IsCallable("PgResolvedCall")) {
         auto procId = FromString<ui32>(node.Child(1)->Content());
         const auto& proc = NPg::LookupProc(procId);
@@ -1072,7 +1126,7 @@ void FillSecureParams(
     }
 }
 
-bool AddPgFile(bool isPath, const TString& pathOrContent, const TString& md5, const TString& alias, TUserDataTable& files, 
+bool AddPgFile(bool isPath, const TString& pathOrContent, const TString& md5, const TString& alias, TUserDataTable& files,
     const TTypeAnnotationContext& types, TPositionHandle pos, TExprContext& ctx) {
 
     TUserDataBlock block;
@@ -1140,7 +1194,7 @@ bool FillUsedFiles(
             return false;
         }
     }
-    
+
     Y_ENSURE(remainingPgExtensions == 0);
     if (!needFullPgCatalog) {
         return true;

+ 14 - 0
yql/essentials/providers/common/provider/yql_provider.h

@@ -97,6 +97,19 @@ struct TWriteReplicationSettings {
     {}
 };
 
+struct TWriteTransferSettings {
+    NNodes::TMaybeNode<NNodes::TCoAtom> Mode;
+    NNodes::TMaybeNode<NNodes::TCoAtom> Source;
+    NNodes::TMaybeNode<NNodes::TCoAtom> Target;
+    NNodes::TMaybeNode<NNodes::TCoAtom> TransformLambda;
+    NNodes::TMaybeNode<NNodes::TCoNameValueTupleList> TransferSettings;
+    NNodes::TCoNameValueTupleList Other;
+
+    TWriteTransferSettings(const NNodes::TCoNameValueTupleList& other)
+        : Other(other)
+    {}
+};
+
 struct TWriteRoleSettings {
     NNodes::TMaybeNode<NNodes::TCoAtom> Mode;
     NNodes::TMaybeNode<NNodes::TCoAtomList> Roles;
@@ -168,6 +181,7 @@ TVector<TString> GetResOrPullColumnHints(const TExprNode& node);
 TWriteTableSettings ParseWriteTableSettings(NNodes::TExprList node, TExprContext& ctx);
 TWriteTopicSettings ParseWriteTopicSettings(NNodes::TExprList node, TExprContext& ctx);
 TWriteReplicationSettings ParseWriteReplicationSettings(NNodes::TExprList node, TExprContext& ctx);
+TWriteTransferSettings ParseWriteTransferSettings(NNodes::TExprList node, TExprContext& ctx);
 
 TWriteRoleSettings ParseWriteRoleSettings(NNodes::TExprList node, TExprContext& ctx);
 TWriteObjectSettings ParseWriteObjectSettings(NNodes::TExprList node, TExprContext& ctx);

+ 5 - 1
yql/essentials/sql/sql.cpp

@@ -144,6 +144,10 @@ namespace NSQLTranslation {
     }
 
     NYql::TAstParseResult SqlASTToYql(const google::protobuf::Message& protoAst, const TSQLHints& hints, const TTranslationSettings& settings) {
+        return SqlASTToYql("", protoAst, hints, settings);
+    }
+
+    NYql::TAstParseResult SqlASTToYql(const TString& query, const google::protobuf::Message& protoAst, const TSQLHints& hints, const TTranslationSettings& settings) {
         NYql::TAstParseResult result;
         switch (settings.SyntaxVersion) {
             case 0:
@@ -161,7 +165,7 @@ namespace NSQLTranslation {
 
                 return NSQLTranslationV0::SqlASTToYql(protoAst, settings);
             case 1:
-                return NSQLTranslationV1::SqlASTToYql(protoAst, hints, settings);
+                return NSQLTranslationV1::SqlASTToYql(query, protoAst, hints, settings);
             default:
                 result.Issues.AddIssue(NYql::YqlIssue(NYql::TPosition(), NYql::TIssuesIds::DEFAULT_ERROR,
                     TStringBuilder() << "Unknown SQL syntax version: " << settings.SyntaxVersion));

+ 4 - 0
yql/essentials/sql/sql.h

@@ -21,7 +21,11 @@ namespace NSQLTranslation {
     google::protobuf::Message* SqlAST(const TString& query, const TString& queryName, NYql::TIssues& issues, size_t maxErrors,
         const TTranslationSettings& settings = {}, ui16* actualSyntaxVersion = nullptr);
     ILexer::TPtr SqlLexer(const TString& query, NYql::TIssues& issues, const TTranslationSettings& settings = {}, ui16* actualSyntaxVersion = nullptr);
+
+    /*[[deprecated]] Use SqlASTToYql(query, protoAst, hints, settings)*/
     NYql::TAstParseResult SqlASTToYql(const google::protobuf::Message& protoAst, const TSQLHints& hints, const TTranslationSettings& settings);
+    NYql::TAstParseResult SqlASTToYql(const TString& query, const google::protobuf::Message& protoAst, const TSQLHints& hints, const TTranslationSettings& settings);
+
     TVector<NYql::TAstParseResult> SqlToAstStatements(const TString& query, const TTranslationSettings& settings,
         NYql::TWarningRules* warningRules = nullptr, ui16* actualSyntaxVersion = nullptr, TVector<NYql::TStmtParseInfo>* stmtParseInfo = nullptr);
 

+ 29 - 0
yql/essentials/sql/v1/SQLv1.g.in

@@ -76,6 +76,9 @@ sql_stmt_core:
   | backup_stmt
   | restore_stmt
   | alter_sequence_stmt
+  | create_transfer_stmt
+  | alter_transfer_stmt
+  | drop_transfer_stmt
 ;
 
 expr:
@@ -906,6 +909,29 @@ alter_replication_set_setting: SET LPAREN replication_settings RPAREN;
 
 drop_replication_stmt: DROP ASYNC REPLICATION object_ref CASCADE?;
 
+create_transfer_stmt: CREATE TRANSFER object_ref
+    FROM object_ref TO object_ref (USING lambda_or_parameter)?
+    WITH LPAREN transfer_settings RPAREN
+;
+
+lambda_or_parameter:
+    lambda
+  | bind_parameter
+;
+transfer_settings: transfer_settings_entry (COMMA transfer_settings_entry)*;
+transfer_settings_entry: an_id EQUALS expr;
+
+alter_transfer_stmt: ALTER TRANSFER object_ref alter_transfer_action (COMMA alter_transfer_action)*;
+alter_transfer_action:
+    alter_transfer_set_setting
+  | alter_transfer_set_using
+;
+
+alter_transfer_set_setting: SET LPAREN transfer_settings RPAREN;
+alter_transfer_set_using: SET USING lambda_or_parameter;
+
+drop_transfer_stmt: DROP TRANSFER object_ref CASCADE?;
+
 action_or_subquery_args: opt_bind_parameter (COMMA opt_bind_parameter)*;
 
 define_action_or_subquery_stmt: DEFINE (ACTION|SUBQUERY) bind_parameter LPAREN action_or_subquery_args? RPAREN AS define_action_or_subquery_body END DEFINE;
@@ -1466,6 +1492,7 @@ keyword_as_compat:
   | TO
   | TOPIC
   | TRANSACTION
+  | TRANSFER
   | TRIGGER
   | TYPE
   | UNCONDITIONAL
@@ -1693,6 +1720,7 @@ keyword_compat: (
   | TO
   | TOPIC
   | TRANSACTION
+  | TRANSFER
   | TRIGGER
   | TYPE
   | UNCONDITIONAL
@@ -2072,6 +2100,7 @@ TIES: T I E S;
 TO: T O;
 TOPIC: T O P I C;
 TRANSACTION: T R A N S A C T I O N;
+TRANSFER: T R A N S F E R;
 TRIGGER: T R I G G E R;
 TRUE: T R U E;
 TUPLE: T U P L E;

+ 30 - 0
yql/essentials/sql/v1/SQLv1Antlr4.g.in

@@ -75,6 +75,9 @@ sql_stmt_core:
   | backup_stmt
   | restore_stmt
   | alter_sequence_stmt
+  | create_transfer_stmt
+  | alter_transfer_stmt
+  | drop_transfer_stmt
 ;
 
 expr:
@@ -905,6 +908,30 @@ alter_replication_set_setting: SET LPAREN replication_settings RPAREN;
 
 drop_replication_stmt: DROP ASYNC REPLICATION object_ref CASCADE?;
 
+lambda_or_parameter:
+    lambda
+  | bind_parameter
+;
+
+create_transfer_stmt: CREATE TRANSFER object_ref
+    FROM object_ref TO object_ref (USING lambda_or_parameter)?
+    WITH LPAREN transfer_settings RPAREN
+;
+
+transfer_settings: transfer_settings_entry (COMMA transfer_settings_entry)*;
+transfer_settings_entry: an_id EQUALS expr;
+
+alter_transfer_stmt: ALTER TRANSFER object_ref alter_transfer_action (COMMA alter_transfer_action)*;
+alter_transfer_action:
+    alter_transfer_set_setting
+  | alter_transfer_set_using
+;
+
+alter_transfer_set_setting: SET LPAREN transfer_settings RPAREN;
+alter_transfer_set_using: SET USING lambda_or_parameter;
+
+drop_transfer_stmt: DROP TRANSFER object_ref CASCADE?;
+
 action_or_subquery_args: opt_bind_parameter (COMMA opt_bind_parameter)*;
 
 define_action_or_subquery_stmt: DEFINE (ACTION|SUBQUERY) bind_parameter LPAREN action_or_subquery_args? RPAREN AS define_action_or_subquery_body END DEFINE;
@@ -1465,6 +1492,7 @@ keyword_as_compat:
   | TO
   | TOPIC
   | TRANSACTION
+  | TRANSFER
   | TRIGGER
   | TYPE
   | UNCONDITIONAL
@@ -1692,6 +1720,7 @@ keyword_compat: (
   | TO
   | TOPIC
   | TRANSACTION
+  | TRANSFER
   | TRIGGER
   | TYPE
   | UNCONDITIONAL
@@ -2071,6 +2100,7 @@ TIES: T I E S;
 TO: T O;
 TOPIC: T O P I C;
 TRANSACTION: T R A N S A C T I O N;
+TRANSFER: T R A N S F E R;
 TRIGGER: T R I G G E R;
 TRUE: T R U E;
 TUPLE: T U P L E;

+ 3 - 1
yql/essentials/sql/v1/context.cpp

@@ -83,12 +83,14 @@ THashMap<TStringBuf, TPragmaMaybeField> CTX_PRAGMA_MAYBE_FIELDS = {
 
 TContext::TContext(const NSQLTranslation::TTranslationSettings& settings,
                    const NSQLTranslation::TSQLHints& hints,
-                   TIssues& issues)
+                   TIssues& issues,
+                   const TString& query)
     : ClusterMapping(settings.ClusterMapping)
     , PathPrefix(settings.PathPrefix)
     , ClusterPathPrefixes(settings.ClusterPathPrefixes)
     , SQLHints(hints)
     , Settings(settings)
+    , Query(query)
     , Pool(new TMemoryPool(4096))
     , Issues(issues)
     , IncrementMonCounterFunction(settings.IncrementCounter)

+ 4 - 1
yql/essentials/sql/v1/context.h

@@ -92,7 +92,8 @@ namespace NSQLTranslationV1 {
     public:
         TContext(const NSQLTranslation::TTranslationSettings& settings,
                  const NSQLTranslation::TSQLHints& hints,
-                 NYql::TIssues& issues);
+                 NYql::TIssues& issues,
+                 const TString& query = {});
 
         virtual ~TContext();
 
@@ -237,6 +238,7 @@ namespace NSQLTranslationV1 {
         THashMap<TString, std::pair<TPosition, TNodePtr>> Variables;
         THashSet<TString> WeakVariables;
         NSQLTranslation::TTranslationSettings Settings;
+        const TString Query;
         std::unique_ptr<TMemoryPool> Pool;
         NYql::TIssues& Issues;
         TMap<TString, TNodePtr> UniversalAliases;
@@ -328,6 +330,7 @@ namespace NSQLTranslationV1 {
         bool DistinctOverWindow = false;
         bool SeqMode = false;
         bool EmitUnionMerge = false;
+        TVector<size_t> ForAllStatementsParts;
     };
 
     class TColumnRefScope {

+ 22 - 168
yql/essentials/sql/v1/format/sql_format.cpp

@@ -165,144 +165,6 @@ bool Validate(const TParsedTokenList& query, const TParsedTokenList& formattedQu
     return in == inEnd && out == outEnd && parenthesesBalance == 0;
 }
 
-enum EParenType {
-    Open,
-    Close,
-    None
-};
-
-using TAdvanceCallback = std::function<EParenType(TTokenIterator& curr, TTokenIterator end)>;
-
-TTokenIterator SkipToNextBalanced(TTokenIterator begin, TTokenIterator end, const TAdvanceCallback& advance) {
-    i64 level = 0;
-    TTokenIterator curr = begin;
-    while (curr != end) {
-        switch (advance(curr, end)) {
-            case EParenType::Open: {
-                ++level;
-                break;
-            }
-            case EParenType::Close: {
-                --level;
-                if (level < 0) {
-                    return end;
-                } else if (level == 0) {
-                    return curr;
-                }
-                break;
-            }
-            case EParenType::None:
-                break;
-        }
-    }
-    return curr;
-}
-
-TTokenIterator GetNextStatementBegin(TTokenIterator begin, TTokenIterator end) {
-    TAdvanceCallback advanceLambdaBody = [](TTokenIterator& curr, TTokenIterator end) -> EParenType {
-        Y_UNUSED(end);
-        if (curr->Name == "LBRACE_CURLY") {
-            ++curr;
-            return EParenType::Open;
-        } else if (curr->Name == "RBRACE_CURLY") {
-            ++curr;
-            return EParenType::Close;
-        } else {
-            ++curr;
-            return EParenType::None;
-        }
-    };
-
-    TAdvanceCallback advanceAction = [](TTokenIterator& curr, TTokenIterator end) -> EParenType {
-        auto tmp = curr;
-        if (curr->Name == "DEFINE") {
-            ++curr;
-            curr = SkipWSOrComment(curr, end);
-            if (curr != end && (curr->Name == "ACTION" || curr->Name == "SUBQUERY")) {
-                ++curr;
-                return EParenType::Open;
-            }
-        } else if (curr->Name == "END") {
-            ++curr;
-            curr = SkipWSOrComment(curr, end);
-            if (curr != end && curr->Name == "DEFINE") {
-                ++curr;
-                return EParenType::Close;
-            }
-        }
-
-        curr = tmp;
-        ++curr;
-        return EParenType::None;
-    };
-
-    TAdvanceCallback advanceInlineAction = [](TTokenIterator& curr, TTokenIterator end) -> EParenType {
-        auto tmp = curr;
-        if (curr->Name == "DO") {
-            ++curr;
-            curr = SkipWSOrComment(curr, end);
-            if (curr != end && curr->Name == "BEGIN") {
-                ++curr;
-                return EParenType::Open;
-            }
-        } else if (curr->Name == "END") {
-            ++curr;
-            curr = SkipWSOrComment(curr, end);
-            if (curr != end && curr->Name == "DO") {
-                ++curr;
-                return EParenType::Close;
-            }
-        }
-
-        curr = tmp;
-        ++curr;
-        return EParenType::None;
-    };
-
-    TTokenIterator curr = begin;
-    while (curr != end) {
-        bool matched = false;
-        for (auto cb : {advanceLambdaBody, advanceAction, advanceInlineAction}) {
-            TTokenIterator tmp = curr;
-            if (cb(tmp, end) == EParenType::Open) {
-                curr = SkipToNextBalanced(curr, end, cb);
-                matched = true;
-                if (curr == end) {
-                    return curr;
-                }
-            }
-        }
-        if (matched) {
-            continue;
-        }
-        if (curr->Name == "SEMICOLON") {
-            auto next = SkipWS(curr + 1, end);
-            while (next != end && next->Name == "COMMENT" && curr->Line == next->Line) {
-                curr = next;
-                next = SkipWS(next + 1, end);
-            }
-            ++curr;
-            break;
-        }
-        ++curr;
-    }
-
-    return curr;
-}
-
-void SplitByStatements(TTokenIterator begin, TTokenIterator end, TVector<TTokenIterator>& output) {
-    output.clear();
-    if (begin == end) {
-        return;
-    }
-    output.push_back(begin);
-    auto curr = begin;
-    while (curr != end) {
-        curr = GetNextStatementBegin(curr, end);
-        output.push_back(curr);
-    }
-}
-
 enum class EScope {
     Default,
     TypeName,
@@ -833,6 +695,7 @@ private:
             case TRule_sql_stmt_core::kAltSqlStmtCore14: // export
             case TRule_sql_stmt_core::kAltSqlStmtCore32: // drop external data source
             case TRule_sql_stmt_core::kAltSqlStmtCore34: // drop replication
+            case TRule_sql_stmt_core::kAltSqlStmtCore60: // drop transfer
                 return true;
             case TRule_sql_stmt_core::kAltSqlStmtCore3: { // named nodes
                 const auto& stmt = msg.GetAlt_sql_stmt_core3().GetRule_named_nodes_stmt1();
@@ -1541,6 +1404,21 @@ private:
         VisitAllFields(TRule_drop_replication_stmt::GetDescriptor(), msg);
     }
 
+    void VisitCreateTransfer(const TRule_create_transfer_stmt& msg) {
+        NewLine();
+        VisitAllFields(TRule_create_transfer_stmt::GetDescriptor(), msg);
+    }
+
+    void VisitAlterTransfer(const TRule_alter_transfer_stmt& msg) {
+        NewLine();
+        VisitAllFields(TRule_alter_transfer_stmt::GetDescriptor(), msg);
+    }
+
+    void VisitDropTransfer(const TRule_drop_transfer_stmt& msg) {
+        NewLine();
+        VisitAllFields(TRule_drop_transfer_stmt::GetDescriptor(), msg);
+    }
+
     void VisitCreateResourcePool(const TRule_create_resource_pool_stmt& msg) {
         NewLine();
         VisitAllFields(TRule_create_resource_pool_stmt::GetDescriptor(), msg);
@@ -3107,6 +2985,9 @@ TStaticData::TStaticData()
         {TRule_create_replication_stmt::GetDescriptor(), MakePrettyFunctor(&TPrettyVisitor::VisitCreateAsyncReplication)},
         {TRule_alter_replication_stmt::GetDescriptor(), MakePrettyFunctor(&TPrettyVisitor::VisitAlterAsyncReplication)},
         {TRule_drop_replication_stmt::GetDescriptor(), MakePrettyFunctor(&TPrettyVisitor::VisitDropAsyncReplication)},
+        {TRule_create_transfer_stmt::GetDescriptor(), MakePrettyFunctor(&TPrettyVisitor::VisitCreateTransfer)},
+        {TRule_alter_transfer_stmt::GetDescriptor(), MakePrettyFunctor(&TPrettyVisitor::VisitAlterTransfer)},
+        {TRule_drop_transfer_stmt::GetDescriptor(), MakePrettyFunctor(&TPrettyVisitor::VisitDropTransfer)},
         {TRule_create_topic_stmt::GetDescriptor(), MakePrettyFunctor(&TPrettyVisitor::VisitCreateTopic)},
         {TRule_alter_topic_stmt::GetDescriptor(), MakePrettyFunctor(&TPrettyVisitor::VisitAlterTopic)},
         {TRule_drop_topic_stmt::GetDescriptor(), MakePrettyFunctor(&TPrettyVisitor::VisitDropTopic)},
@@ -3188,42 +3069,15 @@ public:
         }
 
         auto lexer = NSQLTranslationV1::MakeLexer(parsedSettings.AnsiLexer, parsedSettings.Antlr4Parser);
-        TParsedTokenList allTokens;
-        auto onNextToken = [&](NSQLTranslation::TParsedToken&& token) {
-            if (token.Name != "EOF") {
-                allTokens.push_back(token);
-            }
-        };
-
-        if (!lexer->Tokenize(query, "Query", onNextToken, issues, NSQLTranslation::SQL_MAX_PARSER_ERRORS)) {
+        TVector<TString> statements;
+        if (!NSQLTranslationV1::SplitQueryToStatements(query, lexer, statements, issues)) {
             return false;
         }
 
-        TVector<TTokenIterator> statements;
-        SplitByStatements(allTokens.begin(), allTokens.end(), statements);
         TStringBuilder finalFormattedQuery;
         bool prevAddLine = false;
         TMaybe<ui32> prevStmtCoreAltCase;
-        for (size_t i = 1; i < statements.size(); ++i) {
-            TStringBuilder currentQueryBuilder;
-            for (auto it = statements[i - 1]; it != statements[i]; ++it) {
-                currentQueryBuilder << it->Content;
-            }
-
-            TString currentQuery = currentQueryBuilder;
-            currentQuery = StripStringLeft(currentQuery);
-            bool isBlank = true;
-            for (auto c : currentQuery) {
-                if (c != ';') {
-                    isBlank = false;
-                    break;
-                }
-            };
-
-            if (isBlank) {
-                continue;
-            }
-
+        for (const TString& currentQuery : statements) {
             TVector<NSQLTranslation::TParsedToken> comments;
             TParsedTokenList parsedTokens, stmtTokens;
             auto onNextRawToken = [&](NSQLTranslation::TParsedToken&& token) {

+ 20 - 0
yql/essentials/sql/v1/format/sql_format_ut.h

@@ -383,6 +383,26 @@ Y_UNIT_TEST(AsyncReplication) {
     setup.Run(cases);
 }
 
+Y_UNIT_TEST(Transfer) {
+    TCases cases = {
+        {"create transfer user from topic1 to table1 with (user='foo')",
+            "CREATE TRANSFER user FROM topic1 TO table1 WITH (user = 'foo');\n"},
+        {"alter transfer user set (user='foo')",
+            "ALTER TRANSFER user SET (user = 'foo');\n"},
+        {"drop transfer user",
+            "DROP TRANSFER user;\n"},
+        {"drop transfer user cascade",
+            "DROP TRANSFER user CASCADE;\n"},
+        {"create transfer user from topic1 to table1 using ($x) -> { $y = cast($x as String); return $y ; } with (user='foo')",
+            "CREATE TRANSFER user FROM topic1 TO table1 USING ($x) -> {\n    $y = CAST($x AS String);\n    RETURN $y;\n} WITH (user = 'foo');\n"},
+        {"create transfer user from topic1 to table1 using $xxx with (user='foo')",
+            "CREATE TRANSFER user FROM topic1 TO table1 USING $xxx WITH (user = 'foo');\n"},
+    };
+
+    TSetup setup;
+    setup.Run(cases);
+}
+
 Y_UNIT_TEST(ExternalTableOperations) {
     TCases cases = {
         {"creAte exTernAl TabLe usEr (a int) With (a = \"b\")",

Some files were not shown because too many files changed in this diff