Browse Source

delete table scheme resolving from stream lookup actor

refactor(kqp): delete table scheme resolving from stream lookup actor
ulya-sidorina 2 years ago
parent
commit
5944530524

+ 16 - 2
ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp

@@ -214,16 +214,30 @@ void BuildStreamLookupChannels(TKqpTasksGraph& graph, const TStageInfo& stageInf
     settings.MutableTable()->CopyFrom(streamLookup.GetTable());
 
     auto table = tableKeys.GetTable(MakeTableId(streamLookup.GetTable()));
+    for (const auto& keyColumn : table.KeyColumns) {
+        auto columnIt = table.Columns.find(keyColumn);
+        YQL_ENSURE(columnIt != table.Columns.end(), "Unknown column: " << keyColumn);
+
+        auto* keyColumnProto = settings.AddKeyColumns();
+        keyColumnProto->SetName(keyColumn);
+        keyColumnProto->SetId(columnIt->second.Id);
+        keyColumnProto->SetTypeId(columnIt->second.Type.GetTypeId());
+    }
+
     for (const auto& keyColumn : streamLookup.GetKeyColumns()) {
         auto columnIt = table.Columns.find(keyColumn);
         YQL_ENSURE(columnIt != table.Columns.end(), "Unknown column: " << keyColumn);
-        settings.AddKeyColumns(keyColumn);
+        settings.AddLookupKeyColumns(keyColumn);
     }
 
     for (const auto& column : streamLookup.GetColumns()) {
         auto columnIt = table.Columns.find(column);
         YQL_ENSURE(columnIt != table.Columns.end(), "Unknown column: " << column);
-        settings.AddColumns(column);
+
+        auto* columnProto = settings.AddColumns();
+        columnProto->SetName(column);
+        columnProto->SetId(columnIt->second.Id);
+        columnProto->SetTypeId(columnIt->second.Type.GetTypeId());
     }
 
     TTransform streamLookupTransform;

+ 9 - 5
ydb/core/kqp/query_compiler/kqp_query_compiler.cpp

@@ -942,17 +942,21 @@ private:
                 streamLookupProto.AddKeyColumns(TString(keyColumn->GetName()));
             }
 
-            for (const auto& column : streamLookup.Columns()) {
-                YQL_ENSURE(tableMeta->Columns.FindPtr(column), "Unknown column: " << TString(column));
-                streamLookupProto.AddColumns(TString(column));
-            }
-
             const auto resultType = streamLookup.Ref().GetTypeAnn();
             YQL_ENSURE(resultType, "Empty stream lookup result type");
             YQL_ENSURE(resultType->GetKind() == ETypeAnnotationKind::Stream, "Unexpected stream lookup result type");
             const auto resultItemType = resultType->Cast<TStreamExprType>()->GetItemType();
             streamLookupProto.SetResultType(NMiniKQL::SerializeNode(CompileType(pgmBuilder, *resultItemType), TypeEnv));
 
+            YQL_ENSURE(resultItemType->GetKind() == ETypeAnnotationKind::Struct);
+            const auto& resultColumns = resultItemType->Cast<TStructExprType>()->GetItems();
+            for (const auto column : resultColumns) {
+                const auto& systemColumns = GetSystemColumns();
+                YQL_ENSURE(tableMeta->Columns.FindPtr(column->GetName()) || systemColumns.find(column->GetName()) != systemColumns.end(),
+                    "Unknown column: " << column->GetName());
+                streamLookupProto.AddColumns(TString(column->GetName()));
+            }
+
             return;
         }
 

+ 163 - 221
ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp

@@ -11,6 +11,8 @@
 #include <ydb/core/tx/scheme_cache/scheme_cache.h>
 #include <ydb/core/tx/datashard/datashard.h>
 #include <ydb/core/kqp/common/kqp_event_ids.h>
+#include <ydb/library/yql/public/issue/yql_issue_message.h>
+#include <ydb/core/kqp/runtime/kqp_scan_data.h>
 
 namespace NKikimr {
 namespace NKqp {
@@ -18,7 +20,7 @@ namespace NKqp {
 namespace {
 
 static constexpr TDuration SCHEME_CACHE_REQUEST_TIMEOUT = TDuration::Seconds(5);
-static constexpr TDuration RETRY_READ_TIMEOUT = TDuration::Seconds(10);
+static constexpr ui64 MAX_SHARD_RETRIES = 10;
 
 class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLookupActor>, public NYql::NDq::IDqComputeActorAsyncInput {
 public:
@@ -30,10 +32,37 @@ public:
         , Snapshot(settings.GetSnapshot().GetStep(), settings.GetSnapshot().GetTxId())
         , LockTxId(settings.HasLockTxId() ? settings.GetLockTxId() : TMaybe<ui64>())
         , ImmediateTx(settings.GetImmediateTx())
-        , KeyPrefixColumns(settings.GetKeyColumns().begin(), settings.GetKeyColumns().end())
-        , Columns(settings.GetColumns().begin(), settings.GetColumns().end())
-        , SchemeCacheRequestTimeout(SCHEME_CACHE_REQUEST_TIMEOUT)
-        , RetryReadTimeout(RETRY_READ_TIMEOUT) {
+        , SchemeCacheRequestTimeout(SCHEME_CACHE_REQUEST_TIMEOUT) {
+
+        KeyColumns.reserve(settings.GetKeyColumns().size());
+        i32 keyOrder = 0;
+        for (const auto& keyColumn : settings.GetKeyColumns()) {
+            KeyColumns.emplace(
+                keyColumn.GetName(),
+                TSysTables::TTableColumnInfo{
+                    keyColumn.GetName(),
+                    keyColumn.GetId(),
+                    NScheme::TTypeInfo{static_cast<NScheme::TTypeId>(keyColumn.GetTypeId())},
+                    keyOrder++
+                }
+            );
+        }
+
+        LookupKeyColumns.reserve(KeyColumns.size());
+        for (const auto& lookupKeyColumn : settings.GetLookupKeyColumns()) {
+            auto columnIt = KeyColumns.find(lookupKeyColumn);
+            YQL_ENSURE(columnIt != KeyColumns.end());
+            LookupKeyColumns.push_back(&columnIt->second);
+        }
+
+        Columns.reserve(settings.GetColumns().size());
+        for (const auto& column : settings.GetColumns()) {
+            Columns.emplace_back(TSysTables::TTableColumnInfo{
+                column.GetName(),
+                column.GetId(),
+                NScheme::TTypeInfo{static_cast<NScheme::TTypeId>(column.GetTypeId())}
+            });
+        }
     };
 
     virtual ~TKqpStreamLookupActor() {
@@ -44,8 +73,7 @@ public:
     }
 
     void Bootstrap() {
-        ResolveTable();
-
+        ResolveTableShards();
         Become(&TKqpStreamLookupActor::StateFunc);
     }
 
@@ -73,17 +101,11 @@ private:
             : Id(id)
             , ShardId(shardId)
             , Keys(std::move(keys))
-            , State(EReadState::Initial)
-            , Retried(false) {}
+            , State(EReadState::Initial) {}
 
-        void SetFinished(const NActors::TActorContext& ctx) {
+        void SetFinished() {
             Keys.clear();
             State = EReadState::Finished;
-
-            if (RetryDeadlineTimerId) {
-                ctx.Send(RetryDeadlineTimerId, new TEvents::TEvPoisonPill());
-                RetryDeadlineTimerId = {};
-            }
         }
 
         bool Finished() const {
@@ -94,13 +116,16 @@ private:
         const ui64 ShardId;
         std::vector<TOwnedTableRange> Keys;
         EReadState State;
-        TActorId RetryDeadlineTimerId;
-        bool Retried;
     };
 
-    enum EEvSchemeCacheRequestTag : ui64 {
-        TableSchemeResolving,
-        TableShardsResolving
+    struct TShardState {
+        ui64 RetryAttempts = 0;
+        std::vector<TReadState*> Reads;
+    };
+
+    struct TResult {
+        const ui64 ShardId;
+        THolder<TEventHandle<TEvDataShard::TEvReadResult>> ReadResult;
     };
 
     struct TEvPrivate {
@@ -110,39 +135,9 @@ private:
         };
 
         struct TEvSchemeCacheRequestTimeout : public TEventLocal<TEvSchemeCacheRequestTimeout, EvSchemeCacheRequestTimeout> {
-            TEvSchemeCacheRequestTimeout(EEvSchemeCacheRequestTag tag) : Tag(tag) {}
-
-            const EEvSchemeCacheRequestTag Tag;
-        };
-
-        struct TEvRetryReadTimeout : public TEventLocal<TEvRetryReadTimeout, EvRetryReadTimeout> {
-            TEvRetryReadTimeout(ui64 readId) : ReadId(readId) {}
-
-            const ui64 ReadId;
         };
     };
 
-    struct TTableScheme {
-        TTableScheme(const THashMap<ui32, TSysTables::TTableColumnInfo>& columns) {
-            std::map<ui32, NScheme::TTypeInfo> keyColumnTypesByKeyOrder;
-            for (const auto& [_, column] : columns) {
-                if (column.KeyOrder >= 0) {
-                    keyColumnTypesByKeyOrder[column.KeyOrder] = column.PType;
-                }
-
-                ColumnsByName.emplace(column.Name, std::move(column));
-            }
-
-            KeyColumnTypes.resize(keyColumnTypesByKeyOrder.size());
-            for (const auto& [keyOrder, keyColumnType] : keyColumnTypesByKeyOrder) {
-                KeyColumnTypes[keyOrder] = keyColumnType;
-            }
-        }
-
-        std::unordered_map<TString, TSysTables::TTableColumnInfo> ColumnsByName;
-        std::vector<NScheme::TTypeInfo> KeyColumnTypes;
-    };
-
 private:
     void SaveState(const NYql::NDqProto::TCheckpoint&, NYql::NDqProto::TSourceState&) final {}
     void LoadState(const NYql::NDqProto::TSourceState&) final {}
@@ -165,22 +160,18 @@ private:
     i64 GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueVector& batch, TMaybe<TInstant>&, bool& finished, i64 freeSpace) final {
         i64 totalDataSize = 0;
 
-        if (TableScheme) {
-            totalDataSize = PackResults(batch, freeSpace);
-            auto status = FetchLookupKeys();
-
-            if (Partitioning) {
-                ProcessLookupKeys();
-            }
+        totalDataSize = PackResults(batch, freeSpace);
+        auto status = FetchLookupKeys();
 
-            finished = (status == NUdf::EFetchStatus::Finish)
-                && UnprocessedKeys.empty()
-                && AllReadsFinished()
-                && Results.empty();
-        } else {
-            Send(ComputeActorId, new TEvNewAsyncInputDataArrived(InputIndex));
+        if (Partitioning) {
+            ProcessLookupKeys();
         }
 
+        finished = (status == NUdf::EFetchStatus::Finish)
+            && UnprocessedKeys.empty()
+            && AllReadsFinished()
+            && Results.empty();
+
         return totalDataSize;
     }
 
@@ -205,11 +196,9 @@ private:
         try {
             switch (ev->GetTypeRewrite()) {
                 hFunc(TEvTxProxySchemeCache::TEvResolveKeySetResult, Handle);
-                hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle);
                 hFunc(TEvDataShard::TEvReadResult, Handle);
                 hFunc(TEvPipeCache::TEvDeliveryProblem, Handle);
                 hFunc(TEvPrivate::TEvSchemeCacheRequestTimeout, Handle);
-                hFunc(TEvPrivate::TEvRetryReadTimeout, Handle);
                 IgnoreFunc(TEvTxProxySchemeCache::TEvInvalidateTableResult);
                 default:
                     RuntimeError(TStringBuilder() << "Unexpected event: " << ev->GetTypeRewrite(),
@@ -233,20 +222,6 @@ private:
         ProcessLookupKeys();
     }
 
-    void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) {
-        auto& resultSet = ev->Get()->Request->ResultSet;
-        YQL_ENSURE(resultSet.size() == 1, "Expected one result for table: " << TableId);
-        auto& result = resultSet[0];
-
-        if (result.Status != NSchemeCache::TSchemeCacheNavigate::EStatus::Ok) {
-            return RuntimeError(TStringBuilder() << "Failed to resolve table: " << ToString(result.Status),
-                NYql::NDqProto::StatusIds::SCHEME_ERROR);
-        }
-
-        TableScheme = std::make_unique<TTableScheme>(result.Columns);
-        ResolveTableShards();
-    }
-
     void Handle(TEvDataShard::TEvReadResult::TPtr& ev) {
         const auto& record = ev->Get()->Record;
 
@@ -271,26 +246,26 @@ private:
             Snapshot = IKqpGateway::TKqpSnapshot(record.GetSnapshot().GetStep(), record.GetSnapshot().GetTxId());
         }
 
-        // TODO: refactor after KIKIMR-15102
-        if (record.GetStatus().GetCode() != Ydb::StatusIds::SUCCESS) {
-            NKikimrTxDataShard::TReadContinuationToken continuationToken;
-            bool parseResult = continuationToken.ParseFromString(record.GetContinuationToken());
-            YQL_ENSURE(parseResult, "Failed to parse continuation token");
-            YQL_ENSURE(continuationToken.GetFirstUnprocessedQuery() <= read.Keys.size());
-
-            return RetryTableRead(read, continuationToken);
-        }
-
-        YQL_ENSURE(record.GetResultFormat() == NKikimrTxDataShard::EScanDataFormat::CELLVEC);
-        auto nrows = ev->Get()->GetRowsCount();
-        for (ui64 rowId = 0; rowId < nrows; ++rowId) {
-            Results.emplace_back(ev->Get()->GetCells(rowId));
+        switch (record.GetStatus().GetCode()) {
+            case Ydb::StatusIds::SUCCESS:
+                break;
+            case Ydb::StatusIds::OVERLOADED:
+            case Ydb::StatusIds::INTERNAL_ERROR: {
+                NKikimrTxDataShard::TReadContinuationToken continuationToken;
+                bool parseResult = continuationToken.ParseFromString(record.GetContinuationToken());
+                YQL_ENSURE(parseResult, "Failed to parse continuation token");
+                YQL_ENSURE(continuationToken.GetFirstUnprocessedQuery() <= read.Keys.size());
+                return RetryTableRead(read, continuationToken);
+            }
+            default: {
+                NYql::TIssues issues;
+                NYql::IssuesFromMessage(record.GetStatus().GetIssues(), issues);
+                return RuntimeError("Read request aborted", NYql::NDqProto::StatusIds::ABORTED, issues);
+            }
         }
 
-        Send(ComputeActorId, new TEvNewAsyncInputDataArrived(InputIndex));
-
         if (record.GetFinished()) {
-            read.SetFinished(TlsActivationContext->AsActorContext());
+            read.SetFinished();
         } else {
             THolder<TEvDataShard::TEvReadAck> request(new TEvDataShard::TEvReadAck());
             request->Record.SetReadId(record.GetReadId());
@@ -298,6 +273,9 @@ private:
             Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvForward(request.Release(), read.ShardId, true),
                 IEventHandle::FlagTrackDelivery);
         }
+
+        Results.emplace_back(TResult{read.ShardId, THolder<TEventHandle<TEvDataShard::TEvReadResult>>(ev.Release())});
+        Send(ComputeActorId, new TEvNewAsyncInputDataArrived(InputIndex));
     }
 
     void Handle(TEvPipeCache::TEvDeliveryProblem::TPtr& ev) {
@@ -305,113 +283,86 @@ private:
         auto shardIt = ReadsPerShard.find(tabletId);
         YQL_ENSURE(shardIt != ReadsPerShard.end());
 
-        for (auto readId : shardIt->second) {
-            auto readIt = Reads.find(readId);
-            YQL_ENSURE(readIt != Reads.end());
-            auto& read = readIt->second;
-
-            if (read.State == EReadState::Running) {
-                for (auto& key : read.Keys) {
+        for (auto* read : shardIt->second.Reads) {
+            if (read->State == EReadState::Running) {
+                for (auto& key : read->Keys) {
                     UnprocessedKeys.emplace_back(std::move(key));
                 }
 
-                read.SetFinished(TlsActivationContext->AsActorContext());
+                read->SetFinished();
             }
         }
 
-        ReadsPerShard.erase(shardIt);
         ResolveTableShards();
     }
 
-    void Handle(TEvPrivate::TEvSchemeCacheRequestTimeout::TPtr& ev) {
-        switch (ev->Get()->Tag) {
-            case EEvSchemeCacheRequestTag::TableSchemeResolving:
-                if (!TableScheme) {
-                    RuntimeError(TStringBuilder() << "Failed to resolve scheme for table: " << TableId
-                        << " (request timeout exceeded)", NYql::NDqProto::StatusIds::TIMEOUT);
-                }
-                break;
-            case EEvSchemeCacheRequestTag::TableShardsResolving:
-                if (!Partitioning) {
-                    RuntimeError(TStringBuilder() << "Failed to resolve shards for table: " << TableId
-                        << " (request timeout exceeded)", NYql::NDqProto::StatusIds::TIMEOUT);
-                }
-                break;
-            default:
-                RuntimeError(TStringBuilder() << "Unexpected tag for TEvSchemeCacheRequestTimeout: " << (ui64)ev->Get()->Tag,
-                    NYql::NDqProto::StatusIds::INTERNAL_ERROR);
-        }
-    }
-
-    void Handle(TEvPrivate::TEvRetryReadTimeout::TPtr& ev) {
-        auto readIt = Reads.find(ev->Get()->ReadId);
-        YQL_ENSURE(readIt != Reads.end(), "Unexpected readId: " << ev->Get()->ReadId);
-        auto& read = readIt->second;
-
-        if (read.Retried) {
-            RuntimeError(TStringBuilder() << "Retry timeout exceeded for read: " << ev->Get()->ReadId,
-                NYql::NDqProto::StatusIds::TIMEOUT);
+    void Handle(TEvPrivate::TEvSchemeCacheRequestTimeout::TPtr&) {
+        if (!Partitioning) {
+            RuntimeError(TStringBuilder() << "Failed to resolve shards for table: " << TableId
+                << " (request timeout exceeded)", NYql::NDqProto::StatusIds::TIMEOUT);
         }
     }
 
     ui64 PackResults(NKikimr::NMiniKQL::TUnboxedValueVector& batch, i64 freeSpace) {
-        YQL_ENSURE(TableScheme);
-
         i64 totalSize = 0;
+        bool sizeLimitExceeded = false;
         batch.clear();
-        batch.reserve(Results.size());
 
-        std::vector<NKikimr::NScheme::TTypeInfo> columnTypes;
-        columnTypes.reserve(Columns.size());
-        for (const auto& column : Columns) {
-            auto colIt = TableScheme->ColumnsByName.find(column);
-            YQL_ENSURE(colIt != TableScheme->ColumnsByName.end());
-            columnTypes.push_back(colIt->second.PType);
-        }
-
-        for (; !Results.empty(); Results.pop_front()) {
-            const auto& result = Results.front();
-            YQL_ENSURE(result.size() == Columns.size(), "Result columns mismatch");
-
-            NUdf::TUnboxedValue* rowItems = nullptr;
-            auto row = HolderFactory.CreateDirectArrayHolder(Columns.size(), rowItems);
+        size_t rowsCount = 0;
+        for (const auto& result : Results) {
+            rowsCount += result.ReadResult->Get()->GetRowsCount();
+        }
+        batch.reserve(rowsCount);
+
+        for (; !Results.empty() && !sizeLimitExceeded; Results.pop_front()) {
+            const auto& readResult = Results.front().ReadResult;
+            const auto shardId = Results.front().ShardId;
+
+            for (size_t rowId = 0; rowId < readResult->Get()->GetRowsCount(); ++rowId) {
+                const auto& result = readResult->Get()->GetCells(rowId);
+                YQL_ENSURE(result.size() <= Columns.size(), "Result columns mismatch");
+
+                NUdf::TUnboxedValue* rowItems = nullptr;
+                auto row = HolderFactory.CreateDirectArrayHolder(Columns.size(), rowItems);
+
+                i64 rowSize = 0;
+                for (size_t colIndex = 0, resultColIndex = 0; colIndex < Columns.size() && resultColIndex < result.size(); ++colIndex) {
+                    const auto& column = Columns[colIndex];
+                    if (IsSystemColumn(column.Name)) {
+                        NMiniKQL::FillSystemColumn(rowItems[colIndex], shardId, column.Id, column.PType);
+                        rowSize += sizeof(NUdf::TUnboxedValue);
+                    } else {
+                        rowItems[colIndex] = NMiniKQL::GetCellValue(result[resultColIndex], column.PType);
+                        rowSize += NMiniKQL::GetUnboxedValueSize(rowItems[colIndex], column.PType).AllocatedBytes;
+                        ++resultColIndex;
+                    }
+                }
 
-            i64 rowSize = 0;
-            for (ui32 colId = 0; colId < Columns.size(); ++colId) {
-                rowItems[colId] = NMiniKQL::GetCellValue(result[colId], columnTypes[colId]);
-                rowSize += result[colId].Size();
-            }
+                if (totalSize + rowSize > freeSpace) {
+                    row.DeleteUnreferenced();
+                    sizeLimitExceeded = true;
+                    break;
+                }
 
-            if (totalSize + rowSize > freeSpace) {
-                row.DeleteUnreferenced();
-                break;
+                batch.push_back(std::move(row));
+                totalSize += rowSize;
             }
-
-            batch.push_back(std::move(row));
-            totalSize += rowSize;
         }
 
         return totalSize;
     }
 
     NUdf::EFetchStatus FetchLookupKeys() {
-        YQL_ENSURE(TableScheme);
-        YQL_ENSURE(KeyPrefixColumns.size() <= TableScheme->KeyColumnTypes.size());
-
-        TVector<i32> keyColumnOrder;
-        keyColumnOrder.reserve(KeyPrefixColumns.size());
-        for (const auto& keyColumn : KeyPrefixColumns) {
-            auto it = TableScheme->ColumnsByName.find(keyColumn);
-            YQL_ENSURE(it != TableScheme->ColumnsByName.end());
-            keyColumnOrder.push_back(it->second.KeyOrder);
-        }
+        YQL_ENSURE(LookupKeyColumns.size() <= KeyColumns.size());
 
         NUdf::EFetchStatus status;
         NUdf::TUnboxedValue key;
         while ((status = Input.Fetch(key)) == NUdf::EFetchStatus::Ok) {
-            std::vector<TCell> keyCells(KeyPrefixColumns.size());
-            for (ui32 colId = 0; colId < KeyPrefixColumns.size(); ++colId) {
-                keyCells[keyColumnOrder[colId]] = MakeCell(TableScheme->KeyColumnTypes[keyColumnOrder[colId]],
+            std::vector<TCell> keyCells(LookupKeyColumns.size());
+            for (size_t colId = 0; colId < LookupKeyColumns.size(); ++colId) {
+                const auto* lookupKeyColumn = LookupKeyColumns[colId];
+                YQL_ENSURE(lookupKeyColumn->KeyOrder < static_cast<i64>(keyCells.size()));
+                keyCells[lookupKeyColumn->KeyOrder] = MakeCell(lookupKeyColumn->PType,
                     key.GetElement(colId), TypeEnv, /* copy */ true);
             }
 
@@ -424,15 +375,15 @@ private:
     void ProcessLookupKeys() {
         YQL_ENSURE(Partitioning, "Table partitioning should be initialized before lookup keys processing");
 
-        std::map<ui64, std::vector<TOwnedTableRange>> shardKeys;
+        std::unordered_map<ui64, std::vector<TOwnedTableRange>> shardKeys;
         for (; !UnprocessedKeys.empty(); UnprocessedKeys.pop_front()) {
             const auto& key = UnprocessedKeys.front();
             YQL_ENSURE(key.Point);
 
             std::vector<ui64> shardIds;
-            if (KeyPrefixColumns.size() < TableScheme->KeyColumnTypes.size()) {
+            if (LookupKeyColumns.size() < KeyColumns.size()) {
                 /* build range [[key_prefix, NULL, ..., NULL], [key_prefix, +inf, ..., +inf]) */
-                std::vector<TCell> fromCells(TableScheme->KeyColumnTypes.size());
+                std::vector<TCell> fromCells(KeyColumns.size());
                 fromCells.insert(fromCells.begin(), key.From.begin(), key.From.end());
                 std::vector<TCell> toCells(key.From.begin(), key.From.end());
 
@@ -443,7 +394,7 @@ private:
             }
 
             for (auto shardId : shardIds) {
-                shardKeys[shardId].emplace_back(key);
+                shardKeys[shardId].emplace_back(std::move(key));
             }
         }
 
@@ -453,15 +404,20 @@ private:
     }
 
     std::vector<ui64> GetRangePartitioning(const TOwnedTableRange& range) {
-        YQL_ENSURE(TableScheme);
         YQL_ENSURE(Partitioning);
 
+        std::vector<NScheme::TTypeInfo> keyColumnTypes(KeyColumns.size());
+        for (const auto& [_, columnInfo] : KeyColumns) {
+            YQL_ENSURE(columnInfo.KeyOrder < static_cast<i64>(keyColumnTypes.size()));
+            keyColumnTypes[columnInfo.KeyOrder] = columnInfo.PType;
+        }
+
         auto it = LowerBound(Partitioning->begin(), Partitioning->end(), /* value */ true,
             [&](const auto& partition, bool) {
                 const int result = CompareBorders<true, false>(
                     partition.Range->EndKeyPrefix.GetCells(), range.From,
                     partition.Range->IsInclusive || partition.Range->IsPoint,
-                    range.InclusiveFrom || range.Point, TableScheme->KeyColumnTypes
+                    range.InclusiveFrom || range.Point, keyColumnTypes
                 );
 
                 return (result < 0);
@@ -481,7 +437,7 @@ private:
             auto cmp = CompareBorders<true, true>(
                 it->Range->EndKeyPrefix.GetCells(), range.To,
                 it->Range->IsInclusive || it->Range->IsPoint,
-                range.InclusiveTo || range.Point, TableScheme->KeyColumnTypes
+                range.InclusiveTo || range.Point, keyColumnTypes
             );
 
             if (cmp >= 0) {
@@ -518,9 +474,9 @@ private:
         record.MutableTableId()->SetSchemaVersion(TableId.SchemaVersion);
 
         for (const auto& column : Columns) {
-            auto colIt = TableScheme->ColumnsByName.find(column);
-            YQL_ENSURE(colIt != TableScheme->ColumnsByName.end());
-            record.AddColumns(colIt->second.Id);
+            if (!IsSystemColumn(column.Name)) {
+                record.AddColumns(column.Id);
+            }
         }
 
         for (auto& key : read.Keys) {
@@ -535,64 +491,51 @@ private:
 
         const auto [readIt, succeeded] = Reads.insert({readId, std::move(read)});
         YQL_ENSURE(succeeded);
-        ReadsPerShard[shardId].insert(readId);
+        ReadsPerShard[shardId].Reads.push_back(&readIt->second);
 
         return readIt->second;
     }
 
     void RetryTableRead(TReadState& failedRead, NKikimrTxDataShard::TReadContinuationToken& token) {
         YQL_ENSURE(token.GetFirstUnprocessedQuery() <= failedRead.Keys.size());
-        std::vector<TOwnedTableRange> unprocessedKeys;
-        unprocessedKeys.reserve(failedRead.Keys.size() - token.GetFirstUnprocessedQuery());
         for (ui64 idx = token.GetFirstUnprocessedQuery(); idx < failedRead.Keys.size(); ++idx) {
-            unprocessedKeys.emplace_back(std::move(failedRead.Keys[idx]));
+            UnprocessedKeys.emplace_back(std::move(failedRead.Keys[idx]));
         }
 
-        auto& newRead = StartTableRead(failedRead.ShardId, std::move(unprocessedKeys));
-        if (failedRead.Retried) {
-            newRead.RetryDeadlineTimerId = failedRead.RetryDeadlineTimerId;
-            failedRead.RetryDeadlineTimerId = {};
+        failedRead.SetFinished();
+
+        auto& shardState = ReadsPerShard[failedRead.ShardId];
+        if (shardState.RetryAttempts > MAX_SHARD_RETRIES) {
+            RuntimeError(TStringBuilder() << "Retry limit exceeded for shard: " << failedRead.ShardId,
+                NYql::NDqProto::StatusIds::ABORTED);
         } else {
-            failedRead.Retried = true;
-            newRead.RetryDeadlineTimerId = CreateLongTimer(TlsActivationContext->AsActorContext(), RetryReadTimeout,
-                new IEventHandle(SelfId(), SelfId(), new TEvPrivate::TEvRetryReadTimeout(newRead.Id)));
+            ++shardState.RetryAttempts;
+            ResolveTableShards();
         }
-
-        failedRead.SetFinished(TlsActivationContext->AsActorContext());
-    }
-
-    void ResolveTable() {
-        TAutoPtr<NSchemeCache::TSchemeCacheNavigate> request(new NSchemeCache::TSchemeCacheNavigate());
-        NSchemeCache::TSchemeCacheNavigate::TEntry entry;
-        entry.TableId = TableId;
-        entry.RequestType = NSchemeCache::TSchemeCacheNavigate::TEntry::ERequestType::ByTableId;
-        entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpTable;
-        entry.ShowPrivatePath = true;
-        request->ResultSet.emplace_back(entry);
-        Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(request));
-
-        SchemeCacheRequestTimeoutTimer = CreateLongTimer(TlsActivationContext->AsActorContext(), SchemeCacheRequestTimeout,
-            new IEventHandle(SelfId(), SelfId(), new TEvPrivate::TEvSchemeCacheRequestTimeout(EEvSchemeCacheRequestTag::TableSchemeResolving)));
     }
 
     void ResolveTableShards() {
-        YQL_ENSURE(TableScheme);
         Partitioning.reset();
 
         auto request = MakeHolder<NSchemeCache::TSchemeCacheRequest>();
 
-        TVector<TCell> minusInf(TableScheme->KeyColumnTypes.size());
+        TVector<TCell> minusInf(KeyColumns.size());
         TVector<TCell> plusInf;
         TTableRange range(minusInf, true, plusInf, true, false);
 
+        std::vector<NScheme::TTypeInfo> keyColumnTypes(KeyColumns.size());
+        for (const auto& [_, columnInfo] : KeyColumns) {
+            keyColumnTypes[columnInfo.KeyOrder] = columnInfo.PType;
+        }
+
         request->ResultSet.emplace_back(MakeHolder<TKeyDesc>(TableId, range, TKeyDesc::ERowOperation::Read,
-            TableScheme->KeyColumnTypes, TVector<TKeyDesc::TColumnOp>{}));
+            keyColumnTypes, TVector<TKeyDesc::TColumnOp>{}));
 
         Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvInvalidateTable(TableId, {}));
         Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvResolveKeySet(request));
 
         SchemeCacheRequestTimeoutTimer = CreateLongTimer(TlsActivationContext->AsActorContext(), SchemeCacheRequestTimeout,
-            new IEventHandle(SelfId(), SelfId(), new TEvPrivate::TEvSchemeCacheRequestTimeout(EEvSchemeCacheRequestTag::TableShardsResolving)));
+            new IEventHandle(SelfId(), SelfId(), new TEvPrivate::TEvSchemeCacheRequestTimeout()));
     }
 
     bool AllReadsFinished() const {
@@ -636,17 +579,16 @@ private:
     IKqpGateway::TKqpSnapshot Snapshot;
     const TMaybe<ui64> LockTxId;
     const bool ImmediateTx;
-    const std::vector<TString> KeyPrefixColumns;
-    const std::vector<TString> Columns;
-    std::unique_ptr<const TTableScheme> TableScheme;
-    std::deque<TOwnedCellVec> Results;
+    std::vector<TSysTables::TTableColumnInfo*> LookupKeyColumns;
+    std::unordered_map<TString, TSysTables::TTableColumnInfo> KeyColumns;
+    std::vector<TSysTables::TTableColumnInfo> Columns;
+    std::deque<TResult> Results;
     std::unordered_map<ui64, TReadState> Reads;
-    std::unordered_map<ui64, std::set<ui64>> ReadsPerShard;
+    std::unordered_map<ui64, TShardState> ReadsPerShard;
     std::shared_ptr<const TVector<TKeyDesc::TPartitionInfo>> Partitioning;
     std::deque<TOwnedTableRange> UnprocessedKeys;
     const TDuration SchemeCacheRequestTimeout;
     NActors::TActorId SchemeCacheRequestTimeoutTimer;
-    const TDuration RetryReadTimeout;
     TVector<NKikimrTxDataShard::TLock> Locks;
     TVector<NKikimrTxDataShard::TLock> BrokenLocks;
 };

+ 1 - 1
ydb/core/kqp/ut/common/kqp_ut_common.cpp

@@ -509,7 +509,7 @@ TDataQueryResult ExecQueryAndTestResult(TSession& session, const TString& query,
     const TString& expectedYson)
 {
     NYdb::NTable::TExecDataQuerySettings settings;
-    settings.CollectQueryStats(ECollectQueryStatsMode::Basic);
+    settings.CollectQueryStats(ECollectQueryStatsMode::Profile);
 
     TDataQueryResult result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx(), params, settings)
             .ExtractValueSync();

+ 5 - 5
ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp

@@ -524,7 +524,7 @@ Y_UNIT_TEST_SUITE(KqpIndexes) {
             )"));
 
             NYdb::NTable::TExecDataQuerySettings execSettings;
-            execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic);
+            execSettings.CollectQueryStats(ECollectQueryStatsMode::Profile);
             auto result = session.ExecuteDataQuery(
                                  query1,
                                  TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(),
@@ -561,7 +561,7 @@ Y_UNIT_TEST_SUITE(KqpIndexes) {
             )");
 
             NYdb::NTable::TExecDataQuerySettings execSettings;
-            execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic);
+            execSettings.CollectQueryStats(ECollectQueryStatsMode::Profile);
             auto result = session.ExecuteDataQuery(
                                  query1,
                                  TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(),
@@ -596,7 +596,7 @@ Y_UNIT_TEST_SUITE(KqpIndexes) {
             )");
 
             NYdb::NTable::TExecDataQuerySettings execSettings;
-            execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic);
+            execSettings.CollectQueryStats(ECollectQueryStatsMode::Profile);
             auto result = session.ExecuteDataQuery(
                                  query2,
                                  TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(),
@@ -636,7 +636,7 @@ Y_UNIT_TEST_SUITE(KqpIndexes) {
             )");
 
             NYdb::NTable::TExecDataQuerySettings execSettings;
-            execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic);
+            execSettings.CollectQueryStats(ECollectQueryStatsMode::Profile);
             auto result = session.ExecuteDataQuery(
                                  query2,
                                  TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(),
@@ -676,7 +676,7 @@ Y_UNIT_TEST_SUITE(KqpIndexes) {
             )");
 
             NYdb::NTable::TExecDataQuerySettings execSettings;
-            execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic);
+            execSettings.CollectQueryStats(ECollectQueryStatsMode::Profile);
             auto result = session.ExecuteDataQuery(
                                  query2,
                                  TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(),

+ 1 - 1
ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp

@@ -90,7 +90,7 @@ void Test(const TString& query, const TString& answer, size_t rightTableReads) {
     PrepareTables(session);
 
     TExecDataQuerySettings execSettings;
-    execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic);
+    execSettings.CollectQueryStats(ECollectQueryStatsMode::Profile);
 
     auto result = session.ExecuteDataQuery(Q_(query), TTxControl::BeginTx().CommitTx(), execSettings).ExtractValueSync();
     UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());

+ 2 - 2
ydb/core/kqp/ut/join/kqp_join_ut.cpp

@@ -186,7 +186,7 @@ Y_UNIT_TEST_SUITE(KqpJoin) {
         CreateSampleTables(session);
 
         NYdb::NTable::TExecDataQuerySettings execSettings;
-        execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic);
+        execSettings.CollectQueryStats(ECollectQueryStatsMode::Profile);
 
         auto result = session.ExecuteDataQuery(Q_(R"(
             PRAGMA DisableSimpleColumns;
@@ -225,7 +225,7 @@ Y_UNIT_TEST_SUITE(KqpJoin) {
         CreateSampleTables(session);
 
         NYdb::NTable::TExecDataQuerySettings execSettings;
-        execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic);
+        execSettings.CollectQueryStats(ECollectQueryStatsMode::Profile);
 
         auto result = session.ExecuteDataQuery(Q_(R"(
             PRAGMA DisableSimpleColumns;

+ 7 - 7
ydb/core/kqp/ut/opt/kqp_ne_ut.cpp

@@ -87,7 +87,7 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
             .Build();
 
         NYdb::NTable::TExecDataQuerySettings execSettings;
-        execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic);
+        execSettings.CollectQueryStats(ECollectQueryStatsMode::Profile);
 
         auto result = session.ExecuteDataQuery(query,
             TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(), params, execSettings).ExtractValueSync();
@@ -1109,7 +1109,7 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
         auto session = db.CreateSession().GetValueSync().GetSession();
 
         NYdb::NTable::TExecDataQuerySettings execSettings;
-        execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic);
+        execSettings.CollectQueryStats(ECollectQueryStatsMode::Profile);
 
         auto result = session.ExecuteDataQuery(R"(
             SELECT * FROM `/Root/EightShard` WHERE Key = 101 OR Key = 301
@@ -1142,7 +1142,7 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
             .Build();
 
         NYdb::NTable::TExecDataQuerySettings execSettings;
-        execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic);
+        execSettings.CollectQueryStats(ECollectQueryStatsMode::Profile);
 
         auto result = session.ExecuteDataQuery(R"(
             DECLARE $key AS Uint64;
@@ -1347,7 +1347,7 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
         auto session = db.CreateSession().GetValueSync().GetSession();
 
         NYdb::NTable::TExecDataQuerySettings execSettings;
-        execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic);
+        execSettings.CollectQueryStats(ECollectQueryStatsMode::Profile);
 
         auto result = session.ExecuteDataQuery(R"(
 
@@ -1742,7 +1742,7 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
             .Build();
 
         auto settings = TExecDataQuerySettings()
-            .CollectQueryStats(ECollectQueryStatsMode::Basic);
+            .CollectQueryStats(ECollectQueryStatsMode::Profile);
 
         auto it = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx(), std::move(params), settings).GetValueSync();
         UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
@@ -2622,7 +2622,7 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
         auto session = db.CreateSession().GetValueSync().GetSession();
 
         NYdb::NTable::TExecDataQuerySettings execSettings;
-        execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic);
+        execSettings.CollectQueryStats(ECollectQueryStatsMode::Profile);
 
         auto params = TParamsBuilder()
             .AddParam("$group").Uint32(1).Build()
@@ -3020,7 +3020,7 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
         auto session = db.CreateSession().GetValueSync().GetSession();
 
         NYdb::NTable::TExecDataQuerySettings execSettings;
-        execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic);
+        execSettings.CollectQueryStats(ECollectQueryStatsMode::Profile);
 
         auto result = session.ExecuteDataQuery(R"(
             --!syntax_v1

+ 4 - 4
ydb/core/kqp/ut/opt/kqp_ranges_ut.cpp

@@ -641,7 +641,7 @@ Y_UNIT_TEST_SUITE(KqpRanges) {
 
         {
             NYdb::NTable::TExecDataQuerySettings execSettings;
-            execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic);
+            execSettings.CollectQueryStats(ECollectQueryStatsMode::Profile);
             auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx(), execSettings).ExtractValueSync();
             result.GetIssues().PrintTo(Cerr);
             UNIT_ASSERT(result.IsSuccess());
@@ -844,7 +844,7 @@ Y_UNIT_TEST_SUITE(KqpRanges) {
         CreateSampleTables(session);
 
         NYdb::NTable::TExecDataQuerySettings execSettings;
-        execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic);
+        execSettings.CollectQueryStats(ECollectQueryStatsMode::Profile);
         auto result = session.ExecuteDataQuery(Q_(R"(
             SELECT Value FROM `/Root/TestDate`
             WHERE Key = Date("2019-07-01")
@@ -1034,7 +1034,7 @@ Y_UNIT_TEST_SUITE(KqpRanges) {
         )");
 
         NYdb::NTable::TExecDataQuerySettings settings;
-        settings.CollectQueryStats(ECollectQueryStatsMode::Basic);
+        settings.CollectQueryStats(ECollectQueryStatsMode::Profile);
 
         auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync();
         UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
@@ -1065,7 +1065,7 @@ Y_UNIT_TEST_SUITE(KqpRanges) {
         CreateSampleTables(session);
 
         NYdb::NTable::TExecDataQuerySettings execSettings;
-        execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic);
+        execSettings.CollectQueryStats(ECollectQueryStatsMode::Profile);
         auto result = session.ExecuteDataQuery(Q_(R"(
             SELECT * FROM `/Root/EightShard`
             WHERE Key = 101 OR Key = 302 OR Key = 403 OR Key = 705

+ 1 - 1
ydb/core/kqp/ut/opt/kqp_sqlin_ut.cpp

@@ -994,7 +994,7 @@ Y_UNIT_TEST_SUITE(KqpSqlIn) {
             .Build();
 
         NYdb::NTable::TExecDataQuerySettings settings;
-        settings.CollectQueryStats(ECollectQueryStatsMode::Basic);
+        settings.CollectQueryStats(ECollectQueryStatsMode::Profile);
 
         auto result = session.ExecuteDataQuery(Q1_(R"(
             DECLARE $keys AS List<Uint64>;

Some files were not shown because too many files changed in this diff