Browse Source

KIKIMR-20179: remove deprecated reader from tests

ivanmorozov 1 year ago
parent
commit
b094390321

+ 6 - 0
.mapping.json

@@ -6300,6 +6300,12 @@
   "ydb/core/tx/columnshard/common/CMakeLists.linux-x86_64.txt":"",
   "ydb/core/tx/columnshard/common/CMakeLists.txt":"",
   "ydb/core/tx/columnshard/common/CMakeLists.windows-x86_64.txt":"",
+  "ydb/core/tx/columnshard/common/tests/CMakeLists.darwin-arm64.txt":"",
+  "ydb/core/tx/columnshard/common/tests/CMakeLists.darwin-x86_64.txt":"",
+  "ydb/core/tx/columnshard/common/tests/CMakeLists.linux-aarch64.txt":"",
+  "ydb/core/tx/columnshard/common/tests/CMakeLists.linux-x86_64.txt":"",
+  "ydb/core/tx/columnshard/common/tests/CMakeLists.txt":"",
+  "ydb/core/tx/columnshard/common/tests/CMakeLists.windows-x86_64.txt":"",
   "ydb/core/tx/columnshard/counters/CMakeLists.darwin-arm64.txt":"",
   "ydb/core/tx/columnshard/counters/CMakeLists.darwin-x86_64.txt":"",
   "ydb/core/tx/columnshard/counters/CMakeLists.linux-aarch64.txt":"",

+ 20 - 0
ydb/core/kqp/compute_actor/kqp_compute_events.h

@@ -14,6 +14,12 @@ struct TEvKqpCompute {
     struct TEvRemoteScanData : public TEventPB<TEvRemoteScanData, NKikimrKqp::TEvRemoteScanData,
         TKqpComputeEvents::EvRemoteScanData> {};
 
+    class IShardScanStats {
+    public:
+        virtual ~IShardScanStats() = default;
+        virtual THashMap<TString, ui64> GetMetrics() const = 0;
+    };
+
     /*
      * Scan communications.
      *
@@ -46,6 +52,19 @@ struct TEvKqpCompute {
         bool Finished = false;
         bool PageFault = false; // page fault was the reason for sending this message
         mutable THolder<TEvRemoteScanData> Remote;
+        std::shared_ptr<IShardScanStats> StatsOnFinished;
+
+        template <class T>
+        const T& GetStatsAs() const {
+            Y_ABORT_UNLESS(!!StatsOnFinished);
+            return VerifyDynamicCast<const T&>(*StatsOnFinished);
+        }
+
+        template <class T>
+        bool CheckStatsIs() const {
+            auto p = dynamic_cast<const T*>(StatsOnFinished.get());
+            return p;
+        }
 
         ui32 GetRowsCount() const {
             if (ArrowBatch) {
@@ -225,6 +244,7 @@ struct TEvKqpCompute {
 
     struct TEvKillScanTablet : public NActors::TEventPB<TEvKillScanTablet, NKikimrKqp::TEvKillScanTablet,
         TKqpComputeEvents::EvKillScanTablet> {};
+
 };
 
 } // namespace NKikimr::NKqp

+ 8 - 0
ydb/core/tx/columnshard/columnshard__index_scan.h

@@ -15,6 +15,10 @@ public:
         : IndexInfo(indexInfo)
     {}
 
+    virtual std::optional<ui32> GetColumnIdOptional(const TString& name) const override {
+        return IndexInfo.GetColumnIdOptional(name);
+    }
+
     TString GetColumnName(ui32 id, bool required) const override {
         return IndexInfo.GetColumnName(id, required);
     }
@@ -91,6 +95,10 @@ public:
         return ReadyResults.size();
     }
 
+    virtual const NOlap::TReadStats& GetStats() const override {
+        return *ReadMetadata->ReadStats;
+    }
+
     virtual TString DebugString(const bool verbose) const override {
         return TStringBuilder()
             << "ready_results:(" << ReadyResults.DebugString() << ");"

+ 28 - 5
ydb/core/tx/columnshard/columnshard__scan.cpp

@@ -97,6 +97,7 @@ public:
         , Stats(NTracing::TTraceClient::GetLocalClient("SHARD", ::ToString(TabletId)/*, "SCAN_TXID:" + ::ToString(TxId)*/))
         , ComputeShardingPolicy(computeShardingPolicy)
     {
+        AFL_VERIFY(ReadMetadataRanges.size() == 1);
         KeyYqlSchema = ReadMetadataRanges[ReadMetadataIndex]->GetKeyYqlSchema();
     }
 
@@ -396,7 +397,25 @@ private:
         return singleRowWriter.Row;
     }
 
-    bool SendResult(bool pageFault, bool lastBatch){
+    class TScanStatsOwner: public NKqp::TEvKqpCompute::IShardScanStats {
+    private:
+        YDB_READONLY_DEF(NOlap::TReadStats, Stats);
+    public:
+        TScanStatsOwner(const NOlap::TReadStats& stats)
+            : Stats(stats) {
+
+        }
+
+        virtual THashMap<TString, ui64> GetMetrics() const override {
+            THashMap<TString, ui64> result;
+            result["compacted_bytes"] = Stats.CompactedPortionsBytes;
+            result["inserted_bytes"] = Stats.InsertedPortionsBytes;
+            result["committed_bytes"] = Stats.CommittedPortionsBytes;
+            return result;
+        }
+    };
+
+    bool SendResult(bool pageFault, bool lastBatch) {
         if (Finished) {
             return true;
         }
@@ -429,6 +448,7 @@ private:
                 << " bytes: " << Bytes << "/" << BytesSum << " rows: " << Rows << "/" << RowsSum << " page faults: " << Result->PageFaults
                 << " finished: " << Result->Finished << " pageFault: " << Result->PageFault
                 << " stats:" << Stats->ToJson() << ";iterator:" << (ScanIterator ? ScanIterator->DebugString(false) : "NO");
+            Result->StatsOnFinished = std::make_shared<TScanStatsOwner>(ScanIterator->GetStats());
         } else {
             Y_ABORT_UNLESS(ChunksLimiter.Take(Bytes));
             Result->RequestedBytesLimitReached = !ChunksLimiter.HasMore();
@@ -696,6 +716,7 @@ bool TTxScan::Execute(TTransactionContext& txc, const TActorContext& /*ctx*/) {
     bool isIndexStats = read.TableName.EndsWith(NOlap::TIndexInfo::STORE_INDEX_STATS_TABLE) ||
         read.TableName.EndsWith(NOlap::TIndexInfo::TABLE_INDEX_STATS_TABLE);
     read.ColumnIds.assign(record.GetColumnTags().begin(), record.GetColumnTags().end());
+    read.StatsMode = record.GetStatsMode();
 
     const NOlap::TIndexInfo* indexInfo = nullptr;
     if (!isIndexStats) {
@@ -745,6 +766,7 @@ bool TTxScan::Execute(TTransactionContext& txc, const TActorContext& /*ctx*/) {
         ReadMetadataRanges.emplace_back(newRange);
     }
     Y_ABORT_UNLESS(ReadMetadataRanges.size() == 1);
+
     return true;
 }
 
@@ -773,8 +795,7 @@ void TTxScan::Complete(const TActorContext& ctx) {
     const ui32 scanGen = request.GetGeneration();
     TString table = request.GetTablePath();
     auto dataFormat = request.GetDataFormat();
-    TDuration timeout = TDuration::MilliSeconds(request.GetTimeoutMs());
-
+    const TDuration timeout = TDuration::MilliSeconds(request.GetTimeoutMs());
     if (scanGen > 1) {
         Self->IncCounter(COUNTER_SCAN_RESTARTED);
     }
@@ -783,8 +804,6 @@ void TTxScan::Complete(const TActorContext& ctx) {
     if (IS_LOG_PRIORITY_ENABLED(NActors::NLog::PRI_TRACE, NKikimrServices::TX_COLUMNSHARD)) {
         detailedInfo << " read metadata: (" << TContainerPrinter(ReadMetadataRanges) << ")" << " req: " << request;
     }
-    std::vector<NOlap::TReadMetadata::TConstPtr> rMetadataRanges;
-
     if (ReadMetadataRanges.empty()) {
         LOG_S_DEBUG("TTxScan failed "
                 << " txId: " << txId
@@ -860,6 +879,10 @@ void TColumnShard::Handle(TEvColumnShard::TEvScan::TPtr& ev, const TActorContext
     Execute(new TTxScan(this, ev), ctx);
 }
 
+const NKikimr::NOlap::TReadStats& TScanIteratorBase::GetStats() const {
+    return Default<NOlap::TReadStats>();
+}
+
 }
 
 namespace NKikimr::NOlap {

+ 4 - 0
ydb/core/tx/columnshard/columnshard__scan.h

@@ -9,6 +9,7 @@
 #include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h>
 
 namespace NKikimr::NOlap {
+struct TReadStats;
 // Represents a batch of rows produced by ASC or DESC scan with applied filters and partial aggregation
 class TPartialReadResult {
 private:
@@ -94,6 +95,9 @@ public:
     virtual void Apply(IDataTasksProcessor::ITask::TPtr /*processor*/) {
 
     }
+
+    virtual const NOlap::TReadStats& GetStats() const;
+
     virtual std::optional<ui32> GetAvailableResultsCount() const {
         return {};
     }

+ 9 - 0
ydb/core/tx/columnshard/columnshard__stats_scan.h

@@ -28,6 +28,15 @@ public:
         return it->second.Name;
     }
 
+    std::optional<ui32> GetColumnIdOptional(const TString& name) const override {
+        auto it = PrimaryIndexStatsSchema.ColumnNames.find(name);
+        if (it == PrimaryIndexStatsSchema.ColumnNames.end()) {
+            return {};
+        } else {
+            return it->second;
+        }
+    }
+
     const NTable::TScheme::TTableSchema& GetSchema() const override {
         return PrimaryIndexStatsSchema;
     }

+ 10 - 20
ydb/core/tx/columnshard/columnshard_ut_common.cpp

@@ -1,6 +1,7 @@
 #include "columnshard_ut_common.h"
 
 #include "columnshard__stats_scan.h"
+#include "common/tests/shard_reader.h"
 
 #include <ydb/core/base/tablet.h>
 #include <ydb/core/base/tablet_resolver.h>
@@ -449,26 +450,15 @@ namespace NKikimr::NColumnShard {
     }
 
     std::shared_ptr<arrow::RecordBatch> ReadAllAsBatch(TTestBasicRuntime& runtime, const ui64 tableId, const NOlap::TSnapshot& snapshot, const std::vector<std::pair<TString, NScheme::TTypeInfo>>& schema) {
-        using namespace NTxUT;
-        TActorId sender = runtime.AllocateEdgeActor();
-
-        ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender,
-                new TEvColumnShard::TEvRead(sender, TTestTxConfig::TxTablet1, snapshot.GetPlanStep(), snapshot.GetTxId(), tableId));
-
-        std::vector<std::shared_ptr<arrow::RecordBatch>> batches;
-        while(true) {
-            TAutoPtr<IEventHandle> handle;
-            auto event = runtime.GrabEdgeEvent<TEvColumnShard::TEvReadResult>(handle);
-            UNIT_ASSERT(event);
-            auto b = event->GetArrowBatch();
-            if (b) {
-                batches.push_back(b);
-            }
-            if (!event->HasMore()) {
-                break;
-            }
+        std::vector<TString> fields;
+        for (auto&& f : schema) {
+            fields.emplace_back(f.first);
         }
-        auto res = NArrow::CombineBatches(batches);
-        return res ? res : NArrow::MakeEmptyBatch(NArrow::MakeArrowSchema(schema));
+
+        NOlap::NTests::TShardReader reader(runtime, TTestTxConfig::TxTablet0, tableId, snapshot);
+        reader.SetReplyColumns(fields);
+        auto rb = reader.ReadAll();
+        UNIT_ASSERT(reader.IsCorrectlyFinished());
+        return rb ? rb : NArrow::MakeEmptyBatch(NArrow::MakeArrowSchema(schema));
     }
 }

+ 1 - 0
ydb/core/tx/columnshard/common/CMakeLists.darwin-arm64.txt

@@ -6,6 +6,7 @@
 # original buildsystem will not be accepted.
 
 
+add_subdirectory(tests)
 get_built_tool_path(
   TOOL_enum_parser_bin
   TOOL_enum_parser_dependency

+ 1 - 0
ydb/core/tx/columnshard/common/CMakeLists.darwin-x86_64.txt

@@ -6,6 +6,7 @@
 # original buildsystem will not be accepted.
 
 
+add_subdirectory(tests)
 get_built_tool_path(
   TOOL_enum_parser_bin
   TOOL_enum_parser_dependency

+ 1 - 0
ydb/core/tx/columnshard/common/CMakeLists.linux-aarch64.txt

@@ -6,6 +6,7 @@
 # original buildsystem will not be accepted.
 
 
+add_subdirectory(tests)
 get_built_tool_path(
   TOOL_enum_parser_bin
   TOOL_enum_parser_dependency

Some files were not shown because too many files changed in this diff