Browse Source

add query replay tool to oss (#2392)

Vitalii Gridnev 1 year ago
parent
commit
b679bae7c5

+ 83 - 0
ydb/tools/query_replay/common_deps.inc

@@ -0,0 +1,83 @@
+SET(YDB_REPLAY_SRCS
+    query_compiler.cpp
+    query_replay.h
+    query_replay.cpp
+    query_proccessor.cpp
+    main.cpp
+)
+
+SET(YDB_REPLAY_PEERDIRS
+    ydb/library/actors/core
+    ydb/library/actors/interconnect
+    library/cpp/getopt
+    ydb/library/grpc/client
+    ydb/library/grpc/server
+    library/cpp/regex/pcre
+    ydb/core/actorlib_impl
+    ydb/core/base
+    ydb/core/blobstorage/pdisk
+    ydb/core/client
+    ydb/core/client/metadata
+    ydb/core/client/minikql_compile
+    ydb/core/client/scheme_cache_lib
+    ydb/core/cms/console
+    ydb/core/engine/minikql
+    ydb/core/fq/libs/init
+    ydb/core/fq/libs/mock
+    ydb/core/grpc_services
+    ydb/core/grpc_streaming
+    ydb/core/health_check
+    ydb/core/kesus/proxy
+    ydb/core/kesus/tablet
+    ydb/core/kqp
+    ydb/core/mind
+    ydb/core/mind/address_classification
+    ydb/core/mind/bscontroller
+    ydb/core/mind/hive
+    ydb/core/node_whiteboard
+    ydb/core/scheme
+    ydb/core/security
+    ydb/core/sys_view/processor
+    ydb/core/sys_view/service
+    ydb/core/tx/columnshard
+    ydb/core/tx/coordinator
+    ydb/core/tx/long_tx_service
+    ydb/core/tx/mediator
+    ydb/core/tx/time_cast
+    ydb/core/tx/tx_proxy
+    ydb/library/yql/core/services/mounts
+    ydb/library/yql/minikql/comp_nodes/llvm14
+    ydb/library/yql/public/udf/service/exception_policy
+    ydb/public/api/protos
+    ydb/public/lib/base
+    ydb/public/lib/deprecated/kicli
+    ydb/public/sdk/cpp/client/ydb_table
+    ydb/services/cms
+    ydb/services/datastreams
+    ydb/services/discovery
+    ydb/services/kesus
+    ydb/services/persqueue_cluster_discovery
+    ydb/services/persqueue_v1
+    ydb/services/rate_limiter
+    ydb/services/ydb
+    ydb/library/yql/udfs/common/clickhouse/client
+    ydb/library/yql/udfs/common/datetime
+    ydb/library/yql/udfs/common/datetime2
+    ydb/library/yql/udfs/common/digest
+    ydb/library/yql/udfs/common/histogram
+    ydb/library/yql/udfs/common/hyperloglog
+    ydb/library/yql/udfs/common/hyperscan
+    ydb/library/yql/udfs/common/json
+    ydb/library/yql/udfs/common/json2
+    ydb/library/yql/udfs/common/math
+    ydb/library/yql/udfs/common/pire
+    ydb/library/yql/udfs/common/re2
+    ydb/library/yql/udfs/common/set
+    ydb/library/yql/udfs/common/stat
+    ydb/library/yql/udfs/common/string
+    ydb/library/yql/udfs/common/top
+    ydb/library/yql/udfs/common/topfreq
+    ydb/library/yql/udfs/common/yson2
+    ydb/library/yql/udfs/logs/dsv
+    ydb/library/yql/sql/pg_dummy
+)

+ 176 - 0
ydb/tools/query_replay/main.cpp

@@ -0,0 +1,176 @@
+#include "query_replay.h"
+
+#include <ydb/library/actors/core/actorsystem.h>
+#include <ydb/library/actors/core/executor_pool_basic.h>
+#include <ydb/library/actors/core/scheduler_basic.h>
+#include <ydb/library/actors/core/log.h>
+#include <ydb/library/actors/core/actor_bootstrapped.h>
+#include <ydb/library/actors/util/should_continue.h>
+#include <util/system/sigset.h>
+#include <util/generic/xrange.h>
+#include <util/stream/file.h>
+#include <util/system/env.h>
+#include <util/folder/pathsplit.h>
+#include <util/string/strip.h>
+#include <ydb/core/client/minikql_compile/mkql_compile_service.h>
+#include <library/cpp/monlib/dynamic_counters/counters.h>
+#include <library/cpp/getopt/last_getopt.h>
+#include <ydb/core/kqp/rm_service/kqp_rm_service.h>
+
+using namespace NActors;
+
+static TProgramShouldContinue ShouldContinue;
+
+void OnTerminate(int) {  // Signal handler installed by main() for SIGINT/SIGTERM; the signal number is ignored.
+    ShouldContinue.ShouldStop();  // Flags the global state that main()'s polling loop checks via PollState().
+}
+
+// Creates the actor system setup for node 1: `pools` basic executor pools with
+// `threads` threads each, plus a basic scheduler thread.
+// Aborts unless 0 < threads < 100 and 0 < pools < 10.
+THolder<TActorSystemSetup> BuildActorSystemSetup(ui32 threads, ui32 pools) {
+    Y_ABORT_UNLESS(threads > 0 && threads < 100);
+    Y_ABORT_UNLESS(pools > 0 && pools < 10);
+
+    auto systemSetup = MakeHolder<TActorSystemSetup>();
+    systemSetup->NodeId = 1;
+
+    // One executor pool per requested pool index, all configured identically.
+    systemSetup->ExecutorsCount = pools;
+    systemSetup->Executors.Reset(new TAutoPtr<IExecutorPool>[pools]);
+    for (ui32 poolId = 0; poolId < pools; ++poolId) {
+        systemSetup->Executors[poolId] = new TBasicExecutorPool(poolId, threads, 50);
+    }
+
+    systemSetup->Scheduler = new TBasicSchedulerThread(TSchedulerConfig(512, 0));
+    return systemSetup;
+}
+
+// Parses command-line options into the application's fields and prepares the YDB driver config.
+// The auth token is taken from the --auth file when given, otherwise from the YDB_TOKEN
+// environment variable; surrounding whitespace is stripped in both cases.
+// Relative --path / --stats-path values are prefixed with the --database value.
+// Always returns 0. NOTE(review): malformed options are presumably reported by
+// TOptsParseResult itself rather than through the return value - confirm.
+int TQueryReplayApp::ParseConfig(int argc, char** argv) {
+    NLastGetopt::TOpts opts = NLastGetopt::TOpts::Default();
+
+    TString authPath;
+
+    opts.AddLongOption('e', "endpoint", "YDB endpoint").RequiredArgument("HOST:PORT").StoreResult(&Endpoint);
+    opts.AddLongOption('d', "database", "YDB database name").RequiredArgument("PATH").StoreResult(&Database);
+    opts.AddLongOption('p', "path", "Target table").RequiredArgument("PATH").StoreResult(&Path);
+    opts.AddLongOption("stats-path", "Stats table").RequiredArgument("PATH").StoreResult(&StatsPath);
+    opts.AddLongOption('i', "in-flight", "Number of queries compiling concurrently").RequiredArgument("NUM").StoreResult(&MaxInFlight);
+    // The help text used to be a copy of the --in-flight description.
+    // NOTE(review): Modulo is handed to CreateQueryReplayActor together with ShardId; exact semantics live there.
+    opts.AddLongOption('m', "modulo", "Modulo for query sharding (used together with --shard-id)").RequiredArgument("NUM").StoreResult(&Modulo);
+    opts.AddLongOption('s', "shard-id", "Shard id").RequiredArgument("NUM").StoreResult(&ShardId);
+    opts.AddLongOption('t', "threads", "Number of ActorSystem threads").RequiredArgument("NUM").StoreResult(&ActorSystemThreadsCount);
+    opts.AddLongOption("auth", "Auth token file").RequiredArgument("PATH").StoreResult(&authPath);
+    opts.AddLongOption('q', "query", "Explicit query to replay").RequiredArgument("PATH").AppendTo(&Queries);
+    NLastGetopt::TOptsParseResult parseResult(&opts, argc, argv);
+
+    if (authPath) {
+        // A stack object is enough here; no need for a heap allocation behind TAutoPtr.
+        TMappedFileInput fileInput(authPath);
+        AuthToken = Strip(fileInput.ReadAll());
+    } else {
+        AuthToken = Strip(GetEnv("YDB_TOKEN"));
+    }
+
+    DriverConfig = NYdb::TDriverConfig()
+                       .SetEndpoint(Endpoint)
+                       .SetDatabase(Database);
+
+    if (AuthToken) {
+        DriverConfig.SetAuthToken(AuthToken);
+    }
+
+    // Paths that do not start with '/' are treated as relative to the database root.
+    if (!Path.empty() && Path.front() != '/') {
+        Path = Database + "/" + Path;
+    }
+
+    if (!StatsPath.empty() && StatsPath.front() != '/') {
+        StatsPath = Database + "/" + StatsPath;
+    }
+
+    return 0;
+}
+
+// Brings up the in-process environment (logger settings, type/function registries,
+// app data, actor system, KQP resource manager actor) and registers the replay actor:
+// the table-driven one when no explicit queries were given, the local one otherwise.
+void TQueryReplayApp::Start() {
+    QueryReplayStats.reset(new TQueryReplayStats());
+    RandomProvider = CreateDefaultRandomProvider();
+    InitializeLogger();
+
+    auto actorSystemSetup = BuildActorSystemSetup(ActorSystemThreadsCount, 1);
+
+    TypeRegistry.Reset(new NKikimr::NScheme::TKikimrTypeRegistry());
+    FunctionRegistry.Reset(
+        NKikimr::NMiniKQL::CreateFunctionRegistry(NKikimr::NMiniKQL::CreateBuiltinRegistry())->Clone());
+    NKikimr::NMiniKQL::FillStaticModules(*FunctionRegistry);
+
+    AppData.Reset(new NKikimr::TAppData(0, 0, 0, 0, {}, TypeRegistry.Get(), FunctionRegistry.Get(), nullptr, nullptr));
+    AppData->Counters = MakeIntrusive<NMonitoring::TDynamicCounters>(new NMonitoring::TDynamicCounters());
+
+    ActorSystem.Reset(new TActorSystem(actorSystemSetup, AppData.Get(), LogSettings));
+    ActorSystem->Start();
+    ActorSystem->Register(NKikimr::NKqp::CreateKqpResourceManagerActor({}, nullptr));
+
+    if (!Queries.empty()) {
+        // Explicit query files were passed with --query: replay them locally, no YDB driver needed.
+        Cout << "Starting local replay, query ids count: " << Queries.size() << Endl;
+        ReplayActor = ActorSystem->Register(
+            CreateQueryReplayActorSimple(std::move(Queries), QueryReplayStats, FunctionRegistry.Get()));
+        return;
+    }
+
+    // Otherwise read the queries from the target table through a YDB driver.
+    const auto runId = RandomProvider->GenUuid4().AsUuidString();
+    Cout << "Starting replay over table " << Path << ", runId: " << runId << Endl;
+    Driver = std::make_unique<NYdb::TDriver>(DriverConfig);
+    ReplayActor = ActorSystem->Register(
+        CreateQueryReplayActor(*Driver, runId, Path, StatsPath, QueryReplayStats, FunctionRegistry.Get(), MaxInFlight, Modulo, ShardId));
+}
+
+// Stops the actor system first, then the YDB driver if Start() created one.
+void TQueryReplayApp::Stop() {
+    ActorSystem->Stop();
+
+    if (!Driver) {
+        return;
+    }
+    Driver->Stop(true);
+}
+
+// Prepares log settings (DEBUG priority for both priority arguments) covering the Kikimr
+// and common actor service ranges, and creates a stderr log backend.
+void TQueryReplayApp::InitializeLogger() {
+    const TActorId loggerId(1, "logger");
+    LogSettings.Reset(new NLog::TSettings(
+        loggerId, NActorsServices::LOGGER, NLog::EPriority::PRI_DEBUG, NLog::EPriority::PRI_DEBUG, 0));
+
+    // Register the service id ranges the settings object should know about.
+    LogSettings->Append(NKikimrServices::EServiceKikimr_MIN,
+                        NKikimrServices::EServiceKikimr_MAX,
+                        NKikimrServices::EServiceKikimr_Name);
+    LogSettings->Append(NActorsServices::EServiceCommon_MIN,
+                        NActorsServices::EServiceCommon_MAX,
+                        NActorsServices::EServiceCommon_Name);
+
+    // SetLevel reports through an out-parameter string; it is not inspected here.
+    TString explanation;
+    LogSettings->SetLevel(NLog::EPriority::PRI_DEBUG, NKikimrServices::GRPC_SERVER, explanation);
+
+    LogBackend.Reset(NActors::CreateStderrBackend().Release());
+}
+
+// Entry point: parses options, starts the replay, then polls every 200 ms until the
+// replay reports completion or a SIGINT/SIGTERM is received. Prints a summary and
+// returns 1 if the run was interrupted with a non-zero return code or any query failed,
+// 0 otherwise (or the non-zero ParseConfig result, if any).
+int main(int argc, char** argv) {
+    // argc/argv are forwarded to ParseConfig below, so they must not be marked unused.
+#ifdef _unix_
+    signal(SIGPIPE, SIG_IGN);
+#endif
+    signal(SIGINT, &OnTerminate);
+    signal(SIGTERM, &OnTerminate);
+
+    TQueryReplayApp app;
+    if (int parseRet = app.ParseConfig(argc, argv)) {
+        Cerr << "Exit: " << parseRet << Endl;
+        return parseRet;
+    }
+    app.Start();
+
+    ui32 tick = 0;
+    while (ShouldContinue.PollState() == TProgramShouldContinue::Continue) {
+        if (app.QueryReplayStats->IsComplete()) {
+            ShouldContinue.ShouldStop();
+            break;
+        }
+
+        // Progress line on every 60th iteration, i.e. roughly every 12 seconds of sleeping.
+        ++tick;
+        if (tick % 60 == 0) {
+            Cout << "Completed " << app.QueryReplayStats->GetCompletedCount() << " out of " << app.QueryReplayStats->GetTotalCount() << Endl;
+        }
+
+        Sleep(TDuration::MilliSeconds(200));
+    }
+
+    app.Stop();
+    Cout << "Total run queries: " << app.QueryReplayStats->GetTotalCount() << Endl;
+    Cout << "Successful queries: " << app.QueryReplayStats->GetCompletedCount() << Endl;
+    Cout << "Failed queries: " << app.QueryReplayStats->GetErrorsCount() << Endl;
+
+    return (ShouldContinue.GetReturnCode() != 0 || app.QueryReplayStats->GetErrorsCount() > 0) ? 1 : 0;
+}

+ 602 - 0
ydb/tools/query_replay/query_compiler.cpp

@@ -0,0 +1,602 @@
+#include "query_replay.h"
+
+#include <ydb/core/actorlib_impl/long_timer.h>
+#include <ydb/core/base/appdata.h>
+#include <ydb/core/kqp/counters/kqp_counters.h>
+#include <ydb/core/kqp/host/kqp_host.h>
+#include <ydb/core/kqp/gateway/kqp_gateway.h>
+#include <ydb/core/kqp/session_actor/kqp_worker_common.h>
+#include <ydb/library/yql/utils/actor_log/log.h>
+#include <ydb/public/api/protos/ydb_value.pb.h>
+
+#include <ydb/library/actors/core/actor_bootstrapped.h>
+#include <ydb/library/actors/core/hfunc.h>
+#include <library/cpp/json/json_writer.h>
+#include <library/cpp/protobuf/json/json2proto.h>
+#include <library/cpp/string_utils/base64/base64.h>
+#include <library/cpp/digest/md5/md5.h>
+#include <library/cpp/json/json_value.h>
+#include <library/cpp/json/json_reader.h>
+#include <ydb/core/client/minikql_compile/mkql_compile_service.h>
+#include <util/string/escape.h>
+#include <util/string/strip.h>
+
+#include <ydb/core/client/scheme_cache_lib/yql_db_scheme_resolver.h>
+
+#include <map>
+#include <memory>
+
+static const TString YqlName = "CompileActor";
+
+using namespace NKikimrConfig;
+using namespace NThreading;
+using namespace NYql;
+using namespace NYql::NDq;
+using namespace NKikimr;
+using namespace NKikimr::NKqp;
+
+
+enum EReadType : ui32 {  // Kinds of table read access recognized by ExtractTableStats when parsing a plan.
+    Lookup = 1,  // Plan read types "Lookup" (without both lookup_by and scan_by) and "MultiLookup".
+    Scan = 2,  // Plan read type "Scan", or "Lookup" that carries both lookup_by and scan_by.
+    FullScan = 3  // Plan read type "FullScan".
+};
+
+// Returns the lowercase name used in mismatch messages for a read type,
+// or "unspecified" for a value outside the enumerators.
+TString ToString(EReadType readType) {
+    if (readType == Lookup) {
+        return "lookup";
+    }
+    if (readType == Scan) {
+        return "scan";
+    }
+    if (readType == FullScan) {
+        return "fullscan";
+    }
+    return "unspecified";
+}
+
+// One read access to a table extracted from a query plan.
+// Ordering and equality are lexicographic over (ReadType, PushedLimit, ReadColumns).
+struct TTableReadAccessInfo {
+    EReadType ReadType;
+    i32 PushedLimit = -1;  // -1 when the plan carries no integer "limit" for this read
+    std::vector<std::string> ReadColumns;
+
+    constexpr bool operator==(const TTableReadAccessInfo& other) const {
+        return ReadType == other.ReadType
+            && PushedLimit == other.PushedLimit
+            && ReadColumns == other.ReadColumns;
+    }
+
+    constexpr bool operator<(const TTableReadAccessInfo& other) const  {
+        // Field-by-field lexicographic comparison: the first differing field decides.
+        if (ReadType < other.ReadType) {
+            return true;
+        }
+        if (other.ReadType < ReadType) {
+            return false;
+        }
+        if (PushedLimit < other.PushedLimit) {
+            return true;
+        }
+        if (other.PushedLimit < PushedLimit) {
+            return false;
+        }
+        return ReadColumns < other.ReadColumns;
+    }
+};
+
+enum EWriteType : ui32 {  // Kinds of table write operations recognized by ExtractTableStats when parsing a plan.
+    Upsert = 1,  // Plan write types "Upsert" and "MultiUpsert".
+    Erase = 2  // Plan write types "Erase" and "MultiErase".
+};
+
+// One write operation on a table extracted from a query plan.
+// Ordering and equality are lexicographic over (WriteType, WriteColumns).
+struct TTableWriteInfo {
+    EWriteType WriteType;
+    std::vector<std::string> WriteColumns;
+
+    constexpr bool operator==(const TTableWriteInfo& other) const {
+        return WriteType == other.WriteType && WriteColumns == other.WriteColumns;
+    }
+
+    constexpr bool operator<(const TTableWriteInfo& other) const  {
+        // The write type decides first; columns only break ties.
+        if (WriteType < other.WriteType) {
+            return true;
+        }
+        if (other.WriteType < WriteType) {
+            return false;
+        }
+        return WriteColumns < other.WriteColumns;
+    }
+};
+
+// All read and write accesses to a single table found in one query plan.
+// Ordering and equality are lexicographic over (Name, Reads, Writes).
+struct TTableStats {
+    TString Name;
+    std::vector<TTableReadAccessInfo> Reads;
+    std::vector<TTableWriteInfo> Writes;
+
+    constexpr bool operator==(const TTableStats& other) const {
+        return Name == other.Name && Reads == other.Reads && Writes == other.Writes;
+    }
+
+    constexpr bool operator<(const TTableStats& other) const {
+        // Name first, then reads, then writes; the first differing field decides.
+        if (Name < other.Name) {
+            return true;
+        }
+        if (other.Name < Name) {
+            return false;
+        }
+        if (Reads < other.Reads) {
+            return true;
+        }
+        if (other.Reads < Reads) {
+            return false;
+        }
+        return Writes < other.Writes;
+    }
+};
+
+// Immutable set of table metadata for one replayed query, plus a lookup map of the
+// secondary global indexes of those tables keyed by the index metadata's Name.
+struct TMetadataInfoHolder {
+    const THashMap<TString, NYql::TKikimrTableMetadataPtr> TableMetadata;
+    THashMap<TString, NYql::TKikimrTableMetadataPtr> Indexes;
+
+    // Takes ownership of `tableMetadata` and collects the secondary index metadata of every table.
+    explicit TMetadataInfoHolder(THashMap<TString,  NYql::TKikimrTableMetadataPtr>&& tableMetadata)
+        // A named rvalue reference is an lvalue: without std::move the whole map would be copied.
+        : TableMetadata(std::move(tableMetadata))
+    {
+        for (const auto& [name, ptr] : TableMetadata) {
+            for (const auto& secondary : ptr->SecondaryGlobalIndexMetadata) {
+                Indexes.emplace(secondary->Name, secondary);
+            }
+        }
+    }
+
+    // Looks up a table (not an index) by name; compare the result with end().
+    THashMap<TString, NYql::TKikimrTableMetadataPtr>::const_iterator find(const TString& key) const {
+        return TableMetadata.find(key);
+    }
+
+    // Looks up `key` among tables first, then among secondary indexes.
+    // Returns nullptr when neither map contains it.
+    const NYql::TKikimrTableMetadataPtr* FindPtr(const TString& key) const {
+        const auto* result = TableMetadata.FindPtr(key);
+        if (result != nullptr) {
+            return result;
+        }
+
+        return Indexes.FindPtr(key);
+    }
+
+    // Iteration covers tables only; indexes are not visited.
+    THashMap<TString, NYql::TKikimrTableMetadataPtr>::const_iterator begin() const {
+        return TableMetadata.begin();
+    }
+
+    THashMap<TString, NYql::TKikimrTableMetadataPtr>::const_iterator end() const {
+        return TableMetadata.end();
+    }
+};
+
+
+// Serves table metadata and scheme resolution from a fixed, pre-loaded TMetadataInfoHolder
+// instead of querying a live cluster. Implements both the KQP table metadata loader and
+// the DB scheme resolver interfaces.
+class TStaticTableMetadataLoader: public NYql::IKikimrGateway::IKqpTableMetadataLoader, public NYql::IDbSchemeResolver {
+    TActorSystem* ActorSystem;
+    std::shared_ptr<TMetadataInfoHolder> TableMetadata;
+
+public:
+    TStaticTableMetadataLoader(TActorSystem* actorSystem, std::shared_ptr<TMetadataInfoHolder>& tableMetadata)
+        : ActorSystem(actorSystem)
+        , TableMetadata(tableMetadata)
+    {}
+
+    // Returns an already-fulfilled future with the stored metadata of `table`.
+    // Only `table` is used; the other arguments are ignored.
+    // Aborts the process if the table is not among the stored tables (indexes are not consulted here).
+    NThreading::TFuture<NYql::IKikimrGateway::TTableMetadataResult> LoadTableMetadata(
+        const TString& cluster, const TString& table, const NYql::IKikimrGateway::TLoadTableMetadataSettings& settings, const TString& database,
+        const TIntrusiveConstPtr<NACLib::TUserToken>& userToken) override {
+        Y_UNUSED(cluster);
+        Y_UNUSED(settings);
+        Y_UNUSED(database);
+        Y_UNUSED(userToken);
+        auto ptr = TableMetadata->find(table);
+        Y_ABORT_UNLESS(ptr != TableMetadata->end());
+
+        NYql::IKikimrGateway::TTableMetadataResult result;
+        result.SetSuccess();
+        result.Metadata = ptr->second;
+        return MakeFuture<NYql::IKikimrGateway::TTableMetadataResult>(result);
+    }
+
+    // Builds a successful resolve result for `table` from its metadata: table id,
+    // schema version, key column count and a per-column description.
+    TTableResult MakeTable(const TTable& table, const NYql::TKikimrTableMetadataPtr& meta) const {
+        TTableResult reply{TTableResult::Ok};
+        reply.Status = TTableResult::Ok;
+        reply.Table.TableName = table.TableName;
+        reply.CacheGeneration = meta->SchemaVersion;
+        reply.TableId = new TTableId(meta->PathId.OwnerId(), meta->PathId.TableId(), meta->SchemaVersion);
+        reply.KeyColumnCount = meta->KeyColumnNames.size();
+
+        // Position of every key column inside the primary key.
+        THashMap<TString, i32> keyPosition;
+        i32 idx = 0;
+        for (const auto& key : meta->KeyColumnNames) {
+            keyPosition[key] = idx;
+            ++idx;
+        }
+
+        for (const auto& [name, column] : meta->Columns) {
+            // operator[] would silently insert 0 for a non-key column, making it look like
+            // the first key column. Use -1 for "not part of the key" instead.
+            // NOTE(review): confirm consumers of TColumn::KeyPosition treat negative values as non-key.
+            const i32* position = keyPosition.FindPtr(name);
+            reply.Columns[name] = TTableResult::TColumn{column.Id, position != nullptr ? *position : -1, column.TypeInfo, 0, EColumnTypeConstraint::Nullable};
+        }
+
+        return reply;
+    }
+
+    // Resolves every requested table against the stored tables and indexes; unknown
+    // names produce a LookupError entry, so the result has one entry per input table.
+    TTableResults Resolve(const TVector<TTable>& tables) const {
+        TTableResults results;
+        for (const auto& table : tables) {
+            const NYql::TKikimrTableMetadataPtr* metaIt = TableMetadata->FindPtr(table.TableName);
+            if (metaIt != nullptr) {
+                results.push_back(MakeTable(table, *metaIt));
+                continue;
+            }
+
+            results.push_back({TTableResult::LookupError, TStringBuilder() << "Unknown table " << table.TableName});
+        }
+
+        return results;
+    }
+
+    // Future-based resolve: the result is computed synchronously and returned already fulfilled.
+    NThreading::TFuture<TTableResults> ResolveTables(const TVector<TTable>& tables) override {
+        return NThreading::MakeFuture(Resolve(tables));
+    }
+
+    // Actor-based resolve: the result is computed synchronously and sent to `responseTo`.
+    void ResolveTables(const TVector<TTable>& tables, NActors::TActorId responseTo) override {
+        auto results = Resolve(tables);
+        ActorSystem->Send(responseTo, new NYql::IDbSchemeResolver::TEvents::TEvResolveTablesResult(std::move(results)));
+    }
+
+    // Nothing is collected by this static loader.
+    TVector<NKikimrKqp::TKqpTableMetadataProto> GetCollectedSchemeData() override {
+        return {};
+    }
+};
+
+class TReplayCompileActor: public TActorBootstrapped<TReplayCompileActor> {
+public:
+    // Builds a compile actor from one recorded replay entry.
+    // Fields read from `replayDetails`: "table_metadata", "query_text", "query_cluster",
+    // "query_database", "query_syntax", and optionally "table_meta_serialization_type"
+    // and "query_parameter_types" (a map of base64-encoded serialized Ydb::Type values).
+    // The *Safe JSON accessors are used throughout; NOTE(review): they presumably throw
+    // on a missing or mistyped field - confirm.
+    TReplayCompileActor(const TActorId& ownerId, TIntrusivePtr<TModuleResolverState> moduleResolverState, const NMiniKQL::IFunctionRegistry* functionRegistry, NJson::TJsonValue&& replayDetails)
+        : Owner(ownerId)
+        , ModuleResolverState(moduleResolverState)
+        , KqpSettings()
+        , Config(MakeIntrusive<TKikimrConfiguration>())
+        , FunctionRegistry(functionRegistry)
+        , AlignedPagePoolCounters()
+        , ReplayDetails(std::move(replayDetails))
+    {
+        EMetaSerializationType metaType = EMetaSerializationType::EncodedProto;
+        if (ReplayDetails.Has("table_meta_serialization_type")) {
+            metaType = static_cast<EMetaSerializationType>(ReplayDetails["table_meta_serialization_type"].GetUIntegerSafe());
+        }
+        // The call already yields a temporary; wrapping it in std::move would only inhibit elision.
+        TableMetadata = std::make_shared<TMetadataInfoHolder>(ExtractStaticMetadata(ReplayDetails["table_metadata"], metaType));
+        TString queryText = UnescapeC(ReplayDetails["query_text"].GetStringSafe());
+
+        // Parameter types that fail to decode are dropped rather than failing the whole replay.
+        std::map<TString, Ydb::Type> queryParameterTypes;
+        if (ReplayDetails.Has("query_parameter_types")) {
+            for (const auto& [paramName, paramType] : ReplayDetails["query_parameter_types"].GetMapSafe()) {
+                if (!queryParameterTypes[paramName].ParseFromString(Base64Decode(paramType.GetStringSafe()))) {
+                    queryParameterTypes.erase(paramName);
+                }
+            }
+        }
+
+        // Read once and reuse instead of repeating the JSON lookups below.
+        const TString cluster = ReplayDetails["query_cluster"].GetStringSafe();
+        const TString database = ReplayDetails["query_database"].GetStringSafe();
+
+        TKqpQuerySettings settings(NKikimrKqp::QUERY_TYPE_SQL_DML);
+        Query = std::make_unique<NKikimr::NKqp::TKqpQueryId>(
+            cluster,
+            database,
+            queryText,
+            settings,
+            !queryParameterTypes.empty()
+                ? std::make_shared<std::map<TString, Ydb::Type>>(std::move(queryParameterTypes))
+                : nullptr);
+
+        Config->Init(KqpSettings.DefaultSettings.GetDefaultSettings(), cluster, KqpSettings.Settings, false);
+        if (!Query->Database.empty()) {
+            Config->_KqpTablePathPrefix = database;
+        }
+
+        // Only the exact string "1" selects syntax version 1; anything else selects 0.
+        ui32 syntax = (ReplayDetails["query_syntax"].GetStringSafe() == "1") ? 1 : 0;
+        Config->_KqpYqlSyntaxVersion = syntax;
+        Config->FreezeDefaults();
+    }
+
+    // Wires up a gateway backed by the static metadata loader, creates the KQP host,
+    // starts preparing the recorded query, arms a 60-second wakeup and switches to CompileState.
+    void Bootstrap() {
+        TYqlLogScope logScope(TlsActivationContext->AsActorContext(), NKikimrServices::KQP_YQL, YqlName, "");
+
+        // All metadata requests are answered from the recorded TableMetadata.
+        std::shared_ptr<NYql::IKikimrGateway::IKqpTableMetadataLoader> metadataLoader =
+            std::make_shared<TStaticTableMetadataLoader>(TlsActivationContext->ActorSystem(), TableMetadata);
+
+        // Fresh counters root for this compilation.
+        auto countersRoot = MakeIntrusive<NMonitoring::TDynamicCounters>();
+        auto requestCounters = MakeIntrusive<TKqpRequestCounters>();
+        requestCounters->Counters = new TKqpCounters(countersRoot);
+        requestCounters->TxProxyMon = new NTxProxy::TTxProxyMon(countersRoot);
+
+        Gateway = CreateKikimrIcGateway(
+            Query->Cluster,
+            NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY,
+            Query->Database,
+            std::move(metadataLoader),
+            TlsActivationContext->ExecutorThread.ActorSystem,
+            SelfId().NodeId(),
+            requestCounters);
+
+        auto federatedQuerySetup = std::make_optional<TKqpFederatedQuerySetup>({NYql::IHTTPGateway::Make(), nullptr, nullptr, nullptr, {}, {}});
+        KqpHost = CreateKqpHost(
+            Gateway, Query->Cluster, Query->Database, Config, ModuleResolverState->ModuleResolver,
+            federatedQuerySetup, nullptr, Nothing(), FunctionRegistry, false);
+
+        IKqpHost::TPrepareSettings prepareSettings;
+        prepareSettings.DocumentApiRestricted = false;
+        AsyncCompileResult = KqpHost->PrepareDataQuery(Query->Text, prepareSettings);
+        Continue();
+
+        Schedule(TDuration::Seconds(60), new TEvents::TEvWakeup());
+        Become(&TThis::CompileState);
+    }
+
+    void PassAway() override {  // No extra cleanup here: simply defers to the base implementation.
+        TActor::PassAway();
+    }
+
+private:
+    STATEFN(CompileState) {  // Event dispatcher for the state entered at the end of Bootstrap().
+        try {
+            switch (ev->GetTypeRewrite()) {
+                hFunc(TEvKqp::TEvContinueProcess, Handle);  // Sent to self by the callback installed in Continue().
+                cFunc(TEvents::TEvWakeup::EventType, HandleTimeout);  // The wakeup is scheduled 60 seconds after Bootstrap().
+                default:
+                    Reply(Ydb::StatusIds::INTERNAL_ERROR, "Unexpected event in CompileState");
+            }
+        } catch (const yexception& e) {  // A yexception escaping the dispatch above is reported to the owner as INTERNAL_ERROR.
+            Reply(Ydb::StatusIds::INTERNAL_ERROR, e.what());
+        }
+    }
+
+private:
+    // Runs the next step of the asynchronous compilation and, once that step's future is
+    // ready, sends TEvContinueProcess (carrying the "finished" flag) back to this actor.
+    void Continue() {
+        // Everything the callback needs is captured by value, so it does not touch `this`.
+        const TActorId self = SelfId();
+        TActorSystem* const system = TlsActivationContext->ExecutorThread.ActorSystem;
+
+        AsyncCompileResult->Continue().Apply([system, self](const TFuture<bool>& step) {
+            const bool finished = step.GetValue();
+            system->Send(self, new TEvKqp::TEvContinueProcess(0, finished));
+        });
+    }
+
+    // Parses a query plan given as JSON text into a JSON tree.
+    // The result of ReadJsonTree is not checked. NOTE(review): the trailing `false`
+    // presumably disables throwing on malformed input, in which case the returned
+    // value may be incomplete - confirm.
+    NJson::TJsonValue ExtractQueryPlan(const TString& plan) {
+        static NJson::TJsonReaderConfig readConfig;
+
+        NJson::TJsonValue parsedPlan;
+        TStringInput planStream(plan);
+        NJson::ReadJsonTree(&planStream, &readConfig, &parsedPlan, false);
+        return parsedPlan;
+    }
+
+    // Extracts per-table access statistics from a query plan JSON.
+    // For every entry of the plan's "tables" array this collects:
+    //   - reads: type, pushed limit (-1 if absent or not an integer) and sorted non-key
+    //     columns; the list is sorted and de-duplicated;
+    //   - writes: type and sorted non-key columns; the list is sorted (duplicates are kept).
+    // Reads and writes with an unrecognized "type" are skipped.
+    // Returns a map keyed by table name; empty when the plan has no "tables" field.
+    // Y_ENSURE fails if an operation lists "columns" for a table with no known metadata.
+    std::map<std::string, TTableStats> ExtractTableStats(const NJson::TJsonValue& rhs) {
+        std::map<std::string, TTableStats> result;
+
+        if (!rhs.Has("tables")) {
+            return result;
+        }
+
+        for (const auto& table : rhs["tables"].GetArraySafe()) {
+            TString name = table["name"].GetStringSafe();
+            std::vector<TTableReadAccessInfo> reads;
+            std::vector<TTableWriteInfo> writes;
+
+            // Shared by reads and writes: the sorted list of columns touched by `operation`,
+            // excluding the table's key columns. Empty when the operation has no "columns".
+            const auto nonKeyColumns = [this, &name](const NJson::TJsonValue& operation) {
+                std::vector<std::string> columns;
+                if (!operation.Has("columns")) {
+                    return columns;
+                }
+
+                const auto tableMetadata = TableMetadata->FindPtr(name);
+                Y_ENSURE(tableMetadata);
+                const auto& keyColumnNames = tableMetadata->Get()->KeyColumnNames;
+                std::unordered_set<std::string_view> keyColumns(keyColumnNames.begin(), keyColumnNames.end());
+                for (const auto& column : operation["columns"].GetArraySafe()) {
+                    if (!keyColumns.contains(column.GetStringSafe())) {
+                        columns.push_back(column.GetStringSafe());
+                    }
+                }
+                std::sort(columns.begin(), columns.end());
+                return columns;
+            };
+
+            if (table.Has("reads")) {
+                for (const auto& read : table["reads"].GetArraySafe()) {
+                    std::vector<std::string> columns = nonKeyColumns(read);
+
+                    const auto& type = read["type"].GetStringSafe();
+                    i32 limit = -1;
+                    if (read.Has("limit") && read["limit"].IsInteger()) {
+                        // The JSON integer is wider than i32; make the narrowing explicit.
+                        limit = static_cast<i32>(read["limit"].GetIntegerSafe());
+                    }
+
+                    // A "Lookup" that carries both lookup_by and scan_by is counted as a scan.
+                    const bool probablyLookupScan = read.Has("lookup_by") && read.Has("scan_by");
+
+                    if (type == "Scan") {
+                        reads.push_back(TTableReadAccessInfo{EReadType::Scan, limit, std::move(columns)});
+                    } else if (type == "FullScan") {
+                        reads.push_back(TTableReadAccessInfo{EReadType::FullScan, limit, std::move(columns)});
+                    } else if (type == "Lookup") {
+                        const EReadType readType = probablyLookupScan ? EReadType::Scan : EReadType::Lookup;
+                        reads.push_back(TTableReadAccessInfo{readType, limit, std::move(columns)});
+                    } else if (type == "MultiLookup") {
+                        reads.push_back(TTableReadAccessInfo{EReadType::Lookup, limit, std::move(columns)});
+                    }
+                }
+
+                std::sort(reads.begin(), reads.end());
+                reads.erase(std::unique(reads.begin(), reads.end()), reads.end());
+            }
+
+            if (table.Has("writes")) {
+                for (const auto& write : table["writes"].GetArraySafe()) {
+                    std::vector<std::string> columns = nonKeyColumns(write);
+
+                    const auto& type = write["type"].GetStringSafe();
+                    if (type == "Upsert" || type == "MultiUpsert") {
+                        writes.push_back(TTableWriteInfo{EWriteType::Upsert, std::move(columns)});
+                    } else if (type == "Erase" || type == "MultiErase") {
+                        writes.push_back(TTableWriteInfo{EWriteType::Erase, std::move(columns)});
+                    }
+                }
+
+                std::sort(writes.begin(), writes.end());
+            }
+
+            result.emplace(name, TTableStats{name, std::move(reads), std::move(writes)});
+        }
+
+        return result;
+    }
+
+    // Dumps `data` as JSON into a file named "<query_id><fname>",
+    // where query_id comes from the replay details.
+    void WriteJsonData(const TString& fname, const NJson::TJsonValue& data) {
+        const TString outputPath = ReplayDetails["query_id"].GetStringSafe() + fname;
+        TFileOutput outputFile(outputPath);
+        NJson::WriteJson(&outputFile, &data, true);
+    }
+
+    // Reports a plan mismatch on stderr and dumps the replay details plus both plans
+    // into files prefixed with the query id.
+    void WriteQueryMismatchInfo(const NJson::TJsonValue& lhs, const NJson::TJsonValue& rhs) {
+        Cerr << "Found plan mismatch, query " << ReplayDetails["query_id"].GetStringSafe() << Endl;
+
+        // File suffix -> JSON document written to that file, in this order.
+        const std::pair<const char*, const NJson::TJsonValue*> dumps[] = {
+            {"-repro.txt", &ReplayDetails},
+            {"-plan-lhs.txt", &lhs},
+            {"-plan-rhs.txt", &rhs},
+        };
+        for (const auto& [suffix, document] : dumps) {
+            WriteJsonData(suffix, *document);
+        }
+    }
+
+    // Classifies the difference between the old-engine and new-engine statistics of the
+    // same table. Checks run in this order: number of reads, then per-read type, limit and
+    // columns, then number of writes, then per-write columns. The first difference found
+    // determines the returned status and message; if none of these checks fires, the
+    // mismatch is reported as UncategorizedPlanMismatch with an empty message.
+    std::pair<TQueryReplayEvents::TCheckQueryPlanStatus, TString> OnTableOperationsMismatch(const TTableStats& oldEngineStats, const TTableStats& newEngineStats) {
+        Y_ENSURE(oldEngineStats.Name == newEngineStats.Name);
+
+        const auto& oldReads = oldEngineStats.Reads;
+        const auto& newReads = newEngineStats.Reads;
+        if (oldReads.size() != newReads.size()) {
+            if (oldReads.size() > newReads.size()) {
+                return {TQueryReplayEvents::ExtraReadingOldEngine, TStringBuilder()
+                    << "Extra reading in old engine plan for table " << oldEngineStats.Name};
+            }
+            return {TQueryReplayEvents::ExtraReadingNewEngine, TStringBuilder()
+                << "Extra reading in new engine plan for table " << newEngineStats.Name};
+        }
+
+        // Same number of reads: compare them pairwise.
+        for (size_t pos = 0; pos < oldReads.size(); ++pos) {
+            const auto& oldRead = oldReads[pos];
+            const auto& newRead = newReads[pos];
+
+            if (oldRead.ReadType != newRead.ReadType) {
+                return {TQueryReplayEvents::ReadTypesMismatch, TStringBuilder() << "Read types mismatch, old engine: "
+                    << ToString(oldRead.ReadType) << ", new engine: " << ToString(newRead.ReadType)};
+            }
+
+            if (oldRead.PushedLimit != newRead.PushedLimit) {
+                return {TQueryReplayEvents::ReadLimitsMismatch, TStringBuilder() << "Read limits mismatch, old engine: "
+                    << oldRead.PushedLimit << ", new engine: " << newRead.PushedLimit};
+            }
+
+            if (oldRead.ReadColumns != newRead.ReadColumns) {
+                return {TQueryReplayEvents::ReadColumnsMismatch, TStringBuilder() << "Read columns mismatch"};
+            }
+        }
+
+        const auto& oldWrites = oldEngineStats.Writes;
+        const auto& newWrites = newEngineStats.Writes;
+        if (oldWrites.size() != newWrites.size()) {
+            // Both directions share the ExtraWriting status; only the message differs.
+            if (oldWrites.size() > newWrites.size()) {
+                return {TQueryReplayEvents::ExtraWriting, TStringBuilder()
+                    << "Extra write operation in old engine plan for table " << oldEngineStats.Name};
+            }
+            return {TQueryReplayEvents::ExtraWriting, TStringBuilder()
+                << "Extra write operation in new engine plan for table " << newEngineStats.Name};
+        }
+
+        for (size_t pos = 0; pos < oldWrites.size(); ++pos) {
+            if (oldWrites[pos].WriteColumns != newWrites[pos].WriteColumns) {
+                return {TQueryReplayEvents::WriteColumnsMismatch, TStringBuilder() << "Write columns mismatch"};
+            }
+        }
+
+        return {TQueryReplayEvents::UncategorizedPlanMismatch, ""};
+    }
+
+    // Compares the recorded plan ("query_plan" of the replay details) with the freshly
+    // produced one, table by table. Every table of the recorded plan must be present in
+    // the new plan with equal statistics. On the first difference the repro and both
+    // plans are dumped to files and the mismatch status is returned.
+    // Tables that appear only in the new plan are not inspected.
+    std::pair<TQueryReplayEvents::TCheckQueryPlanStatus, TString> CheckQueryPlan(const NJson::TJsonValue& newEnginePlan) {
+        NJson::TJsonValue oldEnginePlan = ExtractQueryPlan(ReplayDetails["query_plan"].GetStringSafe());
+        const auto recordedStats = ExtractTableStats(oldEnginePlan);
+        const auto freshStats = ExtractTableStats(newEnginePlan);
+
+        for (const auto& [table, stats] : recordedStats) {
+            const auto found = freshStats.find(table);
+            if (found == freshStats.end()) {
+                WriteQueryMismatchInfo(oldEnginePlan, newEnginePlan);
+                return {TQueryReplayEvents::TableMissing,
+                    TStringBuilder() << "Table " << table << " not found in new engine plan"};
+            }
+
+            if (stats == found->second) {
+                continue;
+            }
+
+            WriteQueryMismatchInfo(oldEnginePlan, newEnginePlan);
+            return OnTableOperationsMismatch(stats, found->second);
+        }
+
+        return {TQueryReplayEvents::Success, ""};
+    }
+
+    // Convenience overload: wraps a plain text message into a single position-less issue
+    // and forwards to the issue-list overload.
+    void Reply(const Ydb::StatusIds::StatusCode& status, const TString& message) {
+        const NYql::TIssue singleIssue(NYql::TPosition(), message);
+        Reply(status, {singleIssue});
+    }
+
+    // Builds the TEvCompileResponse, sends it to Owner and terminates this actor.
+    //
+    //  * status != SUCCESS: the response is marked as failed (CompileTimeout for TIMEOUT,
+    //    CompileError otherwise), the issues are logged and the replay details are dumped
+    //    with the "-repro.txt" suffix.
+    //  * status == SUCCESS: the compiled plan is compared with the recorded one via
+    //    CheckQueryPlan and the comparison outcome becomes the response status/message.
+    //
+    // queryPlan is only read on the success path. It defaults to std::nullopt, so a success
+    // reply without a plan is reported as a compile error instead of dereferencing an empty
+    // optional.
+    void Reply(const Ydb::StatusIds::StatusCode& status, const TIssues& issues, const std::optional<TString>& queryPlan = std::nullopt) {
+        std::unique_ptr<TQueryReplayEvents::TEvCompileResponse> ev = std::make_unique<TQueryReplayEvents::TEvCompileResponse>(true);
+        if (status != Ydb::StatusIds::SUCCESS) {
+            ev->Success = false;
+            ev->Status = status == Ydb::StatusIds::TIMEOUT ? TQueryReplayEvents::CompileTimeout : TQueryReplayEvents::CompileError;
+            ev->Message = issues.ToString();
+            Cerr << "Failed to compile query: " << ev->Message << Endl;
+            WriteJsonData("-repro.txt", ReplayDetails);
+        } else if (!queryPlan.has_value()) {
+            // Successful status without a plan: nothing to compare, and *queryPlan would be UB.
+            ev->Success = false;
+            ev->Status = TQueryReplayEvents::CompileError;
+            ev->Message = "Query compilation succeeded, but no query plan was provided";
+            Cerr << "Failed to compile query: " << ev->Message << Endl;
+        } else {
+            std::tie(ev->Status, ev->Message) = CheckQueryPlan(ExtractQueryPlan(*queryPlan));
+        }
+
+        Send(Owner, ev.release());
+        PassAway();
+    }
+
+    // Decodes table metadata from the replay record into a map keyed by the table name
+    // taken from each decoded proto.
+    //
+    //  * EncodedProto: `data` is a string holding a JSON array; every element is a
+    //    base64-encoded serialized TKqpTableMetadataProto.
+    //  * any other type: `data` itself is a JSON array; every element is converted to the
+    //    proto with NProtobufJson::Json2Proto.
+    //
+    // Aborts if the expected array is absent or a proto fails to parse.
+    static THashMap<TString, NYql::TKikimrTableMetadataPtr> ExtractStaticMetadata(const NJson::TJsonValue& data, EMetaSerializationType metaType) {
+        THashMap<TString, NYql::TKikimrTableMetadataPtr> tables;
+
+        // Shared tail of both decoding paths: wrap the proto and index it by its name.
+        const auto registerTable = [&tables](NKikimrKqp::TKqpTableMetadataProto& proto) {
+            NYql::TKikimrTableMetadataPtr metadata = MakeIntrusive<NYql::TKikimrTableMetadata>(&proto);
+            tables.emplace(proto.GetName(), metadata);
+        };
+
+        if (metaType != EMetaSerializationType::EncodedProto) {
+            Y_ABORT_UNLESS(data.IsArray());
+            for (auto& item : data.GetArray()) {
+                NKikimrKqp::TKqpTableMetadataProto proto;
+                NProtobufJson::Json2Proto(item.GetStringRobust(), proto);
+                registerTable(proto);
+            }
+            return tables;
+        }
+
+        static NJson::TJsonReaderConfig readerConfig;
+        NJson::TJsonValue encodedList;
+        TStringInput input(data.GetStringSafe());
+        NJson::ReadJsonTree(&input, &readerConfig, &encodedList, false);
+        Y_ABORT_UNLESS(encodedList.IsArray());
+
+        for (auto& item : encodedList.GetArray()) {
+            NKikimrKqp::TKqpTableMetadataProto proto;
+
+            TString serialized = Base64Decode(item.GetStringRobust());
+            Y_ABORT_UNLESS(proto.ParseFromString(serialized));
+
+            registerTable(proto);
+        }
+        return tables;
+    }
+
+    // Drives the asynchronous compilation: while it is not finished, schedules another step
+    // via Continue(); once finished, collects the result and replies. The query plan is
+    // forwarded to Reply only when compilation ended with SUCCESS.
+    void Handle(TEvKqp::TEvContinueProcess::TPtr& ev) {
+        Y_ENSURE(!ev->Get()->QueryId);
+
+        TYqlLogScope logScope(TlsActivationContext->AsActorContext(), NKikimrServices::KQP_YQL, YqlName, "");
+
+        const bool finished = ev->Get()->Finished;
+        if (!finished) {
+            Continue();
+            return;
+        }
+
+        auto compileResult = AsyncCompileResult->GetResult();
+        auto ydbStatus = GetYdbStatus(compileResult);
+
+        std::optional<TString> plan;
+        if (ydbStatus == Ydb::StatusIds::SUCCESS) {
+            plan = std::move(compileResult.QueryPlan);
+        }
+
+        Reply(ydbStatus, compileResult.Issues(), plan);
+    }
+
+    // Reports a compilation timeout to the owner through the message-based Reply overload.
+    void HandleTimeout() {
+        Reply(Ydb::StatusIds::TIMEOUT, "Query compilation timed out.");
+    }
+
+private:
+    TActorId Owner;
+    TIntrusivePtr<TModuleResolverState> ModuleResolverState;
+    TString Uid;
+    std::unique_ptr<TKqpQueryId> Query;
+    TKqpSettings KqpSettings;
+    TKikimrConfiguration::TPtr Config;
+    TIntrusivePtr<IKqpGateway> Gateway;
+    TIntrusivePtr<IKqpHost> KqpHost;
+    TIntrusivePtr<IKqpHost::IAsyncQueryResult> AsyncCompileResult;
+    const NMiniKQL::IFunctionRegistry* FunctionRegistry;
+    TAlignedPagePoolCounters AlignedPagePoolCounters;
+    std::shared_ptr<TMetadataInfoHolder> TableMetadata;
+    TActorId MiniKQLCompileService;
+    NJson::TJsonValue ReplayDetails;
+};
+
+// Factory for the replay compile actor. Ownership of the returned actor passes to the
+// caller, which is expected to register it in the actor system.
+IActor* CreateQueryCompiler(
+    const TActorId& ownerId, TIntrusivePtr<TModuleResolverState> moduleResolverState,
+    const NMiniKQL::IFunctionRegistry* functionRegistry, NJson::TJsonValue&& replayDetails)
+{
+    auto* compileActor = new TReplayCompileActor(
+        ownerId, moduleResolverState, functionRegistry, std::move(replayDetails));
+    return compileActor;
+}

+ 196 - 0
ydb/tools/query_replay/query_proccessor.cpp

@@ -0,0 +1,196 @@
+#include "query_replay.h"
+
+#include <ydb/core/kqp/provider/yql_kikimr_gateway.h>
+
+#include <library/cpp/string_utils/base64/base64.h>
+#include <library/cpp/json/json_reader.h>
+
+#include <util/generic/hash.h>
+#include <util/string/escape.h>
+
+using namespace NYdb;
+using namespace NActors;
+using namespace NKikimr::NKqp;
+
+// Processes a single recorded query:
+//   1. fetches the row with the given query_id from the queries table (StateResolve);
+//   2. converts the row to JSON and hands it to a query compiler actor (StateCompiling);
+//   3. on a non-successful check status writes a row to the stats table;
+//   4. reports TEvQueryProcessorResult to the owner and terminates.
+class TQueryProcessorActor: public TActorBootstrapped<TQueryProcessorActor> {
+private:
+    TString FetchQuery;       // YQL text selecting the recorded query by $query_id
+    TString WriteStatsQuery;  // YQL text upserting a failure record into the stats table
+    std::shared_ptr<NYdb::NTable::TTableClient> Client;
+    TString RunId;
+    TString QueryId;
+    TActorId OwnerId;         // receives TEvQueryProcessorResult
+    TIntrusivePtr<NKikimr::NKqp::TModuleResolverState> ModuleResolverState;
+    const NKikimr::NMiniKQL::IFunctionRegistry* FunctionRegistry;
+
+public:
+    // tablePath and statsTablePath are substituted verbatim into the query texts.
+    TQueryProcessorActor(std::shared_ptr<NYdb::NTable::TTableClient> client, const TString& runId, const TString& queryId, const TString& tablePath,
+        const TString& statsTablePath, const TActorId& ownerId, TIntrusivePtr<NKikimr::NKqp::TModuleResolverState> moduleResolverState,
+        const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry)
+        : Client(client)
+        , RunId(runId)
+        , QueryId(queryId)
+        , OwnerId(ownerId)
+        , ModuleResolverState(moduleResolverState)
+        , FunctionRegistry(functionRegistry)
+    {
+        FetchQuery = Sprintf(
+            R"(
+                --!syntax_v1
+                DECLARE $query_id as String;
+                SELECT * FROM `%s` WHERE query_id=$query_id;
+            )", tablePath.c_str());
+
+        WriteStatsQuery = Sprintf(
+            R"(
+                DECLARE $run_id as Utf8;
+                DECLARE $query_id as Utf8;
+                DECLARE $fail_reason as Utf8;
+                DECLARE $extra_message as Utf8;
+                UPSERT INTO `%s` (RunId, QueryId, Timestamp, FailReason, ExtraMessage)
+                    VALUES($run_id, $query_id, YQL::Now(), $fail_reason, $extra_message);
+            )", statsTablePath.c_str());
+    }
+
+    void Bootstrap() {
+        Become(&TThis::StateResolve);
+        ResolveQueryData();
+    }
+
+    // Starts the asynchronous fetch of the recorded query. The SDK callback runs outside
+    // of the actor, so it captures only value copies (actor id, actor system pointer,
+    // query text, params) and delivers the outcome as TEvQueryResolveResult.
+    // NOTE(review): the event is sent from inside the retried operation, so it is sent
+    // once per attempt if the operation is retried - confirm this is intended.
+    void ResolveQueryData() {
+        const TString& query = FetchQuery;
+        TActorSystem* actorSystem = TlsActivationContext->ExecutorThread.ActorSystem;
+        TActorId self = SelfId();
+
+        NYdb::TParams params = Client->GetParamsBuilder().AddParam("$query_id").String(QueryId).Build().Build();
+        Client->RetryOperation([query, params, self, actorSystem](NTable::TSession session) {
+            auto txControl = NTable::TTxControl::BeginTx(NYdb::NTable::TTxSettings::SerializableRW()).CommitTx();
+            // params is a const capture of a non-mutable lambda: it is passed by reference
+            // and stays intact for possible further attempts.
+            auto queryResult = session.ExecuteDataQuery(query, txControl, params);
+
+            return queryResult.Apply([self, actorSystem](const NTable::TAsyncDataQueryResult& asyncResult) {
+                NTable::TDataQueryResult result = asyncResult.GetValue();
+                // Take the status part before `result` is handed over to the event, so the
+                // returned future never depends on a moved-from object.
+                TStatus status = result;
+                actorSystem->Send(self, new TQueryReplayEvents::TEvQueryResolveResult(std::move(result)));
+                return NThreading::MakeFuture<TStatus>(std::move(status));
+            });
+        });
+    }
+
+    // Handles the fetched row: failure and "not found" are reported to the owner right
+    // away; otherwise all columns (except _logfeller_timestamp) are packed into a JSON map
+    // and passed to a freshly registered compiler actor.
+    void Handle(TQueryReplayEvents::TEvQueryResolveResult::TPtr& ev) {
+        if (!ev->Get()->DataQueryResult.IsSuccess()) {
+            Cerr << "Failed to resolve query data " << ev->Get()->DataQueryResult.GetIssues().ToString() << Endl;
+            Send(OwnerId, new TQueryReplayEvents::TEvQueryProcessorResult(false));
+            PassAway();
+            return;
+        }
+
+        const auto& resultSet = ev->Get()->DataQueryResult.GetResultSet(0);
+        TResultSetParser parser(resultSet);
+        if (!parser.TryNextRow()) {
+            Cout << "Query data not found " << QueryId << Endl;
+            Send(OwnerId, new TQueryReplayEvents::TEvQueryProcessorResult(true));
+            PassAway();
+            return;
+        }
+
+        NJson::TJsonValue json(NJson::JSON_MAP);
+        for (auto& col : resultSet.GetColumnsMeta()) {
+            if (col.Name == "_logfeller_timestamp")
+                continue;
+
+            auto value = parser.ColumnParser(col.Name).GetOptionalString();
+            if (!value.Defined()) {
+                // NULL column: leave the key out instead of failing on an empty optional.
+                continue;
+            }
+            json.InsertValue(col.Name, NJson::TJsonValue(std::move(value.GetRef())));
+        }
+
+        auto compiler = CreateQueryCompiler(SelfId(), ModuleResolverState, FunctionRegistry, std::move(json));
+        Register(compiler);
+        Become(&TThis::StateCompiling);
+    }
+
+    // Upserts a failure record (run id, query id, textual fail reason, extra message) into
+    // the stats table. The request is fire-and-forget: a failure is only logged.
+    void WriteQueryStats(TQueryReplayEvents::TCheckQueryPlanStatus status, const TString& message) {
+        TString failReason;
+        switch (status) {
+            case TQueryReplayEvents::CompileError:
+                failReason = "compile_error";
+                break;
+            case TQueryReplayEvents::CompileTimeout:
+                failReason = "compile_timeout";
+                break;
+            case TQueryReplayEvents::TableMissing:
+                failReason = "table_missing";
+                break;
+            case TQueryReplayEvents::ExtraReadingOldEngine:
+                failReason = "extra_reading_old_engine";
+                break;
+            case TQueryReplayEvents::ExtraReadingNewEngine:
+                failReason = "extra_reading_new_engine";
+                break;
+            case TQueryReplayEvents::ReadTypesMismatch:
+                failReason = "read_types_mismatch";
+                break;
+            case TQueryReplayEvents::ReadLimitsMismatch:
+                failReason = "read_limits_mismatch";
+                break;
+            case TQueryReplayEvents::ReadColumnsMismatch:
+                failReason = "read_columns_mismatch";
+                break;
+            case TQueryReplayEvents::ExtraWriting:
+                failReason = "extra_writing";
+                break;
+            case TQueryReplayEvents::WriteColumnsMismatch:
+                failReason = "write_columns_mismatch";
+                break;
+            case TQueryReplayEvents::UncategorizedPlanMismatch:
+                failReason = "uncategorized_plan_mismatch";
+                break;
+            default:
+                failReason = "unspecified";
+        }
+
+        NYdb::TParams params = Client->GetParamsBuilder()
+            .AddParam("$run_id").Utf8(RunId).Build()
+            .AddParam("$query_id").Utf8(QueryId).Build()
+            .AddParam("$fail_reason").Utf8(failReason).Build()
+            .AddParam("$extra_message").Utf8(message).Build()
+        .Build();
+
+        // The callback captures copies only, so it stays valid after this actor is gone.
+        Client->RetryOperation([query = WriteStatsQuery, params](NTable::TSession session) {
+            auto txControl = NTable::TTxControl::BeginTx(NYdb::NTable::TTxSettings::SerializableRW()).CommitTx();
+            auto queryResult = session.ExecuteDataQuery(query, txControl, params);
+
+            return queryResult.Apply([](const NTable::TAsyncDataQueryResult& asyncResult) {
+                NTable::TDataQueryResult result = asyncResult.GetValue();
+                if (!result.IsSuccess()) {
+                    Cerr << "Failed to write stats: " << result.GetIssues().ToString() << Endl;
+                }
+                return NThreading::MakeFuture<TStatus>(result);
+            });
+        });
+    }
+
+    // Final step: records a non-successful check status, reports to the owner and
+    // terminates, like every other exit path of this actor does.
+    void Handle(TQueryReplayEvents::TEvCompileResponse::TPtr& ev) {
+        if (ev->Get()->Status != TQueryReplayEvents::Success) {
+            WriteQueryStats(ev->Get()->Status, ev->Get()->Message);
+        }
+
+        Send(OwnerId, new TQueryReplayEvents::TEvQueryProcessorResult(ev->Get()->Success));
+        PassAway();
+    }
+
+public:
+    // Waiting for the recorded query row.
+    STATEFN(StateResolve) {
+        switch (ev->GetTypeRewrite()) {
+            hFunc(TQueryReplayEvents::TEvQueryResolveResult, Handle);
+        }
+    }
+
+    // Waiting for the compiler actor verdict.
+    STATEFN(StateCompiling) {
+        switch (ev->GetTypeRewrite()) {
+            hFunc(TQueryReplayEvents::TEvCompileResponse, Handle);
+        }
+    }
+};
+
+// Factory for the per-query processor actor; all arguments are forwarded to its constructor
+// unchanged and the caller becomes responsible for registering the returned actor.
+NActors::IActor* CreateQueryProcessorActor(std::shared_ptr<NYdb::NTable::TTableClient> client, const TString& runId, const TString& queryId, const TString& tablePath,
+                                           const TString& statsTablePath, const TActorId& ownerId, TIntrusivePtr<NKikimr::NKqp::TModuleResolverState> moduleResolverState,
+                                           const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry) {
+    auto* processor = new TQueryProcessorActor(
+        client, runId, queryId, tablePath, statsTablePath, ownerId, moduleResolverState, functionRegistry);
+    return processor;
+}

+ 277 - 0
ydb/tools/query_replay/query_replay.cpp

@@ -0,0 +1,277 @@
+#include "query_replay.h"
+
+#include <ydb/public/sdk/cpp/client/ydb_driver/driver.h>
+#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
+#include <ydb/public/sdk/cpp/client/ydb_result/result.h>
+
+#include <ydb/library/actors/core/actor.h>
+#include <ydb/library/actors/core/actor_bootstrapped.h>
+#include <ydb/library/actors/core/events.h>
+#include <ydb/library/actors/core/event_local.h>
+#include <ydb/library/actors/core/actorsystem.h>
+#include <library/cpp/json/json_reader.h>
+
+#include <util/digest/city.h>
+
+#include <optional>
+#include <unordered_set>
+
+using namespace NYdb;
+using namespace NActors;
+
+
+// Replays queries stored in local JSON files: every file is parsed and handed to its own
+// compiler actor; results are accumulated in the shared TQueryReplayStats. The stats are
+// marked complete (and this actor terminates) once no compiler is pending, which also
+// covers the case of an empty file list.
+class TQueryReplayActorRunnerSimple: public TActorBootstrapped<TQueryReplayActorRunnerSimple> {
+private:
+    std::unordered_set<TActorId, THash<TActorId>> PendingResults;  // compilers that have not replied yet
+    std::vector<TString> Queries;                                  // paths of JSON files with replay data
+    std::shared_ptr<TQueryReplayStats> Stats;
+    const NKikimr::NMiniKQL::IFunctionRegistry* FunctionRegistry;
+    TIntrusivePtr<NKikimr::NKqp::TModuleResolverState> ModuleResolverState;
+
+public:
+    TQueryReplayActorRunnerSimple(
+        std::vector<TString>&& queries,
+        std::shared_ptr<TQueryReplayStats> queryReplayStats,
+        const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry)
+        : Queries(std::move(queries))
+        , Stats(queryReplayStats)
+        , FunctionRegistry(functionRegistry)
+        , ModuleResolverState(MakeIntrusive<NKikimr::NKqp::TModuleResolverState>())
+    {
+        Y_ABORT_UNLESS(GetYqlDefaultModuleResolver(ModuleResolverState->ExprCtx, ModuleResolverState->ModuleResolver));
+    }
+
+    void Bootstrap() {
+        Become(&TThis::MainState);
+        for (auto& query : Queries) {
+            NJson::TJsonValue json = ReadQueryJson(query);
+            StartCompilation(std::move(json));
+        }
+
+        // With nothing to compile no response will ever arrive, so finish right away.
+        FinishIfDone();
+    }
+
+    // Parses the file at path `p` as JSON. Parse errors do not throw; they are logged and
+    // whatever was parsed (possibly an empty value) is returned.
+    NJson::TJsonValue ReadQueryJson(const TString& p) {
+        static NJson::TJsonReaderConfig readConfig;
+        TFileInput in(p);
+        NJson::TJsonValue reply;
+        if (!NJson::ReadJsonTree(&in, &readConfig, &reply, false)) {
+            Cerr << "Failed to parse query json from " << p << Endl;
+        }
+        return reply;
+    }
+
+    // Registers a compiler actor for the given replay data and tracks it as pending.
+    void StartCompilation(NJson::TJsonValue&& data) {
+        auto actor = CreateQueryCompiler(SelfId(), ModuleResolverState, FunctionRegistry, std::move(data));
+        PendingResults.emplace(Register(actor));
+    }
+
+    // Accounts a compiler verdict and stops once every registered compiler has replied.
+    void Handle(TQueryReplayEvents::TEvCompileResponse::TPtr& ev) {
+        if (ev->Get()->Success) {
+            Stats->OnSuccess();
+        } else {
+            Stats->OnFailure();
+        }
+        Stats->AddNew(1);
+
+        auto it = PendingResults.find(ev->Sender);
+        Y_ABORT_UNLESS(it != PendingResults.end());
+        PendingResults.erase(it);
+        FinishIfDone();
+    }
+
+    STATEFN(MainState) {
+        switch(ev->GetTypeRewrite()) {
+           hFunc(TQueryReplayEvents::TEvCompileResponse, Handle);
+        }
+    }
+
+private:
+    // Marks the replay as complete and terminates the actor when nothing is pending.
+    void FinishIfDone() {
+        if (PendingResults.empty()) {
+            Stats->Complete();
+            PassAway();
+        }
+    }
+};
+
+// Replays queries stored in a YDB table.
+//
+// StateRead: streams (query_id, query_text) rows with ReadTable, keeps only queries that
+// are new, have a not yet seen text hash and belong to this shard
+// (CityHash64(query_text) % Modulo == ShardId), and runs at most MaxInFlightSize
+// query processor actors at a time.
+// StateWaiting: entered at end of stream; waits for the remaining processors, then marks
+// the stats complete and terminates.
+class TQueryReplayActorRunner: public TActorBootstrapped<TQueryReplayActorRunner> {
+public:
+    TQueryReplayActorRunner(
+        NYdb::TDriver& driver, const TString& runId, const TString& queryTablePath, const TString& statsTablePath,
+        std::shared_ptr<TQueryReplayStats> queryReplayStats, const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
+        ui32 maxInFlight, ui32 modulo, ui32 shardId)
+        : Stats(queryReplayStats)
+        , QueryTablePath(queryTablePath)
+        , StatsTablePath(statsTablePath)
+        , ModuleResolverState(MakeIntrusive<NKikimr::NKqp::TModuleResolverState>())
+        , FunctionRegistry(functionRegistry)
+        , MaxInFlightSize(maxInFlight)
+        , Modulo(modulo)
+        , ShardId(shardId)
+        , RunId(runId)
+    {
+        // Modulo is used as a divisor when sharding query hashes; zero would crash later
+        // with a far less obvious diagnostic.
+        Y_ABORT_UNLESS(Modulo > 0, "modulo must be positive");
+        Y_ABORT_UNLESS(GetYqlDefaultModuleResolver(ModuleResolverState->ExprCtx, ModuleResolverState->ModuleResolver));
+        Client = std::make_shared<NTable::TTableClient>(driver, NTable::TClientSettings());
+    }
+
+    void Bootstrap() {
+        Become(&TThis::StateRead);
+        StartRead();
+    }
+
+    // (Re)starts the table scan from the beginning: creates a session and opens a
+    // ReadTable stream. SDK callbacks run outside of the actor, so they capture value
+    // copies only and report back with TEvReadStarted / TEvReadFailed.
+    void StartRead() {
+        PartIterator.reset();
+
+        TActorId self = SelfId();
+        TActorSystem* actorSystem = TlsActivationContext->ExecutorThread.ActorSystem;
+        const TString& path = QueryTablePath;
+        Client->GetSession().Subscribe([path, self, actorSystem](NTable::TAsyncCreateSessionResult asyncResult) {
+            const NTable::TCreateSessionResult& result = asyncResult.GetValue();
+            if (!result.IsSuccess()) {
+                actorSystem->Send(self, new TQueryReplayEvents::TEvReadFailed());
+                return;
+            }
+
+            NTable::TSession session = result.GetSession();
+            NTable::TReadTableSettings settings = NTable::TReadTableSettings().AppendColumns("query_id").AppendColumns("query_text");
+            session.ReadTable(path, settings).Subscribe([self, actorSystem](NTable::TAsyncTablePartIterator asyncResult) {
+                NTable::TTablePartIterator partIterator = asyncResult.ExtractValue();
+                if (!partIterator.IsSuccess()) {
+                    actorSystem->Send(self, new TQueryReplayEvents::TEvReadFailed());
+                } else {
+                    actorSystem->Send(self, new TQueryReplayEvents::TEvReadStarted(std::move(partIterator)));
+                }
+            });
+        });
+    }
+
+    // Requests the next part of the stream. End of stream is delivered as a regular
+    // TEvQueryBatch (the handler checks EOS()); any other failure becomes TEvReadFailed.
+    void Continue() {
+        TActorId self = SelfId();
+        TActorSystem* actorSystem = TlsActivationContext->ExecutorThread.ActorSystem;
+        PartIterator->ReadNext().Subscribe([self, actorSystem](NTable::TAsyncSimpleStreamPart<TResultSet> asyncResult) {
+            NTable::TSimpleStreamPart<TResultSet> tablePart = asyncResult.ExtractValue();
+            if (tablePart.EOS()) {
+                actorSystem->Send(self, new TQueryReplayEvents::TEvQueryBatch(std::move(tablePart)));
+            } else if (!tablePart.IsSuccess()) {
+                actorSystem->Send(self, new TQueryReplayEvents::TEvReadFailed());
+            } else {
+                actorSystem->Send(self, new TQueryReplayEvents::TEvQueryBatch(std::move(tablePart)));
+            }
+        });
+    }
+
+    // Registers a processor actor for one query and tracks it as in flight.
+    void StartQueryReplay(const TString& queryId) {
+        NActors::IActor* actor = CreateQueryProcessorActor(Client, RunId, queryId, QueryTablePath, StatsTablePath,
+            SelfId(), ModuleResolverState, FunctionRegistry);
+        TActorId actorId = Register(actor);
+        InFlightQueries.emplace(actorId);
+    }
+
+    // Starts queued queries while the in-flight limit allows it; when the stream has ended
+    // (StateWaiting) and nothing is in flight, completes the run and terminates.
+    void ProcessQueriesQueue() {
+        while (InFlightQueries.size() < MaxInFlightSize && !WaitingQueriesQueue.empty()) {
+            TString queryId = std::move(WaitingQueriesQueue.front());
+            WaitingQueriesQueue.pop_front();
+            StartQueryReplay(queryId);
+        }
+
+        if (InFlightQueries.size() == 0 && CurrentStateFunc() == &TThis::StateWaiting) {
+            Stats->Complete();
+            PassAway();
+            return;
+        }
+    }
+
+    // Consumes one stream part: filters and enqueues its queries, then asks for the next
+    // part. A part without rows is simply skipped.
+    void Handle(TQueryReplayEvents::TEvQueryBatch::TPtr& ev) {
+        if (ev->Get()->TablePart.EOS()) {
+            Become(&TThis::StateWaiting);
+            ProcessQueriesQueue();
+            return;
+        }
+
+        const auto& resultSet = ev->Get()->TablePart.GetPart();
+        TResultSetParser parser(resultSet);
+
+        ui32 newQueries = 0;
+        if (parser.TryNextRow()) {
+            TValueParser& idParser = parser.ColumnParser("query_id");
+            TValueParser& queryText = parser.ColumnParser("query_text");
+            do {
+                TString queryId = std::move(idParser.GetOptionalString().GetRef());
+                // Already seen, e.g. after the scan was restarted because of a read failure.
+                if (AllQueries.find(queryId) != AllQueries.end())
+                    continue;
+
+                ui64 queryHash = CityHash64(queryText.GetOptionalString().GetRef());
+
+                // Skip duplicated query texts and queries that belong to other shards.
+                if (Hashes.find(queryHash) != Hashes.end() || queryHash % Modulo != ShardId)
+                    continue;
+
+                Hashes.emplace(queryHash);
+                ++newQueries;
+                AllQueries.insert(queryId);
+                WaitingQueriesQueue.emplace_back(queryId);
+            } while (parser.TryNextRow());
+        }
+
+        Stats->AddNew(newQueries);
+        Continue();
+        ProcessQueriesQueue();
+    }
+
+    // A failed read restarts the whole scan; AllQueries prevents double processing.
+    void Handle(TQueryReplayEvents::TEvReadFailed::TPtr&) {
+        StartRead();
+    }
+
+    void Handle(TQueryReplayEvents::TEvReadStarted::TPtr& ev) {
+        PartIterator = std::move(ev->Get()->PartIterator);
+        Continue();
+    }
+
+    // Accounts the verdict of a finished processor and frees its in-flight slot.
+    void Handle(TQueryReplayEvents::TEvQueryProcessorResult::TPtr& ev) {
+        if (ev->Get()->Success) {
+            Stats->OnSuccess();
+        } else {
+            Stats->OnFailure();
+        }
+
+        InFlightQueries.erase(ev->Sender);
+        ProcessQueriesQueue();
+    }
+
+    STATEFN(StateRead) {
+        switch (ev->GetTypeRewrite()) {
+            hFunc(TQueryReplayEvents::TEvQueryBatch, Handle);
+            hFunc(TQueryReplayEvents::TEvReadFailed, Handle);
+            hFunc(TQueryReplayEvents::TEvReadStarted, Handle);
+            hFunc(TQueryReplayEvents::TEvQueryProcessorResult, Handle);
+        }
+    }
+
+    STATEFN(StateWaiting) {
+        switch (ev->GetTypeRewrite()) {
+            hFunc(TQueryReplayEvents::TEvQueryProcessorResult, Handle);
+        }
+    }
+
+private:
+    std::shared_ptr<NTable::TTableClient> Client;
+    std::deque<TString> WaitingQueriesQueue;                        // accepted query ids not started yet
+    std::unordered_set<TString> AllQueries;                         // every query id accepted so far
+    std::unordered_set<TActorId, THash<TActorId>> InFlightQueries;  // running processor actors
+    std::optional<NTable::TTablePartIterator> PartIterator;         // current ReadTable stream, if any
+    std::shared_ptr<TQueryReplayStats> Stats;
+    TString QueryTablePath;
+    TString StatsTablePath;
+    TIntrusivePtr<NKikimr::NKqp::TModuleResolverState> ModuleResolverState;
+    const NKikimr::NMiniKQL::IFunctionRegistry* FunctionRegistry;
+    THashSet<ui64> Hashes;                                          // CityHash64 of accepted query texts
+    ui32 MaxInFlightSize;
+    ui32 Modulo;
+    ui32 ShardId;
+    TString RunId;
+};
+
+// Factory for the file-based replay runner; takes over the list of query files.
+NActors::IActor* CreateQueryReplayActorSimple(
+    std::vector<TString>&& queries, std::shared_ptr<TQueryReplayStats> queryReplayStats,
+    const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry)
+{
+    auto* runner = new TQueryReplayActorRunnerSimple(std::move(queries), queryReplayStats, functionRegistry);
+    return runner;
+}
+
+// Factory for the table-based replay runner; every argument is forwarded to the
+// TQueryReplayActorRunner constructor unchanged.
+NActors::IActor* CreateQueryReplayActor(
+    NYdb::TDriver& driver, const TString& runId, const TString& queryTablePath, const TString& statsTablePath,
+    std::shared_ptr<TQueryReplayStats> queryReplayStats, const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
+    ui32 maxInFlight, ui32 modulo, ui32 shardId)
+{
+    auto* runner = new TQueryReplayActorRunner(
+        driver, runId, queryTablePath, statsTablePath, queryReplayStats, functionRegistry,
+        maxInFlight, modulo, shardId);
+    return runner;
+}

+ 196 - 0
ydb/tools/query_replay/query_replay.h

@@ -0,0 +1,196 @@
+#pragma once
+
+#include <ydb/public/sdk/cpp/client/ydb_driver/driver.h>
+#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
+#include <ydb/public/sdk/cpp/client/ydb_result/result.h>
+
+#include <ydb/core/base/appdata.h>
+#include <ydb/core/kqp/gateway/kqp_gateway.h>
+
+#include <ydb/library/actors/core/actor.h>
+#include <ydb/library/actors/core/actorid.h>
+#include <ydb/library/actors/core/actor_bootstrapped.h>
+#include <ydb/library/actors/core/executor_pool_basic.h>
+#include <ydb/library/actors/core/events.h>
+#include <ydb/library/actors/core/event_local.h>
+#include <ydb/library/actors/core/hfunc.h>
+#include <library/cpp/logger/backend.h>
+#include <ydb/library/actors/core/actorsystem.h>
+#include <ydb/core/scheme/scheme_type_registry.h>
+#include <library/cpp/json/json_value.h>
+#include <ydb/library/yql/minikql/mkql_function_registry.h>
+#include <optional>
+#include <unordered_set>
+#include <deque>
+
+#include <ydb/library/yql/core/services/mounts/yql_mounts.h>
+#include <ydb/library/yql/minikql/invoke_builtins/mkql_builtins.h>
+
+struct TQueryReplayStats;
+
+// Application state of the query replay tool: configuration values followed by the
+// runtime objects the tool owns. Only declarations are visible here; the member
+// functions are defined elsewhere.
+struct TQueryReplayApp {
+    // --- configuration (presumably filled by ParseConfig - TODO confirm) ---
+    TString Endpoint;
+    TString Database;
+    TString Path;
+    TString StatsPath;
+    TString AuthToken;
+    ui32 ActorSystemThreadsCount = 15;
+    ui32 MaxInFlight = 1000;
+    ui32 Modulo = 1;
+    ui32 ShardId = 0;
+    TVector<TString> Queries;
+
+    // --- runtime objects ---
+    THolder<NActors::TActorSystem> ActorSystem;
+    THolder<NKikimr::TAppData> AppData;
+    TIntrusivePtr<NKikimr::NScheme::TKikimrTypeRegistry> TypeRegistry;
+    TIntrusivePtr<NKikimr::NMiniKQL::IMutableFunctionRegistry> FunctionRegistry;
+    TIntrusivePtr<NActors::NLog::TSettings> LogSettings;
+    TAutoPtr<TLogBackend> LogBackend;
+    NYdb::TDriverConfig DriverConfig;
+    NActors::TActorId ReplayActor;
+    std::unique_ptr<NYdb::TDriver> Driver;
+    std::shared_ptr<TQueryReplayStats> QueryReplayStats;
+    TIntrusivePtr<IRandomProvider> RandomProvider;
+
+    void InitializeLogger();
+    void InitializeDriver();
+
+    // Takes the process command line (argc/argv) and returns an int status.
+    int ParseConfig(int argc, char** argv);
+    void Start();
+    void Stop();
+};
+
+// Counters shared between the replay actors and whoever polls the progress.
+// All members are atomics, so every method may be called from any thread.
+struct TQueryReplayStats {
+    std::atomic<ui32> TotalQueries{0};
+    std::atomic<ui32> CompilationErrors{0};
+    std::atomic<ui32> CompilationSuccess{0};
+    std::atomic<bool> ReplayComplete{false};
+
+    TQueryReplayStats() = default;
+
+    // Adds `val` newly discovered queries to the total.
+    void AddNew(ui32 val) {
+        TotalQueries += val;
+    }
+
+    void OnSuccess() {
+        CompilationSuccess.fetch_add(1);
+    }
+
+    void OnFailure() {
+        CompilationErrors.fetch_add(1);
+    }
+
+    ui32 GetTotalCount() {
+        return TotalQueries;
+    }
+
+    // Number of queries reported as successful.
+    ui32 GetCompletedCount() {
+        return CompilationSuccess;
+    }
+
+    ui32 GetErrorsCount() {
+        return CompilationErrors;
+    }
+
+    // Marks the whole replay as finished.
+    void Complete() {
+        ReplayComplete = true;
+    }
+
+    bool IsComplete() {
+        return ReplayComplete;
+    }
+};
+
+// Local (in-process) events exchanged by the query replay actors, plus the status
+// enumeration describing the outcome of a single query check.
+struct TQueryReplayEvents {
+    enum EEv {
+        EvReadFailed = EventSpaceBegin(NActors::TEvents::ES_USERSPACE + 1),
+        EvReadStarted,
+        EvQueryBatch,
+        EvQueryProcessorResult,
+        EvQueryResolveResult,
+        EvCompileResponse,
+    };
+
+    // Reading the queries table failed.
+    struct TEvReadFailed: public NActors::TEventLocal<TEvReadFailed, EvReadFailed> {
+    };
+
+    // A ReadTable stream has been opened; carries its part iterator.
+    struct TEvReadStarted: public NActors::TEventLocal<TEvReadStarted, EvReadStarted> {
+        NYdb::NTable::TTablePartIterator PartIterator;
+
+        // A named rvalue reference is an lvalue: std::move is required to actually move.
+        TEvReadStarted(NYdb::NTable::TTablePartIterator&& partIterator)
+            : PartIterator(std::move(partIterator))
+        {
+        }
+    };
+
+    // One part of the ReadTable stream (or its end-of-stream marker).
+    struct TEvQueryBatch: public NActors::TEventLocal<TEvQueryBatch, EvQueryBatch> {
+        NYdb::NTable::TSimpleStreamPart<NYdb::TResultSet> TablePart;
+
+        TEvQueryBatch(NYdb::NTable::TSimpleStreamPart<NYdb::TResultSet>&& tablePart)
+            : TablePart(std::move(tablePart))
+        {
+        }
+    };
+
+    // Verdict of a query processor actor for one query.
+    struct TEvQueryProcessorResult: public NActors::TEventLocal<TEvQueryProcessorResult, EvQueryProcessorResult> {
+        bool Success;
+
+        TEvQueryProcessorResult(bool success)
+            : Success(success)
+        {
+        }
+    };
+
+    // Result of fetching the recorded query row.
+    struct TEvQueryResolveResult: public NActors::TEventLocal<TEvQueryResolveResult, EvQueryResolveResult> {
+        NYdb::NTable::TDataQueryResult DataQueryResult;
+
+        // Deliberately copies instead of moving: a caller may still read the object it
+        // passed in after constructing the event.
+        TEvQueryResolveResult(NYdb::NTable::TDataQueryResult&& result)
+            : DataQueryResult(result)
+        {
+        }
+    };
+
+    enum TCheckQueryPlanStatus {
+        Success,
+        CompileError,
+        CompileTimeout,
+        TableMissing,
+        ExtraReadingOldEngine,
+        ExtraReadingNewEngine,
+        ReadTypesMismatch,
+        ReadLimitsMismatch,
+        ReadColumnsMismatch,
+        ExtraWriting,
+        WriteColumnsMismatch,
+        UncategorizedPlanMismatch,
+        Unspecified,
+    };
+
+    // Outcome of compiling and checking one query: Success is the flag set by the sender,
+    // Status and Message carry the detailed check result.
+    struct TEvCompileResponse: public NActors::TEventLocal<TEvCompileResponse, EvCompileResponse> {
+        bool Success;
+        TCheckQueryPlanStatus Status = Unspecified;
+        TString Message;
+
+        TEvCompileResponse(bool success)
+            : Success(success)
+        {
+        }
+    };
+};
+
+
+// Creates the actor that compiles one recorded query described by `replayDetails` and
+// sends TQueryReplayEvents::TEvCompileResponse to `ownerId`.
+NActors::IActor* CreateQueryCompiler(
+    const NActors::TActorId& ownerId, TIntrusivePtr<NKikimr::NKqp::TModuleResolverState> moduleResolverState,
+    const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, NJson::TJsonValue&& replayDetails);
+// Creates the actor that fetches the query `queryId` from `tablePath`, runs the compiler on
+// it, records non-successful checks in `statsTablePath` and sends
+// TQueryReplayEvents::TEvQueryProcessorResult to `ownerId`.
+NActors::IActor* CreateQueryProcessorActor(std::shared_ptr<NYdb::NTable::TTableClient> client, const TString& runId, const TString& queryId, const TString& tablePath, const TString& statsTablePath,
+                                           const NActors::TActorId& ownerId, TIntrusivePtr<NKikimr::NKqp::TModuleResolverState> moduleResolverState, const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry);
+// Creates the runner that reads queries from the table `queryTablePath`, keeps the ones
+// whose text hash satisfies `hash % modulo == shardId` and processes at most `maxInFlight`
+// of them concurrently.
+NActors::IActor* CreateQueryReplayActor(NYdb::TDriver& driver, const TString& runId, const TString& queryTablePath, const TString& statsTablePath, std::shared_ptr<TQueryReplayStats> queryReplayStats,
+                                        const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, ui32 maxInFlight, ui32 modulo, ui32 shardId);
+// Creates the runner that replays queries from local files: every element of `queries` is a
+// path to a JSON file with replay data.
+NActors::IActor* CreateQueryReplayActorSimple(
+    std::vector<TString>&& queries, std::shared_ptr<TQueryReplayStats> queryReplayStats, const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry);

+ 17 - 0
ydb/tools/query_replay/ya.make

@@ -0,0 +1,17 @@
+# Build description of the ydb_query_replay program.
+PROGRAM(ydb_query_replay)
+
+OWNER(
+    gvit
+    g:kikimr
+)
+
+ALLOCATOR(LF)
+
+YQL_LAST_ABI_VERSION()
+# YDB_REPLAY_SRCS and YDB_REPLAY_PEERDIRS used below are set in common_deps.inc.
+INCLUDE(${ARCADIA_ROOT}/ydb/tools/query_replay/common_deps.inc)
+
+SRCS(${YDB_REPLAY_SRCS})
+
+PEERDIR(${YDB_REPLAY_PEERDIRS})
+
+END()

+ 1 - 0
ydb/tools/ya.make

@@ -1,6 +1,7 @@
 RECURSE(
     blobsan
     cfg
+    query_replay
     simple_queue
     ydbd_slice
     tsserver