Browse Source

optimize statistics latency for cold start KIKIMR-18323

monster 1 year ago
parent
commit
59a0eeda03

+ 62 - 2
ydb/core/tx/schemeshard/schemeshard_impl.cpp

@@ -6825,11 +6825,60 @@ void TSchemeShard::BroadcastStatistics() {
     Send(NStat::MakeStatServiceID(leadingNodeId), broadcast.release());
 }
 
+void TSchemeShard::BroadcastStatisticsFast() {
+    LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS,
+        "Broadcast statistics fast"
+        << ", node count = " << StatFastBroadcastNodes.size()
+        << ", at schemeshard: " << TabletID());
+
+    if (StatFastBroadcastNodes.empty()) {
+        return;
+    }
+
+    ui32 leadingNodeId = *StatFastBroadcastNodes.begin();
+
+    auto broadcast = std::make_unique<NStat::TEvStatistics::TEvBroadcastStatistics>();
+    auto* record = broadcast->MutableRecord();
+    for (const auto& nodeId : StatFastBroadcastNodes) {
+        if (nodeId == leadingNodeId) {
+            continue;
+        }
+        record->AddNodeIds(nodeId);
+    }
+
+    broadcast->PreSerializedData = PreSerializedStatisticsMapData;
+
+    Send(NStat::MakeStatServiceID(leadingNodeId), broadcast.release());
+
+    StatFastBroadcastNodes.clear();
+}
+
+void TSchemeShard::SendStatisticsToNode(ui32 nodeId) {
+    LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS,
+        "Send statistics to node"
+        << ", node id = " << nodeId
+        << ", at schemeshard: " << TabletID());
+
+    auto broadcast = std::make_unique<NStat::TEvStatistics::TEvBroadcastStatistics>();
+    broadcast->PreSerializedData = PreSerializedStatisticsMapData;
+
+    Send(NStat::MakeStatServiceID(nodeId), broadcast.release());
+}
+
 void TSchemeShard::Handle(TEvPrivate::TEvProcessStatistics::TPtr&, const TActorContext& ctx) {
     GenerateStatisticsMap();
     BroadcastStatistics();
+    ctx.Schedule(TDuration::Minutes(2), new TEvPrivate::TEvProcessStatistics());
+}
 
-    ctx.Schedule(TDuration::Seconds(15), new TEvPrivate::TEvProcessStatistics());
+void TSchemeShard::Handle(TEvPrivate::TEvStatFastBroadcastCheck::TPtr&, const TActorContext& ctx) {
+    StatFastCheckInFlight = false;
+    BroadcastStatisticsFast();
+
+    if (++StatFastBroadcastCounter < STAT_OPTIMIZE_N_FIRST_NODES) {
+        ctx.Schedule(TDuration::MilliSeconds(100), new TEvPrivate::TEvStatFastBroadcastCheck());
+        StatFastCheckInFlight = true;
+    }
 }
 
 void TSchemeShard::Handle(NStat::TEvStatistics::TEvRegisterNode::TPtr& ev, const TActorContext& ctx) {
@@ -6843,7 +6892,18 @@ void TSchemeShard::Handle(NStat::TEvStatistics::TEvRegisterNode::TPtr& ev, const
         ++StatNodes[nodeId];
     }
 
-    Y_UNUSED(hasStatistics);
+    if (!hasStatistics) {
+        if (StatFastBroadcastCounter > 0) {
+            --StatFastBroadcastCounter;
+            SendStatisticsToNode(nodeId);
+        } else {
+            StatFastBroadcastNodes.insert(nodeId);
+        }
+        if (!StatFastCheckInFlight) {
+            ctx.Schedule(TDuration::MilliSeconds(100), new TEvPrivate::TEvStatFastBroadcastCheck());
+            StatFastCheckInFlight = true;
+        }
+    }
 
     LOG_DEBUG_S(ctx, NKikimrServices::STATISTICS,
         "Register node"

+ 8 - 0
ydb/core/tx/schemeshard/schemeshard_impl.h

@@ -1247,10 +1247,18 @@ public:
     std::unordered_map<ui32, size_t> StatNodes;
     std::unordered_map<TActorId, ui32> StatNodePipes;
 
+    static constexpr size_t STAT_OPTIMIZE_N_FIRST_NODES = 2;
+    size_t StatFastBroadcastCounter = STAT_OPTIMIZE_N_FIRST_NODES;
+    bool StatFastCheckInFlight = false;
+    std::unordered_set<ui32> StatFastBroadcastNodes;
+
     void Handle(TEvPrivate::TEvProcessStatistics::TPtr& ev, const TActorContext& ctx);
+    void Handle(TEvPrivate::TEvStatFastBroadcastCheck::TPtr& ev, const TActorContext& ctx);
     void Handle(NStat::TEvStatistics::TEvRegisterNode::TPtr& ev, const TActorContext& ctx);
     void GenerateStatisticsMap();
     void BroadcastStatistics();
+    void BroadcastStatisticsFast();
+    void SendStatisticsToNode(ui32 nodeId);
 
 public:
     void ChangeStreamShardsCount(i64 delta) override;

+ 4 - 0
ydb/core/tx/schemeshard/schemeshard_private.h

@@ -29,6 +29,7 @@ struct TEvPrivate {
         EvRunCdcStreamScan,
         EvPersistTopicStats,
         EvProcessStatistics,
+        EvStatFastBroadcastCheck,
         EvEnd
     };
 
@@ -183,6 +184,9 @@ struct TEvPrivate {
     struct TEvProcessStatistics: public TEventLocal<TEvProcessStatistics, EvProcessStatistics> {
     };
 
+    struct TEvStatFastBroadcastCheck: public TEventLocal<TEvStatFastBroadcastCheck, EvStatFastBroadcastCheck> {
+    };
+
 }; // TEvPrivate
 
 } // NSchemeShard