Browse Source

[dq] Move spilling to yql/dq/actors

YQL-16013
udovichenko-r 1 year ago
parent
commit
7330532936

+ 10 - 0
.mapping.json

@@ -7010,6 +7010,16 @@
   "ydb/library/yql/dq/actors/protos/CMakeLists.linux-x86_64.txt":"",
   "ydb/library/yql/dq/actors/protos/CMakeLists.txt":"",
   "ydb/library/yql/dq/actors/protos/CMakeLists.windows-x86_64.txt":"",
+  "ydb/library/yql/dq/actors/spilling/CMakeLists.darwin-x86_64.txt":"",
+  "ydb/library/yql/dq/actors/spilling/CMakeLists.linux-aarch64.txt":"",
+  "ydb/library/yql/dq/actors/spilling/CMakeLists.linux-x86_64.txt":"",
+  "ydb/library/yql/dq/actors/spilling/CMakeLists.txt":"",
+  "ydb/library/yql/dq/actors/spilling/CMakeLists.windows-x86_64.txt":"",
+  "ydb/library/yql/dq/actors/spilling/ut/CMakeLists.darwin-x86_64.txt":"",
+  "ydb/library/yql/dq/actors/spilling/ut/CMakeLists.linux-aarch64.txt":"",
+  "ydb/library/yql/dq/actors/spilling/ut/CMakeLists.linux-x86_64.txt":"",
+  "ydb/library/yql/dq/actors/spilling/ut/CMakeLists.txt":"",
+  "ydb/library/yql/dq/actors/spilling/ut/CMakeLists.windows-x86_64.txt":"",
   "ydb/library/yql/dq/actors/task_runner/CMakeLists.darwin-x86_64.txt":"",
   "ydb/library/yql/dq/actors/task_runner/CMakeLists.linux-aarch64.txt":"",
   "ydb/library/yql/dq/actors/task_runner/CMakeLists.linux-x86_64.txt":"",

+ 0 - 5
ydb/core/kqp/common/simple/services.h

@@ -31,11 +31,6 @@ inline NActors::TActorId MakeKqpNodeServiceID(ui32 nodeId) {
     return NActors::TActorId(nodeId, TStringBuf(name, 12));
 }
 
-inline NActors::TActorId MakeKqpLocalFileSpillingServiceID(ui32 nodeId) {
-    const char name[12] = "kqp_lfspill";
-    return NActors::TActorId(nodeId, TStringBuf(name, 12));
-}
-
 inline NActors::TActorId MakeKqpCompileComputationPatternServiceID(ui32 nodeId) {
     const char name[12] = "kqp_comp_cp";
     return NActors::TActorId(nodeId, TStringBuf(name, 12));

+ 6 - 22
ydb/core/kqp/compute_actor/kqp_compute_actor_impl.h

@@ -2,7 +2,7 @@
 
 #include "kqp_compute_actor.h"
 
-#include <ydb/core/kqp/runtime/kqp_channel_storage.h>
+#include <ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.h>
 #include <ydb/core/kqp/runtime/kqp_tasks_runner.h>
 
 
@@ -12,14 +12,12 @@ namespace NKqp {
 using namespace NYql;
 using namespace NYql::NDq;
 
-class TKqpTaskRunnerExecutionContext : public IDqTaskRunnerExecutionContext {
+class TKqpTaskRunnerExecutionContext : public TDqTaskRunnerExecutionContext {
 public:
-    TKqpTaskRunnerExecutionContext(ui64 txId, bool withSpilling, IDqChannelStorage::TWakeUpCallback&& wakeUp,
-        const TActorContext& ctx)
-        : TxId(txId)
-        , WakeUp(std::move(wakeUp))
-        , Ctx(ctx)
-        , WithSpilling(withSpilling) {}
+    TKqpTaskRunnerExecutionContext(ui64 txId, bool withSpilling, IDqChannelStorage::TWakeUpCallback&& wakeUp, const TActorContext& ctx)
+        : TDqTaskRunnerExecutionContext(txId, withSpilling, std::move(wakeUp), ctx)
+    {
+    }
 
     IDqOutputConsumer::TPtr CreateOutputConsumer(const NDqProto::TTaskOutput& outputDesc,
         const NMiniKQL::TType* type, NUdf::IApplyContext* applyCtx, const NMiniKQL::TTypeEnvironment& typeEnv,
@@ -28,20 +26,6 @@ public:
     {
         return KqpBuildOutputConsumer(outputDesc, type, applyCtx, typeEnv, holderFactory, std::move(outputs));
     }
-
-    IDqChannelStorage::TPtr CreateChannelStorage(ui64 channelId) const override {
-        if (WithSpilling) {
-            return CreateKqpChannelStorage(TxId, channelId, WakeUp, Ctx);
-        } else {
-            return nullptr;
-        }
-    }
-
-private:
-    const ui64 TxId;
-    const IDqChannelStorage::TWakeUpCallback WakeUp;
-    const TActorContext& Ctx;
-    const bool WithSpilling;
 };
 
 } // namespace NKqp

+ 2 - 0
ydb/core/kqp/counters/CMakeLists.darwin-x86_64.txt

@@ -14,6 +14,8 @@ target_link_libraries(core-kqp-counters PUBLIC
   ydb-core-base
   ydb-core-protos
   core-sys_view-service
+  dq-actors-spilling
+  library-yql-minikql
 )
 target_sources(core-kqp-counters PRIVATE
   ${CMAKE_SOURCE_DIR}/ydb/core/kqp/counters/kqp_counters.cpp

+ 2 - 0
ydb/core/kqp/counters/CMakeLists.linux-aarch64.txt

@@ -15,6 +15,8 @@ target_link_libraries(core-kqp-counters PUBLIC
   ydb-core-base
   ydb-core-protos
   core-sys_view-service
+  dq-actors-spilling
+  library-yql-minikql
 )
 target_sources(core-kqp-counters PRIVATE
   ${CMAKE_SOURCE_DIR}/ydb/core/kqp/counters/kqp_counters.cpp

+ 2 - 0
ydb/core/kqp/counters/CMakeLists.linux-x86_64.txt

@@ -15,6 +15,8 @@ target_link_libraries(core-kqp-counters PUBLIC
   ydb-core-base
   ydb-core-protos
   core-sys_view-service
+  dq-actors-spilling
+  library-yql-minikql
 )
 target_sources(core-kqp-counters PRIVATE
   ${CMAKE_SOURCE_DIR}/ydb/core/kqp/counters/kqp_counters.cpp

+ 2 - 0
ydb/core/kqp/counters/CMakeLists.windows-x86_64.txt

@@ -14,6 +14,8 @@ target_link_libraries(core-kqp-counters PUBLIC
   ydb-core-base
   ydb-core-protos
   core-sys_view-service
+  dq-actors-spilling
+  library-yql-minikql
 )
 target_sources(core-kqp-counters PRIVATE
   ${CMAKE_SOURCE_DIR}/ydb/core/kqp/counters/kqp_counters.cpp

+ 2 - 10
ydb/core/kqp/counters/kqp_counters.cpp

@@ -724,7 +724,8 @@ void TKqpCounters::UpdateTxCounters(const TKqpTransactionInfo& txInfo,
 }
 
 TKqpCounters::TKqpCounters(const ::NMonitoring::TDynamicCounterPtr& counters, const TActorContext* ctx)
-    : AllocCounters(counters, "kqp")
+    : NYql::NDq::TSpillingCounters(counters)
+    , AllocCounters(counters, "kqp")
 {
     Counters = counters;
     KqpGroup = GetServiceCounters(counters, "kqp");
@@ -786,15 +787,6 @@ TKqpCounters::TKqpCounters(const ::NMonitoring::TDynamicCounterPtr& counters, co
     RmMaxSnapshotLatency = KqpGroup->GetCounter("RM/MaxSnapshotLatency", false);
     RmNodeNumberInSnapshot = KqpGroup->GetCounter("RM/NodeNumberInSnapshot", false);
 
-    /* Spilling */
-    SpillingWriteBlobs = KqpGroup->GetCounter("Spilling/WriteBlobs", true);
-    SpillingReadBlobs = KqpGroup->GetCounter("Spilling/ReadBlobs", true);
-    SpillingStoredBlobs = KqpGroup->GetCounter("Spilling/StoredBlobs", false);
-    SpillingTotalSpaceUsed = KqpGroup->GetCounter("Spilling/TotalSpaceUsed", false);
-    SpillingTooBigFileErrors = KqpGroup->GetCounter("Spilling/TooBigFileErrors", true);
-    SpillingNoSpaceErrors = KqpGroup->GetCounter("Spilling/NoSpaceErrors", true);
-    SpillingIoErrors = KqpGroup->GetCounter("Spilling/IoErrors", true);
-
     /* Scan queries */
     ScanQueryShardDisconnect = KqpGroup->GetCounter("ScanQuery/ShardDisconnect", true);
     ScanQueryShardResolve = KqpGroup->GetCounter("ScanQuery/ShardResolve", true);

+ 2 - 10
ydb/core/kqp/counters/kqp_counters.h

@@ -12,6 +12,7 @@
 #include <ydb/core/tx/tx_proxy/mon.h>
 
 #include <ydb/library/yql/minikql/aligned_page_pool.h>
+#include <ydb/library/yql/dq/actors/spilling/spilling_counters.h>
 
 #include <util/system/spinlock.h>
 
@@ -246,7 +247,7 @@ private:
 
 using TKqpDbCountersPtr = TIntrusivePtr<TKqpDbCounters>;
 
-class TKqpCounters : public TThrRefBase, public TKqpCountersBase {
+class TKqpCounters : public TKqpCountersBase, public NYql::NDq::TSpillingCounters {
 private:
     struct TTxByKindCounters {
         NMonitoring::THistogramPtr TotalDuration;
@@ -378,15 +379,6 @@ public:
     ::NMonitoring::TDynamicCounters::TCounterPtr RmMaxSnapshotLatency;
     ::NMonitoring::TDynamicCounters::TCounterPtr RmNodeNumberInSnapshot;
 
-    // Spilling counters
-    ::NMonitoring::TDynamicCounters::TCounterPtr SpillingWriteBlobs;
-    ::NMonitoring::TDynamicCounters::TCounterPtr SpillingReadBlobs;
-    ::NMonitoring::TDynamicCounters::TCounterPtr SpillingStoredBlobs;
-    ::NMonitoring::TDynamicCounters::TCounterPtr SpillingTotalSpaceUsed;
-    ::NMonitoring::TDynamicCounters::TCounterPtr SpillingTooBigFileErrors;
-    ::NMonitoring::TDynamicCounters::TCounterPtr SpillingNoSpaceErrors;
-    ::NMonitoring::TDynamicCounters::TCounterPtr SpillingIoErrors;
-
     // Scan queries counters
     ::NMonitoring::TDynamicCounters::TCounterPtr ScanQueryShardDisconnect;
     ::NMonitoring::TDynamicCounters::TCounterPtr ScanQueryShardResolve;

+ 2 - 0
ydb/core/kqp/counters/ya.make

@@ -10,6 +10,8 @@ PEERDIR(
     ydb/core/base
     ydb/core/protos
     ydb/core/sys_view/service
+    ydb/library/yql/dq/actors/spilling
+    ydb/library/yql/minikql
 )
 
 END()

Some files were not shown because too many files changed in this diff