123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226 |
- #include "mkql_mapnext.h"
- #include <yql/essentials/minikql/computation/mkql_computation_node_holders.h>
- namespace NKikimr {
- namespace NMiniKQL {
- namespace {
- struct TState : public TComputationValue<TState> {
- using TComputationValue::TComputationValue;
- std::optional<NUdf::TUnboxedValue> Prev;
- bool Finish = false;
- };
- class TFlowMapNextWrapper : public TStatefulFlowComputationNode<TFlowMapNextWrapper> {
- typedef TStatefulFlowComputationNode<TFlowMapNextWrapper> TBaseComputation;
- public:
- TFlowMapNextWrapper(TComputationMutables& mutables, EValueRepresentation kind, IComputationNode* flow,
- IComputationExternalNode* item, IComputationExternalNode* nextItem, IComputationNode* newItem)
- : TBaseComputation(mutables, flow, kind, EValueRepresentation::Any)
- , Flow(flow)
- , Item(item)
- , NextItem(nextItem)
- , NewItem(newItem)
- {}
- NUdf::TUnboxedValue DoCalculate(NUdf::TUnboxedValue& stateValue, TComputationContext& ctx) const {
- if (!stateValue.HasValue()) {
- stateValue = ctx.HolderFactory.Create<TState>();
- }
- TState& state = *static_cast<TState*>(stateValue.AsBoxed().Get());
- NUdf::TUnboxedValue result;
- for (;;) {
- if (state.Finish) {
- if (!state.Prev) {
- return NUdf::TUnboxedValuePod::MakeFinish();
- }
- Item->SetValue(ctx, std::move(*state.Prev));
- state.Prev.reset();
- NextItem->SetValue(ctx, NUdf::TUnboxedValuePod());
- return NewItem->GetValue(ctx);
- }
- auto item = Flow->GetValue(ctx);
- if (item.IsYield()) {
- return item;
- }
- if (item.IsFinish()) {
- state.Finish = true;
- continue;
- }
- if (!state.Prev) {
- state.Prev = std::move(item);
- continue;
- }
- Item->SetValue(ctx, std::move(*state.Prev));
- state.Prev = item;
- NextItem->SetValue(ctx, std::move(item));
- result = NewItem->GetValue(ctx);
- break;
- }
- return result;
- }
- private:
- void RegisterDependencies() const final {
- if (const auto flow = FlowDependsOn(Flow)) {
- Own(flow, Item);
- Own(flow, NextItem);
- DependsOn(flow, NewItem);
- }
- }
- IComputationNode* const Flow;
- IComputationExternalNode* const Item;
- IComputationExternalNode* const NextItem;
- IComputationNode* const NewItem;
- };
- class TStreamMapNextWrapper : public TMutableComputationNode<TStreamMapNextWrapper> {
- typedef TMutableComputationNode<TStreamMapNextWrapper> TBaseComputation;
- public:
- TStreamMapNextWrapper(TComputationMutables& mutables, IComputationNode* stream,
- IComputationExternalNode* item, IComputationExternalNode* nextItem, IComputationNode* newItem)
- : TBaseComputation(mutables)
- , Stream(stream)
- , Item(item)
- , NextItem(nextItem)
- , NewItem(newItem)
- , StateIndex(mutables.CurValueIndex++)
- {}
- NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
- return ctx.HolderFactory.Create<TStreamValue>(ctx, Stream->GetValue(ctx), Item, NextItem, NewItem, StateIndex);
- }
- private:
- void RegisterDependencies() const final {
- DependsOn(Stream);
- Own(Item);
- Own(NextItem);
- DependsOn(NewItem);
- }
- class TStreamValue : public TComputationValue<TStreamValue> {
- public:
- using TBase = TComputationValue<TStreamValue>;
- TStreamValue(TMemoryUsageInfo* memInfo, TComputationContext& compCtx, NUdf::TUnboxedValue&& stream,
- IComputationExternalNode* item, IComputationExternalNode* nextItem, IComputationNode* newItem, ui32 stateIndex)
- : TBase(memInfo)
- , CompCtx(compCtx)
- , Stream(std::move(stream))
- , Item(item)
- , NextItem(nextItem)
- , NewItem(newItem)
- , StateIndex(stateIndex)
- {
- }
- private:
- ui32 GetTraverseCount() const final {
- return 1U;
- }
- NUdf::TUnboxedValue GetTraverseItem(ui32) const final {
- return Stream;
- }
- NUdf::TUnboxedValue Save() const final {
- return NUdf::TUnboxedValuePod::Zero();
- }
- void Load(const NUdf::TStringRef&) final {}
- NUdf::EFetchStatus Fetch(NUdf::TUnboxedValue& result) final {
- auto& state = GetState();
- for (;;) {
- if (state.Finish) {
- if (!state.Prev) {
- return NUdf::EFetchStatus::Finish;
- }
- Item->SetValue(CompCtx, std::move(*state.Prev));
- state.Prev.reset();
- NextItem->SetValue(CompCtx, NUdf::TUnboxedValuePod());
- result = NewItem->GetValue(CompCtx);
- return NUdf::EFetchStatus::Ok;
- }
- NUdf::TUnboxedValue item;
- const auto status = Stream.Fetch(item);
- if (status == NUdf::EFetchStatus::Yield) {
- return status;
- }
- if (status == NUdf::EFetchStatus::Finish) {
- state.Finish = true;
- continue;
- }
- if (!state.Prev) {
- state.Prev = std::move(item);
- continue;
- }
- Item->SetValue(CompCtx, std::move(*state.Prev));
- state.Prev = item;
- NextItem->SetValue(CompCtx, std::move(item));
- result = NewItem->GetValue(CompCtx);
- break;
- }
- return NUdf::EFetchStatus::Ok;
- }
- TState& GetState() const {
- auto& result = CompCtx.MutableValues[StateIndex];
- if (!result.HasValue()) {
- result = CompCtx.HolderFactory.Create<TState>();
- }
- return *static_cast<TState*>(result.AsBoxed().Get());
- }
- TComputationContext& CompCtx;
- const NUdf::TUnboxedValue Stream;
- IComputationExternalNode* const Item;
- IComputationExternalNode* const NextItem;
- IComputationNode* const NewItem;
- const ui32 StateIndex;
- };
- IComputationNode* const Stream;
- IComputationExternalNode* const Item;
- IComputationExternalNode* const NextItem;
- IComputationNode* const NewItem;
- const ui32 StateIndex;
- };
- }
- IComputationNode* WrapMapNext(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
- MKQL_ENSURE(callable.GetInputsCount() == 4, "Expected 4 args, got " << callable.GetInputsCount());
- const auto type = callable.GetType()->GetReturnType();
- const auto input = LocateNode(ctx.NodeLocator, callable, 0);
- const auto itemArg = LocateExternalNode(ctx.NodeLocator, callable, 1);
- const auto nextItemArg = LocateExternalNode(ctx.NodeLocator, callable, 2);
- const auto newItem = LocateNode(ctx.NodeLocator, callable, 3);
- if (type->IsFlow()) {
- return new TFlowMapNextWrapper(ctx.Mutables, GetValueRepresentation(type), input, itemArg, nextItemArg, newItem);
- } else if (type->IsStream()) {
- return new TStreamMapNextWrapper(ctx.Mutables, input, itemArg, nextItemArg, newItem);
- }
- THROW yexception() << "Expected flow or stream.";
- }
- }
- }
|