// join.cpp (24 KB)
  1. #include "source.h"
  2. #include "context.h"
  3. #include <yql/essentials/utils/yql_panic.h>
  4. #include <library/cpp/charset/ci_string.h>
  5. #include <util/generic/hash_set.h>
  6. #include <util/string/cast.h>
  7. #include <util/string/split.h>
  8. #include <util/string/join.h>
  9. using namespace NYql;
  10. namespace NSQLTranslationV1 {
  11. TString NormalizeJoinOp(const TString& joinOp) {
  12. TVector<TString> joinOpsParts;
  13. Split(joinOp, " ", joinOpsParts);
  14. for (auto&x : joinOpsParts) {
  15. x.to_title();
  16. }
  17. return JoinSeq("", joinOpsParts);
  18. }
  19. struct TJoinDescr {
  20. TString Op;
  21. TJoinLinkSettings LinkSettings;
  22. struct TFullColumn {
  23. ui32 Source;
  24. TNodePtr Column;
  25. };
  26. TVector<std::pair<TFullColumn, TFullColumn>> Keys;
  27. explicit TJoinDescr(const TString& op)
  28. : Op(op)
  29. {}
  30. };
  31. class TJoinBase: public IJoin {
  32. public:
  33. TJoinBase(TPosition pos, TVector<TSourcePtr>&& sources, TVector<bool>&& anyFlags)
  34. : IJoin(pos)
  35. , Sources(std::move(sources))
  36. , AnyFlags(std::move(anyFlags))
  37. {
  38. YQL_ENSURE(Sources.size() == AnyFlags.size());
  39. }
  40. void AllColumns() override {
  41. for (auto& source: Sources) {
  42. source->AllColumns();
  43. }
  44. }
  45. TMaybe<bool> AddColumn(TContext& ctx, TColumnNode& column) override {
  46. ISource* srcByName = nullptr;
  47. if (column.IsArtificial()) {
  48. return true;
  49. }
  50. if (const auto sourceName = *column.GetSourceName()) {
  51. for (auto& source: Sources) {
  52. if (sourceName == source->GetLabel()) {
  53. srcByName = source.Get();
  54. break;
  55. }
  56. }
  57. if (!srcByName) {
  58. if (column.IsAsterisk()) {
  59. ctx.Error(column.GetPos()) << "Unknown correlation name for asterisk: " << sourceName;
  60. return {};
  61. }
  62. // \todo add warning, either mistake in correlation name, either it's a column
  63. column.ResetColumn("", sourceName);
  64. column.SetUseSourceAsColumn();
  65. column.SetAsNotReliable();
  66. }
  67. }
  68. if (column.IsAsterisk()) {
  69. if (!column.GetCountHint()) {
  70. if (srcByName) {
  71. srcByName->AllColumns();
  72. } else {
  73. for (auto& source: Sources) {
  74. source->AllColumns();
  75. }
  76. }
  77. }
  78. return true;
  79. }
  80. if (srcByName) {
  81. column.ResetAsReliable();
  82. if (!srcByName->AddColumn(ctx, column)) {
  83. return {};
  84. }
  85. if (!KeysInitializing && !column.IsAsterisk()) {
  86. column.SetUseSource();
  87. }
  88. return true;
  89. } else {
  90. unsigned acceptedColumns = 0;
  91. TIntrusivePtr<TColumnNode> tryColumn = static_cast<TColumnNode*>(column.Clone().Get());
  92. tryColumn->SetAsNotReliable();
  93. TString lastAcceptedColumnSource;
  94. for (auto& source: Sources) {
  95. if (source->AddColumn(ctx, *tryColumn)) {
  96. ++acceptedColumns;
  97. lastAcceptedColumnSource = source->GetLabel();
  98. }
  99. }
  100. if (!acceptedColumns) {
  101. TStringBuilder sb;
  102. const auto& fullColumnName = FullColumnName(column);
  103. sb << "Column " << fullColumnName << " is not fit to any source";
  104. for (auto& source: Sources) {
  105. if (const auto mistype = source->FindColumnMistype(fullColumnName)) {
  106. sb << ". Did you mean " << mistype.GetRef() << "?";
  107. break;
  108. }
  109. }
  110. ctx.Error(column.GetPos()) << sb;
  111. return {};
  112. } else {
  113. column.SetAsNotReliable();
  114. }
  115. return false;
  116. }
  117. }
  118. const TColumns* GetColumns() const override {
  119. YQL_ENSURE(IsColumnDone, "Unable to GetColumns while it's not finished");
  120. return &JoinedColumns;
  121. }
  122. void GetInputTables(TTableList& tableList) const override {
  123. for (auto& src: Sources) {
  124. src->GetInputTables(tableList);
  125. }
  126. ISource::GetInputTables(tableList);
  127. }
  128. TNodePtr BuildJoinKeys(TContext& ctx, const TVector<TDeferredAtom>& names) override {
  129. const size_t n = JoinOps.size();
  130. TString what(Sources[n]->GetLabel());
  131. static const TSet<TString> noRightSourceJoinOps = {"LeftOnly", "LeftSemi"};
  132. for (size_t nn = n; nn > 0 && noRightSourceJoinOps.contains(JoinOps[nn-1]); --nn) {
  133. what = Sources[nn-1]->GetLabel();
  134. }
  135. const TString with(Sources[n + 1]->GetLabel());
  136. for (auto index = n; index <= n + 1; ++index) {
  137. const auto& label = Sources[index]->GetLabel();
  138. if (label.Contains('.')) {
  139. ctx.Error(Sources[index]->GetPos()) << "Invalid label: " << label << ", unable to use name with dot symbol, you should use AS <simple alias name>";
  140. return nullptr;
  141. }
  142. }
  143. if (what.empty() && with.empty()) {
  144. ctx.Error() << "At least one correlation name is required in join";
  145. return nullptr;
  146. }
  147. if (what == with) {
  148. ctx.Error() << "Self joins are not supporting ON syntax";
  149. return nullptr;
  150. }
  151. TPosition pos(ctx.Pos());
  152. TNodePtr expr;
  153. for (auto& name: names) {
  154. auto lhs = BuildColumn(Pos, name, what);
  155. auto rhs = BuildColumn(Pos, name, with);
  156. if (!lhs || !rhs) {
  157. return nullptr;
  158. }
  159. TNodePtr eq(BuildBinaryOp(ctx, pos, "==", lhs, rhs));
  160. if (expr) {
  161. expr = BuildBinaryOp(ctx, pos, "And", expr, eq);
  162. } else {
  163. expr = eq;
  164. }
  165. }
  166. if (expr && Sources.size() > 2) {
  167. ctx.Error() << "Multi-way JOINs should be connected with ON clause instead of USING clause";
  168. return nullptr;
  169. }
  170. return expr;
  171. }
  172. bool DoInit(TContext& ctx, ISource* src) override;
  173. void SetupJoin(const TString& opName, TNodePtr expr, const TJoinLinkSettings& linkSettings) override {
  174. JoinOps.push_back(opName);
  175. JoinExprs.push_back(expr);
  176. JoinLinkSettings.push_back(linkSettings);
  177. }
  178. bool IsStream() const override {
  179. return AnyOf(Sources, [] (const TSourcePtr& s) { return s->IsStream(); });
  180. }
  181. protected:
  182. static TString FullColumnName(const TColumnNode& column) {
  183. auto sourceName = *column.GetSourceName();
  184. auto columnName = *column.GetColumnName();
  185. return sourceName ? DotJoin(sourceName, columnName) : columnName;
  186. }
  187. bool InitKeysOrFilters(TContext& ctx, ui32 joinIdx, TNodePtr expr) {
  188. const TString joinOp(JoinOps[joinIdx]);
  189. const TJoinLinkSettings linkSettings(JoinLinkSettings[joinIdx]);
  190. const TCallNode* op = nullptr;
  191. if (expr) {
  192. const TString opName(expr->GetOpName());
  193. if (opName != "==") {
  194. ctx.Error(expr->GetPos()) << "JOIN ON expression must be a conjunction of equality predicates";
  195. return false;
  196. }
  197. op = expr->GetCallNode();
  198. YQL_ENSURE(op, "Invalid JOIN equal operation node");
  199. YQL_ENSURE(op->GetArgs().size() == 2, "Invalid JOIN equal operation arguments");
  200. }
  201. ui32 idx = 0;
  202. THashMap<TString, ui32> sources;
  203. for (auto& source: Sources) {
  204. auto label = source->GetLabel();
  205. if (!label) {
  206. ctx.Error(source->GetPos()) << "JOIN: missing correlation name for source";
  207. return false;
  208. }
  209. sources.insert({ source->GetLabel(), idx });
  210. ++idx;
  211. }
  212. if (sources.size() != Sources.size()) {
  213. ctx.Error(expr ? expr->GetPos() : Pos) << "JOIN: all correlation names must be different";
  214. return false;
  215. }
  216. ui32 pos = 0;
  217. ui32 leftArg = 0;
  218. ui32 rightArg = 0;
  219. ui32 leftSourceIdx = 0;
  220. ui32 rightSourceIdx = 0;
  221. const TString* leftSource = nullptr;
  222. const TString* rightSource = nullptr;
  223. const TString* sameColumnNamePtr = nullptr;
  224. TSet<TString> joinedSources;
  225. if (op) {
  226. const TString* columnNamePtr = nullptr;
  227. for (auto& arg : op->GetArgs()) {
  228. const auto sourceNamePtr = arg->GetSourceName();
  229. if (!sourceNamePtr) {
  230. ctx.Error(expr->GetPos()) << "JOIN: each equality predicate argument must depend on exactly one JOIN input";
  231. return false;
  232. }
  233. const auto sourceName = *sourceNamePtr;
  234. if (sourceName.empty()) {
  235. ctx.Error(expr->GetPos()) << "JOIN: column requires correlation name";
  236. return false;
  237. }
  238. auto it = sources.find(sourceName);
  239. if (it != sources.end()) {
  240. joinedSources.insert(sourceName);
  241. if (it->second == joinIdx + 1) {
  242. rightArg = pos;
  243. rightSource = sourceNamePtr;
  244. rightSourceIdx = it->second;
  245. }
  246. else if (it->second > joinIdx + 1) {
  247. ctx.Error(expr->GetPos()) << "JOIN: can not use source: " << sourceName << " in equality predicate, it is out of current join scope";
  248. return false;
  249. }
  250. else {
  251. leftArg = pos;
  252. leftSource = sourceNamePtr;
  253. leftSourceIdx = it->second;
  254. }
  255. }
  256. else {
  257. ctx.Error(expr->GetPos()) << "JOIN: unknown corellation name: " << sourceName;
  258. return false;
  259. }
  260. if (!columnNamePtr) {
  261. columnNamePtr = arg->GetColumnName();
  262. } else {
  263. auto curColumnNamePtr = arg->GetColumnName();
  264. if (curColumnNamePtr && *curColumnNamePtr == *columnNamePtr) {
  265. sameColumnNamePtr = columnNamePtr;
  266. }
  267. }
  268. ++pos;
  269. }
  270. } else {
  271. for (auto& x : sources) {
  272. if (x.second == joinIdx) {
  273. leftArg = pos;
  274. leftSourceIdx = x.second;
  275. joinedSources.insert(x.first);
  276. }
  277. else if (x.second = joinIdx + 1) {
  278. rightArg = pos;
  279. rightSourceIdx = x.second;
  280. joinedSources.insert(x.first);
  281. }
  282. }
  283. }
  284. if (joinedSources.size() == 1) {
  285. ctx.Error(expr ? expr->GetPos() : Pos) << "JOIN: different correlation names are required for joined tables";
  286. return false;
  287. }
  288. if (op) {
  289. if (joinedSources.size() != 2) {
  290. ctx.Error(expr->GetPos()) << "JOIN ON expression must be a conjunction of equality predicates over at most two sources";
  291. return false;
  292. }
  293. if (!rightSource) {
  294. ctx.Error(expr->GetPos()) << "JOIN ON equality predicate must have one of its arguments from the rightmost source";
  295. return false;
  296. }
  297. }
  298. KeysInitializing = true;
  299. if (op) {
  300. for (auto& arg : op->GetArgs()) {
  301. if (!arg->Init(ctx, this)) {
  302. return false;
  303. }
  304. }
  305. Y_DEBUG_ABORT_UNLESS(leftSource);
  306. if (sameColumnNamePtr) {
  307. SameKeyMap[*sameColumnNamePtr].insert(*leftSource);
  308. SameKeyMap[*sameColumnNamePtr].insert(*rightSource);
  309. }
  310. }
  311. if (joinIdx == JoinDescrs.size()) {
  312. TJoinDescr newDescr(joinOp);
  313. newDescr.LinkSettings = linkSettings;
  314. JoinDescrs.push_back(std::move(newDescr));
  315. }
  316. JoinDescrs.back().Keys.push_back({ { leftSourceIdx, op ? op->GetArgs()[leftArg] : nullptr},
  317. { rightSourceIdx, op ? op->GetArgs()[rightArg] : nullptr } });
  318. KeysInitializing = false;
  319. return true;
  320. }
  321. bool IsJoinKeysInitializing() const override {
  322. return KeysInitializing;
  323. }
  324. protected:
  325. TVector<TString> JoinOps;
  326. TVector<TNodePtr> JoinExprs;
  327. TVector<TJoinLinkSettings> JoinLinkSettings;
  328. TVector<TJoinDescr> JoinDescrs;
  329. THashMap<TString, THashSet<TString>> SameKeyMap;
  330. const TVector<TSourcePtr> Sources;
  331. const TVector<bool> AnyFlags;
  332. TColumns JoinedColumns;
  333. bool KeysInitializing = false;
  334. bool IsColumnDone = false;
  335. void FinishColumns() override {
  336. if (IsColumnDone) {
  337. return;
  338. }
  339. YQL_ENSURE(JoinOps.size()+1 == Sources.size());
  340. bool excludeNextSource = false;
  341. decltype(JoinOps)::const_iterator opIter = JoinOps.begin();
  342. for (auto& src: Sources) {
  343. if (excludeNextSource) {
  344. excludeNextSource = false;
  345. if (opIter != JoinOps.end()) {
  346. ++opIter;
  347. }
  348. continue;
  349. }
  350. if (opIter != JoinOps.end()) {
  351. auto joinOper = *opIter;
  352. ++opIter;
  353. if (joinOper == "LeftSemi" || joinOper == "LeftOnly") {
  354. excludeNextSource = true;
  355. }
  356. if (joinOper == "RightSemi" || joinOper == "RightOnly") {
  357. continue;
  358. }
  359. }
  360. auto columnsPtr = src->GetColumns();
  361. if (!columnsPtr) {
  362. continue;
  363. }
  364. TColumns upColumns;
  365. upColumns.Merge(*columnsPtr);
  366. upColumns.SetPrefix(src->GetLabel());
  367. JoinedColumns.Merge(upColumns);
  368. }
  369. IsColumnDone = true;
  370. }
  371. };
  372. bool TJoinBase::DoInit(TContext& ctx, ISource* initSrc) {
  373. for (auto& source: Sources) {
  374. if (!source->Init(ctx, initSrc)) {
  375. return false;
  376. }
  377. auto src = source.Get();
  378. if (src->IsFlattenByExprs()) {
  379. for (auto& expr : static_cast<ISource const*>(src)->Expressions(EExprSeat::FlattenByExpr)) {
  380. if (!expr->Init(ctx, src)) {
  381. return false;
  382. }
  383. }
  384. }
  385. }
  386. YQL_ENSURE(JoinOps.size() == JoinExprs.size(), "Invalid join exprs number");
  387. YQL_ENSURE(JoinOps.size() == JoinLinkSettings.size());
  388. const TSet<TString> allowedJoinOps = {"Inner", "Left", "Right", "Full", "LeftOnly", "RightOnly", "Exclusion", "LeftSemi", "RightSemi", "Cross"};
  389. for (auto& opName: JoinOps) {
  390. if (!allowedJoinOps.contains(opName)) {
  391. ctx.Error(Pos) << "Invalid join op: " << opName;
  392. return false;
  393. }
  394. }
  395. ui32 idx = 0;
  396. for (auto expr: JoinExprs) {
  397. if (expr) {
  398. TDeque<TNodePtr> conjQueue;
  399. conjQueue.push_back(expr);
  400. while (!conjQueue.empty()) {
  401. TNodePtr cur = conjQueue.front();
  402. conjQueue.pop_front();
  403. if (cur->GetOpName() == "And") {
  404. auto conj = cur->GetCallNode();
  405. YQL_ENSURE(conj, "Invalid And operation node");
  406. conjQueue.insert(conjQueue.begin(), conj->GetArgs().begin(), conj->GetArgs().end());
  407. } else if (!InitKeysOrFilters(ctx, idx, cur)) {
  408. return false;
  409. }
  410. }
  411. } else {
  412. if (!InitKeysOrFilters(ctx, idx, nullptr)) {
  413. return false;
  414. }
  415. }
  416. ++idx;
  417. }
  418. TSet<ui32> joinedSources;
  419. for (auto& descr: JoinDescrs) {
  420. for (auto& key : descr.Keys) {
  421. joinedSources.insert(key.first.Source);
  422. joinedSources.insert(key.second.Source);
  423. }
  424. }
  425. for (idx = 0; idx < Sources.size(); ++idx) {
  426. if (!joinedSources.contains(idx)) {
  427. ctx.Error(Sources[idx]->GetPos()) << "Source: " << Sources[idx]->GetLabel() << " was not used in join expressions";
  428. return false;
  429. }
  430. }
  431. return ISource::DoInit(ctx, initSrc);
  432. }
  433. class TEquiJoin: public TJoinBase {
  434. public:
  435. TEquiJoin(TPosition pos, TVector<TSourcePtr>&& sources, TVector<bool>&& anyFlags, bool strictJoinKeyTypes)
  436. : TJoinBase(pos, std::move(sources), std::move(anyFlags))
  437. , StrictJoinKeyTypes(strictJoinKeyTypes)
  438. {
  439. }
  440. TNodePtr Build(TContext& ctx) override {
  441. TMap<std::pair<TString, TString>, TNodePtr> extraColumns;
  442. TNodePtr joinTree;
  443. for (auto& descr: JoinDescrs) {
  444. auto leftBranch = joinTree;
  445. bool leftAny = false;
  446. if (!leftBranch) {
  447. leftBranch = BuildQuotedAtom(Pos, Sources[descr.Keys[0].first.Source]->GetLabel());
  448. leftAny = AnyFlags[descr.Keys[0].first.Source];
  449. }
  450. bool rightAny = AnyFlags[descr.Keys[0].second.Source];
  451. auto leftKeys = GetColumnNames(ctx, extraColumns, descr.Keys, true);
  452. auto rightKeys = GetColumnNames(ctx, extraColumns, descr.Keys, false);
  453. if (!leftKeys || !rightKeys) {
  454. return nullptr;
  455. }
  456. TNodePtr linkOptions = Y();
  457. if (TJoinLinkSettings::EStrategy::SortedMerge == descr.LinkSettings.Strategy) {
  458. linkOptions = L(linkOptions, Q(Y(Q("forceSortedMerge"))));
  459. } else if (TJoinLinkSettings::EStrategy::StreamLookup == descr.LinkSettings.Strategy) {
  460. auto streamlookup = Y(Q("forceStreamLookup"));
  461. for (auto&& option: descr.LinkSettings.Values) {
  462. streamlookup = L(streamlookup, Q(option));
  463. }
  464. linkOptions = L(linkOptions, Q(streamlookup));
  465. } else if (TJoinLinkSettings::EStrategy::ForceMap == descr.LinkSettings.Strategy) {
  466. linkOptions = L(linkOptions, Q(Y(Q("join_algo"), Q("MapJoin"))));
  467. } else if (TJoinLinkSettings::EStrategy::ForceGrace == descr.LinkSettings.Strategy) {
  468. linkOptions = L(linkOptions, Q(Y(Q("join_algo"), Q("GraceJoin"))));
  469. }
  470. if (leftAny) {
  471. linkOptions = L(linkOptions, Q(Y(Q("left"), Q("any"))));
  472. }
  473. if (rightAny) {
  474. linkOptions = L(linkOptions, Q(Y(Q("right"), Q("any"))));
  475. }
  476. if (descr.LinkSettings.Compact) {
  477. linkOptions = L(linkOptions, Q(Y(Q("compact"))));
  478. }
  479. joinTree = Q(Y(
  480. Q(descr.Op),
  481. leftBranch,
  482. BuildQuotedAtom(Pos, Sources[descr.Keys[0].second.Source]->GetLabel()),
  483. leftKeys,
  484. rightKeys,
  485. Q(linkOptions)
  486. ));
  487. }
  488. TNodePtr equiJoin(Y("EquiJoin"));
  489. bool ordered = false;
  490. for (size_t i = 0; i < Sources.size(); ++i) {
  491. auto& source = Sources[i];
  492. auto sourceNode = source->Build(ctx);
  493. if (!sourceNode) {
  494. return nullptr;
  495. }
  496. const bool useOrderedForSource = ctx.UseUnordered(*source);
  497. ordered = ordered || useOrderedForSource;
  498. if (source->IsFlattenByColumns() || source->IsFlattenColumns()) {
  499. auto flatten = source->IsFlattenByColumns() ?
  500. source->BuildFlattenByColumns("row") :
  501. source->BuildFlattenColumns("row");
  502. if (!flatten) {
  503. return nullptr;
  504. }
  505. auto block = Y(Y("let", "flatten", sourceNode));
  506. if (source->IsFlattenByExprs()) {
  507. auto premap = source->BuildPreFlattenMap(ctx);
  508. if (!premap) {
  509. return nullptr;
  510. }
  511. block = L(block, Y("let", "flatten", Y(useOrderedForSource ? "OrderedFlatMap" : "FlatMap", "flatten", BuildLambda(Pos, Y("row"), premap))));
  512. }
  513. block = L(block, Y("let", "flatten", Y(useOrderedForSource ? "OrderedFlatMap" : "FlatMap", "flatten", BuildLambda(Pos, Y("row"), flatten, "res"))));
  514. sourceNode = Y("block", Q(L(block, Y("return", "flatten"))));
  515. }
  516. TNodePtr extraMembers;
  517. for (auto it = extraColumns.lower_bound({ source->GetLabel(), "" }); it != extraColumns.end(); ++it) {
  518. if (it->first.first != source->GetLabel()) {
  519. break;
  520. }
  521. if (!extraMembers) {
  522. extraMembers = Y();
  523. }
  524. extraMembers = L(
  525. extraMembers,
  526. Y("let", "row", Y("AddMember", "row", BuildQuotedAtom(it->second->GetPos(), it->first.second), it->second))
  527. );
  528. }
  529. if (extraMembers) {
  530. sourceNode = Y(useOrderedForSource ? "OrderedMap" : "Map", sourceNode, BuildLambda(Pos, Y("row"), extraMembers, "row"));
  531. }
  532. sourceNode = Y("RemoveSystemMembers", sourceNode);
  533. equiJoin = L(equiJoin, Q(Y(sourceNode, BuildQuotedAtom(source->GetPos(), source->GetLabel()))));
  534. }
  535. TNodePtr removeMembers;
  536. for(auto it: extraColumns) {
  537. if (!removeMembers) {
  538. removeMembers = Y();
  539. }
  540. removeMembers = L(
  541. removeMembers,
  542. Y("let", "row", Y("ForceRemoveMember", "row", BuildQuotedAtom(Pos, DotJoin(it.first.first, it.first.second))))
  543. );
  544. }
  545. auto options = Y();
  546. if (StrictJoinKeyTypes) {
  547. options = L(options, Q(Y(Q("strict_keys"))));
  548. }
  549. equiJoin = L(equiJoin, joinTree, Q(options));
  550. if (removeMembers) {
  551. equiJoin = Y(ordered ? "OrderedMap" : "Map", equiJoin, BuildLambda(Pos, Y("row"), removeMembers, "row"));
  552. }
  553. return equiJoin;
  554. }
  555. const THashMap<TString, THashSet<TString>>& GetSameKeysMap() const override {
  556. return SameKeyMap;
  557. }
  558. TVector<TString> GetJoinLabels() const override {
  559. TVector<TString> labels;
  560. for (auto& source: Sources) {
  561. const auto label = source->GetLabel();
  562. YQL_ENSURE(label);
  563. labels.push_back(label);
  564. }
  565. return labels;
  566. }
  567. TPtr DoClone() const final {
  568. TVector<TSourcePtr> clonedSources;
  569. for (auto& cur: Sources) {
  570. clonedSources.push_back(cur->CloneSource());
  571. }
  572. auto newSource = MakeIntrusive<TEquiJoin>(Pos, std::move(clonedSources), TVector<bool>(AnyFlags), StrictJoinKeyTypes);
  573. newSource->JoinOps = JoinOps;
  574. newSource->JoinExprs = CloneContainer(JoinExprs);
  575. newSource->JoinLinkSettings = JoinLinkSettings;
  576. return newSource;
  577. }
  578. private:
  579. TNodePtr GetColumnNames(
  580. TContext& ctx,
  581. TMap<std::pair<TString, TString>, TNodePtr>& extraColumns,
  582. const TVector<std::pair<TJoinDescr::TFullColumn, TJoinDescr::TFullColumn>>& keys,
  583. bool left
  584. ) {
  585. Y_UNUSED(ctx);
  586. auto res = Y();
  587. for (auto& it: keys) {
  588. auto tableName = Sources[left ? it.first.Source : it.second.Source]->GetLabel();
  589. TString columnName;
  590. auto column = left ? it.first.Column : it.second.Column;
  591. if (!column) {
  592. continue;
  593. }
  594. if (column->GetColumnName()) {
  595. columnName = *column->GetColumnName();
  596. } else {
  597. TStringStream str;
  598. str << "_equijoin_column_" << extraColumns.size();
  599. columnName = str.Str();
  600. extraColumns.insert({ std::make_pair(tableName, columnName), column });
  601. }
  602. res = L(res, BuildQuotedAtom(Pos, tableName));
  603. res = L(res, BuildQuotedAtom(Pos, columnName));
  604. }
  605. return Q(res);
  606. }
  607. const bool StrictJoinKeyTypes;
  608. };
  609. TSourcePtr BuildEquiJoin(TPosition pos, TVector<TSourcePtr>&& sources, TVector<bool>&& anyFlags, bool strictJoinKeyTypes) {
  610. return new TEquiJoin(pos, std::move(sources), std::move(anyFlags), strictJoinKeyTypes);
  611. }
  612. } // namespace NSQLTranslationV1