aozeritsky 1 год назад
Родитель
Сommit
77631baaf4

+ 25 - 5
ydb/library/yql/providers/dq/runtime/task_command_executor.cpp

@@ -339,6 +339,9 @@ public:
 
                 NDq::TDqSerializedBatch data;
                 data.Proto.Load(&input);
+                if (data.IsOOB()) {
+                    LoadRopeFromPipe(input, data.Payload);
+                }
 
                 auto guard = Runner->BindAllocator(0); // Explicitly reset memory limit
                 channel->Push(std::move(data));
@@ -354,11 +357,14 @@ public:
                 request.Load(&input);
 
                 auto guard = Runner->BindAllocator(0); // Explicitly reset memory limit
-                NDq::TDqDataSerializer dataSerializer(Runner->GetTypeEnv(), Runner->GetHolderFactory(),
-                    NDqProto::EDataTransportVersion::DATA_TRANSPORT_UV_PICKLE_1_0);
                 NKikimr::NMiniKQL::TUnboxedValueBatch buffer(source->GetInputType());
                 NDq::TDqSerializedBatch batch;
                 batch.Proto = std::move(*request.MutableData());
+                if (batch.IsOOB()) {
+                    LoadRopeFromPipe(input, batch.Payload);
+                }
+                NDq::TDqDataSerializer dataSerializer(Runner->GetTypeEnv(), Runner->GetHolderFactory(),
+                    (NDqProto::EDataTransportVersion)batch.Proto.GetTransportVersion());
                 dataSerializer.Deserialize(std::move(batch), source->GetInputType(), buffer);
 
                 source->Push(std::move(buffer), request.GetSpace());
@@ -439,7 +445,7 @@ public:
 
                 NDq::TDqSerializedBatch batch;
                 response.SetResult(channel->Pop(batch));
-                YQL_ENSURE(!batch.IsOOB());
+                bool isOOB = batch.IsOOB();
                 *response.MutableData() = std::move(batch.Proto);
                 UpdateOutputChannelStats(channel);
 
@@ -450,6 +456,9 @@ public:
 
                 response.MutableStats()->PackFrom(GetStats(taskId));
                 response.Save(&output);
+                if (isOOB) {
+                    SaveRopeToPipe(output, batch.Payload);
+                }
 
                 break;
             }
@@ -586,12 +595,15 @@ public:
                 NDq::TDqDataSerializer dataSerializer(
                     Runner->GetTypeEnv(),
                     Runner->GetHolderFactory(),
-                    NDqProto::DATA_TRANSPORT_UV_PICKLE_1_0);
+                    DataTransportVersion);
                 NDq::TDqSerializedBatch serialized = dataSerializer.Serialize(batch, outputType);
-                YQL_ENSURE(!serialized.IsOOB());
+                bool isOOB = serialized.IsOOB();
                 *response.MutableData() = std::move(serialized.Proto);
                 response.SetBytes(bytes);
                 response.Save(&output);
+                if (isOOB) {
+                    SaveRopeToPipe(output, serialized.Payload);
+                }
                 break;
             }
             case NDqProto::TCommandHeader::SINK_STATS: {
@@ -634,6 +646,13 @@ public:
         if (!DqConfiguration->CollectCoreDumps.Get().GetOrElse(false)) {
             DontCollectDumps();
         }
+        const bool fastPickle = DqConfiguration->UseFastPickleTransport.Get().GetOrElse(TDqSettings::TDefault::UseFastPickleTransport);
+        const bool oob = DqConfiguration->UseOOBTransport.Get().GetOrElse(TDqSettings::TDefault::UseOOBTransport);
+        if (oob) {
+            DataTransportVersion = fastPickle ? NDqProto::EDataTransportVersion::DATA_TRANSPORT_OOB_FAST_PICKLE_1_0 : NDqProto::EDataTransportVersion::DATA_TRANSPORT_OOB_PICKLE_1_0; 
+        } else {
+            DataTransportVersion = fastPickle ? NDqProto::EDataTransportVersion::DATA_TRANSPORT_UV_FAST_PICKLE_1_0 : NDqProto::EDataTransportVersion::DATA_TRANSPORT_UV_PICKLE_1_0;
+        }
         // TODO: Maybe use taskParams from task.GetTask().GetParameters()
         THashMap<TString, TString> taskParams;
         for (const auto& x: taskMeta.GetTaskParams()) {
@@ -713,6 +732,7 @@ public:
     TTaskCounters QueryStat;
     TTaskCounters PrevStat;
     TDqConfiguration::TPtr DqConfiguration = MakeIntrusive<TDqConfiguration>();
+    NDqProto::EDataTransportVersion DataTransportVersion = NDqProto::EDataTransportVersion::DATA_TRANSPORT_VERSION_UNSPECIFIED;
     TIntrusivePtr<NKikimr::NMiniKQL::IMutableFunctionRegistry> FunctionRegistry;
     NDq::TDqTaskRunnerContext Ctx;
     const NKikimr::NMiniKQL::TUdfModuleRemappings EmptyRemappings;

+ 46 - 3
ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp

@@ -10,6 +10,7 @@
 #include <ydb/library/yql/utils/log/log.h>
 #include <ydb/library/yql/utils/backtrace/backtrace.h>
 #include <ydb/library/yql/utils/yql_panic.h>
+#include <ydb/library/yql/utils/rope_over_buffer.h>
 
 #include <ydb/library/yql/providers/dq/common/yql_dq_settings.h>
 
@@ -50,6 +51,41 @@ extern "C" int kill(int pid, int sig);
 extern "C" int waitpid(int pid, int* status, int options);
 #endif
 
+void SaveRopeToPipe(IOutputStream& output, const TRope& rope) {
+    for (const auto& [data, size] : rope) {
+        output.Write(&size, sizeof(size_t));
+        YQL_ENSURE(size != 0);
+        output.Write(data, size);
+    }
+    size_t zero = 0;
+    output.Write(&zero, sizeof(size_t));
+}
+
+namespace {
+
+void Load(IInputStream& input, void* buf, size_t size) {
+    char* p = (char*)buf;
+    while (size) {
+        auto len = input.Read(p, size);
+        p += len;
+        size -= len;
+    }
+}
+
+}
+
+void LoadRopeFromPipe(IInputStream& input, TRope& rope) {
+    size_t size;
+    do {
+        Load(input, &size, sizeof(size_t));
+        if (size) {
+            auto buffer = std::shared_ptr<char[]>(new char[size]);
+            Load(input, buffer.get(), size);            
+            rope.Insert(rope.End(), NYql::MakeReadOnlyRope(buffer, buffer.get(), size));
+        }
+    } while (size != 0);
+}
+
 class TChildProcess: private TNonCopyable {
 public:
     TChildProcess(const TString& exeName, const TVector<TString>& args, const THashMap<TString, TString>& env, const TString& workDir)
@@ -525,8 +561,10 @@ public:
         header.SetChannelId(ChannelId);
         header.Save(&Output);
 
-        YQL_ENSURE(!data.IsOOB(), "OOB Transport is not supported here");
         data.Proto.Save(&Output);
+        if (data.IsOOB()) {
+            SaveRopeToPipe(Output, data.Payload);
+        }
     }
 
     void Finish() override {
@@ -722,8 +760,8 @@ public:
     }
 
     void Push(NDq::TDqSerializedBatch&& serialized, i64 space) override {
-        YQL_ENSURE(!serialized.IsOOB());
         NDqProto::TSourcePushRequest data;
+        bool isOOB = serialized.IsOOB();
         *data.MutableData() = std::move(serialized.Proto);
         data.SetSpace(space);
 
@@ -734,6 +772,9 @@ public:
         header.SetChannelId(PushStats.InputIndex);
         header.Save(&Output);
         data.Save(&Output);
+        if (isOOB) {
+            SaveRopeToPipe(Output, serialized.Payload);
+        }
     }
 
     void Push(NKikimr::NMiniKQL::TUnboxedValueBatch&& batch, i64 space) override {
@@ -826,7 +867,9 @@ public:
         response.Load(&Input);
         data.Clear();
         data.Proto = std::move(*response.MutableData());
-        YQL_ENSURE(!data.IsOOB());
+        if (data.IsOOB()) {
+            LoadRopeFromPipe(Input, data.Payload);
+        }
         return response;
     }
 

+ 3 - 0
ydb/library/yql/providers/dq/task_runner/tasks_runner_proxy.h

@@ -12,6 +12,9 @@ extern const TString WorkingDirectoryParamName;
 extern const TString WorkingDirectoryDontInitParamName; // COMPAT(aozeritsky)
 extern const TString UseMetaParamName; // COMPAT(aozeritsky)
 
+void SaveRopeToPipe(IOutputStream& output, const TRope& rope);
+void LoadRopeFromPipe(IInputStream& input, TRope& rope);
+
 class IInputChannel : public TThrRefBase, private TNonCopyable {
 public:
     using TPtr = TIntrusivePtr<IInputChannel>;

+ 4 - 1
ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp

@@ -260,7 +260,10 @@ private:
         YQL_ENSURE(!batch.IsWide());
 
         auto source = TaskRunner->GetSource(index);
-        TDqDataSerializer dataSerializer(TaskRunner->GetTypeEnv(), TaskRunner->GetHolderFactory(), NDqProto::DATA_TRANSPORT_UV_PICKLE_1_0);
+        TDqDataSerializer dataSerializer(TaskRunner->GetTypeEnv(), TaskRunner->GetHolderFactory(), 
+            // NDqProto::DATA_TRANSPORT_UV_PICKLE_1_0
+            NDqProto::EDataTransportVersion::DATA_TRANSPORT_OOB_PICKLE_1_0
+        );
         TDqSerializedBatch serialized = dataSerializer.Serialize(batch, source->GetInputType());
 
         Invoker->Invoke([serialized=std::move(serialized),taskRunner=TaskRunner, actorSystem, selfId, cookie, parentId=ParentId, space, finish, index, settings=Settings, stageId=StageId]() mutable {