Browse Source

additional signals for commit timings control (#8729)

ivanmorozov333 6 months ago
parent
commit
0ba24ba349

+ 8 - 0
ydb/core/tx/columnshard/columnshard__progress_tx.cpp

@@ -14,6 +14,7 @@ private:
     const ui32 TabletTxNo;
     std::optional<NOlap::TSnapshot> LastCompletedTx;
     std::optional<TTxController::TPlanQueueItem> PlannedQueueItem;
+    std::optional<TMonotonic> StartExecution;  // monotonic timestamp assigned in the execute path; when set, the completion path reports Now() - *StartExecution via OnTxProgressDuration
 
 public:
     TTxProgressTx(TColumnShard* self)
@@ -54,6 +55,7 @@ public:
             } else {
                 Self->ProgressTxController->PopFirstPlannedTx();
             }
+            StartExecution = TMonotonic::Now();
 
             LastCompletedTx = NOlap::TSnapshot(step, txId);
             if (LastCompletedTx > Self->LastCompletedTx) {
@@ -84,11 +86,17 @@ public:
             Self->RescheduleWaitingReads();
         }
         if (PlannedQueueItem) {
+            AFL_VERIFY(TxOperator);
+            Self->GetProgressTxController().GetCounters().OnTxProgressLag(
+                TxOperator->GetOpType(), TMonotonic::Now() - TMonotonic::MilliSeconds(PlannedQueueItem->Step));
             Self->GetProgressTxController().ProgressOnComplete(*PlannedQueueItem);
         }
         if (LastCompletedTx) {
             Self->LastCompletedTx = std::max(*LastCompletedTx, Self->LastCompletedTx);
         }
+        if (StartExecution) {
+            Self->GetProgressTxController().GetCounters().OnTxProgressDuration(TxOperator->GetOpType(), TMonotonic::Now() - *StartExecution);
+        }
         Self->SetupIndexation();
     }
 };

+ 1 - 0
ydb/core/tx/columnshard/counters/columnshard.cpp

@@ -53,6 +53,7 @@ TCSCounters::TCSCounters()
     HistogramSuccessWriteMiddle6PutBlobsDurationMs = TBase::GetHistogram("SuccessWriteMiddle6PutBlobsDurationMs", NMonitoring::ExponentialHistogram(18, 2, 5));
     HistogramFailedWritePutBlobsDurationMs = TBase::GetHistogram("FailedWritePutBlobsDurationMs", NMonitoring::ExponentialHistogram(18, 2, 5));
     HistogramWriteTxCompleteDurationMs = TBase::GetHistogram("WriteTxCompleteDurationMs", NMonitoring::ExponentialHistogram(18, 2, 5));
+
     WritePutBlobsCount = TBase::GetValue("WritePutBlobs");
     WriteRequests = TBase::GetValue("WriteRequests");
 

+ 0 - 1
ydb/core/tx/columnshard/counters/columnshard.h

@@ -72,7 +72,6 @@ private:
     NMonitoring::TDynamicCounters::TCounterPtr WriteRequests;
     THashMap<EWriteFailReason, NMonitoring::TDynamicCounters::TCounterPtr> FailedWriteRequests;
     NMonitoring::TDynamicCounters::TCounterPtr SuccessWriteRequests;
-
 public:
     const TCSInitialization Initialization;
     TTxProgressCounters TxProgress;

+ 13 - 1
ydb/core/tx/columnshard/counters/tx_progress.h

@@ -24,6 +24,8 @@ private:
         NMonitoring::TDynamicCounters::TCounterPtr FinishProposeOnComplete;
         NMonitoring::TDynamicCounters::TCounterPtr FinishPlannedTx;
         NMonitoring::TDynamicCounters::TCounterPtr AbortTx;
+        NMonitoring::THistogramPtr HistogramTxProgressDuration;
+        NMonitoring::THistogramPtr HistogramTxProgressLag;
 
         TProgressCounters(const TCommonCountersOwner& owner)
             : TBase(owner)
@@ -34,13 +36,23 @@ private:
             , FinishProposeOnExecute(TBase::GetDeriviative("FinishProposeOnExecute"))
             , FinishProposeOnComplete(TBase::GetDeriviative("FinishProposeOnComplete"))
             , FinishPlannedTx(TBase::GetDeriviative("FinishPlannedTx"))
-            , AbortTx(TBase::GetDeriviative("AbortTx")) {
+            , AbortTx(TBase::GetDeriviative("AbortTx"))
+            , HistogramTxProgressDuration(TBase::GetHistogram("TxProgress/Execution/DurationMs", NMonitoring::ExponentialHistogram(18, 2, 5)))
+            , HistogramTxProgressLag(TBase::GetHistogram("TxProgress/LagOnComplete/DurationMs", NMonitoring::ExponentialHistogram(18, 2, 5))) {
         }
     };
 
     THashMap<TOpType, TProgressCounters> CountersByOpType;
 
 public:
+    void OnTxProgressDuration(const TString& opType, const TDuration d) {  // collects d, in milliseconds, into the HistogramTxProgressDuration of the opType sub-group
+        GetSubGroup(opType).HistogramTxProgressDuration->Collect(d.MilliSeconds());  // NOTE(review): sibling OnRegisterTx takes `const TOpType&`; presumably TOpType is TString — confirm
+    }
+
+    void OnTxProgressLag(const TString& opType, const TDuration d) {  // collects d, in milliseconds, into the HistogramTxProgressLag of the opType sub-group
+        GetSubGroup(opType).HistogramTxProgressLag->Collect(d.MilliSeconds());  // NOTE(review): sibling OnRegisterTx takes `const TOpType&`; presumably TOpType is TString — confirm
+    }
+
     void OnRegisterTx(const TOpType& opType) {  // adds 1 to the RegisterTx counter of the opType sub-group
         GetSubGroup(opType).RegisterTx->Add(1);
     }

+ 3 - 0
ydb/core/tx/columnshard/transactions/tx_controller.h

@@ -381,6 +381,9 @@ public:
             DoOnTabletInit(owner);
         }
     };
+    TTxProgressCounters& GetCounters() {  // exposes the Counters member by mutable reference so callers can report tx-progress metrics
+        return Counters;
+    }
 
 private:
     const TDuration MaxCommitTxDelay = TDuration::Seconds(30);

+ 4 - 12
ydb/core/tx/tx_proxy/upload_rows_common_impl.cpp

@@ -12,10 +12,10 @@ namespace NKikimr {
         RowsCount = TBase::GetDeriviative("Rows/Count");
         PackageSize = TBase::GetHistogram("Rows/PackageSize", NMonitoring::ExponentialHistogram(15, 2, 10));
 
-        DurationToStartCommit = TBase::GetHistogram("ToStartCommit/DurationMs", NMonitoring::ExponentialHistogram(15, 2, 10));
-        DurationToFinishCommit = TBase::GetHistogram("ToFinishCommit/DurationMs", NMonitoring::ExponentialHistogram(15, 2, 10));
-        DurationToStartWriting = TBase::GetHistogram("ToStartWriting/DurationMs", NMonitoring::ExponentialHistogram(15, 2, 10));
-        DurationToTxStarted = TBase::GetHistogram("ToTxStarted/DurationMs", NMonitoring::ExponentialHistogram(15, 2, 10));
+        PreparingDuration = TBase::GetHistogram("Preparing/DurationMs", NMonitoring::ExponentialHistogram(15, 2, 10));
+        WritingDuration = TBase::GetHistogram("Writing/DurationMs", NMonitoring::ExponentialHistogram(15, 2, 10));
+        CommitDuration = TBase::GetHistogram("Commit/DurationMs", NMonitoring::ExponentialHistogram(15, 2, 10));
+        PrepareReplyDuration = TBase::GetHistogram("ToReply/DurationMs", NMonitoring::ExponentialHistogram(15, 2, 10));
 
         const google::protobuf::EnumDescriptor* descriptor = ::Ydb::StatusIds::StatusCode_descriptor();
         for (ui32 i = 0; i < (ui32)descriptor->value_count(); ++i) {
@@ -24,12 +24,4 @@ namespace NKikimr {
         }
     }
 
-    void TUploadCounters::OnReply(const TDuration d, const ::Ydb::StatusIds::StatusCode code) const {
-        const TString name = ::Ydb::StatusIds::StatusCode_Name(code);
-        auto it = CodesCount.find(name);
-        Y_ABORT_UNLESS(it != CodesCount.end());
-        it->second->Add(1);
-        ReplyDuration->Collect(d.MilliSeconds());
-    }
-
 }

+ 58 - 22
ydb/core/tx/tx_proxy/upload_rows_common_impl.h

@@ -45,29 +45,64 @@ private:
     NMonitoring::TDynamicCounters::TCounterPtr RowsCount;
     NMonitoring::THistogramPtr PackageSize;
 
-    NMonitoring::THistogramPtr DurationToStartCommit;
-    NMonitoring::THistogramPtr DurationToFinishCommit;
-    NMonitoring::THistogramPtr DurationToStartWriting;
-    NMonitoring::THistogramPtr DurationToTxStarted;
+    NMonitoring::THistogramPtr PreparingDuration;
+    NMonitoring::THistogramPtr WritingDuration;
+    NMonitoring::THistogramPtr CommitDuration;
+    NMonitoring::THistogramPtr PrepareReplyDuration;
 
     THashMap<TString, NMonitoring::TDynamicCounters::TCounterPtr> CodesCount;
 public:
     TUploadCounters();
 
-    void OnTxStarted(const TDuration d) const {
-        DurationToTxStarted->Collect(d.MilliSeconds());
-    }
+    class TGuard: TMoveOnly {  // per-request stage timer: each On* call stamps TMonotonic::Now() and feeds one histogram of Owner
+    private:
+        TMonotonic Start = TMonotonic::Now();  // the only constructor replaces this default with its `start` argument
+        std::optional<TMonotonic> WritingStarted;  // assigned by OnWritingStarted()
+        std::optional<TMonotonic> CommitStarted;  // assigned by OnCommitStarted()
+        std::optional<TMonotonic> CommitFinished;  // assigned by OnCommitFinished()
+        std::optional<TMonotonic> ReplyFinished;  // assigned by OnReply()
+        TUploadCounters& Owner;  // non-owning reference: the TUploadCounters must stay alive while this guard is used
+    public:
+        TGuard(const TMonotonic start, TUploadCounters& owner)
+            : Start(start)
+            , Owner(owner)
+        {
 
-    void OnWritingStarted(const TDuration d) const {
-        DurationToStartWriting->Collect(d.MilliSeconds());
-    }
+        }
 
-    void OnStartCommit(const TDuration d) const {
-        DurationToStartCommit->Collect(d.MilliSeconds());
-    }
+        void OnWritingStarted() {  // "Preparing" stage: Start -> now
+            WritingStarted = TMonotonic::Now();
+            Owner.PreparingDuration->Collect((*WritingStarted - Start).MilliSeconds());
+        }
+
+        void OnCommitStarted() {  // "Writing" stage: OnWritingStarted() -> now
+            CommitStarted = TMonotonic::Now();
+            AFL_VERIFY(WritingStarted);  // OnWritingStarted() must have run first; the optional is dereferenced on the next line
+            Owner.WritingDuration->Collect((*CommitStarted - *WritingStarted).MilliSeconds());
+        }
 
-    void OnFinishCommit(const TDuration d) const {
-        DurationToFinishCommit->Collect(d.MilliSeconds());
+        void OnCommitFinished() {  // "Commit" stage: OnCommitStarted() -> now
+            CommitFinished = TMonotonic::Now();
+            AFL_VERIFY(CommitStarted);  // OnCommitStarted() must have run first; the optional is dereferenced on the next line
+            Owner.CommitDuration->Collect((*CommitFinished - *CommitStarted).MilliSeconds());
+        }
+
+        void OnReply(const ::Ydb::StatusIds::StatusCode code) {  // records the reply timings and bumps the per-status-code counter
+            ReplyFinished = TMonotonic::Now();
+            if (CommitFinished) {  // commit-to-reply time is collected only when OnCommitFinished() was reached
+                Owner.PrepareReplyDuration->Collect((*ReplyFinished - *CommitFinished).MilliSeconds());
+            }
+            Owner.ReplyDuration->Collect((*ReplyFinished - Start).MilliSeconds());  // whole-request time, collected on every reply
+
+            const TString name = ::Ydb::StatusIds::StatusCode_Name(code);
+            auto it = Owner.CodesCount.find(name);
+            Y_ABORT_UNLESS(it != Owner.CodesCount.end());  // a counter is expected to exist for every status-code name
+            it->second->Add(1);
+        }
+    };
+
+    TGuard BuildGuard(const TMonotonic start) {  // creates a TGuard that starts timing at `start` and reports into this object
+        return TGuard(start, *this);  // the guard stores a reference to *this
     }
 
     void OnRequest(const ui64 rowsCount) const {
@@ -76,7 +111,7 @@ public:
         PackageSize->Collect(rowsCount);
     }
 
-    void OnReply(const TDuration d, const ::Ydb::StatusIds::StatusCode code) const;
+    void OnReply(const TDuration dFull, const TDuration dDelta, const ::Ydb::StatusIds::StatusCode code) const;  // NOTE(review): this commit deletes the 2-argument OnReply definition from upload_rows_common_impl.cpp and the visible hunks show no definition or caller for this 3-argument form — confirm it is not a dead declaration
 };
 
 
@@ -169,6 +204,7 @@ private:
     TActorId LeaderPipeCache;
     TDuration Timeout;
     TInstant StartTime;
+    std::optional<TInstant> StartCommitTime;  // NOTE(review): not read or written in any hunk shown here; stage timing goes through UploadCountersGuard — confirm this field is needed
     TActorId TimeoutTimerActorId;
 
     TAutoPtr<NSchemeCache::TSchemeCacheRequest> ResolvePartitionsResult;
@@ -185,7 +221,7 @@ private:
     std::shared_ptr<NYql::TIssues> Issues = std::make_shared<NYql::TIssues>();
     NLongTxService::TLongTxId LongTxId;
     TUploadCounters UploadCounters;
-
+    TUploadCounters::TGuard UploadCountersGuard;
 protected:
     enum class EUploadSource {
         ProtoValues = 0,
@@ -237,6 +273,7 @@ public:
         , LeaderPipeCache(MakePipePerNodeCacheID(false))
         , Timeout((timeout && timeout <= DEFAULT_TIMEOUT) ? timeout : DEFAULT_TIMEOUT)
         , Status(Ydb::StatusIds::SUCCESS)
+        , UploadCountersGuard(UploadCounters.BuildGuard(TMonotonic::Now()))
         , DiskQuotaExceeded(diskQuotaExceeded)
         , Span(std::move(span))
     {}
@@ -762,7 +799,7 @@ private:
     }
 
     void WriteToColumnTable(const NActors::TActorContext& ctx) {
-        UploadCounters.OnWritingStarted(TAppData::TimeProvider->Now() - StartTime);
+        UploadCountersGuard.OnWritingStarted();
         TString accessCheckError;
         if (!CheckAccess(accessCheckError)) {
             return ReplyWithError(Ydb::StatusIds::UNAUTHORIZED, LogPrefix() << accessCheckError, ctx);
@@ -787,7 +824,6 @@ private:
 
     void Handle(NLongTxService::TEvLongTxService::TEvBeginTxResult::TPtr& ev, const TActorContext& ctx) {
         const auto* msg = ev->Get();
-        UploadCounters.OnTxStarted(TAppData::TimeProvider->Now() - StartTime);
 
         if (msg->Record.GetStatus() != Ydb::StatusIds::SUCCESS) {
             NYql::TIssues issues;
@@ -917,7 +953,7 @@ private:
     }
 
     void CommitLongTx(const TActorContext& ctx) {
-        UploadCounters.OnStartCommit(TAppData::TimeProvider->Now() - StartTime);
+        UploadCountersGuard.OnCommitStarted();
         TActorId longTxServiceId = NLongTxService::MakeLongTxServiceID(ctx.SelfID.NodeId());
         ctx.Send(longTxServiceId, new NLongTxService::TEvLongTxService::TEvCommitTx(LongTxId), 0, 0, Span.GetTraceId());
         TBase::Become(&TThis::StateWaitCommitLongTx);
@@ -932,7 +968,7 @@ private:
     }
 
     void Handle(NLongTxService::TEvLongTxService::TEvCommitTxResult::TPtr& ev, const NActors::TActorContext& ctx) {
-        UploadCounters.OnFinishCommit(TAppData::TimeProvider->Now() - StartTime);
+        UploadCountersGuard.OnCommitFinished();
         const auto* msg = ev->Get();
 
         if (msg->Record.GetStatus() == Ydb::StatusIds::SUCCESS) {
@@ -1288,7 +1324,7 @@ private:
     }
 
     void ReplyWithResult(::Ydb::StatusIds::StatusCode status, const TActorContext& ctx) {
-        UploadCounters.OnReply(TAppData::TimeProvider->Now() - StartTime, status);
+        UploadCountersGuard.OnReply(status);
         SendResult(ctx, status);
 
         LOG_DEBUG_S(ctx, NKikimrServices::RPC_REQUEST, LogPrefix() << "completed with status " << status);