Browse Source

Add discovery mutator for underlay (#566)

* Add discovery mutator for underlay

* Fix prefix

* TString -> StringBuf
Dmitry Kardymon 1 year ago
parent
commit
bf9be3d207

+ 26 - 2
ydb/core/fq/libs/shared_resources/shared_resources.cpp

@@ -1,10 +1,12 @@
 #include "shared_resources.h"
 
-#include <ydb/library/services/services.pb.h>
 #include <ydb/core/fq/libs/events/events.h>
+#include <ydb/library/actors/core/actorsystem.h>
 #include <ydb/library/logger/actor.h>
+#include <ydb/library/services/services.pb.h>
 
-#include <ydb/library/actors/core/actorsystem.h>
+#include <ydb/public/api/protos/ydb_discovery.pb.h>
+#include <ydb/public/sdk/cpp/client/extensions/discovery_mutator/discovery_mutator.h>
 
 #include <util/generic/cast.h>
 #include <util/generic/strbuf.h>
@@ -33,6 +35,7 @@ struct TYqSharedResourcesImpl : public TActorSystemPtrMixin, public TYqSharedRes
         : TYqSharedResources(NYdb::TDriver(GetYdbDriverConfig(config.GetCommon().GetYdbDriverConfig())))
     {
         CreateDbPoolHolder(PrepareDbPoolConfig(config), credentialsProviderFactory, counters);
+        AddUnderlayDiscoveryMutator();
     }
 
     NDbPool::TConfig PrepareDbPoolConfig(const NFq::NConfig::TConfig& config) {
@@ -81,6 +84,27 @@ struct TYqSharedResourcesImpl : public TActorSystemPtrMixin, public TYqSharedRes
         const ::NMonitoring::TDynamicCounterPtr& counters) {
         DbPoolHolder = MakeIntrusive<NDbPool::TDbPoolHolder>(config, CoreYdbDriver, credentialsProviderFactory, counters);
     }
+
+    void AddUnderlayDiscoveryMutator() {
+
+        auto mutator = [](Ydb::Discovery::ListEndpointsResult* proto, NYdb::TStatus status, const NYdb::IDiscoveryMutatorApi::TAuxInfo& aux) {
+            TStringBuf underlayPrefix{"u-"};
+            if (!aux.DiscoveryEndpoint.starts_with(underlayPrefix) || !proto) {
+                return status;
+            }
+
+            for (size_t i = 0; i < proto->endpointsSize(); ++i) {
+                Ydb::Discovery::EndpointInfo* endpointInfo = proto->Mutableendpoints(i);
+                const TString& address = endpointInfo->address();
+                if (address.StartsWith(underlayPrefix)) {
+                    continue;
+                }
+                endpointInfo->set_address(underlayPrefix + address);
+            }
+            return status;
+        };
+        UserSpaceYdbDriver.AddExtension<NDiscoveryMutator::TDiscoveryMutator>(NDiscoveryMutator::TDiscoveryMutator::TParams(std::move(mutator)));
+    }
 };
 
 } // namespace

+ 1 - 0
ydb/core/fq/libs/shared_resources/ya.make

@@ -21,6 +21,7 @@ PEERDIR(
     ydb/library/logger
     ydb/library/security
     ydb/public/sdk/cpp/client/ydb_driver
+    ydb/public/sdk/cpp/client/ydb_extension
     ydb/public/sdk/cpp/client/ydb_table
 )