aggregation.cpp 41 KB


  1. #include "node.h"
  2. #include "context.h"
  3. #include <yql/essentials/ast/yql_type_string.h>
  4. #include <library/cpp/charset/ci_string.h>
  5. #include <util/string/builder.h>
  6. #include <util/string/cast.h>
  7. #include <array>
  8. using namespace NYql;
  9. namespace NSQLTranslationV0 {
  10. class TAggregationFactory : public IAggregation {
  11. public:
  12. TAggregationFactory(TPosition pos, const TString& name, const TString& func, EAggregateMode aggMode, bool multi = false)
  13. : IAggregation(pos, name, func, aggMode), Factory(!func.empty() ?
  14. BuildBind(Pos, aggMode == EAggregateMode::OverWindow ? "window_module" : "aggregate_module", func) : nullptr),
  15. DynamicFactory(!Factory), Multi(multi)
  16. {
  17. if (!Factory) {
  18. FakeSource = BuildFakeSource(pos);
  19. }
  20. }
  21. protected:
  22. bool InitAggr(TContext& ctx, bool isFactory, ISource* src, TAstListNode& node, const TVector<TNodePtr>& exprs) override {
  23. ui32 expectedArgs = !Factory ? 2 : (isFactory ? 0 : 1);
  24. if (!Factory) {
  25. YQL_ENSURE(!isFactory);
  26. }
  27. if (expectedArgs != exprs.size()) {
  28. ctx.Error(Pos) << "Aggregation function " << (isFactory ? "factory " : "") << Name
  29. << " requires exactly " << expectedArgs << " argument(s), given: " << exprs.size();
  30. return false;
  31. }
  32. if (!Factory) {
  33. Factory = exprs[1];
  34. }
  35. if (!isFactory) {
  36. Expr = exprs.front();
  37. Name = src->MakeLocalName(Name);
  38. }
  39. if (!Init(ctx, src)) {
  40. return false;
  41. }
  42. if (!isFactory) {
  43. node.Add("Member", "row", Q(Name));
  44. }
  45. return true;
  46. }
  47. TNodePtr AggregationTraitsFactory() const override {
  48. return Factory;
  49. }
  50. TNodePtr GetApply(const TNodePtr& type) const override {
  51. if (!Multi) {
  52. return Y("Apply", Factory, (DynamicFactory ? Y("ListItemType", type) : type),
  53. BuildLambda(Pos, Y("row"), Y("EnsurePersistable", Expr)));
  54. }
  55. return Y("MultiAggregate",
  56. Y("ListItemType", type),
  57. BuildLambda(Pos, Y("row"), Y("EnsurePersistable", Expr)),
  58. Factory);
  59. }
  60. bool DoInit(TContext& ctx, ISource* src) override {
  61. if (!Expr) {
  62. return true;
  63. }
  64. ctx.PushBlockShortcuts();
  65. if (!Expr->Init(ctx, src)) {
  66. return false;
  67. }
  68. if (Expr->IsAggregated() && !Expr->IsAggregationKey() && !IsOverWindow()) {
  69. ctx.Error(Pos) << "Aggregation of aggregated values is forbidden for non window functions";
  70. return false;
  71. }
  72. if (AggMode == EAggregateMode::Distinct) {
  73. const auto column = Expr->GetColumnName();
  74. if (!column) {
  75. ctx.Error(Expr->GetPos()) << "DISTINCT qualifier may only be used with column references";
  76. return false;
  77. }
  78. DistinctKey = *column;
  79. YQL_ENSURE(src);
  80. if (src->GetJoin()) {
  81. const auto sourcePtr = Expr->GetSourceName();
  82. if (!sourcePtr || !*sourcePtr) {
  83. if (!src->IsGroupByColumn(DistinctKey)) {
  84. ctx.Error(Expr->GetPos()) << ErrorDistinctWithoutCorrelation(DistinctKey);
  85. return false;
  86. }
  87. } else {
  88. DistinctKey = DotJoin(*sourcePtr, DistinctKey);
  89. }
  90. }
  91. if (src->IsGroupByColumn(DistinctKey)) {
  92. ctx.Error(Expr->GetPos()) << ErrorDistinctByGroupKey(DistinctKey);
  93. return false;
  94. }
  95. Expr = AstNode("row");
  96. ctx.PopBlockShortcuts();
  97. } else {
  98. Expr = ctx.GroundBlockShortcutsForExpr(Expr);
  99. }
  100. if (FakeSource) {
  101. ctx.PushBlockShortcuts();
  102. if (!Factory->Init(ctx, FakeSource.Get())) {
  103. return false;
  104. }
  105. Factory = ctx.GroundBlockShortcutsForExpr(Factory);
  106. if (AggMode == EAggregateMode::OverWindow) {
  107. Factory = BuildLambda(Pos, Y("type", "extractor"), Y("block", Q(Y(
  108. Y("let", "x", Y("Apply", Factory, "type", "extractor")),
  109. Y("return", Y("WindowTraits",
  110. Y("NthArg", Q("0"), "x"),
  111. Y("NthArg", Q("1"), "x"),
  112. Y("NthArg", Q("2"), "x"),
  113. BuildLambda(Pos, Y("value", "state"), Y("Void")),
  114. Y("NthArg", Q("6"), "x"),
  115. Y("NthArg", Q("7"), "x")
  116. ))
  117. ))));
  118. }
  119. }
  120. return true;
  121. }
  122. TNodePtr Factory;
  123. TSourcePtr FakeSource;
  124. TNodePtr Expr;
  125. bool DynamicFactory;
  126. bool Multi;
  127. };
  128. class TAggregationFactoryImpl final : public TAggregationFactory {
  129. public:
  130. TAggregationFactoryImpl(TPosition pos, const TString& name, const TString& func, EAggregateMode aggMode, bool multi)
  131. : TAggregationFactory(pos, name, func, aggMode, multi)
  132. {}
  133. private:
  134. TNodePtr DoClone() const final {
  135. return new TAggregationFactoryImpl(Pos, Name, Func, AggMode, Multi);
  136. }
  137. };
  138. class TAggregationFactoryWinAutoargImpl final : public TAggregationFactory {
  139. public:
  140. TAggregationFactoryWinAutoargImpl(TPosition pos, const TString& name, const TString& func, EAggregateMode aggMode)
  141. : TAggregationFactory(pos, name, func, aggMode)
  142. {}
  143. bool InitAggr(TContext& ctx, bool isFactory, ISource* src, TAstListNode& node, const TVector<TNodePtr>& exprs) override {
  144. Y_UNUSED(isFactory);
  145. if (!IsOverWindow()) {
  146. ctx.Error(Pos) << "Expected aggregation function: " << GetName() << " only as window function. You may have forgotten OVER instruction.";
  147. return false;
  148. }
  149. TVector<TNodePtr> exprsAuto;
  150. if (!exprs) {
  151. auto winNamePtr = src->GetWindowName();
  152. YQL_ENSURE(winNamePtr);
  153. auto winSpecPtr = src->FindWindowSpecification(ctx, *winNamePtr);
  154. if (!winSpecPtr) {
  155. return false;
  156. }
  157. const auto& orderSpec = winSpecPtr->OrderBy;
  158. if (!orderSpec) {
  159. ctx.Warning(Pos, TIssuesIds::YQL_AGGREGATE_BY_WIN_FUNC_WITHOUT_ORDER_BY) <<
  160. "Expected ORDER BY specification for window: '" << *winNamePtr <<
  161. "' used in aggregation function: '" << GetName() <<
  162. " You may have forgotten to ORDER BY in WINDOW specification or choose the wrong WINDOW.";
  163. }
  164. for (const auto& spec: orderSpec) {
  165. exprsAuto.push_back(spec->OrderExpr);
  166. }
  167. if (exprsAuto.size() > 1) {
  168. exprsAuto = {BuildTuple(GetPos(), exprsAuto)};
  169. }
  170. }
  171. return TAggregationFactory::InitAggr(ctx, isFactory, src, node, exprsAuto ? exprsAuto : exprs);
  172. }
  173. private:
  174. TNodePtr DoClone() const final {
  175. return new TAggregationFactoryWinAutoargImpl(Pos, Name, Func, AggMode);
  176. }
  177. };
  178. TAggregationPtr BuildFactoryAggregation(TPosition pos, const TString& name, const TString& func, EAggregateMode aggMode, bool multi) {
  179. return new TAggregationFactoryImpl(pos, name, func, aggMode, multi);
  180. }
  181. TAggregationPtr BuildFactoryAggregationWinAutoarg(TPosition pos, const TString& name, const TString& func, EAggregateMode aggMode) {
  182. return new TAggregationFactoryWinAutoargImpl(pos, name, func, aggMode);
  183. }
  184. class TKeyPayloadAggregationFactory final : public TAggregationFactory {
  185. public:
  186. TKeyPayloadAggregationFactory(TPosition pos, const TString& name, const TString& factory, EAggregateMode aggMode)
  187. : TAggregationFactory(pos, name, factory, aggMode)
  188. {}
  189. private:
  190. bool InitAggr(TContext& ctx, bool isFactory, ISource* src, TAstListNode& node, const TVector<TNodePtr>& exprs) final {
  191. ui32 adjustArgsCount = isFactory ? 0 : 2;
  192. if (exprs.size() < adjustArgsCount || exprs.size() > 1 + adjustArgsCount) {
  193. ctx.Error(Pos) << "Aggregation function " << (isFactory ? "factory " : "") << Name << " requires "
  194. << adjustArgsCount << " or " << (1 + adjustArgsCount) << " arguments, given: " << exprs.size();
  195. return false;
  196. }
  197. if (!isFactory) {
  198. Payload = exprs.front();
  199. Key = exprs[1];
  200. }
  201. Limit = (1 + adjustArgsCount == exprs.size() ? exprs.back() : Y("Void"));
  202. if (!isFactory) {
  203. Name = src->MakeLocalName(Name);
  204. }
  205. if (!Init(ctx, src)) {
  206. return false;
  207. }
  208. if (!isFactory) {
  209. node.Add("Member", "row", Q(Name));
  210. }
  211. return true;
  212. }
  213. TNodePtr DoClone() const final {
  214. return new TKeyPayloadAggregationFactory(Pos, Name, Func, AggMode);
  215. }
  216. TNodePtr GetApply(const TNodePtr& type) const final {
  217. auto apply = Y("Apply", Factory, type, BuildLambda(Pos, Y("row"), Key), BuildLambda(Pos, Y("row"), Payload));
  218. AddFactoryArguments(apply);
  219. return apply;
  220. }
  221. void AddFactoryArguments(TNodePtr& apply) const final {
  222. apply = L(apply, Limit);
  223. }
  224. std::vector<ui32> GetFactoryColumnIndices() const final {
  225. return {1u, 0u};
  226. }
  227. bool DoInit(TContext& ctx, ISource* src) final {
  228. if (!Key) {
  229. return true;
  230. }
  231. ctx.PushBlockShortcuts();
  232. if (!Key->Init(ctx, src)) {
  233. return false;
  234. }
  235. Key = ctx.GroundBlockShortcutsForExpr(Key);
  236. ctx.PushBlockShortcuts();
  237. if (!Payload->Init(ctx, src)) {
  238. return false;
  239. }
  240. Payload = ctx.GroundBlockShortcutsForExpr(Payload);
  241. if (!Limit->Init(ctx, src)) {
  242. return false;
  243. }
  244. if (Key->IsAggregated()) {
  245. ctx.Error(Pos) << "Aggregation of aggregated values is forbidden";
  246. return false;
  247. }
  248. return true;
  249. }
  250. TNodePtr Key, Payload, Limit;
  251. };
  252. TAggregationPtr BuildKeyPayloadFactoryAggregation(TPosition pos, const TString& name, const TString& factory, EAggregateMode aggMode) {
  253. return new TKeyPayloadAggregationFactory(pos, name, factory, aggMode);
  254. }
  255. class TPayloadPredicateAggregationFactory final : public TAggregationFactory {
  256. public:
  257. TPayloadPredicateAggregationFactory(TPosition pos, const TString& name, const TString& factory, EAggregateMode aggMode)
  258. : TAggregationFactory(pos, name, factory, aggMode)
  259. {}
  260. private:
  261. bool InitAggr(TContext& ctx, bool isFactory, ISource* src, TAstListNode& node, const TVector<TNodePtr>& exprs) final {
  262. ui32 adjustArgsCount = isFactory ? 0 : 2;
  263. if (exprs.size() != adjustArgsCount) {
  264. ctx.Error(Pos) << "Aggregation function " << (isFactory ? "factory " : "") << Name << " requires " <<
  265. adjustArgsCount << " arguments, given: " << exprs.size();
  266. return false;
  267. }
  268. if (!isFactory) {
  269. Payload = exprs.front();
  270. Predicate = exprs.back();
  271. Name = src->MakeLocalName(Name);
  272. }
  273. if (!Init(ctx, src)) {
  274. return false;
  275. }
  276. if (!isFactory) {
  277. node.Add("Member", "row", Q(Name));
  278. }
  279. return true;
  280. }
  281. TNodePtr DoClone() const final {
  282. return new TPayloadPredicateAggregationFactory(Pos, Name, Func, AggMode);
  283. }
  284. TNodePtr GetApply(const TNodePtr& type) const final {
  285. return Y("Apply", Factory, type, BuildLambda(Pos, Y("row"), Payload), BuildLambda(Pos, Y("row"), Predicate));
  286. }
  287. std::vector<ui32> GetFactoryColumnIndices() const final {
  288. return {0u, 1u};
  289. }
  290. bool DoInit(TContext& ctx, ISource* src) final {
  291. if (!Predicate) {
  292. return true;
  293. }
  294. ctx.PushBlockShortcuts();
  295. if (!Predicate->Init(ctx, src)) {
  296. return false;
  297. }
  298. Predicate = ctx.GroundBlockShortcutsForExpr(Predicate);
  299. ctx.PushBlockShortcuts();
  300. if (!Payload->Init(ctx, src)) {
  301. return false;
  302. }
  303. Payload = ctx.GroundBlockShortcutsForExpr(Payload);
  304. if (Payload->IsAggregated()) {
  305. ctx.Error(Pos) << "Aggregation of aggregated values is forbidden";
  306. return false;
  307. }
  308. return true;
  309. }
  310. TNodePtr Payload, Predicate;
  311. };
  312. TAggregationPtr BuildPayloadPredicateFactoryAggregation(TPosition pos, const TString& name, const TString& factory, EAggregateMode aggMode) {
  313. return new TPayloadPredicateAggregationFactory(pos, name, factory, aggMode);
  314. }
  315. class TTwoArgsAggregationFactory final : public TAggregationFactory {
  316. public:
  317. TTwoArgsAggregationFactory(TPosition pos, const TString& name, const TString& factory, EAggregateMode aggMode)
  318. : TAggregationFactory(pos, name, factory, aggMode)
  319. {}
  320. private:
  321. bool InitAggr(TContext& ctx, bool isFactory, ISource* src, TAstListNode& node, const TVector<TNodePtr>& exprs) final {
  322. ui32 adjustArgsCount = isFactory ? 0 : 2;
  323. if (exprs.size() != adjustArgsCount) {
  324. ctx.Error(Pos) << "Aggregation function " << (isFactory ? "factory " : "") << Name << " requires " <<
  325. adjustArgsCount << " arguments, given: " << exprs.size();
  326. return false;
  327. }
  328. if (!isFactory) {
  329. One = exprs.front();
  330. Two = exprs.back();
  331. Name = src->MakeLocalName(Name);
  332. }
  333. if (!Init(ctx, src)) {
  334. return false;
  335. }
  336. if (!isFactory) {
  337. node.Add("Member", "row", Q(Name));
  338. }
  339. return true;
  340. }
  341. TNodePtr DoClone() const final {
  342. return new TTwoArgsAggregationFactory(Pos, Name, Func, AggMode);
  343. }
  344. TNodePtr GetApply(const TNodePtr& type) const final {
  345. auto tuple = Q(Y(One, Two));
  346. return Y("Apply", Factory, type, BuildLambda(Pos, Y("row"), tuple));
  347. }
  348. bool DoInit(TContext& ctx, ISource* src) final {
  349. if (!One) {
  350. return true;
  351. }
  352. ctx.PushBlockShortcuts();
  353. if (!One->Init(ctx, src)) {
  354. return false;
  355. }
  356. One = ctx.GroundBlockShortcutsForExpr(One);
  357. ctx.PushBlockShortcuts();
  358. if (!Two->Init(ctx, src)) {
  359. return false;
  360. }
  361. Two = ctx.GroundBlockShortcutsForExpr(Two);
  362. if ((One->IsAggregated() || Two->IsAggregated()) && !IsOverWindow()) {
  363. ctx.Error(Pos) << "Aggregation of aggregated values is forbidden";
  364. return false;
  365. }
  366. return true;
  367. }
  368. TNodePtr One, Two;
  369. };
  370. TAggregationPtr BuildTwoArgsFactoryAggregation(TPosition pos, const TString& name, const TString& factory, EAggregateMode aggMode) {
  371. return new TTwoArgsAggregationFactory(pos, name, factory, aggMode);
  372. }
  373. class THistogramAggregationFactory final : public TAggregationFactory {
  374. public:
  375. THistogramAggregationFactory(TPosition pos, const TString& name, const TString& factory, EAggregateMode aggMode)
  376. : TAggregationFactory(pos, name, factory, aggMode)
  377. , FakeSource(BuildFakeSource(pos))
  378. , Weight(Y("Double", Q("1.0")))
  379. , Intervals(Y("Uint32", Q("100")))
  380. {}
  381. private:
  382. bool InitAggr(TContext& ctx, bool isFactory, ISource* src, TAstListNode& node, const TVector<TNodePtr>& exprs) final {
  383. if (isFactory) {
  384. if (exprs.size() > 1) {
  385. ctx.Error(Pos) << "Aggregation function factory " << Name << " requires 0 or 1 argument(s), given: " << exprs.size();
  386. return false;
  387. }
  388. } else {
  389. if (exprs.empty() || exprs.size() > 3) {
  390. ctx.Error(Pos) << "Aggregation function " << Name << " requires one, two or three arguments, given: " << exprs.size();
  391. return false;
  392. }
  393. }
  394. if (!isFactory) {
  395. /// \todo: solve it with named arguments
  396. const auto integer = exprs.back()->IsIntegerLiteral();
  397. switch (exprs.size()) {
  398. case 2U:
  399. if (!integer) {
  400. Weight = exprs.back();
  401. }
  402. break;
  403. case 3U:
  404. if (!integer) {
  405. ctx.Error(Pos) << "Aggregation function " << Name << " for case with 3 argument should have second interger argument";
  406. return false;
  407. }
  408. Weight = exprs[1];
  409. break;
  410. }
  411. if (exprs.size() >= 2 && integer) {
  412. Intervals = Y("Cast", exprs.back(), Q("Uint32"));
  413. }
  414. } else {
  415. if (exprs.size() >= 1) {
  416. const auto integer = exprs.back()->IsIntegerLiteral();
  417. if (!integer) {
  418. ctx.Error(Pos) << "Aggregation function factory " << Name << " should have second interger argument";
  419. return false;
  420. }
  421. Intervals = Y("Cast", exprs.back(), Q("Uint32"));
  422. }
  423. }
  424. return TAggregationFactory::InitAggr(ctx, isFactory, src, node, isFactory ? TVector<TNodePtr>() : TVector<TNodePtr>(1, exprs.front()));
  425. }
  426. TNodePtr DoClone() const final {
  427. return new THistogramAggregationFactory(Pos, Name, Func, AggMode);
  428. }
  429. TNodePtr GetApply(const TNodePtr& type) const final {
  430. auto apply = Y("Apply", Factory, type, BuildLambda(Pos, Y("row"), Expr), BuildLambda(Pos, Y("row"), Weight));
  431. AddFactoryArguments(apply);
  432. return apply;
  433. }
  434. void AddFactoryArguments(TNodePtr& apply) const final {
  435. apply = L(apply, Intervals);
  436. }
  437. std::vector<ui32> GetFactoryColumnIndices() const final {
  438. return {0u, 1u};
  439. }
  440. bool DoInit(TContext& ctx, ISource* src) final {
  441. ctx.PushBlockShortcuts();
  442. if (!Weight->Init(ctx, src)) {
  443. return false;
  444. }
  445. Weight = ctx.GroundBlockShortcutsForExpr(Weight);
  446. ctx.PushBlockShortcuts();
  447. if (!Intervals->Init(ctx, FakeSource.Get())) {
  448. return false;
  449. }
  450. Intervals = ctx.GroundBlockShortcutsForExpr(Intervals);
  451. return TAggregationFactory::DoInit(ctx, src);
  452. }
  453. TSourcePtr FakeSource;
  454. TNodePtr Weight, Intervals;
  455. };
  456. TAggregationPtr BuildHistogramFactoryAggregation(TPosition pos, const TString& name, const TString& factory, EAggregateMode aggMode) {
  457. return new THistogramAggregationFactory(pos, name, factory, aggMode);
  458. }
  459. class TLinearHistogramAggregationFactory final : public TAggregationFactory {
  460. public:
  461. TLinearHistogramAggregationFactory(TPosition pos, const TString& name, const TString& factory, EAggregateMode aggMode)
  462. : TAggregationFactory(pos, name, factory, aggMode)
  463. , FakeSource(BuildFakeSource(pos))
  464. , BinSize(Y("Double", Q("10.0")))
  465. , Minimum(Y("Double", Q(ToString(-1.0 * Max<double>()))))
  466. , Maximum(Y("Double", Q(ToString(Max<double>()))))
  467. {}
  468. private:
  469. bool InitAggr(TContext& ctx, bool isFactory, ISource* src, TAstListNode& node, const TVector<TNodePtr>& exprs) final {
  470. Y_UNUSED(isFactory);
  471. if (exprs.empty() || exprs.size() > 4) {
  472. ctx.Error(Pos) << "Aggregation function " << Name << " requires one to four arguments, given: " << exprs.size();
  473. return false;
  474. }
  475. if (exprs.size() > 1) {
  476. BinSize = exprs[1];
  477. }
  478. if (exprs.size() > 2) {
  479. Minimum = exprs[2];
  480. }
  481. if (exprs.size() > 3) {
  482. Maximum = exprs[3];
  483. }
  484. return TAggregationFactory::InitAggr(ctx, isFactory, src, node, { exprs.front() });
  485. }
  486. TNodePtr DoClone() const final {
  487. return new TLinearHistogramAggregationFactory(Pos, Name, Func, AggMode);
  488. }
  489. TNodePtr GetApply(const TNodePtr& type) const final {
  490. return Y("Apply", Factory, type,
  491. BuildLambda(Pos, Y("row"), Expr),
  492. BinSize, Minimum, Maximum);
  493. }
  494. bool DoInit(TContext& ctx, ISource* src) final {
  495. ctx.PushBlockShortcuts();
  496. if (!BinSize->Init(ctx, FakeSource.Get())) {
  497. return false;
  498. }
  499. BinSize = ctx.GroundBlockShortcutsForExpr(BinSize);
  500. ctx.PushBlockShortcuts();
  501. if (!Minimum->Init(ctx, FakeSource.Get())) {
  502. return false;
  503. }
  504. Minimum = ctx.GroundBlockShortcutsForExpr(Minimum);
  505. ctx.PushBlockShortcuts();
  506. if (!Maximum->Init(ctx, FakeSource.Get())) {
  507. return false;
  508. }
  509. Maximum = ctx.GroundBlockShortcutsForExpr(Maximum);
  510. return TAggregationFactory::DoInit(ctx, src);
  511. }
  512. TSourcePtr FakeSource;
  513. TNodePtr BinSize, Minimum, Maximum;
  514. };
  515. TAggregationPtr BuildLinearHistogramFactoryAggregation(TPosition pos, const TString& name, const TString& factory, EAggregateMode aggMode) {
  516. return new TLinearHistogramAggregationFactory(pos, name, factory, aggMode);
  517. }
  518. class TPercentileFactory final : public TAggregationFactory {
  519. public:
  520. TPercentileFactory(TPosition pos, const TString& name, const TString& factory, EAggregateMode aggMode)
  521. : TAggregationFactory(pos, name, factory, aggMode)
  522. , FakeSource(BuildFakeSource(pos))
  523. {}
  524. private:
  525. const TString* GetGenericKey() const final {
  526. return Column;
  527. }
  528. void Join(IAggregation* aggr) final {
  529. const auto percentile = dynamic_cast<TPercentileFactory*>(aggr);
  530. Y_ABORT_UNLESS(percentile);
  531. Y_ABORT_UNLESS(*Column == *percentile->Column);
  532. Y_ABORT_UNLESS(AggMode == percentile->AggMode);
  533. Percentiles.insert(percentile->Percentiles.cbegin(), percentile->Percentiles.cend());
  534. percentile->Percentiles.clear();
  535. }
  536. bool InitAggr(TContext& ctx, bool isFactory, ISource* src, TAstListNode& node, const TVector<TNodePtr>& exprs) final {
  537. ui32 adjustArgsCount = isFactory ? 0 : 1;
  538. if (exprs.size() < 0 + adjustArgsCount || exprs.size() > 1 + adjustArgsCount) {
  539. ctx.Error(Pos) << "Aggregation function " << (isFactory ? "factory " : "") << Name << " requires "
  540. << (0 + adjustArgsCount) << " or " << (1 + adjustArgsCount) << " arguments, given: " << exprs.size();
  541. return false;
  542. }
  543. if (!isFactory) {
  544. Column = exprs.front()->GetColumnName();
  545. if (!Column) {
  546. ctx.Error(Pos) << Name << " may only be used with column reference as first argument.";
  547. return false;
  548. }
  549. }
  550. if (!TAggregationFactory::InitAggr(ctx, isFactory, src, node, isFactory ? TVector<TNodePtr>() : TVector<TNodePtr>(1, exprs.front())))
  551. return false;
  552. TNodePtr x;
  553. if (1 + adjustArgsCount == exprs.size()) {
  554. x = exprs.back();
  555. ctx.PushBlockShortcuts();
  556. if (!x->Init(ctx, FakeSource.Get())) {
  557. return false;
  558. }
  559. x = ctx.GroundBlockShortcutsForExpr(x);
  560. } else {
  561. x = Y("Double", Q("0.5"));
  562. }
  563. if (isFactory) {
  564. FactoryPercentile = x;
  565. } else {
  566. Percentiles.emplace(Name, x);
  567. }
  568. return true;
  569. }
  570. TNodePtr DoClone() const final {
  571. return new TPercentileFactory(Pos, Name, Func, AggMode);
  572. }
  573. TNodePtr GetApply(const TNodePtr& type) const final {
  574. TNodePtr percentiles(Percentiles.cbegin()->second);
  575. if (Percentiles.size() > 1U) {
  576. percentiles = Y();
  577. for (const auto& percentile : Percentiles) {
  578. percentiles = L(percentiles, percentile.second);
  579. }
  580. percentiles = Q(percentiles);
  581. }
  582. return Y("Apply", Factory, type, BuildLambda(Pos, Y("row"), Expr), percentiles);
  583. }
  584. void AddFactoryArguments(TNodePtr& apply) const final {
  585. apply = L(apply, FactoryPercentile);
  586. }
  587. TNodePtr AggregationTraits(const TNodePtr& type) const final {
  588. if (Percentiles.empty())
  589. return TNodePtr();
  590. TNodePtr names(Q(Percentiles.cbegin()->first));
  591. if (Percentiles.size() > 1U) {
  592. names = Y();
  593. for (const auto& percentile : Percentiles)
  594. names = L(names, Q(percentile.first));
  595. names = Q(names);
  596. }
  597. const bool distinct = AggMode == EAggregateMode::Distinct;
  598. const auto listType = distinct ? Y("ListType", Y("StructMemberType", Y("ListItemType", type), BuildQuotedAtom(Pos, DistinctKey))) : type;
  599. return distinct ? Q(Y(names, GetApply(listType), BuildQuotedAtom(Pos, DistinctKey))) : Q(Y(names, GetApply(listType)));
  600. }
  601. bool DoInit(TContext& ctx, ISource* src) final {
  602. for (const auto& p : Percentiles) {
  603. if (!p.second->Init(ctx, src)) {
  604. return false;
  605. }
  606. }
  607. return TAggregationFactory::DoInit(ctx, src);
  608. }
  609. TSourcePtr FakeSource;
  610. std::multimap<TString, TNodePtr> Percentiles;
  611. TNodePtr FactoryPercentile;
  612. const TString* Column = nullptr;
  613. };
  614. TAggregationPtr BuildPercentileFactoryAggregation(TPosition pos, const TString& name, const TString& factory, EAggregateMode aggMode) {
  615. return new TPercentileFactory(pos, name, factory, aggMode);
  616. }
  617. class TTopFreqFactory final : public TAggregationFactory {
  618. public:
  619. TTopFreqFactory(TPosition pos, const TString& name, const TString& factory, EAggregateMode aggMode)
  620. : TAggregationFactory(pos, name, factory, aggMode)
  621. {}
  622. private:
  623. //first - n, second - buffer
  624. using TPair = std::pair<TNodePtr, TNodePtr>;
  625. bool InitAggr(TContext& ctx, bool isFactory, ISource* src, TAstListNode& node, const TVector<TNodePtr>& exprs) final {
  626. ui32 adjustArgsCount = isFactory ? 0 : 1;
  627. const double DefaultBufferC = 1.5;
  628. const ui32 MinBuffer = 100;
  629. if (exprs.size() < adjustArgsCount || exprs.size() > 2 + adjustArgsCount) {
  630. ctx.Error(Pos) << "Aggregation function " << (isFactory? "factory " : "") << Name <<
  631. " requires " << adjustArgsCount << " to " << (2 + adjustArgsCount) << " arguments, given: " << exprs.size();
  632. return false;
  633. }
  634. if (!TAggregationFactory::InitAggr(ctx, isFactory, src, node, isFactory ? TVector<TNodePtr>() : TVector<TNodePtr>(1, exprs.front())))
  635. return false;
  636. ui32 n;
  637. ui32 buffer;
  638. if (1 + adjustArgsCount <= exprs.size()) {
  639. auto posSecondArg = exprs[adjustArgsCount]->GetPos();
  640. if (!Parseui32(exprs[adjustArgsCount], n)) {
  641. ctx.Error(posSecondArg) << "TopFreq: invalid argument #" << (1 + adjustArgsCount) << ", numeric literal is expected";
  642. return false;
  643. }
  644. } else {
  645. n = 1;
  646. }
  647. if (2 + adjustArgsCount == exprs.size()) {
  648. auto posThirdArg = exprs[1 + adjustArgsCount]->GetPos();
  649. if (Parseui32(exprs[1 + adjustArgsCount], buffer)) {
  650. if (n > buffer) {
  651. ctx.Error(posThirdArg) << "TopFreq: #" << (2 + adjustArgsCount) << " argument (buffer size) must be greater or equal than previous argument ";
  652. return false;
  653. }
  654. } else {
  655. ctx.Error(posThirdArg) << "TopFreq: invalid #" << (2 + adjustArgsCount) << " argument, numeric literal is expected";
  656. return false;
  657. }
  658. } else {
  659. buffer = std::max(ui32(n * DefaultBufferC), MinBuffer);
  660. }
  661. auto x = TPair{ Y("Uint32", Q(ToString(n))), Y("Uint32", Q(ToString(buffer))) };
  662. if (isFactory) {
  663. TopFreqFactoryParams = x;
  664. } else {
  665. TopFreqs.emplace(Name, x);
  666. }
  667. return true;
  668. }
  669. TNodePtr DoClone() const final {
  670. return new TTopFreqFactory(Pos, Name, Func, AggMode);
  671. }
  672. TNodePtr GetApply(const TNodePtr& type) const final {
  673. TPair topFreqs(TopFreqs.cbegin()->second);
  674. if (TopFreqs.size() > 1U) {
  675. topFreqs = { Y(), Y() };
  676. for (const auto& topFreq : TopFreqs) {
  677. topFreqs = { L(topFreqs.first, topFreq.second.first), L(topFreqs.second, topFreq.second.second) };
  678. }
  679. topFreqs = { Q(topFreqs.first), Q(topFreqs.second) };
  680. }
  681. auto apply = Y("Apply", Factory, type, BuildLambda(Pos, Y("row"), Expr), topFreqs.first, topFreqs.second);
  682. return apply;
  683. }
  684. void AddFactoryArguments(TNodePtr& apply) const final {
  685. apply = L(apply, TopFreqFactoryParams.first, TopFreqFactoryParams.second);
  686. }
  687. TNodePtr AggregationTraits(const TNodePtr& type) const final {
  688. if (TopFreqs.empty())
  689. return TNodePtr();
  690. TNodePtr names(Q(TopFreqs.cbegin()->first));
  691. if (TopFreqs.size() > 1U) {
  692. names = Y();
  693. for (const auto& topFreq : TopFreqs)
  694. names = L(names, Q(topFreq.first));
  695. names = Q(names);
  696. }
  697. const bool distinct = AggMode == EAggregateMode::Distinct;
  698. const auto listType = distinct ? Y("ListType", Y("StructMemberType", Y("ListItemType", type), BuildQuotedAtom(Pos, DistinctKey))) : type;
  699. return distinct ? Q(Y(names, GetApply(listType), BuildQuotedAtom(Pos, DistinctKey))) : Q(Y(names, GetApply(listType)));
  700. }
  701. bool DoInit(TContext& ctx, ISource* src) final {
  702. for (const auto& topFreq : TopFreqs) {
  703. if (!topFreq.second.first->Init(ctx, src)) {
  704. return false;
  705. }
  706. if (!topFreq.second.second->Init(ctx, src)) {
  707. return false;
  708. }
  709. }
  710. return TAggregationFactory::DoInit(ctx, src);
  711. }
  712. std::multimap<TString, TPair> TopFreqs;
  713. TPair TopFreqFactoryParams;
  714. };
  715. TAggregationPtr BuildTopFreqFactoryAggregation(TPosition pos, const TString& name, const TString& factory, EAggregateMode aggMode) {
  716. return new TTopFreqFactory(pos, name, factory, aggMode);
  717. }
  718. template <bool HasKey>
  719. class TTopAggregationFactory final : public TAggregationFactory {
  720. public:
  721. TTopAggregationFactory(TPosition pos, const TString& name, const TString& factory, EAggregateMode aggMode)
  722. : TAggregationFactory(pos, name, factory, aggMode)
  723. , FakeSource(BuildFakeSource(pos))
  724. {}
  725. private:
  726. bool InitAggr(TContext& ctx, bool isFactory, ISource* src, TAstListNode& node, const TVector<TNodePtr>& exprs) final {
  727. ui32 adjustArgsCount = isFactory ? 1 : (HasKey ? 3 : 2);
  728. if (exprs.size() != adjustArgsCount) {
  729. ctx.Error(Pos) << "Aggregation function " << (isFactory ? "factory " : "") << Name << " requires "
  730. << adjustArgsCount << " arguments, given: " << exprs.size();
  731. return false;
  732. }
  733. if (!isFactory) {
  734. Payload = exprs[0];
  735. if (HasKey) {
  736. Key = exprs[1];
  737. }
  738. }
  739. Count = exprs.back();
  740. if (!isFactory) {
  741. Name = src->MakeLocalName(Name);
  742. }
  743. if (!Init(ctx, src)) {
  744. return false;
  745. }
  746. if (!isFactory) {
  747. node.Add("Member", "row", Q(Name));
  748. }
  749. return true;
  750. }
  751. TNodePtr DoClone() const final {
  752. return new TTopAggregationFactory(Pos, Name, Func, AggMode);
  753. }
  754. TNodePtr GetApply(const TNodePtr& type) const final {
  755. TNodePtr apply;
  756. if (HasKey) {
  757. apply = Y("Apply", Factory, type, BuildLambda(Pos, Y("row"), Key), BuildLambda(Pos, Y("row"), Payload));
  758. } else {
  759. apply = Y("Apply", Factory, type, BuildLambda(Pos, Y("row"), Payload));
  760. }
  761. AddFactoryArguments(apply);
  762. return apply;
  763. }
  764. void AddFactoryArguments(TNodePtr& apply) const final {
  765. apply = L(apply, Count);
  766. }
  767. std::vector<ui32> GetFactoryColumnIndices() const final {
  768. if (HasKey) {
  769. return {1u, 0u};
  770. } else {
  771. return {0u};
  772. }
  773. }
  774. bool DoInit(TContext& ctx, ISource* src) final {
  775. ctx.PushBlockShortcuts();
  776. if (!Count->Init(ctx, FakeSource.Get())) {
  777. return false;
  778. }
  779. Count = ctx.GroundBlockShortcutsForExpr(Count);
  780. if (!Payload) {
  781. return true;
  782. }
  783. if (HasKey) {
  784. ctx.PushBlockShortcuts();
  785. if (!Key->Init(ctx, src)) {
  786. return false;
  787. }
  788. Key = ctx.GroundBlockShortcutsForExpr(Key);
  789. }
  790. ctx.PushBlockShortcuts();
  791. if (!Payload->Init(ctx, src)) {
  792. return false;
  793. }
  794. Payload = ctx.GroundBlockShortcutsForExpr(Payload);
  795. if ((HasKey && Key->IsAggregated()) || (!HasKey && Payload->IsAggregated())) {
  796. ctx.Error(Pos) << "Aggregation of aggregated values is forbidden";
  797. return false;
  798. }
  799. return true;
  800. }
  801. TSourcePtr FakeSource;
  802. TNodePtr Key, Payload, Count;
  803. };
  // Creates a TTopAggregationFactory<HasKey> from the given position, name,
  // factory function name and aggregate mode.
  // The two lines after the definition explicitly instantiate the template
  // for HasKey = false and HasKey = true.
  804. template <bool HasKey>
  805. TAggregationPtr BuildTopFactoryAggregation(TPosition pos, const TString& name, const TString& factory, EAggregateMode aggMode) {
  806. return new TTopAggregationFactory<HasKey>(pos, name, factory, aggMode);
  807. }
  808. template TAggregationPtr BuildTopFactoryAggregation<false>(TPosition pos, const TString& name, const TString& factory, EAggregateMode aggMode);
  809. template TAggregationPtr BuildTopFactoryAggregation<true >(TPosition pos, const TString& name, const TString& factory, EAggregateMode aggMode);
  810. class TCountDistinctEstimateAggregationFactory final : public TAggregationFactory {
  811. public:
  812. TCountDistinctEstimateAggregationFactory(TPosition pos, const TString& name, const TString& factory, EAggregateMode aggMode)
  813. : TAggregationFactory(pos, name, factory, aggMode)
  814. {}
  815. private:
  816. bool InitAggr(TContext& ctx, bool isFactory, ISource* src, TAstListNode& node, const TVector<TNodePtr>& exprs) final {
  817. ui32 adjustArgsCount = isFactory ? 0 : 1;
  818. if (exprs.size() < adjustArgsCount || exprs.size() > 1 + adjustArgsCount) {
  819. ctx.Error(Pos) << Name << " aggregation function " << (isFactory ? "factory " : "") << " requires " <<
  820. adjustArgsCount << " or " << (1 + adjustArgsCount) << " argument(s), given: " << exprs.size();
  821. return false;
  822. }
  823. Precision = 14;
  824. if (1 + adjustArgsCount <= exprs.size()) {
  825. auto posSecondArg = exprs[adjustArgsCount]->GetPos();
  826. if (!Parseui32(exprs[adjustArgsCount], Precision)) {
  827. ctx.Error(posSecondArg) << Name << ": invalid argument, numeric literal is expected";
  828. return false;
  829. }
  830. }
  831. if (Precision > 18 || Precision < 4) {
  832. ctx.Error(Pos) << Name << ": precision is expected to be between 4 and 18 (inclusive), got " << Precision;
  833. return false;
  834. }
  835. if (!isFactory) {
  836. Expr = exprs[0];
  837. Name = src->MakeLocalName(Name);
  838. }
  839. if (!Init(ctx, src)) {
  840. return false;
  841. }
  842. if (!isFactory) {
  843. node.Add("Member", "row", Q(Name));
  844. }
  845. return true;
  846. }
  847. TNodePtr DoClone() const final {
  848. return new TCountDistinctEstimateAggregationFactory(Pos, Name, Func, AggMode);
  849. }
  850. TNodePtr GetApply(const TNodePtr& type) const final {
  851. auto apply = Y("Apply", Factory, type, BuildLambda(Pos, Y("row"), Expr));
  852. AddFactoryArguments(apply);
  853. return apply;
  854. }
  855. void AddFactoryArguments(TNodePtr& apply) const final {
  856. apply = L(apply, Y("Uint32", Q(ToString(Precision))));
  857. }
  858. private:
  859. ui32 Precision = 0;
  860. };
  // Creates a TCountDistinctEstimateAggregationFactory from the given position,
  // name, factory function name and aggregate mode.
  861. TAggregationPtr BuildCountDistinctEstimateFactoryAggregation(TPosition pos, const TString& name, const TString& factory, EAggregateMode aggMode) {
  862. return new TCountDistinctEstimateAggregationFactory(pos, name, factory, aggMode);
  863. }
  864. class TListAggregationFactory final : public TAggregationFactory {
  865. public:
  866. TListAggregationFactory(TPosition pos, const TString& name, const TString& factory, EAggregateMode aggMode)
  867. : TAggregationFactory(pos, name, factory, aggMode)
  868. {}
  869. private:
  870. bool InitAggr(TContext& ctx, bool isFactory, ISource* src, TAstListNode& node, const TVector<TNodePtr>& exprs) final {
  871. ui32 adjustArgsCount = isFactory ? 0 : 1;
  872. ui32 minArgs = (0 + adjustArgsCount);
  873. ui32 maxArgs = (1 + adjustArgsCount);
  874. if (exprs.size() < minArgs || exprs.size() > maxArgs) {
  875. ctx.Error(Pos) << "List aggregation " << (isFactory ? "factory " : "") << "function require " << minArgs
  876. << " or " << maxArgs << " arguments, given: " << exprs.size();
  877. return false;
  878. }
  879. Limit = 0;
  880. if (adjustArgsCount + 1U <= exprs.size()) {
  881. auto posSecondArg = exprs[adjustArgsCount]->GetPos();
  882. if (!Parseui32(exprs[adjustArgsCount], Limit)) {
  883. ctx.Error(posSecondArg) << "List: invalid last argument, numeric literal is expected";
  884. return false;
  885. }
  886. }
  887. if (!isFactory) {
  888. Expr = exprs[0];
  889. Name = src->MakeLocalName(Name);
  890. }
  891. if (!Init(ctx, src)) {
  892. return false;
  893. }
  894. if (!isFactory) {
  895. node.Add("Member", "row", Q(Name));
  896. }
  897. return true;
  898. }
  899. TNodePtr DoClone() const final {
  900. return new TListAggregationFactory(Pos, Name, Func, AggMode);
  901. }
  902. TNodePtr GetApply(const TNodePtr& type) const final {
  903. auto apply = Y("Apply", Factory, type, BuildLambda(Pos, Y("row"), Expr));
  904. AddFactoryArguments(apply);
  905. return apply;
  906. }
  907. void AddFactoryArguments(TNodePtr& apply) const final {
  908. apply = L(apply, Y("Uint64", Q(ToString(Limit))));
  909. }
  910. private:
  911. ui32 Limit = 0;
  912. };
  // Creates a TListAggregationFactory from the given position, name, factory
  // function name and aggregate mode.
  913. TAggregationPtr BuildListFactoryAggregation(TPosition pos, const TString& name, const TString& factory, EAggregateMode aggMode) {
  914. return new TListAggregationFactory(pos, name, factory, aggMode);
  915. }
  916. class TUserDefinedAggregationFactory final : public TAggregationFactory {
  917. public:
  918. TUserDefinedAggregationFactory(TPosition pos, const TString& name, const TString& factory, EAggregateMode aggMode)
  919. : TAggregationFactory(pos, name, factory, aggMode)
  920. {}
  921. private:
  922. bool InitAggr(TContext& ctx, bool isFactory, ISource* src, TAstListNode& node, const TVector<TNodePtr>& exprs) final {
  923. ui32 adjustArgsCount = isFactory ? 0 : 1;
  924. if (exprs.size() < (3 + adjustArgsCount) || exprs.size() > (7 + adjustArgsCount)) {
  925. ctx.Error(Pos) << "User defined aggregation function " << (isFactory ? "factory " : "") << " requires " <<
  926. (3 + adjustArgsCount) << " to " << (7 + adjustArgsCount) << " arguments, given: " << exprs.size();
  927. return false;
  928. }
  929. Lambdas[0] = BuildLambda(Pos, Y("value", "parent"), Y("NamedApply", exprs[adjustArgsCount], Q(Y("value")), Y("AsStruct"), Y("DependsOn", "parent")));
  930. Lambdas[1] = BuildLambda(Pos, Y("value", "state", "parent"), Y("NamedApply", exprs[adjustArgsCount + 1], Q(Y("state", "value")), Y("AsStruct"), Y("DependsOn", "parent")));
  931. Lambdas[2] = BuildLambda(Pos, Y("one", "two"), Y("Apply", exprs[adjustArgsCount + 2], "one", "two"));
  932. for (size_t i = 3U; i < Lambdas.size(); ++i) {
  933. const auto j = adjustArgsCount + i;
  934. Lambdas[i] = BuildLambda(Pos, Y("state"), j >= exprs.size() ? AstNode("state") : Y("Apply", exprs[j], "state"));
  935. }
  936. DefVal = (exprs.size() == (7 + adjustArgsCount)) ? exprs[adjustArgsCount + 6] : Y("Null");
  937. return TAggregationFactory::InitAggr(ctx, isFactory, src, node, isFactory ? TVector<TNodePtr>() : TVector<TNodePtr>(1, exprs.front()));
  938. }
  939. TNodePtr DoClone() const final {
  940. return new TUserDefinedAggregationFactory(Pos, Name, Func, AggMode);
  941. }
  942. TNodePtr GetApply(const TNodePtr& type) const final {
  943. auto apply = Y("Apply", Factory, type, BuildLambda(Pos, Y("row"), Expr));
  944. AddFactoryArguments(apply);
  945. return apply;
  946. }
  947. void AddFactoryArguments(TNodePtr& apply) const final {
  948. apply = L(apply, Lambdas[0], Lambdas[1], Lambdas[2], Lambdas[3], Lambdas[4], Lambdas[5], DefVal);
  949. }
  950. bool DoInit(TContext& ctx, ISource* src) final {
  951. for (const auto& lambda : Lambdas) {
  952. if (!lambda->Init(ctx, src)) {
  953. return false;
  954. }
  955. }
  956. if (!DefVal->Init(ctx, src)) {
  957. return false;
  958. }
  959. return TAggregationFactory::DoInit(ctx, src);
  960. }
  961. std::array<TNodePtr, 6> Lambdas;
  962. TNodePtr DefVal;
  963. };
  // Creates a TUserDefinedAggregationFactory from the given position, name,
  // factory function name and aggregate mode.
  964. TAggregationPtr BuildUserDefinedFactoryAggregation(TPosition pos, const TString& name, const TString& factory, EAggregateMode aggMode) {
  965. return new TUserDefinedAggregationFactory(pos, name, factory, aggMode);
  966. }
  967. class TCountAggregation final : public TAggregationFactory {
  968. public:
  969. TCountAggregation(TPosition pos, const TString& name, const TString& func, EAggregateMode aggMode)
  970. : TAggregationFactory(pos, name, func, aggMode)
  971. {}
  972. private:
  973. TNodePtr DoClone() const final {
  974. return new TCountAggregation(Pos, Name, Func, AggMode);
  975. }
  976. bool DoInit(TContext& ctx, ISource* src) final {
  977. if (!Expr) {
  978. return true;
  979. }
  980. if (Expr->IsAsterisk()) {
  981. Expr = Y("Void");
  982. }
  983. ctx.PushBlockShortcuts();
  984. if (!Expr->Init(ctx, src)) {
  985. return false;
  986. }
  987. Expr->SetCountHint(Expr->IsConstant());
  988. Expr = ctx.GroundBlockShortcutsForExpr(Expr);
  989. return TAggregationFactory::DoInit(ctx, src);
  990. }
  991. };
  // Creates a TCountAggregation from the given position, name, function name
  // and aggregate mode.
  992. TAggregationPtr BuildCountAggregation(TPosition pos, const TString& name, const TString& func, EAggregateMode aggMode) {
  993. return new TCountAggregation(pos, name, func, aggMode);
  994. }
  995. } // namespace NSQLTranslationV0