Browse Source

Support block reader for partitioned datasets

auzhegov 1 year ago
parent
commit
20c0ade0ce

+ 45 - 12
ydb/library/yql/providers/common/mkql/parser.cpp

@@ -157,19 +157,53 @@ TRuntimeNode BuildParseCall(
     const auto* finalItemStructType = static_cast<TStructType*>(finalItemType);
 
     if (useBlocks) {
-        return ctx.ProgramBuilder.ExpandMap(ctx.ProgramBuilder.ToFlow(input), [&](TRuntimeNode item) {
-            MKQL_ENSURE(!extraColumnsByPathIndex && metadataColumns.empty(), "TODO");
-
-            TRuntimeNode::TList fields;
+        return ctx.ProgramBuilder.BlockExpandChunked(ctx.ProgramBuilder.ExpandMap(
+            ctx.ProgramBuilder.ToFlow(input), [&](TRuntimeNode item) {
+                auto parsedData = (extraColumnsByPathIndex || !metadataColumns.empty())
+                                      ? ctx.ProgramBuilder.Nth(item, 0)
+                                      : item;
+
+                TMaybe<TRuntimeNode> extra;
+                if (extraColumnsByPathIndex) {
+                    auto pathInd   = ctx.ProgramBuilder.Nth(item, 1);
+                    auto extraNode = ctx.ProgramBuilder.Lookup(
+                        ctx.ProgramBuilder.ToIndexDict(*extraColumnsByPathIndex), pathInd);
+                    extra = ctx.ProgramBuilder.Unwrap(
+                        extraNode,
+                        ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::String>(
+                            "Failed to lookup path index"),
+                        pos.File,
+                        pos.Row,
+                        pos.Column);
+                }
 
-            for (ui32 i = 0; i < finalItemStructType->GetMembersCount(); ++i) {
-                TStringBuf name = finalItemStructType->GetMemberName(i);
-                fields.push_back(ctx.ProgramBuilder.Member(item, name));
-            }
+                auto blockLengthName =
+                    ctx.ProgramBuilder.Member(parsedData, BlockLengthColumnName);
+                TRuntimeNode::TList fields;
+                fields.reserve(finalItemStructType->GetMembersCount());
+
+                for (ui32 i = 0; i < finalItemStructType->GetMembersCount(); ++i) {
+                    TStringBuf name         = finalItemStructType->GetMemberName(i);
+                    const auto metadataIter = metadataColumns.find(TString(name));
+                    if (metadataIter != metadataColumns.end()) {
+                        fields.push_back(ctx.ProgramBuilder.ReplicateScalar(
+                            ctx.ProgramBuilder.AsScalar(
+                                ctx.ProgramBuilder.Nth(item, metadataIter->second)),
+                            blockLengthName));
+                    } else if (parseItemStructType->FindMemberIndex(name).Defined()) {
+                        fields.push_back(ctx.ProgramBuilder.Member(parsedData, name));
+                    } else {
+                        MKQL_ENSURE(extra, "Column " << name << " wasn't found");
+                        fields.push_back(ctx.ProgramBuilder.ReplicateScalar(
+                            ctx.ProgramBuilder.AsScalar(
+                                ctx.ProgramBuilder.Member(*extra, name)),
+                            blockLengthName));
+                    }
+                }
 
-            fields.push_back(ctx.ProgramBuilder.Member(item, BlockLengthColumnName));
-            return fields;
-        });
+                fields.push_back(blockLengthName);
+                return fields;
+            }));
     }
 
     if (!compression.empty()) {
@@ -204,7 +238,6 @@ TRuntimeNode BuildParseCall(
 
                 if (extraColumnsByPathIndex || !metadataColumns.empty()) {
                     auto data = ctx.ProgramBuilder.Nth(item, 0);
-                    TMaybe<TRuntimeNode> pathInd;
                     res.emplace_back(parseLambda(data));
                     if (extraColumnsByPathIndex) {
                         res.emplace_back(ctx.ProgramBuilder.Nth(item, res.size()));

+ 8 - 1
ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp

@@ -361,9 +361,16 @@ public:
 
         const TTypeAnnotationNode* itemType = nullptr;
         if (input->Content() == TS3ArrowSettings::CallableName()) {
+            std::unordered_set<TString> extraColumnNames(extraColumnsType->GetSize());
+            for (const auto& extraColumn : extraColumnsType->GetItems()) {
+                extraColumnNames.insert(TString{extraColumn->GetName()});
+            }
+
             TVector<const TItemExprType*> blockRowTypeItems;
             for (const auto& x : rowType->Cast<TStructExprType>()->GetItems()) {
-                blockRowTypeItems.push_back(ctx.MakeType<TItemExprType>(x->GetName(), ctx.MakeType<TBlockExprType>(x->GetItemType())));
+                if (!extraColumnNames.contains(TString{x->GetName()})) {
+                    blockRowTypeItems.push_back(ctx.MakeType<TItemExprType>(x->GetName(), ctx.MakeType<TBlockExprType>(x->GetItemType())));
+                }
             }
 
             blockRowTypeItems.push_back(ctx.MakeType<TItemExprType>(BlockLengthColumnName, ctx.MakeType<TScalarExprType>(ctx.MakeType<TDataExprType>(EDataSlot::Uint64))));