Browse Source

Better sink data serialization for pipe

aozeritsky 1 year ago
parent
commit
aa4bbf0349

+ 1 - 1
ydb/library/yql/dq/actors/task_runner/events.h

@@ -389,7 +389,7 @@ struct TEvSinkPopFinished
     { }
 
     const ui64 Index;
-    TVector<TString> Strings;
+    NDq::TDqSerializedBatch Batch;
     TMaybe<NDqProto::TCheckpoint> Checkpoint;
     i64 Size;
     i64 CheckpointSize;

+ 0 - 2
ydb/library/yql/providers/dq/api/protos/task_command_executor.proto

@@ -75,14 +75,12 @@ message TMeteringStatsResponse {
 
 message TSinkPopRequest {
     uint64 Bytes = 1;
-    bool Raw = 2;
 };
 
 message TSinkPopResponse {
     uint64 Bytes = 1;
     TData Data = 2;
     repeated TMetric Metric = 3;
-    repeated bytes String = 4;
 }
 
 message TIsFinishedResponse {

+ 7 - 13
ydb/library/yql/providers/dq/runtime/task_command_executor.cpp

@@ -608,19 +608,13 @@ public:
                 auto bytes = sink->Pop(batch, request.GetBytes());
 
                 NDqProto::TSinkPopResponse response;
-                if (request.GetRaw()) {
-                    batch.ForEachRow([&response](const auto& value) {
-                        *response.AddString() = value.AsStringRef();
-                    });
-                } else {
-                    NDq::TDqDataSerializer dataSerializer(
-                        Runner->GetTypeEnv(),
-                        Runner->GetHolderFactory(),
-                        NDqProto::DATA_TRANSPORT_UV_PICKLE_1_0);
-                    NDq::TDqSerializedBatch serialized = dataSerializer.Serialize(batch, outputType);
-                    YQL_ENSURE(!serialized.IsOOB());
-                    *response.MutableData() = std::move(serialized.Proto);
-                }
+                NDq::TDqDataSerializer dataSerializer(
+                    Runner->GetTypeEnv(),
+                    Runner->GetHolderFactory(),
+                    NDqProto::DATA_TRANSPORT_UV_PICKLE_1_0);
+                NDq::TDqSerializedBatch serialized = dataSerializer.Serialize(batch, outputType);
+                YQL_ENSURE(!serialized.IsOOB());
+                *response.MutableData() = std::move(serialized.Proto);
                 response.SetBytes(bytes);
                 response.Save(&output);
                 break;

+ 2 - 6
ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp

@@ -1051,7 +1051,7 @@ public:
         return PopStats;
     }
 
-    ui64 PopString(TVector<TString>& batch, ui64 bytes) override {
+    ui64 PopString(NDq::TDqSerializedBatch& batch, ui64 bytes) override {
         try {
             NDqProto::TCommandHeader header;
             header.SetVersion(5);
@@ -1062,16 +1062,12 @@ public:
 
             NDqProto::TSinkPopRequest request;
             request.SetBytes(bytes);
-            request.SetRaw(true);
             header.Save(&Output);
 
             NDqProto::TSinkPopResponse response;
             response.Load(&Input);
 
-            for (auto& row : response.GetString()) {
-                batch.emplace_back(std::move(row));
-            }
-
+            batch.Proto = std::move(*response.MutableData());
             return response.GetBytes();
         } catch (...) {
             TaskRunner->RaiseException();

+ 1 - 1
ydb/library/yql/providers/dq/task_runner/tasks_runner_proxy.h

@@ -21,7 +21,7 @@ public:
 class IStringSink: public NDq::IDqAsyncOutputBuffer {
 public:
     virtual ~IStringSink() = default;
-    virtual ui64 PopString(TVector<TString>& batch, ui64 bytes) = 0;
+    virtual ui64 PopString(NDq::TDqSerializedBatch& batch, ui64 bytes) = 0;
 };
 
 class IInputChannel : public TThrRefBase, private TNonCopyable {

+ 6 - 5
ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp

@@ -366,9 +366,10 @@ private:
     void OnSinkPopFinished(TEvSinkPopFinished::TPtr& ev) {
         auto guard = TaskRunner->BindAllocator();
         NKikimr::NMiniKQL::TUnboxedValueBatch batch;
-        for (auto& row: ev->Get()->Strings) {
-            batch.emplace_back(NKikimr::NMiniKQL::MakeString(row));
-        }
+        auto sink = TaskRunner->GetSink(ev->Get()->Index);
+        TDqDataSerializer dataSerializer(TaskRunner->GetTypeEnv(), TaskRunner->GetHolderFactory(), NDqProto::DATA_TRANSPORT_UV_PICKLE_1_0);
+        dataSerializer.Deserialize(std::move(ev->Get()->Batch), sink->GetOutputType(), batch);
+
         Parent->SinkSend(
             ev->Get()->Index,
             std::move(batch),
@@ -390,7 +391,7 @@ private:
             try {
                 // auto guard = taskRunner->BindAllocator(); // only for local mode
                 auto sink = taskRunner->GetSink(ev->Get()->Index);
-                TVector<TString> batch;
+                NDq::TDqSerializedBatch batch;
                 NDqProto::TCheckpoint checkpoint;
                 TMaybe<NDqProto::TCheckpoint> maybeCheckpoint;
                 i64 size = 0;
@@ -408,7 +409,7 @@ private:
                 auto event = MakeHolder<TEvSinkPopFinished>(
                     ev->Get()->Index,
                     std::move(maybeCheckpoint), size, checkpointSize, finished, changed);
-                event->Strings = std::move(batch);
+                event->Batch = std::move(batch);
                 // repack data and forward
                 actorSystem->Send(
                     new IEventHandle(