// mkql_squeeze_state.cpp (3.2 KB)
  1. #include "mkql_squeeze_state.h"
  2. #include "mkql_saveload.h"
  3. #include <yql/essentials/minikql/mkql_string_util.h>
  4. #include <yql/essentials/minikql/computation/mkql_computation_node_pack.h>
  5. #include <yql/essentials/minikql/computation/mkql_computation_node_holders.h>
  6. namespace NKikimr {
  7. namespace NMiniKQL {
  8. constexpr ui32 StateVersion = 1;
  9. TSqueezeState::TSqueezeState(
  10. IComputationExternalNode* item,
  11. IComputationExternalNode* state,
  12. IComputationNode* outSwitch,
  13. IComputationNode* initState,
  14. IComputationNode* updateState,
  15. IComputationExternalNode* inSave,
  16. IComputationNode* outSave,
  17. IComputationExternalNode* inLoad,
  18. IComputationNode* outLoad,
  19. const TType* stateType
  20. )
  21. : Item(item)
  22. , State(state)
  23. , Switch(outSwitch)
  24. , InitState(initState)
  25. , UpdateState(updateState)
  26. , InSave(inSave)
  27. , OutSave(outSave)
  28. , InLoad(inLoad)
  29. , OutLoad(outLoad)
  30. , StateType(stateType)
  31. {}
  32. TSqueezeState::TSqueezeState(const TSqueezeState& state)
  33. : Item(state.Item)
  34. , State(state.State)
  35. , Switch(state.Switch)
  36. , InitState(state.InitState)
  37. , UpdateState(state.UpdateState)
  38. , InSave(state.InSave)
  39. , OutSave(state.OutSave)
  40. , InLoad(state.InLoad)
  41. , OutLoad(state.OutLoad)
  42. , StateType(state.StateType)
  43. {}
  44. NUdf::TUnboxedValue TSqueezeState::Save(TComputationContext& ctx) const {
  45. TOutputSerializer out(EMkqlStateType::SIMPLE_BLOB, StateVersion, ctx);
  46. out.Write(static_cast<ui8>(Stage));
  47. if (ESqueezeState::Work == Stage) {
  48. InSave->SetValue(ctx, State->GetValue(ctx));
  49. out.WriteUnboxedValue(GetPacker(), OutSave->GetValue(ctx));
  50. }
  51. return out.MakeState();
  52. }
  53. void TSqueezeState::Load(TComputationContext& ctx, const NUdf::TStringRef& state) {
  54. TInputSerializer in(state, EMkqlStateType::SIMPLE_BLOB);
  55. const auto loadStateVersion = in.GetStateVersion();
  56. if (loadStateVersion != StateVersion) {
  57. THROW yexception() << "Invalid state version " << loadStateVersion;
  58. }
  59. Stage = static_cast<ESqueezeState>(in.Read<ui8>());
  60. if (ESqueezeState::Work == Stage) {
  61. InLoad->SetValue(ctx, in.ReadUnboxedValue(GetPacker(), ctx));
  62. State->SetValue(ctx, OutLoad->GetValue(ctx));
  63. }
  64. }
  65. const TValuePacker& TSqueezeState::GetPacker() const {
  66. if (!Packer && StateType)
  67. Packer = MakeHolder<TValuePacker>(false, StateType);
  68. return *Packer;
  69. }
  70. TSqueezeCodegenValue::TSqueezeCodegenValue(TMemoryUsageInfo* memInfo, const TSqueezeState& state, TFetchPtr fetch, TComputationContext& ctx, NUdf::TUnboxedValue&& stream)
  71. : TBase(memInfo)
  72. , FetchFunc(fetch)
  73. , Stream(std::move(stream))
  74. , Ctx(ctx)
  75. , State(state)
  76. {}
  77. ui32 TSqueezeCodegenValue::GetTraverseCount() const {
  78. return 1U;
  79. }
  80. NUdf::TUnboxedValue TSqueezeCodegenValue::GetTraverseItem(ui32) const {
  81. return Stream;
  82. }
  83. NUdf::TUnboxedValue TSqueezeCodegenValue::Save() const {
  84. return State.Save(Ctx);
  85. }
  86. void TSqueezeCodegenValue::Load(const NUdf::TStringRef& state) {
  87. State.Load(Ctx, state);
  88. }
  89. NUdf::EFetchStatus TSqueezeCodegenValue::Fetch(NUdf::TUnboxedValue& result) {
  90. if (ESqueezeState::Finished == State.Stage)
  91. return NUdf::EFetchStatus::Finish;
  92. return FetchFunc(&Ctx, static_cast<const NUdf::TUnboxedValuePod&>(Stream), result, State.Stage);
  93. }
  94. }
  95. }