Просмотр исходного кода

Restoring authorship annotation for <mokhotskii@yandex-team.ru>. Commit 1 of 2.

mokhotskii 3 лет назад
Родитель
Сommit
edfa737d40

+ 4 - 4
contrib/python/botocore/ya.make

@@ -568,10 +568,10 @@ RESOURCE_FILES(
     #botocore/data/kinesis-video-media/2017-09-30/service-2.json
     #botocore/data/kinesis-video-signaling/2019-12-04/paginators-1.json
     #botocore/data/kinesis-video-signaling/2019-12-04/service-2.json
-    botocore/data/kinesis/2013-12-02/examples-1.json
-    botocore/data/kinesis/2013-12-02/paginators-1.json
-    botocore/data/kinesis/2013-12-02/service-2.json
-    botocore/data/kinesis/2013-12-02/waiters-2.json
+    botocore/data/kinesis/2013-12-02/examples-1.json 
+    botocore/data/kinesis/2013-12-02/paginators-1.json 
+    botocore/data/kinesis/2013-12-02/service-2.json 
+    botocore/data/kinesis/2013-12-02/waiters-2.json 
     #botocore/data/kinesisanalytics/2015-08-14/examples-1.json
     #botocore/data/kinesisanalytics/2015-08-14/paginators-1.json
     #botocore/data/kinesisanalytics/2015-08-14/service-2.json

+ 25 - 25
ydb/core/client/server/msgbus_server_pq_metacache.cpp

@@ -26,21 +26,21 @@ IActor* CreateSchemeCache(NActors::TActorSystem* ActorSystem, TIntrusivePtr<NMon
 class TPersQueueMetaCacheActor : public TActorBootstrapped<TPersQueueMetaCacheActor> {
     using TBase = TActorBootstrapped<TPersQueueMetaCacheActor>;
 public:
-    TPersQueueMetaCacheActor(TPersQueueMetaCacheActor&&) = default;
-    TPersQueueMetaCacheActor& operator=(TPersQueueMetaCacheActor&&) = default;
-
-    TPersQueueMetaCacheActor(ui64 grpcPort,
-                             const NMonitoring::TDynamicCounterPtr& counters,
-                             const TDuration& versionCheckInterval)
+    TPersQueueMetaCacheActor(TPersQueueMetaCacheActor&&) = default; 
+    TPersQueueMetaCacheActor& operator=(TPersQueueMetaCacheActor&&) = default; 
+ 
+    TPersQueueMetaCacheActor(ui64 grpcPort, 
+                             const NMonitoring::TDynamicCounterPtr& counters, 
+                             const TDuration& versionCheckInterval) 
         : Counters(counters)
-        , ClientWrapper(std::move(std::make_unique<TClientWrapper>(grpcPort)))
+        , ClientWrapper(std::move(std::make_unique<TClientWrapper>(grpcPort))) 
         , VersionCheckInterval(versionCheckInterval)
         , Generation(std::make_shared<TAtomicCounter>())
     {
     }
 
-    TPersQueueMetaCacheActor(const NMonitoring::TDynamicCounterPtr& counters,
-                             const TDuration& versionCheckInterval)
+    TPersQueueMetaCacheActor(const NMonitoring::TDynamicCounterPtr& counters, 
+                             const TDuration& versionCheckInterval) 
         : Counters(counters)
         , VersionCheckInterval(versionCheckInterval)
         , Generation(std::make_shared<TAtomicCounter>())
@@ -58,7 +58,7 @@ public:
                 Die(ctx);
                 return;
             }
-            ClientWrapper.reset(new TClientWrapper(driver));
+            ClientWrapper.reset(new TClientWrapper(driver)); 
         }
         SkipVersionCheck = !AppData(ctx)->PQConfig.GetMetaCacheSkipVersionCheck();
         PathPrefix = TopicPrefix(ctx);
@@ -358,7 +358,7 @@ private:
 
 
     void SendSchemeCacheRequest(const TVector<TString>& topics, bool addDefaultPathPrefix, const TActorContext& ctx) {
-        auto schemeCacheRequest = std::make_unique<NSchemeCache::TSchemeCacheNavigate>(++RequestId);
+        auto schemeCacheRequest = std::make_unique<NSchemeCache::TSchemeCacheNavigate>(++RequestId); 
         for (const auto& path : topics) {
             auto split = NKikimr::SplitPath(path);
             Y_VERIFY(!split.empty());
@@ -372,7 +372,7 @@ private:
             entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpTopic;
             schemeCacheRequest->ResultSet.emplace_back(std::move(entry));
         }
-        ctx.Send(SchemeCacheId, new TEvTxProxySchemeCache::TEvNavigateKeySet(schemeCacheRequest.release()));
+        ctx.Send(SchemeCacheId, new TEvTxProxySchemeCache::TEvNavigateKeySet(schemeCacheRequest.release())); 
     }
 
     void SendListTopicsResponse(const TActorId& recipient, const TActorContext& ctx) {
@@ -444,11 +444,11 @@ private:
 
     class TClientWrapper {
     public:
-        TClientWrapper(const TClientWrapper&) = delete;
-        TClientWrapper& operator=(const TClientWrapper&) = delete;
-        TClientWrapper(TClientWrapper&&) = default;
-        TClientWrapper& operator=(TClientWrapper&&) = default;
-
+        TClientWrapper(const TClientWrapper&) = delete; 
+        TClientWrapper& operator=(const TClientWrapper&) = delete; 
+        TClientWrapper(TClientWrapper&&) = default; 
+        TClientWrapper& operator=(TClientWrapper&&) = default; 
+ 
         TClientWrapper(ui64 driverPort)
             : DriverPort(driverPort)
         {}
@@ -464,18 +464,18 @@ private:
                     auto driverConfig = NYdb::TDriverConfig().SetEndpoint(endpoint);
                     DriverHolder.Reset(new NYdb::TDriver(driverConfig));
                     Driver = DriverHolder.Get();
-                    TableClient.reset(new NYdb::NTable::TTableClient(*Driver));
+                    TableClient.reset(new NYdb::NTable::TTableClient(*Driver)); 
                 }
             } else if (Driver != nullptr) {
-                TableClient.reset(new NYdb::NTable::TTableClient(*Driver));
+                TableClient.reset(new NYdb::NTable::TTableClient(*Driver)); 
             }
         }
-
+ 
         NYdb::NTable::TTableClient* GetClient() {
             Y_VERIFY(TableClient);
-            return TableClient.get();
+            return TableClient.get(); 
         }
-
+ 
         void Stop() {
             if (DriverHolder != nullptr) {
                 TableClient->Stop();
@@ -489,15 +489,15 @@ private:
         THolder<NYdb::TDriver> DriverHolder;
         NYdb::TDriver* Driver = nullptr;
         TMaybe<ui64> DriverPort;
-        std::unique_ptr<NYdb::NTable::TTableClient> TableClient;
-    };
+        std::unique_ptr<NYdb::NTable::TTableClient> TableClient; 
+    }; 
 
     NMonitoring::TDynamicCounterPtr Counters;
     NActors::TActorSystem* ActorSystem;
     TString VersionQuery;
     TString TopicsQuery;
 
-    std::unique_ptr<TClientWrapper> ClientWrapper;
+    std::unique_ptr<TClientWrapper> ClientWrapper; 
     TAsyncCreateSessionResult SessionFuture;
     TMaybe<TSession> YdbSession;
     TAsyncPrepareQueryResult TopicsQueryFuture;

+ 6 - 6
ydb/core/grpc_services/rpc_scheme_base.h

@@ -10,7 +10,7 @@ namespace NGRpcService {
 
 template <typename TDerived, typename TRequest>
 class TRpcSchemeRequestActor : public TRpcOperationRequestActor<TDerived, TRequest> {
-protected:
+protected: 
     using TBase = TRpcOperationRequestActor<TDerived, TRequest>;
 
 public:
@@ -128,17 +128,17 @@ protected:
 
     void Handle(NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionResult::TPtr& ev, const TActorContext& ctx) {
         NTabletPipe::CloseClient(ctx, SchemePipeActorId_);
-        return this->ReplyNotifyTxCompletionResult(ev, ctx);
+        return this->ReplyNotifyTxCompletionResult(ev, ctx); 
     }
 
     void Handle(NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionRegistered::TPtr&, const TActorContext&) {
     }
 
     virtual void ReplyNotifyTxCompletionResult(NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionResult::TPtr& ev, const TActorContext& ctx) {
-        Y_UNUSED(ev);
-        return this->Reply(Ydb::StatusIds::SUCCESS, ctx);
-    }
-
+        Y_UNUSED(ev); 
+        return this->Reply(Ydb::StatusIds::SUCCESS, ctx); 
+    } 
+ 
 private:
     TActorId SchemePipeActorId_;
 };

+ 7 - 7
ydb/core/persqueue/event_helpers.cpp

@@ -20,8 +20,8 @@ void ReplyPersQueueError(
     NKikimrServices::EServiceKikimr service,
     const ui64 responseCookie,
     NPersQueue::NErrorCode::EErrorCode errorCode,
-    const TString& error,
-    bool logDebug
+    const TString& error, 
+    bool logDebug 
 ) {
     if (errorCode == NPersQueue::NErrorCode::BAD_REQUEST) {
         counters.Cumulative()[COUNTER_PQ_BAD_REQUEST].Increment(1);
@@ -36,11 +36,11 @@ void ReplyPersQueueError(
     }
     logStr << " error: " << error;
 
-    if (logDebug) {
-        LOG_DEBUG_S(ctx, service, logStr);
-    } else {
-        LOG_WARN_S(ctx, service, logStr);
-    }
+    if (logDebug) { 
+        LOG_DEBUG_S(ctx, service, logStr); 
+    } else { 
+        LOG_WARN_S(ctx, service, logStr); 
+    } 
     ctx.Send(dstActor, new TEvPQ::TEvError(errorCode, error, responseCookie));
 }
 

+ 2 - 2
ydb/core/persqueue/event_helpers.h

@@ -19,8 +19,8 @@ void ReplyPersQueueError(
     NKikimrServices::EServiceKikimr service,
     const ui64 responseCookie,
     NPersQueue::NErrorCode::EErrorCode errorCode,
-    const TString& error,
-    bool logDebug = false
+    const TString& error, 
+    bool logDebug = false 
 );
 
 }// NPQ

+ 8 - 8
ydb/core/persqueue/events/internal.h

@@ -131,7 +131,7 @@ struct TEvPQ {
             ui32 UncompressedSize;
             TString PartitionKey;
             TString ExplicitHashKey;
-            bool External;
+            bool External; 
         };
 
         TEvWrite(const ui64 cookie, const ui64 messageNo, const TString& ownerCookie, const TMaybe<ui64> offset, TVector<TMsg> &&msgs, bool isDirectWrite)
@@ -149,7 +149,7 @@ struct TEvPQ {
         TMaybe<ui64> Offset;
         TVector<TMsg> Msgs;
         bool IsDirectWrite;
-
+ 
     };
 
     struct TEvReadTimeout : public TEventLocal<TEvReadTimeout, EvReadTimeout> {
@@ -161,10 +161,10 @@ struct TEvPQ {
     };
 
     struct TEvRead : public TEventLocal<TEvRead, EvRead> {
-        TEvRead(const ui64 cookie, const ui64 offset, const ui16 partNo, const ui32 count,
-                const TString& sessionId, const TString& clientId, const ui32 timeout, const ui32 size,
-                const ui32 maxTimeLagMs, const ui64 readTimestampMs, const TString& clientDC,
-                bool externalOperation)
+        TEvRead(const ui64 cookie, const ui64 offset, const ui16 partNo, const ui32 count, 
+                const TString& sessionId, const TString& clientId, const ui32 timeout, const ui32 size, 
+                const ui32 maxTimeLagMs, const ui64 readTimestampMs, const TString& clientDC, 
+                bool externalOperation) 
         : Cookie(cookie)
         , Offset(offset)
         , PartNo(partNo)
@@ -176,7 +176,7 @@ struct TEvPQ {
         , MaxTimeLagMs(maxTimeLagMs)
         , ReadTimestampMs(readTimestampMs)
         , ClientDC(clientDC)
-        , ExternalOperation(externalOperation)
+        , ExternalOperation(externalOperation) 
         {}
 
         ui64 Cookie;
@@ -190,7 +190,7 @@ struct TEvPQ {
         ui32 MaxTimeLagMs;
         ui64 ReadTimestampMs;
         TString ClientDC;
-        bool ExternalOperation;
+        bool ExternalOperation; 
     };
 
     struct TEvMonRequest : public TEventLocal<TEvMonRequest, EvMonRequest> {

+ 3 - 3
ydb/core/persqueue/mirrorer.cpp

@@ -61,7 +61,7 @@ void TMirrorer::Bootstrap(const TActorContext& ctx) {
             TString suffix = LocalDC ? "Remote" : "Internal";
             MirrorerErrors = NKikimr::NPQ::TMultiCounter(
                 GetServiceCounters(counters, "pqproxy|writeSession"),
-                GetLabels(TopicName), {}, {"MirrorerErrors" + suffix}, true
+                GetLabels(TopicName), {}, {"MirrorerErrors" + suffix}, true 
             );
             MirrorerTimeLags = THolder<TPercentileCounter>(new TPercentileCounter(
                 GetServiceCounters(counters, "pqproxy|mirrorWriteTimeLag"),
@@ -71,11 +71,11 @@ void TMirrorer::Bootstrap(const TActorContext& ctx) {
             ));
             InitTimeoutCounter = NKikimr::NPQ::TMultiCounter(
                 GetServiceCounters(counters, "pqproxy|writeSession"),
-                GetLabels(TopicName), {}, {"MirrorerInitTimeout" + suffix}, true
+                GetLabels(TopicName), {}, {"MirrorerInitTimeout" + suffix}, true 
             );
             WriteTimeoutCounter = NKikimr::NPQ::TMultiCounter(
                 GetServiceCounters(counters, "pqproxy|writeSession"),
-                {}, {}, {"MirrorerWriteTimeout"}, true, "sensor", false
+                {}, {}, {"MirrorerWriteTimeout"}, true, "sensor", false 
             );
         }
     }

+ 248 - 248
ydb/core/persqueue/partition.cpp

@@ -14,11 +14,11 @@
 #include <ydb/core/protos/msgbus.pb.h>
 #include <ydb/library/persqueue/topic_parser/topic_parser.h>
 #include <ydb/public/lib/base/msgbus.h>
-#include <library/cpp/html/pcdata/pcdata.h>
-#include <library/cpp/monlib/service/pages/templates.h>
+#include <library/cpp/html/pcdata/pcdata.h> 
+#include <library/cpp/monlib/service/pages/templates.h> 
 #include <library/cpp/time_provider/time_provider.h>
-#include <util/folder/path.h>
-#include <util/string/escape.h>
+#include <util/folder/path.h> 
+#include <util/string/escape.h> 
 #include <util/system/byteorder.h>
 
 #define VERIFY_RESULT_BLOB(blob, pos) \
@@ -243,7 +243,7 @@ void TPartition::ReplyError(const TActorContext& ctx, const ui64 dst, NPersQueue
 {
     ReplyPersQueueError(
         dst == 0 ? ctx.SelfID : Tablet, ctx, TabletID, TopicName, Partition, Counters, NKikimrServices::PERSQUEUE,
-        dst, errorCode, error, true
+        dst, errorCode, error, true 
     );
 }
 
@@ -474,10 +474,10 @@ TPartition::TPartition(ui64 tabletId, ui32 partition, const TActorId& tablet, co
     , BodySize(0)
     , MaxWriteResponsesSize(0)
     , GapSize(0)
-    , CloudId(config.GetYcCloudId())
-    , DbId(config.GetYdbDatabaseId())
-    , FolderId(config.GetYcFolderId())
-    , UsersInfoStorage(DCId, TabletID, TopicName, Partition, counters, Config, CloudId, DbId, FolderId)
+    , CloudId(config.GetYcCloudId()) 
+    , DbId(config.GetYdbDatabaseId()) 
+    , FolderId(config.GetYcFolderId()) 
+    , UsersInfoStorage(DCId, TabletID, TopicName, Partition, counters, Config, CloudId, DbId, FolderId) 
     , ReadingTimestamp(false)
     , SetOffsetCookie(0)
     , Cookie(0)
@@ -490,10 +490,10 @@ TPartition::TPartition(ui64 tabletId, ui32 partition, const TActorId& tablet, co
     , WriteCycleStartTime(ctx.Now())
     , WriteCycleSize(0)
     , WriteNewSize(0)
-    , WriteNewSizeInternal(0)
+    , WriteNewSizeInternal(0) 
     , WriteNewSizeUncompressed(0)
     , WriteNewMessages(0)
-    , WriteNewMessagesInternal(0)
+    , WriteNewMessagesInternal(0) 
     , DiskIsFull(false)
     , HasDataReqNum(0)
     , WriteQuota(Config.GetPartitionConfig().GetBurstSize(), Config.GetPartitionConfig().GetWriteSpeedInBytesPerSecond(), ctx.Now())
@@ -749,7 +749,7 @@ void TPartition::Bootstrap(const TActorContext& ctx)
     }
 
     LOG_INFO_S(ctx, NKikimrServices::PERSQUEUE, "boostrapping " << Partition << " " << ctx.SelfID);
-
+ 
     if (NewPartition) {
         InitComplete(ctx);
     } else {
@@ -757,164 +757,164 @@ void TPartition::Bootstrap(const TActorContext& ctx)
         RequestDiskStatus(ctx, Tablet, Config.GetPartitionConfig().GetNumChannels());
         Become(&TThis::StateInit);
     }
-
+ 
     if (AppData(ctx)->Counters) {
-        TVector<NPQ::TLabelsInfo> labels;
-        if (AppData()->PQConfig.GetTopicsAreFirstClassCitizen()) {
-            SetupStreamCounters(ctx);
-        } else {
-            if (TopicName.find("--") == TString::npos)
-                return;
-            SetupTopicCounters(ctx);
-        }
-    }
-}
-
-void TPartition::SetupTopicCounters(const TActorContext& ctx) {
-    auto counters = AppData(ctx)->Counters;
-    auto labels = NKikimr::NPQ::GetLabels(TopicName);
-    const TString suffix = LocalDC ? "Original" : "Mirrored";
-
-    WriteBufferIsFullCounter.SetCounter(
-        GetCounters(counters, "writingTime", TopicName),
-            {{"host", DCId},
-            {"Partition", ToString<ui32>(Partition)}},
-            {"sensor", "BufferFullTime" + suffix, true});
-
-    InputTimeLag = THolder<NKikimr::NPQ::TPercentileCounter>(new NKikimr::NPQ::TPercentileCounter(
-        GetServiceCounters(counters, "pqproxy|writeTimeLag"), GetLabels(TopicName),
-            {{"sensor", "TimeLags" + suffix}}, "Interval",
-            TVector<std::pair<ui64, TString>>{
-                {100, "100ms"}, {200, "200ms"}, {500, "500ms"}, {1000, "1000ms"},
-                {2000, "2000ms"}, {5000, "5000ms"}, {10'000, "10000ms"}, {30'000, "30000ms"},
-                {60'000, "60000ms"}, {180'000,"180000ms"}, {9'999'999, "999999ms"}}, true));
-
-
-    MessageSize = THolder<NKikimr::NPQ::TPercentileCounter>(new NKikimr::NPQ::TPercentileCounter(
-        GetServiceCounters(counters, "pqproxy|writeInfo"), GetLabels(TopicName),
-            {{"sensor", "MessageSize" + suffix}}, "Size",
-            TVector<std::pair<ui64, TString>>{
-                {1024, "1kb"}, {5120, "5kb"}, {10240, "10kb"},
-                {20'480, "20kb"}, {51'200, "50kb"}, {102'400, "100kb"}, {204'800, "200kb"},
-                {524'288, "512kb"},{1'048'576, "1024kb"}, {2'097'152,"2048kb"}, {5'242'880, "5120kb"},
-                {10'485'760, "10240kb"}, {67'108'864, "65536kb"}, {999'999'999, "99999999kb"}}, true));
-
-    BytesWritten = NKikimr::NPQ::TMultiCounter(GetServiceCounters(counters, "pqproxy|writeSession"),
-                                               GetLabels(TopicName), {}, {"BytesWritten" + suffix}, true);
-    BytesWrittenUncompressed = NKikimr::NPQ::TMultiCounter(GetServiceCounters(counters, "pqproxy|writeSession"),
-                                                           GetLabels(TopicName), {}, {"UncompressedBytesWritten" + suffix}, true);
-
-    BytesWrittenComp = NKikimr::NPQ::TMultiCounter(GetServiceCounters(counters, "pqproxy|writeSession"),
-                                                   GetLabels(TopicName), {}, {"CompactedBytesWritten" + suffix}, true);
-
-    MsgsWritten = NKikimr::NPQ::TMultiCounter(GetServiceCounters(counters, "pqproxy|writeSession"),
-                                              GetLabels(TopicName), {}, {"MessagesWritten" + suffix}, true);
-
-    TVector<NPQ::TLabelsInfo> aggr = {{{{"Account", NPersQueue::GetAccount(TopicName)}}, {"total"}}};
-    ui32 border = AppData(ctx)->PQConfig.GetWriteLatencyBigMs();
-    auto subGroup = GetServiceCounters(counters, "pqproxy|SLI");
-    WriteLatency = NKikimr::NPQ::CreateSLIDurationCounter(subGroup, aggr, "Write", border,
-                                                          {100, 200, 500, 1000, 1500, 2000,
-                                                           5000, 10'000, 30'000, 99'999'999});
-    SLIBigLatency = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"WriteBigLatency"}, true, "sensor", false);
-    WritesTotal = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"WritesTotal"}, true, "sensor", false);
-    if (IsQuotingEnabled() && !TopicWriteQuotaResourcePath.empty()) {
-        TopicWriteQuotaWaitCounter = THolder<NKikimr::NPQ::TPercentileCounter>(
-            new NKikimr::NPQ::TPercentileCounter(
-                GetServiceCounters(counters, "pqproxy|topicWriteQuotaWait"), GetLabels(TopicName),
-                    {{"sensor", "TopicWriteQuotaWait" + suffix}}, "Interval",
-                        TVector<std::pair<ui64, TString>>{
-                            {0, "0ms"}, {1, "1ms"}, {5, "5ms"}, {10, "10ms"},
-                            {20, "20ms"}, {50, "50ms"}, {100, "100ms"}, {500, "500ms"},
-                            {1000, "1000ms"}, {2500, "2500ms"}, {5000, "5000ms"},
-                            {10'000, "10000ms"}, {9'999'999, "999999ms"}}, true));
-    }
-
-    PartitionWriteQuotaWaitCounter = THolder<NKikimr::NPQ::TPercentileCounter>(
-        new NKikimr::NPQ::TPercentileCounter(GetServiceCounters(counters, "pqproxy|partitionWriteQuotaWait"),
-            GetLabels(TopicName), {{"sensor", "PartitionWriteQuotaWait" + suffix}}, "Interval",
-                TVector<std::pair<ui64, TString>>{
-                    {0, "0ms"}, {1, "1ms"}, {5, "5ms"}, {10, "10ms"},
-                    {20, "20ms"}, {50, "50ms"}, {100, "100ms"}, {500, "500ms"},
-                    {1000, "1000ms"}, {2500, "2500ms"}, {5000, "5000ms"},
-                    {10'000, "10000ms"}, {9'999'999, "999999ms"}}, true));
-}
-
-void TPartition::SetupStreamCounters(const TActorContext& ctx) {
-    auto counters = AppData(ctx)->Counters;
-    auto labels = NKikimr::NPQ::GetLabelsForStream(TopicName, CloudId, DbId, FolderId);
-
-    WriteBufferIsFullCounter.SetCounter(
-        GetCountersForStream(counters, "writingTime"),
-        {{"host", DCId},
-         {"partition", ToString<ui32>(Partition)},
-         {"stream", TopicName}},
-        {"name", "stream.internal_write.buffer_brimmed_duration_ms", true});
-
-    InputTimeLag = THolder<NKikimr::NPQ::TPercentileCounter>(new NKikimr::NPQ::TPercentileCounter(
-        NKikimr::NPQ::GetCountersForStream(counters, "writeTimeLag"), labels,
-                    {{"name", "stream.internal_write.time_lags_milliseconds"}}, "bin",
-                    TVector<std::pair<ui64, TString>>{
-                        {100, "100"}, {200, "200"}, {500, "500"},
-                        {1000, "1000"}, {2000, "2000"}, {5000, "5000"},
-                        {10'000, "10000"}, {30'000, "30000"}, {60'000, "60000"},
-                        {180'000,"180000"}, {9'999'999, "999999"}}, true));
-
-    MessageSize = THolder<NKikimr::NPQ::TPercentileCounter>(new NKikimr::NPQ::TPercentileCounter(
-        NKikimr::NPQ::GetCountersForStream(counters, "writeInfo"), labels,
-                    {{"name", "stream.internal_write.record_size_bytes"}}, "bin",
-                    TVector<std::pair<ui64, TString>>{
-                        {1024, "1024"}, {5120, "5120"}, {10'240, "10240"},
-                        {20'480, "20480"}, {51'200, "51200"}, {102'400, "102400"},
-                        {204'800, "204800"}, {524'288, "524288"},{1'048'576, "1048576"},
-                        {2'097'152,"2097152"}, {5'242'880, "5242880"}, {10'485'760, "10485760"},
-                        {67'108'864, "67108864"}, {999'999'999, "99999999"}}, true));
-
-    BytesWritten = NKikimr::NPQ::TMultiCounter(
-        NKikimr::NPQ::GetCountersForStream(counters, "writeSession"), labels, {},
-                    {"stream.internal_write.bytes_per_second",
-                     "stream.incoming_bytes_per_second"} , true, "name");
-    MsgsWritten = NKikimr::NPQ::TMultiCounter(
-        NKikimr::NPQ::GetCountersForStream(counters, "writeSession"), labels, {},
-                    {"stream.internal_write.records_per_second",
-                     "stream.incoming_records_per_second"}, true, "name");
-
-    BytesWrittenUncompressed = NKikimr::NPQ::TMultiCounter(
-        NKikimr::NPQ::GetCountersForStream(counters, "writeSession"), labels, {},
-                    {"stream.internal_write.uncompressed_bytes_per_second"}, true, "name");
-    BytesWrittenComp = NKikimr::NPQ::TMultiCounter(
-        NKikimr::NPQ::GetCountersForStream(counters, "writeSession"), labels, {},
-                    {"stream.internal_write.compacted_bytes_per_second"}, true, "name");
-
-    TVector<NPQ::TLabelsInfo> aggr = {{{{"Account", NPersQueue::GetAccount(TopicName)}}, {"total"}}};
-    ui32 border = AppData(ctx)->PQConfig.GetWriteLatencyBigMs();
-    auto subGroup = GetServiceCounters(counters, "pqproxy|SLI");
-    WriteLatency = NKikimr::NPQ::CreateSLIDurationCounter(subGroup, aggr, "Write", border,
-                                                          {100, 200, 500, 1000, 1500, 2000,
-                                                           5000, 10'000, 30'000, 99'999'999});
-    SLIBigLatency = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"WriteBigLatency"}, true, "name", false);
-    WritesTotal = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"WritesTotal"}, true, "name", false);
-    if (IsQuotingEnabled() && !TopicWriteQuotaResourcePath.empty()) {
-        TopicWriteQuotaWaitCounter = THolder<NKikimr::NPQ::TPercentileCounter>(
-            new NKikimr::NPQ::TPercentileCounter(
-                GetCountersForStream(counters, "topicWriteQuotaWait"), labels,
-                            {{"name", "stream.internal_write.topic_write_quota_wait_milliseconds"}}, "bin",
-                            TVector<std::pair<ui64, TString>>{
-                                {0, "0"}, {1, "1"}, {5, "5"}, {10, "10"},
-                                {20, "20"}, {50, "50"}, {100, "100"}, {500, "500"},
-                                {1000, "1000"}, {2500, "2500"}, {5000, "5000"},
-                                {10'000, "10000"}, {9'999'999, "999999"}}, true));
-    }
-
-    PartitionWriteQuotaWaitCounter = THolder<NKikimr::NPQ::TPercentileCounter>(
-        new NKikimr::NPQ::TPercentileCounter(
-            GetCountersForStream(counters, "partitionWriteQuotaWait"), labels,
-                        {{"name", "stream.internal_write.partition_write_quota_wait_milliseconds"}}, "bin",
-                        TVector<std::pair<ui64, TString>>{
-                            {0, "0"}, {1, "1"}, {5, "5"}, {10, "10"},
-                            {20, "20"}, {50, "50"}, {100, "100"}, {500, "500"},
-                            {1000, "1000"}, {2500, "2500"}, {5000, "5000"},
-                            {10'000, "10000"}, {9'999'999, "999999"}}, true));
+        TVector<NPQ::TLabelsInfo> labels; 
+        if (AppData()->PQConfig.GetTopicsAreFirstClassCitizen()) { 
+            SetupStreamCounters(ctx); 
+        } else { 
+            if (TopicName.find("--") == TString::npos) 
+                return; 
+            SetupTopicCounters(ctx); 
+        } 
+    } 
+} 
+
+void TPartition::SetupTopicCounters(const TActorContext& ctx) { 
+    auto counters = AppData(ctx)->Counters; 
+    auto labels = NKikimr::NPQ::GetLabels(TopicName); 
+    const TString suffix = LocalDC ? "Original" : "Mirrored"; 
+
+    WriteBufferIsFullCounter.SetCounter( 
+        GetCounters(counters, "writingTime", TopicName), 
+            {{"host", DCId}, 
+            {"Partition", ToString<ui32>(Partition)}}, 
+            {"sensor", "BufferFullTime" + suffix, true}); 
+
+    InputTimeLag = THolder<NKikimr::NPQ::TPercentileCounter>(new NKikimr::NPQ::TPercentileCounter( 
+        GetServiceCounters(counters, "pqproxy|writeTimeLag"), GetLabels(TopicName), 
+            {{"sensor", "TimeLags" + suffix}}, "Interval", 
+            TVector<std::pair<ui64, TString>>{ 
+                {100, "100ms"}, {200, "200ms"}, {500, "500ms"}, {1000, "1000ms"}, 
+                {2000, "2000ms"}, {5000, "5000ms"}, {10'000, "10000ms"}, {30'000, "30000ms"}, 
+                {60'000, "60000ms"}, {180'000,"180000ms"}, {9'999'999, "999999ms"}}, true)); 
+
+
+    MessageSize = THolder<NKikimr::NPQ::TPercentileCounter>(new NKikimr::NPQ::TPercentileCounter( 
+        GetServiceCounters(counters, "pqproxy|writeInfo"), GetLabels(TopicName), 
+            {{"sensor", "MessageSize" + suffix}}, "Size", 
+            TVector<std::pair<ui64, TString>>{ 
+                {1024, "1kb"}, {5120, "5kb"}, {10240, "10kb"}, 
+                {20'480, "20kb"}, {51'200, "50kb"}, {102'400, "100kb"}, {204'800, "200kb"}, 
+                {524'288, "512kb"},{1'048'576, "1024kb"}, {2'097'152,"2048kb"}, {5'242'880, "5120kb"}, 
+                {10'485'760, "10240kb"}, {67'108'864, "65536kb"}, {999'999'999, "99999999kb"}}, true)); 
+
+    BytesWritten = NKikimr::NPQ::TMultiCounter(GetServiceCounters(counters, "pqproxy|writeSession"), 
+                                               GetLabels(TopicName), {}, {"BytesWritten" + suffix}, true); 
+    BytesWrittenUncompressed = NKikimr::NPQ::TMultiCounter(GetServiceCounters(counters, "pqproxy|writeSession"), 
+                                                           GetLabels(TopicName), {}, {"UncompressedBytesWritten" + suffix}, true); 
+
+    BytesWrittenComp = NKikimr::NPQ::TMultiCounter(GetServiceCounters(counters, "pqproxy|writeSession"), 
+                                                   GetLabels(TopicName), {}, {"CompactedBytesWritten" + suffix}, true); 
+
+    MsgsWritten = NKikimr::NPQ::TMultiCounter(GetServiceCounters(counters, "pqproxy|writeSession"), 
+                                              GetLabels(TopicName), {}, {"MessagesWritten" + suffix}, true); 
+
+    TVector<NPQ::TLabelsInfo> aggr = {{{{"Account", NPersQueue::GetAccount(TopicName)}}, {"total"}}}; 
+    ui32 border = AppData(ctx)->PQConfig.GetWriteLatencyBigMs(); 
+    auto subGroup = GetServiceCounters(counters, "pqproxy|SLI"); 
+    WriteLatency = NKikimr::NPQ::CreateSLIDurationCounter(subGroup, aggr, "Write", border, 
+                                                          {100, 200, 500, 1000, 1500, 2000, 
+                                                           5000, 10'000, 30'000, 99'999'999}); 
+    SLIBigLatency = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"WriteBigLatency"}, true, "sensor", false); 
+    WritesTotal = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"WritesTotal"}, true, "sensor", false); 
+    if (IsQuotingEnabled() && !TopicWriteQuotaResourcePath.empty()) { 
+        TopicWriteQuotaWaitCounter = THolder<NKikimr::NPQ::TPercentileCounter>( 
+            new NKikimr::NPQ::TPercentileCounter( 
+                GetServiceCounters(counters, "pqproxy|topicWriteQuotaWait"), GetLabels(TopicName), 
+                    {{"sensor", "TopicWriteQuotaWait" + suffix}}, "Interval", 
+                        TVector<std::pair<ui64, TString>>{ 
+                            {0, "0ms"}, {1, "1ms"}, {5, "5ms"}, {10, "10ms"}, 
+                            {20, "20ms"}, {50, "50ms"}, {100, "100ms"}, {500, "500ms"}, 
+                            {1000, "1000ms"}, {2500, "2500ms"}, {5000, "5000ms"}, 
+                            {10'000, "10000ms"}, {9'999'999, "999999ms"}}, true)); 
+    } 
+
+    PartitionWriteQuotaWaitCounter = THolder<NKikimr::NPQ::TPercentileCounter>( 
+        new NKikimr::NPQ::TPercentileCounter(GetServiceCounters(counters, "pqproxy|partitionWriteQuotaWait"), 
+            GetLabels(TopicName), {{"sensor", "PartitionWriteQuotaWait" + suffix}}, "Interval", 
+                TVector<std::pair<ui64, TString>>{ 
+                    {0, "0ms"}, {1, "1ms"}, {5, "5ms"}, {10, "10ms"}, 
+                    {20, "20ms"}, {50, "50ms"}, {100, "100ms"}, {500, "500ms"}, 
+                    {1000, "1000ms"}, {2500, "2500ms"}, {5000, "5000ms"}, 
+                    {10'000, "10000ms"}, {9'999'999, "999999ms"}}, true)); 
+} 
+
+void TPartition::SetupStreamCounters(const TActorContext& ctx) { 
+    auto counters = AppData(ctx)->Counters; 
+    auto labels = NKikimr::NPQ::GetLabelsForStream(TopicName, CloudId, DbId, FolderId); 
+
+    WriteBufferIsFullCounter.SetCounter( 
+        GetCountersForStream(counters, "writingTime"), 
+        {{"host", DCId}, 
+         {"partition", ToString<ui32>(Partition)}, 
+         {"stream", TopicName}}, 
+        {"name", "stream.internal_write.buffer_brimmed_duration_ms", true}); 
+ 
+    InputTimeLag = THolder<NKikimr::NPQ::TPercentileCounter>(new NKikimr::NPQ::TPercentileCounter( 
+        NKikimr::NPQ::GetCountersForStream(counters, "writeTimeLag"), labels, 
+                    {{"name", "stream.internal_write.time_lags_milliseconds"}}, "bin", 
+                    TVector<std::pair<ui64, TString>>{ 
+                        {100, "100"}, {200, "200"}, {500, "500"}, 
+                        {1000, "1000"}, {2000, "2000"}, {5000, "5000"}, 
+                        {10'000, "10000"}, {30'000, "30000"}, {60'000, "60000"}, 
+                        {180'000,"180000"}, {9'999'999, "999999"}}, true)); 
+ 
+    MessageSize = THolder<NKikimr::NPQ::TPercentileCounter>(new NKikimr::NPQ::TPercentileCounter( 
+        NKikimr::NPQ::GetCountersForStream(counters, "writeInfo"), labels, 
+                    {{"name", "stream.internal_write.record_size_bytes"}}, "bin", 
+                    TVector<std::pair<ui64, TString>>{ 
+                        {1024, "1024"}, {5120, "5120"}, {10'240, "10240"}, 
+                        {20'480, "20480"}, {51'200, "51200"}, {102'400, "102400"}, 
+                        {204'800, "204800"}, {524'288, "524288"},{1'048'576, "1048576"}, 
+                        {2'097'152,"2097152"}, {5'242'880, "5242880"}, {10'485'760, "10485760"}, 
+                        {67'108'864, "67108864"}, {999'999'999, "99999999"}}, true)); 
+ 
+    BytesWritten = NKikimr::NPQ::TMultiCounter( 
+        NKikimr::NPQ::GetCountersForStream(counters, "writeSession"), labels, {}, 
+                    {"stream.internal_write.bytes_per_second", 
+                     "stream.incoming_bytes_per_second"} , true, "name"); 
+    MsgsWritten = NKikimr::NPQ::TMultiCounter( 
+        NKikimr::NPQ::GetCountersForStream(counters, "writeSession"), labels, {}, 
+                    {"stream.internal_write.records_per_second", 
+                     "stream.incoming_records_per_second"}, true, "name"); 
+ 
+    BytesWrittenUncompressed = NKikimr::NPQ::TMultiCounter( 
+        NKikimr::NPQ::GetCountersForStream(counters, "writeSession"), labels, {}, 
+                    {"stream.internal_write.uncompressed_bytes_per_second"}, true, "name"); 
+    BytesWrittenComp = NKikimr::NPQ::TMultiCounter( 
+        NKikimr::NPQ::GetCountersForStream(counters, "writeSession"), labels, {}, 
+                    {"stream.internal_write.compacted_bytes_per_second"}, true, "name"); 
+ 
+    TVector<NPQ::TLabelsInfo> aggr = {{{{"Account", NPersQueue::GetAccount(TopicName)}}, {"total"}}}; 
+    ui32 border = AppData(ctx)->PQConfig.GetWriteLatencyBigMs(); 
+    auto subGroup = GetServiceCounters(counters, "pqproxy|SLI"); 
+    WriteLatency = NKikimr::NPQ::CreateSLIDurationCounter(subGroup, aggr, "Write", border, 
+                                                          {100, 200, 500, 1000, 1500, 2000, 
+                                                           5000, 10'000, 30'000, 99'999'999}); 
+    SLIBigLatency = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"WriteBigLatency"}, true, "name", false); 
+    WritesTotal = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"WritesTotal"}, true, "name", false); 
+    if (IsQuotingEnabled() && !TopicWriteQuotaResourcePath.empty()) { 
+        TopicWriteQuotaWaitCounter = THolder<NKikimr::NPQ::TPercentileCounter>( 
+            new NKikimr::NPQ::TPercentileCounter( 
+                GetCountersForStream(counters, "topicWriteQuotaWait"), labels, 
+                            {{"name", "stream.internal_write.topic_write_quota_wait_milliseconds"}}, "bin", 
+                            TVector<std::pair<ui64, TString>>{ 
+                                {0, "0"}, {1, "1"}, {5, "5"}, {10, "10"}, 
+                                {20, "20"}, {50, "50"}, {100, "100"}, {500, "500"}, 
+                                {1000, "1000"}, {2500, "2500"}, {5000, "5000"}, 
+                                {10'000, "10000"}, {9'999'999, "999999"}}, true)); 
+    }
+ 
+    PartitionWriteQuotaWaitCounter = THolder<NKikimr::NPQ::TPercentileCounter>( 
+        new NKikimr::NPQ::TPercentileCounter( 
+            GetCountersForStream(counters, "partitionWriteQuotaWait"), labels, 
+                        {{"name", "stream.internal_write.partition_write_quota_wait_milliseconds"}}, "bin", 
+                        TVector<std::pair<ui64, TString>>{ 
+                            {0, "0"}, {1, "1"}, {5, "5"}, {10, "10"}, 
+                            {20, "20"}, {50, "50"}, {100, "100"}, {500, "500"}, 
+                            {1000, "1000"}, {2500, "2500"}, {5000, "5000"}, 
+                            {10'000, "10000"}, {9'999'999, "999999"}}, true)); 
 }
 
 void TPartition::ProcessHasDataRequests(const TActorContext& ctx) {
@@ -2330,11 +2330,11 @@ TReadAnswer TReadInfo::FormAnswer(
         ui16 internalPartsCount = blobs[pos].InternalPartsCount;
         const TString& blobValue = blobs[pos].Value;
 
-        if (blobValue.empty()) { // this is ok. Means that someone requested too much data
+        if (blobValue.empty()) { // this is ok. Means that someone requested too much data 
             LOG_DEBUG(ctx, NKikimrServices::PERSQUEUE, "Not full answer here!");
             ui64 answerSize = answer->Response.ByteSize();
             if (userInfo && Destination != 0) {
-                userInfo->ReadDone(ctx, ctx.Now(), answerSize, cnt, ClientDC);
+                userInfo->ReadDone(ctx, ctx.Now(), answerSize, cnt, ClientDC); 
             }
             readResult->SetSizeLag(sizeLag - size);
             return {answerSize, std::move(answer)};
@@ -2342,9 +2342,9 @@ TReadAnswer TReadInfo::FormAnswer(
         Y_VERIFY(blobValue.size() == blobs[pos].Size, "value for offset %" PRIu64 " count %u size must be %u, but got %u",
                                                         offset, count, blobs[pos].Size, (ui32)blobValue.size());
 
-        if (offset > Offset || (offset == Offset && partNo > PartNo)) { // got gap
+        if (offset > Offset || (offset == Offset && partNo > PartNo)) { // got gap 
             Offset = offset;
-            PartNo = partNo;
+            PartNo = partNo; 
         }
         Y_VERIFY(offset <= Offset);
         Y_VERIFY(offset < Offset || partNo <= PartNo);
@@ -2362,7 +2362,7 @@ TReadAnswer TReadInfo::FormAnswer(
             }
             offset += header.GetCount();
 
-            if (pos == Max<ui32>()) // this batch does not contain data to read, skip it
+            if (pos == Max<ui32>()) // this batch does not contain data to read, skip it 
                 continue;
 
 
@@ -2375,31 +2375,31 @@ TReadAnswer TReadInfo::FormAnswer(
                 psize = size;
                 TClientBlob &res = batch.Blobs[i];
                 VERIFY_RESULT_BLOB(res, i);
-                bool messageSkippingBehaviour = AppData()->PQConfig.GetTopicsAreFirstClassCitizen() &&
-                    ReadTimestampMs > res.WriteTimestamp.MilliSeconds();
-                if (!messageSkippingBehaviour) {
-                    size += res.GetBlobSize();
-                    Y_VERIFY(PartNo == res.GetPartNo(), "pos %" PRIu32 " i %" PRIu32 " Offset %" PRIu64 " PartNo %" PRIu16 " offset %" PRIu64 " partNo %" PRIu16,
-                             pos, i, Offset, PartNo, offset, res.GetPartNo());
-
-                    if (userInfo) {
-                        userInfo->AddTimestampToCache(
-                                                      Offset, res.WriteTimestamp, res.CreateTimestamp,
-                                                      Destination != 0, ctx.Now()
-                                                      );
-                    }
-
-                    AddResultBlob(readResult, res, Offset);
-                    if (res.IsLastPart()) {
-                        ++cnt;
-                    }
+                bool messageSkippingBehaviour = AppData()->PQConfig.GetTopicsAreFirstClassCitizen() && 
+                    ReadTimestampMs > res.WriteTimestamp.MilliSeconds(); 
+                if (!messageSkippingBehaviour) { 
+                    size += res.GetBlobSize(); 
+                    Y_VERIFY(PartNo == res.GetPartNo(), "pos %" PRIu32 " i %" PRIu32 " Offset %" PRIu64 " PartNo %" PRIu16 " offset %" PRIu64 " partNo %" PRIu16, 
+                             pos, i, Offset, PartNo, offset, res.GetPartNo()); 
+
+                    if (userInfo) { 
+                        userInfo->AddTimestampToCache( 
+                                                      Offset, res.WriteTimestamp, res.CreateTimestamp, 
+                                                      Destination != 0, ctx.Now() 
+                                                      ); 
+                    } 
+ 
+                    AddResultBlob(readResult, res, Offset); 
+                    if (res.IsLastPart()) { 
+                        ++cnt; 
+                    } 
                 }
 
-                if (res.IsLastPart()) {
+                if (res.IsLastPart()) { 
                     PartNo = 0;
                     ++Offset;
-                } else {
-                    ++PartNo;
+                } else { 
+                    ++PartNo; 
                 }
             }
 
@@ -2444,7 +2444,7 @@ TReadAnswer TReadInfo::FormAnswer(
     Y_VERIFY(Offset <= (ui64)Max<i64>(), "Offset is too big: %" PRIu64, Offset);
     ui64 answerSize = answer->Response.ByteSize();
     if (userInfo && Destination != 0) {
-        userInfo->ReadDone(ctx, ctx.Now(), answerSize, cnt, ClientDC);
+        userInfo->ReadDone(ctx, ctx.Now(), answerSize, cnt, ClientDC); 
     }
     readResult->SetSizeLag(sizeLag - size);
     return {answerSize, std::move(answer)};
@@ -2472,7 +2472,7 @@ void TPartition::Handle(TEvPQ::TEvReadTimeout::TPtr& ev, const TActorContext& ct
 }
 
 
-TVector<TRequestedBlob> TPartition::GetReadRequestFromBody(const ui64 startOffset, const ui16 partNo, const ui32 maxCount, const ui32 maxSize, ui32* rcount, ui32* rsize)
+TVector<TRequestedBlob> TPartition::GetReadRequestFromBody(const ui64 startOffset, const ui16 partNo, const ui32 maxCount, const ui32 maxSize, ui32* rcount, ui32* rsize) 
 {
     Y_VERIFY(rcount && rsize);
     ui32& count = *rcount;
@@ -2502,12 +2502,12 @@ TVector<TRequestedBlob> TPartition::GetReadRequestFromBody(const ui64 startOffse
             sz = (cnt == it->Key.GetCount() ? it->Size : 0); //not readed client blobs can be of ~8Mb, so don't count this size at all
         }
         while (it != DataKeysBody.end() && size < maxSize && count < maxCount) {
-            size += sz;
-            count += cnt;
-            TRequestedBlob reqBlob(it->Key.GetOffset(), it->Key.GetPartNo(), it->Key.GetCount(),
-                                   it->Key.GetInternalPartsCount(), it->Size, TString());
-            blobs.push_back(reqBlob);
-
+            size += sz; 
+            count += cnt; 
+            TRequestedBlob reqBlob(it->Key.GetOffset(), it->Key.GetPartNo(), it->Key.GetCount(), 
+                                   it->Key.GetInternalPartsCount(), it->Size, TString()); 
+            blobs.push_back(reqBlob); 
+ 
             ++it;
             if (it == DataKeysBody.end())
                 break;
@@ -2520,12 +2520,12 @@ TVector<TRequestedBlob> TPartition::GetReadRequestFromBody(const ui64 startOffse
 
 
 
-TVector<TClientBlob> TPartition::GetReadRequestFromHead(const ui64 startOffset, const ui16 partNo, const ui32 maxCount, const ui32 maxSize, const ui64 readTimestampMs, ui32* rcount, ui32* rsize, ui64* insideHeadOffset)
+TVector<TClientBlob> TPartition::GetReadRequestFromHead(const ui64 startOffset, const ui16 partNo, const ui32 maxCount, const ui32 maxSize, const ui64 readTimestampMs, ui32* rcount, ui32* rsize, ui64* insideHeadOffset) 
 {
     ui32& count = *rcount;
     ui32& size = *rsize;
     TVector<TClientBlob> res;
-    std::optional<ui64> firstAddedBlobOffset{};
+    std::optional<ui64> firstAddedBlobOffset{}; 
     ui32 pos = 0;
     if (startOffset > Head.Offset || startOffset == Head.Offset && partNo > Head.PartNo) {
         pos = Head.FindPos(startOffset, partNo);
@@ -2542,10 +2542,10 @@ TVector<TClientBlob> TPartition::GetReadRequestFromHead(const ui64 startOffset,
         for (; i < blobs.size(); ++i) {
 
             Y_VERIFY(pno == blobs[i].GetPartNo());
-            bool messageSkippingBehaviour = AppData()->PQConfig.GetTopicsAreFirstClassCitizen() &&
-                readTimestampMs > blobs[i].WriteTimestamp.MilliSeconds();
-            bool skip = offset < startOffset || offset == startOffset &&
-                blobs[i].GetPartNo() < partNo || messageSkippingBehaviour;
+            bool messageSkippingBehaviour = AppData()->PQConfig.GetTopicsAreFirstClassCitizen() && 
+                readTimestampMs > blobs[i].WriteTimestamp.MilliSeconds(); 
+            bool skip = offset < startOffset || offset == startOffset && 
+                blobs[i].GetPartNo() < partNo || messageSkippingBehaviour; 
             if (blobs[i].IsLastPart()) {
                 ++offset;
                 pno = 0;
@@ -2556,19 +2556,19 @@ TVector<TClientBlob> TPartition::GetReadRequestFromHead(const ui64 startOffset,
             }
             if (skip)
                 continue;
-            if (count > maxCount) // blob is counted already
+            if (count > maxCount) // blob is counted already 
                 break;
             if (size >= maxSize)
                 break;
-            size += blobs[i].GetBlobSize();
-            res.push_back(blobs[i]);
-            if (!firstAddedBlobOffset && AppData()->PQConfig.GetTopicsAreFirstClassCitizen())
-                firstAddedBlobOffset = offset > 0 ? offset - 1 : 0;
+            size += blobs[i].GetBlobSize(); 
+            res.push_back(blobs[i]); 
+            if (!firstAddedBlobOffset && AppData()->PQConfig.GetTopicsAreFirstClassCitizen()) 
+                firstAddedBlobOffset = offset > 0 ? offset - 1 : 0; 
         }
         if (i < blobs.size()) // already got limit
             break;
     }
-    *insideHeadOffset = firstAddedBlobOffset.value_or(*insideHeadOffset);
+    *insideHeadOffset = firstAddedBlobOffset.value_or(*insideHeadOffset); 
     return res;
 }
 
@@ -2847,9 +2847,9 @@ void TPartition::ReadTimestampForOffset(const TString& user, TUserInfo& userInfo
                 << " user " << user << " send read request for offset " << userInfo.Offset << " initiated " << " queuesize " << UpdateUserInfoTimestamp.size() << " startOffset " << StartOffset << " ReadingTimestamp " << ReadingTimestamp);
 
 
-    THolder<TEvPQ::TEvRead> event = MakeHolder<TEvPQ::TEvRead>(0, userInfo.Offset, 0, 1, "",
-                                                               user, 0, MAX_BLOB_PART_SIZE * 2, 0, 0, "",
-                                                               userInfo.DoExternalRead);
+    THolder<TEvPQ::TEvRead> event = MakeHolder<TEvPQ::TEvRead>(0, userInfo.Offset, 0, 1, "", 
+                                                               user, 0, MAX_BLOB_PART_SIZE * 2, 0, 0, "", 
+                                                               userInfo.DoExternalRead); 
     ctx.Send(ctx.SelfID, event.Release());
     Counters.Cumulative()[COUNTER_PQ_WRITE_TIMESTAMP_CACHE_MISS].Increment(1);
 }
@@ -3314,8 +3314,8 @@ void TPartition::Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorContext&
 
     //check correctness of response
     if (response.GetStatus() != NMsgBusProxy::MSTATUS_OK) {
-        LOG_ERROR_S(ctx, NKikimrServices::PERSQUEUE, "OnWrite topic '" << TopicName << "' partition " <<
-                    Partition << " commands are not processed at all, reason: " << response.DebugString());
+        LOG_ERROR_S(ctx, NKikimrServices::PERSQUEUE, "OnWrite topic '" << TopicName << "' partition " << 
+                    Partition << " commands are not processed at all, reason: " << response.DebugString()); 
         ctx.Send(Tablet, new TEvents::TEvPoisonPill());
         //TODO: if status is DISK IS FULL, is global status MSTATUS_OK? it will be good if it is true
         return;
@@ -3500,13 +3500,13 @@ void TPartition::HandleWriteResponse(const TActorContext& ctx) {
     Counters.Percentile()[COUNTER_PQ_WRITE_CYCLE_BYTES].IncrementFor(WriteCycleSize);
     Counters.Percentile()[COUNTER_PQ_WRITE_NEW_BYTES].IncrementFor(WriteNewSize);
     if (BytesWritten)
-        BytesWritten.Inc(WriteNewSizeInternal);
+        BytesWritten.Inc(WriteNewSizeInternal); 
     if (BytesWrittenUncompressed)
         BytesWrittenUncompressed.Inc(WriteNewSizeUncompressed);
     if (BytesWrittenComp)
         BytesWrittenComp.Inc(WriteCycleSize);
     if (MsgsWritten)
-        MsgsWritten.Inc(WriteNewMessagesInternal);
+        MsgsWritten.Inc(WriteNewMessagesInternal); 
 
     //All ok
     auto now = ctx.Now();
@@ -3525,10 +3525,10 @@ void TPartition::HandleWriteResponse(const TActorContext& ctx) {
 
     WriteCycleSize = 0;
     WriteNewSize = 0;
-    WriteNewSizeInternal = 0;
+    WriteNewSizeInternal = 0; 
     WriteNewSizeUncompressed = 0;
     WriteNewMessages = 0;
-    WriteNewMessagesInternal = 0;
+    WriteNewMessagesInternal = 0; 
     UpdateWriteBufferIsFullState(now);
 
     AnswerCurrentWrites(ctx);
@@ -3958,8 +3958,8 @@ void TPartition::CancelAllWritesOnWrite(const TActorContext& ctx, TEvKeyValue::T
 }
 
 
-bool TPartition::AppendHeadWithNewWrites(TEvKeyValue::TEvRequest* request, const TActorContext& ctx,
-                                         TSourceIdWriter& sourceIdWriter) {
+bool TPartition::AppendHeadWithNewWrites(TEvKeyValue::TEvRequest* request, const TActorContext& ctx, 
+                                         TSourceIdWriter& sourceIdWriter) { 
 
     ui64 curOffset = PartitionedBlob.IsInited() ? PartitionedBlob.GetOffset() : EndOffset;
 
@@ -4112,14 +4112,14 @@ bool TPartition::AppendHeadWithNewWrites(TEvKeyValue::TEvRequest* request, const
                     }
                 }
             }
-            PartitionedBlob = TPartitionedBlob(Partition, curOffset, p.Msg.SourceId, p.Msg.SeqNo,
-                                               p.Msg.TotalParts, p.Msg.TotalSize, Head, NewHead,
-                                               headCleared, needCompactHead, MaxBlobSize);
+            PartitionedBlob = TPartitionedBlob(Partition, curOffset, p.Msg.SourceId, p.Msg.SeqNo, 
+                                               p.Msg.TotalParts, p.Msg.TotalSize, Head, NewHead, 
+                                               headCleared, needCompactHead, MaxBlobSize); 
         }
 
         LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Topic '" << TopicName << "' partition " << Partition
-                    << " part blob processing sourceId '" << EscapeC(p.Msg.SourceId) <<
-                    "' seqNo " << p.Msg.SeqNo << " partNo " << p.Msg.PartNo);
+                    << " part blob processing sourceId '" << EscapeC(p.Msg.SourceId) << 
+                    "' seqNo " << p.Msg.SeqNo << " partNo " << p.Msg.PartNo); 
         TString s;
         if (!PartitionedBlob.IsNextPart(p.Msg.SourceId, p.Msg.SeqNo, p.Msg.PartNo, &s)) {
             //this must not be happen - client sends gaps, fail this client till the end
@@ -4129,13 +4129,13 @@ bool TPartition::AppendHeadWithNewWrites(TEvKeyValue::TEvRequest* request, const
         }
 
         WriteNewSize += p.Msg.SourceId.size() + p.Msg.Data.size();
-        WriteNewSizeInternal = p.Msg.External ? 0 : WriteNewSize;
+        WriteNewSizeInternal = p.Msg.External ? 0 : WriteNewSize; 
         WriteNewSizeUncompressed += p.Msg.UncompressedSize + p.Msg.SourceId.size();
-        if (p.Msg.PartNo == 0) {
-             ++WriteNewMessages;
-             if (!p.Msg.External)
-                 ++WriteNewMessagesInternal;
-        }
+        if (p.Msg.PartNo == 0) { 
+             ++WriteNewMessages; 
+             if (!p.Msg.External) 
+                 ++WriteNewMessagesInternal; 
+        } 
 
         TMaybe<TPartData> partData;
         if (p.Msg.TotalParts > 1) { //this is multi-part message
@@ -4175,12 +4175,12 @@ bool TPartition::AppendHeadWithNewWrites(TEvKeyValue::TEvRequest* request, const
             write->SetKeyToCache(resKey.Data(), resKey.Size());
             WriteCycleSize += newWrite.second.size();
 
-            LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Topic '" << TopicName <<
-                        "' partition " << Partition <<
-                        " part blob sourceId '" << EscapeC(p.Msg.SourceId) <<
-                        "' seqNo " << p.Msg.SeqNo << " partNo " << p.Msg.PartNo <<
-                        " result is " << TStringBuf(newWrite.first.Data(), newWrite.first.Size()) <<
-                        " size " << newWrite.second.size());
+            LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Topic '" << TopicName << 
+                        "' partition " << Partition << 
+                        " part blob sourceId '" << EscapeC(p.Msg.SourceId) << 
+                        "' seqNo " << p.Msg.SeqNo << " partNo " << p.Msg.PartNo << 
+                        " result is " << TStringBuf(newWrite.first.Data(), newWrite.first.Size()) << 
+                        " size " << newWrite.second.size()); 
         }
 
         if (lastBlobPart) {
@@ -4578,15 +4578,15 @@ void TPartition::ProcessRead(const TActorContext& ctx, TReadInfo&& info, const u
         userInfo.UpdateReadingTimeAndState(ctx.Now());
         return;
     }
-    TVector<TRequestedBlob> blobs = GetReadRequestFromBody(info.Offset, info.PartNo, info.Count, info.Size, &count, &size);
+    TVector<TRequestedBlob> blobs = GetReadRequestFromBody(info.Offset, info.PartNo, info.Count, info.Size, &count, &size); 
     info.Blobs = blobs;
     ui64 lastOffset = info.Offset + Min(count, info.Count);
     LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "read cookie " << cookie << " added " << info.Blobs.size()
                 << " blobs, size " << size << " count " << count << " last offset " << lastOffset);
 
-    ui64 insideHeadOffset{0};
-    info.Cached = GetReadRequestFromHead(info.Offset, info.PartNo, info.Count, info.Size, info.ReadTimestampMs, &count, &size, &insideHeadOffset);
-    info.CachedOffset = Head.Offset > 0 ? Head.Offset : insideHeadOffset;
+    ui64 insideHeadOffset{0}; 
+    info.Cached = GetReadRequestFromHead(info.Offset, info.PartNo, info.Count, info.Size, info.ReadTimestampMs, &count, &size, &insideHeadOffset); 
+    info.CachedOffset = Head.Offset > 0 ? Head.Offset : insideHeadOffset; 
 
     if (info.Destination != 0) {
         ++userInfo.ActiveReads;
@@ -4618,7 +4618,7 @@ void TPartition::ProcessRead(const TActorContext& ctx, TReadInfo&& info, const u
     Y_VERIFY(res);
 
     THolder<TEvPQ::TEvBlobRequest> request(new TEvPQ::TEvBlobRequest(user, cookie, Partition,
-                                                                     lastOffset, std::move(blobs)));
+                                                                     lastOffset, std::move(blobs))); 
 
     ctx.Send(BlobCache, request.Release());
 }

+ 11 - 11
ydb/core/persqueue/partition.h

@@ -141,8 +141,8 @@ private:
     void OnReadRequestFinished(TReadInfo&& info, ui64 answerSize);
 
     // will return rcount and rsize also
-    TVector<TRequestedBlob> GetReadRequestFromBody(const ui64 startOffset, const ui16 partNo, const ui32 maxCount, const ui32 maxSize, ui32* rcount, ui32* rsize);
-    TVector<TClientBlob>    GetReadRequestFromHead(const ui64 startOffset, const ui16 partNo, const ui32 maxCount, const ui32 maxSize, const ui64 readTimestampMs, ui32* rcount, ui32* rsize, ui64* insideHeadOffset);
+    TVector<TRequestedBlob> GetReadRequestFromBody(const ui64 startOffset, const ui16 partNo, const ui32 maxCount, const ui32 maxSize, ui32* rcount, ui32* rsize); 
+    TVector<TClientBlob>    GetReadRequestFromHead(const ui64 startOffset, const ui16 partNo, const ui32 maxCount, const ui32 maxSize, const ui64 readTimestampMs, ui32* rcount, ui32* rsize, ui64* insideHeadOffset); 
     void ProcessRead(const TActorContext& ctx, TReadInfo&& info, const ui64 cookie, bool subscription);
 
     void HandleOnIdle(TEvPQ::TEvWrite::TPtr& ev, const TActorContext& ctx);
@@ -210,9 +210,9 @@ private:
     void CreateMirrorerActor();
     bool IsQuotingEnabled() const;
 
-    void SetupTopicCounters(const TActorContext& ctx);
-    void SetupStreamCounters(const TActorContext& ctx);
-
+    void SetupTopicCounters(const TActorContext& ctx); 
+    void SetupStreamCounters(const TActorContext& ctx); 
+ 
 public:
     static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
         return NKikimrServices::TActivity::PERSQUEUE_PARTITION_ACTOR;
@@ -564,10 +564,10 @@ private:
     std::deque<std::pair<ui64,ui64>> GapOffsets;
     ui64 GapSize;
 
-    TString CloudId;
-    TString DbId;
-    TString FolderId;
-
+    TString CloudId; 
+    TString DbId; 
+    TString FolderId; 
+ 
     TUsersInfoStorage UsersInfoStorage;
 
     std::deque<TString> UpdateUserInfoTimestamp;
@@ -601,10 +601,10 @@ private:
     TInstant WriteCycleStartTime;
     ui32 WriteCycleSize;
     ui32 WriteNewSize;
-    ui32 WriteNewSizeInternal;
+    ui32 WriteNewSizeInternal; 
     ui64 WriteNewSizeUncompressed;
     ui32 WriteNewMessages;
-    ui32 WriteNewMessagesInternal;
+    ui32 WriteNewMessagesInternal; 
 
     TInstant CurrentTimestamp;
 

+ 81 - 81
ydb/core/persqueue/percentile_counter.cpp

@@ -7,11 +7,11 @@ namespace NKikimr {
 
 namespace NPQ {
 
-NMonitoring::TDynamicCounterPtr GetCounters(NMonitoring::TDynamicCounterPtr counters,
-                                            const TString& subsystem, const TString& topic)
+NMonitoring::TDynamicCounterPtr GetCounters(NMonitoring::TDynamicCounterPtr counters, 
+                                            const TString& subsystem, const TString& topic) 
 {
     auto pos = topic.find("--");
-    Y_VERIFY(pos != TString::npos);
+    Y_VERIFY(pos != TString::npos); 
 
     TString origDC = topic.substr(4, pos - 4);
     origDC.to_title();
@@ -21,20 +21,20 @@ NMonitoring::TDynamicCounterPtr GetCounters(NMonitoring::TDynamicCounterPtr coun
     TString topicPath = NPersQueue::ConvertOldTopicName(realTopic);
     TString account = topicPath.substr(0, topicPath.find("/"));
     return GetServiceCounters(counters, "pqproxy|" + subsystem)
-        ->GetSubgroup("OriginDC", origDC)
-        ->GetSubgroup("Producer", producer)
-        ->GetSubgroup("TopicPath", topicPath)
-        ->GetSubgroup("Account", account)
-        ->GetSubgroup("Topic", realTopic);
-}
-
-NMonitoring::TDynamicCounterPtr GetCountersForStream(NMonitoring::TDynamicCounterPtr counters,
-                                                     const TString& subsystem)
-{
-    return counters->GetSubgroup("counters", "pqproxy")
-        ->GetSubgroup("subsystem", subsystem);
+        ->GetSubgroup("OriginDC", origDC) 
+        ->GetSubgroup("Producer", producer) 
+        ->GetSubgroup("TopicPath", topicPath) 
+        ->GetSubgroup("Account", account) 
+        ->GetSubgroup("Topic", realTopic); 
 }
 
+NMonitoring::TDynamicCounterPtr GetCountersForStream(NMonitoring::TDynamicCounterPtr counters, 
+                                                     const TString& subsystem) 
+{ 
+    return counters->GetSubgroup("counters", "pqproxy") 
+        ->GetSubgroup("subsystem", subsystem); 
+} 
+ 
 TVector<TLabelsInfo> GetLabels(const TString& topic)
 {
     auto pos = topic.find("--");
@@ -66,71 +66,71 @@ TVector<TLabelsInfo> GetLabels(const TString& cluster, const TString& realTopic)
     return res;
 }
 
-TVector<TLabelsInfo> GetLabelsForStream(const TString& topic, const TString& cloudId,
-                                        const TString& dbId, const TString& folderId) {
-    TVector<TLabelsInfo> res = {
-            {{{"database", dbId}}, {dbId}},
-            {{{"cloud", cloudId}}, {cloudId}},
-            {{{"folder", folderId}}, {folderId}},
-            {{{"stream", topic}}, {topic}}};
-    return res;
-}
-
-TMultiCounter::TMultiCounter(NMonitoring::TDynamicCounterPtr counters,
-                             const TVector<TLabelsInfo>& labels,
-                             const TVector<std::pair<TString, TString>>& subgroups,
-                             const TVector<TString>& counter_names,
-                             bool deriv,
-                             const TString& name,
-                             bool expiring)
-    : Value(0)
-{
-    Y_VERIFY(counters);
-
-    for (const auto& counter : counter_names) {
-        for (ui32 i = 0; i <= labels.size(); ++i) {
-            auto cc = counters;
-            for (ui32 j = 0; j < labels.size(); ++j) {
-                Y_VERIFY(!labels[j].Labels.empty());
-                for (ui32 k = 0; k < labels[j].Labels.size(); ++k) {
-                    Y_VERIFY(labels[j].Labels.size() == labels[j].AggrNames.size());
-                    const TString& res = (j < i) ? labels[j].Labels[k].second : labels[j].AggrNames[k];
-                    cc = cc->GetSubgroup(labels[j].Labels[k].first, res);
-                }
-            }
-            for (const auto& g: subgroups) {
-                cc = cc->GetSubgroup(g.first, g.second);
-            }
-            if (expiring) {
-                Counters.push_back(cc->GetExpiringNamedCounter(name, counter, deriv));
-            } else {
-                Counters.push_back(cc->GetNamedCounter(name, counter, deriv));
-            }
-        }
-    }
-}
-
-void TMultiCounter::Inc(ui64 val)
-{
-    for (auto& c : Counters) (*c) += val;
-    Value += val;
-}
-
-void TMultiCounter::Dec(ui64 val) {
-    for (auto& c : Counters) (*c) -= val;
-    Value -= val;
-}
-
-void TMultiCounter::Set(ui64 value) {
-    auto diff = value - Value;
-    Inc(diff);
-}
-
-TMultiCounter::operator bool() {
-    return !Counters.empty();
-}
-
-
+TVector<TLabelsInfo> GetLabelsForStream(const TString& topic, const TString& cloudId, 
+                                        const TString& dbId, const TString& folderId) { 
+    TVector<TLabelsInfo> res = { 
+            {{{"database", dbId}}, {dbId}}, 
+            {{{"cloud", cloudId}}, {cloudId}}, 
+            {{{"folder", folderId}}, {folderId}}, 
+            {{{"stream", topic}}, {topic}}}; 
+    return res; 
+} 
+
+TMultiCounter::TMultiCounter(NMonitoring::TDynamicCounterPtr counters, 
+                             const TVector<TLabelsInfo>& labels, 
+                             const TVector<std::pair<TString, TString>>& subgroups, 
+                             const TVector<TString>& counter_names, 
+                             bool deriv, 
+                             const TString& name, 
+                             bool expiring) 
+    : Value(0) 
+{ 
+    Y_VERIFY(counters); 
+ 
+    for (const auto& counter : counter_names) { 
+        for (ui32 i = 0; i <= labels.size(); ++i) { 
+            auto cc = counters; 
+            for (ui32 j = 0; j < labels.size(); ++j) { 
+                Y_VERIFY(!labels[j].Labels.empty()); 
+                for (ui32 k = 0; k < labels[j].Labels.size(); ++k) { 
+                    Y_VERIFY(labels[j].Labels.size() == labels[j].AggrNames.size()); 
+                    const TString& res = (j < i) ? labels[j].Labels[k].second : labels[j].AggrNames[k]; 
+                    cc = cc->GetSubgroup(labels[j].Labels[k].first, res); 
+                } 
+            } 
+            for (const auto& g: subgroups) { 
+                cc = cc->GetSubgroup(g.first, g.second); 
+            } 
+            if (expiring) { 
+                Counters.push_back(cc->GetExpiringNamedCounter(name, counter, deriv)); 
+            } else { 
+                Counters.push_back(cc->GetNamedCounter(name, counter, deriv)); 
+            } 
+        } 
+    } 
+} 
+ 
+void TMultiCounter::Inc(ui64 val) 
+{ 
+    for (auto& c : Counters) (*c) += val; 
+    Value += val; 
+} 
+ 
+void TMultiCounter::Dec(ui64 val) { 
+    for (auto& c : Counters) (*c) -= val; 
+    Value -= val; 
+} 
+ 
+void TMultiCounter::Set(ui64 value) { 
+    auto diff = value - Value; 
+    Inc(diff); 
+} 
+ 
+TMultiCounter::operator bool() { 
+    return !Counters.empty(); 
+} 
+ 
+ 
 TPercentileCounter::TPercentileCounter(TIntrusivePtr<NMonitoring::TDynamicCounters> counters, const TVector<TLabelsInfo>& labels, const TVector<std::pair<TString, TString>>& subgroups, const TString& sensor,
                     const TVector<std::pair<ui64, TString>>& intervals, const bool deriv, bool expiring)
 {
@@ -139,7 +139,7 @@ TPercentileCounter::TPercentileCounter(TIntrusivePtr<NMonitoring::TDynamicCounte
     Ranges.reserve(intervals.size());
     for (auto& interval : intervals) {
         Ranges.push_back(interval.first);
-        Counters.push_back(TMultiCounter(counters, labels, subgroups, {interval.second}, deriv, sensor, expiring));
+        Counters.push_back(TMultiCounter(counters, labels, subgroups, {interval.second}, deriv, sensor, expiring)); 
     }
     Ranges.back() = Max<ui64>();
 }

Некоторые файлы не были показаны из-за большого количества измененных файлов