Browse Source

Revert "Disable coordinator if no ingress tasks (#472)" (#1083)

This reverts commit 7c308ffca4e2e378bf364c843707bbd9f434b587.
Dmitry Kardymon 1 year ago
parent
commit
4c962e715d

+ 0 - 6
ydb/core/fq/libs/checkpointing/checkpoint_coordinator.cpp

@@ -103,12 +103,6 @@ void TCheckpointCoordinator::Handle(NYql::NDqs::TEvReadyState::TPtr& ev) {
         AllActorsSet.insert(actorId);
     }
 
-    CC_LOG_D("ActorsToTrigger count: " << ActorsToTrigger.size() << ", ActorsToNotify count: " << ActorsToNotify.size() << ", ActorsToWaitFor count: " << ActorsToWaitFor.size());
-
-    if (ActorsToTrigger.empty()) {
-        CC_LOG_D("No ingress tasks, coordinator was disabled");
-        return;
-    }
     PendingInit = std::make_unique<TPendingInitCoordinator>(AllActors.size());
 
     CC_LOG_D("Send TEvRegisterCoordinatorRequest");

+ 10 - 21
ydb/core/fq/libs/checkpointing/ut/checkpoint_coordinator_ut.cpp

@@ -19,7 +19,7 @@ enum ETestGraphFlags : ui64 {
     SourceWithChannelInOneTask = 2,
 };
 
-NYql::NDqProto::TReadyState BuildTestGraph(ui64 flags, const TString& sourceType) {
+NYql::NDqProto::TReadyState BuildTestGraph(ui64 flags = 0) {
 
     NYql::NDqProto::TReadyState result;
 
@@ -29,7 +29,7 @@ NYql::NDqProto::TReadyState BuildTestGraph(ui64 flags, const TString& sourceType
     ingressOutput->AddChannels();
     if (flags & ETestGraphFlags::InputWithSource) {
         auto* source = ingress->AddInputs()->MutableSource();
-        source->SetType(sourceType);
+        source->SetType("PqSource");
     }
 
     auto* map = result.AddTask();
@@ -40,7 +40,7 @@ NYql::NDqProto::TReadyState BuildTestGraph(ui64 flags, const TString& sourceType
     mapOutput->AddChannels();
     if (flags & ETestGraphFlags::SourceWithChannelInOneTask) {
         auto* source = map->AddInputs()->MutableSource();
-        source->SetType(sourceType);
+        source->SetType("PqSource");
     }
 
     auto* egress = result.AddTask();
@@ -70,9 +70,9 @@ struct TTestBootstrap : public TTestActorRuntime {
 
     ::NMonitoring::TDynamicCounterPtr Counters = new ::NMonitoring::TDynamicCounters();
 
-    explicit TTestBootstrap(ui64 graphFlags, const TString& sourceType)
+    explicit TTestBootstrap(ui64 graphFlags = 0)
         : TTestActorRuntime(true)
-        , GraphState(BuildTestGraph(graphFlags, sourceType))
+        , GraphState(BuildTestGraph(graphFlags))
         , CoordinatorId("my-graph-id", 42)
         , CheckpointId(CoordinatorId.Generation, 1)
     {
@@ -281,8 +281,8 @@ Y_UNIT_TEST_SUITE(TCheckpointCoordinatorTests) {
     class CheckpointsTestHelper : public TTestBootstrap
     {
     public:
-        CheckpointsTestHelper(ui64 graphFlags, const TString& sourceType)
-            : TTestBootstrap(graphFlags, sourceType) {
+        CheckpointsTestHelper(ui64 graphFlags)
+            : TTestBootstrap(graphFlags) {
         }
         
         void InjectCheckpoint() {
@@ -372,33 +372,22 @@ Y_UNIT_TEST_SUITE(TCheckpointCoordinatorTests) {
     };
 
     Y_UNIT_TEST(ShouldTriggerCheckpointWithSource) {
-        CheckpointsTestHelper test(ETestGraphFlags::InputWithSource, "PqSource");
+        CheckpointsTestHelper test(ETestGraphFlags::InputWithSource);
         test.InjectCheckpoint();
         test.AllSavedAndCommited();
     }
 
     Y_UNIT_TEST(ShouldTriggerCheckpointWithSourcesAndWithChannel) {
-        CheckpointsTestHelper test(ETestGraphFlags::InputWithSource | ETestGraphFlags::SourceWithChannelInOneTask, "PqSource");
+        CheckpointsTestHelper test(ETestGraphFlags::InputWithSource | ETestGraphFlags::SourceWithChannelInOneTask);
         test.InjectCheckpoint();
         test.AllSavedAndCommited();
     }
 
     Y_UNIT_TEST(ShouldAbortPreviousCheckpointsIfNodeStateCantBeSaved) {
-        CheckpointsTestHelper test(ETestGraphFlags::InputWithSource, "PqSource");
+        CheckpointsTestHelper test(ETestGraphFlags::InputWithSource);
         test.InjectCheckpoint();
         test.SaveFailed();
     }
-
-    Y_UNIT_TEST(ShouldDoNothingIfNoIngressTasks) {
-        CheckpointsTestHelper test(ETestGraphFlags::InputWithSource, "S3Source");
-        bool empty = false;   
-        try {
-            test.GrabEdgeEvent<TEvCheckpointStorage::TEvRegisterCoordinatorRequest>(test.StorageProxy, TDuration::Seconds(10));
-        } catch (TEmptyEventQueueException&) {
-            empty = true;
-        }
-        UNIT_ASSERT(empty);
-    }
 }
 
 } // namespace NFq