select.cpp 105 KB


  1. #include "sql.h"
  2. #include "source.h"
  3. #include "context.h"
  4. #include "match_recognize.h"
  5. #include <yql/essentials/providers/common/provider/yql_provider_names.h>
  6. #include <yql/essentials/utils/yql_panic.h>
  7. #include <library/cpp/charset/ci_string.h>
  8. using namespace NYql;
  9. namespace NSQLTranslationV1 {
  10. class TSubqueryNode: public INode {
  11. public:
  12. TSubqueryNode(TSourcePtr&& source, const TString& alias, bool inSubquery, int ensureTupleSize, TScopedStatePtr scoped)
  13. : INode(source->GetPos())
  14. , Source(std::move(source))
  15. , Alias(alias)
  16. , InSubquery(inSubquery)
  17. , EnsureTupleSize(ensureTupleSize)
  18. , Scoped(scoped)
  19. {
  20. YQL_ENSURE(!Alias.empty());
  21. }
  22. ISource* GetSource() override {
  23. return Source.Get();
  24. }
  25. bool DoInit(TContext& ctx, ISource* src) override {
  26. YQL_ENSURE(!src, "Source not expected for subquery node");
  27. Source->UseAsInner();
  28. if (!Source->Init(ctx, nullptr)) {
  29. return false;
  30. }
  31. TTableList tableList;
  32. Source->GetInputTables(tableList);
  33. auto tables = BuildInputTables(Pos, tableList, InSubquery, Scoped);
  34. if (!tables->Init(ctx, Source.Get())) {
  35. return false;
  36. }
  37. auto source = Source->Build(ctx);
  38. if (!source) {
  39. return false;
  40. }
  41. if (EnsureTupleSize != -1) {
  42. source = Y("EnsureTupleSize", source, Q(ToString(EnsureTupleSize)));
  43. }
  44. Node = Y("let", Alias, Y("block", Q(L(tables, Y("return", Q(Y("world", source)))))));
  45. IsUsed = true;
  46. return true;
  47. }
  48. void DoUpdateState() const override {
  49. State.Set(ENodeState::Const, true);
  50. }
  51. bool UsedSubquery() const override {
  52. return IsUsed;
  53. }
  54. TAstNode* Translate(TContext& ctx) const override {
  55. Y_DEBUG_ABORT_UNLESS(Node);
  56. return Node->Translate(ctx);
  57. }
  58. const TString* SubqueryAlias() const override {
  59. return &Alias;
  60. }
  61. TPtr DoClone() const final {
  62. return new TSubqueryNode(Source->CloneSource(), Alias, InSubquery, EnsureTupleSize, Scoped);
  63. }
  64. protected:
  65. TSourcePtr Source;
  66. TNodePtr Node;
  67. const TString Alias;
  68. const bool InSubquery;
  69. const int EnsureTupleSize;
  70. bool IsUsed = false;
  71. TScopedStatePtr Scoped;
  72. };
  73. TNodePtr BuildSubquery(TSourcePtr source, const TString& alias, bool inSubquery, int ensureTupleSize, TScopedStatePtr scoped) {
  74. return new TSubqueryNode(std::move(source), alias, inSubquery, ensureTupleSize, scoped);
  75. }
  76. class TSourceNode: public INode {
  77. public:
  78. TSourceNode(TPosition pos, TSourcePtr&& source, bool checkExist)
  79. : INode(pos)
  80. , Source(std::move(source))
  81. , CheckExist(checkExist)
  82. {}
  83. ISource* GetSource() override {
  84. return Source.Get();
  85. }
  86. bool DoInit(TContext& ctx, ISource* src) override {
  87. if (AsInner) {
  88. Source->UseAsInner();
  89. }
  90. if (!Source->Init(ctx, src)) {
  91. return false;
  92. }
  93. Node = Source->Build(ctx);
  94. if (!Node) {
  95. return false;
  96. }
  97. if (src) {
  98. if (IsSubquery()) {
  99. /// should be not used?
  100. auto columnsPtr = Source->GetColumns();
  101. if (columnsPtr && (columnsPtr->All || columnsPtr->QualifiedAll || columnsPtr->List.size() == 1)) {
  102. Node = Y("SingleMember", Y("SqlAccess", Q("dict"), Y("Take", Node, Y("Uint64", Q("1"))), Y("Uint64", Q("0"))));
  103. } else {
  104. ctx.Error(Pos) << "Source used in expression should contain one concrete column";
  105. if (RefPos) {
  106. ctx.Error(*RefPos) << "Source is used here";
  107. }
  108. return false;
  109. }
  110. }
  111. src->AddDependentSource(Source.Get());
  112. }
  113. return true;
  114. }
  115. bool IsSubquery() const {
  116. return !AsInner && Source->IsSelect() && !CheckExist;
  117. }
  118. void DoUpdateState() const override {
  119. State.Set(ENodeState::Const, IsSubquery());
  120. }
  121. TAstNode* Translate(TContext& ctx) const override {
  122. Y_DEBUG_ABORT_UNLESS(Node);
  123. return Node->Translate(ctx);
  124. }
  125. TPtr DoClone() const final {
  126. return new TSourceNode(Pos, Source->CloneSource(), CheckExist);
  127. }
  128. protected:
  129. TSourcePtr Source;
  130. TNodePtr Node;
  131. bool CheckExist;
  132. };
  133. TNodePtr BuildSourceNode(TPosition pos, TSourcePtr source, bool checkExist) {
  134. return new TSourceNode(pos, std::move(source), checkExist);
  135. }
  136. class TFakeSource: public ISource {
  137. public:
  138. TFakeSource(TPosition pos, bool missingFrom, bool inSubquery)
  139. : ISource(pos)
  140. , MissingFrom(missingFrom)
  141. , InSubquery(inSubquery)
  142. {}
  143. bool IsFake() const override {
  144. return true;
  145. }
  146. TMaybe<bool> AddColumn(TContext& ctx, TColumnNode& column) override {
  147. // TODO: fix column reference scope - with proper scopes error below should happen earlier
  148. if (column.CanBeType()) {
  149. return true;
  150. }
  151. ctx.Error(Pos) << (MissingFrom ? "Column references are not allowed without FROM" : "Source does not allow column references");
  152. ctx.Error(column.GetPos()) << "Column reference "
  153. << (column.GetColumnName() ? "'" + *column.GetColumnName() + "'" : "(expr)");
  154. return {};
  155. }
  156. bool AddFilter(TContext& ctx, TNodePtr filter) override {
  157. Y_UNUSED(filter);
  158. auto pos = filter ? filter->GetPos() : Pos;
  159. ctx.Error(pos) << (MissingFrom ? "Filtering is not allowed without FROM" : "Source does not allow filtering");
  160. return false;
  161. }
  162. TNodePtr Build(TContext& ctx) override {
  163. Y_UNUSED(ctx);
  164. auto ret = Y("AsList", Y("AsStruct"));
  165. if (InSubquery) {
  166. return Y("WithWorld", ret, "world");
  167. } else {
  168. return ret;
  169. }
  170. }
  171. bool AddGroupKey(TContext& ctx, const TString& column) override {
  172. Y_UNUSED(column);
  173. ctx.Error(Pos) << "Grouping is not allowed " << (MissingFrom ? "without FROM" : "in this context");
  174. return false;
  175. }
  176. bool AddAggregation(TContext& ctx, TAggregationPtr aggr) override {
  177. YQL_ENSURE(aggr);
  178. ctx.Error(aggr->GetPos()) << "Aggregation is not allowed " << (MissingFrom ? "without FROM" : "in this context");
  179. return false;
  180. }
  181. bool AddAggregationOverWindow(TContext& ctx, const TString& windowName, TAggregationPtr func) override {
  182. Y_UNUSED(windowName);
  183. YQL_ENSURE(func);
  184. ctx.Error(func->GetPos()) << "Aggregation is not allowed " << (MissingFrom ? "without FROM" : "in this context");
  185. return false;
  186. }
  187. bool AddFuncOverWindow(TContext& ctx, const TString& windowName, TNodePtr func) override {
  188. Y_UNUSED(windowName);
  189. YQL_ENSURE(func);
  190. ctx.Error(func->GetPos()) << "Window functions are not allowed " << (MissingFrom ? "without FROM" : "in this context");
  191. return false;
  192. }
  193. TWindowSpecificationPtr FindWindowSpecification(TContext& ctx, const TString& windowName) const override {
  194. Y_UNUSED(windowName);
  195. ctx.Error(Pos) << "Window and aggregation functions are not allowed " << (MissingFrom ? "without FROM" : "in this context");
  196. return {};
  197. }
  198. bool IsGroupByColumn(const TString& column) const override {
  199. Y_UNUSED(column);
  200. return false;
  201. }
  202. TNodePtr BuildFilter(TContext& ctx, const TString& label) override {
  203. Y_UNUSED(ctx);
  204. Y_UNUSED(label);
  205. return nullptr;
  206. }
  207. std::pair<TNodePtr, bool> BuildAggregation(const TString& label, TContext& ctx) override {
  208. Y_UNUSED(label);
  209. Y_UNUSED(ctx);
  210. return { nullptr, true };
  211. }
  212. TPtr DoClone() const final {
  213. return new TFakeSource(Pos, MissingFrom, InSubquery);
  214. }
  215. private:
  216. const bool MissingFrom;
  217. const bool InSubquery;
  218. };
  219. TSourcePtr BuildFakeSource(TPosition pos, bool missingFrom, bool inSubquery) {
  220. return new TFakeSource(pos, missingFrom, inSubquery);
  221. }
  222. class TNodeSource: public ISource {
  223. public:
  224. TNodeSource(TPosition pos, const TNodePtr& node, bool wrapToList, bool wrapByTableSource)
  225. : ISource(pos)
  226. , Node(node)
  227. , WrapToList(wrapToList)
  228. , WrapByTableSource(wrapByTableSource)
  229. {
  230. YQL_ENSURE(Node);
  231. FakeSource = BuildFakeSource(pos);
  232. }
  233. bool ShouldUseSourceAsColumn(const TString& source) const final {
  234. return source && source != GetLabel();
  235. }
  236. TMaybe<bool> AddColumn(TContext& ctx, TColumnNode& column) final {
  237. Y_UNUSED(ctx);
  238. Y_UNUSED(column);
  239. return true;
  240. }
  241. bool DoInit(TContext& ctx, ISource* src) final {
  242. if (!Node->Init(ctx, FakeSource.Get())) {
  243. return false;
  244. }
  245. return ISource::DoInit(ctx, src);
  246. }
  247. TNodePtr Build(TContext& /*ctx*/) final {
  248. auto nodeAst = AstNode(Node);
  249. if (WrapToList) {
  250. nodeAst = Y("ToList", nodeAst);
  251. }
  252. if (WrapByTableSource) {
  253. nodeAst = Y("TableSource", nodeAst);
  254. }
  255. return nodeAst;
  256. }
  257. TPtr DoClone() const final {
  258. return new TNodeSource(Pos, SafeClone(Node), WrapToList, WrapByTableSource);
  259. }
  260. private:
  261. TNodePtr Node;
  262. const bool WrapToList;
  263. const bool WrapByTableSource;
  264. TSourcePtr FakeSource;
  265. };
  266. TSourcePtr BuildNodeSource(TPosition pos, const TNodePtr& node, bool wrapToList, bool wrapByTableSource) {
  267. return new TNodeSource(pos, node, wrapToList, wrapByTableSource);
  268. }
  269. class IProxySource: public ISource {
  270. protected:
  271. IProxySource(TPosition pos, ISource* src)
  272. : ISource(pos)
  273. , Source(src)
  274. {}
  275. void AllColumns() override {
  276. Y_DEBUG_ABORT_UNLESS(Source);
  277. return Source->AllColumns();
  278. }
  279. const TColumns* GetColumns() const override {
  280. Y_DEBUG_ABORT_UNLESS(Source);
  281. return Source->GetColumns();
  282. }
  283. void GetInputTables(TTableList& tableList) const override {
  284. if (Source) {
  285. Source->GetInputTables(tableList);
  286. }
  287. ISource::GetInputTables(tableList);
  288. }
  289. TMaybe<bool> AddColumn(TContext& ctx, TColumnNode& column) override {
  290. Y_DEBUG_ABORT_UNLESS(Source);
  291. const TString label(Source->GetLabel());
  292. Source->SetLabel(Label);
  293. const auto ret = Source->AddColumn(ctx, column);
  294. Source->SetLabel(label);
  295. return ret;
  296. }
  297. bool ShouldUseSourceAsColumn(const TString& source) const override {
  298. return Source->ShouldUseSourceAsColumn(source);
  299. }
  300. bool IsStream() const override {
  301. Y_DEBUG_ABORT_UNLESS(Source);
  302. return Source->IsStream();
  303. }
  304. EOrderKind GetOrderKind() const override {
  305. Y_DEBUG_ABORT_UNLESS(Source);
  306. return Source->GetOrderKind();
  307. }
  308. TWriteSettings GetWriteSettings() const override {
  309. Y_DEBUG_ABORT_UNLESS(Source);
  310. return Source->GetWriteSettings();
  311. }
  312. protected:
  313. void SetSource(ISource* source) {
  314. Source = source;
  315. }
  316. ISource* Source;
  317. };
  318. class IRealSource: public ISource {
  319. protected:
  320. IRealSource(TPosition pos)
  321. : ISource(pos)
  322. {
  323. }
  324. void AllColumns() override {
  325. Columns.SetAll();
  326. }
  327. const TColumns* GetColumns() const override {
  328. return &Columns;
  329. }
  330. TMaybe<bool> AddColumn(TContext& ctx, TColumnNode& column) override {
  331. const auto& label = *column.GetSourceName();
  332. const auto& source = GetLabel();
  333. if (!label.empty() && label != source && !(source.StartsWith(label) && source[label.size()] == ':')) {
  334. if (column.IsReliable()) {
  335. ctx.Error(column.GetPos()) << "Unknown correlation name: " << label;
  336. }
  337. return {};
  338. }
  339. if (column.IsAsterisk()) {
  340. return true;
  341. }
  342. const auto* name = column.GetColumnName();
  343. if (name && !column.CanBeType() && !Columns.IsColumnPossible(ctx, *name) && !IsAlias(EExprSeat::GroupBy, *name) && !IsAlias(EExprSeat::DistinctAggr, *name)) {
  344. if (column.IsReliable()) {
  345. TStringBuilder sb;
  346. sb << "Column " << *name << " is not in source column set";
  347. if (const auto mistype = FindColumnMistype(*name)) {
  348. sb << ". Did you mean " << mistype.GetRef() << "?";
  349. }
  350. ctx.Error(column.GetPos()) << sb;
  351. }
  352. return {};
  353. }
  354. return true;
  355. }
  356. TMaybe<TString> FindColumnMistype(const TString& name) const override {
  357. auto result = FindMistypeIn(Columns.Real, name);
  358. if (!result) {
  359. auto result = FindMistypeIn(Columns.Artificial, name);
  360. }
  361. return result ? result : ISource::FindColumnMistype(name);
  362. }
  363. protected:
  364. TColumns Columns;
  365. };
  366. class IComposableSource : private TNonCopyable {
  367. public:
  368. virtual ~IComposableSource() = default;
  369. virtual void BuildProjectWindowDistinct(TNodePtr& blocks, TContext& ctx, bool ordered) = 0;
  370. };
  371. using TComposableSourcePtr = TIntrusivePtr<IComposableSource>;
  372. class TMuxSource: public ISource {
  373. public:
  374. TMuxSource(TPosition pos, TVector<TSourcePtr>&& sources)
  375. : ISource(pos)
  376. , Sources(std::move(sources))
  377. {
  378. YQL_ENSURE(Sources.size() > 1);
  379. }
  380. void AllColumns() final {
  381. for (auto& source: Sources) {
  382. source->AllColumns();
  383. }
  384. }
  385. const TColumns* GetColumns() const final {
  386. // Columns are equal in all sources. Return from the first one
  387. return Sources.front()->GetColumns();
  388. }
  389. void GetInputTables(TTableList& tableList) const final {
  390. for (auto& source: Sources) {
  391. source->GetInputTables(tableList);
  392. }
  393. ISource::GetInputTables(tableList);
  394. }
  395. bool IsStream() const final {
  396. return AnyOf(Sources, [] (const TSourcePtr& s) { return s->IsStream(); });
  397. }
  398. bool DoInit(TContext& ctx, ISource* src) final {
  399. for (auto& source: Sources) {
  400. if (AsInner) {
  401. source->UseAsInner();
  402. }
  403. if (src) {
  404. src->AddDependentSource(source.Get());
  405. }
  406. if (!source->Init(ctx, src)) {
  407. return false;
  408. }
  409. if (!source->InitFilters(ctx)) {
  410. return false;
  411. }
  412. }
  413. return true;
  414. }
  415. TMaybe<bool> AddColumn(TContext& ctx, TColumnNode& column) final {
  416. for (auto& source: Sources) {
  417. if (!source->AddColumn(ctx, column)) {
  418. return {};
  419. }
  420. }
  421. return true;
  422. }
  423. TNodePtr Build(TContext& ctx) final {
  424. TNodePtr block;
  425. auto muxArgs = Y();
  426. for (size_t i = 0; i < Sources.size(); ++i) {
  427. auto& source = Sources[i];
  428. auto input = source->Build(ctx);
  429. auto ref = ctx.MakeName("src");
  430. muxArgs->Add(ref);
  431. if (block) {
  432. block = L(block, Y("let", ref, input));
  433. } else {
  434. block = Y(Y("let", ref, input));
  435. }
  436. auto filter = source->BuildFilter(ctx, ref);
  437. if (filter) {
  438. block = L(block, Y("let", ref, filter));
  439. }
  440. if (ctx.EnableSystemColumns) {
  441. block = L(block, Y("let", ref, Y("RemoveSystemMembers", ref)));
  442. }
  443. }
  444. return GroundWithExpr(block, Y("Mux", Q(muxArgs)));
  445. }
  446. bool AddFilter(TContext& ctx, TNodePtr filter) final {
  447. Y_UNUSED(filter);
  448. ctx.Error() << "Filter is not allowed for multiple sources";
  449. return false;
  450. }
  451. TPtr DoClone() const final {
  452. return new TMuxSource(Pos, CloneContainer(Sources));
  453. }
  454. protected:
  455. TVector<TSourcePtr> Sources;
  456. };
  457. TSourcePtr BuildMuxSource(TPosition pos, TVector<TSourcePtr>&& sources) {
  458. return new TMuxSource(pos, std::move(sources));
  459. }
  460. class TSubqueryRefNode: public IRealSource {
  461. public:
  462. TSubqueryRefNode(const TNodePtr& subquery, const TString& alias, int tupleIndex)
  463. : IRealSource(subquery->GetPos())
  464. , Subquery(subquery)
  465. , Alias(alias)
  466. , TupleIndex(tupleIndex)
  467. {
  468. YQL_ENSURE(subquery->GetSource());
  469. }
  470. ISource* GetSource() override {
  471. return this;
  472. }
  473. bool DoInit(TContext& ctx, ISource* src) override {
  474. // independent subquery should not connect source
  475. Subquery->UseAsInner();
  476. if (!Subquery->Init(ctx, nullptr)) {
  477. return false;
  478. }
  479. Columns = *Subquery->GetSource()->GetColumns();
  480. Node = BuildAtom(Pos, Alias, TNodeFlags::Default);
  481. if (TupleIndex != -1) {
  482. Node = Y("Nth", Node, Q(ToString(TupleIndex)));
  483. }
  484. if (!Node->Init(ctx, src)) {
  485. return false;
  486. }
  487. if (src && Subquery->GetSource()->IsSelect()) {
  488. auto columnsPtr = &Columns;
  489. if (columnsPtr && (columnsPtr->All || columnsPtr->QualifiedAll || columnsPtr->List.size() == 1)) {
  490. Node = Y("SingleMember", Y("SqlAccess", Q("dict"), Y("Take", Node, Y("Uint64", Q("1"))), Y("Uint64", Q("0"))));
  491. } else {
  492. ctx.Error(Pos) << "Source used in expression should contain one concrete column";
  493. if (RefPos) {
  494. ctx.Error(*RefPos) << "Source is used here";
  495. }
  496. return false;
  497. }
  498. }
  499. TNodePtr sample;
  500. if (!BuildSamplingLambda(sample)) {
  501. return false;
  502. } else if (sample) {
  503. Node = Y("block", Q(Y(Y("let", Node, Y("OrderedFlatMap", Node, sample)), Y("return", Node))));
  504. }
  505. return true;
  506. }
  507. TNodePtr Build(TContext& ctx) override {
  508. Y_UNUSED(ctx);
  509. return Node;
  510. }
  511. bool SetSamplingOptions(
  512. TContext& ctx,
  513. TPosition pos,
  514. ESampleClause sampleClause,
  515. ESampleMode mode,
  516. TNodePtr samplingRate,
  517. TNodePtr samplingSeed) override {
  518. if (mode == ESampleMode::System) {
  519. ctx.Error(pos) << "only Bernoulli sampling mode is supported for subqueries";
  520. return false;
  521. }
  522. if (samplingSeed) {
  523. ctx.Error(pos) << "'Repeatable' keyword is not supported for subqueries";
  524. return false;
  525. }
  526. return SetSamplingRate(ctx, sampleClause, samplingRate);
  527. }
  528. bool IsStream() const override {
  529. return Subquery->GetSource()->IsStream();
  530. }
  531. void DoUpdateState() const override {
  532. State.Set(ENodeState::Const, true);
  533. }
  534. TAstNode* Translate(TContext& ctx) const override {
  535. Y_DEBUG_ABORT_UNLESS(Node);
  536. return Node->Translate(ctx);
  537. }
  538. TPtr DoClone() const final {
  539. return new TSubqueryRefNode(Subquery, Alias, TupleIndex);
  540. }
  541. protected:
  542. TNodePtr Subquery;
  543. const TString Alias;
  544. const int TupleIndex;
  545. TNodePtr Node;
  546. };
  547. TNodePtr BuildSubqueryRef(TNodePtr subquery, const TString& alias, int tupleIndex) {
  548. return new TSubqueryRefNode(std::move(subquery), alias, tupleIndex);
  549. }
  550. class TInvalidSubqueryRefNode: public ISource {
  551. public:
  552. TInvalidSubqueryRefNode(TPosition pos)
  553. : ISource(pos)
  554. , Pos(pos)
  555. {
  556. }
  557. bool DoInit(TContext& ctx, ISource* src) override {
  558. Y_UNUSED(src);
  559. ctx.Error(Pos) << "Named subquery can not be used as a top level statement in libraries";
  560. return false;
  561. }
  562. TNodePtr Build(TContext& ctx) override {
  563. Y_UNUSED(ctx);
  564. return {};
  565. }
  566. TPtr DoClone() const final {
  567. return new TInvalidSubqueryRefNode(Pos);
  568. }
  569. protected:
  570. const TPosition Pos;
  571. };
  572. TNodePtr BuildInvalidSubqueryRef(TPosition subqueryPos) {
  573. return new TInvalidSubqueryRefNode(subqueryPos);
  574. }
  575. class TTableSource: public IRealSource {
  576. public:
  577. TTableSource(TPosition pos, const TTableRef& table, const TString& label)
  578. : IRealSource(pos)
  579. , Table(table)
  580. , FakeSource(BuildFakeSource(pos))
  581. {
  582. SetLabel(label.empty() ? Table.ShortName() : label);
  583. }
  584. void GetInputTables(TTableList& tableList) const override {
  585. tableList.push_back(Table);
  586. ISource::GetInputTables(tableList);
  587. }
  588. bool ShouldUseSourceAsColumn(const TString& source) const override {
  589. const auto& label = GetLabel();
  590. return source && source != label && !(label.StartsWith(source) && label[source.size()] == ':');
  591. }
  592. TMaybe<bool> AddColumn(TContext& ctx, TColumnNode& column) override {
  593. Columns.Add(column.GetColumnName(), column.GetCountHint(), column.IsArtificial(), column.IsReliable());
  594. if (!IRealSource::AddColumn(ctx, column)) {
  595. return {};
  596. }
  597. return false;
  598. }
  599. bool SetSamplingOptions(
  600. TContext& ctx,
  601. TPosition pos,
  602. ESampleClause sampleClause,
  603. ESampleMode mode,
  604. TNodePtr samplingRate,
  605. TNodePtr samplingSeed) override
  606. {
  607. Y_UNUSED(pos);
  608. TString modeName;
  609. if (!samplingSeed) {
  610. samplingSeed = Y("Int32", Q("0"));
  611. }
  612. if (ESampleClause::Sample == sampleClause) {
  613. YQL_ENSURE(ESampleMode::Bernoulli == mode, "Internal logic error");
  614. }
  615. switch (mode) {
  616. case ESampleMode::Bernoulli:
  617. modeName = "bernoulli";
  618. break;
  619. case ESampleMode::System:
  620. modeName = "system";
  621. break;
  622. }
  623. if (!samplingRate->Init(ctx, FakeSource.Get())) {
  624. return false;
  625. }
  626. samplingRate = PrepareSamplingRate(pos, sampleClause, samplingRate);
  627. auto sampleSettings = Q(Y(Q(modeName), Y("EvaluateAtom", Y("ToString", samplingRate)), Y("EvaluateAtom", Y("ToString", samplingSeed))));
  628. auto sampleOption = Q(Y(Q("sample"), sampleSettings));
  629. if (Table.Options) {
  630. if (!Table.Options->Init(ctx, this)) {
  631. return false;
  632. }
  633. Table.Options = L(Table.Options, sampleOption);
  634. } else {
  635. Table.Options = Y(sampleOption);
  636. }
  637. return true;
  638. }
  639. bool SetTableHints(TContext& ctx, TPosition pos, const TTableHints& hints, const TTableHints& contextHints) override {
  640. Y_UNUSED(ctx);
  641. TTableHints merged = contextHints;
  642. MergeHints(merged, hints);
  643. Table.Options = BuildInputOptions(pos, merged);
  644. return true;
  645. }
  646. bool SetViewName(TContext& ctx, TPosition pos, const TString& view) override {
  647. return Table.Keys->SetViewName(ctx, pos, view);
  648. }
  649. TNodePtr Build(TContext& ctx) override {
  650. if (!Table.Keys->Init(ctx, nullptr)) {
  651. return nullptr;
  652. }
  653. return AstNode(Table.RefName);
  654. }
  655. bool IsStream() const override {
  656. return IsStreamingService(Table.Service);
  657. }
  658. TPtr DoClone() const final {
  659. return new TTableSource(Pos, Table, GetLabel());
  660. }
  661. bool IsTableSource() const override {
  662. return true;
  663. }
  664. protected:
  665. TTableRef Table;
  666. private:
  667. const TSourcePtr FakeSource;
  668. };
  669. TSourcePtr BuildTableSource(TPosition pos, const TTableRef& table, const TString& label) {
  670. return new TTableSource(pos, table, label);
  671. }
  672. class TInnerSource: public IProxySource {
  673. public:
  674. TInnerSource(TPosition pos, TNodePtr node, const TString& service, const TDeferredAtom& cluster, const TString& label)
  675. : IProxySource(pos, nullptr)
  676. , Node(node)
  677. , Service(service)
  678. , Cluster(cluster)
  679. {
  680. SetLabel(label);
  681. }
  682. bool SetSamplingOptions(TContext& ctx, TPosition pos, ESampleClause sampleClause, ESampleMode mode, TNodePtr samplingRate, TNodePtr samplingSeed) override {
  683. Y_UNUSED(ctx);
  684. SamplingPos = pos;
  685. SamplingClause = sampleClause;
  686. SamplingMode = mode;
  687. SamplingRate = samplingRate;
  688. SamplingSeed = samplingSeed;
  689. return true;
  690. }
  691. bool SetTableHints(TContext& ctx, TPosition pos, const TTableHints& hints, const TTableHints& contextHints) override {
  692. Y_UNUSED(ctx);
  693. HintsPos = pos;
  694. Hints = hints;
  695. ContextHints = contextHints;
  696. return true;
  697. }
  698. bool SetViewName(TContext& ctx, TPosition pos, const TString& view) override {
  699. Y_UNUSED(ctx);
  700. ViewPos = pos;
  701. View = view;
  702. return true;
  703. }
  704. bool ShouldUseSourceAsColumn(const TString& source) const override {
  705. return source && source != GetLabel();
  706. }
  707. TMaybe<bool> AddColumn(TContext& ctx, TColumnNode& column) override {
  708. if (const TString* columnName = column.GetColumnName()) {
  709. if (columnName && IsExprAlias(*columnName)) {
  710. return true;
  711. }
  712. }
  713. return IProxySource::AddColumn(ctx, column);
  714. }
  715. bool DoInit(TContext& ctx, ISource* initSrc) override {
  716. Y_UNUSED(initSrc);
  717. auto source = Node->GetSource();
  718. if (!source) {
  719. NewSource = TryMakeSourceFromExpression(Pos, ctx, Service, Cluster, Node);
  720. source = NewSource.Get();
  721. }
  722. if (!source) {
  723. ctx.Error(Pos) << "Invalid inner source node";
  724. return false;
  725. }
  726. if (SamplingPos) {
  727. if (!source->SetSamplingOptions(ctx, *SamplingPos, SamplingClause, SamplingMode, SamplingRate, SamplingSeed)) {
  728. return false;
  729. }
  730. }
  731. if (ViewPos) {
  732. if (!source->SetViewName(ctx, *ViewPos, View)) {
  733. return false;
  734. }
  735. }
  736. if (HintsPos) {
  737. if (!source->SetTableHints(ctx, *HintsPos, Hints, ContextHints)) {
  738. return false;
  739. }
  740. }
  741. source->SetLabel(Label);
  742. if (!NewSource) {
  743. Node->UseAsInner();
  744. if (!Node->Init(ctx, nullptr)) {
  745. return false;
  746. }
  747. }
  748. SetSource(source);
  749. if (NewSource && !NewSource->Init(ctx, nullptr)) {
  750. return false;
  751. }
  752. return ISource::DoInit(ctx, source);
  753. }
  754. TNodePtr Build(TContext& ctx) override {
  755. Y_UNUSED(ctx);
  756. return NewSource ? NewSource->Build(ctx) : Node;
  757. }
  758. bool IsStream() const override {
  759. auto source = Node->GetSource();
  760. if (source) {
  761. return source->IsStream();
  762. }
  763. // NewSource will be built later in DoInit->TryMakeSourceFromExpression
  764. // where Service will be used in all situations
  765. // let's detect IsStream by Service value
  766. return IsStreamingService(Service);
  767. }
  768. TPtr DoClone() const final {
  769. return new TInnerSource(Pos, SafeClone(Node), Service, Cluster, GetLabel());
  770. }
  771. protected:
  772. TNodePtr Node;
  773. TString Service;
  774. TDeferredAtom Cluster;
  775. TSourcePtr NewSource;
  776. private:
  777. TMaybe<TPosition> SamplingPos;
  778. ESampleClause SamplingClause;
  779. ESampleMode SamplingMode;
  780. TNodePtr SamplingRate;
  781. TNodePtr SamplingSeed;
  782. TMaybe<TPosition> ViewPos;
  783. TString View;
  784. TMaybe<TPosition> HintsPos;
  785. TTableHints Hints;
  786. TTableHints ContextHints;
  787. };
  788. TSourcePtr BuildInnerSource(TPosition pos, TNodePtr node, const TString& service, const TDeferredAtom& cluster, const TString& label) {
  789. return new TInnerSource(pos, node, service, cluster, label);
  790. }
  791. static bool IsComparableExpression(TContext& ctx, const TNodePtr& expr, bool assume, const char* sqlConstruction) {
  792. if (assume && !expr->IsPlainColumn()) {
  793. ctx.Error(expr->GetPos()) << "Only column names can be used in " << sqlConstruction;
  794. return false;
  795. }
  796. if (expr->IsConstant()) {
  797. ctx.Error(expr->GetPos()) << "Unable to " << sqlConstruction << " constant expression";
  798. return false;
  799. }
  800. if (expr->IsAggregated() && !expr->HasState(ENodeState::AggregationKey)) {
  801. ctx.Error(expr->GetPos()) << "Unable to " << sqlConstruction << " aggregated values";
  802. return false;
  803. }
  804. if (expr->IsPlainColumn()) {
  805. return true;
  806. }
  807. if (expr->GetOpName().empty()) {
  808. ctx.Error(expr->GetPos()) << "You should use in " << sqlConstruction << " column name, qualified field, callable function or expression";
  809. return false;
  810. }
  811. return true;
  812. }
  813. /// \todo move to reduce.cpp? or mapreduce.cpp?
  814. class TReduceSource: public IRealSource {
  815. public:
  816. TReduceSource(TPosition pos,
  817. ReduceMode mode,
  818. TSourcePtr source,
  819. TVector<TSortSpecificationPtr>&& orderBy,
  820. TVector<TNodePtr>&& keys,
  821. TVector<TNodePtr>&& args,
  822. TNodePtr udf,
  823. TNodePtr having,
  824. const TWriteSettings& settings,
  825. const TVector<TSortSpecificationPtr>& assumeOrderBy,
  826. bool listCall)
  827. : IRealSource(pos)
  828. , Mode(mode)
  829. , Source(std::move(source))
  830. , OrderBy(std::move(orderBy))
  831. , Keys(std::move(keys))
  832. , Args(std::move(args))
  833. , Udf(udf)
  834. , Having(having)
  835. , Settings(settings)
  836. , AssumeOrderBy(assumeOrderBy)
  837. , ListCall(listCall)
  838. {
  839. YQL_ENSURE(!Keys.empty());
  840. YQL_ENSURE(Source);
  841. }
  842. void GetInputTables(TTableList& tableList) const override {
  843. Source->GetInputTables(tableList);
  844. ISource::GetInputTables(tableList);
  845. }
  846. bool DoInit(TContext& ctx, ISource* src) final {
  847. if (AsInner) {
  848. Source->UseAsInner();
  849. }
  850. YQL_ENSURE(!src);
  851. if (!Source->Init(ctx, src)) {
  852. return false;
  853. }
  854. if (!Source->InitFilters(ctx)) {
  855. return false;
  856. }
  857. src = Source.Get();
  858. for (auto& key: Keys) {
  859. if (!key->Init(ctx, src)) {
  860. return false;
  861. }
  862. auto keyNamePtr = key->GetColumnName();
  863. YQL_ENSURE(keyNamePtr);
  864. if (!src->AddGroupKey(ctx, *keyNamePtr)) {
  865. return false;
  866. }
  867. }
  868. if (Having && !Having->Init(ctx, nullptr)) {
  869. return false;
  870. }
  871. /// SIN: verify reduce one argument
  872. if (Args.size() != 1) {
  873. ctx.Error(Pos) << "REDUCE requires exactly one UDF argument";
  874. return false;
  875. }
  876. if (!Args[0]->Init(ctx, src)) {
  877. return false;
  878. }
  879. for (auto orderSpec: OrderBy) {
  880. if (!orderSpec->OrderExpr->Init(ctx, src)) {
  881. return false;
  882. }
  883. }
  884. if (!Udf->Init(ctx, src)) {
  885. return false;
  886. }
  887. if (Udf->GetLabel().empty()) {
  888. Columns.SetAll();
  889. } else {
  890. Columns.Add(&Udf->GetLabel(), false);
  891. }
  892. const auto label = GetLabel();
  893. for (const auto& sortSpec: AssumeOrderBy) {
  894. auto& expr = sortSpec->OrderExpr;
  895. SetLabel(Source->GetLabel());
  896. if (!expr->Init(ctx, this)) {
  897. return false;
  898. }
  899. if (!IsComparableExpression(ctx, expr, true, "ASSUME ORDER BY")) {
  900. return false;
  901. }
  902. }
  903. SetLabel(label);
  904. return true;
  905. }
/// Builds the REDUCE pipeline as a "block" expression that returns "core".
/// Steps: bind the (optionally filtered) input, strip system members when system columns are
/// enabled, partition rows by the REDUCE keys and apply the UDF through SqlReduce (AutoDemux over
/// the partitioning callable), then apply HAVING as a row filter.
/// Returns nullptr if the input fails to build, or (after reporting an error) when TableRows()
/// is used with a mode other than USING ALL.
TNodePtr Build(TContext& ctx) final {
auto input = Source->Build(ctx);
if (!input) {
return nullptr;
}
// Key extractor body: a single key yields the plain member, several keys yield a tuple of members.
auto keysTuple = Y();
if (Keys.size() == 1) {
keysTuple = Y("Member", "row", BuildQuotedAtom(Pos, *Keys.back()->GetColumnName()));
}
else {
for (const auto& key: Keys) {
keysTuple = L(keysTuple, Y("Member", "row", BuildQuotedAtom(Pos, *key->GetColumnName())));
}
keysTuple = Q(keysTuple);
}
auto extractKey = Y("SqlExtractKey", "row", BuildLambda(Pos, Y("row"), keysTuple));
auto extractKeyLambda = BuildLambda(Pos, Y("row"), extractKey);
// Per-partition processing; its shape depends on TableRows() (ListCall) and on the REDUCE mode.
TNodePtr processPartitions;
if (ListCall) {
if (Mode != ReduceMode::ByAll) {
ctx.Error(Pos) << "TableRows() must be used only with USING ALL";
return nullptr;
}
TNodePtr expr = BuildAtom(Pos, "partitionStream");
processPartitions = Y("SqlReduce", "partitionStream", BuildQuotedAtom(Pos, "byAllList", TNodeFlags::Default), Udf, expr);
} else {
switch (Mode) {
case ReduceMode::ByAll: {
auto columnPtr = Args[0]->GetColumnName();
TNodePtr expr = BuildAtom(Pos, "partitionStream");
// Unless the UDF argument is "*", rewrite each keyPair as (element 0, element 1 mapped row-wise through Args[0]).
if (!columnPtr || *columnPtr != "*") {
expr = Y("Map", "partitionStream", BuildLambda(Pos, Y("keyPair"), Q(L(Y(),\
Y("Nth", "keyPair", Q(ToString("0"))),\
Y("Map", Y("Nth", "keyPair", Q(ToString("1"))), BuildLambda(Pos, Y("row"), Args[0]))))));
}
processPartitions = Y("SqlReduce", "partitionStream", BuildQuotedAtom(Pos, "byAll", TNodeFlags::Default), Udf, expr);
break;
}
case ReduceMode::ByPartition: {
processPartitions = Y("SqlReduce", "partitionStream", extractKeyLambda, Udf,
BuildLambda(Pos, Y("row"), Args[0]));
break;
}
default:
YQL_ENSURE(false, "Unexpected REDUCE mode");
}
}
// Sort direction / key selector handed to the partitioning callable, derived from ORDER BY.
TNodePtr sortDirection;
TNodePtr sortKeySelector;
FillSortParts(OrderBy, sortDirection, sortKeySelector);
if (!OrderBy.empty()) {
sortKeySelector = BuildLambda(Pos, Y("row"), Y("SqlExtractKey", "row", sortKeySelector));
}
// Plain USING ALL uses PartitionByKey; TableRows() and per-key reduce use PartitionsByKeys.
auto partitionByKey = Y(!ListCall && Mode == ReduceMode::ByAll ? "PartitionByKey" : "PartitionsByKeys", "core", extractKeyLambda,
sortDirection, sortKeySelector, BuildLambda(Pos, Y("partitionStream"), processPartitions));
// With TableRows() the input is first bound (and filtered) as "inputRowsList", then rebound to "core".
auto inputLabel = ListCall ? "inputRowsList" : "core";
auto block(Y(Y("let", inputLabel, input)));
auto filter = Source->BuildFilter(ctx, inputLabel);
if (filter) {
block = L(block, Y("let", inputLabel, filter));
}
if (ListCall) {
block = L(block, Y("let", "core", "inputRowsList"));
}
if (ctx.EnableSystemColumns) {
block = L(block, Y("let", "core", Y("RemoveSystemMembers", "core")));
}
block = L(block, Y("let", "core", Y("AutoDemux", partitionByKey)));
// HAVING keeps only rows whose predicate is true; a NULL predicate is coalesced to false.
if (Having) {
block = L(block, Y("let", "core",
Y("Filter", "core", BuildLambda(Pos, Y("row"), Y("Coalesce", Having, Y("Bool", Q("false")))))
));
}
return Y("block", Q(L(block, Y("return", "core"))));
}
  981. TNodePtr BuildSort(TContext& ctx, const TString& label) override {
  982. Y_UNUSED(ctx);
  983. if (AssumeOrderBy.empty()) {
  984. return nullptr;
  985. }
  986. return Y("let", label, BuildSortSpec(AssumeOrderBy, label, false, true));
  987. }
  988. EOrderKind GetOrderKind() const override {
  989. return AssumeOrderBy.empty() ? EOrderKind::None : EOrderKind::Assume;
  990. }
  991. TWriteSettings GetWriteSettings() const final {
  992. return Settings;
  993. }
  994. bool HasSelectResult() const final {
  995. return !Settings.Discard;
  996. }
  997. TPtr DoClone() const final {
  998. return new TReduceSource(Pos, Mode, Source->CloneSource(), CloneContainer(OrderBy),
  999. CloneContainer(Keys), CloneContainer(Args), SafeClone(Udf), SafeClone(Having), Settings,
  1000. CloneContainer(AssumeOrderBy), ListCall);
  1001. }
  1002. private:
  1003. ReduceMode Mode;
  1004. TSourcePtr Source;
  1005. TVector<TSortSpecificationPtr> OrderBy;
  1006. TVector<TNodePtr> Keys;
  1007. TVector<TNodePtr> Args;
  1008. TNodePtr Udf;
  1009. TNodePtr Having;
  1010. const TWriteSettings Settings;
  1011. TVector<TSortSpecificationPtr> AssumeOrderBy;
  1012. const bool ListCall;
  1013. };
  1014. TSourcePtr BuildReduce(TPosition pos,
  1015. ReduceMode mode,
  1016. TSourcePtr source,
  1017. TVector<TSortSpecificationPtr>&& orderBy,
  1018. TVector<TNodePtr>&& keys,
  1019. TVector<TNodePtr>&& args,
  1020. TNodePtr udf,
  1021. TNodePtr having,
  1022. const TWriteSettings& settings,
  1023. const TVector<TSortSpecificationPtr>& assumeOrderBy,
  1024. bool listCall) {
  1025. return new TReduceSource(pos, mode, std::move(source), std::move(orderBy), std::move(keys),
  1026. std::move(args), udf, having, settings, assumeOrderBy, listCall);
  1027. }
  1028. namespace {
  1029. bool InitAndGetGroupKey(TContext& ctx, const TNodePtr& expr, ISource* src, TStringBuf where, TString& keyColumn) {
  1030. keyColumn.clear();
  1031. YQL_ENSURE(src);
  1032. const bool isJoin = src->GetJoin();
  1033. if (!expr->Init(ctx, src)) {
  1034. return false;
  1035. }
  1036. auto keyNamePtr = expr->GetColumnName();
  1037. if (keyNamePtr && expr->GetLabel().empty()) {
  1038. keyColumn = *keyNamePtr;
  1039. auto sourceNamePtr = expr->GetSourceName();
  1040. auto columnNode = expr->GetColumnNode();
  1041. if (isJoin && (!columnNode || !columnNode->IsArtificial())) {
  1042. if (!sourceNamePtr || sourceNamePtr->empty()) {
  1043. if (!src->IsAlias(EExprSeat::GroupBy, keyColumn)) {
  1044. ctx.Error(expr->GetPos()) << "Columns in " << where << " should have correlation name, error in key: " << keyColumn;
  1045. return false;
  1046. }
  1047. } else {
  1048. keyColumn = DotJoin(*sourceNamePtr, keyColumn);
  1049. }
  1050. }
  1051. }
  1052. return true;
  1053. }
  1054. }
  1055. class TCompositeSelect: public IRealSource {
  1056. public:
  1057. TCompositeSelect(TPosition pos, TSourcePtr source, TSourcePtr originalSource, const TWriteSettings& settings)
  1058. : IRealSource(pos)
  1059. , Source(std::move(source))
  1060. , OriginalSource(std::move(originalSource))
  1061. , Settings(settings)
  1062. {
  1063. YQL_ENSURE(Source);
  1064. }
  1065. void SetSubselects(TVector<TSourcePtr>&& subselects, TVector<TNodePtr>&& grouping, TVector<TNodePtr>&& groupByExpr) {
  1066. Subselects = std::move(subselects);
  1067. Grouping = std::move(grouping);
  1068. GroupByExpr = std::move(groupByExpr);
  1069. Y_DEBUG_ABORT_UNLESS(Subselects.size() > 1);
  1070. }
  1071. void GetInputTables(TTableList& tableList) const override {
  1072. for (const auto& select: Subselects) {
  1073. select->GetInputTables(tableList);
  1074. }
  1075. ISource::GetInputTables(tableList);
  1076. }
  1077. bool DoInit(TContext& ctx, ISource* src) override {
  1078. if (AsInner) {
  1079. Source->UseAsInner();
  1080. }
  1081. if (src) {
  1082. src->AddDependentSource(Source.Get());
  1083. }
  1084. if (!Source->Init(ctx, src)) {
  1085. return false;
  1086. }
  1087. if (!Source->InitFilters(ctx)) {
  1088. return false;
  1089. }
  1090. if (!CalculateGroupingCols(ctx, src)) {
  1091. return false;
  1092. }
  1093. auto origSrc = OriginalSource.Get();
  1094. if (!origSrc->Init(ctx, src)) {
  1095. return false;
  1096. }
  1097. if (origSrc->IsFlattenByColumns() || origSrc->IsFlattenColumns()) {
  1098. Flatten = origSrc->IsFlattenByColumns() ?
  1099. origSrc->BuildFlattenByColumns("row") :
  1100. origSrc->BuildFlattenColumns("row");
  1101. if (!Flatten || !Flatten->Init(ctx, src)) {
  1102. return false;
  1103. }
  1104. }
  1105. if (origSrc->IsFlattenByExprs()) {
  1106. for (auto& expr : static_cast<ISource const*>(origSrc)->Expressions(EExprSeat::FlattenByExpr)) {
  1107. if (!expr->Init(ctx, origSrc)) {
  1108. return false;
  1109. }
  1110. }
  1111. PreFlattenMap = origSrc->BuildPreFlattenMap(ctx);
  1112. if (!PreFlattenMap) {
  1113. return false;
  1114. }
  1115. }
  1116. for (const auto& select: Subselects) {
  1117. select->SetLabel(Label);
  1118. if (AsInner) {
  1119. select->UseAsInner();
  1120. }
  1121. if (!select->Init(ctx, Source.Get())) {
  1122. return false;
  1123. }
  1124. }
  1125. TMaybe<size_t> groupingColumnsCount;
  1126. size_t idx = 0;
  1127. for (const auto& select : Subselects) {
  1128. size_t count = select->GetGroupingColumnsCount();
  1129. if (!groupingColumnsCount.Defined()) {
  1130. groupingColumnsCount = count;
  1131. } else if (*groupingColumnsCount != count) {
  1132. ctx.Error(select->GetPos()) << TStringBuilder() << "Mismatch GROUPING() column count in composite select input #"
  1133. << idx << ": expected " << *groupingColumnsCount << ", got: " << count << ". Please submit bug report";
  1134. return false;
  1135. }
  1136. ++idx;
  1137. }
  1138. return true;
  1139. }
  1140. TMaybe<bool> AddColumn(TContext& ctx, TColumnNode& column) override {
  1141. for (const auto& select: Subselects) {
  1142. if (!select->AddColumn(ctx, column)) {
  1143. return {};
  1144. }
  1145. }
  1146. return true;
  1147. }
  1148. TNodePtr Build(TContext& ctx) override {
  1149. auto input = Source->Build(ctx);
  1150. auto block(Y(Y("let", "composite", input)));
  1151. bool ordered = ctx.UseUnordered(*this);
  1152. if (PreFlattenMap) {
  1153. block = L(block, Y("let", "composite", Y(ordered ? "OrderedFlatMap" : "FlatMap", "composite", BuildLambda(Pos, Y("row"), PreFlattenMap))));
  1154. }
  1155. if (Flatten) {
  1156. block = L(block, Y("let", "composite", Y(ordered ? "OrderedFlatMap" : "FlatMap", "composite", BuildLambda(Pos, Y("row"), Flatten, "res"))));
  1157. }
  1158. auto filter = Source->BuildFilter(ctx, "composite");
  1159. if (filter) {
  1160. block = L(block, Y("let", "composite", filter));
  1161. }
  1162. TNodePtr compositeNode = Y("UnionAll");
  1163. for (const auto& select: Subselects) {
  1164. YQL_ENSURE(dynamic_cast<IComposableSource*>(select.Get()));
  1165. auto addNode = select->Build(ctx);
  1166. if (!addNode) {
  1167. return nullptr;
  1168. }
  1169. compositeNode->Add(addNode);
  1170. }
  1171. block = L(block, Y("let", "core", compositeNode));
  1172. YQL_ENSURE(!Subselects.empty());
  1173. dynamic_cast<IComposableSource*>(Subselects.front().Get())->BuildProjectWindowDistinct(block, ctx, false);
  1174. return Y("block", Q(L(block, Y("return", "core"))));
  1175. }
  1176. bool IsGroupByColumn(const TString& column) const override {
  1177. YQL_ENSURE(!GroupingCols.empty());
  1178. return GroupingCols.contains(column);
  1179. }
  1180. const TSet<TString>& GetGroupingCols() const {
  1181. return GroupingCols;
  1182. }
  1183. TNodePtr BuildSort(TContext& ctx, const TString& label) override {
  1184. return Subselects.front()->BuildSort(ctx, label);
  1185. }
  1186. EOrderKind GetOrderKind() const override {
  1187. return Subselects.front()->GetOrderKind();
  1188. }
  1189. const TColumns* GetColumns() const override{
  1190. return Subselects.front()->GetColumns();
  1191. }
  1192. ISource* RealSource() const {
  1193. return Source.Get();
  1194. }
  1195. TWriteSettings GetWriteSettings() const override {
  1196. return Settings;
  1197. }
  1198. bool HasSelectResult() const override {
  1199. return !Settings.Discard;
  1200. }
  1201. TNodePtr DoClone() const final {
  1202. auto newSource = MakeIntrusive<TCompositeSelect>(Pos, Source->CloneSource(), OriginalSource->CloneSource(), Settings);
  1203. newSource->SetSubselects(CloneContainer(Subselects), CloneContainer(Grouping), CloneContainer(GroupByExpr));
  1204. return newSource;
  1205. }
  1206. private:
  1207. bool CalculateGroupingCols(TContext& ctx, ISource* initSrc) {
  1208. auto origSrc = OriginalSource->CloneSource();
  1209. if (!origSrc->Init(ctx, initSrc)) {
  1210. return false;
  1211. }
  1212. bool hasError = false;
  1213. for (auto& expr: GroupByExpr) {
  1214. if (!expr->Init(ctx, origSrc.Get()) || !IsComparableExpression(ctx, expr, false, "GROUP BY")) {
  1215. hasError = true;
  1216. }
  1217. }
  1218. if (!origSrc->AddExpressions(ctx, GroupByExpr, EExprSeat::GroupBy)) {
  1219. hasError = true;
  1220. }
  1221. YQL_ENSURE(!Grouping.empty());
  1222. for (auto& grouping : Grouping) {
  1223. TString keyColumn;
  1224. if (!InitAndGetGroupKey(ctx, grouping, origSrc.Get(), "grouping sets", keyColumn)) {
  1225. hasError = true;
  1226. } else if (!keyColumn.empty()) {
  1227. GroupingCols.insert(keyColumn);
  1228. }
  1229. }
  1230. return !hasError;
  1231. }
  1232. TSourcePtr Source;
  1233. TSourcePtr OriginalSource;
  1234. TNodePtr Flatten;
  1235. TNodePtr PreFlattenMap;
  1236. const TWriteSettings Settings;
  1237. TVector<TSourcePtr> Subselects;
  1238. TVector<TNodePtr> Grouping;
  1239. TVector<TNodePtr> GroupByExpr;
  1240. TSet<TString> GroupingCols;
  1241. };
  1242. namespace {
  1243. TString FullColumnName(const TColumnNode& column) {
  1244. YQL_ENSURE(column.GetColumnName());
  1245. TString columnName = *column.GetColumnName();
  1246. if (column.IsUseSource()) {
  1247. columnName = DotJoin(*column.GetSourceName(), columnName);
  1248. }
  1249. return columnName;
  1250. }
  1251. }
  1252. /// \todo simplify class
  1253. class TSelectCore: public IRealSource, public IComposableSource {
  1254. public:
/// Stores the parsed parts of a SELECT core. No validation is done here; everything is
/// initialized and checked in DoInit(). `uniqueSets` and `distinctSets` are moved from.
/// NOTE(review): the initializer order below should follow the member declaration order
/// (declared further down in this class) to avoid -Wreorder; keep them in sync.
TSelectCore(
TPosition pos,
TSourcePtr source,
const TVector<TNodePtr>& groupByExpr,
const TVector<TNodePtr>& groupBy,
bool compactGroupBy,
const TString& groupBySuffix,
bool assumeSorted,
const TVector<TSortSpecificationPtr>& orderBy,
TNodePtr having,
const TWinSpecs& winSpecs,
TLegacyHoppingWindowSpecPtr legacyHoppingWindowSpec,
const TVector<TNodePtr>& terms,
bool distinct,
const TVector<TNodePtr>& without,
bool selectStream,
const TWriteSettings& settings,
TColumnsSets&& uniqueSets,
TColumnsSets&& distinctSets
)
: IRealSource(pos)
, Source(std::move(source))
, GroupByExpr(groupByExpr)
, GroupBy(groupBy)
, AssumeSorted(assumeSorted)
, CompactGroupBy(compactGroupBy)
, GroupBySuffix(groupBySuffix)
, OrderBy(orderBy)
, Having(having)
, WinSpecs(winSpecs)
, Terms(terms)
, Without(without)
, Distinct(distinct)
, LegacyHoppingWindowSpec(legacyHoppingWindowSpec)
, SelectStream(selectStream)
, Settings(settings)
, UniqueSets(std::move(uniqueSets))
, DistinctSets(std::move(distinctSets))
{
}
  1295. void AllColumns() override {
  1296. if (!OrderByInit) {
  1297. Columns.SetAll();
  1298. }
  1299. }
  1300. void GetInputTables(TTableList& tableList) const override {
  1301. Source->GetInputTables(tableList);
  1302. ISource::GetInputTables(tableList);
  1303. }
  1304. size_t GetGroupingColumnsCount() const override {
  1305. return Source->GetGroupingColumnsCount();
  1306. }
/// Initializes the SELECT core in a fixed order: the underlying source; FLATTEN BY expressions;
/// preaggregated (DistinctAggrExpr) expressions from terms, HAVING and GROUP BY; GROUP BY
/// expressions; source filters; GROUP BY keys; HAVING; the projection (InitSelect). It then builds
/// the aggregation, flatten, pre-flatten, preaggregation and window-calculation nodes used by Build().
/// Returns false on failure; errors are reported to ctx.
bool DoInit(TContext& ctx, ISource* initSrc) override {
if (AsInner) {
Source->UseAsInner();
}
if (!Source->Init(ctx, initSrc)) {
return false;
}
if (SelectStream && !Source->IsStream()) {
ctx.Error(Pos) << "SELECT STREAM is unsupported for non-streaming sources";
return false;
}
auto src = Source.Get();
bool hasError = false;
// Initialize all FLATTEN BY expressions (reporting every error) before bailing out.
if (src->IsFlattenByExprs()) {
for (auto& expr : static_cast<ISource const*>(src)->Expressions(EExprSeat::FlattenByExpr)) {
if (!expr->Init(ctx, src)) {
hasError = true;
continue;
}
}
}
if (hasError) {
return false;
}
src->SetCompactGroupBy(CompactGroupBy);
src->SetGroupBySuffix(GroupBySuffix);
// Gather preaggregated expressions from the projection and HAVING into DistinctAggrExpr.
for (auto& term: Terms) {
term->CollectPreaggregateExprs(ctx, *src, DistinctAggrExpr);
}
if (Having) {
Having->CollectPreaggregateExprs(ctx, *src, DistinctAggrExpr);
}
for (auto& expr: GroupByExpr) {
// SessionWindow / HoppingWindow are valid inside GROUP BY; SessionWindow is rejected for streaming sources.
if (auto sessionWindow = dynamic_cast<TSessionWindow*>(expr.Get())) {
if (Source->IsStream()) {
ctx.Error(Pos) << "SessionWindow is unsupported for streaming sources";
return false;
}
sessionWindow->MarkValid();
}
if (auto hoppingWindow = dynamic_cast<THoppingWindow*>(expr.Get())) {
hoppingWindow->MarkValid();
}
// need to collect and Init() preaggregated exprs before calling Init() on GROUP BY expression
TVector<TNodePtr> distinctAggrsInGroupBy;
expr->CollectPreaggregateExprs(ctx, *src, distinctAggrsInGroupBy);
for (auto& distinct : distinctAggrsInGroupBy) {
if (!distinct->Init(ctx, src)) {
return false;
}
}
DistinctAggrExpr.insert(DistinctAggrExpr.end(), distinctAggrsInGroupBy.begin(), distinctAggrsInGroupBy.end());
if (!expr->Init(ctx, src) || !IsComparableExpression(ctx, expr, false, "GROUP BY")) {
hasError = true;
}
}
if (hasError || !src->AddExpressions(ctx, GroupByExpr, EExprSeat::GroupBy)) {
return false;
}
// Initialize all collected preaggregated expressions and register them with the source.
for (auto& expr: DistinctAggrExpr) {
if (!expr->Init(ctx, src)) {
hasError = true;
}
}
if (hasError || !src->AddExpressions(ctx, DistinctAggrExpr, EExprSeat::DistinctAggr)) {
return false;
}
/// grouped expressions are available in filters
if (!Source->InitFilters(ctx)) {
return false;
}
// Register plain GROUP BY key columns (JOIN qualification is checked by InitAndGetGroupKey).
for (auto& expr: GroupBy) {
TString usedColumn;
if (!InitAndGetGroupKey(ctx, expr, src, "GROUP BY", usedColumn)) {
hasError = true;
} else if (usedColumn) {
if (!src->AddGroupKey(ctx, usedColumn)) {
hasError = true;
}
}
}
if (hasError) {
return false;
}
if (Having && !Having->Init(ctx, src)) {
return false;
}
src->AddWindowSpecs(WinSpecs);
const bool isJoin = Source->GetJoin();
if (!InitSelect(ctx, src, isJoin, hasError)) {
return false;
}
src->FinishColumns();
// Build the aggregation over "core"; .second signals success, .first may be null (no aggregation).
auto aggRes = src->BuildAggregation("core", ctx);
if (!aggRes.second) {
return false;
}
Aggregate = aggRes.first;
if (src->IsFlattenByColumns() || src->IsFlattenColumns()) {
Flatten = src->IsFlattenByColumns() ?
src->BuildFlattenByColumns("row") :
src->BuildFlattenColumns("row");
if (!Flatten || !Flatten->Init(ctx, src)) {
return false;
}
}
if (src->IsFlattenByExprs()) {
PreFlattenMap = src->BuildPreFlattenMap(ctx);
if (!PreFlattenMap) {
return false;
}
}
if (GroupByExpr || DistinctAggrExpr) {
PreaggregatedMap = src->BuildPreaggregatedMap(ctx);
if (!PreaggregatedMap) {
return false;
}
}
// HAVING: applied after aggregation; without aggregation it is only tolerated (with a warning) for SELECT DISTINCT.
if (Aggregate) {
if (!Aggregate->Init(ctx, src)) {
return false;
}
if (Having) {
Aggregate = Y(
"Filter",
Aggregate,
BuildLambda(Pos, Y("row"), Y("Coalesce", Having, Y("Bool", Q("false"))))
);
}
} else if (Having) {
if (Distinct) {
Aggregate = Y(
"Filter",
"core",
BuildLambda(Pos, Y("row"), Y("Coalesce", Having, Y("Bool", Q("false"))))
);
ctx.Warning(Having->GetPos(), TIssuesIds::YQL_HAVING_WITHOUT_AGGREGATION_IN_SELECT_DISTINCT)
<< "The usage of HAVING without aggregations with SELECT DISTINCT is non-standard and will stop working soon. Please use WHERE instead.";
} else {
ctx.Error(Having->GetPos()) << "HAVING with meaning GROUP BY () should be with aggregation function.";
return false;
}
} else if (!Distinct && !GroupBy.empty()) {
ctx.Error(Pos) << "No aggregations were specified";
return false;
}
if (hasError) {
return false;
}
// Window functions: optional prewindow map for expression PARTITION BY, then the window calculation itself.
if (src->IsCalcOverWindow()) {
if (src->IsExprSeat(EExprSeat::WindowPartitionBy, EExprType::WithExpression)) {
PrewindowMap = src->BuildPrewindowMap(ctx);
if (!PrewindowMap) {
return false;
}
}
CalcOverWindow = src->BuildCalcOverWindow(ctx, "core");
if (!CalcOverWindow || !CalcOverWindow->Init(ctx, src)) {
return false;
}
}
return true;
}
/// Builds the SELECT core as a "block" expression returning "core". Stages, in order: input,
/// MATCH_RECOGNIZE, pre-flatten / flatten maps, preaggregation map, source filter, aggregation
/// (with HAVING), composite grouping terms, GROUPING() columns, and (unless this select is part
/// of a composite source) window calculation, projection and DISTINCT.
/// Returns nullptr if the input or MATCH_RECOGNIZE fails to build.
TNodePtr Build(TContext& ctx) override {
auto input = Source->Build(ctx);
if (!input) {
return nullptr;
}
auto block(Y(Y("let", "core", input)));
if (Source->HasMatchRecognize()) {
if (auto matchRecognize = Source->BuildMatchRecognize(ctx, "core")) {
//use unique name match_recognize to find this block easily in unit tests
block = L(block, Y("let", "match_recognize", matchRecognize));
//then bind to the conventional name
block = L(block, Y("let", "core", "match_recognize"));
} else {
return nullptr;
}
}
bool ordered = ctx.UseUnordered(*this);
if (PreFlattenMap) {
block = L(block, Y("let", "core", Y(ordered ? "OrderedFlatMap" : "FlatMap", "core", BuildLambda(Pos, Y("row"), PreFlattenMap))));
}
if (Flatten) {
block = L(block, Y("let", "core", Y(ordered ? "OrderedFlatMap" : "FlatMap", "core", BuildLambda(Pos, Y("row"), Flatten, "res"))));
}
// For composite sources, keep a named copy ("preaggregated" / "origcore") whose row type CompositeTerms refers to.
if (PreaggregatedMap) {
block = L(block, Y("let", "core", PreaggregatedMap));
if (Source->IsCompositeSource() && !Columns.QualifiedAll) {
block = L(block, Y("let", "preaggregated", "core"));
}
} else if (Source->IsCompositeSource() && !Columns.QualifiedAll) {
block = L(block, Y("let", "origcore", "core"));
}
auto filter = Source->BuildFilter(ctx, "core");
if (filter) {
block = L(block, Y("let", "core", filter));
}
// Aggregation output is not considered ordered.
if (Aggregate) {
block = L(block, Y("let", "core", Aggregate));
ordered = false;
}
const bool haveCompositeTerms = Source->IsCompositeSource() && !Columns.All && !Columns.QualifiedAll && !Columns.List.empty();
if (haveCompositeTerms) {
// column order does not matter here - it will be set in projection
YQL_ENSURE(Aggregate);
block = L(block, Y("let", "core", Y("Map", "core", BuildLambda(Pos, Y("row"), CompositeTerms, "row"))));
}
if (auto grouping = Source->BuildGroupingColumns("core")) {
block = L(block, Y("let", "core", grouping));
}
// A composite parent builds projection/window/DISTINCT itself (from its first subselect).
if (!Source->GetCompositeSource()) {
BuildProjectWindowDistinct(block, ctx, ordered);
}
return Y("block", Q(L(block, Y("return", "core"))));
}
  1523. void BuildProjectWindowDistinct(TNodePtr& block, TContext& ctx, bool ordered) override {
  1524. if (PrewindowMap) {
  1525. block = L(block, Y("let", "core", PrewindowMap));
  1526. }
  1527. if (CalcOverWindow) {
  1528. block = L(block, Y("let", "core", CalcOverWindow));
  1529. }
  1530. block = L(block, Y("let", "core", Y("PersistableRepr", BuildSqlProject(ctx, ordered))));
  1531. if (Distinct) {
  1532. block = L(block, Y("let", "core", Y("PersistableRepr", Y("SqlAggregateAll", Y("RemoveSystemMembers", "core")))));
  1533. }
  1534. }
  1535. TNodePtr BuildSort(TContext& ctx, const TString& label) override {
  1536. Y_UNUSED(ctx);
  1537. if (OrderBy.empty() || DisableSort_) {
  1538. return nullptr;
  1539. }
  1540. auto sorted = BuildSortSpec(OrderBy, label, false, AssumeSorted);
  1541. if (ExtraSortColumns.empty()) {
  1542. return Y("let", label, sorted);
  1543. }
  1544. auto body = Y();
  1545. for (const auto& [column, _] : ExtraSortColumns) {
  1546. body = L(body, Y("let", "row", Y("RemoveMember", "row", Q(column))));
  1547. }
  1548. body = L(body, Y("let", "res", "row"));
  1549. return Y("let", label, Y("OrderedMap", sorted, BuildLambda(Pos, Y("row"), body, "res")));
  1550. }
  1551. TNodePtr BuildCleanupColumns(TContext& ctx, const TString& label) override {
  1552. TNodePtr cleanup;
  1553. if (ctx.EnableSystemColumns && ctx.Settings.Mode != NSQLTranslation::ESqlMode::LIMITED_VIEW) {
  1554. if (Columns.All) {
  1555. cleanup = Y("let", label, Y("RemoveSystemMembers", label));
  1556. } else if (!Columns.List.empty()) {
  1557. const bool isJoin = Source->GetJoin();
  1558. if (!isJoin && Columns.QualifiedAll) {
  1559. if (ctx.SimpleColumns) {
  1560. cleanup = Y("let", label, Y("RemoveSystemMembers", label));
  1561. } else {
  1562. TNodePtr members;
  1563. for (auto& term: Terms) {
  1564. if (term->IsAsterisk()) {
  1565. auto sourceName = term->GetSourceName();
  1566. YQL_ENSURE(*sourceName && !sourceName->empty());
  1567. auto prefix = *sourceName + "._yql_";
  1568. members = members ? L(members, Q(prefix)) : Y(Q(prefix));
  1569. }
  1570. }
  1571. if (members) {
  1572. cleanup = Y("let", label, Y("RemovePrefixMembers", label, Q(members)));
  1573. }
  1574. }
  1575. }
  1576. }
  1577. }
  1578. return cleanup;
  1579. }
  1580. bool IsSelect() const override {
  1581. return true;
  1582. }
  1583. bool HasSelectResult() const override {
  1584. return !Settings.Discard;
  1585. }
  1586. bool IsStream() const override {
  1587. return Source->IsStream();
  1588. }
  1589. EOrderKind GetOrderKind() const override {
  1590. if (OrderBy.empty()) {
  1591. return EOrderKind::None;
  1592. }
  1593. return AssumeSorted ? EOrderKind::Assume : EOrderKind::Sort;
  1594. }
  1595. TWriteSettings GetWriteSettings() const override {
  1596. return Settings;
  1597. }
/// Resolves a column reference against this select.
/// While ORDER BY is being initialized (OrderByInit) on a JOIN or on a non-aggregated select,
/// the column is first looked up via IRealSource::AddColumn (temporarily marked not reliable)
/// and, if not found there, in the underlying source. For non-aggregated selects with sorting
/// enabled, a column missing from the projection is recorded in ExtraSortColumns (BuildSort
/// removes those columns again after sorting).
/// Returns an empty TMaybe on error, otherwise whether the column exists.
TMaybe<bool> AddColumn(TContext& ctx, TColumnNode& column) override {
const bool aggregated = Source->HasAggregations() || Distinct;
if (OrderByInit && (Source->GetJoin() || !aggregated)) {
// ORDER BY will try to find column not only in projection items, but also in Source.
// ```SELECT a, b FROM T ORDER BY c``` should work if c is present in T
const bool reliable = column.IsReliable();
column.SetAsNotReliable();
auto maybeExist = IRealSource::AddColumn(ctx, column);
// Restore reliability only outside JOINs; in a JOIN the column stays marked not reliable.
if (reliable && !Source->GetJoin()) {
column.ResetAsReliable();
}
if (!maybeExist || !maybeExist.GetRef()) {
maybeExist = Source->AddColumn(ctx, column);
}
if (!maybeExist.Defined()) {
return maybeExist;
}
if (!DisableSort_ && !aggregated && column.GetColumnName() && IsMissingInProjection(ctx, column)) {
ExtraSortColumns[FullColumnName(column)] = &column;
}
return maybeExist;
}
return IRealSource::AddColumn(ctx, column);
}
  1622. bool IsMissingInProjection(TContext& ctx, const TColumnNode& column) const {
  1623. TString columnName = FullColumnName(column);
  1624. if (Columns.Real.contains(columnName) || Columns.Artificial.contains(columnName)) {
  1625. return false;
  1626. }
  1627. if (!ctx.SimpleColumns && Columns.QualifiedAll && !columnName.Contains('.')) {
  1628. return false;
  1629. }
  1630. if (!Columns.IsColumnPossible(ctx, columnName)) {
  1631. return true;
  1632. }
  1633. for (auto without: Without) {
  1634. auto name = *without->GetColumnName();
  1635. if (Source && Source->GetJoin()) {
  1636. name = DotJoin(*without->GetSourceName(), name);
  1637. }
  1638. if (name == columnName) {
  1639. return true;
  1640. }
  1641. }
  1642. return false;
  1643. }
  1644. TNodePtr PrepareWithout(const TNodePtr& base) {
  1645. auto terms = base;
  1646. if (Without) {
  1647. for (auto without: Without) {
  1648. auto name = *without->GetColumnName();
  1649. if (Source && Source->GetJoin()) {
  1650. name = DotJoin(*without->GetSourceName(), name);
  1651. }
  1652. terms = L(terms, Y("let", "row", Y("RemoveMember", "row", Q(name))));
  1653. }
  1654. }
  1655. if (Source) {
  1656. for (auto column : Source->GetTmpWindowColumns()) {
  1657. terms = L(terms, Y("let", "row", Y("RemoveMember", "row", Q(column))));
  1658. }
  1659. }
  1660. return terms;
  1661. }
  1662. TNodePtr DoClone() const final {
  1663. return new TSelectCore(Pos, Source->CloneSource(), CloneContainer(GroupByExpr),
  1664. CloneContainer(GroupBy), CompactGroupBy, GroupBySuffix, AssumeSorted, CloneContainer(OrderBy),
  1665. SafeClone(Having), CloneContainer(WinSpecs), SafeClone(LegacyHoppingWindowSpec),
  1666. CloneContainer(Terms), Distinct, Without, SelectStream, Settings, TColumnsSets(UniqueSets), TColumnsSets(DistinctSets));
  1667. }
  1668. private:
/// Initializes the projection-related parts of the select: window PARTITION BY / session windows,
/// the legacy hopping window, projection terms (labels and duplicate detection), composite
/// grouping terms, window ORDER BY, WITHOUT checks, HAVING, aggregation compatibility, and
/// finally the select's ORDER BY (resolved against this select, see AddColumn).
/// Errors are accumulated into `hasError` so that as many as possible are reported.
/// Returns !hasError.
bool InitSelect(TContext& ctx, ISource* src, bool isJoin, bool& hasError) {
for (auto& [name, winSpec] : WinSpecs) {
for (size_t i = 0; i < winSpec->Partitions.size(); ++i) {
auto partitionNode = winSpec->Partitions[i];
// At most one SessionWindow per window spec; it is remembered in winSpec->Session.
if (auto sessionWindow = dynamic_cast<TSessionWindow*>(partitionNode.Get())) {
if (winSpec->Session) {
ctx.Error(partitionNode->GetPos()) << "Duplicate session window specification:";
ctx.Error(winSpec->Session->GetPos()) << "Previous session window is declared here";
hasError = true;
continue;
}
sessionWindow->MarkValid();
winSpec->Session = partitionNode;
}
if (!partitionNode->Init(ctx, src)) {
hasError = true;
continue;
}
// Unnamed expression partitions get a temporary label "group_<window>_<index>" (registered as a tmp window column).
if (!partitionNode->GetLabel() && !partitionNode->GetColumnName()) {
TString label = TStringBuilder() << "group_" << name << "_" << i;
partitionNode->SetLabel(label);
src->AddTmpWindowColumn(label);
}
}
if (!src->AddExpressions(ctx, winSpec->Partitions, EExprSeat::WindowPartitionBy)) {
hasError = true;
}
}
if (LegacyHoppingWindowSpec) {
if (!LegacyHoppingWindowSpec->TimeExtractor->Init(ctx, src)) {
hasError = true;
}
src->SetLegacyHoppingWindowSpec(LegacyHoppingWindowSpec);
}
// Projection terms: initialize each, derive its output label and reject duplicate labels.
for (auto& term: Terms) {
if (!term->Init(ctx, src)) {
hasError = true;
continue;
}
auto column = term->GetColumnName();
TString label(term->GetLabel());
bool hasName = true;
if (label.empty()) {
auto source = term->GetSourceName();
// NOTE(review): `source` is dereferenced here without a null check, unlike the `column` branch below;
// presumably GetSourceName() is never null for asterisk terms - confirm.
if (term->IsAsterisk() && !source->empty()) {
Columns.QualifiedAll = true;
label = DotJoin(*source, "*");
} else if (column) {
label = isJoin && source && *source ? DotJoin(*source, *column) : *column;
} else {
label = Columns.AddUnnamed();
hasName = false;
if (ctx.WarnUnnamedColumns) {
ctx.Warning(term->GetPos(), TIssuesIds::YQL_UNNAMED_COLUMN)
<< "Autogenerated column name " << label << " will be used for expression";
}
}
}
if (hasName && !Columns.Add(&label, false, false, true)) {
ctx.Error(Pos) << "Duplicate column: " << label;
hasError = true;
}
}
// For a subselect of a composite source: add every grouping column that this subselect does not
// group by as an empty optional (Nothing) of that column's type taken from "preaggregated"/"origcore".
CompositeTerms = Y();
if (!hasError && Source->IsCompositeSource() && !Columns.All && !Columns.QualifiedAll && !Columns.List.empty()) {
auto compositeSrcPtr = static_cast<TCompositeSelect*>(Source->GetCompositeSource());
if (compositeSrcPtr) {
const auto& groupings = compositeSrcPtr->GetGroupingCols();
for (const auto& column: groupings) {
if (Source->IsGroupByColumn(column)) {
continue;
}
const TString tableName = (GroupByExpr || DistinctAggrExpr) ? "preaggregated" : "origcore";
CompositeTerms = L(CompositeTerms, Y("let", "row", Y("AddMember", "row", BuildQuotedAtom(Pos, column), Y("Nothing", Y("MatchType",
Y("StructMemberType", Y("ListItemType", Y("TypeOf", tableName)), Q(column)),
Q("Optional"), Y("lambda", Q(Y("item")), "item"), Y("lambda", Q(Y("item")), Y("OptionalType", "item")))))));
}
}
}
for (auto iter: WinSpecs) {
auto winSpec = *iter.second;
for (auto orderSpec: winSpec.OrderBy) {
if (!orderSpec->OrderExpr->Init(ctx, src)) {
hasError = true;
}
}
}
if (Columns.All || Columns.QualifiedAll) {
Source->AllColumns();
}
// WITHOUT columns must be qualified in a JOIN.
for (const auto& without: Without) {
auto namePtr = without->GetColumnName();
auto sourcePtr = without->GetSourceName();
YQL_ENSURE(namePtr && *namePtr);
if (isJoin && !(sourcePtr && *sourcePtr)) {
ctx.Error(without->GetPos()) << "Expected correlation name for WITHOUT in JOIN";
hasError = true;
continue;
}
}
// NOTE(review): Having was already initialized in DoInit before this call; presumably Init() is idempotent - confirm.
if (Having && !Having->Init(ctx, src)) {
hasError = true;
}
if (!src->IsCompositeSource() && !Columns.All && src->HasAggregations()) {
WarnIfAliasFromSelectIsUsedInGroupBy(ctx, Terms, GroupBy, GroupByExpr);
/// verify select aggregation compatibility
TVector<TNodePtr> exprs(Terms);
if (Having) {
exprs.push_back(Having);
}
for (const auto& iter: WinSpecs) {
for (const auto& sortSpec: iter.second->OrderBy) {
exprs.push_back(sortSpec->OrderExpr);
}
}
if (!ValidateAllNodesForAggregation(ctx, exprs)) {
hasError = true;
}
}
// ORDER BY is initialized against this select (with the source's label), so AddColumn can
// resolve names from both the projection and the underlying source while OrderByInit is set.
// NOTE(review): if expr->Init fails, `continue` skips resetting OrderByInit, leaving it true after the loop.
const auto label = GetLabel();
for (const auto& sortSpec: OrderBy) {
auto& expr = sortSpec->OrderExpr;
SetLabel(Source->GetLabel());
OrderByInit = true;
if (!expr->Init(ctx, this)) {
hasError = true;
continue;
}
OrderByInit = false;
if (!IsComparableExpression(ctx, expr, AssumeSorted, AssumeSorted ? "ASSUME ORDER BY" : "ORDER BY")) {
hasError = true;
continue;
}
}
SetLabel(label);
return !hasError;
}
  1806. TNodePtr PrepareJoinCoalesce(TContext& ctx, const TNodePtr& base, bool multipleQualifiedAll, const TVector<TString>& coalesceLabels) {
  1807. const bool isJoin = Source->GetJoin();
  1808. const bool needCoalesce = isJoin && ctx.SimpleColumns &&
  1809. (Columns.All || multipleQualifiedAll || ctx.CoalesceJoinKeysOnQualifiedAll);
  1810. if (!needCoalesce) {
  1811. return base;
  1812. }
  1813. auto terms = base;
  1814. const auto& sameKeyMap = Source->GetJoin()->GetSameKeysMap();
  1815. if (sameKeyMap) {
  1816. terms = L(terms, Y("let", "flatSameKeys", "row"));
  1817. for (const auto& [key, sources]: sameKeyMap) {
  1818. auto coalesceKeys = Y();
  1819. for (const auto& label : coalesceLabels) {
  1820. if (sources.contains(label)) {
  1821. coalesceKeys = L(coalesceKeys, Q(DotJoin(label, key)));
  1822. }
  1823. }
  1824. terms = L(terms, Y("let", "flatSameKeys", Y("CoalesceMembers", "flatSameKeys", Q(coalesceKeys))));
  1825. }
  1826. terms = L(terms, Y("let", "row", "flatSameKeys"));
  1827. }
  1828. return terms;
  1829. }
  1830. TNodePtr BuildSqlProject(TContext& ctx, bool ordered) {
  1831. auto sqlProjectArgs = Y();
  1832. const bool isJoin = Source->GetJoin();
  1833. if (Columns.All) {
  1834. YQL_ENSURE(Columns.List.empty());
  1835. auto terms = PrepareWithout(Y());
  1836. auto options = Y();
  1837. if (isJoin && ctx.SimpleColumns) {
  1838. terms = PrepareJoinCoalesce(ctx, terms, false, Source->GetJoin()->GetJoinLabels());
  1839. auto members = Y();
  1840. for (auto& source : Source->GetJoin()->GetJoinLabels()) {
  1841. YQL_ENSURE(!source.empty());
  1842. members = L(members, BuildQuotedAtom(Pos, source + "."));
  1843. }
  1844. if (GroupByExpr.empty() || ctx.BogousStarInGroupByOverJoin) {
  1845. terms = L(terms, Y("let", "res", Y("DivePrefixMembers", "row", Q(members))));
  1846. } else {
  1847. auto groupExprStruct = Y("AsStruct");
  1848. for (auto node : GroupByExpr) {
  1849. auto label = node->GetLabel();
  1850. YQL_ENSURE(label);
  1851. if (Source->IsGroupByColumn(label)) {
  1852. auto name = BuildQuotedAtom(Pos, label);
  1853. groupExprStruct = L(groupExprStruct, Q(Y(name, Y("Member", "row", name))));
  1854. }
  1855. }
  1856. auto groupColumnsStruct = Y("DivePrefixMembers", "row", Q(members));
  1857. terms = L(terms, Y("let", "res", Y("FlattenMembers", Q(Y(BuildQuotedAtom(Pos, ""), groupExprStruct)),
  1858. Q(Y(BuildQuotedAtom(Pos, ""), groupColumnsStruct)))));
  1859. }
  1860. options = L(options, Q(Y(Q("divePrefix"), Q(members))));
  1861. } else {
  1862. terms = L(terms, Y("let", "res", "row"));
  1863. }
  1864. sqlProjectArgs = L(sqlProjectArgs, Y("SqlProjectStarItem", "projectCoreType", BuildQuotedAtom(Pos, ""), BuildLambda(Pos, Y("row"), terms, "res"), Q(options)));
  1865. } else {
  1866. YQL_ENSURE(!Columns.List.empty());
  1867. YQL_ENSURE(Columns.List.size() == Terms.size());
  1868. TVector<TString> coalesceLabels;
  1869. bool multipleQualifiedAll = false;
  1870. if (isJoin && ctx.SimpleColumns) {
  1871. THashSet<TString> starTerms;
  1872. for (auto& term: Terms) {
  1873. if (term->IsAsterisk()) {
  1874. auto sourceName = term->GetSourceName();
  1875. YQL_ENSURE(*sourceName && !sourceName->empty());
  1876. YQL_ENSURE(Columns.QualifiedAll);
  1877. starTerms.insert(*sourceName);
  1878. }
  1879. }
  1880. TVector<TString> matched;
  1881. TVector<TString> unmatched;
  1882. for (auto& label : Source->GetJoin()->GetJoinLabels()) {
  1883. if (starTerms.contains(label)) {
  1884. matched.push_back(label);
  1885. } else {
  1886. unmatched.push_back(label);
  1887. }
  1888. }
  1889. coalesceLabels.insert(coalesceLabels.end(), matched.begin(), matched.end());
  1890. coalesceLabels.insert(coalesceLabels.end(), unmatched.begin(), unmatched.end());
  1891. multipleQualifiedAll = starTerms.size() > 1;
  1892. }
  1893. auto column = Columns.List.begin();
  1894. auto isNamedColumn = Columns.NamedColumns.begin();
  1895. for (auto& term: Terms) {
  1896. auto sourceName = term->GetSourceName();
  1897. if (!term->IsAsterisk()) {
  1898. auto body = Y();
  1899. body = L(body, Y("let", "res", term));
  1900. TPosition lambdaPos = Pos;
  1901. TPosition aliasPos = Pos;
  1902. if (term->IsImplicitLabel() && ctx.WarnOnAnsiAliasShadowing) {
  1903. // TODO: recanonize for positions below
  1904. lambdaPos = term->GetPos();
  1905. aliasPos = term->GetLabelPos() ? *term->GetLabelPos() : lambdaPos;
  1906. }
  1907. auto projectItem = Y("SqlProjectItem", "projectCoreType", BuildQuotedAtom(aliasPos, *isNamedColumn ? *column : ""), BuildLambda(lambdaPos, Y("row"), body, "res"));
  1908. if (term->IsImplicitLabel() && ctx.WarnOnAnsiAliasShadowing) {
  1909. projectItem = L(projectItem, Q(Y(Q(Y(Q("warnShadow"))))));
  1910. }
  1911. if (!*isNamedColumn) {
  1912. projectItem = L(projectItem, Q(Y(Q(Y(Q("autoName"))))));
  1913. }
  1914. sqlProjectArgs = L(sqlProjectArgs, projectItem);
  1915. } else {
  1916. auto terms = PrepareWithout(Y());
  1917. auto options = Y();
  1918. if (ctx.SimpleColumns && !isJoin) {
  1919. terms = L(terms, Y("let", "res", "row"));
  1920. } else {
  1921. terms = PrepareJoinCoalesce(ctx, terms, multipleQualifiedAll, coalesceLabels);
  1922. auto members = isJoin ? Y() : Y("FlattenMembers");
  1923. if (isJoin) {
  1924. members = L(members, BuildQuotedAtom(Pos, *sourceName + "."));
  1925. if (ctx.SimpleColumns) {
  1926. options = L(options, Q(Y(Q("divePrefix"), Q(members))));
  1927. }
  1928. members = Y(ctx.SimpleColumns ? "DivePrefixMembers" : "SelectMembers", "row", Q(members));
  1929. } else {
  1930. auto prefix = BuildQuotedAtom(Pos, ctx.SimpleColumns ? "" : *sourceName + ".");
  1931. members = L(members, Q(Y(prefix, "row")));
  1932. if (!ctx.SimpleColumns) {
  1933. options = L(options, Q(Y(Q("addPrefix"), prefix)));
  1934. }
  1935. }
  1936. terms = L(terms, Y("let", "res", members));
  1937. }
  1938. sqlProjectArgs = L(sqlProjectArgs, Y("SqlProjectStarItem", "projectCoreType", BuildQuotedAtom(Pos, *sourceName), BuildLambda(Pos, Y("row"), terms, "res"), Q(options)));
  1939. }
  1940. ++column;
  1941. ++isNamedColumn;
  1942. }
  1943. }
  1944. for (const auto& [columnName, column]: ExtraSortColumns) {
  1945. auto body = Y();
  1946. body = L(body, Y("let", "res", column));
  1947. TPosition pos = column->GetPos();
  1948. auto projectItem = Y("SqlProjectItem", "projectCoreType", BuildQuotedAtom(pos, columnName), BuildLambda(pos, Y("row"), body, "res"));
  1949. sqlProjectArgs = L(sqlProjectArgs, projectItem);
  1950. }
  1951. auto block(Y(Y("let", "projectCoreType", Y("TypeOf", "core"))));
  1952. block = L(block, Y("let", "core", Y(ordered ? "OrderedSqlProject" : "SqlProject", "core", Q(sqlProjectArgs))));
  1953. if (!(UniqueSets.empty() && DistinctSets.empty())) {
  1954. block = L(block, Y("let", "core", Y("RemoveSystemMembers", "core")));
  1955. const auto MakeUniqueHint = [this](INode::TPtr& block, const TColumnsSets& sets, bool distinct) {
  1956. if (!sets.empty()) {
  1957. auto assume = Y(distinct ? "AssumeDistinctHint" : "AssumeUniqueHint", "core");
  1958. if (!sets.front().empty()) {
  1959. for (const auto& columns : sets) {
  1960. auto set = Y();
  1961. for (const auto& column : columns) {
  1962. set = L(set, Q(column));
  1963. }
  1964. assume = L(assume, Q(set));
  1965. }
  1966. }
  1967. block = L(block, Y("let", "core", assume));
  1968. }
  1969. };
  1970. MakeUniqueHint(block, DistinctSets, true);
  1971. MakeUniqueHint(block, UniqueSets, false);
  1972. }
  1973. return Y("block", Q(L(block, Y("return", "core"))));
  1974. }
  1975. private:
  1976. TSourcePtr Source;
  1977. TVector<TNodePtr> GroupByExpr;
  1978. TVector<TNodePtr> DistinctAggrExpr;
  1979. TVector<TNodePtr> GroupBy;
  1980. bool AssumeSorted = false;
  1981. bool CompactGroupBy = false;
  1982. TString GroupBySuffix;
  1983. TVector<TSortSpecificationPtr> OrderBy;
  1984. TNodePtr Having;
  1985. TWinSpecs WinSpecs;
  1986. TNodePtr Flatten;
  1987. TNodePtr PreFlattenMap;
  1988. TNodePtr PreaggregatedMap;
  1989. TNodePtr PrewindowMap;
  1990. TNodePtr Aggregate;
  1991. TNodePtr CalcOverWindow;
  1992. TNodePtr CompositeTerms;
  1993. TVector<TNodePtr> Terms;
  1994. TVector<TNodePtr> Without;
  1995. const bool Distinct;
  1996. bool OrderByInit = false;
  1997. TLegacyHoppingWindowSpecPtr LegacyHoppingWindowSpec;
  1998. const bool SelectStream;
  1999. const TWriteSettings Settings;
  2000. const TColumnsSets UniqueSets, DistinctSets;
  2001. TMap<TString, TNodePtr> ExtraSortColumns;
  2002. };
  2003. class TProcessSource: public IRealSource {
  2004. public:
  2005. TProcessSource(
  2006. TPosition pos,
  2007. TSourcePtr source,
  2008. TNodePtr with,
  2009. bool withExtFunction,
  2010. TVector<TNodePtr>&& terms,
  2011. bool listCall,
  2012. bool processStream,
  2013. const TWriteSettings& settings,
  2014. const TVector<TSortSpecificationPtr>& assumeOrderBy
  2015. )
  2016. : IRealSource(pos)
  2017. , Source(std::move(source))
  2018. , With(with)
  2019. , WithExtFunction(withExtFunction)
  2020. , Terms(std::move(terms))
  2021. , ListCall(listCall)
  2022. , ProcessStream(processStream)
  2023. , Settings(settings)
  2024. , AssumeOrderBy(assumeOrderBy)
  2025. {
  2026. }
  2027. void GetInputTables(TTableList& tableList) const override {
  2028. Source->GetInputTables(tableList);
  2029. ISource::GetInputTables(tableList);
  2030. }
  2031. bool DoInit(TContext& ctx, ISource* initSrc) override {
  2032. if (AsInner) {
  2033. Source->UseAsInner();
  2034. }
  2035. if (!Source->Init(ctx, initSrc)) {
  2036. return false;
  2037. }
  2038. if (ProcessStream && !Source->IsStream()) {
  2039. ctx.Error(Pos) << "PROCESS STREAM is unsupported for non-streaming sources";
  2040. return false;
  2041. }
  2042. auto src = Source.Get();
  2043. if (!With) {
  2044. src->AllColumns();
  2045. Columns.SetAll();
  2046. src->FinishColumns();
  2047. return true;
  2048. }
  2049. /// grouped expressions are available in filters
  2050. if (!Source->InitFilters(ctx)) {
  2051. return false;
  2052. }
  2053. TSourcePtr fakeSource = nullptr;
  2054. if (ListCall && !WithExtFunction) {
  2055. fakeSource = BuildFakeSource(src->GetPos());
  2056. src->AllColumns();
  2057. }
  2058. auto processSource = fakeSource != nullptr ? fakeSource.Get() : src;
  2059. Y_DEBUG_ABORT_UNLESS(processSource != nullptr);
  2060. if (!With->Init(ctx, processSource)) {
  2061. return false;
  2062. }
  2063. if (With->GetLabel().empty()) {
  2064. Columns.SetAll();
  2065. } else {
  2066. if (ListCall) {
  2067. ctx.Error(With->GetPos()) << "Label is not allowed to use with TableRows()";
  2068. return false;
  2069. }
  2070. Columns.Add(&With->GetLabel(), false);
  2071. }
  2072. bool hasError = false;
  2073. TNodePtr produce;
  2074. if (WithExtFunction) {
  2075. produce = Y();
  2076. } else {
  2077. TString processCall = (ListCall ? "SqlProcess" : "Apply");
  2078. produce = Y(processCall, With);
  2079. }
  2080. TMaybe<ui32> listPosIndex;
  2081. ui32 termIndex = 0;
  2082. for (auto& term: Terms) {
  2083. if (!term->GetLabel().empty()) {
  2084. ctx.Error(term->GetPos()) << "Labels are not allowed for PROCESS terms";
  2085. hasError = true;
  2086. continue;
  2087. }
  2088. if (!term->Init(ctx, processSource)) {
  2089. hasError = true;
  2090. continue;
  2091. }
  2092. if (ListCall) {
  2093. if (auto atom = dynamic_cast<TTableRows*>(term.Get())) {
  2094. listPosIndex = termIndex;
  2095. }
  2096. }
  2097. ++termIndex;
  2098. produce = L(produce, term);
  2099. }
  2100. if (hasError) {
  2101. return false;
  2102. }
  2103. if (ListCall && !WithExtFunction) {
  2104. YQL_ENSURE(listPosIndex.Defined());
  2105. produce = L(produce, Q(ToString(*listPosIndex)));
  2106. }
  2107. if (!produce->Init(ctx, src)) {
  2108. hasError = true;
  2109. }
  2110. if (!(WithExtFunction && Terms.empty())) {
  2111. TVector<TNodePtr>(1, produce).swap(Terms);
  2112. }
  2113. src->FinishColumns();
  2114. const auto label = GetLabel();
  2115. for (const auto& sortSpec: AssumeOrderBy) {
  2116. auto& expr = sortSpec->OrderExpr;
  2117. SetLabel(Source->GetLabel());
  2118. if (!expr->Init(ctx, this)) {
  2119. hasError = true;
  2120. continue;
  2121. }
  2122. if (!IsComparableExpression(ctx, expr, true, "ASSUME ORDER BY")) {
  2123. hasError = true;
  2124. continue;
  2125. }
  2126. }
  2127. SetLabel(label);
  2128. return !hasError;
  2129. }
  2130. TNodePtr Build(TContext& ctx) override {
  2131. auto input = Source->Build(ctx);
  2132. if (!input) {
  2133. return nullptr;
  2134. }
  2135. if (!With) {
  2136. auto res = input;
  2137. if (ctx.EnableSystemColumns) {
  2138. res = Y("RemoveSystemMembers", res);
  2139. }
  2140. return res;
  2141. }
  2142. TString inputLabel = ListCall ? "inputRowsList" : "core";
  2143. auto block(Y(Y("let", inputLabel, input)));
  2144. auto filter = Source->BuildFilter(ctx, inputLabel);
  2145. if (filter) {
  2146. block = L(block, Y("let", inputLabel, filter));
  2147. }
  2148. if (WithExtFunction) {
  2149. auto preTransform = Y("RemoveSystemMembers", inputLabel);
  2150. if (Terms.size() > 0) {
  2151. preTransform = Y("Map", preTransform, BuildLambda(Pos, Y("row"), Q(Terms[0])));
  2152. }
  2153. block = L(block, Y("let", inputLabel, preTransform));
  2154. block = L(block, Y("let", "transform", With));
  2155. block = L(block, Y("let", "core", Y("Apply", "transform", inputLabel)));
  2156. } else if (ListCall) {
  2157. block = L(block, Y("let", "core", Terms[0]));
  2158. } else {
  2159. auto terms = BuildColumnsTerms(ctx);
  2160. block = L(block, Y("let", "core", Y(ctx.UseUnordered(*this) ? "OrderedFlatMap" : "FlatMap", "core", BuildLambda(Pos, Y("row"), terms, "res"))));
  2161. }
  2162. block = L(block, Y("let", "core", Y("AutoDemux", Y("PersistableRepr", "core"))));
  2163. return Y("block", Q(L(block, Y("return", "core"))));
  2164. }
  2165. TNodePtr BuildSort(TContext& ctx, const TString& label) override {
  2166. Y_UNUSED(ctx);
  2167. if (AssumeOrderBy.empty()) {
  2168. return nullptr;
  2169. }
  2170. return Y("let", label, BuildSortSpec(AssumeOrderBy, label, false, true));
  2171. }
  2172. EOrderKind GetOrderKind() const override {
  2173. if (!With) {
  2174. return EOrderKind::Passthrough;
  2175. }
  2176. return AssumeOrderBy.empty() ? EOrderKind::None : EOrderKind::Assume;
  2177. }
  2178. bool IsSelect() const override {
  2179. return false;
  2180. }
  2181. bool HasSelectResult() const override {
  2182. return !Settings.Discard;
  2183. }
  2184. bool IsStream() const override {
  2185. return Source->IsStream();
  2186. }
  2187. TWriteSettings GetWriteSettings() const override {
  2188. return Settings;
  2189. }
  2190. TNodePtr DoClone() const final {
  2191. return new TProcessSource(Pos, Source->CloneSource(), SafeClone(With), WithExtFunction,
  2192. CloneContainer(Terms), ListCall, ProcessStream, Settings, CloneContainer(AssumeOrderBy));
  2193. }
  2194. private:
  2195. TNodePtr BuildColumnsTerms(TContext& ctx) {
  2196. Y_UNUSED(ctx);
  2197. TNodePtr terms;
  2198. Y_DEBUG_ABORT_UNLESS(Terms.size() == 1);
  2199. if (Columns.All) {
  2200. terms = Y(Y("let", "res", Y("ToSequence", Terms.front())));
  2201. } else {
  2202. Y_DEBUG_ABORT_UNLESS(Columns.List.size() == Terms.size());
  2203. terms = L(Y(), Y("let", "res",
  2204. L(Y("AsStructUnordered"), Q(Y(BuildQuotedAtom(Pos, Columns.List.front()), Terms.front())))));
  2205. terms = L(terms, Y("let", "res", Y("Just", "res")));
  2206. }
  2207. return terms;
  2208. }
  2209. private:
  2210. TSourcePtr Source;
  2211. TNodePtr With;
  2212. const bool WithExtFunction;
  2213. TVector<TNodePtr> Terms;
  2214. const bool ListCall;
  2215. const bool ProcessStream;
  2216. const TWriteSettings Settings;
  2217. TVector<TSortSpecificationPtr> AssumeOrderBy;
  2218. };
  2219. TSourcePtr BuildProcess(
  2220. TPosition pos,
  2221. TSourcePtr source,
  2222. TNodePtr with,
  2223. bool withExtFunction,
  2224. TVector<TNodePtr>&& terms,
  2225. bool listCall,
  2226. bool processStream,
  2227. const TWriteSettings& settings,
  2228. const TVector<TSortSpecificationPtr>& assumeOrderBy
  2229. ) {
  2230. return new TProcessSource(pos, std::move(source), with, withExtFunction, std::move(terms), listCall, processStream, settings, assumeOrderBy);
  2231. }
  2232. class TNestedProxySource: public IProxySource {
  2233. public:
  2234. TNestedProxySource(TPosition pos, const TVector<TNodePtr>& groupBy, TSourcePtr source)
  2235. : IProxySource(pos, source.Get())
  2236. , CompositeSelect(nullptr)
  2237. , Holder(std::move(source))
  2238. , GroupBy(groupBy)
  2239. {}
  2240. TNestedProxySource(TCompositeSelect* compositeSelect, const TVector<TNodePtr>& groupBy)
  2241. : IProxySource(compositeSelect->GetPos(), compositeSelect->RealSource())
  2242. , CompositeSelect(compositeSelect)
  2243. , GroupBy(groupBy)
  2244. {}
  2245. bool DoInit(TContext& ctx, ISource* src) override {
  2246. return Source->Init(ctx, src);
  2247. }
  2248. TNodePtr Build(TContext& ctx) override {
  2249. return CompositeSelect ? BuildAtom(Pos, "composite", TNodeFlags::Default) : Source->Build(ctx);
  2250. }
  2251. bool InitFilters(TContext& ctx) override {
  2252. return CompositeSelect ? true : Source->InitFilters(ctx);
  2253. }
  2254. TNodePtr BuildFilter(TContext& ctx, const TString& label) override {
  2255. return CompositeSelect ? nullptr : Source->BuildFilter(ctx, label);
  2256. }
  2257. IJoin* GetJoin() override {
  2258. return Source->GetJoin();
  2259. }
  2260. bool IsCompositeSource() const override {
  2261. return true;
  2262. }
  2263. ISource* GetCompositeSource() override {
  2264. return CompositeSelect;
  2265. }
  2266. bool AddGrouping(TContext& ctx, const TVector<TString>& columns, TString& hintColumn) override {
  2267. Y_UNUSED(ctx);
  2268. hintColumn = TStringBuilder() << "GroupingHint" << Hints.size();
  2269. ui64 hint = 0;
  2270. if (GroupByColumns.empty()) {
  2271. const bool isJoin = GetJoin();
  2272. for (const auto& groupByNode: GroupBy) {
  2273. auto namePtr = groupByNode->GetColumnName();
  2274. YQL_ENSURE(namePtr);
  2275. TString column = *namePtr;
  2276. if (isJoin) {
  2277. auto sourceNamePtr = groupByNode->GetSourceName();
  2278. if (sourceNamePtr && !sourceNamePtr->empty()) {
  2279. column = DotJoin(*sourceNamePtr, column);
  2280. }
  2281. }
  2282. GroupByColumns.insert(column);
  2283. }
  2284. }
  2285. for (const auto& column: columns) {
  2286. hint <<= 1;
  2287. if (!GroupByColumns.contains(column)) {
  2288. hint += 1;
  2289. }
  2290. }
  2291. Hints.push_back(hint);
  2292. return true;
  2293. }
  2294. size_t GetGroupingColumnsCount() const override {
  2295. return Hints.size();
  2296. }
  2297. TNodePtr BuildGroupingColumns(const TString& label) override {
  2298. if (Hints.empty()) {
  2299. return nullptr;
  2300. }
  2301. auto body = Y();
  2302. for (size_t i = 0; i < Hints.size(); ++i) {
  2303. TString hintColumn = TStringBuilder() << "GroupingHint" << i;
  2304. TString hintValue = ToString(Hints[i]);
  2305. body = L(body, Y("let", "row", Y("AddMember", "row", Q(hintColumn), Y("Uint64", Q(hintValue)))));
  2306. }
  2307. return Y("Map", label, BuildLambda(Pos, Y("row"), body, "row"));
  2308. }
  2309. void FinishColumns() override {
  2310. Source->FinishColumns();
  2311. }
  2312. TMaybe<bool> AddColumn(TContext& ctx, TColumnNode& column) override {
  2313. if (const TString* columnName = column.GetColumnName()) {
  2314. if (columnName && IsExprAlias(*columnName)) {
  2315. return true;
  2316. }
  2317. }
  2318. return Source->AddColumn(ctx, column);
  2319. }
  2320. TPtr DoClone() const final {
  2321. YQL_ENSURE(Hints.empty());
  2322. return Holder.Get() ? new TNestedProxySource(Pos, CloneContainer(GroupBy), Holder->CloneSource()) :
  2323. new TNestedProxySource(CompositeSelect, CloneContainer(GroupBy));
  2324. }
  2325. private:
  2326. TCompositeSelect* CompositeSelect;
  2327. TSourcePtr Holder;
  2328. TVector<TNodePtr> GroupBy;
  2329. mutable TSet<TString> GroupByColumns;
  2330. mutable TVector<ui64> Hints;
  2331. };
  2332. namespace {
  2333. TSourcePtr DoBuildSelectCore(
  2334. TContext& ctx,
  2335. TPosition pos,
  2336. TSourcePtr originalSource,
  2337. TSourcePtr source,
  2338. const TVector<TNodePtr>& groupByExpr,
  2339. const TVector<TNodePtr>& groupBy,
  2340. bool compactGroupBy,
  2341. const TString& groupBySuffix,
  2342. bool assumeSorted,
  2343. const TVector<TSortSpecificationPtr>& orderBy,
  2344. TNodePtr having,
  2345. TWinSpecs&& winSpecs,
  2346. TLegacyHoppingWindowSpecPtr legacyHoppingWindowSpec,
  2347. TVector<TNodePtr>&& terms,
  2348. bool distinct,
  2349. TVector<TNodePtr>&& without,
  2350. bool selectStream,
  2351. const TWriteSettings& settings,
  2352. TColumnsSets&& uniqueSets,
  2353. TColumnsSets&& distinctSets
  2354. ) {
  2355. if (groupBy.empty() || !groupBy.front()->ContentListPtr()) {
  2356. return new TSelectCore(pos, std::move(source), groupByExpr, groupBy, compactGroupBy, groupBySuffix, assumeSorted,
  2357. orderBy, having, winSpecs, legacyHoppingWindowSpec, terms, distinct, without, selectStream, settings, std::move(uniqueSets), std::move(distinctSets));
  2358. }
  2359. if (groupBy.size() == 1) {
  2360. /// actualy no big idea to use grouping function in this case (result allways 0)
  2361. auto contentPtr = groupBy.front()->ContentListPtr();
  2362. source = new TNestedProxySource(pos, *contentPtr, source);
  2363. return DoBuildSelectCore(ctx, pos, originalSource, source, groupByExpr, *contentPtr, compactGroupBy, groupBySuffix,
  2364. assumeSorted, orderBy, having, std::move(winSpecs),
  2365. legacyHoppingWindowSpec, std::move(terms), distinct, std::move(without), selectStream, settings, std::move(uniqueSets), std::move(distinctSets));
  2366. }
  2367. /// \todo some smart merge logic, generalize common part of grouping (expr, flatten, etc)?
  2368. TIntrusivePtr<TCompositeSelect> compositeSelect = new TCompositeSelect(pos, std::move(source), originalSource->CloneSource(), settings);
  2369. size_t totalGroups = 0;
  2370. TVector<TSourcePtr> subselects;
  2371. TVector<TNodePtr> groupingCols;
  2372. for (auto& grouping: groupBy) {
  2373. auto contentPtr = grouping->ContentListPtr();
  2374. TVector<TNodePtr> cache(1, nullptr);
  2375. if (!contentPtr) {
  2376. cache[0] = grouping;
  2377. contentPtr = &cache;
  2378. }
  2379. groupingCols.insert(groupingCols.end(), contentPtr->cbegin(), contentPtr->cend());
  2380. TSourcePtr proxySource = new TNestedProxySource(compositeSelect.Get(), CloneContainer(*contentPtr));
  2381. if (!subselects.empty()) {
  2382. /// clone terms for others usage
  2383. TVector<TNodePtr> termsCopy;
  2384. for (const auto& term: terms) {
  2385. termsCopy.emplace_back(term->Clone());
  2386. }
  2387. std::swap(terms, termsCopy);
  2388. }
  2389. totalGroups += contentPtr->size();
  2390. TSelectCore* selectCore = new TSelectCore(pos, std::move(proxySource), CloneContainer(groupByExpr),
  2391. CloneContainer(*contentPtr), compactGroupBy, groupBySuffix, assumeSorted, orderBy, SafeClone(having), CloneContainer(winSpecs),
  2392. legacyHoppingWindowSpec, terms, distinct, without, selectStream, settings, TColumnsSets(uniqueSets), TColumnsSets(distinctSets));
  2393. subselects.emplace_back(selectCore);
  2394. }
  2395. if (totalGroups > ctx.PragmaGroupByLimit) {
  2396. ctx.Error(pos) << "Unable to GROUP BY more than " << ctx.PragmaGroupByLimit << " groups, you try use " << totalGroups << " groups";
  2397. return nullptr;
  2398. }
  2399. compositeSelect->SetSubselects(std::move(subselects), std::move(groupingCols), CloneContainer(groupByExpr));
  2400. return compositeSelect;
  2401. }
  2402. }
  2403. TSourcePtr BuildSelectCore(
  2404. TContext& ctx,
  2405. TPosition pos,
  2406. TSourcePtr source,
  2407. const TVector<TNodePtr>& groupByExpr,
  2408. const TVector<TNodePtr>& groupBy,
  2409. bool compactGroupBy,
  2410. const TString& groupBySuffix,
  2411. bool assumeSorted,
  2412. const TVector<TSortSpecificationPtr>& orderBy,
  2413. TNodePtr having,
  2414. TWinSpecs&& winSpecs,
  2415. TLegacyHoppingWindowSpecPtr legacyHoppingWindowSpec,
  2416. TVector<TNodePtr>&& terms,
  2417. bool distinct,
  2418. TVector<TNodePtr>&& without,
  2419. bool selectStream,
  2420. const TWriteSettings& settings,
  2421. TColumnsSets&& uniqueSets,
  2422. TColumnsSets&& distinctSets
  2423. )
  2424. {
  2425. return DoBuildSelectCore(ctx, pos, source, source, groupByExpr, groupBy, compactGroupBy, groupBySuffix, assumeSorted, orderBy,
  2426. having, std::move(winSpecs), legacyHoppingWindowSpec, std::move(terms), distinct, std::move(without), selectStream, settings, std::move(uniqueSets), std::move(distinctSets));
  2427. }
  2428. class TUnion: public IRealSource {
  2429. public:
  2430. TUnion(TPosition pos, TVector<TSourcePtr>&& sources, bool quantifierAll, const TWriteSettings& settings)
  2431. : IRealSource(pos)
  2432. , Sources(std::move(sources))
  2433. , QuantifierAll(quantifierAll)
  2434. , Settings(settings)
  2435. {
  2436. }
  2437. const TColumns* GetColumns() const override {
  2438. return IRealSource::GetColumns();
  2439. }
  2440. void GetInputTables(TTableList& tableList) const override {
  2441. for (auto& x : Sources) {
  2442. x->GetInputTables(tableList);
  2443. }
  2444. ISource::GetInputTables(tableList);
  2445. }
  2446. bool DoInit(TContext& ctx, ISource* src) override {
  2447. bool first = true;
  2448. for (auto& s: Sources) {
  2449. s->UseAsInner();
  2450. if (!s->Init(ctx, src)) {
  2451. return false;
  2452. }
  2453. if (!ctx.PositionalUnionAll || first) {
  2454. auto c = s->GetColumns();
  2455. Y_DEBUG_ABORT_UNLESS(c);
  2456. Columns.Merge(*c);
  2457. first = false;
  2458. }
  2459. }
  2460. return true;
  2461. }
  2462. TNodePtr Build(TContext& ctx) override {
  2463. TPtr res;
  2464. if (QuantifierAll) {
  2465. if (ctx.EmitUnionMerge) {
  2466. res = ctx.PositionalUnionAll ? Y("UnionMergePositional") : Y("UnionMerge");
  2467. } else {
  2468. res = ctx.PositionalUnionAll ? Y("UnionAllPositional") : Y("UnionAll");
  2469. }
  2470. } else {
  2471. res = ctx.PositionalUnionAll ? Y("UnionPositional") : Y("Union");
  2472. }
  2473. for (auto& s: Sources) {
  2474. auto input = s->Build(ctx);
  2475. if (!input) {
  2476. return nullptr;
  2477. }
  2478. res->Add(input);
  2479. }
  2480. return res;
  2481. }
  2482. bool IsStream() const override {
  2483. for (auto& s: Sources) {
  2484. if (!s->IsStream()) {
  2485. return false;
  2486. }
  2487. }
  2488. return true;
  2489. }
  2490. TNodePtr DoClone() const final {
  2491. return MakeIntrusive<TUnion>(Pos, CloneContainer(Sources), QuantifierAll, Settings);
  2492. }
  2493. bool IsSelect() const override {
  2494. return true;
  2495. }
  2496. bool HasSelectResult() const override {
  2497. return !Settings.Discard;
  2498. }
  2499. TWriteSettings GetWriteSettings() const override {
  2500. return Settings;
  2501. }
  2502. private:
  2503. TVector<TSourcePtr> Sources;
  2504. bool QuantifierAll;
  2505. const TWriteSettings Settings;
  2506. };
  2507. TSourcePtr BuildUnion(
  2508. TPosition pos,
  2509. TVector<TSourcePtr>&& sources,
  2510. bool quantifierAll,
  2511. const TWriteSettings& settings
  2512. ) {
  2513. return new TUnion(pos, std::move(sources), quantifierAll, settings);
  2514. }
  2515. class TOverWindowSource: public IProxySource {
  2516. public:
  2517. TOverWindowSource(TPosition pos, const TString& windowName, ISource* origSource)
  2518. : IProxySource(pos, origSource)
  2519. , WindowName(windowName)
  2520. {
  2521. Source->SetLabel(origSource->GetLabel());
  2522. }
  2523. TString MakeLocalName(const TString& name) override {
  2524. return Source->MakeLocalName(name);
  2525. }
  2526. void AddTmpWindowColumn(const TString& column) override {
  2527. return Source->AddTmpWindowColumn(column);
  2528. }
  2529. bool AddAggregation(TContext& ctx, TAggregationPtr aggr) override {
  2530. if (aggr->IsOverWindow() || aggr->IsOverWindowDistinct()) {
  2531. return Source->AddAggregationOverWindow(ctx, WindowName, aggr);
  2532. }
  2533. return Source->AddAggregation(ctx, aggr);
  2534. }
  2535. bool AddFuncOverWindow(TContext& ctx, TNodePtr expr) override {
  2536. return Source->AddFuncOverWindow(ctx, WindowName, expr);
  2537. }
  2538. bool IsOverWindowSource() const override {
  2539. return true;
  2540. }
  2541. TMaybe<bool> AddColumn(TContext& ctx, TColumnNode& column) override {
  2542. return Source->AddColumn(ctx, column);
  2543. }
  2544. TNodePtr Build(TContext& ctx) override {
  2545. Y_UNUSED(ctx);
  2546. Y_ABORT("Unexpected call");
  2547. }
  2548. const TString* GetWindowName() const override {
  2549. return &WindowName;
  2550. }
  2551. TWindowSpecificationPtr FindWindowSpecification(TContext& ctx, const TString& windowName) const override {
  2552. return Source->FindWindowSpecification(ctx, windowName);
  2553. }
  2554. TNodePtr GetSessionWindowSpec() const override {
  2555. return Source->GetSessionWindowSpec();
  2556. }
  2557. TNodePtr DoClone() const final {
  2558. return {};
  2559. }
  2560. private:
  2561. const TString WindowName;
  2562. };
  2563. TSourcePtr BuildOverWindowSource(TPosition pos, const TString& windowName, ISource* origSource) {
  2564. return new TOverWindowSource(pos, windowName, origSource);
  2565. }
  2566. class TSkipTakeNode final: public TAstListNode {
  2567. public:
  2568. TSkipTakeNode(TPosition pos, const TNodePtr& skip, const TNodePtr& take)
  2569. : TAstListNode(pos), IsSkipProvided_(!!skip)
  2570. {
  2571. TNodePtr select(AstNode("select"));
  2572. if (skip) {
  2573. select = Y("Skip", select, Y("Coalesce", skip, Y("Uint64", Q("0"))));
  2574. }
  2575. static const TString uiMax = ::ToString(std::numeric_limits<ui64>::max());
  2576. Add("let", "select", Y("Take", select, Y("Coalesce", take, Y("Uint64", Q(uiMax)))));
  2577. }
  2578. TPtr DoClone() const final {
  2579. return {};
  2580. }
  2581. bool HasSkip() const {
  2582. return IsSkipProvided_;
  2583. }
  2584. private:
  2585. const bool IsSkipProvided_;
  2586. };
  2587. TNodePtr BuildSkipTake(TPosition pos, const TNodePtr& skip, const TNodePtr& take) {
  2588. return new TSkipTakeNode(pos, skip, take);
  2589. }
  2590. class TSelect: public IProxySource {
  2591. public:
  2592. TSelect(TPosition pos, TSourcePtr source, TNodePtr skipTake)
  2593. : IProxySource(pos, source.Get())
  2594. , Source(std::move(source))
  2595. , SkipTake(skipTake)
  2596. {}
  2597. bool DoInit(TContext& ctx, ISource* src) override {
  2598. Source->SetLabel(Label);
  2599. if (AsInner) {
  2600. Source->UseAsInner();
  2601. }
  2602. if (IgnoreSort()) {
  2603. Source->DisableSort();
  2604. ctx.Warning(Source->GetPos(), TIssuesIds::YQL_ORDER_BY_WITHOUT_LIMIT_IN_SUBQUERY) << "ORDER BY without LIMIT in subquery will be ignored";
  2605. }
  2606. if (!Source->Init(ctx, src)) {
  2607. return false;
  2608. }
  2609. src = Source.Get();
  2610. if (SkipTake) {
  2611. FakeSource = BuildFakeSource(SkipTake->GetPos());
  2612. if (!SkipTake->Init(ctx, FakeSource.Get())) {
  2613. return false;
  2614. }
  2615. if (SkipTake->HasSkip() && EOrderKind::Sort != Source->GetOrderKind()) {
  2616. ctx.Warning(Source->GetPos(), TIssuesIds::YQL_OFFSET_WITHOUT_SORT) << "LIMIT with OFFSET without ORDER BY may provide different results from run to run";
  2617. }
  2618. }
  2619. return true;
  2620. }
  2621. TNodePtr Build(TContext& ctx) override {
  2622. auto input = Source->Build(ctx);
  2623. if (!input) {
  2624. return nullptr;
  2625. }
  2626. const auto label = "select";
  2627. auto block(Y(Y("let", label, input)));
  2628. auto sortNode = Source->BuildSort(ctx, label);
  2629. if (sortNode && !IgnoreSort()) {
  2630. block = L(block, sortNode);
  2631. }
  2632. if (SkipTake) {
  2633. block = L(block, SkipTake);
  2634. }
  2635. TNodePtr sample;
  2636. if (!BuildSamplingLambda(sample)) {
  2637. return nullptr;
  2638. } else if (sample) {
  2639. block = L(block, Y("let", "select", Y("OrderedFlatMap", "select", sample)));
  2640. }
  2641. if (auto removeNode = Source->BuildCleanupColumns(ctx, label)) {
  2642. block = L(block, removeNode);
  2643. }
  2644. block = L(block, Y("return", label));
  2645. return Y("block", Q(block));
  2646. }
  2647. bool SetSamplingOptions(
  2648. TContext& ctx,
  2649. TPosition pos,
  2650. ESampleClause sampleClause,
  2651. ESampleMode mode,
  2652. TNodePtr samplingRate,
  2653. TNodePtr samplingSeed) override {
  2654. if (mode == ESampleMode::System) {
  2655. ctx.Error(pos) << "only Bernoulli sampling mode is supported for subqueries";
  2656. return false;
  2657. }
  2658. if (samplingSeed) {
  2659. ctx.Error(pos) << "'Repeatable' keyword is not supported for subqueries";
  2660. return false;
  2661. }
  2662. return SetSamplingRate(ctx, sampleClause, samplingRate);
  2663. }
  2664. bool IsSelect() const override {
  2665. return Source->IsSelect();
  2666. }
  2667. bool HasSelectResult() const override {
  2668. return Source->HasSelectResult();
  2669. }
  2670. TPtr DoClone() const final {
  2671. return MakeIntrusive<TSelect>(Pos, Source->CloneSource(), SafeClone(SkipTake));
  2672. }
  2673. protected:
  2674. bool IgnoreSort() const {
  2675. return AsInner && !SkipTake && EOrderKind::Sort == Source->GetOrderKind();
  2676. }
  2677. TSourcePtr Source;
  2678. TNodePtr SkipTake;
  2679. TSourcePtr FakeSource;
  2680. };
  2681. TSourcePtr BuildSelect(TPosition pos, TSourcePtr source, TNodePtr skipTake) {
  2682. return new TSelect(pos, std::move(source), skipTake);
  2683. }
  2684. class TSelectResultNode final: public TAstListNode {
  2685. public:
  2686. TSelectResultNode(TPosition pos, TSourcePtr source, bool writeResult, bool inSubquery,
  2687. TScopedStatePtr scoped)
  2688. : TAstListNode(pos)
  2689. , Source(std::move(source))
  2690. , WriteResult(writeResult)
  2691. , InSubquery(inSubquery)
  2692. , Scoped(scoped)
  2693. {
  2694. YQL_ENSURE(Source, "Invalid source node");
  2695. FakeSource = BuildFakeSource(pos);
  2696. }
  2697. bool IsSelect() const override {
  2698. return true;
  2699. }
  2700. bool HasSelectResult() const override {
  2701. return Source->HasSelectResult();
  2702. }
  2703. bool DoInit(TContext& ctx, ISource* src) override {
  2704. if (!Source->Init(ctx, src)) {
  2705. return false;
  2706. }
  2707. src = Source.Get();
  2708. TTableList tableList;
  2709. Source->GetInputTables(tableList);
  2710. TNodePtr node(BuildInputTables(Pos, tableList, InSubquery, Scoped));
  2711. if (!node->Init(ctx, src)) {
  2712. return false;
  2713. }
  2714. auto writeSettings = src->GetWriteSettings();
  2715. bool asRef = ctx.PragmaRefSelect;
  2716. bool asAutoRef = true;
  2717. if (ctx.PragmaSampleSelect) {
  2718. asRef = false;
  2719. asAutoRef = false;
  2720. }
  2721. auto settings = Y(Q(Y(Q("type"))));
  2722. if (writeSettings.Discard) {
  2723. settings = L(settings, Q(Y(Q("discard"))));
  2724. }
  2725. if (!writeSettings.Label.Empty()) {
  2726. auto labelNode = writeSettings.Label.Build();
  2727. if (!writeSettings.Label.GetLiteral()) {
  2728. labelNode = Y("EvaluateAtom", labelNode);
  2729. }
  2730. if (!labelNode->Init(ctx, FakeSource.Get())) {
  2731. return false;
  2732. }
  2733. settings = L(settings, Q(Y(Q("label"), labelNode)));
  2734. }
  2735. if (asRef) {
  2736. settings = L(settings, Q(Y(Q("ref"))));
  2737. } else if (asAutoRef) {
  2738. settings = L(settings, Q(Y(Q("autoref"))));
  2739. }
  2740. auto columns = Source->GetColumns();
  2741. if (columns && !columns->All && !(columns->QualifiedAll && ctx.SimpleColumns)) {
  2742. auto list = Y();
  2743. YQL_ENSURE(columns->List.size() == columns->NamedColumns.size());
  2744. for (size_t i = 0; i < columns->List.size(); ++i) {
  2745. auto& c = columns->List[i];
  2746. if (c.EndsWith('*')) {
  2747. list = L(list, Q(Y(Q("prefix"), BuildQuotedAtom(Pos, c.substr(0, c.size() - 1)))));
  2748. } else if (columns->NamedColumns[i]) {
  2749. list = L(list, BuildQuotedAtom(Pos, c));
  2750. } else {
  2751. list = L(list, Q(Y(Q("auto"))));
  2752. }
  2753. }
  2754. settings = L(settings, Q(Y(Q("columns"), Q(list))));
  2755. }
  2756. if (ctx.ResultRowsLimit > 0) {
  2757. settings = L(settings, Q(Y(Q("take"), Q(ToString(ctx.ResultRowsLimit)))));
  2758. }
  2759. auto output = Source->Build(ctx);
  2760. if (!output) {
  2761. return false;
  2762. }
  2763. node = L(node, Y("let", "output", output));
  2764. if (WriteResult || writeSettings.Discard) {
  2765. if (EOrderKind::None == Source->GetOrderKind() && ctx.UseUnordered(*Source)) {
  2766. node = L(node, Y("let", "output", Y("Unordered", "output")));
  2767. if (ctx.UnorderedResult) {
  2768. settings = L(settings, Q(Y(Q("unordered"))));
  2769. }
  2770. }
  2771. auto writeResult(BuildWriteResult(Pos, "output", settings));
  2772. if (!writeResult->Init(ctx, src)) {
  2773. return false;
  2774. }
  2775. node = L(node, Y("let", "world", writeResult));
  2776. node = L(node, Y("return", "world"));
  2777. } else {
  2778. node = L(node, Y("return", "output"));
  2779. }
  2780. Add("block", Q(node));
  2781. return true;
  2782. }
  2783. TPtr DoClone() const final {
  2784. return {};
  2785. }
  2786. protected:
  2787. TSourcePtr Source;
  2788. const bool WriteResult;
  2789. const bool InSubquery;
  2790. TScopedStatePtr Scoped;
  2791. TSourcePtr FakeSource;
  2792. };
  2793. TNodePtr BuildSelectResult(TPosition pos, TSourcePtr source, bool writeResult, bool inSubquery,
  2794. TScopedStatePtr scoped) {
  2795. return new TSelectResultNode(pos, std::move(source), writeResult, inSubquery, scoped);
  2796. }
  2797. } // namespace NSQLTranslationV1