Browse Source

Implemented RemoteTopicReader KIKIMR-20306 (#1093)

Ilnaz Nizametdinov 1 year ago
parent
commit
404ef8886e

+ 1 - 0
ydb/core/base/events.h

@@ -171,6 +171,7 @@ struct TKikimrEvents : TEvents {
         ES_TABLE_CREATOR,
         ES_PQ_PARTITION_CHOOSER,
         ES_GRAPH,
+        ES_REPLICATION_SERVICE,
     };
 };
 

+ 47 - 0
ydb/core/tx/replication/service/table_writer.cpp

@@ -0,0 +1,47 @@
+#include "table_writer.h"
+#include "worker.h"
+
+#include <ydb/library/actors/core/actor.h>
+#include <ydb/library/actors/core/hfunc.h>
+
+namespace NKikimr::NReplication::NService {
+
+class TLocalTableWriter: public TActor<TLocalTableWriter> {
+    void Handle(TEvWorker::TEvHandshake::TPtr& ev) {
+        Worker = ev->Sender;
+        Send(Worker, new TEvWorker::TEvHandshake());
+    }
+
+    void Handle(TEvWorker::TEvData::TPtr& ev) {
+        Worker = ev->Sender;
+        // TODO
+    }
+
+public:
+    explicit TLocalTableWriter(const TString& path)
+        : TActor(&TThis::StateWork)
+        , Path(path)
+    {
+        Y_UNUSED(Path);
+    }
+
+    STFUNC(StateWork) {
+        switch (ev->GetTypeRewrite()) {
+            hFunc(TEvWorker::TEvHandshake, Handle);
+            hFunc(TEvWorker::TEvData, Handle);
+            sFunc(TEvents::TEvPoison, PassAway);
+        }
+    }
+
+private:
+    const TString Path;
+
+    TActorId Worker;
+
+}; // TLocalTableWriter
+
+IActor* CreateLocalTableWriter(const TString& path) {
+    return new TLocalTableWriter(path);
+}
+
+}

+ 9 - 0
ydb/core/tx/replication/service/table_writer.h

@@ -0,0 +1,9 @@
+#pragma once
+
+#include <ydb/core/base/defs.h>
+
+namespace NKikimr::NReplication::NService {
+
+IActor* CreateLocalTableWriter(const TString& path);
+
+}

+ 110 - 0
ydb/core/tx/replication/service/topic_reader.cpp

@@ -0,0 +1,110 @@
+#include "topic_reader.h"
+#include "worker.h"
+
+#include <ydb/core/tx/replication/ydb_proxy/ydb_proxy.h>
+#include <ydb/library/actors/core/actor.h>
+#include <ydb/library/actors/core/hfunc.h>
+
+namespace NKikimr::NReplication::NService {
+
+class TRemoteTopicReader: public TActor<TRemoteTopicReader> {
+    using TReadSessionSettings = NYdb::NTopic::TReadSessionSettings;
+
+    void Handle(TEvWorker::TEvHandshake::TPtr& ev) {
+        Worker = ev->Sender;
+
+        Y_ABORT_UNLESS(!ReadSession);
+        Send(YdbProxy, new TEvYdbProxy::TEvCreateTopicReaderRequest(Settings));
+    }
+
+    void Handle(TEvYdbProxy::TEvCreateTopicReaderResponse::TPtr& ev) {
+        ReadSession = ev->Get()->Result;
+
+        Y_ABORT_UNLESS(!Worker);
+        Send(Worker, new TEvWorker::TEvHandshake());
+    }
+
+    void Handle(TEvWorker::TEvPoll::TPtr&) {
+        Y_ABORT_UNLESS(ReadSession);
+        Send(ReadSession, new TEvYdbProxy::TEvReadTopicRequest());
+
+        if (CommitOffset) {
+            Send(YdbProxy, new TEvYdbProxy::TEvCommitOffsetRequest(
+                Settings.Topics_[0].Path_,
+                Settings.Topics_[0].PartitionIds_[0],
+                Settings.ConsumerName_,
+                CommitOffset, {}
+            ));
+        }
+    }
+
+    void Handle(TEvYdbProxy::TEvReadTopicResponse::TPtr& ev) {
+        auto& result = ev->Get()->Result;
+        TVector<TEvWorker::TEvData::TRecord> records(Reserve(result.Messages.size()));
+
+        for (auto& msg : result.Messages) {
+            Y_ABORT_UNLESS(msg.GetCodec() == NYdb::NTopic::ECodec::RAW);
+            Y_DEBUG_ABORT_UNLESS(msg.GetOffset() + 1 > CommitOffset);
+            CommitOffset = Max(CommitOffset, msg.GetOffset() + 1);
+            records.emplace_back(msg.GetOffset(), std::move(msg.GetData()));
+        }
+
+        Send(Worker, new TEvWorker::TEvData(std::move(records)));
+    }
+
+    void Handle(TEvYdbProxy::TEvCommitOffsetResponse::TPtr& ev) {
+        if (!ev->Get()->Result.IsSuccess()) {
+            Leave();
+        }
+    }
+
+    void Leave() {
+        Send(Worker, new TEvents::TEvGone());
+        PassAway();
+    }
+
+    void PassAway() override {
+        if (const auto& actorId = std::exchange(ReadSession, {})) {
+            Send(actorId, new TEvents::TEvPoison());
+        }
+
+        TActor::PassAway();
+    }
+
+public:
+    explicit TRemoteTopicReader(const TActorId& ydbProxy, const TReadSessionSettings& opts)
+        : TActor(&TThis::StateWork)
+        , YdbProxy(ydbProxy)
+        , Settings(opts)
+    {
+        Y_ABORT_UNLESS(Settings.Topics_.size() == 1);
+        Y_ABORT_UNLESS(Settings.Topics_.at(0).PartitionIds_.size() == 1);
+    }
+
+    STFUNC(StateWork) {
+        switch (ev->GetTypeRewrite()) {
+            hFunc(TEvWorker::TEvHandshake, Handle);
+            hFunc(TEvWorker::TEvPoll, Handle);
+            hFunc(TEvYdbProxy::TEvCreateTopicReaderResponse, Handle);
+            hFunc(TEvYdbProxy::TEvReadTopicResponse, Handle);
+            hFunc(TEvYdbProxy::TEvCommitOffsetResponse, Handle);
+            sFunc(TEvents::TEvGone, Leave);
+            sFunc(TEvents::TEvPoison, PassAway);
+        }
+    }
+
+private:
+    const TActorId YdbProxy;
+    const TReadSessionSettings Settings;
+
+    TActorId Worker;
+    TActorId ReadSession;
+    ui64 CommitOffset = 0;
+
+}; // TRemoteTopicReader
+
+IActor* CreateRemoteTopicReader(const TActorId& ydbProxy, const NYdb::NTopic::TReadSessionSettings& opts) {
+    return new TRemoteTopicReader(ydbProxy, opts);
+}
+
+}

+ 13 - 0
ydb/core/tx/replication/service/topic_reader.h

@@ -0,0 +1,13 @@
+#pragma once
+
+#include <ydb/core/base/defs.h>
+
+namespace NYdb::NTopic {
+    struct TReadSessionSettings;
+}
+
+namespace NKikimr::NReplication::NService {
+
+IActor* CreateRemoteTopicReader(const TActorId& ydbProxy, const NYdb::NTopic::TReadSessionSettings& opts);
+
+}

+ 148 - 0
ydb/core/tx/replication/service/worker.cpp

@@ -0,0 +1,148 @@
+#include "worker.h"
+
+#include <ydb/library/actors/core/actor_bootstrapped.h>
+#include <ydb/library/actors/core/hfunc.h>
+#include <ydb/library/services/services.pb.h>
+
+namespace NKikimr::NReplication::NService {
+
+TEvWorker::TEvData::TRecord::TRecord(ui64 offset, const TString& data)
+    : Offset(offset)
+    , Data(data)
+{
+}
+
+TEvWorker::TEvData::TRecord::TRecord(ui64 offset, TString&& data)
+    : Offset(offset)
+    , Data(std::move(data))
+{
+}
+
+TEvWorker::TEvData::TEvData(TVector<TRecord>&& records)
+    : Records(std::move(records))
+{
+}
+
+class TWorker: public TActorBootstrapped<TWorker> {
+    struct TActorInfo {
+        THolder<IActor> Actor;
+        TActorId ActorId;
+        bool InitDone;
+
+        explicit TActorInfo(THolder<IActor>&& actor)
+            : Actor(std::move(actor))
+            , InitDone(false)
+        {
+        }
+
+        operator TActorId() const {
+            return ActorId;
+        }
+
+        explicit operator bool() const {
+            return InitDone;
+        }
+    };
+
+    TActorId RegisterActor(TActorInfo& info) {
+        Y_ABORT_UNLESS(info.Actor);
+        info.ActorId = RegisterWithSameMailbox(info.Actor.Release());
+        return info.ActorId;
+    }
+
+    void InitActor(TActorInfo& info) {
+        Y_ABORT_UNLESS(info.ActorId);
+        Send(info.ActorId, new TEvWorker::TEvHandshake());
+        info.InitDone = false;
+    }
+
+    void Handle(TEvWorker::TEvHandshake::TPtr& ev) {
+        if (ev->Sender == Reader) {
+            Reader.InitDone = true;
+        } else if (ev->Sender == Writer) {
+            Writer.InitDone = true;
+        } else {
+            // TODO: log warn
+        }
+
+        if (Reader && Writer) {
+            Send(Reader, new TEvWorker::TEvPoll());
+        }
+    }
+
+    void Handle(TEvWorker::TEvPoll::TPtr& ev) {
+        if (ev->Sender != Writer) {
+            // TODO: log warn
+            return;
+        }
+
+        Send(ev->Forward(Reader));
+    }
+
+    void Handle(TEvWorker::TEvData::TPtr& ev) {
+        if (ev->Sender != Reader) {
+            // TODO: log warn
+            return;
+        }
+
+        Send(ev->Forward(Writer));
+    }
+
+    void Handle(TEvents::TEvGone::TPtr& ev) {
+        if (ev->Sender == Reader) {
+            // TODO
+        } else if (ev->Sender == Writer) {
+            // TODO
+        } else {
+            // TODO: log warn
+        }
+    }
+
+    void PassAway() override {
+        for (auto* actor : {&Reader, &Writer}) {
+            Send(*actor, new TEvents::TEvPoison());
+        }
+
+        TActorBootstrapped::PassAway();
+    }
+
+public:
+    static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
+        return NKikimrServices::TActivity::REPLICATION_WORKER;
+    }
+
+    explicit TWorker(THolder<IActor>&& reader, THolder<IActor>&& writer)
+        : Reader(std::move(reader))
+        , Writer(std::move(writer))
+    {
+    }
+
+    void Bootstrap() {
+        for (auto* actor : {&Reader, &Writer}) {
+            RegisterActor(*actor);
+            InitActor(*actor);
+        }
+
+        Become(&TThis::StateWork);
+    }
+
+    STATEFN(StateWork) {
+        switch (ev->GetTypeRewrite()) {
+            hFunc(TEvWorker::TEvHandshake, Handle);
+            hFunc(TEvWorker::TEvPoll, Handle);
+            hFunc(TEvWorker::TEvData, Handle);
+            hFunc(TEvents::TEvGone, Handle);
+            sFunc(TEvents::TEvPoison, PassAway);
+        }
+    }
+
+private:
+    TActorInfo Reader;
+    TActorInfo Writer;
+};
+
+IActor* CreateWorker(THolder<IActor>&& reader, THolder<IActor>&& writer) {
+    return new TWorker(std::move(reader), std::move(writer));
+}
+
+}

+ 43 - 0
ydb/core/tx/replication/service/worker.h

@@ -0,0 +1,43 @@
+#pragma once
+
+#include <ydb/core/base/defs.h>
+#include <ydb/core/base/events.h>
+
+#include <util/generic/vector.h>
+
+namespace NKikimr::NReplication::NService {
+
+struct TEvWorker {
+    enum EEv {
+        EvBegin = EventSpaceBegin(TKikimrEvents::ES_REPLICATION_SERVICE),
+
+        EvHandshake,
+        EvPoll,
+        EvData,
+
+        EvEnd,
+    };
+
+    static_assert(EvEnd < EventSpaceEnd(TKikimrEvents::ES_REPLICATION_SERVICE));
+
+    struct TEvHandshake: public TEventLocal<TEvHandshake, EvHandshake> {};
+    struct TEvPoll: public TEventLocal<TEvPoll, EvPoll> {};
+
+    struct TEvData: public TEventLocal<TEvData, EvData> {
+        struct TRecord {
+            ui64 Offset;
+            TString Data;
+
+            explicit TRecord(ui64 offset, const TString& data);
+            explicit TRecord(ui64 offset, TString&& data);
+        };
+
+        TVector<TRecord> Records;
+
+        explicit TEvData(TVector<TRecord>&& records);
+    };
+};
+
+IActor* CreateWorker(THolder<IActor>&& reader, THolder<IActor>&& writer);
+
+}

+ 5 - 0
ydb/core/tx/replication/service/ya.make

@@ -2,10 +2,15 @@ LIBRARY()
 
 PEERDIR(
     ydb/core/base
+    ydb/core/tx/replication/ydb_proxy
+    ydb/library/actors/core
 )
 
 SRCS(
     service.cpp
+    table_writer.cpp
+    topic_reader.cpp
+    worker.cpp
 )
 
 YQL_LAST_ABI_VERSION()

+ 1 - 0
ydb/library/services/services.proto

@@ -1015,5 +1015,6 @@ message TActivity {
         STATISTICS_AGGREGATOR = 622;
         KAFKA_READ_SESSION_ACTOR = 623;
         GRAPH_SERVICE = 624;
+        REPLICATION_WORKER = 625;
     };
 };