source.cpp 32 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992
  1. #include "source.h"
  2. #include "context.h"
  3. #include <yql/essentials/ast/yql_ast_escaping.h>
  4. #include <yql/essentials/ast/yql_expr.h>
  5. #include <yql/essentials/core/sql_types/simple_types.h>
  6. #include <yql/essentials/minikql/mkql_type_ops.h>
  7. #include <yql/essentials/parser/pg_catalog/catalog.h>
  8. #include <yql/essentials/utils/yql_panic.h>
  9. #include <library/cpp/containers/stack_vector/stack_vec.h>
  10. #include <library/cpp/charset/ci_string.h>
  11. #include <util/generic/hash_set.h>
  12. #include <util/stream/str.h>
  13. #include <util/string/cast.h>
  14. #include <util/string/escape.h>
  15. #include <util/string/subst.h>
  16. using namespace NYql;
  17. namespace NSQLTranslationV1 {
  18. TTableRef::TTableRef(const TString& refName, const TString& service, const TDeferredAtom& cluster, TNodePtr keys)
  19. : RefName(refName)
  20. , Service(to_lower(service))
  21. , Cluster(cluster)
  22. , Keys(keys)
  23. {
  24. }
  25. TString TTableRef::ShortName() const {
  26. Y_DEBUG_ABORT_UNLESS(Keys);
  27. if (Keys->GetTableKeys()->GetTableName()) {
  28. return *Keys->GetTableKeys()->GetTableName();
  29. }
  30. return TString();
  31. }
  32. ISource::ISource(TPosition pos)
  33. : INode(pos)
  34. {
  35. }
  36. ISource::~ISource()
  37. {
  38. }
  39. TSourcePtr ISource::CloneSource() const {
  40. Y_DEBUG_ABORT_UNLESS(dynamic_cast<ISource*>(Clone().Get()), "Cloned node is no source");
  41. TSourcePtr result = static_cast<ISource*>(Clone().Get());
  42. for (auto curFilter: Filters) {
  43. result->Filters.emplace_back(curFilter->Clone());
  44. }
  45. for (int i = 0; i < static_cast<int>(EExprSeat::Max); ++i) {
  46. result->NamedExprs[i] = CloneContainer(NamedExprs[i]);
  47. }
  48. result->FlattenColumns = FlattenColumns;
  49. result->FlattenMode = FlattenMode;
  50. return result;
  51. }
  52. bool ISource::IsFake() const {
  53. return false;
  54. }
  55. void ISource::AllColumns() {
  56. return;
  57. }
  58. const TColumns* ISource::GetColumns() const {
  59. return nullptr;
  60. }
  61. void ISource::GetInputTables(TTableList& tableList) const {
  62. for (auto srcPtr: UsedSources) {
  63. srcPtr->GetInputTables(tableList);
  64. }
  65. return;
  66. }
  67. TMaybe<bool> ISource::AddColumn(TContext& ctx, TColumnNode& column) {
  68. if (column.IsReliable()) {
  69. ctx.Error(Pos) << "Source does not allow column references";
  70. ctx.Error(column.GetPos()) << "Column reference " <<
  71. (column.GetColumnName() ? "'" + *column.GetColumnName() + "'" : "(expr)");
  72. }
  73. return {};
  74. }
  75. void ISource::FinishColumns() {
  76. }
  77. bool ISource::AddFilter(TContext& ctx, TNodePtr filter) {
  78. Y_UNUSED(ctx);
  79. Filters.push_back(filter);
  80. return true;
  81. }
  82. bool ISource::AddGroupKey(TContext& ctx, const TString& column) {
  83. if (!GroupKeys.insert(column).second) {
  84. ctx.Error() << "Duplicate grouping column: " << column;
  85. return false;
  86. }
  87. OrderedGroupKeys.push_back(column);
  88. return true;
  89. }
  90. void ISource::SetCompactGroupBy(bool compactGroupBy) {
  91. CompactGroupBy = compactGroupBy;
  92. }
  93. void ISource::SetGroupBySuffix(const TString& suffix) {
  94. GroupBySuffix = suffix;
  95. }
  96. bool ISource::AddExpressions(TContext& ctx, const TVector<TNodePtr>& expressions, EExprSeat exprSeat) {
  97. YQL_ENSURE(exprSeat < EExprSeat::Max);
  98. THashSet<TString> names;
  99. THashSet<TString> aliasSet;
  100. // TODO: merge FlattenBy with FlattenByExpr
  101. const bool isFlatten = (exprSeat == EExprSeat::FlattenBy || exprSeat == EExprSeat::FlattenByExpr);
  102. THashSet<TString>& aliases = isFlatten ? FlattenByAliases : aliasSet;
  103. for (const auto& expr: expressions) {
  104. const auto& alias = expr->GetLabel();
  105. const auto& columnNamePtr = expr->GetColumnName();
  106. if (alias) {
  107. ExprAliases.insert(alias);
  108. if (!aliases.emplace(alias).second) {
  109. ctx.Error(expr->GetPos()) << "Duplicate alias found: " << alias << " in " << exprSeat << " section";
  110. return false;
  111. }
  112. if (names.contains(alias)) {
  113. ctx.Error(expr->GetPos()) << "Collision between alias and column name: " << alias << " in " << exprSeat << " section";
  114. return false;
  115. }
  116. }
  117. if (columnNamePtr) {
  118. const auto& sourceName = *expr->GetSourceName();
  119. auto columnName = *columnNamePtr;
  120. if (sourceName) {
  121. columnName = DotJoin(sourceName, columnName);
  122. }
  123. if (!names.emplace(columnName).second) {
  124. ctx.Error(expr->GetPos()) << "Duplicate column name found: " << columnName << " in " << exprSeat << " section";
  125. return false;
  126. }
  127. if (!alias && aliases.contains(columnName)) {
  128. ctx.Error(expr->GetPos()) << "Collision between alias and column name: " << columnName << " in " << exprSeat << " section";
  129. return false;
  130. }
  131. if (alias && exprSeat == EExprSeat::GroupBy) {
  132. auto columnAlias = GroupByColumnAliases.emplace(columnName, alias);
  133. auto oldAlias = columnAlias.first->second;
  134. if (columnAlias.second && oldAlias != alias) {
  135. ctx.Error(expr->GetPos()) << "Alias for column not same, column: " << columnName <<
  136. ", exist alias: " << oldAlias << ", another alias: " << alias;
  137. return false;
  138. }
  139. }
  140. }
  141. if (exprSeat == EExprSeat::GroupBy) {
  142. if (auto sessionWindow = dynamic_cast<TSessionWindow*>(expr.Get())) {
  143. if (SessionWindow) {
  144. ctx.Error(expr->GetPos()) << "Duplicate session window specification:";
  145. ctx.Error(SessionWindow->GetPos()) << "Previous session window is declared here";
  146. return false;
  147. }
  148. SessionWindow = expr;
  149. }
  150. if (auto hoppingWindow = dynamic_cast<THoppingWindow*>(expr.Get())) {
  151. if (HoppingWindow) {
  152. ctx.Error(expr->GetPos()) << "Duplicate hopping window specification:";
  153. ctx.Error(HoppingWindow->GetPos()) << "Previous hopping window is declared here";
  154. return false;
  155. }
  156. HoppingWindow = expr;
  157. }
  158. }
  159. Expressions(exprSeat).emplace_back(expr);
  160. }
  161. return true;
  162. }
  163. void ISource::SetFlattenByMode(const TString& mode) {
  164. FlattenMode = mode;
  165. }
  166. void ISource::MarkFlattenColumns() {
  167. FlattenColumns = true;
  168. }
  169. bool ISource::IsFlattenColumns() const {
  170. return FlattenColumns;
  171. }
  172. TString ISource::MakeLocalName(const TString& name) {
  173. auto iter = GenIndexes.find(name);
  174. if (iter == GenIndexes.end()) {
  175. iter = GenIndexes.emplace(name, 0).first;
  176. }
  177. TStringBuilder str;
  178. str << name << iter->second;
  179. ++iter->second;
  180. return std::move(str);
  181. }
  182. bool ISource::AddAggregation(TContext& ctx, TAggregationPtr aggr) {
  183. Y_UNUSED(ctx);
  184. YQL_ENSURE(aggr);
  185. Aggregations.push_back(aggr);
  186. return true;
  187. }
  188. bool ISource::HasAggregations() const {
  189. return !Aggregations.empty() || !GroupKeys.empty();
  190. }
  191. void ISource::AddWindowSpecs(TWinSpecs winSpecs) {
  192. WinSpecs = winSpecs;
  193. }
  194. bool ISource::AddFuncOverWindow(TContext& ctx, TNodePtr expr) {
  195. Y_UNUSED(ctx);
  196. Y_UNUSED(expr);
  197. return false;
  198. }
  199. void ISource::AddTmpWindowColumn(const TString& column) {
  200. TmpWindowColumns.push_back(column);
  201. }
  202. const TVector<TString>& ISource::GetTmpWindowColumns() const {
  203. return TmpWindowColumns;
  204. }
  205. void ISource::SetLegacyHoppingWindowSpec(TLegacyHoppingWindowSpecPtr spec) {
  206. LegacyHoppingWindowSpec = spec;
  207. }
  208. TLegacyHoppingWindowSpecPtr ISource::GetLegacyHoppingWindowSpec() const {
  209. return LegacyHoppingWindowSpec;
  210. }
  211. TNodePtr ISource::GetSessionWindowSpec() const {
  212. return SessionWindow;
  213. }
  214. TNodePtr ISource::GetHoppingWindowSpec() const {
  215. return HoppingWindow;
  216. }
  217. TWindowSpecificationPtr ISource::FindWindowSpecification(TContext& ctx, const TString& windowName) const {
  218. auto winIter = WinSpecs.find(windowName);
  219. if (winIter == WinSpecs.end()) {
  220. ctx.Error(Pos) << "Unable to find window specification for window '" << windowName << "'";
  221. return {};
  222. }
  223. YQL_ENSURE(winIter->second);
  224. return winIter->second;
  225. }
  226. inline TVector<TNodePtr>& ISource::Expressions(EExprSeat exprSeat) {
  227. return NamedExprs[static_cast<size_t>(exprSeat)];
  228. }
  229. const TVector<TNodePtr>& ISource::Expressions(EExprSeat exprSeat) const {
  230. return NamedExprs[static_cast<size_t>(exprSeat)];
  231. }
  232. inline TNodePtr ISource::AliasOrColumn(const TNodePtr& node, bool withSource) {
  233. auto result = node->GetLabel();
  234. if (!result) {
  235. const auto columnNamePtr = node->GetColumnName();
  236. YQL_ENSURE(columnNamePtr);
  237. result = *columnNamePtr;
  238. if (withSource) {
  239. const auto sourceNamePtr = node->GetSourceName();
  240. if (sourceNamePtr) {
  241. result = DotJoin(*sourceNamePtr, result);
  242. }
  243. }
  244. }
  245. return BuildQuotedAtom(node->GetPos(), result);
  246. }
  247. bool ISource::AddAggregationOverWindow(TContext& ctx, const TString& windowName, TAggregationPtr func) {
  248. if (ctx.DistinctOverWindow) {
  249. YQL_ENSURE(func->IsOverWindow() || func->IsOverWindowDistinct());
  250. } else {
  251. YQL_ENSURE(func->IsOverWindow());
  252. if (func->IsDistinct()) {
  253. ctx.Error(func->GetPos()) << "Aggregation with distinct is not allowed over window: " << windowName;
  254. return false;
  255. }
  256. }
  257. if (!FindWindowSpecification(ctx, windowName)) {
  258. return false;
  259. }
  260. AggregationOverWindow[windowName].emplace_back(std::move(func));
  261. return true;
  262. }
  263. bool ISource::AddFuncOverWindow(TContext& ctx, const TString& windowName, TNodePtr func) {
  264. if (!FindWindowSpecification(ctx, windowName)) {
  265. return false;
  266. }
  267. FuncOverWindow[windowName].emplace_back(std::move(func));
  268. return true;
  269. }
  270. void ISource::SetMatchRecognize(TMatchRecognizeBuilderPtr matchRecognize) {
  271. MatchRecognizeBuilder = matchRecognize;
  272. }
  273. bool ISource::IsCompositeSource() const {
  274. return false;
  275. }
  276. bool ISource::IsGroupByColumn(const TString& column) const {
  277. return GroupKeys.contains(column);
  278. }
  279. bool ISource::IsFlattenByColumns() const {
  280. return !Expressions(EExprSeat::FlattenBy).empty();
  281. }
  282. bool ISource::IsFlattenByExprs() const {
  283. return !Expressions(EExprSeat::FlattenByExpr).empty();
  284. }
  285. bool ISource::IsAlias(EExprSeat exprSeat, const TString& column) const {
  286. for (const auto& exprNode: Expressions(exprSeat)) {
  287. const auto& labelName = exprNode->GetLabel();
  288. if (labelName && labelName == column) {
  289. return true;
  290. }
  291. }
  292. return false;
  293. }
  294. bool ISource::IsExprAlias(const TString& column) const {
  295. std::array<EExprSeat, 5> exprSeats = {{EExprSeat::FlattenBy, EExprSeat::FlattenByExpr, EExprSeat::GroupBy,
  296. EExprSeat::WindowPartitionBy, EExprSeat::DistinctAggr}};
  297. for (auto seat: exprSeats) {
  298. if (IsAlias(seat, column)) {
  299. return true;
  300. }
  301. }
  302. return false;
  303. }
  304. bool ISource::IsExprSeat(EExprSeat exprSeat, EExprType type) const {
  305. auto expressions = Expressions(exprSeat);
  306. if (!expressions) {
  307. return false;
  308. }
  309. for (const auto& exprNode: expressions) {
  310. if (exprNode->GetLabel()) {
  311. return type == EExprType::WithExpression;
  312. }
  313. }
  314. return type == EExprType::ColumnOnly;
  315. }
  316. TString ISource::GetGroupByColumnAlias(const TString& column) const {
  317. auto iter = GroupByColumnAliases.find(column);
  318. if (iter == GroupByColumnAliases.end()) {
  319. return {};
  320. }
  321. return iter->second;
  322. }
  323. const TString* ISource::GetWindowName() const {
  324. return {};
  325. }
  326. bool ISource::IsCalcOverWindow() const {
  327. return !AggregationOverWindow.empty() || !FuncOverWindow.empty() ||
  328. AnyOf(WinSpecs, [](const auto& item) { return item.second->Session; });
  329. }
  330. bool ISource::IsOverWindowSource() const {
  331. return !WinSpecs.empty();
  332. }
  333. bool ISource::IsStream() const {
  334. return false;
  335. }
  336. EOrderKind ISource::GetOrderKind() const {
  337. return EOrderKind::None;
  338. }
  339. TWriteSettings ISource::GetWriteSettings() const {
  340. return {};
  341. }
  342. TNodePtr ISource::PrepareSamplingRate(TPosition pos, ESampleClause clause, TNodePtr samplingRate) {
  343. if (ESampleClause::Sample == clause) {
  344. samplingRate = Y("*", samplingRate, Y("Double", Q("100")));
  345. }
  346. auto ensureLow = Y("Ensure", "samplingRate", Y(">=", "samplingRate", Y("Double", Q("0"))), Y("String", BuildQuotedAtom(pos, "Expected sampling rate to be nonnegative")));
  347. auto ensureHigh = Y("Ensure", "samplingRate", Y("<=", "samplingRate", Y("Double", Q("100"))), Y("String", BuildQuotedAtom(pos, "Sampling rate is over 100%")));
  348. auto block(Y(Y("let", "samplingRate", samplingRate)));
  349. block = L(block, Y("let", "samplingRate", ensureLow));
  350. block = L(block, Y("let", "samplingRate", ensureHigh));
  351. samplingRate = Y("block", Q(L(block, Y("return", "samplingRate"))));
  352. return samplingRate;
  353. }
  354. bool ISource::SetSamplingOptions(TContext& ctx,
  355. TPosition pos,
  356. ESampleClause sampleClause,
  357. ESampleMode mode,
  358. TNodePtr samplingRate,
  359. TNodePtr samplingSeed) {
  360. Y_UNUSED(pos);
  361. Y_UNUSED(sampleClause);
  362. Y_UNUSED(mode);
  363. Y_UNUSED(samplingRate);
  364. Y_UNUSED(samplingSeed);
  365. ctx.Error() << "Sampling is only supported for table sources";
  366. return false;
  367. }
  368. bool ISource::SetTableHints(TContext& ctx, TPosition pos, const TTableHints& hints, const TTableHints& contextHints) {
  369. Y_UNUSED(pos);
  370. Y_UNUSED(contextHints);
  371. if (hints) {
  372. ctx.Error() << "Explicit hints are only supported for table sources";
  373. return false;
  374. }
  375. return true;
  376. }
  377. bool ISource::AddGrouping(TContext& ctx, const TVector<TString>& columns, TString& grouingColumn) {
  378. Y_UNUSED(columns);
  379. Y_UNUSED(grouingColumn);
  380. ctx.Error() << "Source not support grouping hint";
  381. return false;
  382. }
  383. size_t ISource::GetGroupingColumnsCount() const {
  384. return 0;
  385. }
  386. TNodePtr ISource::BuildFilter(TContext& ctx, const TString& label) {
  387. return Filters.empty() ? nullptr : Y(ctx.UseUnordered(*this) ? "OrderedFilter" : "Filter", label, BuildFilterLambda());
  388. }
  389. TNodePtr ISource::BuildFilterLambda() {
  390. if (Filters.empty()) {
  391. return BuildLambda(Pos, Y("row"), Y("Bool", Q("true")));
  392. }
  393. YQL_ENSURE(Filters[0]->HasState(ENodeState::Initialized));
  394. TNodePtr filter(Filters[0]);
  395. for (ui32 i = 1; i < Filters.size(); ++i) {
  396. YQL_ENSURE(Filters[i]->HasState(ENodeState::Initialized));
  397. filter = Y("And", filter, Filters[i]);
  398. }
  399. filter = Y("Coalesce", filter, Y("Bool", Q("false")));
  400. return BuildLambda(Pos, Y("row"), filter);
  401. }
  402. TNodePtr ISource::BuildFlattenByColumns(const TString& label) {
  403. auto columnsList = Y("FlattenByColumns", Q(FlattenMode), label);
  404. for (const auto& column: Expressions(EExprSeat::FlattenBy)) {
  405. const auto columnNamePtr = column->GetColumnName();
  406. YQL_ENSURE(columnNamePtr);
  407. if (column->GetLabel().empty()) {
  408. columnsList = L(columnsList, Q(*columnNamePtr));
  409. } else {
  410. columnsList = L(columnsList, Q(Y(Q(*columnNamePtr), Q(column->GetLabel()))));
  411. }
  412. }
  413. return Y(Y("let", "res", columnsList));
  414. }
  415. TNodePtr ISource::BuildFlattenColumns(const TString& label) {
  416. return Y(Y("let", "res", Y("Just", Y("FlattenStructs", label))));
  417. }
  418. namespace {
  419. TNodePtr BuildLambdaBodyForExprAliases(TPosition pos, const TVector<TNodePtr>& exprs) {
  420. auto structObj = BuildAtom(pos, "row", TNodeFlags::Default);
  421. for (const auto& exprNode: exprs) {
  422. const auto name = exprNode->GetLabel();
  423. YQL_ENSURE(name);
  424. structObj = structObj->Y("ForceRemoveMember", structObj, structObj->Q(name));
  425. if (dynamic_cast<const TSessionWindow*>(exprNode.Get())) {
  426. continue;
  427. }
  428. if (dynamic_cast<const THoppingWindow*>(exprNode.Get())) {
  429. continue;
  430. }
  431. structObj = structObj->Y("AddMember", structObj, structObj->Q(name), exprNode);
  432. }
  433. return structObj->Y("AsList", structObj);
  434. }
  435. }
  436. TNodePtr ISource::BuildPreaggregatedMap(TContext& ctx) {
  437. Y_UNUSED(ctx);
  438. const auto& groupByExprs = Expressions(EExprSeat::GroupBy);
  439. const auto& distinctAggrExprs = Expressions(EExprSeat::DistinctAggr);
  440. YQL_ENSURE(groupByExprs || distinctAggrExprs);
  441. TNodePtr res;
  442. if (groupByExprs) {
  443. auto body = BuildLambdaBodyForExprAliases(Pos, groupByExprs);
  444. res = Y("FlatMap", "core", BuildLambda(Pos, Y("row"), body));
  445. }
  446. if (distinctAggrExprs) {
  447. auto body = BuildLambdaBodyForExprAliases(Pos, distinctAggrExprs);
  448. auto lambda = BuildLambda(Pos, Y("row"), body);
  449. res = res ? Y("FlatMap", res, lambda) : Y("FlatMap", "core", lambda);
  450. }
  451. return res;
  452. }
  453. TNodePtr ISource::BuildPreFlattenMap(TContext& ctx) {
  454. Y_UNUSED(ctx);
  455. YQL_ENSURE(IsFlattenByExprs());
  456. return BuildLambdaBodyForExprAliases(Pos, Expressions(EExprSeat::FlattenByExpr));
  457. }
  458. TNodePtr ISource::BuildPrewindowMap(TContext& ctx) {
  459. auto feed = BuildAtom(Pos, "row", TNodeFlags::Default);
  460. for (const auto& exprNode: Expressions(EExprSeat::WindowPartitionBy)) {
  461. const auto name = exprNode->GetLabel();
  462. if (name && !dynamic_cast<const TSessionWindow*>(exprNode.Get())) {
  463. feed = Y("AddMember", feed, Q(name), exprNode);
  464. }
  465. }
  466. return Y(ctx.UseUnordered(*this) ? "OrderedFlatMap" : "FlatMap", "core", BuildLambda(Pos, Y("row"), Y("AsList", feed)));
  467. }
  468. bool ISource::BuildSamplingLambda(TNodePtr& node) {
  469. if (!SamplingRate) {
  470. return true;
  471. }
  472. auto res = Y("Coalesce", Y("SafeCast", SamplingRate, Y("DataType", Q("Double"))), Y("Double", Q("0")));
  473. res = Y("/", res, Y("Double", Q("100")));
  474. res = Y(Y("let", "res", Y("OptionalIf", Y("<", Y("Random", Y("DependsOn", "row")), res), "row")));
  475. node = BuildLambda(GetPos(), Y("row"), res, "res");
  476. return !!node;
  477. }
  478. bool ISource::SetSamplingRate(TContext& ctx, ESampleClause clause, TNodePtr samplingRate) {
  479. if (samplingRate) {
  480. if (!samplingRate->Init(ctx, this)) {
  481. return false;
  482. }
  483. SamplingRate = PrepareSamplingRate(Pos, clause, samplingRate);
  484. }
  485. return true;
  486. }
  487. std::pair<TNodePtr, bool> ISource::BuildAggregation(const TString& label, TContext& ctx) {
  488. if (GroupKeys.empty() && Aggregations.empty() && !IsCompositeSource() && !LegacyHoppingWindowSpec) {
  489. return { nullptr, true };
  490. }
  491. auto keysTuple = Y();
  492. YQL_ENSURE(GroupKeys.size() == OrderedGroupKeys.size());
  493. for (const auto& key: OrderedGroupKeys) {
  494. YQL_ENSURE(GroupKeys.contains(key));
  495. keysTuple = L(keysTuple, BuildQuotedAtom(Pos, key));
  496. }
  497. std::map<std::pair<bool, TString>, std::vector<IAggregation*>> genericAggrs;
  498. for (const auto& aggr: Aggregations) {
  499. if (const auto key = aggr->GetGenericKey()) {
  500. genericAggrs[{aggr->IsDistinct(), *key}].emplace_back(aggr.Get());
  501. }
  502. }
  503. for (const auto& aggr : genericAggrs) {
  504. for (size_t i = 1U; i < aggr.second.size(); ++i) {
  505. aggr.second.front()->Join(aggr.second[i]);
  506. }
  507. }
  508. const auto listType = Y("TypeOf", label);
  509. auto aggrArgs = Y();
  510. const bool overState = GroupBySuffix == "CombineState" || GroupBySuffix == "MergeState" ||
  511. GroupBySuffix == "MergeFinalize" || GroupBySuffix == "MergeManyFinalize";
  512. const bool allowAggApply = !LegacyHoppingWindowSpec && !SessionWindow && !HoppingWindow;
  513. for (const auto& aggr: Aggregations) {
  514. auto res = aggr->AggregationTraits(listType, overState, GroupBySuffix == "MergeManyFinalize", allowAggApply, ctx);
  515. if (!res.second) {
  516. return { nullptr, false };
  517. }
  518. if (res.first) {
  519. aggrArgs = L(aggrArgs, res.first);
  520. }
  521. }
  522. auto options = Y();
  523. if (CompactGroupBy || GroupBySuffix == "Finalize") {
  524. options = L(options, Q(Y(Q("compact"))));
  525. }
  526. if (LegacyHoppingWindowSpec) {
  527. auto hoppingTraits = Y(
  528. "HoppingTraits",
  529. Y("ListItemType", listType),
  530. BuildLambda(Pos, Y("row"), LegacyHoppingWindowSpec->TimeExtractor),
  531. LegacyHoppingWindowSpec->Hop,
  532. LegacyHoppingWindowSpec->Interval,
  533. LegacyHoppingWindowSpec->Delay,
  534. LegacyHoppingWindowSpec->DataWatermarks ? Q("true") : Q("false"),
  535. Q("v1"));
  536. options = L(options, Q(Y(Q("hopping"), hoppingTraits)));
  537. }
  538. if (SessionWindow) {
  539. YQL_ENSURE(SessionWindow->GetLabel());
  540. auto sessionWindow = dynamic_cast<TSessionWindow*>(SessionWindow.Get());
  541. YQL_ENSURE(sessionWindow);
  542. options = L(options, Q(Y(Q("session"),
  543. Q(Y(BuildQuotedAtom(Pos, SessionWindow->GetLabel()), sessionWindow->BuildTraits(label))))));
  544. }
  545. if (HoppingWindow) {
  546. YQL_ENSURE(HoppingWindow->GetLabel());
  547. auto hoppingWindow = dynamic_cast<THoppingWindow*>(HoppingWindow.Get());
  548. YQL_ENSURE(hoppingWindow);
  549. options = L(options, Q(Y(Q("hopping"),
  550. Q(Y(BuildQuotedAtom(Pos, HoppingWindow->GetLabel()), hoppingWindow->BuildTraits(label))))));
  551. }
  552. return { Y("AssumeColumnOrderPartial", Y("Aggregate" + GroupBySuffix, label, Q(keysTuple), Q(aggrArgs), Q(options)), Q(keysTuple)), true };
  553. }
  554. TMaybe<TString> ISource::FindColumnMistype(const TString& name) const {
  555. auto result = FindMistypeIn(GroupKeys, name);
  556. return result ? result : FindMistypeIn(ExprAliases, name);
  557. }
  558. void ISource::AddDependentSource(ISource* usedSource) {
  559. UsedSources.push_back(usedSource);
  560. }
  561. class TYqlFrameBound final: public TCallNode {
  562. public:
  563. TYqlFrameBound(TPosition pos, TNodePtr bound)
  564. : TCallNode(pos, "EvaluateExpr", 1, 1, { bound })
  565. , FakeSource(BuildFakeSource(pos))
  566. {
  567. }
  568. bool DoInit(TContext& ctx, ISource* src) override {
  569. if (!ValidateArguments(ctx)) {
  570. return false;
  571. }
  572. if (!Args[0]->Init(ctx, FakeSource.Get())) {
  573. return false;
  574. }
  575. return TCallNode::DoInit(ctx, src);
  576. }
  577. TNodePtr DoClone() const final {
  578. return new TYqlFrameBound(Pos, Args[0]->Clone());
  579. }
  580. private:
  581. TSourcePtr FakeSource;
  582. };
  583. TNodePtr BuildFrameNode(const TFrameBound& frame, EFrameType frameType) {
  584. TString settingStr;
  585. switch (frame.Settings) {
  586. case FramePreceding: settingStr = "preceding"; break;
  587. case FrameCurrentRow: settingStr = "currentRow"; break;
  588. case FrameFollowing: settingStr = "following"; break;
  589. default: YQL_ENSURE(false, "Unexpected frame setting");
  590. }
  591. TNodePtr node = frame.Bound;
  592. TPosition pos = frame.Pos;
  593. if (frameType != EFrameType::FrameByRows) {
  594. TVector<TNodePtr> settings;
  595. settings.push_back(BuildQuotedAtom(pos, settingStr, TNodeFlags::Default));
  596. if (frame.Settings != FrameCurrentRow) {
  597. if (!node) {
  598. node = BuildQuotedAtom(pos, "unbounded", TNodeFlags::Default);
  599. } else if (!node->IsLiteral()) {
  600. node = new TYqlFrameBound(pos, node);
  601. }
  602. settings.push_back(std::move(node));
  603. }
  604. return BuildTuple(pos, std::move(settings));
  605. }
  606. // TODO: switch FrameByRows to common format above
  607. YQL_ENSURE(frame.Settings != FrameCurrentRow, "Should be already replaced by 0 preceding/following");
  608. if (!node) {
  609. node = BuildLiteralVoid(pos);
  610. } else if (node->IsLiteral()) {
  611. YQL_ENSURE(node->GetLiteralType() == "Int32");
  612. i32 value = FromString<i32>(node->GetLiteralValue());
  613. YQL_ENSURE(value >= 0);
  614. if (frame.Settings == FramePreceding) {
  615. value = -value;
  616. }
  617. node = new TCallNodeImpl(pos, "Int32", { BuildQuotedAtom(pos, ToString(value), TNodeFlags::Default) });
  618. } else {
  619. if (frame.Settings == FramePreceding) {
  620. node = new TCallNodeImpl(pos, "Minus", { node->Clone() });
  621. }
  622. node = new TYqlFrameBound(pos, node);
  623. }
  624. return node;
  625. }
  626. TNodePtr ISource::BuildWindowFrame(const TFrameSpecification& spec, bool isCompact) {
  627. YQL_ENSURE(spec.FrameExclusion == FrameExclNone);
  628. YQL_ENSURE(spec.FrameBegin);
  629. YQL_ENSURE(spec.FrameEnd);
  630. auto frameBeginNode = BuildFrameNode(*spec.FrameBegin, spec.FrameType);
  631. auto frameEndNode = BuildFrameNode(*spec.FrameEnd, spec.FrameType);
  632. auto begin = Q(Y(Q("begin"), frameBeginNode));
  633. auto end = Q(Y(Q("end"), frameEndNode));
  634. return isCompact ? Q(Y(begin, end, Q(Y(Q("compact"))))) : Q(Y(begin, end));
  635. }
  636. class TSessionWindowTraits final: public TCallNode {
  637. public:
  638. TSessionWindowTraits(TPosition pos, const TVector<TNodePtr>& args)
  639. : TCallNode(pos, "SessionWindowTraits", args)
  640. , FakeSource(BuildFakeSource(pos))
  641. {
  642. YQL_ENSURE(args.size() == 4);
  643. }
  644. bool DoInit(TContext& ctx, ISource* src) override {
  645. if (!ValidateArguments(ctx)) {
  646. return false;
  647. }
  648. if (!Args.back()->Init(ctx, FakeSource.Get())) {
  649. return false;
  650. }
  651. return TCallNode::DoInit(ctx, src);
  652. }
  653. TNodePtr DoClone() const final {
  654. return new TSessionWindowTraits(Pos, CloneContainer(Args));
  655. }
  656. private:
  657. TSourcePtr FakeSource;
  658. };
  659. TNodePtr ISource::BuildCalcOverWindow(TContext& ctx, const TString& label) {
  660. YQL_ENSURE(IsCalcOverWindow());
  661. TSet<TString> usedWindows;
  662. for (auto& it : AggregationOverWindow) {
  663. usedWindows.insert(it.first);
  664. }
  665. for (auto& it : FuncOverWindow) {
  666. usedWindows.insert(it.first);
  667. }
  668. for (auto& it : WinSpecs) {
  669. if (it.second->Session) {
  670. usedWindows.insert(it.first);
  671. }
  672. }
  673. YQL_ENSURE(!usedWindows.empty());
  674. const bool onePartition = usedWindows.size() == 1;
  675. const auto useLabel = onePartition ? label : "partitioning";
  676. const auto listType = Y("TypeOf", useLabel);
  677. auto framesProcess = Y();
  678. auto resultNode = onePartition ? Y() : Y(Y("let", "partitioning", label));
  679. for (const auto& name : usedWindows) {
  680. auto spec = FindWindowSpecification(ctx, name);
  681. YQL_ENSURE(spec);
  682. auto aggsIter = AggregationOverWindow.find(name);
  683. auto funcsIter = FuncOverWindow.find(name);
  684. const auto& aggs = (aggsIter == AggregationOverWindow.end()) ? TVector<TAggregationPtr>() : aggsIter->second;
  685. const auto& funcs = (funcsIter == FuncOverWindow.end()) ? TVector<TNodePtr>() : funcsIter->second;
  686. auto frames = Y();
  687. TString frameType;
  688. switch (spec->Frame->FrameType) {
  689. case EFrameType::FrameByRows: frameType = "WinOnRows"; break;
  690. case EFrameType::FrameByRange: frameType = "WinOnRange"; break;
  691. case EFrameType::FrameByGroups: frameType = "WinOnGroups"; break;
  692. }
  693. YQL_ENSURE(frameType);
  694. auto callOnFrame = Y(frameType, BuildWindowFrame(*spec->Frame, spec->IsCompact));
  695. for (auto& agg : aggs) {
  696. auto winTraits = agg->WindowTraits(listType, ctx);
  697. callOnFrame = L(callOnFrame, winTraits);
  698. }
  699. for (auto& func : funcs) {
  700. auto winSpec = func->WindowSpecFunc(listType);
  701. callOnFrame = L(callOnFrame, winSpec);
  702. }
  703. frames = L(frames, callOnFrame);
  704. auto keysTuple = Y();
  705. for (const auto& key: spec->Partitions) {
  706. if (!dynamic_cast<TSessionWindow*>(key.Get())) {
  707. keysTuple = L(keysTuple, AliasOrColumn(key, GetJoin()));
  708. }
  709. }
  710. auto sortSpec = spec->OrderBy.empty() ? Y("Void") : BuildSortSpec(spec->OrderBy, useLabel, true, false);
  711. if (spec->Session) {
  712. TString label = spec->Session->GetLabel();
  713. YQL_ENSURE(label);
  714. auto sessionWindow = dynamic_cast<TSessionWindow*>(spec->Session.Get());
  715. YQL_ENSURE(sessionWindow);
  716. auto labelNode = BuildQuotedAtom(sessionWindow->GetPos(), label);
  717. auto sessionTraits = sessionWindow->BuildTraits(useLabel);
  718. framesProcess = Y("CalcOverSessionWindow", useLabel, Q(keysTuple), sortSpec, Q(frames), sessionTraits, Q(Y(labelNode)));
  719. } else {
  720. YQL_ENSURE(aggs || funcs);
  721. framesProcess = Y("CalcOverWindow", useLabel, Q(keysTuple), sortSpec, Q(frames));
  722. }
  723. if (!onePartition) {
  724. resultNode = L(resultNode, Y("let", "partitioning", framesProcess));
  725. }
  726. }
  727. if (onePartition) {
  728. return framesProcess;
  729. } else {
  730. return Y("block", Q(L(resultNode, Y("return", "partitioning"))));
  731. }
  732. }
  733. TNodePtr ISource::BuildSort(TContext& ctx, const TString& label) {
  734. Y_UNUSED(ctx);
  735. Y_UNUSED(label);
  736. return nullptr;
  737. }
  738. TNodePtr ISource::BuildCleanupColumns(TContext& ctx, const TString& label) {
  739. Y_UNUSED(ctx);
  740. Y_UNUSED(label);
  741. return nullptr;
  742. }
  743. TNodePtr ISource::BuildGroupingColumns(const TString& label) {
  744. Y_UNUSED(label);
  745. return nullptr;
  746. }
  747. IJoin* ISource::GetJoin() {
  748. return nullptr;
  749. }
  750. ISource* ISource::GetCompositeSource() {
  751. return nullptr;
  752. }
  753. bool ISource::IsSelect() const {
  754. return true;
  755. }
  756. bool ISource::IsTableSource() const {
  757. return false;
  758. }
  759. bool ISource::ShouldUseSourceAsColumn(const TString& source) const {
  760. Y_UNUSED(source);
  761. return false;
  762. }
  763. bool ISource::IsJoinKeysInitializing() const {
  764. return false;
  765. }
  766. bool ISource::DoInit(TContext& ctx, ISource* src) {
  767. for (auto& column: Expressions(EExprSeat::FlattenBy)) {
  768. if (!column->Init(ctx, this)) {
  769. return false;
  770. }
  771. }
  772. if (IsFlattenColumns() && src) {
  773. src->AllColumns();
  774. }
  775. return true;
  776. }
  777. bool ISource::InitFilters(TContext& ctx) {
  778. for (auto& filter: Filters) {
  779. if (!filter->Init(ctx, this)) {
  780. return false;
  781. }
  782. if (filter->IsAggregated() && !filter->IsConstant() && !filter->HasState(ENodeState::AggregationKey)) {
  783. ctx.Error(filter->GetPos()) << "Can not use aggregated values in filtering";
  784. return false;
  785. }
  786. }
  787. return true;
  788. }
  789. TAstNode* ISource::Translate(TContext& ctx) const {
  790. Y_DEBUG_ABORT_UNLESS(false);
  791. Y_UNUSED(ctx);
  792. return nullptr;
  793. }
  794. void ISource::FillSortParts(const TVector<TSortSpecificationPtr>& orderBy, TNodePtr& sortDirection, TNodePtr& sortKeySelector) {
  795. TNodePtr expr;
  796. if (orderBy.empty()) {
  797. YQL_ENSURE(!sortKeySelector);
  798. sortDirection = sortKeySelector = Y("Void");
  799. return;
  800. } else if (orderBy.size() == 1) {
  801. auto& sortSpec = orderBy.front();
  802. expr = Y("PersistableRepr", sortSpec->OrderExpr);
  803. sortDirection = Y("Bool", Q(sortSpec->Ascending ? "true" : "false"));
  804. } else {
  805. auto exprList = Y();
  806. sortDirection = Y();
  807. for (const auto& sortSpec: orderBy) {
  808. const auto asc = sortSpec->Ascending;
  809. sortDirection = L(sortDirection, Y("Bool", Q(asc ? "true" : "false")));
  810. exprList = L(exprList, Y("PersistableRepr", sortSpec->OrderExpr));
  811. }
  812. sortDirection = Q(sortDirection);
  813. expr = Q(exprList);
  814. }
  815. sortKeySelector = BuildLambda(Pos, Y("row"), expr);
  816. }
  817. TNodePtr ISource::BuildSortSpec(const TVector<TSortSpecificationPtr>& orderBy, const TString& label, bool traits, bool assume) {
  818. YQL_ENSURE(!orderBy.empty());
  819. TNodePtr dirsNode;
  820. TNodePtr keySelectorNode;
  821. FillSortParts(orderBy, dirsNode, keySelectorNode);
  822. if (traits) {
  823. return Y("SortTraits", Y("TypeOf", label), dirsNode, keySelectorNode);
  824. } else if (assume) {
  825. return Y("AssumeSorted", label, dirsNode, keySelectorNode);
  826. } else {
  827. return Y("Sort", label, dirsNode, keySelectorNode);
  828. }
  829. }
  830. bool ISource::HasMatchRecognize() const {
  831. return static_cast<bool>(MatchRecognizeBuilder);
  832. }
  833. TNodePtr ISource::BuildMatchRecognize(TContext& ctx, TString&& inputTable){
  834. YQL_ENSURE(HasMatchRecognize());
  835. return MatchRecognizeBuilder->Build(ctx, std::move(inputTable), this);
  836. };
  837. IJoin::IJoin(TPosition pos)
  838. : ISource(pos)
  839. {
  840. }
  841. IJoin::~IJoin()
  842. {
  843. }
  844. IJoin* IJoin::GetJoin() {
  845. return this;
  846. }
  847. } // namespace NSQLTranslationV1