Browse Source

general steps for data fetching (#1088)

ivanmorozov333 1 year ago
parent
commit
a854215ff2

+ 3 - 3
ydb/core/formats/arrow/arrow_filter.cpp

@@ -323,7 +323,7 @@ bool ApplyImpl(const TColumnFilter& filter, std::shared_ptr<TData>& batch, const
     }
     if (filter.IsTotalDenyFilter()) {
         batch = batch->Slice(0, 0);
-        return false;
+        return true;
     }
     if (filter.IsTotalAllowFilter()) {
         return true;
@@ -343,11 +343,11 @@ bool ApplyImpl(const TColumnFilter& filter, std::shared_ptr<TData>& batch, const
     return false;
 }
 
-bool TColumnFilter::Apply(std::shared_ptr<arrow::Table>& batch, const std::optional<ui32> startPos, const std::optional<ui32> count) {
+bool TColumnFilter::Apply(std::shared_ptr<arrow::Table>& batch, const std::optional<ui32> startPos, const std::optional<ui32> count) const {
     return ApplyImpl<arrow::Datum::TABLE>(*this, batch, startPos, count);
 }
 
-bool TColumnFilter::Apply(std::shared_ptr<arrow::RecordBatch>& batch, const std::optional<ui32> startPos, const std::optional<ui32> count) {
+bool TColumnFilter::Apply(std::shared_ptr<arrow::RecordBatch>& batch, const std::optional<ui32> startPos, const std::optional<ui32> count) const {
     return ApplyImpl<arrow::Datum::RECORD_BATCH>(*this, batch, startPos, count);
 }
 

+ 2 - 2
ydb/core/formats/arrow/arrow_filter.h

@@ -171,8 +171,8 @@ public:
     // It makes a filter using composite predicate
     static TColumnFilter MakePredicateFilter(const arrow::Datum& datum, const arrow::Datum& border, ECompareType compareType);
 
-    bool Apply(std::shared_ptr<arrow::Table>& batch, const std::optional<ui32> startPos = {}, const std::optional<ui32> count = {});
-    bool Apply(std::shared_ptr<arrow::RecordBatch>& batch, const std::optional<ui32> startPos = {}, const std::optional<ui32> count = {});
+    bool Apply(std::shared_ptr<arrow::Table>& batch, const std::optional<ui32> startPos = {}, const std::optional<ui32> count = {}) const;
+    bool Apply(std::shared_ptr<arrow::RecordBatch>& batch, const std::optional<ui32> startPos = {}, const std::optional<ui32> count = {}) const;
     void Apply(const ui32 expectedRecordsCount, std::vector<arrow::Datum*>& datums) const;
 
     // Combines filters by 'and' operator (extFilter count is true positions count in self, thought extFitler patch exactly that positions)

+ 1 - 0
ydb/core/tx/columnshard/columnshard__read_base.cpp

@@ -45,6 +45,7 @@ bool TTxReadBase::ParseProgram(NKikimrSchemeOp::EOlapProgramType programType,
             AFL_VERIFY(namesChecker.emplace(names.back()).second);
         }
         NOlap::TProgramContainer container;
+        AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "overriden_columns")("columns", JoinSeq(",", names));
         container.OverrideProcessingColumns(std::vector<TString>(names.begin(), names.end()));
         read.SetProgram(std::move(container));
         return true;

+ 7 - 19
ydb/core/tx/columnshard/columnshard__scan.cpp

@@ -118,7 +118,7 @@ public:
         ResourceSubscribeActorId = ctx.Register(new NOlap::NResourceBroker::NSubscribe::TActor(TabletId, SelfId()));
         ReadCoordinatorActorId = ctx.Register(new NOlap::NBlobOperations::NRead::TReadCoordinatorActor(TabletId, SelfId()));
 
-        std::shared_ptr<NOlap::TReadContext> context = std::make_shared<NOlap::TReadContext>(StoragesManager, ScanCountersPool, false,
+        std::shared_ptr<NOlap::TReadContext> context = std::make_shared<NOlap::TReadContext>(StoragesManager, ScanCountersPool,
             ReadMetadataRanges[ReadMetadataIndex], SelfId(), ResourceSubscribeActorId, ReadCoordinatorActorId, ComputeShardingPolicy);
         ScanIterator = ReadMetadataRanges[ReadMetadataIndex]->StartScan(context);
 
@@ -367,7 +367,7 @@ private:
             return Finish();
         }
 
-        auto context = std::make_shared<NOlap::TReadContext>(StoragesManager, ScanCountersPool, false, ReadMetadataRanges[ReadMetadataIndex], SelfId(),
+        auto context = std::make_shared<NOlap::TReadContext>(StoragesManager, ScanCountersPool, ReadMetadataRanges[ReadMetadataIndex], SelfId(),
             ResourceSubscribeActorId, ReadCoordinatorActorId, ComputeShardingPolicy);
         ScanIterator = ReadMetadataRanges[ReadMetadataIndex]->StartScan(context);
     }
@@ -921,29 +921,17 @@ public:
         Results.emplace_back(std::move(res));
     }
 
-    void FillResult(std::vector<TPartialReadResult>& result, const bool mergePartsToMax) const {
+    void FillResult(std::vector<TPartialReadResult>& result) const {
         if (Results.empty()) {
             return;
         }
-        if (mergePartsToMax) {
-            std::vector<std::shared_ptr<arrow::RecordBatch>> batches;
-            std::vector<std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard>> guards;
-            for (auto&& i : Results) {
-                batches.emplace_back(i.GetResultBatchPtrVerified());
-                guards.insert(guards.end(), i.GetResourcesGuards().begin(), i.GetResourcesGuards().end());
-            }
-            auto res = NArrow::CombineBatches(batches);
-            AFL_VERIFY(res);
-            result.emplace_back(TPartialReadResult(guards, NArrow::TShardedRecordBatch(res), Results.back().GetLastReadKey()));
-        } else {
-            for (auto&& i : Results) {
-                result.emplace_back(std::move(i));
-            }
+        for (auto&& i : Results) {
+            result.emplace_back(std::move(i));
         }
     }
 };
 
-std::vector<NKikimr::NOlap::TPartialReadResult> TPartialReadResult::SplitResults(std::vector<TPartialReadResult>&& resultsExt, const ui32 maxRecordsInResult, const bool mergePartsToMax) {
+std::vector<NKikimr::NOlap::TPartialReadResult> TPartialReadResult::SplitResults(std::vector<TPartialReadResult>&& resultsExt, const ui32 maxRecordsInResult) {
     std::vector<TCurrentBatch> resultBatches;
     TCurrentBatch currentBatch;
     for (auto&& i : resultsExt) {
@@ -960,7 +948,7 @@ std::vector<NKikimr::NOlap::TPartialReadResult> TPartialReadResult::SplitResults
 
     std::vector<TPartialReadResult> result;
     for (auto&& i : resultBatches) {
-        i.FillResult(result, mergePartsToMax);
+        i.FillResult(result);
     }
     return result;
 }

+ 1 - 1
ydb/core/tx/columnshard/columnshard__scan.h

@@ -47,7 +47,7 @@ public:
         return ResultBatch.GetRecordsCount();
     }
 
-    static std::vector<TPartialReadResult> SplitResults(std::vector<TPartialReadResult>&& resultsExt, const ui32 maxRecordsInResult, const bool mergePartsToMax);
+    static std::vector<TPartialReadResult> SplitResults(std::vector<TPartialReadResult>&& resultsExt, const ui32 maxRecordsInResult);
 
     const NArrow::TShardedRecordBatch& GetShardedBatch() const {
         return ResultBatch;

+ 18 - 14
ydb/core/tx/columnshard/engines/portions/portion_info.h

@@ -97,7 +97,7 @@ public:
     TSerializationStats GetSerializationStat(const ISnapshotSchema& schema) const {
         TSerializationStats result;
         for (auto&& i : Records) {
-            if (schema.GetFieldByColumnId(i.ColumnId)) {
+            if (schema.GetFieldByColumnIdOptional(i.ColumnId)) {
                 result.AddStat(i.GetSerializationStat(schema.GetFieldByColumnIdVerified(i.ColumnId)->name()));
             }
         }
@@ -151,8 +151,7 @@ public:
         : PathId(pathId)
         , Portion(portionId)
         , MinSnapshot(minSnapshot)
-        , BlobsOperator(blobsOperator)
-    {
+        , BlobsOperator(blobsOperator) {
     }
 
     TString DebugString(const bool withDetails = false) const;
@@ -399,22 +398,30 @@ public:
     public:
         TAssembleBlobInfo(const ui32 rowsCount)
             : NullRowsCount(rowsCount) {
-
+            AFL_VERIFY(NullRowsCount);
         }
 
         TAssembleBlobInfo(const TString& data)
             : Data(data) {
-
+            AFL_VERIFY(!!Data);
         }
 
         ui32 GetNullRowsCount() const noexcept {
-             return NullRowsCount;
+            return NullRowsCount;
         }
 
         const TString& GetData() const noexcept {
             return Data;
         }
 
+        bool IsBlob() const {
+            return !NullRowsCount && !!Data;
+        }
+
+        bool IsNull() const {
+            return NullRowsCount && !Data;
+        }
+
         std::shared_ptr<arrow::RecordBatch> BuildRecordBatch(const TColumnLoader& loader) const;
     };
 
@@ -437,8 +444,7 @@ public:
 
         TPreparedColumn(std::vector<TAssembleBlobInfo>&& blobs, const std::shared_ptr<TColumnLoader>& loader)
             : Loader(loader)
-            , Blobs(std::move(blobs))
-        {
+            , Blobs(std::move(blobs)) {
             Y_ABORT_UNLESS(Loader);
             Y_ABORT_UNLESS(Loader->GetExpectedSchema()->num_fields() == 1);
         }
@@ -505,8 +511,7 @@ public:
         TPreparedBatchData(std::vector<TPreparedColumn>&& columns, std::shared_ptr<arrow::Schema> schema, const size_t rowsCount)
             : Columns(std::move(columns))
             , Schema(schema)
-            , RowsCount(rowsCount)
-        {
+            , RowsCount(rowsCount) {
         }
 
         std::shared_ptr<arrow::RecordBatch> Assemble(const TAssembleOptions& options = {}) const;
@@ -525,8 +530,7 @@ public:
             : ColumnId(resultLoader->GetColumnId())
             , NumRows(numRows)
             , DataLoader(dataLoader)
-            , ResultLoader(resultLoader)
-        {
+            , ResultLoader(resultLoader) {
             AFL_VERIFY(ResultLoader);
             if (DataLoader) {
                 AFL_VERIFY(ResultLoader->GetColumnId() == DataLoader->GetColumnId());
@@ -598,8 +602,8 @@ public:
     }
 
     std::shared_ptr<arrow::RecordBatch> AssembleInBatch(const ISnapshotSchema& dataSchema,
-                                            const ISnapshotSchema& resultSchema,
-                                            THashMap<TBlobRange, TString>& data) const {
+        const ISnapshotSchema& resultSchema,
+        THashMap<TBlobRange, TString>& data) const {
         auto batch = PrepareForAssemble(dataSchema, resultSchema, data).Assemble();
         Y_ABORT_UNLESS(batch->Validate().ok());
         return batch;

+ 14 - 6
ydb/core/tx/columnshard/engines/reader/plain_reader/columns_set.cpp

@@ -12,6 +12,12 @@ TString TColumnsSet::DebugString() const {
 }
 
 NKikimr::NOlap::NPlainReader::TColumnsSet TColumnsSet::operator-(const TColumnsSet& external) const {
+    if (external.IsEmpty()) {
+        return *this;
+    }
+    if (IsEmpty()) {
+        return external;
+    }
     TColumnsSet result = *this;
     for (auto&& i : external.ColumnIds) {
         result.ColumnIds.erase(i);
@@ -28,6 +34,12 @@ NKikimr::NOlap::NPlainReader::TColumnsSet TColumnsSet::operator-(const TColumnsS
 }
 
 NKikimr::NOlap::NPlainReader::TColumnsSet TColumnsSet::operator+(const TColumnsSet& external) const {
+    if (external.IsEmpty()) {
+        return *this;
+    }
+    if (IsEmpty()) {
+        return external;
+    }
     TColumnsSet result = *this;
     result.ColumnIds.insert(external.ColumnIds.begin(), external.ColumnIds.end());
     auto fields = result.Schema->fields();
@@ -42,7 +54,7 @@ NKikimr::NOlap::NPlainReader::TColumnsSet TColumnsSet::operator+(const TColumnsS
 }
 
 bool TColumnsSet::ColumnsOnly(const std::vector<std::string>& fieldNames) const {
-    if (fieldNames.size() != GetSize()) {
+    if (fieldNames.size() != GetColumnsCount()) {
         return false;
     }
     std::set<std::string> fieldNamesSet;
@@ -64,11 +76,7 @@ void TColumnsSet::Rebuild() {
         ColumnNamesVector.emplace_back(i);
         ColumnNames.emplace(i);
     }
-    if (ColumnIds.size()) {
-        FilteredSchema = std::make_shared<TFilteredSnapshotSchema>(FullReadSchema, ColumnIds);
-    } else {
-        FilteredSchema = FullReadSchema;
-    }
+    FilteredSchema = std::make_shared<TFilteredSnapshotSchema>(FullReadSchema, ColumnIds);
 }
 
 }

+ 8 - 24
ydb/core/tx/columnshard/engines/reader/plain_reader/columns_set.h

@@ -18,12 +18,19 @@ private:
 
 public:
     TColumnsSet() = default;
+    bool IsEmpty() const {
+        return ColumnIds.empty();
+    }
+
+    bool operator!() const {
+        return IsEmpty();
+    }
 
     const std::vector<TString>& GetColumnNamesVector() const {
         return ColumnNamesVector;
     }
 
-    ui32 GetSize() const {
+    ui32 GetColumnsCount() const {
         return ColumnIds.size();
     }
 
@@ -96,27 +103,4 @@ public:
     TColumnsSet operator-(const TColumnsSet& external) const;
 };
 
-class TFetchingPlan {
-private:
-    YDB_READONLY_DEF(std::shared_ptr<TColumnsSet>, FilterStage);
-    YDB_READONLY_DEF(std::shared_ptr<TColumnsSet>, FetchingStage);
-    bool CanUseEarlyFilterImmediatelyFlag = false;
-public:
-    TFetchingPlan(const std::shared_ptr<TColumnsSet>& filterStage, const std::shared_ptr<TColumnsSet>& fetchingStage, const bool canUseEarlyFilterImmediately)
-        : FilterStage(filterStage)
-        , FetchingStage(fetchingStage)
-        , CanUseEarlyFilterImmediatelyFlag(canUseEarlyFilterImmediately) {
-
-    }
-
-    TString DebugString() const {
-        return TStringBuilder() << "{filter=" << (FilterStage ? FilterStage->DebugString() : "NO") << ";fetching=" <<
-            (FetchingStage ? FetchingStage->DebugString() : "NO") << ";use_filter=" << CanUseEarlyFilterImmediatelyFlag << "}";
-    }
-
-    bool CanUseEarlyFilterImmediately() const {
-        return CanUseEarlyFilterImmediatelyFlag;
-    }
-};
-
 }

+ 0 - 44
ydb/core/tx/columnshard/engines/reader/plain_reader/committed_assembler.cpp

@@ -1,44 +0,0 @@
-#include "committed_assembler.h"
-#include "plain_read_data.h"
-
-namespace NKikimr::NOlap::NPlainReader {
-
-bool TCommittedAssembler::DoExecute() {
-    ResultBatch = NArrow::DeserializeBatch(BlobData, ReadMetadata->GetBlobSchema(SchemaVersion));
-    Y_ABORT_UNLESS(ResultBatch);
-    ResultBatch = ReadMetadata->GetIndexInfo().AddSpecialColumns(ResultBatch, DataSnapshot);
-    Y_ABORT_UNLESS(ResultBatch);
-    ReadMetadata->GetPKRangesFilter().BuildFilter(ResultBatch).Apply(ResultBatch);
-    auto t = NArrow::TStatusValidator::GetValid(arrow::Table::FromRecordBatches({ResultBatch}));
-    EarlyFilter = ReadMetadata->GetProgram().ApplyEarlyFilter(t, false);
-    ResultBatch = NArrow::ToBatch(t, true);
-    return true;
-}
-
-bool TCommittedAssembler::DoApply(IDataReader& /*owner*/) const {
-    if (Source->GetFetchingPlan().GetFilterStage()->GetSchema()) {
-        Source->InitFilterStageData(nullptr, EarlyFilter, NArrow::ExtractColumns(ResultBatch, Source->GetFetchingPlan().GetFilterStage()->GetSchema(), true), Source);
-    } else {
-        Source->InitFilterStageData(nullptr, EarlyFilter, nullptr, Source);
-    }
-    if (Source->GetFetchingPlan().GetFetchingStage()->GetSchema()) {
-        Source->InitFetchStageData(NArrow::ExtractColumns(ResultBatch, Source->GetFetchingPlan().GetFetchingStage()->GetSchema(), true));
-    } else {
-        Source->InitFetchStageData(nullptr);
-    }
-    return true;
-}
-
-TCommittedAssembler::TCommittedAssembler(const NActors::TActorId& scanActorId, const TString& blobData, const TReadMetadata::TConstPtr& readMetadata, const std::shared_ptr<IDataSource>& source,
-    const TCommittedBlob& cBlob, NColumnShard::TCounterGuard&& taskGuard)
-    : TBase(scanActorId)
-    , BlobData(blobData)
-    , ReadMetadata(readMetadata)
-    , Source(source)
-    , SchemaVersion(cBlob.GetSchemaVersion())
-    , DataSnapshot(cBlob.GetSnapshot())
-    , TaskGuard(std::move(taskGuard))
-{
-}
-
-}

+ 0 - 34
ydb/core/tx/columnshard/engines/reader/plain_reader/committed_assembler.h

@@ -1,34 +0,0 @@
-#pragma once
-#include "source.h"
-#include <ydb/core/tx/columnshard/engines/reader/conveyor_task.h>
-#include <ydb/core/tx/columnshard/engines/reader/read_metadata.h>
-#include <ydb/core/tx/columnshard/engines/portions/portion_info.h>
-#include <ydb/core/tx/columnshard/counters/common/object_counter.h>
-#include <ydb/core/tx/columnshard/counters/scan.h>
-#include <ydb/core/formats/arrow/arrow_filter.h>
-
-namespace NKikimr::NOlap::NPlainReader {
-class TCommittedAssembler: public NColumnShard::IDataTasksProcessor::ITask, public NColumnShard::TMonitoringObjectsCounter<TCommittedAssembler, true> {
-private:
-    using TBase = NColumnShard::IDataTasksProcessor::ITask;
-    TString BlobData;
-    TReadMetadata::TConstPtr ReadMetadata;
-    const std::shared_ptr<IDataSource> Source;
-    ui64 SchemaVersion;
-    TSnapshot DataSnapshot;
-
-    std::shared_ptr<NArrow::TColumnFilter> EarlyFilter;
-    std::shared_ptr<arrow::RecordBatch> ResultBatch;
-    const NColumnShard::TCounterGuard TaskGuard;
-protected:
-    virtual bool DoExecute() override;
-    virtual bool DoApply(IDataReader& owner) const override;
-public:
-    virtual TString GetTaskClassIdentifier() const override {
-        return "PlainReader::TCommittedAssembler";
-    }
-
-    TCommittedAssembler(const NActors::TActorId& scanActorId, const TString& blobData, const TReadMetadata::TConstPtr& readMetadata, const std::shared_ptr<IDataSource>& source,
-        const TCommittedBlob& cBlob, NColumnShard::TCounterGuard&& taskGuard);
-};
-}

Some files were not shown because too many files changed in this diff