Ivan Sukhov 6 months ago
parent
commit
3acd7256c6

+ 1 - 1
ydb/library/yql/providers/s3/actors/yql_s3_raw_read_actor.cpp

@@ -284,7 +284,7 @@ private:
         IssuesFromMessage(result->Get()->Record.GetIssues(), issues);
         LOG_E("TS3ReadActor", "Error while object listing, details: TEvObjectPathReadError: " << issues.ToOneLineString());
         issues = NS3Util::AddParentIssue(TStringBuilder{} << "Error while object listing", std::move(issues));
-        Send(ComputeActorId, new TEvAsyncInputError(InputIndex, issues, NYql::NDqProto::StatusIds::EXTERNAL_ERROR));
+        Send(ComputeActorId, new TEvAsyncInputError(InputIndex, issues, result->Get()->Record.GetFatalCode()));
     }
 
     void HandleAck(TEvS3Provider::TEvAck::TPtr& ev) {

+ 9 - 7
ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp

@@ -965,6 +965,7 @@ public:
                 message = ErrorText;
             }
             Issues.AddIssues(BuildIssues(HttpResponseCode, errorCode, message));
+            FatalCode = StatusFromS3ErrorCode(errorCode);
         }
 
         if (ev->Get()->Issues) {
@@ -1115,7 +1116,7 @@ private:
             DecompressorActorId = Register(CreateS3DecompressorActor(SelfActorId, ReadSpec->Compression));
         }
 
-        NYql::NDqProto::StatusIds::StatusCode fatalCode = NYql::NDqProto::StatusIds::EXTERNAL_ERROR;
+        FatalCode = NYql::NDqProto::StatusIds::EXTERNAL_ERROR;
 
         StartCycleCount = GetCycleCountFast();
 
@@ -1123,7 +1124,7 @@ private:
             if (ReadSpec->Arrow) {
                 if (ReadSpec->Compression) {
                     Issues.AddIssue(TIssue("Blocks optimisations are incompatible with external compression"));
-                    fatalCode = NYql::NDqProto::StatusIds::BAD_REQUEST;
+                    FatalCode = NYql::NDqProto::StatusIds::BAD_REQUEST;
                 } else {
                     try {
                         if (Url.StartsWith("file://")) {
@@ -1133,7 +1134,7 @@ private:
                         }
                     } catch (const parquet::ParquetException& ex) {
                         Issues.AddIssue(TIssue(ex.what()));
-                        fatalCode = NYql::NDqProto::StatusIds::BAD_REQUEST;
+                        FatalCode = NYql::NDqProto::StatusIds::BAD_REQUEST;
                         RetryStuff->Cancel();
                     }
                 }
@@ -1149,7 +1150,7 @@ private:
                     LOG_CORO_D("S3 read ERROR");
                 } catch (const NDB::Exception& ex) {
                     Issues.AddIssue(TIssue(ex.message()));
-                    fatalCode = NYql::NDqProto::StatusIds::BAD_REQUEST;
+                    FatalCode = NYql::NDqProto::StatusIds::BAD_REQUEST;
                     RetryStuff->Cancel();
                 }
             }
@@ -1162,7 +1163,7 @@ private:
             return;
         } catch (const std::exception& err) {
             Issues.AddIssue(TIssue(err.what()));
-            fatalCode = NYql::NDqProto::StatusIds::INTERNAL_ERROR;
+            FatalCode = NYql::NDqProto::StatusIds::INTERNAL_ERROR;
             RetryStuff->Cancel();
         }
 
@@ -1170,7 +1171,7 @@ private:
 
         auto issues = NS3Util::AddParentIssue(TStringBuilder{} << "Error while reading file " << Path, std::move(Issues));
         if (issues)
-            Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvAsyncInputError(InputIndex, std::move(issues), fatalCode));
+            Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvAsyncInputError(InputIndex, std::move(issues), FatalCode));
         else
             Send(ParentActorId, new TEvS3Provider::TEvFileFinished(PathIndex, TakeIngressDelta(), TakeCpuTimeDelta(), RetryStuff->SizeLimit));
     }
@@ -1230,6 +1231,7 @@ private:
     bool ServerReturnedError = false;
     TString ErrorText;
     TIssues Issues;
+    NYql::NDqProto::StatusIds::StatusCode FatalCode;
 
     NActors::TActorId DecompressorActorId;
     std::size_t LastOffset = 0;
@@ -1719,7 +1721,7 @@ private:
         IssuesFromMessage(result->Get()->Record.GetIssues(), issues);
         LOG_W("TS3StreamReadActor", "Error while object listing, details: TEvObjectPathReadError: " << issues.ToOneLineString());
         issues = NS3Util::AddParentIssue(TStringBuilder{} << "Error while object listing", std::move(issues));
-        Send(ComputeActorId, new TEvAsyncInputError(InputIndex, std::move(issues), NYql::NDqProto::StatusIds::EXTERNAL_ERROR));
+        Send(ComputeActorId, new TEvAsyncInputError(InputIndex, std::move(issues), result->Get()->Record.GetFatalCode()));
     }
 
     void HandleRetry(TEvS3Provider::TEvRetryEventFunc::TPtr& retry) {

+ 5 - 2
ydb/library/yql/providers/s3/actors/yql_s3_source_queue.cpp

@@ -183,6 +183,7 @@ public:
         , FileSizeLimit(fileSizeLimit)
         , ReadLimit(readLimit)
         , MaybeIssues(Nothing())
+        , FatalCode(NYql::NDqProto::StatusIds::EXTERNAL_ERROR)
         , UseRuntimeListing(useRuntimeListing)
         , ConsumersCount(consumersCount)
         , BatchSizeLimit(batchSizeLimit)
@@ -302,6 +303,7 @@ public:
                                     << " and exceeds limit = " << FileSizeLimit;
                 LOG_E("TS3FileQueueActor", errorMessage);
                 MaybeIssues = TIssues{TIssue{errorMessage}};
+                FatalCode = NYql::NDqProto::StatusIds::PRECONDITION_FAILED;
                 return false;
             }
             LOG_T("TS3FileQueueActor", "SaveRetrievedResults adding path: " << object.Path);
@@ -377,7 +379,7 @@ public:
         LOG_D(
             "TS3FileQueueActor",
             "HandleGetNextBatchForErrorState Giving away rest of Objects");
-        Send(ev->Sender, new TEvS3Provider::TEvObjectPathReadError(*MaybeIssues, ev->Get()->Record.GetTransportMeta()));
+        Send(ev->Sender, new TEvS3Provider::TEvObjectPathReadError(*MaybeIssues, FatalCode, ev->Get()->Record.GetTransportMeta()));
         TryFinish(ev->Sender, ev->Get()->Record.GetTransportMeta().GetSeqNo());
     }
 
@@ -558,7 +560,7 @@ private:
                     if (!MaybeIssues.Defined()) {
                         SendObjects(consumer, requests.front());
                     } else {
-                        Send(consumer, new TEvS3Provider::TEvObjectPathReadError(*MaybeIssues, requests.front()));
+                        Send(consumer, new TEvS3Provider::TEvObjectPathReadError(*MaybeIssues, FatalCode, requests.front()));
                         TryFinish(consumer, requests.front().GetSeqNo());
                     }
                     requests.pop_front();
@@ -603,6 +605,7 @@ private:
     size_t CurrentDirectoryPathIndex = 0;
     THashMap<NActors::TActorId, TDeque<NDqProto::TMessageTransportMeta>> PendingRequests;
     TMaybe<TIssues> MaybeIssues;
+    NYql::NDqProto::StatusIds::StatusCode FatalCode;
     bool UseRuntimeListing;
     ui64 ConsumersCount;
     ui64 BatchSizeLimit;

+ 2 - 1
ydb/library/yql/providers/s3/events/events.h

@@ -100,9 +100,10 @@ struct TEvS3Provider {
 
         TEvObjectPathReadError() = default;
 
-        TEvObjectPathReadError(TIssues issues, const NDqProto::TMessageTransportMeta& transportMeta) {
+        TEvObjectPathReadError(TIssues issues, NYql::NDqProto::StatusIds::StatusCode code, const NDqProto::TMessageTransportMeta& transportMeta) {
             NYql::IssuesToMessage(issues, Record.MutableIssues());
             Record.MutableTransportMeta()->CopyFrom(transportMeta);
+            Record.SetFatalCode(code);
         }
     };
 

+ 14 - 6
ydb/library/yql/providers/s3/object_listers/yql_s3_list.cpp

@@ -133,11 +133,15 @@ TS3ListObjectV2Response ParseListObjectV2Response(
     if (const auto& root = xml.Root(); root.Name() == "Error") {
         const auto& code = root.Node("Code", true).Value<TString>();
         const auto& message = root.Node("Message", true).Value<TString>();
-        ythrow yexception() << message << ", error: code: " << code << ", request id: ["
-                            << requestId << "]";
+        const auto errorMessage = TStringBuilder{} << message << ", error: code: " << code 
+            << ", request id: [" << requestId << "]";
+        YQL_CLOG(DEBUG, ProviderS3) << "[TS3Lister::ParseListObjectV2Response] " << errorMessage;
+        throw yexception() << errorMessage;
     } else if (root.Name() != "ListBucketResult") {
-        ythrow yexception() << "Unexpected response '" << root.Name()
+        const auto errorMessage = TStringBuilder{} << "Unexpected response '" << root.Name()
                             << "' on discovery, request id: [" << requestId << "]";
+        YQL_CLOG(DEBUG, ProviderS3) << "[TS3Lister::ParseListObjectV2Response] " << errorMessage;
+        throw yexception() << errorMessage;
     } else {
         const NXml::TNamespacesForXPath nss(
             1U, {"s3", "http://s3.amazonaws.com/doc/2006-03-01/"});
@@ -313,7 +317,8 @@ private:
 
         auto gateway = ctx.GatewayWeak.lock();
         if (!gateway) {
-            ythrow yexception() << "Gateway disappeared";
+            YQL_CLOG(DEBUG, ProviderS3) << "[TS3Lister::SubmitRequestIntoGateway] Gateway disappeared";
+            throw yexception() << "Gateway disappeared";
         }
 
         auto sharedCtx = ctx.SharedCtx;
@@ -360,7 +365,8 @@ private:
     static void OnDiscovery(TListingContext ctx, IHTTPGateway::TResult&& result) try {
         auto gateway = ctx.GatewayWeak.lock();
         if (!gateway) {
-            ythrow yexception() << "Gateway disappeared";
+            YQL_CLOG(DEBUG, ProviderS3) << "[TS3Lister::OnDiscovery] Gateway disappeared";
+            throw yexception() << "Gateway disappeared";
         }
         if (!result.Issues) {
             auto xmlString = result.Content.Extract();
@@ -539,8 +545,10 @@ IS3Lister::TPtr MakeS3Lister(
     }
 
     if (!allowLocalFiles) {
-        ythrow yexception() << "Using local files as DataSource isn't allowed, but trying access "
+        const auto errorMessage = TStringBuilder{} << "Using local files as DataSource isn't allowed, but trying access "
                             << listingRequest.Url;
+        YQL_CLOG(DEBUG, ProviderS3) << "[IS3Lister::MakeS3Lister] " << errorMessage;
+        throw yexception() << errorMessage;
     }
     return std::make_shared<TLocalS3Lister>(listingRequest, delimiter);
 }

+ 2 - 0
ydb/library/yql/providers/s3/proto/file_queue.proto

@@ -4,6 +4,7 @@ option cc_enable_arenas = true;
 package NYql.NS3.FileQueue;
 
 import "ydb/library/yql/dq/actors/protos/dq_events.proto";
+import "ydb/library/yql/dq/actors/protos/dq_status_codes.proto";
 import "ydb/public/api/protos/ydb_issue_message.proto";
 
 message TEvUpdateConsumersCount {
@@ -29,6 +30,7 @@ message TEvObjectPathBatch {
 
 message TEvObjectPathReadError {
     repeated Ydb.Issue.IssueMessage Issues = 1;
+    optional NYql.NDqProto.StatusIds.StatusCode FatalCode = 2;
 
     optional NYql.NDqProto.TMessageTransportMeta TransportMeta = 100;
 }