Browse Source

Restrict query cache for scheme operations. (KIKIMR-18956)

spuchin 1 year ago
parent
commit
9b54275e5f

+ 5 - 1
ydb/core/grpc_services/query/rpc_execute_query.cpp

@@ -246,6 +246,10 @@ private:
             ? NKikimrKqp::QUERY_TYPE_SQL_GENERIC_CONCURRENT_QUERY
             ? NKikimrKqp::QUERY_TYPE_SQL_GENERIC_CONCURRENT_QUERY
             : NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY;
             : NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY;
 
 
+
+        auto cachePolicy = google::protobuf::Arena::CreateMessage<Ydb::Table::QueryCachePolicy>(Request_->GetArena());
+        cachePolicy->set_keep_in_cache(true);
+
         auto ev = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>(
         auto ev = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>(
             queryAction,
             queryAction,
             queryType,
             queryType,
@@ -257,7 +261,7 @@ private:
             txControl,
             txControl,
             &req->parameters(),
             &req->parameters(),
             GetCollectStatsMode(req->stats_mode()),
             GetCollectStatsMode(req->stats_mode()),
-            nullptr, // queryCachePolicy
+            cachePolicy,
             nullptr, // operationParams
             nullptr, // operationParams
             false, // keepSession
             false, // keepSession
             false, // useCancelAfter
             false, // useCancelAfter

+ 1 - 0
ydb/core/kqp/common/compilation/result.h

@@ -45,6 +45,7 @@ struct TKqpCompileResult {
     TString Uid;
     TString Uid;
 
 
     ETableReadType MaxReadType;
     ETableReadType MaxReadType;
+    bool AllowCache = true;
 
 
     std::shared_ptr<const TPreparedQueryHolder> PreparedQuery;
     std::shared_ptr<const TPreparedQueryHolder> PreparedQuery;
 };
 };

+ 1 - 0
ydb/core/kqp/compile_service/kqp_compile_actor.cpp

@@ -332,6 +332,7 @@ private:
                     kqpResult.PreparingQuery.release(), AppData()->FunctionRegistry);
                     kqpResult.PreparingQuery.release(), AppData()->FunctionRegistry);
                 preparedQueryHolder->MutableLlvmSettings().Fill(Config, queryType);
                 preparedQueryHolder->MutableLlvmSettings().Fill(Config, queryType);
                 KqpCompileResult->PreparedQuery = preparedQueryHolder;
                 KqpCompileResult->PreparedQuery = preparedQueryHolder;
+                KqpCompileResult->AllowCache = CanCacheQuery(KqpCompileResult->PreparedQuery->GetPhysicalQuery());
             }
             }
 
 
             auto now = TInstant::Now();
             auto now = TInstant::Now();

+ 3 - 1
ydb/core/kqp/compile_service/kqp_compile_service.cpp

@@ -630,11 +630,13 @@ private:
             << ", status: " << compileResult->Status
             << ", status: " << compileResult->Status
             << ", compileActor: " << ev->Sender);
             << ", compileActor: " << ev->Sender);
 
 
+        bool keepInCache = compileRequest.KeepInCache && compileResult->AllowCache;
+
         try {
         try {
             if (compileResult->Status == Ydb::StatusIds::SUCCESS) {
             if (compileResult->Status == Ydb::StatusIds::SUCCESS) {
                 if (QueryCache.FindByUid(compileResult->Uid, false)) {
                 if (QueryCache.FindByUid(compileResult->Uid, false)) {
                     QueryCache.Replace(compileResult);
                     QueryCache.Replace(compileResult);
-                } else if (compileRequest.KeepInCache) {
+                } else if (keepInCache) {
                     if (QueryCache.Insert(compileResult)) {
                     if (QueryCache.Insert(compileResult)) {
                         Counters->CompileQueryCacheEvicted->Inc();
                         Counters->CompileQueryCacheEvicted->Inc();
                     }
                     }

+ 10 - 0
ydb/core/kqp/session_actor/kqp_worker_common.cpp

@@ -173,4 +173,14 @@ bool IsSameProtoType(const NKikimrMiniKQL::TType& actual, const NKikimrMiniKQL::
     }
     }
 }
 }
 
 
+bool CanCacheQuery(const NKqpProto::TKqpPhyQuery& query) {
+    for (const auto& tx : query.GetTransactions()) {
+        if (tx.GetType() == NKqpProto::TKqpPhyTx::TYPE_SCHEME) {
+            return false;
+        }
+    }
+
+    return true;
+}
+
 } // namespace NKikimr::NKqp
 } // namespace NKikimr::NKqp

+ 2 - 0
ydb/core/kqp/session_actor/kqp_worker_common.h

@@ -127,6 +127,8 @@ inline ETableReadType ExtractMostHeavyReadType(const TString& queryPlan) {
     return maxReadType;
     return maxReadType;
 }
 }
 
 
+bool CanCacheQuery(const NKqpProto::TKqpPhyQuery& query);
+
 void SlowLogQuery(const TActorContext &ctx, const NYql::TKikimrConfiguration* config, const TKqpRequestInfo& requestInfo,
 void SlowLogQuery(const TActorContext &ctx, const NYql::TKikimrConfiguration* config, const TKqpRequestInfo& requestInfo,
     const TDuration& duration, Ydb::StatusIds::StatusCode status, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, ui64 parametersSize,
     const TDuration& duration, Ydb::StatusIds::StatusCode status, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, ui64 parametersSize,
     NKikimrKqp::TEvQueryResponse *record, const std::function<TString()> extractQueryText);
     NKikimrKqp::TEvQueryResponse *record, const std::function<TString()> extractQueryText);

+ 48 - 0
ydb/core/kqp/ut/service/kqp_query_service_ut.cpp

@@ -392,6 +392,54 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
         CompareYson(R"([[[1u];["One"]]])", FormatResultSetYson(result.GetResultSet(0)));
         CompareYson(R"([[[1u];["One"]]])", FormatResultSetYson(result.GetResultSet(0)));
     }
     }
 
 
+    Y_UNIT_TEST(QueryDdlCache) {
+        NKikimrConfig::TAppConfig appConfig;
+        appConfig.MutableTableServiceConfig()->SetEnablePreparedDdl(true);
+        auto setting = NKikimrKqp::TKqpSetting();
+        auto serverSettings = TKikimrSettings()
+            .SetAppConfig(appConfig)
+            .SetKqpSettings({setting});
+
+        TKikimrRunner kikimr(serverSettings);
+        auto db = kikimr.GetQueryClient();
+
+        auto settings = TExecuteQuerySettings()
+            .StatsMode(EStatsMode::Basic);
+
+        auto result = db.ExecuteQuery(R"(
+            CREATE TABLE TestDdl (
+                Key Uint64,
+                Value String,
+                PRIMARY KEY (Key)
+            );
+        )", TTxControl::NoTx(), settings).ExtractValueSync();
+        UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+
+        auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats());
+        UNIT_ASSERT_VALUES_EQUAL(stats.compilation().from_cache(), false);
+
+        {
+            // TODO: Switch to query service.
+            auto session = kikimr.GetTableClient().CreateSession().GetValueSync().GetSession();
+
+            UNIT_ASSERT(session.ExecuteSchemeQuery(R"(
+                DROP TABLE TestDdl;
+            )").GetValueSync().IsSuccess());
+        }
+
+        result = db.ExecuteQuery(R"(
+            CREATE TABLE TestDdl (
+                Key Uint64,
+                Value String,
+                PRIMARY KEY (Key)
+            );
+        )", TTxControl::NoTx(), settings).ExtractValueSync();
+        UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+
+        stats = NYdb::TProtoAccessor::GetProto(*result.GetStats());
+        UNIT_ASSERT_VALUES_EQUAL(stats.compilation().from_cache(), false);
+    }
+
     Y_UNIT_TEST(MaterializeTxResults) {
     Y_UNIT_TEST(MaterializeTxResults) {
         auto kikimr = DefaultKikimrRunner();
         auto kikimr = DefaultKikimrRunner();
         auto db = kikimr.GetQueryClient();
         auto db = kikimr.GetQueryClient();