Browse Source

skip some connections and bindings

hcpp 1 year ago
parent
commit
3c3e1bcfb4

+ 6 - 0
ydb/core/fq/libs/compute/ydb/events/events.h

@@ -351,6 +351,12 @@ struct TEvYdbCompute {
             , Status(status)
         {}
 
+        TEvSynchronizeResponse(const TString& scope, NYql::TIssues issues)
+            : Scope(scope)
+            , Issues(std::move(issues))
+            , Status(NYdb::EStatus::SUCCESS)
+        {}
+
         TString Scope;
         NYql::TIssues Issues;
         NYdb::EStatus Status;

+ 47 - 19
ydb/core/fq/libs/compute/ydb/synchronization_service/synchronization_service.cpp

@@ -188,18 +188,24 @@ public:
         hFunc(TEvControlPlaneStorage::TEvModifyDatabaseResponse, Handle);
     )
 
-    void Handle(const TEvControlPlaneProxy::TEvCreateConnectionRequest::TPtr&) {
-        CreatedConnections++;
-        if (CreatedConnections == Connections.size()) {
+    void ProcessCreateConnection() {
+        ProcessedConnections++;
+        if (ProcessedConnections == Connections.size()) {
             LOG_I("Start create external tables stage for the scope " << Scope);
             Become(&TSynchronizeScopeActor::StateCreateExternalTablesFunc);
             CreateExternalTables();
         }
     }
 
+    void Handle(const TEvControlPlaneProxy::TEvCreateConnectionRequest::TPtr&) {
+        SuccessfullyCreatedConnections++;
+        ProcessCreateConnection();
+    }
+
     void Handle(const TEvControlPlaneProxy::TEvCreateConnectionResponse::TPtr& ev) {
-        LOG_E("Create external data source response (error): " << CreatedConnections << " of " << Connections.size() << ", issues = " << ev.Get()->Get()->Issues.ToOneLineString());
-        ReplyErrorAndPassAway(ev.Get()->Get()->Issues, "Сonnection creation error at the synchronization stage");
+        LOG_E("Create external data source response (error): " << ProcessedConnections << " of " << Connections.size() << ", issues = " << ev.Get()->Get()->Issues.ToOneLineString());
+        ProcessCreateConnection();
+        Issues.AddIssues(ev.Get()->Get()->Issues);
     }
 
     STRICT_STFUNC(StateCreateExternalTablesFunc,
@@ -208,16 +214,24 @@ public:
         hFunc(TEvControlPlaneStorage::TEvModifyDatabaseResponse, Handle);
     )
 
-    void Handle(const TEvControlPlaneProxy::TEvCreateBindingRequest::TPtr&) {
-        CreatedBindings++;
-        if (CreatedBindings == Bindings.size()) {
+    void ProcessCreateBinding() {
+        ProcessedBindings++;
+        if (ProcessedBindings == Bindings.size()) {
             SendFinalModifyDatabase();
         }
     }
 
+    void Handle(const TEvControlPlaneProxy::TEvCreateBindingRequest::TPtr&) {
+        SuccessfullyCreatedBindings++;
+        ProcessCreateBinding();
+    }
+
     void Handle(const TEvControlPlaneProxy::TEvCreateBindingResponse::TPtr& ev) {
-        LOG_E("Create external table response (error): " << CreatedBindings << " of " << Bindings.size() << ", issues = " << ev.Get()->Get()->Issues.ToOneLineString());
-        ReplyErrorAndPassAway(ev.Get()->Get()->Issues, "Binding creation error at the synchronization stage");
+        LOG_E("Create external table response (error): " << ProcessedBindings << " of " << Bindings.size() << ", issues = " << ev.Get()->Get()->Issues.ToOneLineString());
+
+        ProcessCreateBinding();
+
+        Issues.AddIssues(ev.Get()->Get()->Issues);
     }
 
     void Handle(const TEvControlPlaneStorage::TEvModifyDatabaseResponse::TPtr& ev) {
@@ -430,7 +444,7 @@ private:
     }
 
     void ReplyAndPassAway() {
-        Send(ParentActorId, new TEvYdbCompute::TEvSynchronizeResponse{Scope});
+        Send(ParentActorId, new TEvYdbCompute::TEvSynchronizeResponse{Scope, Issues});
         PassAway();
     }
 
@@ -492,6 +506,14 @@ private:
     }
 
     void SendFinalModifyDatabase() {
+        if (Issues) {
+            LOG_I("Synchronization has already completed with errors for scope: " << Scope);
+            Issues.AddIssue(TStringBuilder{} << "Connections created " << SuccessfullyCreatedConnections << " of " << ProcessedConnections);
+            Issues.AddIssue(TStringBuilder{} << "Bindings created " << SuccessfullyCreatedBindings << " of " << ProcessedBindings);
+            ReplyAndPassAway();
+            return;
+        }
+
         const auto& controlPlane = ComputeConfig.GetProto().GetYdb().GetControlPlane();
         switch (controlPlane.type_case()) {
             case NConfig::TYdbComputeControlPlane::TYPE_NOT_SET:
@@ -535,12 +557,15 @@ private:
 
     TString PageToken;
     TSet<TString> BindingIds;
-    uint64_t CreatedConnections = 0;
-    uint64_t CreatedBindings = 0;
+    uint64_t SuccessfullyCreatedConnections = 0;
+    uint64_t SuccessfullyCreatedBindings = 0;
+    uint64_t ProcessedConnections = 0;
+    uint64_t ProcessedBindings = 0;
     TMap<TString, FederatedQuery::Connection> Connections;
     TMap<TString, FederatedQuery::Binding> Bindings;
     NFq::NPrivate::TCounters Counters;
     std::shared_ptr<NYdb::NTable::TTableClient> Client;
+    NYql::TIssues Issues;
 };
 
 class TSynchronizatinServiceActor : public NActors::TActorBootstrapped<TSynchronizatinServiceActor> {
@@ -556,9 +581,9 @@ class TSynchronizatinServiceActor : public NActors::TActorBootstrapped<TSynchron
 
     struct TSynchtonizationCounters {
         struct TCounters : public virtual TThrRefBase  {
-            TCounters(const ::NMonitoring::TDynamicCounterPtr& counters) 
-                : SynchronizationOk(counters->GetCounter("Ok", true))
-                , SynchronizationFailed(counters->GetCounter("Failed", true))
+            TCounters(const ::NMonitoring::TDynamicCounterPtr& counters, bool derivative = true) 
+                : SynchronizationOk(counters->GetCounter("Ok", derivative))
+                , SynchronizationFailed(counters->GetCounter("Failed", derivative))
             {}
             ::NMonitoring::TDynamicCounters::TCounterPtr SynchronizationOk;
             ::NMonitoring::TDynamicCounters::TCounterPtr SynchronizationFailed;
@@ -566,7 +591,6 @@ class TSynchronizatinServiceActor : public NActors::TActorBootstrapped<TSynchron
 
         using TCountersPtr  = TIntrusivePtr<TCounters>;
         
-
         TSynchtonizationCounters(const ::NMonitoring::TDynamicCounterPtr& counters)
             : Counters(counters)
             , SubgroupCounters(Counters->GetSubgroup("step", "Synchronization"))
@@ -589,7 +613,7 @@ class TSynchronizatinServiceActor : public NActors::TActorBootstrapped<TSynchron
             if (it != CountersByScope.end()) {
                 return it->second;
             }
-            return CountersByScope[scope] = MakeIntrusive<TCounters>(SubgroupCounters->GetSubgroup("scope", scope));
+            return CountersByScope[scope] = MakeIntrusive<TCounters>(SubgroupCounters->GetSubgroup("scope", scope), false);
         }
 
     public:
@@ -669,7 +693,11 @@ public:
 
         it->second.Requests.clear();
 
-        if (ev->Get()->Status == NYdb::EStatus::SUCCESS) {
+        if (ev->Get()->Status == NYdb::EStatus::SUCCESS && ev->Get()->Issues) {
+            LOG_E("Synchronization failed (skipped some bindings and connections) for " << ev->Get()->Scope << " with issues " << ev->Get()->Issues.ToOneLineString());
+            Counters.IncFailed(ev->Get()->Scope);
+            it->second.Status = EScopeStatus::SYNCHRONIZED;
+        } else if (ev->Get()->Status == NYdb::EStatus::SUCCESS) {
             Counters.IncOk(ev->Get()->Scope);
             it->second.Status = EScopeStatus::SYNCHRONIZED;
         } else {