Browse Source

Faster lock removal in datashard, KIKIMR-14732

ref:a6b3f9fad1e1aca817c5603fe8b1c7bfe8b87952
Aleksei Borzenkov 2 years ago
parent
commit
fd7cbab174

+ 1 - 1
CMakeLists.darwin.txt

@@ -543,6 +543,7 @@ add_subdirectory(ydb/core/kqp/runtime)
 add_subdirectory(ydb/core/kqp/common)
 add_subdirectory(ydb/core/kqp/expr_nodes)
 add_subdirectory(ydb/library/yql/dq/expr_nodes)
+add_subdirectory(ydb/core/tx/long_tx_service/public)
 add_subdirectory(ydb/library/yql/dq/actors)
 add_subdirectory(ydb/library/yql/dq/common)
 add_subdirectory(ydb/core/ydb_convert)
@@ -583,7 +584,6 @@ add_subdirectory(ydb/public/lib/value)
 add_subdirectory(ydb/library/yql/dq/actors/compute)
 add_subdirectory(ydb/library/yql/dq/tasks)
 add_subdirectory(ydb/services/lib/sharding)
-add_subdirectory(ydb/core/tx/long_tx_service/public)
 add_subdirectory(ydb/core/yq/libs/actors)
 add_subdirectory(ydb/core/yq/libs/actors/logging)
 add_subdirectory(ydb/core/yq/libs/checkpointing)

+ 1 - 1
CMakeLists.linux.txt

@@ -623,6 +623,7 @@ add_subdirectory(ydb/core/kqp/runtime)
 add_subdirectory(ydb/core/kqp/common)
 add_subdirectory(ydb/core/kqp/expr_nodes)
 add_subdirectory(ydb/library/yql/dq/expr_nodes)
+add_subdirectory(ydb/core/tx/long_tx_service/public)
 add_subdirectory(ydb/library/yql/dq/actors)
 add_subdirectory(ydb/library/yql/dq/common)
 add_subdirectory(ydb/core/ydb_convert)
@@ -663,7 +664,6 @@ add_subdirectory(ydb/public/lib/value)
 add_subdirectory(ydb/library/yql/dq/actors/compute)
 add_subdirectory(ydb/library/yql/dq/tasks)
 add_subdirectory(ydb/services/lib/sharding)
-add_subdirectory(ydb/core/tx/long_tx_service/public)
 add_subdirectory(ydb/core/yq/libs/actors)
 add_subdirectory(ydb/core/yq/libs/actors/logging)
 add_subdirectory(ydb/core/yq/libs/checkpointing)

+ 1 - 0
ydb/core/kqp/common/CMakeLists.txt

@@ -18,6 +18,7 @@ target_link_libraries(core-kqp-common PUBLIC
   ydb-core-engine
   core-kqp-expr_nodes
   core-kqp-provider
+  tx-long_tx_service-public
   ydb-library-aclib
   yql-core-issue
   yql-dq-actors

+ 2 - 0
ydb/core/kqp/common/kqp_gateway.h

@@ -6,6 +6,7 @@
 #include <ydb/library/yql/ast/yql_expr.h>
 #include <ydb/library/yql/dq/common/dq_value.h>
 #include <ydb/core/kqp/provider/yql_kikimr_gateway.h>
+#include <ydb/core/tx/long_tx_service/public/lock_handle.h>
 
 #include <library/cpp/actors/core/actorid.h>
 
@@ -128,6 +129,7 @@ public:
 
     struct TExecPhysicalResult : public TGenericResult {
         NKikimrKqp::TExecuterTxResult ExecuterResult;
+        NLongTxService::TLockHandle LockHandle;
     };
 
     struct TAstQuerySettings {

+ 4 - 1
ydb/core/kqp/common/kqp_transform.h

@@ -7,6 +7,8 @@
 #include <ydb/core/kqp/provider/yql_kikimr_expr_nodes.h>
 #include <ydb/core/kqp/provider/yql_kikimr_provider.h>
 
+#include <ydb/core/tx/long_tx_service/public/lock_handle.h>
+
 #include <ydb/library/yql/dq/common/dq_value.h>
 #include <ydb/library/yql/utils/log/log.h>
 
@@ -59,13 +61,14 @@ struct TKqpTxLocks {
     NKikimrMiniKQL::TType LockType;
     NKikimrMiniKQL::TListType LocksListType;
     THashMap<TKqpTxLock::TKey, TKqpTxLock> LocksMap;
+    NLongTxService::TLockHandle LockHandle;
 
     TMaybe<NYql::TIssue> LockIssue;
 
     bool HasLocks() const { return !LocksMap.empty(); }
     bool Broken() const { return LockIssue.Defined(); }
     void MarkBroken(NYql::TIssue lockIssue) { LockIssue.ConstructInPlace(std::move(lockIssue)); }
-    ui64 GetLockTxId() const { return HasLocks() ? LocksMap.begin()->second.GetLockId() : 0; }
+    ui64 GetLockTxId() const { return LockHandle ? LockHandle.GetLockId() : HasLocks() ? LocksMap.begin()->second.GetLockId() : 0; }
     size_t Size() const { return LocksMap.size(); }
 
     void ReportIssues(NYql::TExprContext& ctx) {

+ 1 - 0
ydb/core/kqp/executer/CMakeLists.txt

@@ -23,6 +23,7 @@ target_link_libraries(core-kqp-executer PUBLIC
   core-kqp-compile
   core-kqp-rm
   ydb-core-protos
+  tx-long_tx_service-public
   ydb-core-ydb_convert
   ydb-library-mkql_proto
   library-mkql_proto-protos

+ 10 - 0
ydb/core/kqp/executer/kqp_data_executer.cpp

@@ -19,6 +19,7 @@
 #include <ydb/core/tx/coordinator/coordinator_impl.h>
 #include <ydb/core/tx/datashard/datashard.h>
 #include <ydb/core/tx/long_tx_service/public/events.h>
+#include <ydb/core/tx/long_tx_service/public/lock_handle.h>
 #include <ydb/core/tx/tx_proxy/proxy.h>
 
 #include <ydb/library/yql/dq/runtime/dq_columns_resolve.h>
@@ -30,6 +31,7 @@ namespace NKqp {
 
 using namespace NYql;
 using namespace NYql::NDq;
+using namespace NLongTxService;
 
 namespace {
 
@@ -1182,6 +1184,7 @@ private:
 
         if (lockTxId) {
             dataTransaction.SetLockTxId(*lockTxId);
+            dataTransaction.SetLockNodeId(SelfId().NodeId());
         }
 
         for (auto& task : dataTransaction.GetKqpTransaction().GetTasks()) {
@@ -1616,6 +1619,7 @@ private:
         auto lockTxId = Request.AcquireLocksTxId;
         if (lockTxId.Defined() && *lockTxId == 0) {
             lockTxId = TxId;
+            LockHandle = TLockHandle(TxId, TActivationContext::ActorSystem());
         }
 
         // first, start compute tasks
@@ -1707,6 +1711,9 @@ private:
         }
 
         if (!Locks.empty()) {
+            if (LockHandle) {
+                ResponseEv->LockHandle = std::move(LockHandle);
+            }
             BuildLocks(*response.MutableResult()->MutableLocks(), Locks);
         }
 
@@ -1849,6 +1856,9 @@ private:
     // Temporary storage during snapshot acquisition
     TVector<NDqProto::TDqTask> ComputeTasks;
     THashMap<ui64, NKikimrTxDataShard::TKqpTransaction> DatashardTxs;
+
+    // Lock handle for a newly acquired lock
+    TLockHandle LockHandle;
 };
 
 } // namespace

+ 10 - 1
ydb/core/kqp/executer/kqp_executer.h

@@ -3,6 +3,7 @@
 #include <ydb/core/kqp/common/kqp_common.h>
 #include <ydb/core/kqp/common/kqp_gateway.h>
 #include <ydb/core/kqp/counters/kqp_counters.h>
+#include <ydb/core/tx/long_tx_service/public/lock_handle.h>
 #include <ydb/core/protos/config.pb.h>
 #include <ydb/core/protos/kqp.pb.h>
 
@@ -14,7 +15,15 @@ struct TEvKqpExecuter {
         TKqpExecuterEvents::EvTxRequest> {};
 
     struct TEvTxResponse : public TEventPB<TEvTxResponse, NKikimrKqp::TEvExecuterTxResponse,
-        TKqpExecuterEvents::EvTxResponse> {};
+        TKqpExecuterEvents::EvTxResponse>
+    {
+        NLongTxService::TLockHandle LockHandle;
+
+        bool IsSerializable() const override {
+            // We cannot serialize LockHandle, should always send locally
+            return false;
+        }
+    };
 
     struct TEvStreamData : public TEventPB<TEvStreamData, NKikimrKqp::TEvExecuterStreamData,
         TKqpExecuterEvents::EvStreamData> {};

+ 1 - 0
ydb/core/kqp/host/CMakeLists.txt

@@ -19,6 +19,7 @@ target_link_libraries(core-kqp-host PUBLIC
   core-kqp-opt
   core-kqp-prepare
   core-kqp-provider
+  tx-long_tx_service-public
   yql-core-services
   yql-minikql-invoke_builtins
   library-yql-sql

+ 5 - 1
ydb/core/kqp/host/kqp_run_data.cpp

@@ -102,7 +102,11 @@ protected:
         return TStatus::Async;
     }
 
-    bool OnExecuterResult(NKikimrKqp::TExecuterTxResult&& execResult, TExprContext& ctx, bool commit) override {
+    bool OnExecuterResult(NKikimrKqp::TExecuterTxResult&& execResult, NLongTxService::TLockHandle&& lockHandle, TExprContext& ctx, bool commit) override {
+        if (lockHandle) {
+            TxState->Tx().Locks.LockHandle = std::move(lockHandle);
+        }
+
         if (execResult.HasLocks()) {
             YQL_ENSURE(!commit);
 

Some files were not shown because too many files changed in this diff