Browse Source

fix unexcepted events

initial
mrlolthe1st 2 years ago
parent
commit
ed241323ff

+ 2 - 3
ydb/library/yql/providers/common/arrow/arrow_reader_impl.cpp

@@ -87,7 +87,7 @@ public:
                                 std::bind(&OnResult, promise, std::placeholders::_1),
                                 {},
                                 RetryPolicy);
-            return arrow::Buffer::FromString(promise.GetFuture().GetValueSync());
+            return arrow::Buffer::FromString(std::move(promise.GetFuture().GetValueSync()));
         } catch (const std::exception& e) {
             return arrow::Status::UnknownError(e.what());
         }
@@ -104,8 +104,7 @@ public:
 private:
     static void OnResult(NThreading::TPromise<TString> promise, IHTTPGateway::TResult&& result) {
         try {
-            auto res = std::get<IHTTPGateway::TContent>(result).Extract();
-            promise.SetValue(res);
+            promise.SetValue(std::move(std::get<IHTTPGateway::TContent>(result).Extract()));
         } catch (const std::exception& e) {
             promise.SetException(std::current_exception());
         }

+ 25 - 35
ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp

@@ -865,6 +865,27 @@ private:
         }
     }
 
+    template<typename EvType>
+    void WaitEvent() {
+        auto event = WaitForSpecificEvent<TEvPrivate::TEvFutureResolved, TEvPrivate::TEvPause, TEvPrivate::TEvContinue, TEvPrivate::TEvBlockProcessed, NActors::TEvents::TEvPoison>();
+        TVector<THolder<IEventBase>> otherEvents;
+        while (!event->CastAsLocal<EvType>()) {
+            if (event->CastAsLocal<NActors::TEvents::TEvPoison>()) {
+                throw TS3ReadAbort();
+            }
+            
+            if (!event->CastAsLocal<TEvPrivate::TEvPause>() && !event->CastAsLocal<TEvPrivate::TEvContinue>()) {
+                otherEvents.push_back(event->ReleaseBase());
+            }
+            
+            event = WaitForSpecificEvent<TEvPrivate::TEvFutureResolved, TEvPrivate::TEvPause, TEvPrivate::TEvContinue, TEvPrivate::TEvBlockProcessed, NActors::TEvents::TEvPoison>();
+        }
+
+        for (auto& e: otherEvents) {
+            Send(SelfActorId, e.Release());
+        }
+    }
+
     void Run() final try {
 
         LOG_CORO_D("TS3ReadCoroImpl", "Run" << ", Path: " << Path);
@@ -911,26 +932,11 @@ private:
                 auto onResolve = [actorSystem, actorId = this->SelfActorId] {
                                             actorSystem->Send(new IEventHandle(actorId, actorId, new TEvPrivate::TEvFutureResolved()));
                                         };
-                auto waitForResolve = [&] {
-                                            auto event = WaitForSpecificEvent<TEvPrivate::TEvFutureResolved, TEvPrivate::TEvBlockProcessed, NActors::TEvents::TEvPoison>();
-                                            TVector<THolder<IEventBase>> otherEvents;
-                                            while (!event->CastAsLocal<TEvPrivate::TEvFutureResolved>()) {
-                                                if (event->CastAsLocal<NActors::TEvents::TEvPoison>()) {
-                                                    throw TS3ReadAbort();
-                                                }
-                                                otherEvents.push_back(event->ReleaseBase());
-                                                event = WaitForSpecificEvent<TEvPrivate::TEvFutureResolved, TEvPrivate::TEvBlockProcessed, NActors::TEvents::TEvPoison>();
-                                            }
-
-                                            for (auto &e: otherEvents) {
-                                                Send(SelfActorId, e.Release());
-                                            }
-                                        };
                 auto future = ArrowReader->GetSchema(fileDesc);
                 future.Subscribe([onResolve](const NThreading::TFuture<IArrowReader::TSchemaResponse>&) {
                                                 onResolve();
                                             });
-                waitForResolve();
+                WaitEvent<TEvPrivate::TEvFutureResolved>();
                 auto result = future.GetValue();
                 std::shared_ptr<arrow::Schema> schema = result.Schema;
                 std::vector<int> columnIndices;
@@ -965,7 +971,7 @@ private:
                                                 std::move(columnIndices),
                                                 std::move(columnConverters),
                                                 onResolve,
-                                                waitForResolve);
+                                                [&] { WaitEvent<TEvPrivate::TEvFutureResolved>(); });
                 ProcessBatches<std::shared_ptr<arrow::RecordBatch>, TEvPrivate::TEvNextRecordBatch>(reader, isLocal);
             } else {
                 std::unique_ptr<NDB::ReadBuffer> buffer;
@@ -1039,22 +1045,6 @@ private:
         auto selfActorId = SelfActorId;
         size_t cntBlocksInFly = 0;
         if (isLocal) {
-            auto waitProcessed = [&] {
-                auto event = WaitForSpecificEvent<TEvPrivate::TEvFutureResolved, TEvPrivate::TEvBlockProcessed, NActors::TEvents::TEvPoison>();
-                TVector<THolder<IEventBase>> otherEvents;
-                while (!event->CastAsLocal<TEvPrivate::TEvBlockProcessed>()) {
-                    if (event->CastAsLocal<NActors::TEvents::TEvPoison>()) {
-                        throw TS3ReadAbort();
-                    }
-                    otherEvents.push_back(event->ReleaseBase());
-                    event = WaitForSpecificEvent<TEvPrivate::TEvFutureResolved, TEvPrivate::TEvBlockProcessed, NActors::TEvents::TEvPoison>();
-                }
-
-                for (auto& e: otherEvents) {
-                    Send(SelfActorId, e.Release());
-                }
-            };
-
             for (;;) {
                 T batch;
 
@@ -1062,7 +1052,7 @@ private:
                     break;
                 }
                 if (++cntBlocksInFly > MaxBlocksInFly) {
-                    waitProcessed();
+                    WaitEvent<TEvPrivate::TEvBlockProcessed>();
                     --cntBlocksInFly;
                 }
                 Send(ParentActorId, new TEv(batch, PathIndex, [actorSystem, selfActorId]() {
@@ -1070,7 +1060,7 @@ private:
                 }, GetIngressDelta()));
             }
             while (cntBlocksInFly--) {
-                waitProcessed();
+                WaitEvent<TEvPrivate::TEvBlockProcessed>();
             }
         } else {
             for (;;) {