Browse Source

Partition direct read

komels 1 year ago
parent
commit
c47f199e3a

+ 7 - 0
.mapping.json

@@ -5824,6 +5824,13 @@
   "ydb/core/persqueue/config/CMakeLists.linux-x86_64.txt":"",
   "ydb/core/persqueue/config/CMakeLists.txt":"",
   "ydb/core/persqueue/config/CMakeLists.windows-x86_64.txt":"",
+  "ydb/core/persqueue/dread_cache_service/CMakeLists.txt":"",
+  "ydb/core/persqueue/dread_cache_service/ut/CMakeLists.darwin-arm64.txt":"",
+  "ydb/core/persqueue/dread_cache_service/ut/CMakeLists.darwin-x86_64.txt":"",
+  "ydb/core/persqueue/dread_cache_service/ut/CMakeLists.linux-aarch64.txt":"",
+  "ydb/core/persqueue/dread_cache_service/ut/CMakeLists.linux-x86_64.txt":"",
+  "ydb/core/persqueue/dread_cache_service/ut/CMakeLists.txt":"",
+  "ydb/core/persqueue/dread_cache_service/ut/CMakeLists.windows-x86_64.txt":"",
   "ydb/core/persqueue/events/CMakeLists.darwin-arm64.txt":"",
   "ydb/core/persqueue/events/CMakeLists.darwin-x86_64.txt":"",
   "ydb/core/persqueue/events/CMakeLists.linux-aarch64.txt":"",

+ 1 - 0
ydb/core/driver_lib/run/config.h

@@ -57,6 +57,7 @@ union TBasicKikimrServicesMask {
         bool EnablePersQueueClusterDiscovery:1;
         bool EnableNetClassifier:1;
         bool EnablePersQueueClusterTracker:1;
+        bool EnablePersQueueDirectReadCache:1;
         bool EnableSysViewService:1;
         bool EnableMeteringWriter:1;
         bool EnableAuditWriter:1;

+ 14 - 0
ydb/core/driver_lib/run/kikimr_services_initializers.cpp

@@ -102,6 +102,7 @@
 #include <ydb/core/node_whiteboard/node_whiteboard.h>
 
 #include <ydb/core/persqueue/cluster_tracker.h>
+#include <ydb/core/persqueue/dread_cache_service/caching_service.h>
 #include <ydb/core/persqueue/pq.h>
 #include <ydb/core/persqueue/pq_l2_service.h>
 
@@ -1954,6 +1955,19 @@ void TPersQueueClusterTrackerInitializer::InitializeServices(NActors::TActorSyst
         TActorSetupCmd(actor, TMailboxType::HTSwap, appData->UserPoolId)));
 }
 
+// TPersQueueDirectReadCache
+
+TPersQueueDirectReadCacheInitializer::TPersQueueDirectReadCacheInitializer(const TKikimrRunConfig& runConfig)
+    : IKikimrServicesInitializer(runConfig)
+{}
+
+void TPersQueueDirectReadCacheInitializer::InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) {
+    IActor* actor = NPQ::CreatePQDReadCacheService(appData->Counters);
+    setup->LocalServices.push_back(std::pair<TActorId, TActorSetupCmd>(
+        NPQ::MakePQDReadCacheServiceActorId(),
+        TActorSetupCmd(actor, TMailboxType::HTSwap, appData->UserPoolId)));
+}
+
 // TMemProfMonitorInitializer
 
 TMemProfMonitorInitializer::TMemProfMonitorInitializer(const TKikimrRunConfig& runConfig, TIntrusivePtr<TMemObserver> memObserver)

+ 7 - 0
ydb/core/driver_lib/run/kikimr_services_initializers.h

@@ -355,6 +355,13 @@ public:
     void InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) override;
 };
 
+class TPersQueueDirectReadCacheInitializer : public IKikimrServicesInitializer {
+public:
+    TPersQueueDirectReadCacheInitializer(const TKikimrRunConfig& runConfig);
+
+    void InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) override;
+};
+
 class TMemProfMonitorInitializer : public IKikimrServicesInitializer {
     TIntrusivePtr<TMemObserver> MemObserver;
 

+ 4 - 0
ydb/core/driver_lib/run/run.cpp

@@ -1449,6 +1449,10 @@ TIntrusivePtr<TServiceInitializersList> TKikimrRunner::CreateServiceInitializers
         sil->AddServiceInitializer(new TPersQueueClusterTrackerInitializer(runConfig));
     }
 
+    if (serviceMask.EnablePersQueueDirectReadCache) {
+        sil->AddServiceInitializer(new TPersQueueDirectReadCacheInitializer(runConfig));
+    }
+
     if (serviceMask.EnableIcNodeCacheService) {
         sil->AddServiceInitializer(new TIcNodeCacheServiceInitializer(runConfig));
     }

+ 1 - 0
ydb/core/grpc_services/base/base.h

@@ -103,6 +103,7 @@ struct TRpcServices {
         EvStreamPQMigrationRead,
         EvStreamTopicWrite,
         EvStreamTopicRead,
+        EvStreamTopicDirectRead,
         EvPQReadInfo,
         EvTopicCommitOffset,
         EvListOperations,

+ 1 - 0
ydb/core/grpc_services/grpc_request_proxy.cpp

@@ -548,6 +548,7 @@ void TGRpcRequestProxyImpl::StateFunc(TAutoPtr<IEventHandle>& ev) {
         HFunc(TEvStreamPQMigrationReadRequest, PreHandle);
         HFunc(TEvStreamTopicWriteRequest, PreHandle);
         HFunc(TEvStreamTopicReadRequest, PreHandle);
+        HFunc(TEvStreamTopicDirectReadRequest, PreHandle);
         HFunc(TEvCommitOffsetRequest, PreHandle);
         HFunc(TEvPQReadInfoRequest, PreHandle);
         HFunc(TEvPQDropTopicRequest, PreHandle);

+ 1 - 0
ydb/core/grpc_services/grpc_request_proxy_handle_methods.h

@@ -12,6 +12,7 @@ protected:
     static void Handle(TEvStreamPQMigrationReadRequest::TPtr& ev, const TActorContext& ctx);
     static void Handle(TEvStreamTopicWriteRequest::TPtr& ev, const TActorContext& ctx);
     static void Handle(TEvStreamTopicReadRequest::TPtr& ev, const TActorContext& ctx);
+    static void Handle(TEvStreamTopicDirectReadRequest::TPtr& ev, const TActorContext& ctx);
     static void Handle(TEvCommitOffsetRequest::TPtr& ev, const TActorContext& ctx);
     static void Handle(TEvPQReadInfoRequest::TPtr& ev, const TActorContext& ctx);
     static void Handle(TEvPQDropTopicRequest::TPtr& ev, const TActorContext& ctx);

+ 6 - 0
ydb/core/grpc_services/rpc_calls.cpp

@@ -36,6 +36,12 @@ void FillYdbStatus(Ydb::Topic::StreamReadMessage::FromServer& resp, const NYql::
     NYql::IssuesToMessage(issues, resp.mutable_issues());
 }
 
+template <>
+void FillYdbStatus(Ydb::Topic::StreamDirectReadMessage::FromServer& resp, const NYql::TIssues& issues, Ydb::StatusIds::StatusCode status) {
+    resp.set_status(status);
+    NYql::IssuesToMessage(issues, resp.mutable_issues());
+}
+
 template <>
 void FillYdbStatus(Draft::Dummy::PingResponse& resp, const NYql::TIssues& issues, Ydb::StatusIds::StatusCode status) {
     Y_UNUSED(resp);

+ 1 - 0
ydb/core/grpc_services/rpc_calls.h

@@ -66,6 +66,7 @@ using TEvStreamPQWriteRequest = TGRpcRequestBiStreamWrapper<TRpcServices::EvStre
 using TEvStreamPQMigrationReadRequest = TGRpcRequestBiStreamWrapper<TRpcServices::EvStreamPQMigrationRead, Ydb::PersQueue::V1::MigrationStreamingReadClientMessage, Ydb::PersQueue::V1::MigrationStreamingReadServerMessage>;
 using TEvStreamTopicWriteRequest = TGRpcRequestBiStreamWrapper<TRpcServices::EvStreamTopicWrite, Ydb::Topic::StreamWriteMessage::FromClient, Ydb::Topic::StreamWriteMessage::FromServer, TRateLimiterMode::RuManual>;
 using TEvStreamTopicReadRequest = TGRpcRequestBiStreamWrapper<TRpcServices::EvStreamTopicRead, Ydb::Topic::StreamReadMessage::FromClient, Ydb::Topic::StreamReadMessage::FromServer, TRateLimiterMode::RuManual>;
+using TEvStreamTopicDirectReadRequest = TGRpcRequestBiStreamWrapper<TRpcServices::EvStreamTopicDirectRead, Ydb::Topic::StreamDirectReadMessage::FromClient, Ydb::Topic::StreamDirectReadMessage::FromServer, TRateLimiterMode::RuManual>;
 using TEvCommitOffsetRequest = TGRpcRequestWrapper<TRpcServices::EvTopicCommitOffset, Ydb::Topic::CommitOffsetRequest, Ydb::Topic::CommitOffsetResponse, true>;
 using TEvPQReadInfoRequest = TGRpcRequestWrapper<TRpcServices::EvPQReadInfo, Ydb::PersQueue::V1::ReadInfoRequest, Ydb::PersQueue::V1::ReadInfoResponse, true>;
 using TEvPQDropTopicRequest = TGRpcRequestValidationWrapper<TRpcServices::EvPQDropTopic, Ydb::PersQueue::V1::DropTopicRequest, Ydb::PersQueue::V1::DropTopicResponse, true>;

Some files were not shown because too many files changed in this diff