Browse Source

Split topic partitions. Take sourceid information from parent partitions

tesseract 1 year ago
parent
commit
0628d4698c

+ 5 - 0
.mapping.json

@@ -9475,6 +9475,11 @@
   "ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.linux-x86_64.txt":"",
   "ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.txt":"",
   "ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.windows-x86_64.txt":"",
+  "ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/CMakeLists.darwin-x86_64.txt":"",
+  "ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/CMakeLists.linux-aarch64.txt":"",
+  "ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/CMakeLists.linux-x86_64.txt":"",
+  "ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/CMakeLists.txt":"",
+  "ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/CMakeLists.windows-x86_64.txt":"",
   "ydb/public/sdk/cpp/client/ydb_types/CMakeLists.darwin-x86_64.txt":"",
   "ydb/public/sdk/cpp/client/ydb_types/CMakeLists.linux-aarch64.txt":"",
   "ydb/public/sdk/cpp/client/ydb_types/CMakeLists.linux-x86_64.txt":"",

+ 1 - 0
ydb/core/persqueue/CMakeLists.darwin-x86_64.txt

@@ -57,6 +57,7 @@ target_sources(ydb-core-persqueue PRIVATE
   ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/partition_init.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/partition_monitoring.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/partition_read.cpp
+  ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/partition_sourcemanager.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/partition_write.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/partition.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/percentile_counter.cpp

+ 1 - 0
ydb/core/persqueue/CMakeLists.linux-aarch64.txt

@@ -58,6 +58,7 @@ target_sources(ydb-core-persqueue PRIVATE
   ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/partition_init.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/partition_monitoring.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/partition_read.cpp
+  ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/partition_sourcemanager.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/partition_write.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/partition.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/percentile_counter.cpp

+ 1 - 0
ydb/core/persqueue/CMakeLists.linux-x86_64.txt

@@ -58,6 +58,7 @@ target_sources(ydb-core-persqueue PRIVATE
   ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/partition_init.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/partition_monitoring.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/partition_read.cpp
+  ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/partition_sourcemanager.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/partition_write.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/partition.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/percentile_counter.cpp

+ 1 - 0
ydb/core/persqueue/CMakeLists.windows-x86_64.txt

@@ -57,6 +57,7 @@ target_sources(ydb-core-persqueue PRIVATE
   ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/partition_init.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/partition_monitoring.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/partition_read.cpp
+  ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/partition_sourcemanager.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/partition_write.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/partition.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/percentile_counter.cpp

+ 8 - 0
ydb/core/persqueue/events/internal.h

@@ -138,6 +138,8 @@ struct TEvPQ {
         EvQuotaCountersUpdated,
         EvConsumerRemoved,
         EvFetchResponse,
+        EvSourceIdRequest,
+        EvSourceIdResponse,
         EvEnd
     };
 
@@ -854,6 +856,12 @@ struct TEvPQ {
         TString Message;
         NKikimrClient::TPersQueueFetchResponse Response;
     };
+
+    struct TEvSourceIdRequest : public TEventPB<TEvSourceIdRequest, NKikimrPQ::TEvSourceIdRequest, EvSourceIdRequest> {
+    };
+
+    struct TEvSourceIdResponse : public TEventPB<TEvSourceIdResponse, NKikimrPQ::TEvSourceIdResponse, EvSourceIdResponse> {
+    };
 };
 
 } //NKikimr

+ 185 - 25
ydb/core/persqueue/partition.cpp

@@ -1,5 +1,6 @@
 #include "event_helpers.h"
 #include "mirrorer.h"
+#include "partition_log.h"
 #include "partition_util.h"
 #include "partition.h"
 #include "read.h"
@@ -53,6 +54,29 @@ const TString& TPartition::TopicName() const {
     return TopicConverter->GetClientsideName();
 }
 
+TString TPartition::LogPrefix() const {
+    TString state;
+    if (CurrentStateFunc() == &TThis::StateInit) {
+        state = "StateInit";
+    } else if (CurrentStateFunc() == &TThis::StateIdle) {
+        state = "StateIdle";
+    } else if (CurrentStateFunc() == &TThis::StateWrite) {
+        state = "StateWrite";
+    } else {
+        state = "Unknown";
+    }
+    return TStringBuilder() << "" << SelfId() << " " << state << " Partition: " << Partition << " ";
+}
+
+bool TPartition::CanWrite() const {
+    return (PartitionConfig == nullptr || PartitionConfig->GetStatus() == NKikimrPQ::ETopicPartitionStatus::Active) 
+        && (!PendingPartitionConfig || PendingPartitionConfig->GetStatus() == NKikimrPQ::ETopicPartitionStatus::Active);
+}
+
+bool TPartition::CanEnqueue() const {
+    return PartitionConfig == nullptr || PartitionConfig->GetStatus() == NKikimrPQ::ETopicPartitionStatus::Active;
+}
+
 ui64 GetOffsetEstimate(const std::deque<TDataKey>& container, TInstant timestamp, ui64 offset) {
     if (container.empty()) {
         return offset;
@@ -114,6 +138,8 @@ TPartition::TPartition(ui64 tabletId, ui32 partition, const TActorId& tablet, co
     , TopicConverter(topicConverter)
     , IsLocalDC(TabletConfig.GetLocalDC())
     , DCId(std::move(dcId))
+    , PartitionGraph()
+    , SourceManager(this)
     , StartOffset(0)
     , EndOffset(0)
     , WriteInflightSize(0)
@@ -851,34 +877,50 @@ void TPartition::Handle(TEvPQ::TEvTxRollback::TPtr& ev, const TActorContext& ctx
 }
 
 void TPartition::Handle(TEvPQ::TEvGetMaxSeqNoRequest::TPtr& ev, const TActorContext& ctx) {
-    auto response = MakeHolder<TEvPQ::TEvProxyResponse>(ev->Get()->Cookie);
-    NKikimrClient::TResponse& resp = response->Response;
+    MaxSeqNoRequests.emplace_back(ev);
+    ProcessMaxSeqNoRequest(ctx);
+}
 
-    resp.SetStatus(NMsgBusProxy::MSTATUS_OK);
-    resp.SetErrorCode(NPersQueue::NErrorCode::OK);
+void TPartition::ProcessMaxSeqNoRequest(const TActorContext& ctx) {
+    PQ_LOG_T("TPartition::ProcessMaxSeqNoRequest. Queue size: " << MaxSeqNoRequests.size());
+    SourceManager.EnsureSource(ctx);
 
-    auto& result = *resp.MutablePartitionResponse()->MutableCmdGetMaxSeqNoResult();
-    for (const auto& sourceId : ev->Get()->SourceIds) {
-        auto& protoInfo = *result.AddSourceIdInfo();
-        protoInfo.SetSourceId(sourceId);
+    while(!MaxSeqNoRequests.empty()) {
+        auto& ev =  MaxSeqNoRequests.front();
 
-        auto it = SourceIdStorage.GetInMemorySourceIds().find(sourceId);
-        if (it == SourceIdStorage.GetInMemorySourceIds().end()) {
-            continue;
-        }
+        auto response = MakeHolder<TEvPQ::TEvProxyResponse>(ev->Get()->Cookie);
+        NKikimrClient::TResponse& resp = response->Response;
 
-        const auto& memInfo = it->second;
-        Y_ABORT_UNLESS(memInfo.Offset <= (ui64)Max<i64>(), "Offset is too big: %" PRIu64, memInfo.Offset);
-        Y_ABORT_UNLESS(memInfo.SeqNo <= (ui64)Max<i64>(), "SeqNo is too big: %" PRIu64, memInfo.SeqNo);
+        resp.SetStatus(NMsgBusProxy::MSTATUS_OK);
+        resp.SetErrorCode(NPersQueue::NErrorCode::OK);
 
-        protoInfo.SetSeqNo(memInfo.SeqNo);
-        protoInfo.SetOffset(memInfo.Offset);
-        protoInfo.SetWriteTimestampMS(memInfo.WriteTimestamp.MilliSeconds());
-        protoInfo.SetExplicit(memInfo.Explicit);
-        protoInfo.SetState(TSourceIdInfo::ConvertState(memInfo.State));
-    }
+        auto& result = *resp.MutablePartitionResponse()->MutableCmdGetMaxSeqNoResult();
+        for (const auto& sourceId : ev->Get()->SourceIds) {
+            auto& protoInfo = *result.AddSourceIdInfo();
+            protoInfo.SetSourceId(sourceId);
+
+            auto info = SourceManager.Get(sourceId);
+            if (!info) {
+                PQ_LOG_D("Stop MaxSeqNoRequest - scheduled a research. SourceId: " << sourceId);
+                return;
+            }
+            if (info.State == TSourceIdInfo::EState::Unknown) {
+                continue;
+            }
 
-    ctx.Send(Tablet, response.Release());
+            Y_ABORT_UNLESS(info.Offset <= (ui64)Max<i64>(), "Offset is too big: %" PRIu64, info.Offset);
+            Y_ABORT_UNLESS(info.SeqNo <= (ui64)Max<i64>(), "SeqNo is too big: %" PRIu64, info.SeqNo);
+
+            protoInfo.SetSeqNo(info.SeqNo);
+            protoInfo.SetOffset(info.Offset);
+            protoInfo.SetWriteTimestampMS(info.WriteTimestamp.MilliSeconds());
+            protoInfo.SetExplicit(info.Explicit);
+            protoInfo.SetState(TSourceIdInfo::ConvertState(info.State));
+        }
+
+        ctx.Send(Tablet, response.Release());
+        MaxSeqNoRequests.pop_front();
+    }
 }
 
 void TPartition::Handle(TEvPQ::TEvBlobResponse::TPtr& ev, const TActorContext& ctx) {
@@ -1297,12 +1339,14 @@ void TPartition::Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorContext&
         DiskIsFull = !diskIsOk;
 
     if (response.HasCookie()) {
-        HandleSetOffsetResponse(response.GetCookie(), ctx);
+        OnProcessTxsAndUserActsWriteComplete(response.GetCookie(), ctx);
     } else {
-        if (ctx.Now() - WriteStartTime > TDuration::MilliSeconds(AppData(ctx)->PQConfig.GetMinWriteLatencyMs())) {
+        const auto writeDuration = ctx.Now() - WriteStartTime;
+        const auto minWriteLatency = TDuration::MilliSeconds(AppData(ctx)->PQConfig.GetMinWriteLatencyMs());
+        if (writeDuration > minWriteLatency) {
             HandleWriteResponse(ctx);
         } else {
-            ctx.Schedule(TDuration::MilliSeconds(AppData(ctx)->PQConfig.GetMinWriteLatencyMs()) - (ctx.Now() - WriteStartTime), new TEvPQ::TEvHandleWriteResponse());
+            ctx.Schedule(minWriteLatency - writeDuration, new TEvPQ::TEvHandleWriteResponse());
         }
     }
 }
@@ -1414,6 +1458,7 @@ void TPartition::RemoveDistrTx()
     Y_ABORT_UNLESS(!DistrTxs.empty());
 
     DistrTxs.pop_front();
+    PendingPartitionConfig = nullptr;
 }
 
 void TPartition::ProcessDistrTxs(const TActorContext& ctx)
@@ -1495,6 +1540,8 @@ bool TPartition::BeginTransaction(const TEvPQ::TEvProposePartitionConfig& event)
     ChangeConfig =
         MakeSimpleShared<TEvPQ::TEvChangePartitionConfig>(TopicConverter,
                                                           event.Config);
+    PendingPartitionConfig = GetPartitionConfig(ChangeConfig->Config, Partition);
+
     SendChangeConfigReply = false;
     return true;
 }
@@ -1623,11 +1670,85 @@ void TPartition::BeginChangePartitionConfig(const NKikimrPQ::TPQTabletConfig& co
     }
 }
 
+void TPartition::OnProcessTxsAndUserActsWriteComplete(ui64 cookie, const TActorContext& ctx) {
+    Y_ABORT_UNLESS(cookie == SET_OFFSET_COOKIE);
+
+
+    if (ChangeConfig) {
+        EndChangePartitionConfig(ChangeConfig->Config,
+                                 ChangeConfig->TopicConverter,
+                                 ctx);
+    }
+
+    for (auto& user : AffectedUsers) {
+        if (auto* actual = GetPendingUserIfExists(user)) {
+            TUserInfo& userInfo = UsersInfoStorage->GetOrCreate(user, ctx);
+            bool offsetHasChanged = (userInfo.Offset != actual->Offset);
+
+            userInfo.Session = actual->Session;
+            userInfo.Generation = actual->Generation;
+            userInfo.Step = actual->Step;
+            userInfo.Offset = actual->Offset;
+            userInfo.ReadRuleGeneration = actual->ReadRuleGeneration;
+            userInfo.ReadFromTimestamp = actual->ReadFromTimestamp;
+            userInfo.HasReadRule = true;
+
+            if (userInfo.Important != actual->Important) {
+                if (userInfo.LabeledCounters) {
+                    ScheduleDropPartitionLabeledCounters(userInfo.LabeledCounters->GetGroup());
+                }
+                userInfo.SetImportant(actual->Important);
+            }
+            if (userInfo.Important && userInfo.Offset < (i64)StartOffset) {
+                userInfo.Offset = StartOffset;
+            }
+
+            if (offsetHasChanged && !userInfo.UpdateTimestampFromCache()) {
+                userInfo.ActualTimestamps = false;
+                ReadTimestampForOffset(user, userInfo, ctx);
+            } else {
+                TabletCounters.Cumulative()[COUNTER_PQ_WRITE_TIMESTAMP_CACHE_HIT].Increment(1);
+            }
+        } else {
+            auto ui = UsersInfoStorage->GetIfExists(user);
+            if (ui && ui->LabeledCounters) {
+                ScheduleDropPartitionLabeledCounters(ui->LabeledCounters->GetGroup());
+            }
+
+            UsersInfoStorage->Remove(user, ctx);
+            Send(ReadQuotaTrackerActor, new TEvPQ::TEvConsumerRemoved(user));
+        }
+    }
+
+    for (auto& [actor, reply] : Replies) {
+        ctx.Send(actor, reply.release());
+    }
+
+    PendingUsersInfo.clear();
+    Replies.clear();
+    AffectedUsers.clear();
+
+    UsersInfoWriteInProgress = false;
+
+    TxIdHasChanged = false;
+
+    if (ChangeConfig) {
+        ReportCounters(ctx, true);
+        ChangeConfig = nullptr;
+        PendingPartitionConfig = nullptr;
+    }
+
+
+    ProcessTxsAndUserActs(ctx);
+}
+
 void TPartition::EndChangePartitionConfig(const NKikimrPQ::TPQTabletConfig& config,
                                           NPersQueue::TTopicConverterPtr topicConverter,
                                           const TActorContext& ctx)
 {
     Config = config;
+    PartitionConfig = GetPartitionConfig(Config, Partition);
+    PartitionGraph.Rebuild(Config);
     TopicConverter = topicConverter;
 
     Y_ABORT_UNLESS(Config.GetPartitionConfig().GetTotalPartitions() > 0);
@@ -1697,6 +1818,9 @@ void TPartition::ProcessDistrTx(const TActorContext& ctx)
     } else if (t.ProposeConfig) {
         t.Predicate = BeginTransaction(*t.ProposeConfig);
 
+        PendingPartitionConfig = GetPartitionConfig(t.ProposeConfig->Config, Partition);
+        //Y_VERIFY_DEBUG_S(PendingPartitionConfig, "Partition " << Partition << " config not found");
+
         ctx.Send(Tablet,
                  MakeHolder<TEvPQ::TEvProposePartitionConfigResult>(t.ProposeConfig->Step,
                                                                     t.ProposeConfig->TxId,
@@ -1707,6 +1831,7 @@ void TPartition::ProcessDistrTx(const TActorContext& ctx)
         Y_ABORT_UNLESS(!ChangeConfig);
 
         ChangeConfig = t.ChangeConfig;
+        PendingPartitionConfig = GetPartitionConfig(ChangeConfig->Config, Partition);
         SendChangeConfigReply = t.SendReply;
         BeginChangePartitionConfig(ChangeConfig->Config, ctx);
 
@@ -2392,5 +2517,40 @@ void TPartition::Handle(TEvPQ::TEvSubDomainStatus::TPtr& ev, const TActorContext
     }
 }
 
+void TPartition::Handle(TEvPQ::TEvSourceIdRequest::TPtr& ev, const TActorContext& ctx) {
+    auto& record = ev->Get()->Record;
+
+    if (Partition != record.GetPartition()) {
+        LOG_INFO_S(
+            ctx, NKikimrServices::PERSQUEUE,
+            "TEvSourceIdRequest for wrong partition " << record.GetPartition() << "." <<
+            " Topic: \"" << TopicName() << "\"." <<
+            " Partition: " << Partition << "."
+        );
+        return;
+    }
+
+    auto& memoryStorage = SourceIdStorage.GetInMemorySourceIds();
+
+    auto response = MakeHolder<TEvPQ::TEvSourceIdResponse>();
+    for(auto& sourceId : record.GetSourceId()) {
+        auto* s = response->Record.AddSource();
+        s->SetId(sourceId);
+
+        auto it = memoryStorage.find(sourceId);
+        if (it != memoryStorage.end()) {
+            auto& info = it->second;
+            s->SetState(Convert(info.State));
+            s->SetSeqNo(info.SeqNo);
+            s->SetOffset(info.Offset);
+            s->SetExplicit(info.Explicit);
+            s->SetWriteTimestamp(info.WriteTimestamp.GetValue());
+        } else {
+            s->SetState(NKikimrPQ::TEvSourceIdResponse::EState::TEvSourceIdResponse_EState_Unknown);
+        }
+    }
+
+    Send(ev->Sender, response.Release());
+}
 
 } // namespace NKikimr::NPQ

+ 40 - 14
ydb/core/persqueue/partition.h

@@ -4,6 +4,7 @@
 #include "header.h"
 #include "key.h"
 #include "partition_init.h"
+#include "partition_sourcemanager.h"
 #include "partition_types.h"
 #include "quota_tracker.h"
 #include "sourceid.h"
@@ -78,6 +79,8 @@ class TPartition : public TActorBootstrapped<TPartition> {
     friend TInitDataRangeStep;
     friend TInitDataStep;
 
+    friend TPartitionSourceManager;
+
 public:
     const TString& TopicName() const;
 
@@ -88,6 +91,9 @@ private:
     struct THasDataReq;
     struct THasDataDeadline;
 
+    bool CanWrite() const;
+    bool CanEnqueue() const;
+
     void ReplyError(const TActorContext& ctx, const ui64 dst, NPersQueue::NErrorCode::EErrorCode errorCode, const TString& error);
     void ReplyPropose(const TActorContext& ctx, const NKikimrPQ::TEvProposeTransaction& event, NKikimrPQ::TEvProposeTransactionResult::EStatus statusCode);
     void ReplyErrorForStoredWrites(const TActorContext& ctx);
@@ -101,7 +107,7 @@ private:
     void AddNewWriteBlob(std::pair<TKey, ui32>& res, TEvKeyValue::TEvRequest* request, bool headCleared, const TActorContext& ctx);
     void AnswerCurrentWrites(const TActorContext& ctx);
     void CancelAllWritesOnIdle(const TActorContext& ctx);
-    void CancelAllWritesOnWrite(const TActorContext& ctx, TEvKeyValue::TEvRequest* request, const TString& errorStr, const TWriteMsg& p, TSourceIdWriter& sourceIdWriter, NPersQueue::NErrorCode::EErrorCode errorCode = NPersQueue::NErrorCode::BAD_REQUEST);
+    void CancelAllWritesOnWrite(const TActorContext& ctx, TEvKeyValue::TEvRequest* request, const TString& errorStr, const TWriteMsg& p, TPartitionSourceManager::TModificationBatch& sourceIdBatch, NPersQueue::NErrorCode::EErrorCode errorCode = NPersQueue::NErrorCode::BAD_REQUEST);
     void ClearOldHead(const ui64 offset, const ui16 partNo, TEvKeyValue::TEvRequest* request);
     void CreateMirrorerActor();
     void DoRead(TEvPQ::TEvRead::TPtr ev, TDuration waitQuotaTime, const TActorContext& ctx);
@@ -155,7 +161,6 @@ private:
     void HandleOnWrite(TEvPQ::TEvSplitMessageGroup::TPtr& ev, const TActorContext& ctx);
     void HandleOnWrite(TEvPQ::TEvUpdateAvailableSize::TPtr& ev, const TActorContext& ctx);
     void HandleOnWrite(TEvPQ::TEvWrite::TPtr& ev, const TActorContext& ctx);
-    void HandleSetOffsetResponse(ui64 cookie, const TActorContext& ctx);
     void HandleWakeup(const TActorContext& ctx);
     void HandleWriteResponse(const TActorContext& ctx);
 
@@ -175,6 +180,8 @@ private:
     void ProcessTimestampRead(const TActorContext& ctx);
     void ProcessTimestampsForNewData(const ui64 prevEndOffset, const TActorContext& ctx);
 
+    void ProcessMaxSeqNoRequest(const TActorContext& ctx);
+
     void ReadTimestampForOffset(const TString& user, TUserInfo& ui, const TActorContext& ctx);
     void ReportCounters(const TActorContext& ctx, bool force = false);
     bool UpdateCounters(const TActorContext& ctx, bool force = false);
@@ -199,7 +206,7 @@ private:
 
     TInstant GetWriteTimeEstimate(ui64 offset) const;
     bool AppendHeadWithNewWrites(TEvKeyValue::TEvRequest* request, const TActorContext& ctx,
-        TSourceIdWriter& sourceIdWriter, THeartbeatEmitter& heartbeatEmitter);
+        TPartitionSourceManager::TModificationBatch& sourceIdBatch);
     bool CleanUp(TEvKeyValue::TEvRequest* request, const TActorContext& ctx);
 
     // Removes blobs that are no longer required. Blobs are no longer required if the storage time of all messages
@@ -300,6 +307,7 @@ private:
 
     void BeginChangePartitionConfig(const NKikimrPQ::TPQTabletConfig& config,
                                     const TActorContext& ctx);
+    void OnProcessTxsAndUserActsWriteComplete(ui64 cookie, const TActorContext& ctx);
     void EndChangePartitionConfig(const NKikimrPQ::TPQTabletConfig& config,
                                   NPersQueue::TTopicConverterPtr topicConverter,
                                   const TActorContext& ctx);
@@ -312,7 +320,8 @@ private:
 
     template <typename T>
     void EmplaceRequest(T&& body, const TActorContext& ctx) {
-        Requests.emplace_back(body, WriteQuota->GetQuotedTime(ctx.Now()), ctx.Now() - TInstant::Zero());
+        const auto now = ctx.Now();
+        Requests.emplace_back(body, WriteQuota->GetQuotedTime(now), now - TInstant::Zero());
     }
     void EmplaceResponse(TMessage&& message, const TActorContext& ctx);
 
@@ -327,6 +336,9 @@ private:
 
     void ResendPendingEvents(const TActorContext& ctx);
 
+    void Handle(TEvPQ::TEvSourceIdRequest::TPtr& ev, const TActorContext& ctx);
+
+    TString LogPrefix() const;
 public:
     static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
         return NKikimrServices::TActivity::PERSQUEUE_PARTITION_ACTOR;
@@ -460,6 +472,8 @@ private:
             HFuncTraced(TEvPQ::TEvTxCommit, Handle);
             HFuncTraced(TEvPQ::TEvTxRollback, Handle);
             HFuncTraced(TEvPQ::TEvSubDomainStatus, Handle);
+            HFuncTraced(TEvPQ::TEvSourceIdRequest, Handle);
+            HFuncTraced(TEvPQ::TEvSourceIdResponse, SourceManager.Handle);
             HFuncTraced(NReadQuoterEvents::TEvQuotaUpdated, Handle);
             HFuncTraced(NReadQuoterEvents::TEvAccountQuotaCountersUpdated, Handle);
             HFuncTraced(NReadQuoterEvents::TEvQuotaCountersUpdated, Handle);
@@ -515,6 +529,8 @@ private:
             HFuncTraced(TEvPQ::TEvTxCommit, Handle);
             HFuncTraced(TEvPQ::TEvTxRollback, Handle);
             HFuncTraced(TEvPQ::TEvSubDomainStatus, Handle);
+            HFuncTraced(TEvPQ::TEvSourceIdRequest, Handle);
+            HFuncTraced(TEvPQ::TEvSourceIdResponse, SourceManager.Handle);
             HFuncTraced(NReadQuoterEvents::TEvQuotaUpdated, Handle);
             HFuncTraced(NReadQuoterEvents::TEvAccountQuotaCountersUpdated, Handle);
             HFuncTraced(NReadQuoterEvents::TEvQuotaCountersUpdated, Handle);
@@ -525,36 +541,45 @@ private:
     }
 
 private:
+    enum class ProcessResult {
+        Continue,
+        Abort,
+        Break
+    };
+
     struct ProcessParameters {
-        ProcessParameters(TSourceIdWriter& sourceIdWriter,
-                          THeartbeatEmitter& heartbeatEmitter)
-                : SourceIdWriter(sourceIdWriter)
-                , HeartbeatEmitter(heartbeatEmitter) {
+        ProcessParameters(TPartitionSourceManager::TModificationBatch& sourceIdBatch)
+                : SourceIdBatch(sourceIdBatch) {
             }
 
-        TSourceIdWriter& SourceIdWriter;
-        THeartbeatEmitter& HeartbeatEmitter;
+        TPartitionSourceManager::TModificationBatch& SourceIdBatch;
 
         ui64 CurOffset;
         bool OldPartsCleared;
         bool HeadCleared;
     };
 
-    bool ProcessRequest(TRegisterMessageGroupMsg& msg, ProcessParameters& parameters);
-    bool ProcessRequest(TDeregisterMessageGroupMsg& msg, ProcessParameters& parameters);
-    bool ProcessRequest(TSplitMessageGroupMsg& msg, ProcessParameters& parameters);
-    bool ProcessRequest(TWriteMsg& msg, ProcessParameters& parameters, TEvKeyValue::TEvRequest* request, const TActorContext& ctx);
+    ProcessResult ProcessRequest(TRegisterMessageGroupMsg& msg, ProcessParameters& parameters);
+    ProcessResult ProcessRequest(TDeregisterMessageGroupMsg& msg, ProcessParameters& parameters);
+    ProcessResult ProcessRequest(TSplitMessageGroupMsg& msg, ProcessParameters& parameters);
+    ProcessResult ProcessRequest(TWriteMsg& msg, ProcessParameters& parameters, TEvKeyValue::TEvRequest* request, const TActorContext& ctx);
 
 private:
     ui64 TabletID;
     ui32 Partition;
     NKikimrPQ::TPQTabletConfig Config;
     NKikimrPQ::TPQTabletConfig TabletConfig;
+    const NKikimrPQ::TPQTabletConfig::TPartition* PartitionConfig = nullptr;
+    const NKikimrPQ::TPQTabletConfig::TPartition* PendingPartitionConfig = nullptr;
+
     const TTabletCountersBase& Counters;
     NPersQueue::TTopicConverterPtr TopicConverter;
     bool IsLocalDC;
     TString DCId;
 
+    TPartitionGraph PartitionGraph;
+    TPartitionSourceManager SourceManager;
+
     ui32 MaxBlobSize;
     const ui32 TotalLevels = 4;
     TVector<ui32> CompactLevelBorder;
@@ -575,6 +600,7 @@ private:
 
     std::deque<TMessage> Requests;
     std::deque<TMessage> Responses;
+    std::deque<TEvPQ::TEvGetMaxSeqNoRequest::TPtr> MaxSeqNoRequests;
 
     THead Head;
     THead NewHead;

+ 2 - 0
ydb/core/persqueue/partition_init.cpp

@@ -179,6 +179,8 @@ void TInitConfigStep::Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorCon
 
     case NKikimrProto::NODATA:
         Partition()->Config = Partition()->TabletConfig;
+        Partition()->PartitionConfig = GetPartitionConfig(Partition()->Config, Partition()->Partition);
+        Partition()->PartitionGraph.Rebuild(Partition()->Config);
         break;
 
     case NKikimrProto::ERROR:

+ 18 - 0
ydb/core/persqueue/partition_log.h

@@ -0,0 +1,18 @@
+#pragma once
+
+#include <library/cpp/actors/core/log.h>
+#include <util/generic/string.h>
+
+namespace NKikimr::NPQ {
+
+inline TString LogPrefix() { return {}; }
+
+#define PQ_LOG_T(stream) LOG_TRACE_S(*NActors::TlsActivationContext, NKikimrServices::PERSQUEUE, LogPrefix() << stream)
+#define PQ_LOG_D(stream) LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::PERSQUEUE, LogPrefix() << stream)
+#define PQ_LOG_I(stream) LOG_INFO_S(*NActors::TlsActivationContext, NKikimrServices::PERSQUEUE, LogPrefix() << stream)
+#define PQ_LOG_W(stream) LOG_WARN_S(*NActors::TlsActivationContext, NKikimrServices::PERSQUEUE, LogPrefix() << stream)
+#define PQ_LOG_NOTICE(stream) LOG_NOTICE_S(*NActors::TlsActivationContext, NKikimrServices::PERSQUEUE, LogPrefix() << stream)
+#define PQ_LOG_ERROR(stream) LOG_ERROR_S(*NActors::TlsActivationContext, NKikimrServices::PERSQUEUE, LogPrefix() << stream)
+#define PQ_LOG_CRIT(stream) LOG_CRIT_S(*NActors::TlsActivationContext, NKikimrServices::PERSQUEUE, LogPrefix() << stream)
+
+} // namespace NKikimr::NPQ

Some files were not shown because too many files changed in this diff