abcdef 2 years ago
parent
commit
df5f90063a
3 changed files with 453 additions and 126 deletions
  1. 175 77
      ydb/core/persqueue/partition.cpp
  2. 19 5
      ydb/core/persqueue/partition.h
  3. 259 44
      ydb/core/persqueue/ut/partition_ut.cpp

+ 175 - 77
ydb/core/persqueue/partition.cpp

@@ -434,19 +434,22 @@ void RequestDataRange(const TActorContext& ctx, const TActorId& dst, ui32 partit
 void TPartition::FillReadFromTimestamps(const NKikimrPQ::TPQTabletConfig& config, const TActorContext& ctx) {
     TSet<TString> hasReadRule;
 
-    for (auto& userInfo : UsersInfoStorage.GetAll()) {
+    for (auto& userInfo : UsersInfoStorage->GetAll()) {
         userInfo.second.ReadFromTimestamp = TInstant::Zero();
         userInfo.second.HasReadRule = false;
         hasReadRule.insert(userInfo.first);
     }
     for (ui32 i = 0; i < config.ReadRulesSize(); ++i) {
         const auto& consumer = config.GetReadRules(i);
-        auto& userInfo = UsersInfoStorage.GetOrCreate(consumer, ctx, 0);
+        auto& userInfo = UsersInfoStorage->GetOrCreate(consumer, ctx, 0);
         userInfo.HasReadRule = true;
         ui64 rrGen = i < config.ReadRuleGenerationsSize() ? config.GetReadRuleGenerations(i) : 0;
         if (userInfo.ReadRuleGeneration != rrGen) {
             THolder<TEvPQ::TEvSetClientInfo> event = MakeHolder<TEvPQ::TEvSetClientInfo>(0, consumer, 0, "", 0, 0,
                                                 TEvPQ::TEvSetClientInfo::ESCI_INIT_READ_RULE, rrGen);
+            //
+            // TODO(abcdef): заменить на вызов ProcessUserAct
+            //
             AddUserAct(event.Release());
             userInfo.Session = "";
             userInfo.Offset = 0;
@@ -462,7 +465,7 @@ void TPartition::FillReadFromTimestamps(const NKikimrPQ::TPQTabletConfig& config
             userInfo.ReadFromTimestamp = ts;
     }
     for (auto& consumer : hasReadRule) {
-        auto& userInfo = UsersInfoStorage.GetOrCreate(consumer, ctx);
+        auto& userInfo = UsersInfoStorage->GetOrCreate(consumer, ctx);
         THolder<TEvPQ::TEvSetClientInfo> event = MakeHolder<TEvPQ::TEvSetClientInfo>(0, consumer,
                                                                                0, "", 0, 0, TEvPQ::TEvSetClientInfo::ESCI_DROP_READ_RULE, 0);
         if (!userInfo.Important && userInfo.LabeledCounters) {
@@ -471,18 +474,22 @@ void TPartition::FillReadFromTimestamps(const NKikimrPQ::TPQTabletConfig& config
         userInfo.Session = "";
         userInfo.Offset = 0;
         userInfo.Step = userInfo.Generation = 0;
+        //
+        // TODO(abcdef): заменить на вызов ProcessUserAct
+        //
         AddUserAct(event.Release());
     }
 }
 
 TPartition::TPartition(ui64 tabletId, ui32 partition, const TActorId& tablet, const TActorId& blobCache,
                        const NPersQueue::TTopicConverterPtr& topicConverter, bool isLocalDC, TString dcId, bool isServerless,
-                       const NKikimrPQ::TPQTabletConfig& config, const TTabletCountersBase& counters,
+                       const NKikimrPQ::TPQTabletConfig& tabletConfig, const TTabletCountersBase& counters,
                        bool newPartition,
                        TVector<TTransaction> distrTxs)
     : TabletID(tabletId)
     , Partition(partition)
-    , Config(config)
+    , TabletConfig(tabletConfig)
+    , Counters(counters)
     , TopicConverter(topicConverter)
     , IsLocalDC(isLocalDC)
     , DCId(std::move(dcId))
@@ -491,21 +498,13 @@ TPartition::TPartition(ui64 tabletId, ui32 partition, const TActorId& tablet, co
     , WriteInflightSize(0)
     , Tablet(tablet)
     , BlobCache(blobCache)
-    , InitState(WaitDiskStatus)
+    , InitState(WaitConfig)
     , PartitionedBlob(partition, 0, 0, 0, 0, 0, Head, NewHead, true, false, 8_MB)
     , NewHeadKey{TKey{}, 0, TInstant::Zero(), 0}
     , BodySize(0)
     , MaxWriteResponsesSize(0)
     , GapSize(0)
-    , CloudId(config.GetYcCloudId())
-    , DbId(config.GetYdbDatabaseId())
-    , DbPath(config.GetYdbDatabasePath())
     , IsServerless(isServerless)
-    , FolderId(config.GetYcFolderId())
-    , UsersInfoStorage(
-            DCId, TabletID, TopicConverter, Partition, counters, Config,
-            CloudId, DbId, config.GetYdbDatabasePath(), IsServerless, FolderId
-    )
     , ReadingTimestamp(false)
     , Cookie(0)
     , InitDuration(TDuration::Zero())
@@ -524,17 +523,10 @@ TPartition::TPartition(ui64 tabletId, ui32 partition, const TActorId& tablet, co
     , AvgQuotaBytes{{TDuration::Seconds(1), 1000}, {TDuration::Minutes(1), 1000}, {TDuration::Hours(1), 2000}, {TDuration::Days(1), 2000}}
     , ReservedSize(0)
     , Channel(0)
-    , TotalChannelWritesByHead(Config.GetPartitionConfig().GetNumChannels(), 0)
     , WriteBufferIsFullCounter(nullptr)
     , WriteLagMs(TDuration::Minutes(1), 100)
 {
-    if (Config.GetPartitionConfig().HasMirrorFrom()) {
-        ManageWriteTimestampEstimate = !Config.GetPartitionConfig().GetMirrorFrom().GetSyncWriteTime();
-    } else {
-        ManageWriteTimestampEstimate = IsLocalDC;
-    }
-
-    TabletCounters.Populate(counters);
+    TabletCounters.Populate(Counters);
 
     if (!distrTxs.empty()) {
         std::move(distrTxs.begin(), distrTxs.end(),
@@ -712,7 +704,7 @@ void TPartition::HandleMonitoring(TEvPQ::TEvMonRequest::TPtr& ev, const TActorCo
                     }
                 }
                 TABLEBODY() {
-                    for (auto& d: UsersInfoStorage.GetAll()) {
+                    for (auto& d: UsersInfoStorage->GetAll()) {
                         TABLER() {
                             TABLED() {out << EncodeHtmlPcdata(d.first);}
                             TABLED() {out << d.second.Offset;}
@@ -736,8 +728,52 @@ void TPartition::HandleMonitoring(TEvPQ::TEvMonRequest::TPtr& ev, const TActorCo
     ctx.Send(ev->Sender, new TEvPQ::TEvMonResponse(Partition, res, out.Str()));
 }
 
+void TPartition::RequestConfig(const TActorContext& ctx)
+{
+    auto event = MakeHolder<TEvKeyValue::TEvRequest>();
+    auto read = event->Record.AddCmdRead();
+    read->SetKey(GetKeyConfig());
+    ctx.Send(Tablet, event.Release());
+}
+
+void TPartition::HandleConfig(const NKikimrClient::TResponse& res, const TActorContext& ctx)
+{
+    auto& response = res.GetReadResult(0);
+
+    switch (response.GetStatus()) {
+    case NKikimrProto::OK:
+        Y_VERIFY(Config.ParseFromString(response.GetValue()));
+        Y_VERIFY(Config.GetVersion() <= TabletConfig.GetVersion());
+        if (Config.GetVersion() < TabletConfig.GetVersion()) {
+            auto event = MakeHolder<TEvPQ::TEvChangePartitionConfig>(TopicConverter,
+                                                                     TabletConfig);
+            PushFrontDistrTx(event.Release());
+        }
+        break;
+    case NKikimrProto::NODATA:
+        Config = TabletConfig;
+        break;
+    case NKikimrProto::ERROR:
+        LOG_ERROR_S(ctx, NKikimrServices::PERSQUEUE,
+                    "Partition " << Partition << " can't read config");
+        ctx.Send(Tablet, new TEvents::TEvPoisonPill());
+        break;
+    default:
+        Cerr << "ERROR " << response.GetStatus() << "\n";
+        Y_FAIL("bad status");
+    };
+
+    InitState = WaitDiskStatus;
+    Initialize(ctx);
+}
 
 void TPartition::Bootstrap(const TActorContext& ctx) {
+    Y_VERIFY(InitState == WaitConfig);
+    RequestConfig(ctx);
+    Become(&TThis::StateInit);
+}
+
+void TPartition::Initialize(const TActorContext& ctx) {
     CreationTime = ctx.Now();
     WriteCycleStartTime = ctx.Now();
     WriteQuota.ConstructInPlace(Config.GetPartitionConfig().GetBurstSize(),
@@ -747,6 +783,11 @@ void TPartition::Bootstrap(const TActorContext& ctx) {
     LastUsedStorageMeterTimestamp = ctx.Now();
     WriteTimestampEstimate = ManageWriteTimestampEstimate ? ctx.Now() : TInstant::Zero();
 
+    CloudId = Config.GetYcCloudId();
+    DbId = Config.GetYdbDatabaseId();
+    DbPath = Config.GetYdbDatabasePath();
+    FolderId = Config.GetYcFolderId();
+
     CalcTopicWriteQuotaParams(AppData()->PQConfig,
                               IsLocalDC,
                               TopicConverter,
@@ -755,6 +796,25 @@ void TPartition::Bootstrap(const TActorContext& ctx) {
                               TopicWriteQuoterPath,
                               TopicWriteQuotaResourcePath);
 
+    UsersInfoStorage.ConstructInPlace(DCId,
+                                      TabletID,
+                                      TopicConverter,
+                                      Partition,
+                                      Counters,
+                                      Config,
+                                      CloudId,
+                                      DbId,
+                                      Config.GetYdbDatabasePath(),
+                                      IsServerless,
+                                      FolderId);
+    TotalChannelWritesByHead.resize(Config.GetPartitionConfig().GetNumChannels());
+
+    if (Config.GetPartitionConfig().HasMirrorFrom()) {
+        ManageWriteTimestampEstimate = !Config.GetPartitionConfig().GetMirrorFrom().GetSyncWriteTime();
+    } else {
+        ManageWriteTimestampEstimate = IsLocalDC;
+    }
+
     if (AppData()->PQConfig.GetTopicsAreFirstClassCitizen()) {
         PartitionCountersLabeled.Reset(new TPartitionLabeledCounters(TopicConverter->GetClientsideName(),
                                                                      Partition,
@@ -764,7 +824,7 @@ void TPartition::Bootstrap(const TActorContext& ctx) {
                                                                      Partition));
     }
 
-    UsersInfoStorage.Init(Tablet, SelfId(), ctx);
+    UsersInfoStorage->Init(Tablet, SelfId(), ctx);
 
     Y_VERIFY(AppData(ctx)->PQConfig.GetMaxBlobsPerLevel() > 0);
     ui32 border = LEVEL0;
@@ -787,7 +847,7 @@ void TPartition::Bootstrap(const TActorContext& ctx) {
     }
 
     for (const auto& readQuota : Config.GetPartitionConfig().GetReadQuota()) {
-        auto &userInfo = UsersInfoStorage.GetOrCreate(readQuota.GetClientId(), ctx);
+        auto &userInfo = UsersInfoStorage->GetOrCreate(readQuota.GetClientId(), ctx);
         userInfo.ReadQuota.UpdateConfig(readQuota.GetBurstSize(), readQuota.GetSpeedInBytesPerSecond());
     }
 
@@ -982,7 +1042,7 @@ void TPartition::ProcessHasDataRequests(const TActorContext& ctx) {
                 res->Record.SetCookie(*(it->Cookie));
             ctx.Send(it->Sender, res.Release());
             if (!it->ClientId.empty()) {
-                auto& userInfo = UsersInfoStorage.GetOrCreate(it->ClientId, ctx);
+                auto& userInfo = UsersInfoStorage->GetOrCreate(it->ClientId, ctx);
                 userInfo.ForgetSubscription(ctx.Now());
             }
             it = HasDataRequests.erase(it);
@@ -1002,7 +1062,7 @@ void TPartition::ProcessHasDataRequests(const TActorContext& ctx) {
                     res->Record.SetCookie(*(it->Request.Cookie));
                 ctx.Send(it->Request.Sender, res.Release());
                 if (!it->Request.ClientId.empty()) {
-                    auto& userInfo = UsersInfoStorage.GetOrCreate(it->Request.ClientId, ctx);
+                    auto& userInfo = UsersInfoStorage->GetOrCreate(it->Request.ClientId, ctx);
                     userInfo.ForgetSubscription(ctx.Now());
                 }
                 HasDataRequests.erase(jt);
@@ -1020,7 +1080,7 @@ void TPartition::UpdateAvailableSize(const TActorContext& ctx) {
 
     auto now = ctx.Now();
     WriteQuota->Update(now);
-    for (auto &c : UsersInfoStorage.GetAll()) {
+    for (auto &c : UsersInfoStorage->GetAll()) {
         while (true) {
             c.second.ReadQuota.Update(now);
             if (!c.second.ReadQuota.CanExaust() && !c.second.ReadRequests.empty()) {
@@ -1077,7 +1137,7 @@ void TPartition::HandleWakeup(const TActorContext& ctx) {
     ProcessHasDataRequests(ctx);
 
     auto now = ctx.Now();
-    for (auto& userInfo : UsersInfoStorage.GetAll()) {
+    for (auto& userInfo : UsersInfoStorage->GetAll()) {
         userInfo.second.UpdateReadingTimeAndState(now);
         for (auto& avg : userInfo.second.AvgReadBytes) {
             avg.Update(now);
@@ -1162,7 +1222,7 @@ bool TPartition::CleanUpBlobs(TEvKeyValue::TEvRequest *request, bool hasWrites,
     const auto& partConfig = Config.GetPartitionConfig();
     ui64 minOffset = EndOffset;
     for (const auto& importantClientId : partConfig.GetImportantClientId()) {
-        TUserInfo* userInfo = UsersInfoStorage.GetIfExists(importantClientId);
+        TUserInfo* userInfo = UsersInfoStorage->GetIfExists(importantClientId);
         ui64 curOffset = StartOffset;
         if (userInfo && userInfo->Offset >= 0) //-1 means no offset
             curOffset = userInfo->Offset;
@@ -1285,7 +1345,7 @@ void TPartition::Handle(TEvPersQueue::TEvHasDataInfo::TPtr& ev, const TActorCont
         Y_VERIFY(res.second);
 
         if (InitDone && record.HasClientId() && !record.GetClientId().empty()) {
-            auto& userInfo = UsersInfoStorage.GetOrCreate(record.GetClientId(), ctx);
+            auto& userInfo = UsersInfoStorage->GetOrCreate(record.GetClientId(), ctx);
             ++userInfo.Subscriptions;
             userInfo.UpdateReadOffset((i64)EndOffset - 1, ctx.Now(), ctx.Now(), ctx.Now());
             userInfo.UpdateReadingTimeAndState(ctx.Now());
@@ -1302,7 +1362,7 @@ void TPartition::Handle(TEvPQ::TEvMirrorerCounters::TPtr& ev, const TActorContex
 }
 
 void TPartition::Handle(NReadSpeedLimiterEvents::TEvCounters::TPtr& ev, const TActorContext& /*ctx*/) {
-    auto userInfo = UsersInfoStorage.GetIfExists(ev->Get()->User);
+    auto userInfo = UsersInfoStorage->GetIfExists(ev->Get()->User);
     if (userInfo && userInfo->ReadSpeedLimiter) {
         auto diff = ev->Get()->Counters.MakeDiffForAggr(userInfo->ReadSpeedLimiter->Baseline);
         TabletCounters.Populate(*diff.Get());
@@ -1336,7 +1396,10 @@ void TPartition::Handle(TEvents::TEvPoisonPill::TPtr&, const TActorContext& ctx)
     if (Mirrorer) {
         Send(Mirrorer->Actor, new TEvents::TEvPoisonPill());
     }
-    UsersInfoStorage.Clear(ctx);
+
+    if (UsersInfoStorage.Defined()) {
+        UsersInfoStorage->Clear(ctx);
+    }
 
     Die(ctx);
 }
@@ -1504,9 +1567,9 @@ void TPartition::HandleInfoRangeRead(const NKikimrClient::TKeyValueResponse::TRe
                 } else if ((*key)[TKeyPrefix::MarkPosition()] == TKeyPrefix::MarkProtoSourceId) {
                     SourceIdStorage.LoadSourceIdInfo(*key, pair.GetValue(), ctx.Now());
                 } else if ((*key)[TKeyPrefix::MarkPosition()] == TKeyPrefix::MarkUser) {
-                    UsersInfoStorage.Parse(*key, pair.GetValue(), ctx);
+                    UsersInfoStorage->Parse(*key, pair.GetValue(), ctx);
                 } else if ((*key)[TKeyPrefix::MarkPosition()] == TKeyPrefix::MarkUserDeprecated) {
-                    UsersInfoStorage.ParseDeprecated(*key, pair.GetValue(), ctx);
+                    UsersInfoStorage->ParseDeprecated(*key, pair.GetValue(), ctx);
                 }
             }
             //make next step
@@ -1729,6 +1792,10 @@ void TPartition::HandleOnInit(TEvKeyValue::TEvResponse::TPtr& ev, const TActorCo
         DiskIsFull = !diskIsOk;
 
     switch(InitState) {
+        case WaitConfig:
+            Y_VERIFY(response.ReadResultSize() == 1);
+            HandleConfig(response, ctx);
+            break;
         case WaitDiskStatus:
             Y_VERIFY(response.GetStatusResultSize());
             HandleGetDiskStatus(response, ctx);
@@ -1757,7 +1824,7 @@ void TPartition::HandleOnInit(TEvKeyValue::TEvResponse::TPtr& ev, const TActorCo
 
 void TPartition::InitComplete(const TActorContext& ctx) {
     if (StartOffset == EndOffset && EndOffset == 0) {
-        for (auto& [user, info] : UsersInfoStorage.GetAll()) {
+        for (auto& [user, info] : UsersInfoStorage->GetAll()) {
             if (info.Offset > 0 && StartOffset < (ui64)info.Offset) {
                  Head.Offset = EndOffset = StartOffset = info.Offset;
             }
@@ -1810,7 +1877,7 @@ void TPartition::InitComplete(const TActorContext& ctx) {
 
     InitUserInfoForImportantClients(ctx);
 
-    for (auto& userInfoPair : UsersInfoStorage.GetAll()) {
+    for (auto& userInfoPair : UsersInfoStorage->GetAll()) {
         Y_VERIFY(userInfoPair.second.Offset >= 0);
         ReadTimestampForOffset(userInfoPair.first, userInfoPair.second, ctx);
     }
@@ -1834,7 +1901,7 @@ void TPartition::InitComplete(const TActorContext& ctx) {
 
 
 void TPartition::UpdateUserInfoEndOffset(const TInstant& now) {
-    for (auto& userInfo : UsersInfoStorage.GetAll()) {
+    for (auto& userInfo : UsersInfoStorage->GetAll()) {
         userInfo.second.EndOffset = (i64)EndOffset;
         userInfo.second.UpdateReadingTimeAndState(now);
     }
@@ -1885,20 +1952,20 @@ void TPartition::InitUserInfoForImportantClients(const TActorContext& ctx) {
     TSet<TString> important;
     for (const auto& importantUser : Config.GetPartitionConfig().GetImportantClientId()) {
         important.insert(importantUser);
-        TUserInfo* userInfo = UsersInfoStorage.GetIfExists(importantUser);
+        TUserInfo* userInfo = UsersInfoStorage->GetIfExists(importantUser);
         if (userInfo && !userInfo->Important && userInfo->LabeledCounters) {
             ctx.Send(Tablet, new TEvPQ::TEvPartitionLabeledCountersDrop(Partition, userInfo->LabeledCounters->GetGroup()));
             userInfo->SetImportant(true);
             continue;
         }
         if (!userInfo) {
-            userInfo = &UsersInfoStorage.Create(ctx, importantUser, 0, true, "", 0, 0, 0, 0, TInstant::Zero());
+            userInfo = &UsersInfoStorage->Create(ctx, importantUser, 0, true, "", 0, 0, 0, 0, TInstant::Zero());
         }
         if (userInfo->Offset < (i64)StartOffset)
             userInfo->Offset = StartOffset;
         ReadTimestampForOffset(importantUser, *userInfo, ctx);
     }
-    for (auto& userInfoPair : UsersInfoStorage.GetAll()) {
+    for (auto& userInfoPair : UsersInfoStorage->GetAll()) {
         if (!important.contains(userInfoPair.first) && userInfoPair.second.Important && userInfoPair.second.LabeledCounters) {
             ctx.Send(
                 Tablet,
@@ -1911,7 +1978,7 @@ void TPartition::InitUserInfoForImportantClients(const TActorContext& ctx) {
 
 
 void TPartition::Handle(TEvPQ::TEvChangePartitionConfig::TPtr& ev, const TActorContext& ctx) {
-    AddDistrTx(ev->Release());
+    PushBackDistrTx(ev->Release());
 
     ProcessTxsAndUserActs(ctx);
 }
@@ -2019,7 +2086,7 @@ void TPartition::Handle(TEvPQ::TEvPartitionOffsets::TPtr& ev, const TActorContex
     result.SetWriteTimestampEstimateMS(WriteTimestampEstimate.MilliSeconds());
 
     if (!ev->Get()->ClientId.empty()) {
-        TUserInfo* userInfo = UsersInfoStorage.GetIfExists(ev->Get()->ClientId);
+        TUserInfo* userInfo = UsersInfoStorage->GetIfExists(ev->Get()->ClientId);
         if (userInfo) {
             i64 offset = Max<i64>(userInfo->Offset, 0);
             result.SetClientOffset(userInfo->Offset);
@@ -2081,7 +2148,7 @@ void TPartition::Handle(TEvPQ::TEvPartitionStatus::TPtr& ev, const TActorContext
     TVector<ui64> resSpeed;
     resSpeed.resize(4);
     ui64 maxQuota = 0;
-    for (auto& userInfoPair : UsersInfoStorage.GetAll()) {
+    for (auto& userInfoPair : UsersInfoStorage->GetAll()) {
         auto& userInfo = userInfoPair.second;
         if (ev->Get()->ClientId.empty() || ev->Get()->ClientId == userInfo.User) {
             Y_VERIFY(userInfo.AvgReadBytes.size() == 4);
@@ -2166,7 +2233,7 @@ void TPartition::Handle(TEvPQ::TEvGetPartitionClientInfo::TPtr& ev, const TActor
     result.SetStartOffset(StartOffset);
     result.SetEndOffset(EndOffset);
     result.SetResponseTimestamp(ctx.Now().MilliSeconds());
-    for (auto& pr : UsersInfoStorage.GetAll()) {
+    for (auto& pr : UsersInfoStorage->GetAll()) {
         TUserInfo& userInfo(pr.second);
         NKikimrPQ::TClientInfo& clientInfo = *result.AddClientInfo();
         clientInfo.SetClientId(pr.first);
@@ -2239,7 +2306,7 @@ TInstant TPartition::GetWriteTimeEstimate(ui64 offset) const {
 
 
 void TPartition::Handle(TEvPQ::TEvGetClientOffset::TPtr& ev, const TActorContext& ctx) {
-    auto& userInfo = UsersInfoStorage.GetOrCreate(ev->Get()->ClientId, ctx);
+    auto& userInfo = UsersInfoStorage->GetOrCreate(ev->Get()->ClientId, ctx);
     Y_VERIFY(userInfo.Offset >= -1, "Unexpected Offset: %" PRIi64, userInfo.Offset);
     ui64 offset = Max<i64>(userInfo.Offset, 0);
     auto ts = GetTime(userInfo, offset);
@@ -2284,7 +2351,7 @@ void TPartition::Handle(TEvPersQueue::TEvProposeTransaction::TPtr& ev, const TAc
 
 void TPartition::Handle(TEvPQ::TEvTxCalcPredicate::TPtr& ev, const TActorContext& ctx)
 {
-    AddDistrTx(ev->Release());
+    PushBackDistrTx(ev->Release());
 
     ProcessTxsAndUserActs(ctx);
 }
@@ -2366,7 +2433,7 @@ void TPartition::Handle(TEvPQ::TEvBlobResponse::TPtr& ev, const TActorContext& c
 
     //make readinfo class
     TReadAnswer answer(info.FormAnswer(
-        ctx, *ev->Get(), EndOffset, Partition, &UsersInfoStorage.GetOrCreate(info.User, ctx),
+        ctx, *ev->Get(), EndOffset, Partition, &UsersInfoStorage->GetOrCreate(info.User, ctx),
         info.Destination, GetSizeLag(info.Offset), Tablet, Config.GetMeteringMode()
     ));
 
@@ -2631,7 +2698,7 @@ void TPartition::Handle(TEvPQ::TEvReadTimeout::TPtr& ev, const TActorContext& ct
     ctx.Send(Tablet, answer.Event.Release());
     LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, " waiting read cookie " << ev->Get()->Cookie
         << " partition " << Partition << " read timeout for " << res->User << " offset " << res->Offset);
-    auto& userInfo = UsersInfoStorage.GetOrCreate(res->User, ctx);
+    auto& userInfo = UsersInfoStorage->GetOrCreate(res->User, ctx);
 
     userInfo.ForgetSubscription(ctx.Now());
     OnReadRequestFinished(std::move(res.GetRef()), answer.Size);
@@ -2791,7 +2858,7 @@ void TPartition::Handle(TEvPQ::TEvRead::TPtr& ev, const TActorContext& ctx) {
 
     Y_VERIFY(read->Offset <= EndOffset);
 
-    auto& userInfo = UsersInfoStorage.GetOrCreate(user, ctx);
+    auto& userInfo = UsersInfoStorage->GetOrCreate(user, ctx);
 
     if (!read->SessionId.empty()) {
         if (userInfo.Session != read->SessionId) {
@@ -2817,7 +2884,7 @@ void TPartition::Handle(NReadSpeedLimiterEvents::TEvResponse::TPtr& ev, const TA
 void TPartition::DoRead(TEvPQ::TEvRead::TPtr ev, TDuration waitQuotaTime, const TActorContext& ctx) {
     auto read = ev->Get();
     const TString& user = read->ClientId;
-    auto& userInfo = UsersInfoStorage.GetOrCreate(user, ctx);
+    auto& userInfo = UsersInfoStorage->GetOrCreate(user, ctx);
 
     ui64 offset = read->Offset;
     if (read->PartNo == 0 && (read->MaxTimeLagMs > 0 || read->ReadTimestampMs > 0 || userInfo.ReadFromTimestamp > TInstant::MilliSeconds(1))) {
@@ -2865,7 +2932,7 @@ void TPartition::DoRead(TEvPQ::TEvRead::TPtr ev, TDuration waitQuotaTime, const
 }
 
 void TPartition::OnReadRequestFinished(TReadInfo&& info, ui64 answerSize) {
-    auto userInfo = UsersInfoStorage.GetIfExists(info.User);
+    auto userInfo = UsersInfoStorage->GetIfExists(info.User);
     Y_VERIFY(userInfo);
 
     if (Config.GetMeteringMode() == NKikimrPQ::TPQTabletConfig::METERING_MODE_REQUEST_UNITS) {
@@ -3063,7 +3130,7 @@ void TPartition::ReadTimestampForOffset(const TString& user, TUserInfo& userInfo
 }
 
 void TPartition::ProcessTimestampsForNewData(const ui64 prevEndOffset, const TActorContext& ctx) {
-    for (auto& userInfoPair : UsersInfoStorage.GetAll()) {
+    for (auto& userInfoPair : UsersInfoStorage->GetAll()) {
         if (userInfoPair.second.Offset >= (i64)prevEndOffset && userInfoPair.second.Offset < (i64)EndOffset) {
             ReadTimestampForOffset(userInfoPair.first, userInfoPair.second, ctx);
         }
@@ -3072,7 +3139,7 @@ void TPartition::ProcessTimestampsForNewData(const ui64 prevEndOffset, const TAc
 
 void TPartition::Handle(TEvPQ::TEvProxyResponse::TPtr& ev, const TActorContext& ctx) {
     ReadingTimestamp = false;
-    auto userInfo = UsersInfoStorage.GetIfExists(ReadingForUser);
+    auto userInfo = UsersInfoStorage->GetIfExists(ReadingForUser);
     if (!userInfo || userInfo->ReadRuleGeneration != ReadingForUserReadRuleGeneration) {
         LOG_INFO_S(
             ctx, NKikimrServices::PERSQUEUE,
@@ -3138,7 +3205,7 @@ void TPartition::ProcessTimestampRead(const TActorContext& ctx) {
         TString user = UpdateUserInfoTimestamp.front().first;
         ui64 readRuleGeneration = UpdateUserInfoTimestamp.front().second;
         UpdateUserInfoTimestamp.pop_front();
-        auto userInfo = UsersInfoStorage.GetIfExists(user);
+        auto userInfo = UsersInfoStorage->GetIfExists(user);
         if (!userInfo || !userInfo->ReadScheduled || userInfo->ReadRuleGeneration != readRuleGeneration)
             continue;
         userInfo->ReadScheduled = false;
@@ -3152,7 +3219,7 @@ void TPartition::ProcessTimestampRead(const TActorContext& ctx) {
 
 void TPartition::Handle(TEvPQ::TEvError::TPtr& ev, const TActorContext& ctx) {
     ReadingTimestamp = false;
-    auto userInfo = UsersInfoStorage.GetIfExists(ReadingForUser);
+    auto userInfo = UsersInfoStorage->GetIfExists(ReadingForUser);
     if (!userInfo || userInfo->ReadRuleGeneration != ReadingForUserReadRuleGeneration) {
         ProcessTimestampRead(ctx);
         return;
@@ -3293,7 +3360,7 @@ void TPartition::ReportCounters(const TActorContext& ctx) {
     }
     // per client counters
     const auto now = ctx.Now();
-    for (auto& userInfoPair : UsersInfoStorage.GetAll()) {
+    for (auto& userInfoPair : UsersInfoStorage->GetAll()) {
         auto& userInfo = userInfoPair.second;
         if (!userInfo.LabeledCounters)
             continue;
@@ -3590,7 +3657,7 @@ void TPartition::HandleSetOffsetResponse(ui64 cookie, const TActorContext& ctx)
 
     for (auto& user : AffectedUsers) {
         if (auto* actual = GetPendingUserIfExists(user)) {
-            TUserInfo& userInfo = UsersInfoStorage.GetOrCreate(user, ctx);
+            TUserInfo& userInfo = UsersInfoStorage->GetOrCreate(user, ctx);
             bool offsetHasChanged = (userInfo.Offset != actual->Offset);
 
             userInfo.Session = actual->Session;
@@ -3614,12 +3681,12 @@ void TPartition::HandleSetOffsetResponse(ui64 cookie, const TActorContext& ctx)
                 TabletCounters.Cumulative()[COUNTER_PQ_WRITE_TIMESTAMP_CACHE_HIT].Increment(1);
             }
         } else {
-            UsersInfoStorage.Remove(user, ctx);
+            UsersInfoStorage->Remove(user, ctx);
         }
     }
 
     for (auto& [consumer, important] : PendingSetImportant) {
-        if (auto* userInfo = UsersInfoStorage.GetIfExists(consumer); userInfo) {
+        if (auto* userInfo = UsersInfoStorage->GetIfExists(consumer); userInfo) {
             userInfo->SetImportant(important);
         }
     }
@@ -3642,14 +3709,19 @@ void TPartition::HandleSetOffsetResponse(ui64 cookie, const TActorContext& ctx)
     ProcessTxsAndUserActs(ctx);
 }
 
-void TPartition::AddDistrTx(TSimpleSharedPtr<TEvPQ::TEvTxCalcPredicate> event)
+void TPartition::PushBackDistrTx(TSimpleSharedPtr<TEvPQ::TEvTxCalcPredicate> event)
 {
     DistrTxs.emplace_back(std::move(event));
 }
 
-void TPartition::AddDistrTx(TSimpleSharedPtr<TEvPQ::TEvChangePartitionConfig> event)
+void TPartition::PushBackDistrTx(TSimpleSharedPtr<TEvPQ::TEvChangePartitionConfig> event)
 {
-    DistrTxs.emplace_back(std::move(event));
+    DistrTxs.emplace_back(std::move(event), true);
+}
+
+void TPartition::PushFrontDistrTx(TSimpleSharedPtr<TEvPQ::TEvChangePartitionConfig> event)
+{
+    DistrTxs.emplace_front(std::move(event), false);
 }
 
 void TPartition::AddImmediateTx(TSimpleSharedPtr<TEvPersQueue::TEvProposeTransaction> tx)
@@ -3712,7 +3784,7 @@ void TPartition::ContinueProcessTxsAndUserActs(const TActorContext& ctx)
     if (!DistrTxs.empty()) {
         ProcessDistrTxs(ctx);
 
-        if (!DistrTxs.empty()) {
+        if (TxInProgress) {
             return;
         }
     }
@@ -3728,6 +3800,7 @@ void TPartition::ContinueProcessTxsAndUserActs(const TActorContext& ctx)
                           *PlanStep, *TxId);
     }
     AddCmdWriteUserInfos(request->Record);
+    AddCmdWriteConfig(request->Record);
 
     ctx.Send(Tablet, request.Release());
     UsersInfoWriteInProgress = true;
@@ -3762,7 +3835,7 @@ bool TPartition::BeginTransaction(const TEvPQ::TEvTxCalcPredicate& tx,
             break;
         }
 
-        if (!UsersInfoStorage.GetIfExists(consumer)) {
+        if (!UsersInfoStorage->GetIfExists(consumer)) {
             predicate = false;
             break;
         }
@@ -3861,7 +3934,7 @@ void TPartition::BeginChangePartitionConfig(const TEvPQ::TEvChangePartitionConfi
 
     TSet<TString> hasReadRule;
 
-    for (auto& [consumer, info] : UsersInfoStorage.GetAll()) {
+    for (auto& [consumer, info] : UsersInfoStorage->GetAll()) {
         PendingReadFromTimestamp[consumer] = TInstant::Zero();
 
         hasReadRule.insert(consumer);
@@ -3927,17 +4000,17 @@ void TPartition::EndChangePartitionConfig(const TEvPQ::TEvChangePartitionConfig&
 
     Y_VERIFY(Config.GetPartitionConfig().GetTotalPartitions() > 0);
 
-    UsersInfoStorage.UpdateConfig(event.Config);
+    UsersInfoStorage->UpdateConfig(event.Config);
 
     WriteQuota->UpdateConfig(Config.GetPartitionConfig().GetBurstSize(), Config.GetPartitionConfig().GetWriteSpeedInBytesPerSecond());
     if (AppData(ctx)->PQConfig.GetQuotingConfig().GetPartitionReadQuotaIsTwiceWriteQuota()) {
-        for (auto& userInfo : UsersInfoStorage.GetAll()) {
+        for (auto& userInfo : UsersInfoStorage->GetAll()) {
             userInfo.second.ReadQuota.UpdateConfig(Config.GetPartitionConfig().GetBurstSize() * 2, Config.GetPartitionConfig().GetWriteSpeedInBytesPerSecond() * 2);
         }
     }
 
     for (const auto& readQuota : Config.GetPartitionConfig().GetReadQuota()) {
-        auto& userInfo = UsersInfoStorage.GetOrCreate(readQuota.GetClientId(), ctx);
+        auto& userInfo = UsersInfoStorage->GetOrCreate(readQuota.GetClientId(), ctx);
         userInfo.ReadQuota.UpdateConfig(readQuota.GetBurstSize(), readQuota.GetSpeedInBytesPerSecond());
     }
 
@@ -3959,10 +4032,17 @@ void TPartition::EndChangePartitionConfig(const TEvPQ::TEvChangePartitionConfig&
         }
     }
 
-    SchedulePartitionConfigChanged();
+    if (SendChangeConfigReply) {
+        SchedulePartitionConfigChanged();
+    }
     ReportCounters(ctx);
 }
 
+TString TPartition::GetKeyConfig() const
+{
+    return Sprintf("_config_%u", Partition);
+}
+
 void TPartition::InitPendingUserInfoForImportantClients(const TEvPQ::TEvChangePartitionConfig& event,
                                                         const TActorContext& ctx) {
     const NKikimrPQ::TPQTabletConfig& config = event.Config;
@@ -3991,7 +4071,7 @@ void TPartition::InitPendingUserInfoForImportantClients(const TEvPQ::TEvChangePa
         //ReadTimestampForOffset(consumer, *userInfo, ctx);
     }
 
-    for (auto& [consumer, userInfo] : UsersInfoStorage.GetAll()) {
+    for (auto& [consumer, userInfo] : UsersInfoStorage->GetAll()) {
         if (!important.contains(consumer) && userInfo.Important && userInfo.LabeledCounters) {
             ScheduleDropPartitionLabeledCounters(userInfo.LabeledCounters->GetGroup());
             PendingSetImportant[consumer] = false;
@@ -4020,6 +4100,7 @@ void TPartition::ProcessDistrTx(const TActorContext& ctx)
         Y_VERIFY(!ChangeConfig);
 
         ChangeConfig = t.ChangeConfig;
+        SendChangeConfigReply = t.SendReply;
         BeginChangePartitionConfig(*ChangeConfig, ctx);
 
         RemoveDistrTx();
@@ -4440,17 +4521,34 @@ void TPartition::AddCmdWriteUserInfos(NKikimrClient::TKeyValueRequest& request)
     }
 }
 
+void TPartition::AddCmdWriteConfig(NKikimrClient::TKeyValueRequest& request)
+{
+    if (!ChangeConfig) {
+        return;
+    }
+
+    TString key = GetKeyConfig();
+
+    TString data;
+    Y_VERIFY(ChangeConfig->Config.SerializeToString(&data));
+
+    auto write = request.AddCmdWrite();
+    write->SetKey(key.Data(), key.Size());
+    write->SetValue(data.Data(), data.Size());
+    write->SetStorageChannel(NKikimrClient::TKeyValueRequest::INLINE);
+}
+
 TUserInfo& TPartition::GetOrCreatePendingUser(const TString& user,
                                               const TActorContext& ctx,
                                               TMaybe<ui64> readRuleGeneration)
 {
-    TUserInfo* userInfo = UsersInfoStorage.GetIfExists(user);
+    TUserInfo* userInfo = UsersInfoStorage->GetIfExists(user);
 
     auto i = PendingUsersInfo.find(user);
     if (i == PendingUsersInfo.end()) {
-        auto [p, _] = PendingUsersInfo.emplace(user, UsersInfoStorage.CreateUserInfo(user,
-                                                                                     ctx,
-                                                                                     readRuleGeneration));
+        auto [p, _] = PendingUsersInfo.emplace(user, UsersInfoStorage->CreateUserInfo(user,
+                                                                                      ctx,
+                                                                                      readRuleGeneration));
 
         if (userInfo) {
             p->second.Session = userInfo->Session;
@@ -5542,7 +5640,7 @@ void TPartition::ProcessRead(const TActorContext& ctx, TReadInfo&& info, const u
     ui32 size = 0;
 
     Y_VERIFY(!info.User.empty());
-    auto& userInfo = UsersInfoStorage.GetOrCreate(info.User, ctx);
+    auto& userInfo = UsersInfoStorage->GetOrCreate(info.User, ctx);
 
     if (subscription) {
         userInfo.ForgetSubscription(ctx.Now());
@@ -5573,7 +5671,7 @@ void TPartition::ProcessRead(const TActorContext& ctx, TReadInfo&& info, const u
         LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Reading cookie " << cookie << ". All data is from uncompacted head.");
 
         TReadAnswer answer(info.FormAnswer(
-            ctx, EndOffset, Partition, &UsersInfoStorage.GetOrCreate(info.User, ctx),
+            ctx, EndOffset, Partition, &UsersInfoStorage->GetOrCreate(info.User, ctx),
             info.Destination, GetSizeLag(info.Offset), Tablet, Config.GetMeteringMode()
         ));
         const auto& resp = dynamic_cast<TEvPQ::TEvProxyResponse*>(answer.Event.Get())->Response;

+ 19 - 5
ydb/core/persqueue/partition.h

@@ -48,8 +48,10 @@ struct TTransaction {
         Y_VERIFY(Tx);
     }
 
-    explicit TTransaction(TSimpleSharedPtr<TEvPQ::TEvChangePartitionConfig> changeConfig) :
-        ChangeConfig(changeConfig)
+    TTransaction(TSimpleSharedPtr<TEvPQ::TEvChangePartitionConfig> changeConfig,
+                 bool sendReply) :
+        ChangeConfig(changeConfig),
+        SendReply(sendReply)
     {
         Y_VERIFY(ChangeConfig);
     }
@@ -58,6 +60,7 @@ struct TTransaction {
     TMaybe<bool> Predicate;
 
     TSimpleSharedPtr<TEvPQ::TEvChangePartitionConfig> ChangeConfig;
+    bool SendReply;
 };
 
 class TPartition : public TActorBootstrapped<TPartition> {
@@ -205,8 +208,9 @@ private:
     void ProcessTxsAndUserActs(const TActorContext& ctx);
     void ContinueProcessTxsAndUserActs(const TActorContext& ctx);
 
-    void AddDistrTx(TSimpleSharedPtr<TEvPQ::TEvTxCalcPredicate> event);
-    void AddDistrTx(TSimpleSharedPtr<TEvPQ::TEvChangePartitionConfig> event);
+    void PushBackDistrTx(TSimpleSharedPtr<TEvPQ::TEvTxCalcPredicate> event);
+    void PushBackDistrTx(TSimpleSharedPtr<TEvPQ::TEvChangePartitionConfig> event);
+    void PushFrontDistrTx(TSimpleSharedPtr<TEvPQ::TEvChangePartitionConfig> event);
     void RemoveDistrTx();
     void ProcessDistrTxs(const TActorContext& ctx);
     void ProcessDistrTx(const TActorContext& ctx);
@@ -249,6 +253,7 @@ private:
     void AddCmdWriteTxMeta(NKikimrClient::TKeyValueRequest& request,
                            ui64 step, ui64 txId);
     void AddCmdWriteUserInfos(NKikimrClient::TKeyValueRequest& request);
+    void AddCmdWriteConfig(NKikimrClient::TKeyValueRequest& request);
     void AddCmdDeleteRange(NKikimrClient::TKeyValueRequest& request,
                            const TKeyPrefix& ikey, const TKeyPrefix& ikeyDeprecated);
 
@@ -277,10 +282,15 @@ private:
                                     const TActorContext& ctx);
     void EndChangePartitionConfig(const TEvPQ::TEvChangePartitionConfig& event,
                                   const TActorContext& ctx);
+    TString GetKeyConfig() const;
 
     void InitPendingUserInfoForImportantClients(const TEvPQ::TEvChangePartitionConfig& event,
                                                 const TActorContext& ctx);
 
+    void RequestConfig(const TActorContext& ctx);
+    void HandleConfig(const NKikimrClient::TResponse& res, const TActorContext& ctx);
+    void Initialize(const TActorContext& ctx);
+
 public:
     static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
         return NKikimrServices::TActivity::PERSQUEUE_PARTITION_ACTOR;
@@ -454,6 +464,7 @@ private:
     }
 private:
     enum EInitState {
+        WaitConfig,
         WaitDiskStatus,
         WaitInfoRange,
         WaitDataRange,
@@ -465,6 +476,8 @@ private:
     ui64 TabletID;
     ui32 Partition;
     NKikimrPQ::TPQTabletConfig Config;
+    NKikimrPQ::TPQTabletConfig TabletConfig;
+    const TTabletCountersBase& Counters;
     NPersQueue::TTopicConverterPtr TopicConverter;
     bool IsLocalDC;
     TString DCId;
@@ -514,7 +527,7 @@ private:
     bool IsServerless;
     TString FolderId;
 
-    TUsersInfoStorage UsersInfoStorage;
+    TMaybe<TUsersInfoStorage> UsersInfoStorage;
 
     //
     // user actions and transactions
@@ -535,6 +548,7 @@ private:
     TMaybe<ui64> TxId;
     bool TxIdHasChanged = false;
     TSimpleSharedPtr<TEvPQ::TEvChangePartitionConfig> ChangeConfig;
+    bool SendChangeConfigReply = true;
     //
     //
     //

+ 259 - 44
ydb/core/persqueue/ut/partition_ut.cpp

@@ -25,15 +25,6 @@ Y_UNIT_TEST_SUITE(TPartitionTests) {
 
 namespace NHelpers {
 
-struct TCreatePartitionParams {
-    ui32 Partition = 1;
-    ui64 Begin = 0;
-    ui64 End = 0;
-    TMaybe<ui64> PlanStep;
-    TMaybe<ui64> TxId;
-    TVector<TTransaction> Transactions;
-};
-
 struct TCreateConsumerParams {
     TString Consumer;
     ui64 Offset = 0;
@@ -44,15 +35,39 @@ struct TCreateConsumerParams {
     ui64 ReadRuleGeneration = 0;
 };
 
+struct TConfigParams {
+    ui64 Version = 0;
+    TVector<TCreateConsumerParams> Consumers;
+};
+
+struct TCreatePartitionParams {
+    ui32 Partition = 1;
+    ui64 Begin = 0;
+    ui64 End = 0;
+    TMaybe<ui64> PlanStep;
+    TMaybe<ui64> TxId;
+    TVector<TTransaction> Transactions;
+    TConfigParams Config;
+};
+
 }
 
 class TPartitionFixture : public NUnitTest::TBaseFixture {
 protected:
     struct TUserInfoMatcher {
+        TMaybe<TString> Consumer;
         TMaybe<TString> Session;
         TMaybe<ui64> Offset;
         TMaybe<ui32> Generation;
         TMaybe<ui32> Step;
+        TMaybe<ui64> ReadRuleGeneration;
+    };
+
+    struct TDeleteRangeMatcher {
+        TMaybe<char> TypeInfo;
+        TMaybe<ui32> Partition;
+        TMaybe<char> Mark;
+        TMaybe<TString> Consumer;
     };
 
     struct TCmdWriteMatcher {
@@ -60,6 +75,7 @@ protected:
         TMaybe<ui64> PlanStep;
         TMaybe<ui64> TxId;
         THashMap<size_t, TUserInfoMatcher> UserInfos;
+        THashMap<size_t, TDeleteRangeMatcher> DeleteRanges;
     };
 
     struct TProxyResponseMatcher {
@@ -93,6 +109,10 @@ protected:
         TMaybe<ui32> Partition;
     };
 
+    struct TChangePartitionConfigMatcher {
+        TMaybe<ui32> Partition;
+    };
+
     struct TTxOperationMatcher {
         TMaybe<ui32> Partition;
         TMaybe<TString> Consumer;
@@ -110,16 +130,17 @@ protected:
 
     using TCreatePartitionParams = NHelpers::TCreatePartitionParams;
     using TCreateConsumerParams = NHelpers::TCreateConsumerParams;
+    using TConfigParams = NHelpers::TConfigParams;
 
     void SetUp(NUnitTest::TTestContext&) override;
     void TearDown(NUnitTest::TTestContext&) override;
 
     void CreatePartitionActor(ui32 partition,
-                              const TVector<TCreateConsumerParams>& consumers,
+                              const TConfigParams& config,
                               bool newPartition,
                               TVector<TTransaction> txs);
     void CreatePartition(const TCreatePartitionParams& params = {},
-                         const TVector<TCreateConsumerParams>& consumers = {});
+                         const TConfigParams& config = {});
 
     void CreateSession(const TString& clientId,
                        const TString& sessionId,
@@ -148,6 +169,8 @@ protected:
     void WaitProxyResponse(const TProxyResponseMatcher &matcher = {});
     void WaitErrorResponse(const TErrorMatcher& matcher = {});
 
+    void WaitConfigRequest();
+    void SendConfigResponse(const TConfigParams& config);
     void WaitDiskStatusRequest();
     void SendDiskStatusResponse();
     void WaitMetaReadRequest();
@@ -179,15 +202,26 @@ protected:
     void SendRollbackTx(ui64 step, ui64 txId);
     void WaitCommitTxDone(const TCommitTxDoneMatcher& matcher = {});
 
+    void SendChangePartitionConfig(const TConfigParams& config = {});
+    void WaitPartitionConfigChanged(const TChangePartitionConfigMatcher& matcher = {});
+
     TTransaction MakeTransaction(ui64 step, ui64 txId,
                                  TString consumer,
                                  ui64 begin, ui64 end,
                                  TMaybe<bool> predicate = Nothing());
 
+    static NKikimrPQ::TPQTabletConfig MakeConfig(ui64 version,
+                                                 const TVector<TCreateConsumerParams>& consumers);
+
     TMaybe<TTestContext> Ctx;
     TMaybe<TFinalizer> Finalizer;
 
     TActorId ActorId;
+
+    NPersQueue::TTopicConverterPtr TopicConverter;
+    NKikimrPQ::TPQTabletConfig Config;
+
+    TAutoPtr<TTabletCountersBase> TabletCounters;
 };
 
 void TPartitionFixture::SetUp(NUnitTest::TTestContext&)
@@ -203,8 +237,10 @@ void TPartitionFixture::TearDown(NUnitTest::TTestContext&)
 {
 }
 
+
+
 void TPartitionFixture::CreatePartitionActor(ui32 id,
-                                             const TVector<TCreateConsumerParams>& consumers,
+                                             const TConfigParams& config,
                                              bool newPartition,
                                              TVector<TTransaction> txs)
 {
@@ -225,46 +261,42 @@ void TPartitionFixture::CreatePartitionActor(ui32 id,
     >;
 
     TAutoPtr<TCounters> counters(new TCounters());
-    TAutoPtr<TTabletCountersBase> tabletCounters = counters->GetSecondTabletCounters().Release();
-
-    NPersQueue::TTopicNamesConverterFactory factory(true, "/Root/PQ", "dc1");
-    NPersQueue::TTopicConverterPtr topicConverter;
-    NKikimrPQ::TPQTabletConfig config;
+    TabletCounters = counters->GetSecondTabletCounters().Release();
 
-    for (auto& c : consumers) {
-        config.AddReadRules(c.Consumer);
-    }
-
-    config.SetTopicName("rt3.dc1--account--topic");
-    config.SetTopicPath("/Root/PQ/rt3.dc1--account--topic");
-    config.SetFederationAccount("account");
-    config.SetLocalDC(true);
-    config.SetYdbDatabasePath("");
+    Config = MakeConfig(config.Version,
+                        config.Consumers);
 
-    topicConverter = factory.MakeTopicConverter(config);
+    NPersQueue::TTopicNamesConverterFactory factory(true, "/Root/PQ", "dc1");
+    TopicConverter = factory.MakeTopicConverter(Config);
 
     auto actor = new NPQ::TPartition(Ctx->TabletId,
                                      id,
                                      Ctx->Edge,
                                      Ctx->Edge,
-                                     topicConverter,
+                                     TopicConverter,
                                      true,
                                      "dcId",
                                      false,
-                                     config,
-                                     *tabletCounters,
+                                     Config,
+                                     *TabletCounters,
                                      newPartition,
                                      std::move(txs));
     ActorId = Ctx->Runtime->Register(actor);
 }
 
 void TPartitionFixture::CreatePartition(const TCreatePartitionParams& params,
-                                        const TVector<TCreateConsumerParams>& consumers)
+                                        const TConfigParams& config)
 {
     if ((params.Begin == 0) && (params.End == 0)) {
-        CreatePartitionActor(params.Partition, consumers, true, {});
+        CreatePartitionActor(params.Partition, config, true, {});
+
+        WaitConfigRequest();
+        SendConfigResponse(params.Config);
     } else {
-        CreatePartitionActor(params.Partition, consumers, false, params.Transactions);
+        CreatePartitionActor(params.Partition, config, false, params.Transactions);
+
+        WaitConfigRequest();
+        SendConfigResponse(params.Config);
 
         WaitDiskStatusRequest();
         SendDiskStatusResponse();
@@ -273,7 +305,7 @@ void TPartitionFixture::CreatePartition(const TCreatePartitionParams& params,
         SendMetaReadResponse(params.PlanStep, params.TxId);
 
         WaitInfoRangeRequest();
-        SendInfoRangeResponse(params.Partition, consumers);
+        SendInfoRangeResponse(params.Partition, params.Config.Consumers);
 
         WaitDataRangeRequest();
         SendDataRangeResponse(params.Begin, params.End);
@@ -349,8 +381,13 @@ void TPartitionFixture::WaitCmdWrite(const TCmdWriteMatcher& matcher)
     UNIT_ASSERT_VALUES_EQUAL(event->Record.GetCookie(), 1);             // SET_OFFSET_COOKIE
 
     if (matcher.Count.Defined()) {
-        UNIT_ASSERT_VALUES_EQUAL(*matcher.Count, event->Record.CmdWriteSize());
+        UNIT_ASSERT_VALUES_EQUAL(*matcher.Count,
+                                 event->Record.CmdWriteSize() + event->Record.CmdDeleteRangeSize());
     }
+
+    //
+    // TxMeta
+    //
     if (matcher.PlanStep.Defined()) {
         NKikimrPQ::TPartitionTxMeta meta;
         UNIT_ASSERT(meta.ParseFromString(event->Record.GetCmdWrite(0).GetValue()));
@@ -363,10 +400,12 @@ void TPartitionFixture::WaitCmdWrite(const TCmdWriteMatcher& matcher)
 
         UNIT_ASSERT_VALUES_EQUAL(*matcher.TxId, meta.GetTxId());
     }
+
+    //
+    // CmdWrite
+    //
     for (auto& [index, userInfo] : matcher.UserInfos) {
-        if (matcher.Count.Defined()) {
-            UNIT_ASSERT(index < *matcher.Count);
-        }
+        UNIT_ASSERT(index < event->Record.CmdWriteSize());
 
         NKikimrPQ::TUserInfo ud;
         UNIT_ASSERT(ud.ParseFromString(event->Record.GetCmdWrite(index).GetValue()));
@@ -387,6 +426,31 @@ void TPartitionFixture::WaitCmdWrite(const TCmdWriteMatcher& matcher)
             UNIT_ASSERT(ud.HasOffset());
             UNIT_ASSERT_VALUES_EQUAL(*userInfo.Offset, ud.GetOffset());
         }
+        if (userInfo.ReadRuleGeneration) {
+            UNIT_ASSERT(ud.HasReadRuleGeneration());
+            UNIT_ASSERT_VALUES_EQUAL(*userInfo.ReadRuleGeneration, ud.GetReadRuleGeneration());
+        }
+    }
+
+    //
+    // CmdDeleteRange
+    //
+    for (auto& [index, deleteRange] : matcher.DeleteRanges) {
+        UNIT_ASSERT(index < event->Record.CmdDeleteRangeSize());
+        UNIT_ASSERT(event->Record.GetCmdDeleteRange(index).HasRange());
+
+        auto& range = event->Record.GetCmdDeleteRange(index).GetRange();
+        TString key = range.GetFrom();
+        UNIT_ASSERT(key.Size() > (1 + 10 + 1)); // type + partition + mark + consumer
+
+        if (deleteRange.Partition.Defined()) {
+            auto partition = FromString<ui32>(key.substr(1, 10));
+            UNIT_ASSERT_VALUES_EQUAL(*deleteRange.Partition, partition);
+        }
+        if (deleteRange.Consumer.Defined()) {
+            TString consumer = key.substr(12);
+            UNIT_ASSERT_VALUES_EQUAL(*deleteRange.Consumer, consumer);
+        }
     }
 }
 
@@ -453,6 +517,35 @@ void TPartitionFixture::WaitErrorResponse(const TErrorMatcher& matcher)
     }
 }
 
+void TPartitionFixture::WaitConfigRequest()
+{
+    auto event = Ctx->Runtime->GrabEdgeEvent<TEvKeyValue::TEvRequest>();
+    UNIT_ASSERT(event != nullptr);
+
+    UNIT_ASSERT_VALUES_EQUAL(event->Record.CmdReadSize(), 1);
+}
+
+void TPartitionFixture::SendConfigResponse(const TConfigParams& config)
+{
+    auto event = MakeHolder<TEvKeyValue::TEvResponse>();
+    event->Record.SetStatus(NMsgBusProxy::MSTATUS_OK);
+
+    auto read = event->Record.AddReadResult();
+    if (config.Consumers.empty()) {
+        read->SetStatus(NKikimrProto::NODATA);
+    } else {
+        read->SetStatus(NKikimrProto::OK);
+
+        TString out;
+        Y_VERIFY(MakeConfig(config.Version,
+                            config.Consumers).SerializeToString(&out));
+
+        read->SetValue(out);
+    }
+
+    Ctx->Runtime->SingleSys()->Send(new IEventHandle(ActorId, Ctx->Edge, event.Release()));
+}
+
 void TPartitionFixture::WaitDiskStatusRequest()
 {
     auto event = Ctx->Runtime->GrabEdgeEvent<TEvKeyValue::TEvRequest>();
@@ -688,6 +781,23 @@ void TPartitionFixture::WaitCommitTxDone(const TCommitTxDoneMatcher& matcher)
     }
 }
 
+void TPartitionFixture::SendChangePartitionConfig(const TConfigParams& config)
+{
+    auto event = MakeHolder<TEvPQ::TEvChangePartitionConfig>(TopicConverter, MakeConfig(config.Version,
+                                                                                        config.Consumers));
+    Ctx->Runtime->SingleSys()->Send(new IEventHandle(ActorId, Ctx->Edge, event.Release()));
+}
+
+void TPartitionFixture::WaitPartitionConfigChanged(const TChangePartitionConfigMatcher& matcher)
+{
+    auto event = Ctx->Runtime->GrabEdgeEvent<TEvPQ::TEvPartitionConfigChanged>();
+    UNIT_ASSERT(event != nullptr);
+
+    if (matcher.Partition) {
+        UNIT_ASSERT_VALUES_EQUAL(*matcher.Partition, event->Partition);
+    }
+}
+
 TTransaction TPartitionFixture::MakeTransaction(ui64 step, ui64 txId,
                                                 TString consumer,
                                                 ui64 begin, ui64 end,
@@ -699,6 +809,27 @@ TTransaction TPartitionFixture::MakeTransaction(ui64 step, ui64 txId,
     return TTransaction(event, predicate);
 }
 
+NKikimrPQ::TPQTabletConfig TPartitionFixture::MakeConfig(ui64 version,
+                                                         const TVector<TCreateConsumerParams>& consumers)
+{
+    NKikimrPQ::TPQTabletConfig config;
+
+    config.SetVersion(version);
+
+    for (auto& c : consumers) {
+        config.AddReadRules(c.Consumer);
+        config.AddReadRuleGenerations(c.Generation);
+    }
+
+    config.SetTopicName("rt3.dc1--account--topic");
+    config.SetTopicPath("/Root/PQ/rt3.dc1--account--topic");
+    config.SetFederationAccount("account");
+    config.SetLocalDC(true);
+    config.SetYdbDatabasePath("");
+
+    return config;
+}
+
 Y_UNIT_TEST_F(Batching, TPartitionFixture)
 {
     CreatePartition();
@@ -986,12 +1117,14 @@ Y_UNIT_TEST_F(AfterRestart_1, TPartitionFixture)
     txs.push_back(MakeTransaction(step, 11111, consumer, 0, 2, true));
     txs.push_back(MakeTransaction(step, 22222, consumer, 2, 4));
 
-    CreatePartition({.Partition=partition,
+    CreatePartition({
+                    .Partition=partition,
                     .Begin=begin,
                     .End=end,
                     .PlanStep=step, .TxId=10000,
-                    .Transactions=std::move(txs)},
-                    {{.Consumer=consumer, .Offset=0, .Session=session}});
+                    .Transactions=std::move(txs),
+                    .Config={.Consumers={{.Consumer=consumer, .Offset=0, .Session=session}}}
+                    });
 
     SendCommitTx(step, 11111);
 
@@ -1015,12 +1148,14 @@ Y_UNIT_TEST_F(AfterRestart_2, TPartitionFixture)
     txs.push_back(MakeTransaction(step, 11111, consumer, 0, 2));
     txs.push_back(MakeTransaction(step, 22222, consumer, 2, 4));
 
-    CreatePartition({.Partition=partition,
+    CreatePartition({
+                    .Partition=partition,
                     .Begin=begin,
                     .End=end,
                     .PlanStep=step, .TxId=10000,
-                    .Transactions=std::move(txs)},
-                    {{.Consumer=consumer, .Offset=0, .Session=session}});
+                    .Transactions=std::move(txs),
+                    .Config={.Consumers={{.Consumer=consumer, .Offset=0, .Session=session}}}
+                    });
 
     WaitCalcPredicateResult({.Step=step, .TxId=11111, .Partition=partition, .Predicate=true});
 }
@@ -1085,6 +1220,86 @@ Y_UNIT_TEST_F(CorrectRange_Rollback, TPartitionFixture)
     WaitCalcPredicateResult({.Step=step, .TxId=txId_2, .Partition=partition, .Predicate=true});
 }
 
+Y_UNIT_TEST_F(ChangeConfig, TPartitionFixture)
+{
+    const ui32 partition = 3;
+    const ui64 begin = 0;
+    const ui64 end = 10;
+    const TString client = "client";
+    const TString session = "session";
+
+    const ui64 step = 12345;
+    const ui64 txId_1 = 67890;
+    const ui64 txId_2 = 67891;
+
+    CreatePartition({
+                    .Partition=partition, .Begin=begin, .End=end,
+                    .Config={.Consumers={
+                    {.Consumer="client-1", .Offset=0, .Session="session-1"},
+                    {.Consumer="client-2", .Offset=0, .Session="session-2"},
+                    {.Consumer="client-3", .Offset=0, .Session="session-3"}
+                    }}
+                    });
+
+    SendCalcPredicate(step, txId_1, "client-1", 0, 2);
+    SendChangePartitionConfig({.Version=2,
+                              .Consumers={
+                              {.Consumer="client-1", .Generation=0},
+                              {.Consumer="client-3", .Generation=7}
+                              }});
+    //
+    // consumer 'client-2' will be deleted
+    //
+    SendCalcPredicate(step, txId_2, "client-2", 0, 2);
+
+    WaitCalcPredicateResult({.Step=step, .TxId=txId_1, .Partition=partition, .Predicate=true});
+    SendCommitTx(step, txId_1);
+
+    //
+    // consumer 'client-2' was deleted
+    //
+    WaitCalcPredicateResult({.Step=step, .TxId=txId_2, .Partition=partition, .Predicate=false});
+    SendRollbackTx(step, txId_2);
+
+    WaitCmdWrite({.Count=8,
+                 .PlanStep=step, .TxId=txId_2,
+                 .UserInfos={
+                 {1, {.Consumer="client-1", .Session="", .Offset=2}},
+                 {3, {.Consumer="client-3", .Session="", .Offset=0, .ReadRuleGeneration=7}}
+                 },
+                 .DeleteRanges={
+                 {0, {.Partition=3, .Consumer="client-2"}}
+                 }});
+    SendCmdWriteResponse(NMsgBusProxy::MSTATUS_OK);
+
+    WaitPartitionConfigChanged({.Partition=partition});
+}
+
+Y_UNIT_TEST_F(TabletConfig_Is_Newer_That_PartitionConfig, TPartitionFixture)
+{
+    CreatePartition({
+                    .Partition=3,
+                    .Begin=0, .End=10,
+                    //
+                    // конфиг партиции
+                    //
+                    .Config={.Version=1, .Consumers={{.Consumer="client-1", .Offset=3}}}
+                    },
+                    //
+                    // конфиг таблетки
+                    //
+                    {.Version=2, .Consumers={{.Consumer="client-2"}}});
+
+    WaitCmdWrite({.Count=5,
+                 .UserInfos={
+                 {0, {.Consumer="client-2", .Session="", .Offset=0, .ReadRuleGeneration=0}}
+                 },
+                 .DeleteRanges={
+                 {0, {.Partition=3, .Consumer="client-1"}}
+                 }});
+    SendCmdWriteResponse(NMsgBusProxy::MSTATUS_OK);
+}
+
 }
 
 }