aggregation.cpp 49 KB


  1. #include "node.h"
  2. #include "source.h"
  3. #include "context.h"
  4. #include <yql/essentials/ast/yql_type_string.h>
  5. #include <library/cpp/charset/ci_string.h>
  6. #include <util/string/builder.h>
  7. #include <util/string/cast.h>
  8. #include <array>
  9. using namespace NYql;
  10. namespace NSQLTranslationV1 {
  11. namespace {
  12. bool BlockWindowAggregationWithoutFrameSpec(TPosition pos, TStringBuf name, ISource* src, TContext& ctx) {
  13. if (src) {
  14. auto winNamePtr = src->GetWindowName();
  15. if (winNamePtr) {
  16. auto winSpecPtr = src->FindWindowSpecification(ctx, *winNamePtr);
  17. if (!winSpecPtr) {
  18. ctx.Error(pos) << "Failed to use aggregation function " << name << " without window specification or in wrong place";
  19. return true;
  20. }
  21. }
  22. }
  23. return false;
  24. }
  25. bool ShouldEmitAggApply(const TContext& ctx) {
  26. const bool blockEngineEnabled = ctx.BlockEngineEnable || ctx.BlockEngineForce;
  27. return ctx.EmitAggApply.GetOrElse(blockEngineEnabled);
  28. }
  29. }
  30. static const THashSet<TString> AggApplyFuncs = {
  31. "count_traits_factory",
  32. "sum_traits_factory",
  33. "avg_traits_factory",
  34. "min_traits_factory",
  35. "max_traits_factory",
  36. "some_traits_factory",
  37. };
  38. class TAggregationFactory : public IAggregation {
  39. public:
  40. TAggregationFactory(TPosition pos, const TString& name, const TString& func, EAggregateMode aggMode,
  41. bool multi = false, bool validateArgs = true)
  42. : IAggregation(pos, name, func, aggMode), Factory(!func.empty() ?
  43. BuildBind(Pos, aggMode == EAggregateMode::OverWindow || aggMode == EAggregateMode::OverWindowDistinct ? "window_module" : "aggregate_module", func) : nullptr),
  44. Multi(multi), ValidateArgs(validateArgs), DynamicFactory(!Factory)
  45. {
  46. if (aggMode != EAggregateMode::OverWindow && !func.empty() && AggApplyFuncs.contains(func)) {
  47. AggApplyName = func.substr(0, func.size() - 15);
  48. }
  49. if (!Factory) {
  50. FakeSource = BuildFakeSource(pos);
  51. }
  52. }
  53. protected:
  54. bool InitAggr(TContext& ctx, bool isFactory, ISource* src, TAstListNode& node, const TVector<TNodePtr>& exprs) override {
  55. if (!ShouldEmitAggApply(ctx)) {
  56. AggApplyName = "";
  57. }
  58. if (ValidateArgs || isFactory) {
  59. ui32 expectedArgs = ValidateArgs && !Factory ? 2 : (isFactory ? 0 : 1);
  60. if (!Factory && ValidateArgs) {
  61. YQL_ENSURE(!isFactory);
  62. }
  63. if (expectedArgs != exprs.size()) {
  64. ctx.Error(Pos) << "Aggregation function " << (isFactory ? "factory " : "") << Name
  65. << " requires exactly " << expectedArgs << " argument(s), given: " << exprs.size();
  66. return false;
  67. }
  68. }
  69. if (!ValidateArgs) {
  70. Exprs = exprs;
  71. }
  72. if (BlockWindowAggregationWithoutFrameSpec(Pos, GetName(), src, ctx)) {
  73. return false;
  74. }
  75. if (ValidateArgs) {
  76. if (!Factory) {
  77. Factory = exprs[1];
  78. }
  79. }
  80. if (!isFactory) {
  81. if (ValidateArgs) {
  82. Expr = exprs.front();
  83. }
  84. Name = src->MakeLocalName(Name);
  85. }
  86. if (Expr && Expr->IsAsterisk() && AggApplyName == "count") {
  87. AggApplyName = "count_all";
  88. }
  89. if (!Init(ctx, src)) {
  90. return false;
  91. }
  92. if (!isFactory) {
  93. node.Add("Member", "row", Q(Name));
  94. if (IsOverWindow()) {
  95. src->AddTmpWindowColumn(Name);
  96. }
  97. }
  98. return true;
  99. }
  100. TNodePtr AggregationTraitsFactory() const override {
  101. return Factory;
  102. }
  103. TNodePtr GetExtractor(bool many, TContext& ctx) const override {
  104. Y_UNUSED(ctx);
  105. return BuildLambda(Pos, Y("row"), Y("PersistableRepr", many ? Y("Unwrap", Expr) : Expr));
  106. }
  107. TNodePtr GetApply(const TNodePtr& type, bool many, bool allowAggApply, TContext& ctx) const override {
  108. auto extractor = GetExtractor(many, ctx);
  109. if (!extractor) {
  110. return nullptr;
  111. }
  112. if (!Multi) {
  113. if (!DynamicFactory && allowAggApply && !AggApplyName.empty()) {
  114. return Y("AggApply", Q(AggApplyName), Y("ListItemType", type), extractor);
  115. }
  116. return Y("Apply", Factory, (DynamicFactory ? Y("ListItemType", type) : type),
  117. extractor);
  118. }
  119. return Y("MultiAggregate",
  120. Y("ListItemType", type),
  121. extractor,
  122. Factory);
  123. }
  124. bool DoInit(TContext& ctx, ISource* src) override {
  125. if (!ValidateArgs) {
  126. for (auto x : Exprs) {
  127. if (!x->Init(ctx, src)) {
  128. return false;
  129. }
  130. if (x->IsAggregated() && !x->IsAggregationKey() && !IsOverWindow()) {
  131. ctx.Error(Pos) << "Aggregation of aggregated values is forbidden";
  132. return false;
  133. }
  134. }
  135. return true;
  136. }
  137. if (!Expr) {
  138. return true;
  139. }
  140. if (!Expr->Init(ctx, src)) {
  141. return false;
  142. }
  143. if (Expr->IsAggregated() && !Expr->IsAggregationKey() && !IsOverWindow()) {
  144. ctx.Error(Pos) << "Aggregation of aggregated values is forbidden";
  145. return false;
  146. }
  147. if (AggMode == EAggregateMode::Distinct || AggMode == EAggregateMode::OverWindowDistinct) {
  148. const auto column = Expr->GetColumnName();
  149. if (!column) {
  150. // TODO: improve TBasicAggrFunc::CollectPreaggregateExprs()
  151. ctx.Error(Pos) << "Aggregation of aggregated values is forbidden";
  152. return false;
  153. }
  154. DistinctKey = *column;
  155. YQL_ENSURE(src);
  156. if (!IsGeneratedKeyColumn && src->GetJoin()) {
  157. const auto sourcePtr = Expr->GetSourceName();
  158. if (!sourcePtr || !*sourcePtr) {
  159. if (!src->IsGroupByColumn(DistinctKey)) {
  160. ctx.Error(Expr->GetPos()) << ErrorDistinctWithoutCorrelation(DistinctKey);
  161. return false;
  162. }
  163. } else {
  164. DistinctKey = DotJoin(*sourcePtr, DistinctKey);
  165. }
  166. }
  167. if (src->IsGroupByColumn(DistinctKey)) {
  168. ctx.Error(Expr->GetPos()) << ErrorDistinctByGroupKey(DistinctKey);
  169. return false;
  170. }
  171. Expr = AstNode("row");
  172. }
  173. if (FakeSource) {
  174. if (!Factory->Init(ctx, FakeSource.Get())) {
  175. return false;
  176. }
  177. if (AggMode == EAggregateMode::OverWindow) {
  178. Factory = BuildLambda(Pos, Y("type", "extractor"), Y("block", Q(Y(
  179. Y("let", "x", Y("Apply", Factory, "type", "extractor")),
  180. Y("return", Y("ToWindowTraits", "x"))
  181. ))));
  182. }
  183. }
  184. return true;
  185. }
  186. TNodePtr Factory;
  187. TNodePtr Expr;
  188. bool Multi;
  189. bool ValidateArgs;
  190. TString AggApplyName;
  191. TVector<TNodePtr> Exprs;
  192. private:
  193. TSourcePtr FakeSource;
  194. bool DynamicFactory;
  195. };
  196. class TAggregationFactoryImpl final : public TAggregationFactory {
  197. public:
  198. TAggregationFactoryImpl(TPosition pos, const TString& name, const TString& func, EAggregateMode aggMode, bool multi)
  199. : TAggregationFactory(pos, name, func, aggMode, multi)
  200. {}
  201. private:
  202. TNodePtr DoClone() const final {
  203. return new TAggregationFactoryImpl(Pos, Name, Func, AggMode, Multi);
  204. }
  205. };
  206. TAggregationPtr BuildFactoryAggregation(TPosition pos, const TString& name, const TString& func, EAggregateMode aggMode, bool multi) {
  207. return new TAggregationFactoryImpl(pos, name, func, aggMode, multi);
  208. }
  209. class TKeyPayloadAggregationFactory final : public TAggregationFactory {
  210. public:
  211. TKeyPayloadAggregationFactory(TPosition pos, const TString& name, const TString& factory, EAggregateMode aggMode)
  212. : TAggregationFactory(pos, name, factory, aggMode)
  213. , FakeSource(BuildFakeSource(pos))
  214. {}
  215. private:
  216. bool InitAggr(TContext& ctx, bool isFactory, ISource* src, TAstListNode& node, const TVector<TNodePtr>& exprs) final {
  217. ui32 adjustArgsCount = isFactory ? 0 : 2;
  218. if (exprs.size() < adjustArgsCount || exprs.size() > 1 + adjustArgsCount) {
  219. ctx.Error(Pos) << "Aggregation function " << (isFactory ? "factory " : "") << Name << " requires "
  220. << adjustArgsCount << " or " << (1 + adjustArgsCount) << " arguments, given: " << exprs.size();
  221. return false;
  222. }
  223. if (BlockWindowAggregationWithoutFrameSpec(Pos, GetName(), src, ctx)) {
  224. return false;
  225. }
  226. if (!isFactory) {
  227. Payload = exprs.front();
  228. Key = exprs[1];
  229. }
  230. if (1 + adjustArgsCount == exprs.size()) {
  231. Limit = exprs.back();
  232. Func += "2";
  233. } else {
  234. Func += "1";
  235. }
  236. if (Factory) {
  237. Factory = BuildBind(Pos, AggMode == EAggregateMode::OverWindow ? "window_module" : "aggregate_module", Func);
  238. }
  239. if (!isFactory) {
  240. Name = src->MakeLocalName(Name);
  241. }
  242. if (!Init(ctx, src)) {
  243. return false;
  244. }
  245. if (!isFactory) {
  246. node.Add("Member", "row", Q(Name));
  247. if (IsOverWindow()) {
  248. src->AddTmpWindowColumn(Name);
  249. }
  250. }
  251. return true;
  252. }
  253. TNodePtr DoClone() const final {
  254. return new TKeyPayloadAggregationFactory(Pos, Name, Func, AggMode);
  255. }
  256. TNodePtr GetExtractor(bool many, TContext& ctx) const final {
  257. Y_UNUSED(ctx);
  258. return BuildLambda(Pos, Y("row"), many ? Y("Unwrap", Payload) : Payload);
  259. }
  260. TNodePtr GetApply(const TNodePtr& type, bool many, bool allowAggApply, TContext& ctx) const final {
  261. Y_UNUSED(ctx);
  262. Y_UNUSED(allowAggApply);
  263. auto apply = Y("Apply", Factory, type,
  264. BuildLambda(Pos, Y("row"), many ? Y("Unwrap", Key) : Key),
  265. BuildLambda(Pos, Y("row"), many ? Y("Unwrap", Payload) : Payload));
  266. AddFactoryArguments(apply);
  267. return apply;
  268. }
  269. void AddFactoryArguments(TNodePtr& apply) const final {
  270. if (Limit) {
  271. apply = L(apply, Limit);
  272. }
  273. }
  274. std::vector<ui32> GetFactoryColumnIndices() const final {
  275. return {1u, 0u};
  276. }
  277. bool DoInit(TContext& ctx, ISource* src) final {
  278. if (Limit) {
  279. if (!Limit->Init(ctx, FakeSource.Get())) {
  280. return false;
  281. }
  282. }
  283. if (!Key) {
  284. return true;
  285. }
  286. if (!Key->Init(ctx, src)) {
  287. return false;
  288. }
  289. if (!Payload->Init(ctx, src)) {
  290. return false;
  291. }
  292. if (Key->IsAggregated()) {
  293. ctx.Error(Pos) << "Aggregation of aggregated values is forbidden";
  294. return false;
  295. }
  296. return true;
  297. }
  298. TSourcePtr FakeSource;
  299. TNodePtr Key, Payload, Limit;
  300. };
  301. TAggregationPtr BuildKeyPayloadFactoryAggregation(TPosition pos, const TString& name, const TString& factory, EAggregateMode aggMode) {
  302. return new TKeyPayloadAggregationFactory(pos, name, factory, aggMode);
  303. }
  304. class TPayloadPredicateAggregationFactory final : public TAggregationFactory {
  305. public:
  306. TPayloadPredicateAggregationFactory(TPosition pos, const TString& name, const TString& factory, EAggregateMode aggMode)
  307. : TAggregationFactory(pos, name, factory, aggMode)
  308. {}
  309. private:
  310. bool InitAggr(TContext& ctx, bool isFactory, ISource* src, TAstListNode& node, const TVector<TNodePtr>& exprs) final {
  311. ui32 adjustArgsCount = isFactory ? 0 : 2;
  312. if (exprs.size() != adjustArgsCount) {
  313. ctx.Error(Pos) << "Aggregation function " << (isFactory ? "factory " : "") << Name << " requires " <<
  314. adjustArgsCount << " arguments, given: " << exprs.size();
  315. return false;
  316. }
  317. if (BlockWindowAggregationWithoutFrameSpec(Pos, GetName(), src, ctx)) {
  318. return false;
  319. }
  320. if (!isFactory) {
  321. Payload = exprs.front();
  322. Predicate = exprs.back();
  323. Name = src->MakeLocalName(Name);
  324. }
  325. if (!Init(ctx, src)) {
  326. return false;
  327. }
  328. if (!isFactory) {
  329. node.Add("Member", "row", Q(Name));
  330. if (IsOverWindow()) {
  331. src->AddTmpWindowColumn(Name);
  332. }
  333. }
  334. return true;
  335. }
  336. TNodePtr DoClone() const final {
  337. return new TPayloadPredicateAggregationFactory(Pos, Name, Func, AggMode);
  338. }
  339. TNodePtr GetExtractor(bool many, TContext& ctx) const final {
  340. Y_UNUSED(ctx);
  341. return BuildLambda(Pos, Y("row"), many ? Y("Unwrap", Payload) : Payload);
  342. }
  343. TNodePtr GetApply(const TNodePtr& type, bool many, bool allowAggApply, TContext& ctx) const final {
  344. Y_UNUSED(ctx);
  345. Y_UNUSED(allowAggApply);
  346. return Y("Apply", Factory, type,
  347. BuildLambda(Pos, Y("row"), many ? Y("Unwrap", Payload) : Payload),
  348. BuildLambda(Pos, Y("row"), many ? Y("Unwrap", Predicate) : Predicate));
  349. }
  350. std::vector<ui32> GetFactoryColumnIndices() const final {
  351. return {0u, 1u};
  352. }
  353. bool DoInit(TContext& ctx, ISource* src) final {
  354. if (!Predicate) {
  355. return true;
  356. }
  357. if (!Predicate->Init(ctx, src)) {
  358. return false;
  359. }
  360. if (!Payload->Init(ctx, src)) {
  361. return false;
  362. }
  363. if (Payload->IsAggregated()) {
  364. ctx.Error(Pos) << "Aggregation of aggregated values is forbidden";
  365. return false;
  366. }
  367. return true;
  368. }
  369. TNodePtr Payload, Predicate;
  370. };
  371. TAggregationPtr BuildPayloadPredicateFactoryAggregation(TPosition pos, const TString& name, const TString& factory, EAggregateMode aggMode) {
  372. return new TPayloadPredicateAggregationFactory(pos, name, factory, aggMode);
  373. }
  374. class TTwoArgsAggregationFactory final : public TAggregationFactory {
  375. public:
  376. TTwoArgsAggregationFactory(TPosition pos, const TString& name, const TString& factory, EAggregateMode aggMode)
  377. : TAggregationFactory(pos, name, factory, aggMode)
  378. {}
  379. private:
  380. bool InitAggr(TContext& ctx, bool isFactory, ISource* src, TAstListNode& node, const TVector<TNodePtr>& exprs) final {
  381. ui32 adjustArgsCount = isFactory ? 0 : 2;
  382. if (exprs.size() != adjustArgsCount) {
  383. ctx.Error(Pos) << "Aggregation function " << (isFactory ? "factory " : "") << Name << " requires " <<
  384. adjustArgsCount << " arguments, given: " << exprs.size();
  385. return false;
  386. }
  387. if (BlockWindowAggregationWithoutFrameSpec(Pos, GetName(), src, ctx)) {
  388. return false;
  389. }
  390. if (!isFactory) {
  391. One = exprs.front();
  392. Two = exprs.back();
  393. Name = src->MakeLocalName(Name);
  394. }
  395. if (!Init(ctx, src)) {
  396. return false;
  397. }
  398. if (!isFactory) {
  399. node.Add("Member", "row", Q(Name));
  400. if (IsOverWindow()) {
  401. src->AddTmpWindowColumn(Name);
  402. }
  403. }
  404. return true;
  405. }
  406. TNodePtr DoClone() const final {
  407. return new TTwoArgsAggregationFactory(Pos, Name, Func, AggMode);
  408. }
  409. TNodePtr GetExtractor(bool many, TContext& ctx) const final {
  410. Y_UNUSED(ctx);
  411. return BuildLambda(Pos, Y("row"), many ? Y("Unwrap", One) : One);
  412. }
  413. TNodePtr GetApply(const TNodePtr& type, bool many, bool allowAggApply, TContext& ctx) const final {
  414. Y_UNUSED(ctx);
  415. Y_UNUSED(allowAggApply);
  416. auto tuple = Q(Y(One, Two));
  417. return Y("Apply", Factory, type, BuildLambda(Pos, Y("row"), many ? Y("Unwrap", tuple) : tuple));
  418. }
  419. bool DoInit(TContext& ctx, ISource* src) final {
  420. if (!One) {
  421. return true;
  422. }
  423. if (!One->Init(ctx, src)) {
  424. return false;
  425. }
  426. if (!Two->Init(ctx, src)) {
  427. return false;
  428. }
  429. if ((One->IsAggregated() || Two->IsAggregated()) && !IsOverWindow()) {
  430. ctx.Error(Pos) << "Aggregation of aggregated values is forbidden";
  431. return false;
  432. }
  433. return true;
  434. }
  435. TNodePtr One, Two;
  436. };
  437. TAggregationPtr BuildTwoArgsFactoryAggregation(TPosition pos, const TString& name, const TString& factory, EAggregateMode aggMode) {
  438. return new TTwoArgsAggregationFactory(pos, name, factory, aggMode);
  439. }
  440. class THistogramAggregationFactory final : public TAggregationFactory {
  441. public:
  442. THistogramAggregationFactory(TPosition pos, const TString& name, const TString& factory, EAggregateMode aggMode)
  443. : TAggregationFactory(pos, name, factory, aggMode)
  444. , FakeSource(BuildFakeSource(pos))
  445. , Weight(Y("Double", Q("1.0")))
  446. , Intervals(Y("Uint32", Q("100")))
  447. {}
  448. private:
  449. bool InitAggr(TContext& ctx, bool isFactory, ISource* src, TAstListNode& node, const TVector<TNodePtr>& exprs) final {
  450. if (isFactory) {
  451. if (exprs.size() > 1) {
  452. ctx.Error(Pos) << "Aggregation function factory " << Name << " requires 0 or 1 argument(s), given: " << exprs.size();
  453. return false;
  454. }
  455. } else {
  456. if (exprs.empty() || exprs.size() > 3) {
  457. ctx.Error(Pos) << "Aggregation function " << Name << " requires one, two or three arguments, given: " << exprs.size();
  458. return false;
  459. }
  460. }
  461. if (!isFactory) {
  462. /// \todo: solve it with named arguments
  463. const auto integer = exprs.back()->IsIntegerLiteral();
  464. switch (exprs.size()) {
  465. case 2U:
  466. if (!integer) {
  467. Weight = exprs.back();
  468. }
  469. break;
  470. case 3U:
  471. if (!integer) {
  472. ctx.Error(Pos) << "Aggregation function " << Name << " for case with 3 arguments should have third argument of integer type";
  473. return false;
  474. }
  475. Weight = exprs[1];
  476. break;
  477. }
  478. if (exprs.size() >= 2 && integer) {
  479. Intervals = Y("Cast", exprs.back(), Q("Uint32"));
  480. }
  481. } else {
  482. if (exprs.size() >= 1) {
  483. const auto integer = exprs.back()->IsIntegerLiteral();
  484. if (!integer) {
  485. ctx.Error(Pos) << "Aggregation function factory " << Name << " should have second interger argument";
  486. return false;
  487. }
  488. Intervals = Y("Cast", exprs.back(), Q("Uint32"));
  489. }
  490. }
  491. return TAggregationFactory::InitAggr(ctx, isFactory, src, node, isFactory ? TVector<TNodePtr>() : TVector<TNodePtr>(1, exprs.front()));
  492. }
  493. TNodePtr DoClone() const final {
  494. return new THistogramAggregationFactory(Pos, Name, Func, AggMode);
  495. }
  496. TNodePtr GetApply(const TNodePtr& type, bool many, bool allowAggApply, TContext& ctx) const final {
  497. Y_UNUSED(ctx);
  498. Y_UNUSED(allowAggApply);
  499. auto apply = Y("Apply", Factory, type,
  500. BuildLambda(Pos, Y("row"), many ? Y("Unwrap", Expr) : Expr),
  501. BuildLambda(Pos, Y("row"), many ? Y("Unwrap", Weight) : Weight));
  502. AddFactoryArguments(apply);
  503. return apply;
  504. }
  505. void AddFactoryArguments(TNodePtr& apply) const final {
  506. apply = L(apply, Intervals);
  507. }
  508. std::vector<ui32> GetFactoryColumnIndices() const final {
  509. return {0u, 1u};
  510. }
  511. bool DoInit(TContext& ctx, ISource* src) final {
  512. if (!Weight->Init(ctx, src)) {
  513. return false;
  514. }
  515. if (!Intervals->Init(ctx, FakeSource.Get())) {
  516. return false;
  517. }
  518. return TAggregationFactory::DoInit(ctx, src);
  519. }
  520. TSourcePtr FakeSource;
  521. TNodePtr Weight, Intervals;
  522. };
  523. TAggregationPtr BuildHistogramFactoryAggregation(TPosition pos, const TString& name, const TString& factory, EAggregateMode aggMode) {
  524. return new THistogramAggregationFactory(pos, name, factory, aggMode);
  525. }
  526. class TLinearHistogramAggregationFactory final : public TAggregationFactory {
  527. public:
  528. TLinearHistogramAggregationFactory(TPosition pos, const TString& name, const TString& factory, EAggregateMode aggMode)
  529. : TAggregationFactory(pos, name, factory, aggMode)
  530. , FakeSource(BuildFakeSource(pos))
  531. , BinSize(Y("Double", Q("10.0")))
  532. , Minimum(Y("Double", Q(ToString(-1.0 * Max<double>()))))
  533. , Maximum(Y("Double", Q(ToString(Max<double>()))))
  534. {}
  535. private:
  536. bool InitAggr(TContext& ctx, bool isFactory, ISource* src, TAstListNode& node, const TVector<TNodePtr>& exprs) final {
  537. if (isFactory) {
  538. if (exprs.size() > 3) {
  539. ctx.Error(Pos) << "Aggregation function " << Name << " requires zero to three arguments, given: " << exprs.size();
  540. return false;
  541. }
  542. } else {
  543. if (exprs.empty() || exprs.size() > 4) {
  544. ctx.Error(Pos) << "Aggregation function " << Name << " requires one to four arguments, given: " << exprs.size();
  545. return false;
  546. }
  547. }
  548. if (exprs.size() > 1 - isFactory) {
  549. BinSize = exprs[1 - isFactory];
  550. }
  551. if (exprs.size() > 2 - isFactory) {
  552. Minimum = exprs[2 - isFactory];
  553. }
  554. if (exprs.size() > 3 - isFactory) {
  555. Maximum = exprs[3 - isFactory];
  556. }
  557. return TAggregationFactory::InitAggr(ctx, isFactory, src, node, isFactory ? TVector<TNodePtr>() : TVector<TNodePtr>(1, exprs.front()));
  558. }
  559. TNodePtr DoClone() const final {
  560. return new TLinearHistogramAggregationFactory(Pos, Name, Func, AggMode);
  561. }
  562. TNodePtr GetApply(const TNodePtr& type, bool many, bool allowAggApply, TContext& ctx) const final {
  563. Y_UNUSED(ctx);
  564. Y_UNUSED(allowAggApply);
  565. return Y("Apply", Factory, type,
  566. BuildLambda(Pos, Y("row"), many ? Y("Unwrap", Expr) : Expr),
  567. BinSize, Minimum, Maximum);
  568. }
  569. void AddFactoryArguments(TNodePtr& apply) const final {
  570. apply = L(apply, BinSize, Minimum, Maximum);
  571. }
  572. bool DoInit(TContext& ctx, ISource* src) final {
  573. if (!BinSize->Init(ctx, FakeSource.Get())) {
  574. return false;
  575. }
  576. if (!Minimum->Init(ctx, FakeSource.Get())) {
  577. return false;
  578. }
  579. if (!Maximum->Init(ctx, FakeSource.Get())) {
  580. return false;
  581. }
  582. return TAggregationFactory::DoInit(ctx, src);
  583. }
  584. TSourcePtr FakeSource;
  585. TNodePtr BinSize, Minimum, Maximum;
  586. };
  587. TAggregationPtr BuildLinearHistogramFactoryAggregation(TPosition pos, const TString& name, const TString& factory, EAggregateMode aggMode) {
  588. return new TLinearHistogramAggregationFactory(pos, name, factory, aggMode);
  589. }
  590. class TPercentileFactory final : public TAggregationFactory {
  591. public:
  592. TPercentileFactory(TPosition pos, const TString& name, const TString& factory, EAggregateMode aggMode)
  593. : TAggregationFactory(pos, name, factory, aggMode)
  594. , FakeSource(BuildFakeSource(pos))
  595. {}
  596. private:
  597. const TString* GetGenericKey() const final {
  598. return Column;
  599. }
  600. void Join(IAggregation* aggr) final {
  601. const auto percentile = dynamic_cast<TPercentileFactory*>(aggr);
  602. YQL_ENSURE(percentile);
  603. YQL_ENSURE(Column && percentile->Column && *Column == *percentile->Column);
  604. YQL_ENSURE(AggMode == percentile->AggMode);
  605. Percentiles.insert(percentile->Percentiles.cbegin(), percentile->Percentiles.cend());
  606. percentile->Percentiles.clear();
  607. }
  608. bool InitAggr(TContext& ctx, bool isFactory, ISource* src, TAstListNode& node, const TVector<TNodePtr>& exprs) final {
  609. ui32 adjustArgsCount = isFactory ? 0 : 1;
  610. if (exprs.size() < 0 + adjustArgsCount || exprs.size() > 1 + adjustArgsCount) {
  611. ctx.Error(Pos) << "Aggregation function " << (isFactory ? "factory " : "") << Name << " requires "
  612. << (0 + adjustArgsCount) << " or " << (1 + adjustArgsCount) << " arguments, given: " << exprs.size();
  613. return false;
  614. }
  615. if (!isFactory) {
  616. Column = exprs.front()->GetColumnName();
  617. }
  618. if (!TAggregationFactory::InitAggr(ctx, isFactory, src, node, isFactory ? TVector<TNodePtr>() : TVector<TNodePtr>(1, exprs.front())))
  619. return false;
  620. TNodePtr x;
  621. if (1 + adjustArgsCount == exprs.size()) {
  622. x = exprs.back();
  623. if (!x->Init(ctx, FakeSource.Get())) {
  624. return false;
  625. }
  626. } else {
  627. x = Y("Double", Q("0.5"));
  628. }
  629. if (isFactory) {
  630. FactoryPercentile = x;
  631. } else {
  632. Percentiles.emplace(Name, x);
  633. }
  634. return true;
  635. }
  636. TNodePtr DoClone() const final {
  637. return new TPercentileFactory(Pos, Name, Func, AggMode);
  638. }
  639. TNodePtr GetApply(const TNodePtr& type, bool many, bool allowAggApply, TContext& ctx) const final {
  640. Y_UNUSED(ctx);
  641. Y_UNUSED(allowAggApply);
  642. TNodePtr percentiles(Percentiles.cbegin()->second);
  643. if (Percentiles.size() > 1U) {
  644. percentiles = Y();
  645. for (const auto& percentile : Percentiles) {
  646. percentiles = L(percentiles, percentile.second);
  647. }
  648. percentiles = Q(percentiles);
  649. }
  650. return Y("Apply", Factory, type, BuildLambda(Pos, Y("row"), many ? Y("Unwrap", Expr) : Expr), percentiles);
  651. }
  652. void AddFactoryArguments(TNodePtr& apply) const final {
  653. apply = L(apply, FactoryPercentile);
  654. }
  655. std::pair<TNodePtr, bool> AggregationTraits(const TNodePtr& type, bool overState, bool many, bool allowAggApply, TContext& ctx) const final {
  656. if (Percentiles.empty())
  657. return { TNodePtr(), true };
  658. TNodePtr names(Q(Percentiles.cbegin()->first));
  659. if (Percentiles.size() > 1U) {
  660. names = Y();
  661. for (const auto& percentile : Percentiles)
  662. names = L(names, Q(percentile.first));
  663. names = Q(names);
  664. }
  665. const bool distinct = AggMode == EAggregateMode::Distinct;
  666. const auto listType = distinct ? Y("ListType", Y("StructMemberType", Y("ListItemType", type), BuildQuotedAtom(Pos, DistinctKey))) : type;
  667. auto apply = GetApply(listType, many, allowAggApply, ctx);
  668. if (!apply) {
  669. return { TNodePtr(), false };
  670. }
  671. auto wrapped = WrapIfOverState(apply, overState, many, ctx);
  672. if (!wrapped) {
  673. return { TNodePtr(), false };
  674. }
  675. return { distinct ?
  676. Q(Y(names, wrapped, BuildQuotedAtom(Pos, DistinctKey))) :
  677. Q(Y(names, wrapped)), true };
  678. }
  679. bool DoInit(TContext& ctx, ISource* src) final {
  680. for (const auto& p : Percentiles) {
  681. if (!p.second->Init(ctx, src)) {
  682. return false;
  683. }
  684. }
  685. return TAggregationFactory::DoInit(ctx, src);
  686. }
  687. TSourcePtr FakeSource;
  688. std::multimap<TString, TNodePtr> Percentiles;
  689. TNodePtr FactoryPercentile;
  690. const TString* Column = nullptr;
  691. };
  692. TAggregationPtr BuildPercentileFactoryAggregation(TPosition pos, const TString& name, const TString& factory, EAggregateMode aggMode) {
  693. return new TPercentileFactory(pos, name, factory, aggMode);
  694. }
  695. class TTopFreqFactory final : public TAggregationFactory {
  696. public:
  697. TTopFreqFactory(TPosition pos, const TString& name, const TString& factory, EAggregateMode aggMode)
  698. : TAggregationFactory(pos, name, factory, aggMode)
  699. , FakeSource(BuildFakeSource(pos))
  700. {}
  701. private:
  702. //first - n, second - buffer
  703. using TPair = std::pair<TNodePtr, TNodePtr>;
  704. bool InitAggr(TContext& ctx, bool isFactory, ISource* src, TAstListNode& node, const TVector<TNodePtr>& exprs) final {
  705. ui32 adjustArgsCount = isFactory ? 0 : 1;
  706. const double DefaultBufferC = 1.5;
  707. const ui32 MinBuffer = 100;
  708. if (exprs.size() < adjustArgsCount || exprs.size() > 2 + adjustArgsCount) {
  709. ctx.Error(Pos) << "Aggregation function " << (isFactory? "factory " : "") << Name <<
  710. " requires " << adjustArgsCount << " to " << (2 + adjustArgsCount) << " arguments, given: " << exprs.size();
  711. return false;
  712. }
  713. if (!TAggregationFactory::InitAggr(ctx, isFactory, src, node, isFactory ? TVector<TNodePtr>() : TVector<TNodePtr>(1, exprs.front())))
  714. return false;
  715. TNodePtr n = Y("Null");
  716. TNodePtr buffer = Y("Null");
  717. if (1 + adjustArgsCount <= exprs.size()) {
  718. n = exprs[adjustArgsCount];
  719. if (!n->Init(ctx, FakeSource.Get())) {
  720. return false;
  721. }
  722. n = Y("SafeCast", n, Q("Uint32"));
  723. }
  724. n = Y("Coalesce", n, Y("Uint32", Q("1")));
  725. if (2 + adjustArgsCount == exprs.size()) {
  726. buffer = exprs[1 + adjustArgsCount];
  727. if (!buffer->Init(ctx, FakeSource.Get())) {
  728. return false;
  729. }
  730. buffer = Y("SafeCast", buffer, Q("Uint32"));
  731. }
  732. buffer = Y("Coalesce", buffer, Y("SafeCast", Y("*", n, Y("Double", Q(ToString(DefaultBufferC)))), Q("Uint32")));
  733. buffer = Y("Coalesce", buffer, Y("Uint32", Q(ToString(MinBuffer))));
  734. buffer = Y("Max", buffer, Y("Uint32", Q(ToString(MinBuffer))));
  735. auto x = TPair{ n, buffer };
  736. if (isFactory) {
  737. TopFreqFactoryParams = x;
  738. } else {
  739. TopFreqs.emplace(Name, x);
  740. }
  741. return true;
  742. }
  743. TNodePtr DoClone() const final {
  744. return new TTopFreqFactory(Pos, Name, Func, AggMode);
  745. }
  746. TNodePtr GetApply(const TNodePtr& type, bool many, bool allowAggApply, TContext& ctx) const final {
  747. Y_UNUSED(ctx);
  748. Y_UNUSED(allowAggApply);
  749. TPair topFreqs(TopFreqs.cbegin()->second);
  750. if (TopFreqs.size() > 1U) {
  751. topFreqs = { Y(), Y() };
  752. for (const auto& topFreq : TopFreqs) {
  753. topFreqs = { L(topFreqs.first, topFreq.second.first), L(topFreqs.second, topFreq.second.second) };
  754. }
  755. topFreqs = { Q(topFreqs.first), Q(topFreqs.second) };
  756. }
  757. auto apply = Y("Apply", Factory, type, BuildLambda(Pos, Y("row"), many ? Y("Unwrap", Expr) : Expr), topFreqs.first, topFreqs.second);
  758. return apply;
  759. }
  760. void AddFactoryArguments(TNodePtr& apply) const final {
  761. apply = L(apply, TopFreqFactoryParams.first, TopFreqFactoryParams.second);
  762. }
  763. std::pair<TNodePtr, bool> AggregationTraits(const TNodePtr& type, bool overState, bool many, bool allowAggApply, TContext& ctx) const final {
  764. if (TopFreqs.empty())
  765. return { TNodePtr(), true };
  766. TNodePtr names(Q(TopFreqs.cbegin()->first));
  767. if (TopFreqs.size() > 1U) {
  768. names = Y();
  769. for (const auto& topFreq : TopFreqs)
  770. names = L(names, Q(topFreq.first));
  771. names = Q(names);
  772. }
  773. const bool distinct = AggMode == EAggregateMode::Distinct;
  774. const auto listType = distinct ? Y("ListType", Y("StructMemberType", Y("ListItemType", type), BuildQuotedAtom(Pos, DistinctKey))) : type;
  775. auto apply = GetApply(listType, many, allowAggApply, ctx);
  776. if (!apply) {
  777. return { nullptr, false };
  778. }
  779. auto wrapped = WrapIfOverState(apply, overState, many, ctx);
  780. if (!wrapped) {
  781. return { nullptr, false };
  782. }
  783. return { distinct ?
  784. Q(Y(names, wrapped, BuildQuotedAtom(Pos, DistinctKey))) :
  785. Q(Y(names, wrapped)), true };
  786. }
  787. bool DoInit(TContext& ctx, ISource* src) final {
  788. for (const auto& topFreq : TopFreqs) {
  789. if (!topFreq.second.first->Init(ctx, src)) {
  790. return false;
  791. }
  792. if (!topFreq.second.second->Init(ctx, src)) {
  793. return false;
  794. }
  795. }
  796. return TAggregationFactory::DoInit(ctx, src);
  797. }
  798. std::multimap<TString, TPair> TopFreqs;
  799. TPair TopFreqFactoryParams;
  800. TSourcePtr FakeSource;
  801. };
  802. TAggregationPtr BuildTopFreqFactoryAggregation(TPosition pos, const TString& name, const TString& factory, EAggregateMode aggMode) {
  803. return new TTopFreqFactory(pos, name, factory, aggMode);
  804. }
  805. template <bool HasKey>
  806. class TTopAggregationFactory final : public TAggregationFactory {
  807. public:
  808. TTopAggregationFactory(TPosition pos, const TString& name, const TString& factory, EAggregateMode aggMode)
  809. : TAggregationFactory(pos, name, factory, aggMode)
  810. , FakeSource(BuildFakeSource(pos))
  811. {}
  812. private:
  813. bool InitAggr(TContext& ctx, bool isFactory, ISource* src, TAstListNode& node, const TVector<TNodePtr>& exprs) final {
  814. ui32 adjustArgsCount = isFactory ? 1 : (HasKey ? 3 : 2);
  815. if (exprs.size() != adjustArgsCount) {
  816. ctx.Error(Pos) << "Aggregation function " << (isFactory ? "factory " : "") << Name << " requires "
  817. << adjustArgsCount << " arguments, given: " << exprs.size();
  818. return false;
  819. }
  820. if (BlockWindowAggregationWithoutFrameSpec(Pos, GetName(), src, ctx)) {
  821. return false;
  822. }
  823. if (!isFactory) {
  824. Payload = exprs[0];
  825. if (HasKey) {
  826. Key = exprs[1];
  827. }
  828. }
  829. Count = exprs.back();
  830. if (!isFactory) {
  831. Name = src->MakeLocalName(Name);
  832. }
  833. if (!Init(ctx, src)) {
  834. return false;
  835. }
  836. if (!isFactory) {
  837. node.Add("Member", "row", Q(Name));
  838. if (IsOverWindow()) {
  839. src->AddTmpWindowColumn(Name);
  840. }
  841. }
  842. return true;
  843. }
  844. TNodePtr DoClone() const final {
  845. return new TTopAggregationFactory(Pos, Name, Func, AggMode);
  846. }
  847. TNodePtr GetApply(const TNodePtr& type, bool many, bool allowAggApply, TContext& ctx) const final {
  848. Y_UNUSED(ctx);
  849. Y_UNUSED(allowAggApply);
  850. TNodePtr apply;
  851. if (HasKey) {
  852. apply = Y("Apply", Factory, type,
  853. BuildLambda(Pos, Y("row"), many ? Y("Unwrap", Key) : Key),
  854. BuildLambda(Pos, Y("row"), many ? Y("Payload", Payload) : Payload));
  855. } else {
  856. apply = Y("Apply", Factory, type, BuildLambda(Pos, Y("row"), many ? Y("Unwrap", Payload) : Payload));
  857. }
  858. AddFactoryArguments(apply);
  859. return apply;
  860. }
  861. void AddFactoryArguments(TNodePtr& apply) const final {
  862. apply = L(apply, Count);
  863. }
  864. std::vector<ui32> GetFactoryColumnIndices() const final {
  865. if (HasKey) {
  866. return {1u, 0u};
  867. } else {
  868. return {0u};
  869. }
  870. }
  871. bool DoInit(TContext& ctx, ISource* src) final {
  872. if (!Count->Init(ctx, FakeSource.Get())) {
  873. return false;
  874. }
  875. if (!Payload) {
  876. return true;
  877. }
  878. if (HasKey) {
  879. if (!Key->Init(ctx, src)) {
  880. return false;
  881. }
  882. }
  883. if (!Payload->Init(ctx, src)) {
  884. return false;
  885. }
  886. if ((HasKey && Key->IsAggregated()) || (!HasKey && Payload->IsAggregated())) {
  887. ctx.Error(Pos) << "Aggregation of aggregated values is forbidden";
  888. return false;
  889. }
  890. return true;
  891. }
  892. TSourcePtr FakeSource;
  893. TNodePtr Key, Payload, Count;
  894. };
  895. template <bool HasKey>
  896. TAggregationPtr BuildTopFactoryAggregation(TPosition pos, const TString& name, const TString& factory, EAggregateMode aggMode) {
  897. return new TTopAggregationFactory<HasKey>(pos, name, factory, aggMode);
  898. }
  899. template TAggregationPtr BuildTopFactoryAggregation<false>(TPosition pos, const TString& name, const TString& factory, EAggregateMode aggMode);
  900. template TAggregationPtr BuildTopFactoryAggregation<true >(TPosition pos, const TString& name, const TString& factory, EAggregateMode aggMode);
  901. class TCountDistinctEstimateAggregationFactory final : public TAggregationFactory {
  902. public:
  903. TCountDistinctEstimateAggregationFactory(TPosition pos, const TString& name, const TString& factory, EAggregateMode aggMode)
  904. : TAggregationFactory(pos, name, factory, aggMode)
  905. {}
  906. private:
  907. bool InitAggr(TContext& ctx, bool isFactory, ISource* src, TAstListNode& node, const TVector<TNodePtr>& exprs) final {
  908. ui32 adjustArgsCount = isFactory ? 0 : 1;
  909. if (exprs.size() < adjustArgsCount || exprs.size() > 1 + adjustArgsCount) {
  910. ctx.Error(Pos) << Name << " aggregation function " << (isFactory ? "factory " : "") << " requires " <<
  911. adjustArgsCount << " or " << (1 + adjustArgsCount) << " argument(s), given: " << exprs.size();
  912. return false;
  913. }
  914. Precision = 14;
  915. if (1 + adjustArgsCount <= exprs.size()) {
  916. auto posSecondArg = exprs[adjustArgsCount]->GetPos();
  917. if (!Parseui32(exprs[adjustArgsCount], Precision)) {
  918. ctx.Error(posSecondArg) << Name << ": invalid argument, numeric literal is expected";
  919. return false;
  920. }
  921. }
  922. if (Precision > 18 || Precision < 4) {
  923. ctx.Error(Pos) << Name << ": precision is expected to be between 4 and 18 (inclusive), got " << Precision;
  924. return false;
  925. }
  926. if (!isFactory) {
  927. Expr = exprs[0];
  928. Name = src->MakeLocalName(Name);
  929. }
  930. if (!Init(ctx, src)) {
  931. return false;
  932. }
  933. if (!isFactory) {
  934. node.Add("Member", "row", Q(Name));
  935. if (IsOverWindow()) {
  936. src->AddTmpWindowColumn(Name);
  937. }
  938. }
  939. return true;
  940. }
  941. TNodePtr DoClone() const final {
  942. return new TCountDistinctEstimateAggregationFactory(Pos, Name, Func, AggMode);
  943. }
  944. TNodePtr GetApply(const TNodePtr& type, bool many, bool allowAggApply, TContext& ctx) const final {
  945. Y_UNUSED(ctx);
  946. Y_UNUSED(allowAggApply);
  947. auto apply = Y("Apply", Factory, type, BuildLambda(Pos, Y("row"), many ? Y("Unwrap", Expr) : Expr));
  948. AddFactoryArguments(apply);
  949. return apply;
  950. }
  951. void AddFactoryArguments(TNodePtr& apply) const final {
  952. apply = L(apply, Y("Uint32", Q(ToString(Precision))));
  953. }
  954. private:
  955. ui32 Precision = 0;
  956. };
  957. TAggregationPtr BuildCountDistinctEstimateFactoryAggregation(TPosition pos, const TString& name, const TString& factory, EAggregateMode aggMode) {
  958. return new TCountDistinctEstimateAggregationFactory(pos, name, factory, aggMode);
  959. }
  960. class TListAggregationFactory final : public TAggregationFactory {
  961. public:
  962. TListAggregationFactory(TPosition pos, const TString& name, const TString& factory, EAggregateMode aggMode)
  963. : TAggregationFactory(pos, name, factory, aggMode)
  964. , FakeSource(BuildFakeSource(pos))
  965. {
  966. }
  967. private:
  968. bool InitAggr(TContext& ctx, bool isFactory, ISource* src, TAstListNode& node, const TVector<TNodePtr>& exprs) final {
  969. ui32 adjustArgsCount = isFactory ? 0 : 1;
  970. ui32 minArgs = (0 + adjustArgsCount);
  971. ui32 maxArgs = (1 + adjustArgsCount);
  972. if (exprs.size() < minArgs || exprs.size() > maxArgs) {
  973. ctx.Error(Pos) << "List aggregation " << (isFactory ? "factory " : "") << "function require " << minArgs
  974. << " or " << maxArgs << " arguments, given: " << exprs.size();
  975. return false;
  976. }
  977. if (BlockWindowAggregationWithoutFrameSpec(Pos, GetName(), src, ctx)) {
  978. return false;
  979. }
  980. Limit = nullptr;
  981. if (adjustArgsCount + 1U <= exprs.size()) {
  982. auto posSecondArg = exprs[adjustArgsCount]->GetPos();
  983. Limit = exprs[adjustArgsCount];
  984. if (!Limit->Init(ctx, FakeSource.Get())) {
  985. return false;
  986. }
  987. }
  988. if (!isFactory) {
  989. Expr = exprs[0];
  990. Name = src->MakeLocalName(Name);
  991. }
  992. if (!Init(ctx, src)) {
  993. return false;
  994. }
  995. if (!isFactory) {
  996. node.Add("Member", "row", Q(Name));
  997. if (IsOverWindow()) {
  998. src->AddTmpWindowColumn(Name);
  999. }
  1000. }
  1001. return true;
  1002. }
  1003. TNodePtr DoClone() const final {
  1004. return new TListAggregationFactory(Pos, Name, Func, AggMode);
  1005. }
  1006. TNodePtr GetApply(const TNodePtr& type, bool many, bool allowAggApply, TContext& ctx) const final {
  1007. Y_UNUSED(ctx);
  1008. Y_UNUSED(allowAggApply);
  1009. auto apply = Y("Apply", Factory, type, BuildLambda(Pos, Y("row"), many ? Y("Unwrap", Expr) : Expr));
  1010. AddFactoryArguments(apply);
  1011. return apply;
  1012. }
  1013. void AddFactoryArguments(TNodePtr& apply) const final {
  1014. if (!Limit) {
  1015. apply = L(apply, Y("Uint64", Q("0")));
  1016. } else {
  1017. apply = L(apply, Limit);
  1018. }
  1019. }
  1020. private:
  1021. TSourcePtr FakeSource;
  1022. TNodePtr Limit;
  1023. };
  1024. TAggregationPtr BuildListFactoryAggregation(TPosition pos, const TString& name, const TString& factory, EAggregateMode aggMode) {
  1025. return new TListAggregationFactory(pos, name, factory, aggMode);
  1026. }
  1027. class TUserDefinedAggregationFactory final : public TAggregationFactory {
  1028. public:
  1029. TUserDefinedAggregationFactory(TPosition pos, const TString& name, const TString& factory, EAggregateMode aggMode)
  1030. : TAggregationFactory(pos, name, factory, aggMode)
  1031. {}
  1032. private:
  1033. bool InitAggr(TContext& ctx, bool isFactory, ISource* src, TAstListNode& node, const TVector<TNodePtr>& exprs) final {
  1034. ui32 adjustArgsCount = isFactory ? 0 : 1;
  1035. if (exprs.size() < (3 + adjustArgsCount) || exprs.size() > (7 + adjustArgsCount)) {
  1036. ctx.Error(Pos) << "User defined aggregation function " << (isFactory ? "factory " : "") << " requires " <<
  1037. (3 + adjustArgsCount) << " to " << (7 + adjustArgsCount) << " arguments, given: " << exprs.size();
  1038. return false;
  1039. }
  1040. Lambdas[0] = BuildLambda(Pos, Y("value", "parent"), Y("NamedApply", exprs[adjustArgsCount], Q(Y("value")), Y("AsStruct"), Y("DependsOn", "parent")));
  1041. Lambdas[1] = BuildLambda(Pos, Y("value", "state", "parent"), Y("NamedApply", exprs[adjustArgsCount + 1], Q(Y("state", "value")), Y("AsStruct"), Y("DependsOn", "parent")));
  1042. Lambdas[2] = BuildLambda(Pos, Y("one", "two"), Y("IfType", exprs[adjustArgsCount + 2], Y("NullType"),
  1043. BuildLambda(Pos, Y(), Y("Void")),
  1044. BuildLambda(Pos, Y(), Y("Apply", exprs[adjustArgsCount + 2], "one", "two"))));
  1045. for (size_t i = 3U; i < Lambdas.size(); ++i) {
  1046. const auto j = adjustArgsCount + i;
  1047. Lambdas[i] = BuildLambda(Pos, Y("state"), j >= exprs.size() ? AstNode("state") : Y("Apply", exprs[j], "state"));
  1048. }
  1049. DefVal = (exprs.size() == (7 + adjustArgsCount)) ? exprs[adjustArgsCount + 6] : Y("Null");
  1050. return TAggregationFactory::InitAggr(ctx, isFactory, src, node, isFactory ? TVector<TNodePtr>() : TVector<TNodePtr>(1, exprs.front()));
  1051. }
  1052. TNodePtr DoClone() const final {
  1053. return new TUserDefinedAggregationFactory(Pos, Name, Func, AggMode);
  1054. }
  1055. TNodePtr GetApply(const TNodePtr& type, bool many, bool allowAggApply, TContext& ctx) const final {
  1056. Y_UNUSED(ctx);
  1057. Y_UNUSED(allowAggApply);
  1058. auto apply = Y("Apply", Factory, type, BuildLambda(Pos, Y("row"), many ? Y("Unwrap", Expr) : Expr));
  1059. AddFactoryArguments(apply);
  1060. return apply;
  1061. }
  1062. void AddFactoryArguments(TNodePtr& apply) const final {
  1063. apply = L(apply, Lambdas[0], Lambdas[1], Lambdas[2], Lambdas[3], Lambdas[4], Lambdas[5], DefVal);
  1064. }
  1065. bool DoInit(TContext& ctx, ISource* src) final {
  1066. for (const auto& lambda : Lambdas) {
  1067. if (!lambda->Init(ctx, src)) {
  1068. return false;
  1069. }
  1070. }
  1071. if (!DefVal->Init(ctx, src)) {
  1072. return false;
  1073. }
  1074. return TAggregationFactory::DoInit(ctx, src);
  1075. }
  1076. std::array<TNodePtr, 6> Lambdas;
  1077. TNodePtr DefVal;
  1078. };
  1079. TAggregationPtr BuildUserDefinedFactoryAggregation(TPosition pos, const TString& name, const TString& factory, EAggregateMode aggMode) {
  1080. return new TUserDefinedAggregationFactory(pos, name, factory, aggMode);
  1081. }
  1082. class TCountAggregation final : public TAggregationFactory {
  1083. public:
  1084. TCountAggregation(TPosition pos, const TString& name, const TString& func, EAggregateMode aggMode)
  1085. : TAggregationFactory(pos, name, func, aggMode)
  1086. {}
  1087. private:
  1088. TNodePtr DoClone() const final {
  1089. return new TCountAggregation(Pos, Name, Func, AggMode);
  1090. }
  1091. bool DoInit(TContext& ctx, ISource* src) final {
  1092. if (!Expr) {
  1093. return true;
  1094. }
  1095. if (Expr->IsAsterisk()) {
  1096. Expr = Y("Void");
  1097. }
  1098. if (!Expr->Init(ctx, src)) {
  1099. return false;
  1100. }
  1101. Expr->SetCountHint(Expr->IsConstant());
  1102. return TAggregationFactory::DoInit(ctx, src);
  1103. }
  1104. };
  1105. TAggregationPtr BuildCountAggregation(TPosition pos, const TString& name, const TString& func, EAggregateMode aggMode) {
  1106. return new TCountAggregation(pos, name, func, aggMode);
  1107. }
  1108. class TPGFactoryAggregation final : public TAggregationFactory {
  1109. public:
  1110. TPGFactoryAggregation(TPosition pos, const TString& name, EAggregateMode aggMode)
  1111. : TAggregationFactory(pos, name, "", aggMode, false, false)
  1112. , PgFunc(Name)
  1113. {}
  1114. bool InitAggr(TContext& ctx, bool isFactory, ISource* src, TAstListNode& node, const TVector<TNodePtr>& exprs) override {
  1115. auto ret = TAggregationFactory::InitAggr(ctx, isFactory, src, node, exprs);
  1116. if (ret) {
  1117. if (isFactory) {
  1118. Factory = BuildLambda(Pos, Y("type", "extractor"), Y(AggMode == EAggregateMode::OverWindow ? "PgWindowTraitsTuple" : "PgAggregationTraitsTuple",
  1119. Q(PgFunc), Y("ListItemType", "type"), "extractor"));
  1120. } else {
  1121. Lambda = BuildLambda(Pos, Y("row"), exprs);
  1122. }
  1123. }
  1124. return ret;
  1125. }
  1126. TNodePtr GetExtractor(bool many, TContext& ctx) const override {
  1127. Y_UNUSED(many);
  1128. ctx.Error() << "Partial aggregation by PostgreSQL function isn't supported";
  1129. return nullptr;
  1130. }
  1131. TNodePtr GetApply(const TNodePtr& type, bool many, bool allowAggApply, TContext& ctx) const final {
  1132. Y_UNUSED(many);
  1133. Y_UNUSED(ctx);
  1134. Y_UNUSED(allowAggApply);
  1135. if (ShouldEmitAggApply(ctx) && allowAggApply && AggMode != EAggregateMode::OverWindow) {
  1136. return Y("AggApply",
  1137. Q("pg_" + to_lower(PgFunc)), Y("ListItemType", type), Lambda);
  1138. }
  1139. return Y(AggMode == EAggregateMode::OverWindow ? "PgWindowTraits" : "PgAggregationTraits",
  1140. Q(PgFunc), Y("ListItemType", type), Lambda);
  1141. }
  1142. private:
  1143. TNodePtr DoClone() const final {
  1144. return new TPGFactoryAggregation(Pos, Name, AggMode);
  1145. }
  1146. TString PgFunc;
  1147. TNodePtr Lambda;
  1148. };
  1149. TAggregationPtr BuildPGFactoryAggregation(TPosition pos, const TString& name, EAggregateMode aggMode) {
  1150. return new TPGFactoryAggregation(pos, name, aggMode);
  1151. }
  1152. class TNthValueFactoryAggregation final : public TAggregationFactory {
  1153. public:
  1154. public:
  1155. TNthValueFactoryAggregation(TPosition pos, const TString& name, const TString& factory, EAggregateMode aggMode)
  1156. : TAggregationFactory(pos, name, factory, aggMode)
  1157. , FakeSource(BuildFakeSource(pos))
  1158. {
  1159. }
  1160. private:
  1161. bool InitAggr(TContext& ctx, bool isFactory, ISource* src, TAstListNode& node, const TVector<TNodePtr>& exprs) final {
  1162. ui32 adjustArgsCount = isFactory ? 0 : 1;
  1163. ui32 expectedArgs = (1 + adjustArgsCount);
  1164. if (exprs.size() != expectedArgs) {
  1165. ctx.Error(Pos) << "NthValue aggregation " << (isFactory ? "factory " : "") << "function require "
  1166. << expectedArgs << " arguments, given: " << exprs.size();
  1167. return false;
  1168. }
  1169. if (BlockWindowAggregationWithoutFrameSpec(Pos, GetName(), src, ctx)) {
  1170. return false;
  1171. }
  1172. Index = exprs[adjustArgsCount];
  1173. if (!Index->Init(ctx, FakeSource.Get())) {
  1174. return false;
  1175. }
  1176. if (!isFactory) {
  1177. Expr = exprs[0];
  1178. Name = src->MakeLocalName(Name);
  1179. }
  1180. if (!Init(ctx, src)) {
  1181. return false;
  1182. }
  1183. if (!isFactory) {
  1184. node.Add("Member", "row", Q(Name));
  1185. if (IsOverWindow()) {
  1186. src->AddTmpWindowColumn(Name);
  1187. }
  1188. }
  1189. return true;
  1190. }
  1191. TNodePtr DoClone() const final {
  1192. return new TNthValueFactoryAggregation(Pos, Name, Func, AggMode);
  1193. }
  1194. TNodePtr GetApply(const TNodePtr& type, bool many, bool allowAggApply, TContext& ctx) const final {
  1195. Y_UNUSED(ctx);
  1196. Y_UNUSED(allowAggApply);
  1197. auto apply = Y("Apply", Factory, type, BuildLambda(Pos, Y("row"), many ? Y("Unwrap", Expr) : Expr));
  1198. AddFactoryArguments(apply);
  1199. return apply;
  1200. }
  1201. void AddFactoryArguments(TNodePtr& apply) const final {
  1202. apply = L(apply, Index);
  1203. }
  1204. private:
  1205. TSourcePtr FakeSource;
  1206. TNodePtr Index;
  1207. };
  1208. TAggregationPtr BuildNthFactoryAggregation(TPosition pos, const TString& name, const TString& factory, EAggregateMode aggMode) {
  1209. return new TNthValueFactoryAggregation(pos, name, factory, aggMode);
  1210. }
  1211. } // namespace NSQLTranslationV1