// yql_aggregate_expander.h (6.1 KB)
  1. #pragma once
  2. #include <yql/essentials/core/expr_nodes/yql_expr_nodes.h>
  3. #include <yql/essentials/core/yql_opt_utils.h>
  4. #include "yql_type_annotation.h"
  5. namespace NYql {
  6. class TAggregateExpander {
  7. public:
  8. TAggregateExpander(bool usePartitionsByKeys, const bool useFinalizeByKeys, const TExprNode::TPtr& node, TExprContext& ctx, TTypeAnnotationContext& typesCtx,
  9. bool forceCompact = false, bool compactForDistinct = false, bool usePhases = false, bool useBlocks = false)
  10. : Node(node)
  11. , Ctx(ctx)
  12. , TypesCtx(typesCtx)
  13. , UsePartitionsByKeys(usePartitionsByKeys)
  14. , UseFinalizeByKeys(useFinalizeByKeys)
  15. , ForceCompact(forceCompact)
  16. , CompactForDistinct(compactForDistinct)
  17. , UsePhases(usePhases)
  18. , AggregatedColumns(nullptr)
  19. , VoidNode(ctx.NewCallable(node->Pos(), "Void", {}))
  20. , HaveDistinct(false)
  21. , EffectiveCompact(false)
  22. , HaveSessionSetting(false)
  23. , OriginalRowType(nullptr)
  24. , RowType(nullptr)
  25. , UseBlocks(useBlocks)
  26. {
  27. PreMap = Ctx.Builder(node->Pos())
  28. .Lambda()
  29. .Param("premap")
  30. .Callable("Just").Arg(0, "premap").Seal()
  31. .Seal().Build();
  32. SortParams = {
  33. .Key = VoidNode,
  34. .Order = VoidNode
  35. };
  36. }
  37. TExprNode::TPtr ExpandAggregate();
  38. static TExprNode::TPtr CountAggregateRewrite(const NNodes::TCoAggregate& node, TExprContext& ctx, bool useBlocks);
  39. private:
  40. using TIdxSet = std::set<ui32>;
  41. TExprNode::TPtr ExpandAggregateWithFullOutput();
  42. TExprNode::TPtr ExpandAggApply(const TExprNode::TPtr& node);
  43. bool CollectTraits();
  44. TExprNode::TPtr RebuildAggregate();
  45. TExprNode::TPtr GetContextLambda();
  46. void ProcessSessionSetting(TExprNode::TPtr sessionSetting);
  47. TVector<const TTypeAnnotationNode*> GetKeyItemTypes();
  48. bool IsNeedPickle(const TVector<const TTypeAnnotationNode*>& keyItemTypes);
  49. TExprNode::TPtr GetKeyExtractor(bool needPickle);
  50. void CollectColumnsSpecs();
  51. void BuildNothingStates();
  52. // Partial aggregate generation
  53. TExprNode::TPtr GeneratePartialAggregate(const TExprNode::TPtr keyExtractor, const TVector<const TTypeAnnotationNode*>& keyItemTypes, bool needPickle);
  54. TExprNode::TPtr GeneratePartialAggregateForNonDistinct(const TExprNode::TPtr& keyExtractor, const TExprNode::TPtr& pickleTypeNode);
  55. TExprNode::TPtr GenerateDistinctGrouper(const TExprNode::TPtr distinctField,
  56. const TVector<const TTypeAnnotationNode*>& keyItemTypes, bool needDistinctPickle);
  57. TExprNode::TPtr ReturnKeyAsIsForCombineInit(const TExprNode::TPtr& pickleTypeNode);
  58. // Post aggregate
  59. TExprNode::TPtr GeneratePostAggregate(const TExprNode::TPtr& preAgg, const TExprNode::TPtr& keyExtractor);
  60. TExprNode::TPtr GeneratePreprocessLambda(const TExprNode::TPtr& keyExtractor);
  61. TExprNode::TPtr GenerateCondenseSwitch(const TExprNode::TPtr& keyExtractor);
  62. TExprNode::TPtr BuildFinalizeByKeyLambda(const TExprNode::TPtr& preprocessLambda, const TExprNode::TPtr& keyExtractor);
  63. TExprNode::TPtr GeneratePostAggregateInitPhase();
  64. TExprNode::TPtr GeneratePostAggregateSavePhase();
  65. TExprNode::TPtr GeneratePostAggregateMergePhase();
  66. std::function<TExprNodeBuilder& (TExprNodeBuilder&)> GetPartialAggArgExtractor(ui32 i, bool deserialize);
  67. TExprNode::TPtr GetFinalAggStateExtractor(ui32 i);
  68. TExprNode::TPtr GeneratePhases();
  69. void GenerateInitForDistinct(TExprNodeBuilder& parent, ui32& ndx, const TIdxSet& indicies, const TExprNode::TPtr& distinctField);
  70. TExprNode::TPtr GenerateJustOverStates(const TExprNode::TPtr& input, const TIdxSet& indicies);
  71. TExprNode::TPtr SerializeIdxSet(const TIdxSet& indicies);
  72. TExprNode::TPtr TryGenerateBlockCombineAllOrHashed();
  73. TExprNode::TPtr TryGenerateBlockMergeFinalizeHashed();
  74. TExprNode::TPtr TryGenerateBlockCombine();
  75. TExprNode::TPtr TryGenerateBlockMergeFinalize();
  76. TExprNode::TPtr MakeInputBlocks(const TExprNode::TPtr& stream, TExprNode::TListType& keyIdxs,
  77. TVector<TString>& outputColumns, TExprNode::TListType& aggs, bool overState, bool many, ui32* streamIdxColumn = nullptr);
  78. private:
  79. static constexpr TStringBuf SessionStartMemberName = "_yql_group_session_start";
  80. const TExprNode::TPtr Node;
  81. TExprContext& Ctx;
  82. TTypeAnnotationContext& TypesCtx;
  83. bool UsePartitionsByKeys;
  84. bool UseFinalizeByKeys = false;
  85. bool ForceCompact;
  86. bool CompactForDistinct;
  87. bool UsePhases;
  88. TStringBuf Suffix;
  89. TSessionWindowParams SessionWindowParams;
  90. TExprNode::TPtr AggList;
  91. TExprNode::TListType Traits;
  92. TExprNode::TPtr KeyColumns;
  93. TExprNode::TPtr AggregatedColumns;
  94. const TExprNode::TPtr VoidNode;
  95. TMaybe<TStringBuf> SessionOutputColumn;
  96. TSortParams SortParams;
  97. bool HaveDistinct;
  98. bool EffectiveCompact;
  99. bool HaveSessionSetting;
  100. const TStructExprType* OriginalRowType;
  101. const TStructExprType* RowType;
  102. TVector<const TItemExprType*> RowItems;
  103. TExprNode::TPtr PreMap;
  104. bool UseBlocks;
  105. TExprNode::TListType InitialColumnNames;
  106. TExprNode::TListType FinalColumnNames;
  107. TExprNode::TListType DistinctFields;
  108. TExprNode::TListType NothingStates;
  109. std::unordered_map<std::string_view, TIdxSet> Distinct2Columns;
  110. TIdxSet NonDistinctColumns;
  111. std::unordered_map<std::string_view, bool> DistinctFieldNeedsPickle;
  112. std::unordered_map<std::string_view, TExprNode::TPtr> UdfSetCreate;
  113. std::unordered_map<std::string_view, TExprNode::TPtr> UdfAddValue;
  114. std::unordered_map<std::string_view, TExprNode::TPtr> UdfWasChanged;
  115. };
  116. inline TExprNode::TPtr ExpandAggregatePeepholeImpl(const TExprNode::TPtr& node, TExprContext& ctx, TTypeAnnotationContext& typesCtx,
  117. const bool useFinalizeByKey, const bool useBlocks, const bool allowSpilling) {
  118. TAggregateExpander aggExpander(!useFinalizeByKey && !useBlocks, useFinalizeByKey, node, ctx, typesCtx,
  119. true, false, false, typesCtx.IsBlockEngineEnabled() && !allowSpilling);
  120. return aggExpander.ExpandAggregate();
  121. }
  122. TExprNode::TPtr ExpandAggregatePeephole(const TExprNode::TPtr& node, TExprContext& ctx, TTypeAnnotationContext& typesCtx);
  123. }