Просмотр исходного кода

Close yt session out of gateway mutex (#7662)

Marina Pereskokova 7 месяцев назад
Родитель
Сommit
07b9b4ec37

+ 18 - 14
ydb/library/yql/core/facade/yql_facade.cpp

@@ -505,13 +505,6 @@ TString TProgram::GetSessionId() const {
     }
 }
 
-TString TProgram::TakeSessionId() {
-    // post-condition: SessionId_ will be empty
-    with_lock(SessionIdLock_) {
-        return std::move(SessionId_);
-    }
-}
-
 void TProgram::AddCredentials(const TVector<std::pair<TString, TCredential>>& credentials) {
     Y_ENSURE(!TypeCtx_, "TypeCtx_ already created");
 
@@ -1702,16 +1695,24 @@ NThreading::TFuture<void> TProgram::CleanupLastSession() {
 NThreading::TFuture<void> TProgram::CloseLastSession() {
     YQL_LOG_CTX_ROOT_SESSION_SCOPE(GetSessionId());
 
-    TString sessionId = TakeSessionId();
-    if (sessionId.empty()) {
-        return MakeFuture();
-    }
-
     TVector<TDataProviderInfo> dataProviders;
     with_lock (DataProvidersLock_) {
         dataProviders = DataProviders_;
     }
 
+    auto promise = NThreading::NewPromise<void>();
+
+    TString sessionId;
+    with_lock(SessionIdLock_) {
+        // post-condition: SessionId_ will be empty
+        sessionId = std::move(SessionId_);
+        if (sessionId.empty()) {
+            return CloseLastSessionFuture_;
+        }
+
+        CloseLastSessionFuture_ = promise.GetFuture();
+    }
+
     TVector<NThreading::TFuture<void>> closeFutures;
     closeFutures.reserve(dataProviders.size());
     for (const auto& dp : dataProviders) {
@@ -1719,11 +1720,14 @@ NThreading::TFuture<void> TProgram::CloseLastSession() {
             dp.CloseSession(sessionId);
         }
         if (dp.CloseSessionAsync) {
-            dp.CloseSessionAsync(sessionId);
+            closeFutures.push_back(dp.CloseSessionAsync(sessionId));
         }
     }
 
-    return NThreading::WaitExceptionOrAll(closeFutures);
+    return NThreading::WaitExceptionOrAll(closeFutures)
+        .Apply([promise = std::move(promise)](const NThreading::TFuture<void>&) mutable {
+            promise.SetValue();
+        });
 }
 
 TString TProgram::ResultsAsString() const {

+ 1 - 1
ydb/library/yql/core/facade/yql_facade.h

@@ -369,7 +369,6 @@ private:
 
     bool FillParseResult(NYql::TAstParseResult&& astRes, NYql::TWarningRules* warningRules = nullptr);
     TString GetSessionId() const;
-    TString TakeSessionId();
 
     NThreading::TFuture<IGraphTransformer::TStatus> AsyncTransformWithFallback(bool applyAsyncChanges);
     void SaveExprRoot();
@@ -411,6 +410,7 @@ private:
     TExprNode::TPtr SavedExprRoot_;
     mutable TAdaptiveLock SessionIdLock_;
     TString SessionId_;
+    NThreading::TFuture<void> CloseLastSessionFuture_;
     TTypeAnnotationContextPtr TypeCtx_;
     TAutoPtr<IPlanBuilder> PlanBuilder_;
     TAutoPtr<IGraphTransformer> Transformer_;

+ 12 - 7
ydb/library/yql/providers/yt/gateway/native/yql_yt_native.cpp

@@ -233,17 +233,22 @@ public:
     TFuture<void> CloseSession(TCloseSessionOptions&& options) final {
         YQL_LOG_CTX_SCOPE(TStringBuf("Gateway"), __FUNCTION__);
 
+        TSession::TPtr session;
         with_lock(Mutex_) {
             auto it = Sessions_.find(options.SessionId());
             if (it != Sessions_.end()) {
-                auto session = it->second;
+                session = it->second;
                 Sessions_.erase(it);
-                try {
-                    session->Close();
-                } catch (...) {
-                    YQL_CLOG(ERROR, ProviderYt) << CurrentExceptionMessage();
-                    return MakeErrorFuture<void>(std::current_exception());
-                }
+            }
+        }
+
+        // Do final destruction outside of mutex, because it may do some transaction aborts on YT clusters
+        if (session) {
+            try {
+                session->Close();
+            } catch (...) {
+                YQL_CLOG(ERROR, ProviderYt) << CurrentExceptionMessage();
+                return MakeErrorFuture<void>(std::current_exception());
             }
         }