Просмотр исходного кода

KIKIMR-20025: fix stats collecting for stream lookup join

fix(kqp): collect stats
ulya-sidorina 1 год назад
Родитель
Сommit
750bbdd3ed

+ 10 - 9
ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp

@@ -28,7 +28,8 @@ public:
     TKqpStreamLookupActor(ui64 inputIndex, NYql::NDq::TCollectStatsLevel statsLevel, const NUdf::TUnboxedValue& input,
         const NActors::TActorId& computeActorId, const NMiniKQL::TTypeEnvironment& typeEnv,
         const NMiniKQL::THolderFactory& holderFactory, std::shared_ptr<NMiniKQL::TScopedAlloc>& alloc,
-        NKikimrKqp::TKqpStreamLookupSettings&& settings, TIntrusivePtr<TKqpCounters> counters)
+        const NYql::NDqProto::TTaskInput& inputDesc, NKikimrKqp::TKqpStreamLookupSettings&& settings,
+        TIntrusivePtr<TKqpCounters> counters)
         : LogPrefix(TStringBuilder() << "StreamLookupActor, inputIndex: " << inputIndex << ", CA Id " << computeActorId)
         , InputIndex(inputIndex)
         , Input(input)
@@ -38,7 +39,7 @@ public:
         , Snapshot(settings.GetSnapshot().GetStep(), settings.GetSnapshot().GetTxId())
         , LockTxId(settings.HasLockTxId() ? settings.GetLockTxId() : TMaybe<ui64>())
         , SchemeCacheRequestTimeout(SCHEME_CACHE_REQUEST_TIMEOUT)
-        , StreamLookupWorker(CreateStreamLookupWorker(std::move(settings), typeEnv, holderFactory))
+        , StreamLookupWorker(CreateStreamLookupWorker(std::move(settings), typeEnv, holderFactory, inputDesc))
         , Counters(counters)
     {
         IngressStats.Level = statsLevel;
@@ -179,8 +180,8 @@ private:
         YQL_ENSURE(!batch.IsWide(), "Wide stream is not supported");
 
         auto replyResultStats = StreamLookupWorker->ReplyResult(batch, freeSpace);
-        ReadRowsCount += replyResultStats.RowsCount;
-        ReadBytesCount += replyResultStats.BytesCount;
+        ReadRowsCount += replyResultStats.ReadRowsCount;
+        ReadBytesCount += replyResultStats.ReadBytesCount;
 
         auto status = FetchInputRows();
 
@@ -192,10 +193,10 @@ private:
             && AllReadsFinished()
             && StreamLookupWorker->AllRowsProcessed();
 
-        CA_LOG_D("Returned " << replyResultStats.BytesCount << " bytes, " << replyResultStats.RowsCount
+        CA_LOG_D("Returned " << replyResultStats.ResultBytesCount << " bytes, " << replyResultStats.ResultRowsCount
             << " rows, finished: " << finished);
 
-        return replyResultStats.BytesCount;
+        return replyResultStats.ResultBytesCount;
     }
 
     TMaybe<google::protobuf::Any> ExtraData() override {
@@ -501,10 +502,10 @@ private:
 std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateStreamLookupActor(ui64 inputIndex,
     NYql::NDq::TCollectStatsLevel statsLevel, const NUdf::TUnboxedValue& input, const NActors::TActorId& computeActorId,
     const NMiniKQL::TTypeEnvironment& typeEnv, const NMiniKQL::THolderFactory& holderFactory,
-    std::shared_ptr<NMiniKQL::TScopedAlloc>& alloc, NKikimrKqp::TKqpStreamLookupSettings&& settings,
-    TIntrusivePtr<TKqpCounters> counters) {
+    std::shared_ptr<NMiniKQL::TScopedAlloc>& alloc, const NYql::NDqProto::TTaskInput& inputDesc,
+    NKikimrKqp::TKqpStreamLookupSettings&& settings, TIntrusivePtr<TKqpCounters> counters) {
     auto actor = new TKqpStreamLookupActor(inputIndex, statsLevel, input, computeActorId, typeEnv, holderFactory,
-        alloc, std::move(settings), counters);
+        alloc, inputDesc, std::move(settings), counters);
     return {actor, actor};
 }
 

+ 2 - 2
ydb/core/kqp/runtime/kqp_stream_lookup_actor.h

@@ -10,8 +10,8 @@ namespace NKqp {
 std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateStreamLookupActor(ui64 inputIndex,
     NYql::NDq::TCollectStatsLevel statsLevel, const NUdf::TUnboxedValue& input, const NActors::TActorId& computeActorId,
     const NMiniKQL::TTypeEnvironment& typeEnv, const NMiniKQL::THolderFactory& holderFactory,
-    std::shared_ptr<NMiniKQL::TScopedAlloc>& alloc, NKikimrKqp::TKqpStreamLookupSettings&& settings,
-    TIntrusivePtr<TKqpCounters>);
+    std::shared_ptr<NMiniKQL::TScopedAlloc>& alloc, const NYql::NDqProto::TTaskInput& inputDesc,
+    NKikimrKqp::TKqpStreamLookupSettings&& settings, TIntrusivePtr<TKqpCounters>);
 
 } // namespace NKqp
 } // namespace NKikimr

+ 1 - 1
ydb/core/kqp/runtime/kqp_stream_lookup_factory.cpp

@@ -8,7 +8,7 @@ void RegisterStreamLookupActorFactory(NYql::NDq::TDqAsyncIoFactory& factory, TIn
     factory.RegisterInputTransform<NKikimrKqp::TKqpStreamLookupSettings>("StreamLookupInputTransformer", [counters](NKikimrKqp::TKqpStreamLookupSettings&& settings,
         NYql::NDq::TDqAsyncIoFactory::TInputTransformArguments&& args) {
             return CreateStreamLookupActor(args.InputIndex, args.StatsLevel, args.TransformInput, args.ComputeActorId, args.TypeEnv,
-                args.HolderFactory, args.Alloc, std::move(settings), counters);
+                args.HolderFactory, args.Alloc, args.InputDesc, std::move(settings), counters);
     });
 }
 

+ 79 - 24
ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp

@@ -4,6 +4,8 @@
 #include <ydb/core/kqp/runtime/kqp_scan_data.h>
 #include <ydb/core/tx/datashard/range_ops.h>
 
+#include <ydb/library/yql/minikql/mkql_node_serialization.h>
+
 namespace NKikimr {
 namespace NKqp {
 
@@ -51,6 +53,20 @@ std::vector<std::pair<ui64, TOwnedTableRange>> GetRangePartitioning(const TKqpSt
     return rangePartition;
 }
 
+NScheme::TTypeInfo UnpackTypeInfo(NKikimr::NMiniKQL::TType* type) {
+    YQL_ENSURE(type);
+
+    if (type->GetKind() == NMiniKQL::TType::EKind::Pg) {
+        auto pgType = static_cast<NMiniKQL::TPgType*>(type);
+        auto pgTypeId = pgType->GetTypeId();
+        return NScheme::TTypeInfo(NScheme::NTypeIds::Pg, NPg::TypeDescFromPgTypeId(pgTypeId));
+    } else {
+        bool isOptional = false;
+        auto dataType = NMiniKQL::UnpackOptionalData(type, isOptional);
+        return NScheme::TTypeInfo(dataType->GetSchemeType());
+    }
+}
+
 struct THashableKey {
     TConstArrayRef<TCell> Cells;
 
@@ -118,9 +134,11 @@ struct TKeyEq {
 }  // !namespace
 
 TKqpStreamLookupWorker::TKqpStreamLookupWorker(NKikimrKqp::TKqpStreamLookupSettings&& settings,
-    const NMiniKQL::TTypeEnvironment& typeEnv, const NMiniKQL::THolderFactory& holderFactory)
+    const NMiniKQL::TTypeEnvironment& typeEnv, const NMiniKQL::THolderFactory& holderFactory,
+    const NYql::NDqProto::TTaskInput& inputDesc)
     : TypeEnv(typeEnv)
     , HolderFactory(holderFactory)
+    , InputDesc(inputDesc)
     , TablePath(settings.GetTable().GetPath())
     , TableId(MakeTableId(settings.GetTable())) {
 
@@ -180,7 +198,8 @@ std::vector<NScheme::TTypeInfo> TKqpStreamLookupWorker::GetKeyColumnTypes() cons
 class TKqpLookupRows : public TKqpStreamLookupWorker {
 public:
     TKqpLookupRows(NKikimrKqp::TKqpStreamLookupSettings&& settings, const NMiniKQL::TTypeEnvironment& typeEnv,
-        const NMiniKQL::THolderFactory& holderFactory) : TKqpStreamLookupWorker(std::move(settings), typeEnv, holderFactory) {
+        const NMiniKQL::THolderFactory& holderFactory, const NYql::NDqProto::TTaskInput& inputDesc)
+        : TKqpStreamLookupWorker(std::move(settings), typeEnv, holderFactory, inputDesc) {
     }
 
     virtual ~TKqpLookupRows() {}
@@ -289,7 +308,7 @@ public:
                     }
                 }
 
-                if (rowSize > freeSpace - (i64)resultStats.BytesCount) {
+                if (rowSize > freeSpace - (i64)resultStats.ResultBytesCount) {
                     row.DeleteUnreferenced();
                     sizeLimitExceeded = true;
                     break;
@@ -297,8 +316,10 @@ public:
 
                 batch.push_back(std::move(row));
 
-                resultStats.RowsCount += 1;
-                resultStats.BytesCount += rowSize;
+                resultStats.ReadRowsCount += 1;
+                resultStats.ReadBytesCount += rowSize;
+                resultStats.ResultRowsCount += 1;
+                resultStats.ResultBytesCount += rowSize;
             }
 
             if (result.UnprocessedResultRow == result.ReadResult->Get()->GetRowsCount()) {
@@ -389,7 +410,8 @@ private:
 class TKqpJoinRows : public TKqpStreamLookupWorker {
 public:
     TKqpJoinRows(NKikimrKqp::TKqpStreamLookupSettings&& settings, const NMiniKQL::TTypeEnvironment& typeEnv,
-        const NMiniKQL::THolderFactory& holderFactory) : TKqpStreamLookupWorker(std::move(settings), typeEnv, holderFactory) {
+        const NMiniKQL::THolderFactory& holderFactory, const NYql::NDqProto::TTaskInput& inputDesc)
+        : TKqpStreamLookupWorker(std::move(settings), typeEnv, holderFactory, inputDesc) {
 
         // read columns should contain join key and result columns
         for (auto joinKey : LookupKeyColumns) {
@@ -574,9 +596,9 @@ public:
                 auto leftRowIt = PendingLeftRowsByKey.find(joinKeyCells);
                 YQL_ENSURE(leftRowIt != PendingLeftRowsByKey.end());
 
-                i64 resultRowSize = 0;
-                i64 availableSpace = freeSpace - (i64)resultStats.BytesCount;
-                auto resultRow = TryBuildResultRow(leftRowIt->second, row, resultRowSize, availableSpace, result.ShardId);
+                TReadResultStats rowStats;
+                i64 availableSpace = freeSpace - (i64)resultStats.ResultBytesCount;
+                auto resultRow = TryBuildResultRow(leftRowIt->second, row, rowStats, availableSpace, result.ShardId);
 
                 if (!resultRow.HasValue()) {
                     sizeLimitExceeded = true;
@@ -584,9 +606,7 @@ public:
                 }
 
                 batch.push_back(std::move(resultRow));
-
-                resultStats.RowsCount += 1;
-                resultStats.BytesCount += resultRowSize;
+                resultStats.Add(rowStats);
             }
 
             if (result.UnprocessedResultRow == result.ReadResult->Get()->GetRowsCount()) {
@@ -620,15 +640,16 @@ public:
                     && !leftRowIt->second.RightRowExist;
 
                 if (leftRowCanBeSent) {
-                    i64 resultRowSize = 0;
-                    i64 availableSpace = freeSpace - (i64) resultStats.BytesCount;
-                    auto resultRow = TryBuildResultRow(leftRowIt->second, {}, resultRowSize, availableSpace);
+                    TReadResultStats rowStats;
+                    i64 availableSpace = freeSpace - (i64)resultStats.ResultBytesCount;
+                    auto resultRow = TryBuildResultRow(leftRowIt->second, {}, rowStats, availableSpace);
 
                     if (!resultRow.HasValue()) {
                         break;
                     }
 
                     batch.push_back(std::move(resultRow));
+                    resultStats.Add(rowStats);
                     PendingLeftRowsByKey.erase(leftRowIt++);
                 } else {
                     ++leftRowIt;
@@ -697,17 +718,44 @@ private:
         return range.From.subspan(0, LookupKeyColumns.size());
     }
 
-    NUdf::TUnboxedValue TryBuildResultRow(TLeftRowInfo& leftRowInfo, TConstArrayRef<TCell> rightRow, i64& resultRowSize,
-        i64 freeSpace, TMaybe<ui64> shardId = {}) {
+    NMiniKQL::TStructType* GetLeftRowType() const {
+        YQL_ENSURE(InputDesc.HasTransform());
+
+        auto outputTypeNode = NMiniKQL::DeserializeNode(TStringBuf{InputDesc.GetTransform().GetOutputType()}, TypeEnv);
+        YQL_ENSURE(outputTypeNode, "Failed to deserialize stream lookup transform output type");
+
+        auto outputType = static_cast<NMiniKQL::TType*>(outputTypeNode);
+        YQL_ENSURE(outputType->GetKind() == NMiniKQL::TType::EKind::Tuple, "Unexpected stream lookup output type");
+
+        const auto outputTupleType = AS_TYPE(NMiniKQL::TTupleType, outputType);
+        YQL_ENSURE(outputTupleType->GetElementsCount() == 2);
+
+        const auto outputLeftRowType = outputTupleType->GetElementType(0);
+        YQL_ENSURE(outputLeftRowType->GetKind() == NMiniKQL::TType::EKind::Struct);
+
+        return AS_TYPE(NMiniKQL::TStructType, outputLeftRowType);
+    }
+
+    NUdf::TUnboxedValue TryBuildResultRow(TLeftRowInfo& leftRowInfo, TConstArrayRef<TCell> rightRow,
+         TReadResultStats& rowStats, i64 freeSpace, TMaybe<ui64> shardId = {}) {
 
         NUdf::TUnboxedValue* resultRowItems = nullptr;
         auto resultRow = HolderFactory.CreateDirectArrayHolder(2, resultRowItems);
 
+        ui64 leftRowSize = 0;
+        ui64 rightRowSize = 0;
+
         resultRowItems[0] = leftRowInfo.Row;
+        auto leftRowType = GetLeftRowType();
+        YQL_ENSURE(leftRowType);
+
+        for (size_t i = 0; i < leftRowType->GetMembersCount(); ++i) {
+            auto columnTypeInfo = UnpackTypeInfo(leftRowType->GetMemberType(i));
+            leftRowSize += NMiniKQL::GetUnboxedValueSize(leftRowInfo.Row.GetElement(i), columnTypeInfo).AllocatedBytes;
+        }
 
         if (!rightRow.empty()) {
             leftRowInfo.RightRowExist = true;
-            // TODO: get size for left row
 
             NUdf::TUnboxedValue* rightRowItems = nullptr;
             resultRowItems[1] = HolderFactory.CreateDirectArrayHolder(Columns.size(), rightRowItems);
@@ -720,19 +768,25 @@ private:
                 if (IsSystemColumn(column.Name)) {
                     YQL_ENSURE(shardId);
                     NMiniKQL::FillSystemColumn(rightRowItems[colIndex], *shardId, column.Id, column.PType);
-                    resultRowSize += sizeof(NUdf::TUnboxedValue);
+                    rightRowSize += sizeof(NUdf::TUnboxedValue);
                 } else {
                     rightRowItems[colIndex] = NMiniKQL::GetCellValue(rightRow[std::distance(ReadColumns.begin(), it)],
                         column.PType);
-                    resultRowSize += NMiniKQL::GetUnboxedValueSize(rightRowItems[colIndex], column.PType).AllocatedBytes;
+                    rightRowSize += NMiniKQL::GetUnboxedValueSize(rightRowItems[colIndex], column.PType).AllocatedBytes;
                 }
             }
         } else {
             resultRowItems[1] = NUdf::TUnboxedValuePod();
         }
 
-        if (resultRowSize > freeSpace) {
+        rowStats.ReadRowsCount += (rightRowSize > 0 ? 1 : 0);
+        rowStats.ReadBytesCount += rightRowSize;
+        rowStats.ResultRowsCount += 1;
+        rowStats.ResultBytesCount += leftRowSize + rightRowSize;
+
+        if (rowStats.ResultBytesCount > (ui64)freeSpace) {
             resultRow.DeleteUnreferenced();
+            rowStats.Clear();
         }
 
         return resultRow;
@@ -748,13 +802,14 @@ private:
 };
 
 std::unique_ptr<TKqpStreamLookupWorker> CreateStreamLookupWorker(NKikimrKqp::TKqpStreamLookupSettings&& settings,
-    const NMiniKQL::TTypeEnvironment& typeEnv, const NMiniKQL::THolderFactory& holderFactory) {
+    const NMiniKQL::TTypeEnvironment& typeEnv, const NMiniKQL::THolderFactory& holderFactory,
+    const NYql::NDqProto::TTaskInput& inputDesc) {
 
     switch (settings.GetLookupStrategy()) {
         case NKqpProto::EStreamLookupStrategy::LOOKUP:
-            return std::make_unique<TKqpLookupRows>(std::move(settings), typeEnv, holderFactory);
+            return std::make_unique<TKqpLookupRows>(std::move(settings), typeEnv, holderFactory, inputDesc);
         case NKqpProto::EStreamLookupStrategy::JOIN:
-            return std::make_unique<TKqpJoinRows>(std::move(settings), typeEnv, holderFactory);
+            return std::make_unique<TKqpJoinRows>(std::move(settings), typeEnv, holderFactory, inputDesc);
         default:
             return {};
     }

+ 23 - 5
ydb/core/kqp/runtime/kqp_stream_lookup_worker.h

@@ -22,13 +22,29 @@ public:
     };
 
     struct TReadResultStats {
-        ui64 RowsCount = 0;
-        ui64 BytesCount = 0;
+        ui64 ReadRowsCount = 0;
+        ui64 ReadBytesCount = 0;
+        ui64 ResultRowsCount = 0;
+        ui64 ResultBytesCount = 0;
+
+        void Add(const TReadResultStats& other) {
+            ReadRowsCount += other.ReadRowsCount;
+            ReadBytesCount += other.ReadBytesCount;
+            ResultRowsCount += other.ResultRowsCount;
+            ResultBytesCount += other.ResultBytesCount;
+        }
+
+        void Clear() {
+            ReadRowsCount = 0;
+            ReadBytesCount = 0;
+            ResultRowsCount = 0;
+            ResultBytesCount = 0;
+        }
     };
 
 public:
-    TKqpStreamLookupWorker(NKikimrKqp::TKqpStreamLookupSettings&& settings,
-        const NMiniKQL::TTypeEnvironment& typeEnv, const NMiniKQL::THolderFactory& holderFactory);
+    TKqpStreamLookupWorker(NKikimrKqp::TKqpStreamLookupSettings&& settings, const NMiniKQL::TTypeEnvironment& typeEnv,
+        const NMiniKQL::THolderFactory& holderFactory, const NYql::NDqProto::TTaskInput& inputDesc);
 
     virtual ~TKqpStreamLookupWorker();
 
@@ -46,6 +62,7 @@ public:
 protected:
     const NMiniKQL::TTypeEnvironment& TypeEnv;
     const NMiniKQL::THolderFactory& HolderFactory;
+    const NYql::NDqProto::TTaskInput& InputDesc;
     const TString TablePath;
     const TTableId TableId;
     std::unordered_map<TString, TSysTables::TTableColumnInfo> KeyColumns;
@@ -54,7 +71,8 @@ protected:
 };
 
 std::unique_ptr<TKqpStreamLookupWorker> CreateStreamLookupWorker(NKikimrKqp::TKqpStreamLookupSettings&& settings,
-    const NMiniKQL::TTypeEnvironment& typeEnv, const NMiniKQL::THolderFactory& holderFactory);
+    const NMiniKQL::TTypeEnvironment& typeEnv, const NMiniKQL::THolderFactory& holderFactory,
+    const NYql::NDqProto::TTaskInput& inputDesc);
 
 } // namespace NKqp
 } // namespace NKikimr

+ 50 - 0
ydb/core/kqp/ut/perf/kqp_query_perf_ut.cpp

@@ -275,6 +275,56 @@ Y_UNIT_TEST_SUITE(KqpQueryPerf) {
         UNIT_ASSERT_VALUES_EQUAL(totalTasks, EnableSourceRead ? 2 : 3);
     }
 
+    Y_UNIT_TEST_TWIN(IndexLookupJoin, EnableStreamLookup) {
+        NKikimrConfig::TAppConfig appConfig;
+        appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamIdxLookupJoin(EnableStreamLookup);
+        auto settings = TKikimrSettings()
+            .SetAppConfig(appConfig);
+        TKikimrRunner kikimr{settings};
+
+        auto db = kikimr.GetTableClient();
+        auto session = db.CreateSession().GetValueSync().GetSession();
+
+        NYdb::NTable::TExecDataQuerySettings execSettings;
+        execSettings.CollectQueryStats(ECollectQueryStatsMode::Profile);
+
+        auto result = session.ExecuteDataQuery(Q1_(R"(
+            SELECT l.Key, r.Key1, r.Key2
+            FROM `/Root/Join1` AS l
+            INNER JOIN `/Root/Join2` AS r
+               ON l.Fk21 = r.Key1
+            ORDER BY l.Key, r.Key1, r.Key2;
+        )"), TTxControl::BeginTx().CommitTx(), execSettings).ExtractValueSync();
+        UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+        CompareYson(R"([
+            [[1];[101u];["One"]];
+            [[1];[101u];["Three"]];
+            [[1];[101u];["Two"]];
+            [[2];[102u];["One"]];
+            [[3];[103u];["One"]];
+            [[4];[104u];["One"]];
+            [[5];[105u];["One"]];
+            [[5];[105u];["Two"]];
+            [[6];[106u];["One"]];
+            [[8];[108u];["One"]];
+            [[9];[101u];["One"]];
+            [[9];[101u];["Three"]];
+            [[9];[101u];["Two"]]
+        ])", FormatResultSetYson(result.GetResultSet(0)));
+
+        auto& stats = NYdb::TProtoAccessor::GetProto(*result.GetStats());
+
+        AssertTableStats(result, "/Root/Join1", {
+            .ExpectedReads = 9,
+        });
+
+        AssertTableStats(result, "/Root/Join2", {
+            .ExpectedReads = EnableStreamLookup ? 13 : 10,
+        });
+
+        UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), EnableStreamLookup ? 1 : 3);
+    }
+
     Y_UNIT_TEST(Upsert) {
         auto kikimr = DefaultKikimrRunner();
         auto db = kikimr.GetTableClient();