mkql_block_agg_factory.h 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149
  1. #pragma once
  2. #include <yql/essentials/minikql/computation/mkql_computation_node.h>
  3. #include <yql/essentials/minikql/computation/mkql_computation_node_holders.h>
  4. #include <yql/essentials/minikql/computation/mkql_block_builder.h>
  5. namespace NKikimr {
  6. namespace NMiniKQL {
  7. class IAggColumnBuilder {
  8. public:
  9. virtual ~IAggColumnBuilder() = default;
  10. virtual void Add(const void* state) = 0;
  11. virtual NUdf::TUnboxedValue Build() = 0;
  12. };
  13. class IBlockAggregatorBase {
  14. public:
  15. virtual ~IBlockAggregatorBase() = default;
  16. const ui32 StateSize;
  17. explicit IBlockAggregatorBase(ui32 stateSize)
  18. : StateSize(stateSize)
  19. {}
  20. virtual void DestroyState(void* state) noexcept = 0;
  21. };
  22. class IBlockAggregatorCombineAll : public IBlockAggregatorBase {
  23. public:
  24. virtual void InitState(void* state) = 0;
  25. virtual void AddMany(void* state, const NUdf::TUnboxedValue* columns, ui64 batchLength, std::optional<ui64> filtered) = 0;
  26. virtual NUdf::TUnboxedValue FinishOne(const void* state) = 0;
  27. explicit IBlockAggregatorCombineAll(ui32 stateSize)
  28. : IBlockAggregatorBase(stateSize)
  29. {}
  30. };
  31. class IBlockAggregatorCombineKeys : public IBlockAggregatorBase {
  32. public:
  33. virtual void InitKey(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) = 0;
  34. virtual void UpdateKey(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) = 0;
  35. virtual std::unique_ptr<IAggColumnBuilder> MakeStateBuilder(ui64 size) = 0;
  36. explicit IBlockAggregatorCombineKeys(ui32 stateSize)
  37. : IBlockAggregatorBase(stateSize)
  38. {}
  39. };
  40. class IBlockAggregatorFinalizeKeys : public IBlockAggregatorBase {
  41. public:
  42. virtual void LoadState(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) = 0;
  43. virtual void UpdateState(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) = 0;
  44. virtual std::unique_ptr<IAggColumnBuilder> MakeResultBuilder(ui64 size) = 0;
  45. virtual void SerializeState(void* state, NUdf::TOutputBuffer& buffer) = 0;
  46. virtual void DeserializeState(void* state, NUdf::TInputBuffer& buffer) = 0;
  47. explicit IBlockAggregatorFinalizeKeys(ui32 stateSize)
  48. : IBlockAggregatorBase(stateSize)
  49. {}
  50. };
  51. template <typename TBase>
  52. class TBlockAggregatorBase : public TBase {
  53. public:
  54. TBlockAggregatorBase(ui32 stateSize, std::optional<ui32> filterColumn, TComputationContext& ctx)
  55. : TBase(stateSize)
  56. , FilterColumn_(filterColumn)
  57. , Ctx_(ctx)
  58. {
  59. }
  60. protected:
  61. const std::optional<ui32> FilterColumn_;
  62. TComputationContext& Ctx_;
  63. };
  64. template <typename T>
  65. class IPreparedBlockAggregator {
  66. public:
  67. virtual ~IPreparedBlockAggregator() = default;
  68. virtual std::unique_ptr<T> Make(TComputationContext& ctx) const = 0;
  69. const ui32 StateSize;
  70. explicit IPreparedBlockAggregator(ui32 stateSize)
  71. : StateSize(stateSize)
  72. {}
  73. };
  74. class IBlockAggregatorFactory {
  75. public:
  76. virtual ~IBlockAggregatorFactory() = default;
  77. virtual std::unique_ptr<IPreparedBlockAggregator<IBlockAggregatorCombineAll>> PrepareCombineAll(
  78. TTupleType* tupleType,
  79. std::optional<ui32> filterColumn,
  80. const std::vector<ui32>& argsColumns,
  81. const TTypeEnvironment& env) const = 0;
  82. virtual std::unique_ptr<IPreparedBlockAggregator<IBlockAggregatorCombineKeys>> PrepareCombineKeys(
  83. TTupleType* tupleType,
  84. const std::vector<ui32>& argsColumns,
  85. const TTypeEnvironment& env) const = 0;
  86. virtual std::unique_ptr<IPreparedBlockAggregator<IBlockAggregatorFinalizeKeys>> PrepareFinalizeKeys(
  87. TTupleType* tupleType,
  88. const std::vector<ui32>& argsColumns,
  89. const TTypeEnvironment& env,
  90. TType* returnType,
  91. ui32 hint) const = 0;
  92. };
  93. const IBlockAggregatorFactory& GetBlockAggregatorFactory(TStringBuf name);
  94. struct TCombineAllTag {
  95. using TAggregator = IBlockAggregatorCombineAll;
  96. using TPreparedAggregator = IPreparedBlockAggregator<TAggregator>;
  97. using TBase = TBlockAggregatorBase<TAggregator>;
  98. };
  99. struct TCombineKeysTag {
  100. using TAggregator = IBlockAggregatorCombineKeys;
  101. using TPreparedAggregator = IPreparedBlockAggregator<TAggregator>;
  102. using TBase = TBlockAggregatorBase<TAggregator>;
  103. };
  104. struct TFinalizeKeysTag {
  105. using TAggregator = IBlockAggregatorFinalizeKeys;
  106. using TPreparedAggregator = IPreparedBlockAggregator<TAggregator>;
  107. using TBase = TBlockAggregatorBase<TAggregator>;
  108. };
  109. }
  110. }