Browse Source

Emulate PQ events from file (#9349)

Fiodar Miron 5 months ago
parent
commit
6e215d5d4c

+ 11 - 1
ydb/core/fq/libs/init/init.cpp

@@ -44,6 +44,7 @@
 #include <ydb/library/yql/providers/s3/proto/retry_config.pb.h>
 #include <ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.h>
 #include <ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.h>
+#include <ydb/library/yql/providers/pq/gateway/native/yql_pq_gateway.h>
 #include <ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.h>
 #include <ydb/library/yql/providers/common/http_gateway/yql_http_default_retry_policy.h>
 
@@ -207,7 +208,16 @@ void Init(
         NYql::NDq::TS3ReadActorFactoryConfig readActorFactoryCfg = NYql::NDq::CreateReadActorFactoryConfig(protoConfig.GetGateways().GetS3());
 
         RegisterDqInputTransformLookupActorFactory(*asyncIoFactory);
-        RegisterDqPqReadActorFactory(*asyncIoFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory, yqCounters->GetSubgroup("subsystem", "DqSourceTracker"));
+        
+        NYql::TPqGatewayServices pqServices(
+            yqSharedResources->UserSpaceYdbDriver,
+            pqCmConnections,
+            credentialsFactory,
+            std::make_shared<NYql::TPqGatewayConfig>(protoConfig.GetGateways().GetPq()),
+            appData->FunctionRegistry
+        );
+        RegisterDqPqReadActorFactory(*asyncIoFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory, NYql::CreatePqNativeGateway(std::move(pqServices)), 
+            yqCounters->GetSubgroup("subsystem", "DqSourceTracker"));
 
         s3ActorsFactory->RegisterS3ReadActorFactory(*asyncIoFactory, credentialsFactory, httpGateway, s3HttpRetryPolicy, readActorFactoryCfg,
             yqCounters->GetSubgroup("subsystem", "S3ReadActor"), protoConfig.GetGateways().GetS3().GetAllowLocalFiles());

+ 14 - 8
ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp

@@ -128,7 +128,8 @@ public:
         std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory,
         const NActors::TActorId& computeActorId,
         const ::NMonitoring::TDynamicCounterPtr& counters,
-        i64 bufferSize)
+        i64 bufferSize,
+        const IPqGateway::TPtr& pqGateway)
         : TActor<TDqPqReadActor>(&TDqPqReadActor::StateFunc)
         , TDqPqReadActorBase(inputIndex, taskId, this->SelfId(), txId, std::move(sourceParams), std::move(readParams), computeActorId)
         , Metrics(txId, taskId, counters)
@@ -136,6 +137,7 @@ public:
         , HolderFactory(holderFactory)
         , Driver(std::move(driver))
         , CredentialsProviderFactory(std::move(credentialsProviderFactory))
+        , PqGateway(pqGateway)
     {
         MetadataFields.reserve(SourceParams.MetadataFieldsSize());
         TPqMetaExtractor fieldsExtractor;
@@ -185,9 +187,9 @@ public:
         }
     }
 
-    NYdb::NTopic::TTopicClient& GetTopicClient() {
+    ITopicClient& GetTopicClient() {
         if (!TopicClient) {
-            TopicClient = std::make_unique<NYdb::NTopic::TTopicClient>(Driver, GetTopicClientSettings());
+            TopicClient = PqGateway->GetTopicClient(Driver, GetTopicClientSettings());
         }
         return *TopicClient;
     }
@@ -229,7 +231,7 @@ private:
             ReadSession->Close(TDuration::Zero());
             ReadSession.reset();
         }
-        TopicClient.reset();
+        TopicClient.Reset();
         TActor<TDqPqReadActor>::PassAway();
     }
 
@@ -568,7 +570,7 @@ private:
     const THolderFactory& HolderFactory;
     NYdb::TDriver Driver;
     std::shared_ptr<NYdb::ICredentialsProviderFactory> CredentialsProviderFactory;
-    std::unique_ptr<NYdb::NTopic::TTopicClient> TopicClient;
+    ITopicClient::TPtr TopicClient;
     std::shared_ptr<NYdb::NTopic::IReadSession> ReadSession;
     NThreading::TFuture<void> EventFuture;
     std::queue<std::pair<ui64, NYdb::NTopic::TDeferredCommit>> DeferredCommits;
@@ -578,6 +580,7 @@ private:
     std::queue<TReadyBatch> ReadyBuffer;
     TMaybe<TDqSourceWatermarkTracker<TPartitionKey>> WatermarkTracker;
     TMaybe<TInstant> NextIdlenesCheckAt;
+    IPqGateway::TPtr PqGateway;
 };
 
 std::pair<IDqComputeActorAsyncInput*, NActors::IActor*> CreateDqPqReadActor(
@@ -593,6 +596,7 @@ std::pair<IDqComputeActorAsyncInput*, NActors::IActor*> CreateDqPqReadActor(
     const NActors::TActorId& computeActorId,
     const NKikimr::NMiniKQL::THolderFactory& holderFactory,
     const ::NMonitoring::TDynamicCounterPtr& counters,
+    IPqGateway::TPtr pqGateway,
     i64 bufferSize
     )
 {
@@ -618,15 +622,16 @@ std::pair<IDqComputeActorAsyncInput*, NActors::IActor*> CreateDqPqReadActor(
         CreateCredentialsProviderFactoryForStructuredToken(credentialsFactory, token, addBearerToToken),
         computeActorId,
         counters,
-        bufferSize
+        bufferSize,
+        pqGateway
     );
 
     return {actor, actor};
 }
 
-void RegisterDqPqReadActorFactory(TDqAsyncIoFactory& factory, NYdb::TDriver driver, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, const ::NMonitoring::TDynamicCounterPtr& counters) {
+void RegisterDqPqReadActorFactory(TDqAsyncIoFactory& factory, NYdb::TDriver driver, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, const IPqGateway::TPtr& pqGateway, const ::NMonitoring::TDynamicCounterPtr& counters) {
     factory.RegisterSource<NPq::NProto::TDqPqTopicSource>("PqSource",
-        [driver = std::move(driver), credentialsFactory = std::move(credentialsFactory), counters](
+        [driver = std::move(driver), credentialsFactory = std::move(credentialsFactory), counters, pqGateway](
             NPq::NProto::TDqPqTopicSource&& settings,
             IDqAsyncIoFactory::TSourceArguments&& args)
     {
@@ -646,6 +651,7 @@ void RegisterDqPqReadActorFactory(TDqAsyncIoFactory& factory, NYdb::TDriver driv
                 args.ComputeActorId,
                 args.HolderFactory,
                 counters,
+                pqGateway,
                 PQReadDefaultFreeSpace);
         }
 

+ 3 - 1
ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.h

@@ -4,6 +4,7 @@
 #include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h>
 
 #include <ydb/library/yql/providers/common/token_accessor/client/factory.h>
+#include <ydb/library/yql/providers/pq/provider/yql_pq_gateway.h>
 #include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>
 
 #include <ydb/library/yql/providers/pq/proto/dq_io.pb.h>
@@ -35,9 +36,10 @@ std::pair<IDqComputeActorAsyncInput*, NActors::IActor*> CreateDqPqReadActor(
     const NActors::TActorId& computeActorId,
     const NKikimr::NMiniKQL::THolderFactory& holderFactory,
     const ::NMonitoring::TDynamicCounterPtr& counters,
+    IPqGateway::TPtr pqGateway,
     i64 bufferSize = PQReadDefaultFreeSpace
     );
 
-void RegisterDqPqReadActorFactory(TDqAsyncIoFactory& factory, NYdb::TDriver driver, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, const ::NMonitoring::TDynamicCounterPtr& counters = MakeIntrusive<::NMonitoring::TDynamicCounters>());
+void RegisterDqPqReadActorFactory(TDqAsyncIoFactory& factory, NYdb::TDriver driver, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, const IPqGateway::TPtr& pqGateway, const ::NMonitoring::TDynamicCounterPtr& counters = MakeIntrusive<::NMonitoring::TDynamicCounters>());
 
 } // namespace NYql::NDq

+ 1 - 0
ydb/library/yql/providers/pq/gateway/dummy/ya.make

@@ -2,6 +2,7 @@ LIBRARY()
 
 SRCS(
     yql_pq_dummy_gateway.cpp
+    yql_pq_file_topic_client.cpp
 )
 
 PEERDIR(

+ 9 - 0
ydb/library/yql/providers/pq/gateway/dummy/yql_pq_dummy_gateway.cpp

@@ -1,4 +1,5 @@
 #include "yql_pq_dummy_gateway.h"
+#include "yql_pq_file_topic_client.h"
 
 #include <util/generic/is_in.h>
 #include <util/generic/yexception.h>
@@ -59,6 +60,14 @@ TDummyPqGateway& TDummyPqGateway::AddDummyTopic(const TDummyTopic& topic) {
     }
 }
 
+IPqGateway::TPtr CreatePqFileGateway() {
+    return MakeIntrusive<TDummyPqGateway>();
+}
+
+ITopicClient::TPtr TDummyPqGateway::GetTopicClient(const NYdb::TDriver&, const NYdb::NTopic::TTopicClientSettings&) {
+    return MakeIntrusive<TFileTopicClient>(Topics);
+}
+
 void TDummyPqGateway::UpdateClusterConfigs(
     const TString& clusterName,
     const TString& endpoint,

+ 11 - 3
ydb/library/yql/providers/pq/gateway/dummy/yql_pq_dummy_gateway.h

@@ -9,9 +9,10 @@
 namespace NYql {
 
 struct TDummyTopic {
-    TDummyTopic(const TString& cluster, const TString& path)
+    TDummyTopic(const TString& cluster, const TString& path, const TMaybe<TString>& filePath = {})
         : Cluster(cluster)
         , Path(path)
+        , FilePath(filePath)
     {
     }
 
@@ -22,6 +23,7 @@ struct TDummyTopic {
 
     TString Cluster;
     TString Path;
+    TMaybe<TString> FilePath;
     size_t PartitionsCount = 1;
 };
 
@@ -29,8 +31,8 @@ struct TDummyTopic {
 class TDummyPqGateway : public IPqGateway {
 public:
     TDummyPqGateway& AddDummyTopic(const TDummyTopic& topic);
+    ~TDummyPqGateway() {}
 
-public:
     NThreading::TFuture<void> OpenSession(const TString& sessionId, const TString& username) override;
     NThreading::TFuture<void> CloseSession(const TString& sessionId) override;
 
@@ -54,11 +56,17 @@ public:
         const TString& endpoint,
         const TString& database,
         bool secure) override;
+    
+    ITopicClient::TPtr GetTopicClient(const NYdb::TDriver& driver, const NYdb::NTopic::TTopicClientSettings& settings) override;
 
+    using TClusterNPath = std::pair<TString, TString>;
 private:
     mutable TMutex Mutex;
-    THashMap<std::pair<TString, TString>, TDummyTopic> Topics;
+    THashMap<TClusterNPath, TDummyTopic> Topics;
+
     THashSet<TString> OpenedSessions;
 };
 
+IPqGateway::TPtr CreatePqFileGateway();
+
 } // namespace NYql

+ 301 - 0
ydb/library/yql/providers/pq/gateway/dummy/yql_pq_file_topic_client.cpp

@@ -0,0 +1,301 @@
+#include "yql_pq_file_topic_client.h"
+#include "util/stream/file.h"
+
+#include <thread>
+
+#include <library/cpp/threading/blocking_queue/blocking_queue.h>
+#include <library/cpp/threading/future/async.h>
+
+#include <util/system/file.h>
+
+namespace NYql {
+
+class TBlockingEQueue {
+public:
+    void Push(NYdb::NTopic::TReadSessionEvent::TEvent&& e) {
+        with_lock(Mutex_) {
+            Events_.push_back(std::move(e));
+        }
+        CanPop_.BroadCast();
+    }
+    
+    void BlockUntilEvent() {
+        with_lock(Mutex_) {
+            CanPop_.WaitI(Mutex_, [this] () {return Stopped_ || !Events_.empty();});
+        }
+    }
+
+    TMaybe<NYdb::NTopic::TReadSessionEvent::TEvent> Pop(bool block) {
+        with_lock(Mutex_) {
+            if (block) {
+                CanPop_.WaitI(Mutex_, [this] () {return CanPopPredicate();});
+
+                auto front = Events_.front();
+                Events_.pop_front();
+                return front;
+            } else {
+                if (!CanPopPredicate()) {
+                    return {};
+                }
+
+                auto front = Events_.front();
+                Events_.pop_front();
+                return front;
+            }
+        }
+    }
+
+    void Stop() {
+        with_lock(Mutex_) {
+            Stopped_ = true;
+            CanPop_.BroadCast();
+        }
+    }
+    
+    bool IsStopped() {
+        with_lock(Mutex_) {
+            return Stopped_;
+        }
+    }
+
+private:
+    bool CanPopPredicate() {
+        return !Events_.empty() && !Stopped_;
+    }
+
+    TDeque<NYdb::NTopic::TReadSessionEvent::TEvent> Events_;
+    bool Stopped_ = false;
+    TMutex Mutex_;
+    TCondVar CanPop_;
+};
+
+class TFileTopicReadSession : public NYdb::NTopic::IReadSession {
+
+constexpr static auto FILE_POLL_PERIOD = TDuration::MilliSeconds(100);    
+
+public:
+    TFileTopicReadSession(TFile file, NYdb::NTopic::TPartitionSession::TPtr session, const TString& producerId = ""): 
+        File_(std::move(file)), Session_(std::move(session)), ProducerId_(producerId), 
+        FilePoller_([this] () {
+            PollFileForChanges();
+        }), Counters_()
+    {
+        Pool_.Start(1);
+    }
+
+    NThreading::TFuture<void> WaitEvent() override {
+        return NThreading::Async([this] () {
+            EventsQ_.BlockUntilEvent();
+            return NThreading::MakeFuture();
+        }, Pool_);
+    }
+
+    TVector<NYdb::NTopic::TReadSessionEvent::TEvent> GetEvents(bool block, TMaybe<size_t> maxEventsCount, size_t maxByteSize) override {
+        // TODO
+        Y_UNUSED(maxByteSize);
+
+        TVector<NYdb::NTopic::TReadSessionEvent::TEvent> res;
+        for (auto event = EventsQ_.Pop(block); !event.Empty() &&  res.size() <= maxEventsCount.GetOrElse(std::numeric_limits<size_t>::max()); event = EventsQ_.Pop(/*block=*/ false)) {
+            res.push_back(*event);
+        }
+        return res;
+    }
+
+    TVector<NYdb::NTopic::TReadSessionEvent::TEvent> GetEvents(const NYdb::NTopic::TReadSessionGetEventSettings& settings) override {
+        return GetEvents(settings.Block_, settings.MaxEventsCount_, settings.MaxByteSize_);
+    }
+
+    TMaybe<NYdb::NTopic::TReadSessionEvent::TEvent> GetEvent(bool block, size_t maxByteSize) override {
+        // TODO
+        Y_UNUSED(maxByteSize);
+
+        return EventsQ_.Pop(block);
+    }
+
+    TMaybe<NYdb::NTopic::TReadSessionEvent::TEvent> GetEvent(const NYdb::NTopic::TReadSessionGetEventSettings& settings) override {
+        return GetEvent(settings.Block_, settings.MaxByteSize_);
+    }
+
+    bool Close(TDuration timeout = TDuration::Max()) override {
+        Y_UNUSED(timeout);
+        // TOOD send TSessionClosedEvent
+        EventsQ_.Stop();
+        Pool_.Stop();
+
+        if (FilePoller_.joinable()) {
+            FilePoller_.join();
+        }
+        return true;
+    }
+
+    NYdb::NTopic::TReaderCounters::TPtr GetCounters() const override {
+        return Counters_;
+    }
+
+    TString GetSessionId() const override {
+        return ToString(Session_->GetPartitionSessionId());
+    }
+
+    ~TFileTopicReadSession() {
+        EventsQ_.Stop();
+        Pool_.Stop();
+        if (FilePoller_.joinable()) {
+            FilePoller_.join();
+        }
+    }
+
+private:
+    using TMessageInformation = NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent::TMessageInformation; 
+    using TMessage = NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent::TMessage; 
+
+    TMessageInformation MakeNextMessageInformation(size_t offset, size_t uncompressedSize, const TString& messageGroupId = "") { 
+        auto now = TInstant::Now(); 
+        TMessageInformation msgInfo(
+            offset,
+            ProducerId_,
+            SeqNo_,
+            now,
+            now,
+            MakeIntrusive<NYdb::NTopic::TWriteSessionMeta>(),
+            MakeIntrusive<NYdb::NTopic::TMessageMeta>(),
+            uncompressedSize,
+            messageGroupId
+        );
+        return msgInfo;
+    }
+    
+    TMessage MakeNextMessage(const TString& msgBuff) {
+        TMessage msg(msgBuff, nullptr, MakeNextMessageInformation(MsgOffset_, msgBuff.size()), Session_);
+        return msg;
+    }
+
+    void PollFileForChanges() {
+        TFileInput fi(File_);
+        while (!EventsQ_.IsStopped()) {
+            TString rawMsg;
+            TVector<TMessage> msgs;
+
+            while (size_t read = fi.ReadLine(rawMsg)) {
+                msgs.emplace_back(MakeNextMessage(rawMsg));
+                MsgOffset_++;
+            }
+            if (!msgs.empty()) {
+                EventsQ_.Push(NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent(msgs, {}, Session_));
+            }
+
+            Sleep(FILE_POLL_PERIOD);
+        }
+    }
+    
+    TFile File_;
+    TBlockingEQueue EventsQ_;
+    NYdb::NTopic::TPartitionSession::TPtr Session_;
+    TString ProducerId_;
+    std::thread FilePoller_;
+    NYdb::NTopic::TReaderCounters::TPtr Counters_;
+
+    TThreadPool Pool_;
+    size_t MsgOffset_ = 0;
+    ui64 SeqNo_ = 0;
+};
+
+struct TDummyPartitionSession: public NYdb::NTopic::TPartitionSession {
+    TDummyPartitionSession(ui64 sessionId, const TString& topicPath, ui64 partId) {
+        PartitionSessionId = sessionId;
+        TopicPath = topicPath;
+        PartitionId = partId;
+    }
+
+    void RequestStatus() override {
+        // TODO send TPartitionSessionStatusEvent
+    }
+};
+
+std::shared_ptr<NYdb::NTopic::IReadSession> TFileTopicClient::CreateReadSession(const NYdb::NTopic::TReadSessionSettings& settings) {
+    Y_ENSURE(!settings.Topics_.empty());
+    TString topicPath = settings.Topics_.front().Path_;
+
+    auto topicsIt = Topics_.find(make_pair("pq", topicPath));
+    Y_ENSURE(topicsIt != Topics_.end());
+    auto filePath = topicsIt->second.FilePath;
+    Y_ENSURE(filePath);
+    
+    // TODO
+    ui64 sessionId = 0;
+    ui64 partitionId = 0;
+
+    return std::make_shared<TFileTopicReadSession>(
+        TFile(*filePath, EOpenMode::TEnum::RdOnly),
+        MakeIntrusive<TDummyPartitionSession>(sessionId, topicPath, partitionId)
+    );
+}
+
+NYdb::TAsyncStatus TFileTopicClient::CreateTopic(const TString& path, const NYdb::NTopic::TCreateTopicSettings& settings) {
+    Y_UNUSED(path);
+    Y_UNUSED(settings);
+    return NThreading::MakeFuture(NYdb::TStatus(NYdb::EStatus::SUCCESS, {}));
+}
+
+NYdb::TAsyncStatus TFileTopicClient::AlterTopic(const TString& path, const NYdb::NTopic::TAlterTopicSettings& settings) {
+    Y_UNUSED(path);
+    Y_UNUSED(settings);
+    return NThreading::MakeFuture(NYdb::TStatus(NYdb::EStatus::SUCCESS, {}));
+}
+
+NYdb::TAsyncStatus TFileTopicClient::DropTopic(const TString& path, const NYdb::NTopic::TDropTopicSettings& settings) {
+    Y_UNUSED(path);
+    Y_UNUSED(settings);
+    return NThreading::MakeFuture(NYdb::TStatus(NYdb::EStatus::SUCCESS, {}));
+}
+
+NYdb::NTopic::TAsyncDescribeTopicResult TFileTopicClient::DescribeTopic(const TString& path, 
+    const NYdb::NTopic::TDescribeTopicSettings& settings) {
+    Y_UNUSED(path);
+    Y_UNUSED(settings);
+
+    NYdb::TStatus success(NYdb::EStatus::SUCCESS, {});
+    return NThreading::MakeFuture(NYdb::NTopic::TDescribeTopicResult(std::move(success), {}));
+}
+
+NYdb::NTopic::TAsyncDescribeConsumerResult TFileTopicClient::DescribeConsumer(const TString& path, const TString& consumer, 
+    const NYdb::NTopic::TDescribeConsumerSettings& settings) {
+    Y_UNUSED(path);
+    Y_UNUSED(consumer);
+    Y_UNUSED(settings);
+
+    NYdb::TStatus success(NYdb::EStatus::SUCCESS, {});
+    return NThreading::MakeFuture(NYdb::NTopic::TDescribeConsumerResult(std::move(success), {}));
+}
+
+NYdb::NTopic::TAsyncDescribePartitionResult TFileTopicClient::DescribePartition(const TString& path, i64 partitionId, 
+    const NYdb::NTopic::TDescribePartitionSettings& settings) {
+    Y_UNUSED(path);
+    Y_UNUSED(partitionId);
+    Y_UNUSED(settings);
+
+    NYdb::TStatus success(NYdb::EStatus::SUCCESS, {});
+    return NThreading::MakeFuture(NYdb::NTopic::TDescribePartitionResult(std::move(success), {}));
+}
+
+std::shared_ptr<NYdb::NTopic::ISimpleBlockingWriteSession> TFileTopicClient::CreateSimpleBlockingWriteSession(
+    const NYdb::NTopic::TWriteSessionSettings& settings) {
+    Y_UNUSED(settings);
+    return nullptr;
+}
+
+std::shared_ptr<NYdb::NTopic::IWriteSession> TFileTopicClient::CreateWriteSession(const NYdb::NTopic::TWriteSessionSettings& settings) {
+    Y_UNUSED(settings);
+    return nullptr;
+}
+
+NYdb::TAsyncStatus TFileTopicClient::CommitOffset(const TString& path, ui64 partitionId, const TString& consumerName, ui64 offset,
+    const NYdb::NTopic::TCommitOffsetSettings& settings) {
+    Y_UNUSED(path);
+    Y_UNUSED(partitionId);
+    Y_UNUSED(consumerName);
+    Y_UNUSED(offset);
+    Y_UNUSED(settings);
+    return NThreading::MakeFuture(NYdb::TStatus(NYdb::EStatus::SUCCESS, {}));
+}
+
+}

+ 36 - 0
ydb/library/yql/providers/pq/gateway/dummy/yql_pq_file_topic_client.h

@@ -0,0 +1,36 @@
+#include "yql_pq_dummy_gateway.h"
+
+#include <ydb/library/yql/providers/pq/provider/yql_pq_gateway.h>
+
+namespace NYql {
+struct TFileTopicClient : public ITopicClient {
+    TFileTopicClient(THashMap<TDummyPqGateway::TClusterNPath, TDummyTopic> topics): Topics_(topics) {}
+
+    NYdb::TAsyncStatus CreateTopic(const TString& path, const NYdb::NTopic::TCreateTopicSettings& settings = {}) override;
+
+    NYdb::TAsyncStatus AlterTopic(const TString& path, const NYdb::NTopic::TAlterTopicSettings& settings = {}) override;
+
+    NYdb::TAsyncStatus DropTopic(const TString& path, const NYdb::NTopic::TDropTopicSettings& settings = {}) override;
+
+    NYdb::NTopic::TAsyncDescribeTopicResult DescribeTopic(const TString& path, 
+        const NYdb::NTopic::TDescribeTopicSettings& settings = {}) override;
+
+    NYdb::NTopic::TAsyncDescribeConsumerResult DescribeConsumer(const TString& path, const TString& consumer, 
+        const NYdb::NTopic::TDescribeConsumerSettings& settings = {}) override;
+
+    NYdb::NTopic::TAsyncDescribePartitionResult DescribePartition(const TString& path, i64 partitionId, 
+        const NYdb::NTopic::TDescribePartitionSettings& settings = {}) override;
+
+    std::shared_ptr<NYdb::NTopic::IReadSession> CreateReadSession(const NYdb::NTopic::TReadSessionSettings& settings) override;
+
+    std::shared_ptr<NYdb::NTopic::ISimpleBlockingWriteSession> CreateSimpleBlockingWriteSession(
+        const NYdb::NTopic::TWriteSessionSettings& settings) override;
+    std::shared_ptr<NYdb::NTopic::IWriteSession> CreateWriteSession(const NYdb::NTopic::TWriteSessionSettings& settings) override;
+
+    NYdb::TAsyncStatus CommitOffset(const TString& path, ui64 partitionId, const TString& consumerName, ui64 offset,
+        const NYdb::NTopic::TCommitOffsetSettings& settings = {}) override;
+
+private:
+    THashMap<TDummyPqGateway::TClusterNPath, TDummyTopic> Topics_;
+};
+}

+ 6 - 0
ydb/library/yql/providers/pq/gateway/native/yql_pq_gateway.cpp

@@ -40,6 +40,8 @@ public:
         const TString& database,
         bool secure) override;
 
+    ITopicClient::TPtr GetTopicClient(const NYdb::TDriver& driver, const NYdb::NTopic::TTopicClientSettings& settings) override;
+
 private:
     void InitClusterConfigs();
     TPqSession::TPtr GetExistingSession(const TString& sessionId) const;
@@ -138,6 +140,10 @@ IPqGateway::TPtr CreatePqNativeGateway(const TPqGatewayServices& services) {
     return MakeIntrusive<TPqNativeGateway>(services);
 }
 
+ITopicClient::TPtr TPqNativeGateway::GetTopicClient(const NYdb::TDriver& driver, const NYdb::NTopic::TTopicClientSettings& settings = NYdb::NTopic::TTopicClientSettings()) {
+    return MakeIntrusive<TNativeTopicClient>(driver, settings);
+}
+
 TPqNativeGateway::~TPqNativeGateway() {
     Sessions.clear();
 }

+ 3 - 3
ydb/library/yql/providers/pq/provider/ut/yql_pq_ut.cpp

@@ -41,9 +41,9 @@
 
 namespace NYql {
 
-NDq::IDqAsyncIoFactory::TPtr CreateAsyncIoFactory(const NYdb::TDriver& driver) {
+NDq::IDqAsyncIoFactory::TPtr CreateAsyncIoFactory(const NYdb::TDriver& driver, const IPqGateway::TPtr& pqGateway) {
     auto factory = MakeIntrusive<NYql::NDq::TDqAsyncIoFactory>();
-    RegisterDqPqReadActorFactory(*factory, driver, nullptr);
+    RegisterDqPqReadActorFactory(*factory, driver, nullptr, pqGateway);
 
     RegisterDqPqWriteActorFactory(*factory, driver, nullptr);
     return factory;
@@ -120,7 +120,7 @@ bool RunPqProgram(
 
     const auto driverConfig = NYdb::TDriverConfig().SetLog(CreateLogBackend("cerr"));
     NYdb::TDriver driver(driverConfig);
-    auto dqGateway = CreateLocalDqGateway(functionRegistry.Get(), dqCompFactory, dqTaskTransformFactory, {}, false/*spilling*/, CreateAsyncIoFactory(driver));
+    auto dqGateway = CreateLocalDqGateway(functionRegistry.Get(), dqCompFactory, dqTaskTransformFactory, {}, false/*spilling*/, CreateAsyncIoFactory(driver, pqGateway));
 
     auto storage = NYql::CreateAsyncFileStorage({});
     dataProvidersInit.push_back(NYql::GetDqDataProviderInitializer(&CreateDqExecTransformer, dqGateway, dqCompFactory, {}, storage));

Some files were not shown because too many files changed in this diff