|
@@ -355,36 +355,11 @@ public:
|
|
|
|
|
|
auto guard = Runner->BindAllocator(0); // Explicitly reset memory limit
|
|
|
NDq::TDqDataSerializer dataSerializer(Runner->GetTypeEnv(), Runner->GetHolderFactory(),
|
|
|
- NDqProto::EDataTransportVersion::DATA_TRANSPORT_VERSION_UNSPECIFIED);
|
|
|
+ NDqProto::EDataTransportVersion::DATA_TRANSPORT_UV_PICKLE_1_0);
|
|
|
NKikimr::NMiniKQL::TUnboxedValueBatch buffer(source->GetInputType());
|
|
|
- if (request.GetString().empty() && request.GetChunks() == 0) {
|
|
|
- NDq::TDqSerializedBatch batch;
|
|
|
- batch.Proto = std::move(*request.MutableData());
|
|
|
- dataSerializer.Deserialize(std::move(batch), source->GetInputType(), buffer);
|
|
|
- } else if (!request.GetString().empty()) {
|
|
|
- for (auto& row : request.GetString()) {
|
|
|
- buffer.emplace_back(NKikimr::NMiniKQL::MakeString(row));
|
|
|
- }
|
|
|
- } else {
|
|
|
- i64 chunks = request.GetChunks();
|
|
|
- for (i64 i = 0; i < chunks; i++) {
|
|
|
- NDqProto::TSourcePushChunk chunk;
|
|
|
- chunk.Load(&input);
|
|
|
- i64 parts = chunk.GetParts();
|
|
|
-
|
|
|
- if (parts == 1) {
|
|
|
- buffer.emplace_back(NKikimr::NMiniKQL::MakeString(chunk.GetString()));
|
|
|
- } else {
|
|
|
- TString str;
|
|
|
- for (i64 j = 0; j < parts; j++) {
|
|
|
- NDqProto::TSourcePushPart part;
|
|
|
- part.Load(&input);
|
|
|
- str += part.GetString();
|
|
|
- }
|
|
|
- buffer.emplace_back(NKikimr::NMiniKQL::MakeString(str));
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
+ NDq::TDqSerializedBatch batch;
|
|
|
+ batch.Proto = std::move(*request.MutableData());
|
|
|
+ dataSerializer.Deserialize(std::move(batch), source->GetInputType(), buffer);
|
|
|
|
|
|
source->Push(std::move(buffer), request.GetSpace());
|
|
|
UpdateSourceStats(source);
|