|
@@ -199,6 +199,7 @@ public:
|
|
|
, PartitionKeyPacker(true, partitionKeyType)
|
|
|
, Parameters(parameters)
|
|
|
, Cache(cache)
|
|
|
+ , Terminating(false)
|
|
|
{}
|
|
|
bool ProcessInputRow(NUdf::TUnboxedValue&& row, TComputationContext& ctx) override {
|
|
|
MKQL_ENSURE(not DelayedRow, "Internal logic error"); //we're finalizing previous partition
|
|
@@ -219,6 +220,9 @@ public:
|
|
|
return false;
|
|
|
}
|
|
|
bool ProcessEndOfData(TComputationContext& ctx) override {
|
|
|
+ if (Terminating)
|
|
|
+ return false;
|
|
|
+ Terminating = true;
|
|
|
if (PartitionHandler) {
|
|
|
return PartitionHandler->ProcessEndOfData(ctx);
|
|
|
}
|
|
@@ -247,6 +251,9 @@ public:
|
|
|
));
|
|
|
PartitionHandler->ProcessInputRow(std::move(temp), ctx);
|
|
|
}
|
|
|
+ if (Terminating) {
|
|
|
+ return NUdf::TUnboxedValue::MakeFinish();
|
|
|
+ }
|
|
|
return NUdf::TUnboxedValue{};
|
|
|
}
|
|
|
private:
|
|
@@ -258,6 +265,7 @@ private:
|
|
|
const TMatchRecognizeProcessorParameters& Parameters;
|
|
|
const TContainerCacheOnContext& Cache;
|
|
|
NUdf::TUnboxedValue DelayedRow;
|
|
|
+ bool Terminating;
|
|
|
};
|
|
|
|
|
|
class TStateForInterleavedPartitions
|
|
@@ -375,17 +383,12 @@ public:
|
|
|
);
|
|
|
}
|
|
|
auto state = static_cast<State*>(stateValue.AsBoxed().Get());
|
|
|
- bool terminating = false;
|
|
|
while (true) {
|
|
|
if (auto output = state->GetOutputIfReady(ctx); output) {
|
|
|
return output;
|
|
|
}
|
|
|
- if (terminating) {
|
|
|
- return NUdf::TUnboxedValue::MakeFinish();
|
|
|
- }
|
|
|
auto item = InputFlow->GetValue(ctx);
|
|
|
if (item.IsFinish()) {
|
|
|
- terminating = true;
|
|
|
state->ProcessEndOfData(ctx);
|
|
|
continue;
|
|
|
} else if (item.IsSpecial()) {
|