mkql_join.cpp 90 KB


  1. #include "mkql_join.h"
  2. #include <yql/essentials/minikql/computation/mkql_custom_list.h>
  3. #include <yql/essentials/minikql/computation/mkql_computation_node_codegen.h> // Y_IGNORE
  4. #include <yql/essentials/minikql/computation/mkql_computation_node_holders_codegen.h>
  5. #include <yql/essentials/minikql/computation/mkql_computation_node_pack.h>
  6. #include <yql/essentials/minikql/computation/mkql_llvm_base.h> // Y_IGNORE
  7. #include <yql/essentials/minikql/mkql_node_cast.h>
  8. #include <yql/essentials/minikql/mkql_program_builder.h>
  9. #include <util/system/tempfile.h>
  10. #include <util/stream/file.h>
  11. #include <util/system/fstat.h>
  12. #include <util/generic/ylimits.h>
  13. namespace NKikimr {
  14. namespace NMiniKQL {
  15. namespace {
  16. const ui64 DEFAULT_STACK_ITEMS = 16;
  17. static const TStatKey Join_Spill_Count("Join_Spill_Count", true);
  18. static const TStatKey Join_Spill_MaxFileSize("Join_Spill_MaxFileSize", false);
  19. static const TStatKey Join_Spill_MaxRowsCount("Join_Spill_MaxRowsCount", false);
  20. enum class EOutputMode {
  21. Unknown,
  22. LeftNull,
  23. RightNull,
  24. BothNull,
  25. Cross,
  26. CrossSwap,
  27. None
  28. };
  29. std::vector<bool> FillRequiredStructColumn(const ui32 inputWidth, const std::vector<ui32>& requiredColumns) {
  30. std::vector<bool> result(inputWidth, false);
  31. for (const auto i : requiredColumns) {
  32. result[i] = true;
  33. }
  34. return result;
  35. }
  36. enum ETableIndex : ui32 {
  37. LeftIndex = 0U,
  38. RightIndex = 1U
  39. };
  40. namespace NFlow {
  41. using TFetcher = std::function<EFetchResult(TComputationContext&, NUdf::TUnboxedValue*const*)>;
  42. using TLiveFetcher = std::function<EFetchResult(TComputationContext&, NUdf::TUnboxedValue*)>;
  43. class TSpillList {
  44. public:
  45. TSpillList(TValuePacker& itemPacker, bool singleShot, size_t width = 0ULL)
  46. : Width(width)
  47. , ItemPacker(itemPacker)
  48. , Count(0)
  49. #ifndef NDEBUG
  50. , IsSealed(false)
  51. #endif
  52. , Index(ui64(-1))
  53. , SingleShot(singleShot)
  54. {}
  55. TSpillList(TSpillList&& rhs) = delete;
  56. TSpillList(const TSpillList& rhs) = delete;
  57. void operator=(const TSpillList& rhs) = delete;
  58. void Init() {
  59. Count = 0;
  60. #ifndef NDEBUG
  61. IsSealed = false;
  62. #endif
  63. Index = ui64(-1);
  64. FileState = nullptr;
  65. Heap.clear();
  66. LiveFlow = nullptr;
  67. LiveValue = NUdf::TUnboxedValue();
  68. }
  69. bool Spill() {
  70. if (FileState) {
  71. return false;
  72. }
  73. FileState.reset(new TFileState);
  74. OpenWrite();
  75. for (ui32 i = 0; i < Count; ++i) {
  76. Write(std::move(InMemory(i)));
  77. }
  78. Heap.clear();
  79. return true;
  80. }
  81. void Live(IComputationNode* flow, NUdf::TUnboxedValue&& liveValue) {
  82. Y_DEBUG_ABORT_UNLESS(!IsLive());
  83. Y_DEBUG_ABORT_UNLESS(Count == 0);
  84. LiveFlow = flow;
  85. LiveValue = std::move(liveValue);
  86. }
  87. void Live(TLiveFetcher&& fetcher, NUdf::TUnboxedValue* liveValues) {
  88. Y_DEBUG_ABORT_UNLESS(!IsLive());
  89. Y_DEBUG_ABORT_UNLESS(Count == 0);
  90. Fetcher = std::move(fetcher);
  91. LiveValues = liveValues;
  92. }
  93. void Add(NUdf::TUnboxedValue&& value) {
  94. #ifndef NDEBUG
  95. Y_DEBUG_ABORT_UNLESS(!IsSealed);
  96. #endif
  97. if (SingleShot && Count > 0) {
  98. MKQL_ENSURE(Count == 1, "Counter inconsistent");
  99. return;
  100. }
  101. if (FileState) {
  102. Write(std::move(value));
  103. } else {
  104. if (Count < DEFAULT_STACK_ITEMS) {
  105. Stack[Count] = std::move(value);
  106. }
  107. else {
  108. if (Count == DEFAULT_STACK_ITEMS) {
  109. Y_DEBUG_ABORT_UNLESS(Heap.empty());
  110. Heap.assign(Stack, Stack + DEFAULT_STACK_ITEMS);
  111. }
  112. Heap.push_back(std::move(value));
  113. }
  114. }
  115. ++Count;
  116. }
  117. void Seal(TComputationContext& ctx) {
  118. #ifndef NDEBUG
  119. IsSealed = true;
  120. #endif
  121. if (FileState) {
  122. FileState->Output->Finish();
  123. Cerr << "Spill finished at " << Count << " items" << Endl;
  124. FileState->Output.reset();
  125. Cerr << "File size: " << GetFileLength(FileState->File.GetName()) << ", expected: " << FileState->TotalSize << Endl;
  126. MKQL_INC_STAT(ctx.Stats, Join_Spill_Count);
  127. MKQL_SET_MAX_STAT(ctx.Stats, Join_Spill_MaxFileSize, static_cast<i64>(FileState->TotalSize));
  128. MKQL_SET_MAX_STAT(ctx.Stats, Join_Spill_MaxRowsCount, static_cast<i64>(Count));
  129. }
  130. }
  131. bool IsLive() const {
  132. return bool(LiveFlow) || bool(Fetcher);
  133. }
  134. ui64 GetCount() const {
  135. Y_DEBUG_ABORT_UNLESS(!IsLive());
  136. return Count;
  137. }
  138. bool Empty() const {
  139. return !IsLive() && (Count == 0);
  140. }
  141. NUdf::TUnboxedValue Next(TComputationContext& ctx) {
  142. #ifndef NDEBUG
  143. Y_DEBUG_ABORT_UNLESS(IsSealed);
  144. #endif
  145. if (IsLive()) {
  146. if ((Index + 1) == 0) {
  147. ++Index;
  148. return std::move(LiveValue);
  149. }
  150. auto value = LiveFlow->GetValue(ctx);
  151. while (SingleShot && !value.IsSpecial()) {
  152. // skip all remaining values
  153. value = LiveFlow->GetValue(ctx);
  154. }
  155. if (!value.IsSpecial()) {
  156. ++Index;
  157. }
  158. return value;
  159. }
  160. if ((Index + 1) == Count) {
  161. return NUdf::TUnboxedValuePod::MakeFinish();
  162. }
  163. ++Index;
  164. if (FileState) {
  165. if (Index == 0) {
  166. OpenRead();
  167. }
  168. return Read(ctx);
  169. }
  170. return InMemory(Index);
  171. }
  172. EFetchResult Next(TComputationContext& ctx, NUdf::TUnboxedValue* values) {
  173. if (IsLive()) {
  174. if ((Index + 1) == 0) {
  175. ++Index;
  176. if (values != LiveValues)
  177. for (auto i = 0U; i < Width; ++i)
  178. *values++ = std::move(*LiveValues++);
  179. LiveValues = nullptr;
  180. return EFetchResult::One;
  181. }
  182. auto result = Fetcher(ctx, values);
  183. while (SingleShot && EFetchResult::One == result) {
  184. // skip all remaining values
  185. result = Fetcher(ctx, values);
  186. }
  187. if (EFetchResult::One == result) {
  188. ++Index;
  189. }
  190. return result;
  191. }
  192. if ((Index + 1) == Count) {
  193. return EFetchResult::Finish;
  194. }
  195. ++Index;
  196. if (FileState) {
  197. if (Index == 0) {
  198. OpenRead();
  199. }
  200. std::copy_n(Read(ctx).GetElements(), Width, values);
  201. return EFetchResult::One;
  202. }
  203. std::copy_n(InMemory(Index).GetElements(), Width, values);
  204. return EFetchResult::One;
  205. }
  206. void Rewind() {
  207. Y_DEBUG_ABORT_UNLESS(!IsLive());
  208. #ifndef NDEBUG
  209. Y_DEBUG_ABORT_UNLESS(IsSealed);
  210. #endif
  211. Index = ui64(-1);
  212. if (FileState) {
  213. OpenRead();
  214. }
  215. }
  216. private:
  217. NUdf::TUnboxedValue& InMemory(ui32 index) {
  218. return !Heap.empty() ? Heap[index] : Stack[index];
  219. }
  220. const NUdf::TUnboxedValue& InMemory(ui32 index) const {
  221. return !Heap.empty() ? Heap[index] : Stack[index];
  222. }
  223. void OpenWrite() {
  224. Cerr << "Spill started at " << Count << " items to " << FileState->File.GetName() << Endl;
  225. FileState->Output.reset(new TFixedBufferFileOutput(FileState->File.GetName()));
  226. FileState->Output->SetFlushPropagateMode(false);
  227. FileState->Output->SetFinishPropagateMode(false);
  228. }
  229. void Write(NUdf::TUnboxedValue&& value) {
  230. Y_DEBUG_ABORT_UNLESS(FileState->Output);
  231. TStringBuf serialized = ItemPacker.Pack(value);
  232. ui32 length = serialized.size();
  233. FileState->Output->Write(&length, sizeof(length));
  234. FileState->Output->Write(serialized.data(), length);
  235. FileState->TotalSize += sizeof(length);
  236. FileState->TotalSize += length;
  237. }
  238. void OpenRead() {
  239. FileState->Input.reset();
  240. FileState->Input.reset(new TFileInput(FileState->File.GetName()));
  241. }
  242. NUdf::TUnboxedValue Read(TComputationContext& ctx) {
  243. ui32 length = 0;
  244. auto wasRead = FileState->Input->Load(&length, sizeof(length));
  245. Y_ABORT_UNLESS(wasRead == sizeof(length));
  246. FileState->Buffer.Reserve(length);
  247. wasRead = FileState->Input->Load((void*)FileState->Buffer.Data(), length);
  248. Y_ABORT_UNLESS(wasRead == length);
  249. return ReadValue = ItemPacker.Unpack(TStringBuf(FileState->Buffer.Data(), length), ctx.HolderFactory);
  250. }
  251. private:
  252. const size_t Width;
  253. TValuePacker& ItemPacker;
  254. ui64 Count;
  255. NUdf::TUnboxedValue ReadValue;
  256. NUdf::TUnboxedValue Stack[DEFAULT_STACK_ITEMS];
  257. TUnboxedValueVector Heap;
  258. #ifndef NDEBUG
  259. bool IsSealed;
  260. #endif
  261. ui64 Index;
  262. const bool SingleShot;
  263. struct TFileState {
  264. TFileState()
  265. : File(TTempFileHandle::InCurrentDir())
  266. , TotalSize(0)
  267. {}
  268. TTempFileHandle File;
  269. ui64 TotalSize;
  270. std::unique_ptr<TFileInput> Input;
  271. std::unique_ptr<TFixedBufferFileOutput> Output;
  272. TBuffer Buffer;
  273. };
  274. std::unique_ptr<TFileState> FileState;
  275. IComputationNode* LiveFlow = nullptr;
  276. TLiveFetcher Fetcher;
  277. NUdf::TUnboxedValue LiveValue;
  278. NUdf::TUnboxedValue* LiveValues = nullptr;
  279. };
  280. template <EJoinKind Kind, bool TTrackRss>
  281. class TCommonJoinCoreWrapper : public TStatefulFlowComputationNode<TCommonJoinCoreWrapper<Kind, TTrackRss>> {
  282. using TSelf = TCommonJoinCoreWrapper<Kind, TTrackRss>;
  283. using TBase = TStatefulFlowComputationNode<TSelf>;
  284. typedef TBase TBaseComputation;
  285. public:
  286. class TValue : public TComputationValue<TValue> {
  287. friend TSelf;
  288. public:
  289. using TBase = TComputationValue<TValue>;
  290. TValue(TMemoryUsageInfo* memInfo, TComputationContext& ctx, const TSelf* self)
  291. : TBase(memInfo)
  292. , Self(self)
  293. , List1(Self->Packer.RefMutableObject(ctx, false, Self->InputStructType), IsAnyJoinLeft(Self->AnyJoinSettings))
  294. , List2(Self->Packer.RefMutableObject(ctx, false, Self->InputStructType), IsAnyJoinRight(Self->AnyJoinSettings))
  295. {
  296. Init();
  297. }
  298. void Init() {
  299. List1.Init();
  300. List2.Init();
  301. CrossMove1 = true;
  302. EatInput = true;
  303. KeyHasNulls = false;
  304. OutputMode = EOutputMode::Unknown;
  305. InitialUsage = std::nullopt;
  306. }
  307. private:
  308. // copypaste to resolve -Woverloaded-virtual
  309. bool Next(NUdf::TUnboxedValue&) override {
  310. this->ThrowNotSupported(__func__);
  311. return false;
  312. }
  313. NUdf::TUnboxedValue Next(IComputationNode* flow, TComputationContext& ctx) {
  314. while (EatInput) {
  315. if (!InitialUsage) {
  316. InitialUsage = ctx.HolderFactory.GetPagePool().GetUsed();
  317. }
  318. if (auto value = flow->GetValue(ctx); value.IsYield()) {
  319. return value;
  320. } else if (value.IsFinish()) {
  321. EatInput = false;
  322. } else {
  323. if (!KeyHasNulls && (Kind == EJoinKind::Exclusion || Kind == EJoinKind::Full)) {
  324. for (ui32 i = 0U; i < Self->KeyColumns.size(); ++i) {
  325. if (!value.GetElement(Self->KeyColumns[i])) {
  326. KeyHasNulls = true;
  327. break;
  328. }
  329. }
  330. }
  331. switch (const auto tableIndex = value.GetElement(Self->TableIndexPos).template Get<ui32>()) {
  332. case LeftIndex:
  333. if (Kind == EJoinKind::RightOnly || (Kind == EJoinKind::Exclusion && !List2.Empty() && !KeyHasNulls)) {
  334. EatInput = false;
  335. OutputMode = EOutputMode::None;
  336. break;
  337. }
  338. if (Self->SortedTableOrder && *Self->SortedTableOrder == RightIndex) {
  339. List1.Live(flow, std::move(value));
  340. EatInput = false;
  341. } else {
  342. List1.Add(std::move(value));
  343. if (ctx.CheckAdjustedMemLimit<TTrackRss>(Self->MemLimit, *InitialUsage)) {
  344. List1.Spill();
  345. }
  346. }
  347. break;
  348. case RightIndex:
  349. if (Kind == EJoinKind::LeftOnly || (Kind == EJoinKind::Exclusion && !List1.Empty() && !KeyHasNulls)) {
  350. EatInput = false;
  351. OutputMode = EOutputMode::None;
  352. break;
  353. }
  354. if (Self->SortedTableOrder && *Self->SortedTableOrder == LeftIndex) {
  355. List2.Live(flow, std::move(value));
  356. EatInput = false;
  357. } else {
  358. List2.Add(std::move(value));
  359. if (ctx.CheckAdjustedMemLimit<TTrackRss>(Self->MemLimit, *InitialUsage)) {
  360. List2.Spill();
  361. }
  362. }
  363. break;
  364. default: THROW yexception() << "Bad table index: " << tableIndex;
  365. }
  366. }
  367. }
  368. while (true) {
  369. switch (OutputMode) {
  370. case EOutputMode::Unknown: {
  371. List1.Seal(ctx);
  372. List2.Seal(ctx);
  373. switch (Kind) {
  374. case EJoinKind::Cross:
  375. case EJoinKind::Inner:
  376. if (List1.Empty() || List2.Empty()) {
  377. OutputMode = EOutputMode::None;
  378. }
  379. break;
  380. case EJoinKind::Left:
  381. if (List1.Empty()) {
  382. OutputMode = EOutputMode::None;
  383. }
  384. break;
  385. case EJoinKind::LeftOnly:
  386. if (List1.Empty() || !List2.Empty()) {
  387. OutputMode = EOutputMode::None;
  388. } else {
  389. OutputMode = EOutputMode::RightNull;
  390. }
  391. break;
  392. case EJoinKind::Right:
  393. if (List2.Empty()) {
  394. OutputMode = EOutputMode::None;
  395. }
  396. break;
  397. case EJoinKind::RightOnly:
  398. if (List2.Empty() || !List1.Empty()) {
  399. OutputMode = EOutputMode::None;
  400. } else {
  401. OutputMode = EOutputMode::LeftNull;
  402. }
  403. break;
  404. case EJoinKind::Exclusion:
  405. if (!List1.Empty() && !List2.Empty() && !KeyHasNulls) {
  406. OutputMode = EOutputMode::None;
  407. } else if (List1.Empty()) {
  408. OutputMode = EOutputMode::LeftNull;
  409. } else if (List2.Empty()) {
  410. OutputMode = EOutputMode::RightNull;
  411. } else {
  412. OutputMode = EOutputMode::BothNull;
  413. }
  414. break;
  415. case EJoinKind::Full:
  416. break;
  417. case EJoinKind::LeftSemi:
  418. if (List1.Empty() || List2.Empty()) {
  419. OutputMode = EOutputMode::None;
  420. } else {
  421. OutputMode = EOutputMode::RightNull;
  422. }
  423. break;
  424. case EJoinKind::RightSemi:
  425. if (List1.Empty() || List2.Empty()) {
  426. OutputMode = EOutputMode::None;
  427. } else {
  428. OutputMode = EOutputMode::LeftNull;
  429. }
  430. break;
  431. default:
  432. Y_ABORT("Unknown kind");
  433. }
  434. if (OutputMode == EOutputMode::Unknown) {
  435. if (List1.Empty()) {
  436. OutputMode = EOutputMode::LeftNull;
  437. } else if (List2.Empty()) {
  438. OutputMode = EOutputMode::RightNull;
  439. } else if (List1.IsLive()) {
  440. OutputMode = EOutputMode::Cross;
  441. } else if (List2.IsLive()) {
  442. OutputMode = EOutputMode::CrossSwap;
  443. } else {
  444. OutputMode = List1.GetCount() >= List2.GetCount() ?
  445. EOutputMode::Cross : EOutputMode::CrossSwap;
  446. }
  447. }
  448. }
  449. continue;
  450. case EOutputMode::LeftNull:
  451. if (const auto item = List2.Next(ctx); item.IsSpecial()) {
  452. return item;
  453. } else {
  454. return PrepareNullItem<true>(ctx, item);
  455. }
  456. case EOutputMode::RightNull:
  457. if (const auto item = List1.Next(ctx); item.IsSpecial()) {
  458. return item;
  459. } else {
  460. return PrepareNullItem<false>(ctx, item);
  461. }
  462. case EOutputMode::BothNull:
  463. if (CrossMove1) {
  464. if (const auto item = List1.Next(ctx); item.IsFinish()) {
  465. CrossMove1 = false;
  466. } else if (item.IsYield()) {
  467. return item;
  468. } else {
  469. return PrepareNullItem<false>(ctx, item);
  470. }
  471. }
  472. if (const auto item = List2.Next(ctx); item.IsSpecial()) {
  473. return item;
  474. } else {
  475. return PrepareNullItem<true>(ctx, item);
  476. }
  477. case EOutputMode::Cross:
  478. return PrepareCrossItem<false>(ctx);
  479. case EOutputMode::CrossSwap:
  480. return PrepareCrossItem<true>(ctx);
  481. case EOutputMode::None:
  482. return NUdf::TUnboxedValuePod::MakeFinish();
  483. default:
  484. Y_ABORT("Unknown output mode");
  485. }
  486. }
  487. }
  488. template <bool IsLeftNull>
  489. NUdf::TUnboxedValue PrepareNullItem(TComputationContext& ctx, const NUdf::TUnboxedValue& value) {
  490. const auto structObj = Self->ResStruct.NewArray(ctx, Self->LeftInputColumns.size() + Self->RightInputColumns.size(), ResItems);
  491. for (ui32 i = 0; i < Self->LeftInputColumns.size(); ++i) {
  492. ui32 inIndex = Self->LeftInputColumns[i];
  493. ui32 outIndex = Self->LeftOutputColumns[i];
  494. if constexpr (IsLeftNull) {
  495. ResItems[outIndex] = NUdf::TUnboxedValuePod();
  496. continue;
  497. }
  498. auto member = value.GetElement(inIndex);
  499. if (Self->IsRequiredColumn[inIndex]) {
  500. ResItems[outIndex] = member.Release().GetOptionalValue();
  501. } else {
  502. ResItems[outIndex] = std::move(member);
  503. }
  504. }
  505. for (ui32 i = 0; i < Self->RightInputColumns.size(); ++i) {
  506. ui32 inIndex = Self->RightInputColumns[i];
  507. ui32 outIndex = Self->RightOutputColumns[i];
  508. if constexpr (!IsLeftNull) {
  509. ResItems[outIndex] = NUdf::TUnboxedValuePod();
  510. continue;
  511. }
  512. auto member = value.GetElement(inIndex);
  513. if (Self->IsRequiredColumn[inIndex]) {
  514. ResItems[outIndex] = member.Release().GetOptionalValue();
  515. }
  516. else {
  517. ResItems[outIndex] = std::move(member);
  518. }
  519. }
  520. return structObj;
  521. }
  522. template <bool SwapLists>
  523. NUdf::TUnboxedValue PrepareCrossItem(TComputationContext& ctx) {
  524. if (KeyHasNulls) {
  525. for (;;) {
  526. const auto& value = (CrossMove1 == SwapLists ? List2 : List1).Next(ctx);
  527. if (value.IsFinish() && CrossMove1) {
  528. CrossMove1 = false;
  529. continue;
  530. }
  531. if (value.IsSpecial()) {
  532. return value;
  533. }
  534. return (CrossMove1 == SwapLists) ? PrepareNullItem<true>(ctx, value) : PrepareNullItem<false>(ctx, value);
  535. }
  536. }
  537. for (;;) {
  538. if (CrossMove1) {
  539. CrossValue1 = (SwapLists ? List2 : List1).Next(ctx);
  540. if (CrossValue1.IsSpecial()) {
  541. return CrossValue1;
  542. }
  543. CrossMove1 = false;
  544. (SwapLists ? List1 : List2).Rewind();
  545. }
  546. CrossValue2 = (SwapLists ? List1 : List2).Next(ctx);
  547. if (CrossValue2.IsFinish()) {
  548. CrossMove1 = true;
  549. continue;
  550. }
  551. auto structObj = Self->ResStruct.NewArray(ctx, Self->LeftInputColumns.size() + Self->RightInputColumns.size(), ResItems);
  552. for (ui32 i = 0; i < Self->LeftInputColumns.size(); ++i) {
  553. ui32 inIndex = Self->LeftInputColumns[i];
  554. ui32 outIndex = Self->LeftOutputColumns[i];
  555. auto member = (SwapLists ? CrossValue2 : CrossValue1).GetElement(inIndex);
  556. if (Self->IsRequiredColumn[inIndex]) {
  557. ResItems[outIndex] = member.Release().GetOptionalValue();
  558. } else {
  559. ResItems[outIndex] = std::move(member);
  560. }
  561. }
  562. for (ui32 i = 0; i < Self->RightInputColumns.size(); ++i) {
  563. ui32 inIndex = Self->RightInputColumns[i];
  564. ui32 outIndex = Self->RightOutputColumns[i];
  565. auto member = (SwapLists ? CrossValue1 : CrossValue2).GetElement(inIndex);
  566. if (Self->IsRequiredColumn[inIndex]) {
  567. ResItems[outIndex] = member.Release().GetOptionalValue();
  568. } else {
  569. ResItems[outIndex] = std::move(member);
  570. }
  571. }
  572. return std::move(structObj);
  573. }
  574. }
  575. private:
  576. const TSelf* const Self;
  577. bool EatInput;
  578. bool KeyHasNulls;
  579. std::optional<ui64> InitialUsage;
  580. EOutputMode OutputMode;
  581. bool CrossMove1;
  582. NUdf::TUnboxedValue CrossValue1;
  583. NUdf::TUnboxedValue CrossValue2;
  584. TSpillList List1;
  585. TSpillList List2;
  586. NUdf::TUnboxedValue* ResItems = nullptr;
  587. };
  588. TCommonJoinCoreWrapper(TComputationMutables& mutables, IComputationNode* flow, const TType* inputStructType, ui32 inputWidth, ui32 tableIndexPos,
  589. std::vector<ui32>&& leftInputColumns, std::vector<ui32>&& rightInputColumns, std::vector<ui32>&& requiredColumns,
  590. std::vector<ui32>&& leftOutputColumns, std::vector<ui32>&& rightOutputColumns, ui64 memLimit,
  591. std::optional<ui32> sortedTableOrder, std::vector<ui32>&& keyColumns, EAnyJoinSettings anyJoinSettings)
  592. : TBaseComputation(mutables, flow, EValueRepresentation::Boxed, EValueRepresentation::Any)
  593. , Flow(flow)
  594. , InputStructType(inputStructType)
  595. , Packer(mutables)
  596. , TableIndexPos(tableIndexPos)
  597. , LeftInputColumns(std::move(leftInputColumns))
  598. , RightInputColumns(std::move(rightInputColumns))
  599. , RequiredColumns(std::move(requiredColumns))
  600. , LeftOutputColumns(std::move(leftOutputColumns))
  601. , RightOutputColumns(std::move(rightOutputColumns))
  602. , MemLimit(memLimit)
  603. , SortedTableOrder(sortedTableOrder)
  604. , KeyColumns(std::move(keyColumns))
  605. , IsRequiredColumn(FillRequiredStructColumn(inputWidth, RequiredColumns))
  606. , ResStruct(mutables)
  607. , ResStreamIndex(mutables.CurValueIndex++)
  608. , AnyJoinSettings(anyJoinSettings)
  609. {
  610. }
  611. NUdf::TUnboxedValue DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx) const {
  612. if (state.IsInvalid()) {
  613. state = ctx.HolderFactory.Create<TValue>(ctx, this);
  614. }
  615. return static_cast<TValue*>(state.AsBoxed().Get())->Next(Flow, ctx);
  616. }
  617. private:
  618. void RegisterDependencies() const final {
  619. this->FlowDependsOn(Flow);
  620. }
  621. IComputationNode* const Flow;
  622. const TType* const InputStructType;
  623. const TMutableObjectOverBoxedValue<TValuePackerBoxed> Packer;
  624. const ui32 TableIndexPos;
  625. const std::vector<ui32> LeftInputColumns;
  626. const std::vector<ui32> RightInputColumns;
  627. const std::vector<ui32> RequiredColumns;
  628. const std::vector<ui32> LeftOutputColumns;
  629. const std::vector<ui32> RightOutputColumns;
  630. const ui64 MemLimit;
  631. const std::optional<ui32> SortedTableOrder;
  632. const std::vector<ui32> KeyColumns;
  633. const std::vector<bool> IsRequiredColumn;
  634. const TContainerCacheOnContext ResStruct;
  635. const ui32 ResStreamIndex;
  636. const EAnyJoinSettings AnyJoinSettings;
  637. };
  638. template <EJoinKind Kind, bool TTrackRss>
  639. class TWideCommonJoinCoreWrapper : public TStatefulWideFlowCodegeneratorNode<TWideCommonJoinCoreWrapper<Kind, TTrackRss>>
  640. #ifndef MKQL_DISABLE_CODEGEN
  641. , public ICodegeneratorRootNode
  642. #endif
  643. {
  644. using TSelf = TWideCommonJoinCoreWrapper<Kind, TTrackRss>;
  645. using TBase = TStatefulWideFlowCodegeneratorNode<TSelf>;
  646. typedef TBase TBaseComputation;
  647. public:
  648. class TValue : public TComputationValue<TValue> {
  649. friend TSelf;
  650. public:
  651. using TBase = TComputationValue<TValue>;
  652. TValue(TMemoryUsageInfo* memInfo, TComputationContext& ctx, const TSelf* self, TFetcher&& fetcher)
  653. : TBase(memInfo)
  654. , Self(self)
  655. , Fetcher(std::move(fetcher))
  656. , Values(Self->InputRepresentations.size(), NUdf::TUnboxedValuePod())
  657. , CrossValues1(std::max(Self->LeftInputColumns.size(), Self->RightInputColumns.size()), NUdf::TUnboxedValuePod())
  658. , CrossValues2(std::max(Self->LeftInputColumns.size(), Self->RightInputColumns.size()), NUdf::TUnboxedValuePod())
  659. , List1(Self->PackerLeft.RefMutableObject(ctx, false, Self->InputLeftType), IsAnyJoinLeft(Self->AnyJoinSettings), Self->InputLeftType->GetElementsCount())
  660. , List2(Self->PackerRight.RefMutableObject(ctx, false, Self->InputRightType), IsAnyJoinRight(Self->AnyJoinSettings), Self->InputRightType->GetElementsCount())
  661. , Fields(GetPointers(Values))
  662. , Stubs(Values.size(), nullptr)
  663. {
  664. Init();
  665. }
  666. void Init() {
  667. List1.Init();
  668. List2.Init();
  669. CrossMove1 = true;
  670. EatInput = true;
  671. KeyHasNulls = false;
  672. OutputMode = EOutputMode::Unknown;
  673. InitialUsage = std::nullopt;
  674. }
  675. private:
  676. // copypaste to resolve -Woverloaded-virtual
  677. bool Next(NUdf::TUnboxedValue&) override {
  678. this->ThrowNotSupported(__func__);
  679. return false;
  680. }
  681. EFetchResult FetchValues(TComputationContext& ctx, NUdf::TUnboxedValue*const* output) {
  682. while (EatInput) {
  683. if (!InitialUsage) {
  684. InitialUsage = ctx.HolderFactory.GetPagePool().GetUsed();
  685. }
  686. switch (Fetcher(ctx, Fields.data())) {
  687. case EFetchResult::Yield:
  688. return EFetchResult::Yield;
  689. case EFetchResult::Finish:
  690. EatInput = false;
  691. continue;
  692. default:
  693. break;
  694. }
  695. if (!KeyHasNulls && (Kind == EJoinKind::Exclusion || Kind == EJoinKind::Full)) {
  696. for (ui32 i = 0U; i < Self->KeyColumns.size(); ++i) {
  697. if (!*Fields[Self->KeyColumns[i]]) {
  698. KeyHasNulls = true;
  699. break;
  700. }
  701. }
  702. }
  703. switch (const auto tableIndex = Fields[Self->TableIndexPos]->template Get<ui32>()) {
  704. case LeftIndex:
  705. if (Kind == EJoinKind::RightOnly || (Kind == EJoinKind::Exclusion && !List2.Empty() && !KeyHasNulls)) {
  706. EatInput = false;
  707. OutputMode = EOutputMode::None;
  708. break;
  709. }
  710. if (Self->SortedTableOrder && *Self->SortedTableOrder == RightIndex) {
  711. auto fetcher = IsAnyJoinLeft(Self->AnyJoinSettings) ?
  712. TLiveFetcher(std::bind(Fetcher, std::placeholders::_1, Stubs.data())):
  713. [this] (TComputationContext& ctx, NUdf::TUnboxedValue* output) {
  714. if (const auto status = Fetcher(ctx, Fields.data()); EFetchResult::One != status)
  715. return status;
  716. std::transform(Self->LeftInputColumns.cbegin(), Self->LeftInputColumns.cend(), output, [this] (ui32 index) { return std::move(this->Values[index]); });
  717. return EFetchResult::One;
  718. };
  719. std::transform(Self->LeftInputColumns.cbegin(), Self->LeftInputColumns.cend(), Values.data(), [this] (ui32 index) { return std::move(this->Values[index]); });
  720. List1.Live(std::move(fetcher), Values.data());
  721. EatInput = false;
  722. } else {
  723. NUdf::TUnboxedValue* items = nullptr;
  724. auto value = ctx.HolderFactory.CreateDirectArrayHolder(Self->LeftInputColumns.size(), items);
  725. std::transform(Self->LeftInputColumns.cbegin(), Self->LeftInputColumns.cend(), items, [this] (ui32 index) { return std::move(this->Values[index]); });
  726. List1.Add(std::move(value));
  727. if (ctx.CheckAdjustedMemLimit<TTrackRss>(Self->MemLimit, *InitialUsage)) {
  728. List1.Spill();
  729. }
  730. }
  731. break;
  732. case RightIndex:
  733. if (Kind == EJoinKind::LeftOnly || (Kind == EJoinKind::Exclusion && !List1.Empty() && !KeyHasNulls)) {
  734. EatInput = false;
  735. OutputMode = EOutputMode::None;
  736. break;
  737. }
  738. if (Self->SortedTableOrder && *Self->SortedTableOrder == LeftIndex) {
  739. auto fetcher = IsAnyJoinRight(Self->AnyJoinSettings) ?
  740. TLiveFetcher(std::bind(Fetcher, std::placeholders::_1, Stubs.data())):
  741. [this] (TComputationContext& ctx, NUdf::TUnboxedValue* output) {
  742. if (const auto status = Fetcher(ctx, Fields.data()); EFetchResult::One != status)
  743. return status;
  744. std::transform(Self->RightInputColumns.cbegin(), Self->RightInputColumns.cend(), output, [this] (ui32 index) { return std::move(this->Values[index]); });
  745. return EFetchResult::One;
  746. };
  747. std::transform(Self->RightInputColumns.cbegin(), Self->RightInputColumns.cend(), Values.data(), [this] (ui32 index) { return std::move(this->Values[index]); });
  748. List2.Live(std::move(fetcher), Values.data());
  749. EatInput = false;
  750. } else {
  751. NUdf::TUnboxedValue* items = nullptr;
  752. auto value = ctx.HolderFactory.CreateDirectArrayHolder(Self->RightInputColumns.size(), items);
  753. std::transform(Self->RightInputColumns.cbegin(), Self->RightInputColumns.cend(), items, [this] (ui32 index) { return std::move(this->Values[index]); });
  754. List2.Add(std::move(value));
  755. if (ctx.CheckAdjustedMemLimit<TTrackRss>(Self->MemLimit, *InitialUsage)) {
  756. List2.Spill();
  757. }
  758. }
  759. break;
  760. default: THROW yexception() << "Bad table index: " << tableIndex;
  761. }
  762. }
  763. while (true) {
  764. switch (OutputMode) {
  765. case EOutputMode::Unknown: {
  766. List1.Seal(ctx);
  767. List2.Seal(ctx);
  768. switch (Kind) {
  769. case EJoinKind::Cross:
  770. case EJoinKind::Inner:
  771. if (List1.Empty() || List2.Empty()) {
  772. OutputMode = EOutputMode::None;
  773. }
  774. break;
  775. case EJoinKind::Left:
  776. if (List1.Empty()) {
  777. OutputMode = EOutputMode::None;
  778. }
  779. break;
  780. case EJoinKind::LeftOnly:
  781. if (List1.Empty() || !List2.Empty()) {
  782. OutputMode = EOutputMode::None;
  783. } else {
  784. OutputMode = EOutputMode::RightNull;
  785. }
  786. break;
  787. case EJoinKind::Right:
  788. if (List2.Empty()) {
  789. OutputMode = EOutputMode::None;
  790. }
  791. break;
  792. case EJoinKind::RightOnly:
  793. if (List2.Empty() || !List1.Empty()) {
  794. OutputMode = EOutputMode::None;
  795. } else {
  796. OutputMode = EOutputMode::LeftNull;
  797. }
  798. break;
  799. case EJoinKind::Exclusion:
  800. if (!List1.Empty() && !List2.Empty() && !KeyHasNulls) {
  801. OutputMode = EOutputMode::None;
  802. } else if (List1.Empty()) {
  803. OutputMode = EOutputMode::LeftNull;
  804. } else if (List2.Empty()) {
  805. OutputMode = EOutputMode::RightNull;
  806. } else {
  807. OutputMode = EOutputMode::BothNull;
  808. }
  809. break;
  810. case EJoinKind::Full:
  811. break;
  812. case EJoinKind::LeftSemi:
  813. if (List1.Empty() || List2.Empty()) {
  814. OutputMode = EOutputMode::None;
  815. } else {
  816. OutputMode = EOutputMode::RightNull;
  817. }
  818. break;
  819. case EJoinKind::RightSemi:
  820. if (List1.Empty() || List2.Empty()) {
  821. OutputMode = EOutputMode::None;
  822. } else {
  823. OutputMode = EOutputMode::LeftNull;
  824. }
  825. break;
  826. default:
  827. Y_ABORT("Unknown kind");
  828. }
  829. if (OutputMode == EOutputMode::Unknown) {
  830. if (List1.Empty()) {
  831. OutputMode = EOutputMode::LeftNull;
  832. } else if (List2.Empty()) {
  833. OutputMode = EOutputMode::RightNull;
  834. } else if (List1.IsLive()) {
  835. OutputMode = EOutputMode::Cross;
  836. } else if (List2.IsLive()) {
  837. OutputMode = EOutputMode::CrossSwap;
  838. } else {
  839. OutputMode = List1.GetCount() >= List2.GetCount() ?
  840. EOutputMode::Cross : EOutputMode::CrossSwap;
  841. }
  842. }
  843. }
  844. continue;
  845. case EOutputMode::LeftNull:
  846. if (const auto res = List2.Next(ctx, Values.data()); EFetchResult::One != res) {
  847. return res;
  848. }
  849. PrepareNullItem<true>(ctx, output);
  850. return EFetchResult::One;
  851. case EOutputMode::RightNull:
  852. if (const auto res = List1.Next(ctx, Values.data()); EFetchResult::One != res) {
  853. return res;
  854. }
  855. PrepareNullItem<false>(ctx, output);
  856. return EFetchResult::One;
  857. case EOutputMode::BothNull:
  858. if (CrossMove1) {
  859. switch (List1.Next(ctx, Values.data())) {
  860. case EFetchResult::Finish: CrossMove1 = false; break;
  861. case EFetchResult::Yield: return EFetchResult::Yield;
  862. case EFetchResult::One:
  863. PrepareNullItem<false>(ctx, output);
  864. return EFetchResult::One;
  865. }
  866. }
  867. if (const auto res = List2.Next(ctx, Values.data()); EFetchResult::One != res) {
  868. return res;
  869. }
  870. PrepareNullItem<true>(ctx, output);
  871. return EFetchResult::One;
  872. case EOutputMode::Cross:
  873. return PrepareCrossItem<false>(ctx, output);
  874. case EOutputMode::CrossSwap:
  875. return PrepareCrossItem<true>(ctx, output);
  876. case EOutputMode::None:
  877. return EFetchResult::Finish;
  878. default:
  879. Y_ABORT("Unknown output mode");
  880. }
  881. }
  882. }
  883. template <bool IsLeftNull>
  884. void PrepareNullItem(TComputationContext&, NUdf::TUnboxedValue*const* output) {
  885. for (ui32 i = 0; i < Self->LeftInputColumns.size(); ++i) {
  886. if (const auto out = output[Self->LeftOutputColumns[i]]) {
  887. if constexpr (IsLeftNull) {
  888. *out = NUdf::TUnboxedValuePod();
  889. } else if (Self->IsRequiredColumn[Self->LeftInputColumns[i]]) {
  890. *out = Values[i].Release().GetOptionalValue();
  891. } else {
  892. *out = std::move(Values[i]);
  893. }
  894. }
  895. }
  896. for (ui32 i = 0; i < Self->RightInputColumns.size(); ++i) {
  897. if (const auto out = output[Self->RightOutputColumns[i]]) {
  898. if constexpr (!IsLeftNull) {
  899. *out = NUdf::TUnboxedValuePod();
  900. } else if (Self->IsRequiredColumn[Self->RightInputColumns[i]]) {
  901. *out = Values[i].Release().GetOptionalValue();
  902. } else {
  903. *out = std::move(Values[i]);
  904. }
  905. }
  906. }
  907. }
  908. template <bool SwapLists>
  909. EFetchResult PrepareCrossItem(TComputationContext& ctx, NUdf::TUnboxedValue*const* output) {
  910. if (KeyHasNulls) {
  911. for (;;) {
  912. if (const auto res = (CrossMove1 == SwapLists ? List2 : List1).Next(ctx, Values.data()); EFetchResult::Finish == res && CrossMove1) {
  913. CrossMove1 = false;
  914. continue;
  915. } else if (EFetchResult::One != res) {
  916. return res;
  917. }
  918. if (CrossMove1 == SwapLists)
  919. PrepareNullItem<true>(ctx, output);
  920. else
  921. PrepareNullItem<false>(ctx, output);
  922. return EFetchResult::One;
  923. }
  924. }
  925. for (;;) {
  926. if (CrossMove1) {
  927. if (const auto res = (SwapLists ? List2 : List1).Next(ctx, CrossValues1.data()); EFetchResult::One != res) {
  928. return res;
  929. }
  930. CrossMove1 = false;
  931. (SwapLists ? List1 : List2).Rewind();
  932. }
  933. if (const auto res = (SwapLists ? List1 : List2).Next(ctx, CrossValues2.data()); EFetchResult::Finish == res) {
  934. CrossMove1 = true;
  935. continue;
  936. } else if (EFetchResult::Yield == res) {
  937. return EFetchResult::Yield;
  938. }
  939. const auto& lValues = SwapLists ? CrossValues2 : CrossValues1;
  940. const auto& rValues = SwapLists ? CrossValues1 : CrossValues2;
  941. for (ui32 i = 0; i < Self->LeftInputColumns.size(); ++i) {
  942. if (const auto out = output[Self->LeftOutputColumns[i]]) {
  943. if (Self->IsRequiredColumn[Self->LeftInputColumns[i]]) {
  944. *out = NUdf::TUnboxedValue(lValues[i]).Release().GetOptionalValue();
  945. } else {
  946. *out = lValues[i];
  947. }
  948. }
  949. }
  950. for (ui32 i = 0; i < Self->RightInputColumns.size(); ++i) {
  951. if (const auto out = output[Self->RightOutputColumns[i]]) {
  952. if (Self->IsRequiredColumn[Self->RightInputColumns[i]]) {
  953. *out = NUdf::TUnboxedValue(rValues[i]).Release().GetOptionalValue();
  954. } else {
  955. *out = rValues[i];
  956. }
  957. }
  958. }
  959. return EFetchResult::One;
  960. }
  961. }
  962. private:
  963. static std::vector<NUdf::TUnboxedValue*> GetPointers(std::vector<NUdf::TUnboxedValue>& array) {
  964. std::vector<NUdf::TUnboxedValue*> pointers;
  965. pointers.reserve(array.size());
  966. std::transform(array.begin(), array.end(), std::back_inserter(pointers), [](NUdf::TUnboxedValue& v) { return std::addressof(v); });
  967. return pointers;
  968. }
  969. const TSelf* const Self;
  970. TFetcher Fetcher;
  971. bool EatInput;
  972. bool KeyHasNulls;
  973. std::optional<ui64> InitialUsage;
  974. EOutputMode OutputMode;
  975. bool CrossMove1;
  976. std::vector<NUdf::TUnboxedValue> Values, CrossValues1, CrossValues2;
  977. TSpillList List1, List2;
  978. NUdf::TUnboxedValue* ResItems = nullptr;
  979. const std::vector<NUdf::TUnboxedValue*> Fields;
  980. const std::vector<NUdf::TUnboxedValue*> Stubs;
  981. };
  982. TWideCommonJoinCoreWrapper(TComputationMutables& mutables, IComputationWideFlowNode* flow, const TTupleType* inputLeftType, const TTupleType* inputRightType,
  983. std::vector<EValueRepresentation>&& inputRepresentations, std::vector<EValueRepresentation>&& outputRepresentations, ui32 tableIndexPos,
  984. std::vector<ui32>&& leftInputColumns, std::vector<ui32>&& rightInputColumns, std::vector<ui32>&& requiredColumns,
  985. std::vector<ui32>&& leftOutputColumns, std::vector<ui32>&& rightOutputColumns, ui64 memLimit,
  986. std::optional<ui32> sortedTableOrder, std::vector<ui32>&& keyColumns, EAnyJoinSettings anyJoinSettings)
  987. : TBaseComputation(mutables, flow, EValueRepresentation::Any)
  988. , Flow(flow), InputRepresentations(std::move(inputRepresentations)), OutputRepresentations(std::move(outputRepresentations))
  989. , InputLeftType(inputLeftType), InputRightType(inputRightType)
  990. , PackerLeft(mutables), PackerRight(mutables)
  991. , TableIndexPos(tableIndexPos)
  992. , LeftInputColumns(std::move(leftInputColumns))
  993. , RightInputColumns(std::move(rightInputColumns))
  994. , RequiredColumns(std::move(requiredColumns))
  995. , LeftOutputColumns(std::move(leftOutputColumns))
  996. , RightOutputColumns(std::move(rightOutputColumns))
  997. , MemLimit(memLimit)
  998. , SortedTableOrder(sortedTableOrder)
  999. , KeyColumns(std::move(keyColumns))
  1000. , IsRequiredColumn(FillRequiredStructColumn(InputRepresentations.size(), RequiredColumns))
  1001. , AnyJoinSettings(anyJoinSettings)
  1002. {}
  1003. EFetchResult DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const {
  1004. if (state.IsInvalid()) {
  1005. MakeState(ctx, state);
  1006. }
  1007. return static_cast<TValue*>(state.AsBoxed().Get())->FetchValues(ctx, output);
  1008. }
#ifndef MKQL_DISABLE_CODEGEN
// Emits LLVM IR that produces one output row of the join inline in the caller.
//
// Entry-block setup:
//   * `values` - alloca of `size` i128 slots, one per output column (left + right outputs);
//   * `fields` - alloca of `size` pointers with fields[i] == &values[i]; it is passed to
//                TValue::FetchValues as its `output` pointer array.
// Per call: create the state on first use (MakeState), clean up and zero the slots, call
// TValue::FetchValues through its raw method pointer, then apply ValueRelease to each slot.
// Returns the i32 fetch status and one getter per output column that loads values[i].
ICodegeneratorInlineWideNode::TGenerateResult DoGenGetValues(const TCodegenContext& ctx, Value* statePtr, BasicBlock*& block) const {
    auto& context = ctx.Codegen.GetContext();
    const auto valueType = Type::getInt128Ty(context);
    const auto indexType = Type::getInt32Ty(context);
    const auto size = LeftOutputColumns.size() + RightOutputColumns.size();
    const auto arrayType = ArrayType::get(valueType, size);
    const auto fieldsType = ArrayType::get(PointerType::getUnqual(valueType), size);
    // Setup instructions are inserted before the last instruction of the entry block.
    const auto atTop = &ctx.Func->getEntryBlock().back();
    const auto values = new AllocaInst(arrayType, 0U, "values", atTop);
    const auto fields = new AllocaInst(fieldsType, 0U, "fields", atTop);
    ICodegeneratorInlineWideNode::TGettersList getters(size);
    // initV: all-zero array stored into `values`; initF: slot addresses stored into `fields`.
    Value* initV = UndefValue::get(arrayType);
    Value* initF = UndefValue::get(fieldsType);
    std::vector<Value*> pointers;
    pointers.reserve(size);
    for (auto i = 0U; i < size; ++i) {
        pointers.emplace_back(GetElementPtrInst::CreateInBounds(arrayType, values, {ConstantInt::get(indexType, 0), ConstantInt::get(indexType, i)}, (TString("ptr_") += ToString(i)).c_str(), atTop));
        initV = InsertValueInst::Create(initV, ConstantInt::get(valueType, 0), {i}, (TString("zero_") += ToString(i)).c_str(), atTop);
        initF = InsertValueInst::Create(initF, pointers.back(), {i}, (TString("insert_") += ToString(i)).c_str(), atTop);
        // Getter i recomputes &values[i] in the consumer's block and loads the slot there.
        getters[i] = [i, values, valueType, indexType, arrayType](const TCodegenContext& ctx, BasicBlock*& block) {
            Y_UNUSED(ctx);
            const auto pointer = GetElementPtrInst::CreateInBounds(arrayType, values, {ConstantInt::get(indexType, 0), ConstantInt::get(indexType, i)}, (TString("ptr_") += ToString(i)).c_str(), block);
            return new LoadInst(valueType, pointer, (TString("load_") += ToString(i)).c_str(), block);
        };
    }
    new StoreInst(initV, values, atTop);
    new StoreInst(initF, fields, atTop);
    TLLVMFieldsStructure<TComputationValue<TNull>> fieldsStruct(context);
    const auto stateType = StructType::get(context, fieldsStruct.GetFieldsArray());
    const auto statePtrType = PointerType::getUnqual(stateType);
    const auto make = BasicBlock::Create(context, "make", ctx.Func);
    const auto main = BasicBlock::Create(context, "main", ctx.Func);
    // While the state is still invalid, branch to `make` to create it first.
    BranchInst::Create(make, main, IsInvalid(statePtr, block, context), block);
    block = make;
    // Call this->MakeState(ctx, state) through a raw method pointer; `this` is embedded as a constant.
    const auto ptrType = PointerType::getUnqual(StructType::get(context));
    const auto self = CastInst::Create(Instruction::IntToPtr, ConstantInt::get(Type::getInt64Ty(context), uintptr_t(this)), ptrType, "self", block);
    const auto makeFunc = ConstantInt::get(Type::getInt64Ty(context), GetMethodPtr(&TWideCommonJoinCoreWrapper::MakeState));
    const auto makeType = FunctionType::get(Type::getVoidTy(context), {self->getType(), ctx.Ctx->getType(), statePtr->getType()}, false);
    const auto makeFuncPtr = CastInst::Create(Instruction::IntToPtr, makeFunc, PointerType::getUnqual(makeType), "function", block);
    CallInst::Create(makeType, makeFuncPtr, {self, ctx.Ctx, statePtr}, "", block);
    BranchInst::Create(main, block);
    block = main;
    // Clean up whatever the previous call left in the slots, then reset them to zero.
    // NOTE(review): these loops iterate OutputRepresentations.size() over `pointers`, which has
    // `size` entries; the two are assumed to be equal - confirm.
    for (ui32 i = 0U; i < OutputRepresentations.size(); ++i) {
        ValueCleanup(OutputRepresentations[i], pointers[i], ctx, block);
    }
    new StoreInst(initV, values, block);
    // The low 64 bits of the state value are reinterpreted as the TValue object pointer.
    const auto state = new LoadInst(valueType, statePtr, "state", block);
    const auto half = CastInst::Create(Instruction::Trunc, state, Type::getInt64Ty(context), "half", block);
    const auto stateArg = CastInst::Create(Instruction::IntToPtr, half, statePtrType, "state_arg", block);
    // Call TValue::FetchValues(ctx, fields) through a raw method pointer; it writes into `values`.
    const auto func = ConstantInt::get(Type::getInt64Ty(context), GetMethodPtr(&TValue::FetchValues));
    const auto funcType = FunctionType::get(Type::getInt32Ty(context), { statePtrType, ctx.Ctx->getType(), fields->getType() }, false);
    const auto funcPtr = CastInst::Create(Instruction::IntToPtr, func, PointerType::getUnqual(funcType), "fetch_func", block);
    const auto result = CallInst::Create(funcType, funcPtr, { stateArg, ctx.Ctx, fields }, "fetch", block);
    // NOTE(review): presumably ValueRelease hands the slot contents over to the getters'
    // consumers without an extra reference - confirm against ValueRelease's definition.
    for (ui32 i = 0U; i < OutputRepresentations.size(); ++i) {
        ValueRelease(OutputRepresentations[i], pointers[i], ctx, block);
    }
    return {result, std::move(getters)};
}
#endif
  1069. private:
  1070. void MakeState(TComputationContext& ctx, NUdf::TUnboxedValue& state) const {
  1071. #ifdef MKQL_DISABLE_CODEGEN
  1072. state = ctx.HolderFactory.Create<TValue>(ctx, this, std::bind(&IComputationWideFlowNode::FetchValues, Flow, std::placeholders::_1, std::placeholders::_2));
  1073. #else
  1074. state = ctx.ExecuteLLVM && Fetch ?
  1075. ctx.HolderFactory.Create<TValue>(ctx, this, Fetch):
  1076. ctx.HolderFactory.Create<TValue>(ctx, this, std::bind(&IComputationWideFlowNode::FetchValues, Flow, std::placeholders::_1, std::placeholders::_2));
  1077. #endif
  1078. }
  1079. void RegisterDependencies() const final {
  1080. this->FlowDependsOn(Flow);
  1081. }
  1082. IComputationWideFlowNode* const Flow;
  1083. const std::vector<EValueRepresentation> InputRepresentations;
  1084. const std::vector<EValueRepresentation> OutputRepresentations;
  1085. const TTupleType* const InputLeftType;
  1086. const TTupleType* const InputRightType;
  1087. const TMutableObjectOverBoxedValue<TValuePackerBoxed> PackerLeft, PackerRight;
  1088. const ui32 TableIndexPos;
  1089. const std::vector<ui32> LeftInputColumns;
  1090. const std::vector<ui32> RightInputColumns;
  1091. const std::vector<ui32> RequiredColumns;
  1092. const std::vector<ui32> LeftOutputColumns;
  1093. const std::vector<ui32> RightOutputColumns;
  1094. const ui64 MemLimit;
  1095. const std::optional<ui32> SortedTableOrder;
  1096. const std::vector<ui32> KeyColumns;
  1097. const std::vector<bool> IsRequiredColumn;
  1098. const EAnyJoinSettings AnyJoinSettings;
  1099. #ifndef MKQL_DISABLE_CODEGEN
  1100. typedef EFetchResult (*TFetchPtr)(TComputationContext&, NUdf::TUnboxedValue*const*);
  1101. TFetchPtr Fetch = nullptr;
  1102. Function* FetchFunc = nullptr;
  1103. void FinalizeFunctions(NYql::NCodegen::ICodegen& codegen) final {
  1104. if (FetchFunc) {
  1105. Fetch = reinterpret_cast<TFetchPtr>(codegen.GetPointerToFunction(FetchFunc));
  1106. }
  1107. }
  1108. void GenerateFunctions(NYql::NCodegen::ICodegen& codegen) final {
  1109. codegen.ExportSymbol(FetchFunc = GenerateFetchFunction(codegen));
  1110. }
  1111. TString MakeName() const {
  1112. TStringStream out;
  1113. out << this->DebugString() << "::Fetch_(" << static_cast<const void*>(this) << ").";
  1114. return out.Str();
  1115. }
// Generates (or reuses, if the module already has it) a standalone function
//     i32 fetch(ctx*, [N x i128*]* output),  N = InputRepresentations.size()
// that fetches one row from the input flow. When the flow's status is positive, each input
// value i is stored through output[i] if that pointer is non-null: the slot's old value is
// unref'ed and the new value add-ref'ed. The flow's status is returned unchanged.
Function* GenerateFetchFunction(NYql::NCodegen::ICodegen& codegen) const {
    auto& module = codegen.GetModule();
    auto& context = codegen.GetContext();
    const auto& name = MakeName();
    if (const auto f = module.getFunction(name.c_str()))
        return f;
    const auto valueType = Type::getInt128Ty(context);
    const auto pointerType = PointerType::getUnqual(valueType);
    const auto arrayType = ArrayType::get(pointerType, InputRepresentations.size());
    const auto contextType = GetCompContextType(context);
    const auto resultType = Type::getInt32Ty(context);
    const auto funcType = FunctionType::get(resultType, {PointerType::getUnqual(contextType), PointerType::getUnqual(arrayType)}, false);
    TCodegenContext ctx(codegen);
    ctx.Func = cast<Function>(module.getOrInsertFunction(name.c_str(), funcType).getCallee());
    DISubprogramAnnotator annotator(ctx, ctx.Func);
    auto args = ctx.Func->arg_begin();
    ctx.Ctx = &*args;
    const auto outputArg = &*++args;
    const auto main = BasicBlock::Create(context, "main", ctx.Func);
    const auto good = BasicBlock::Create(context, "good", ctx.Func);
    const auto exit = BasicBlock::Create(context, "exit", ctx.Func);
    auto block = main;
    const auto result = GetNodeValues(Flow, ctx, block);
    // Status <= 0 means no row was produced: return it without touching the outputs.
    const auto special = CmpInst::Create(Instruction::ICmp, ICmpInst::ICMP_SLE, result.first, ConstantInt::get(result.first->getType(), 0), "special", block);
    BranchInst::Create(exit, good, special, block);
    block = good;
    const auto fields = new LoadInst(arrayType, outputArg, "fields", block);
    for (ui32 i = 0U; i < InputRepresentations.size(); ++i) {
        const auto save = BasicBlock::Create(context, (TString("save_") += ToString(i)).c_str(), ctx.Func);
        const auto skip = BasicBlock::Create(context, (TString("skip_") += ToString(i)).c_str(), ctx.Func);
        const auto pointer = ExtractValueInst::Create(fields, i, (TString("pointer_") += ToString(i)).c_str(), block);
        // A null output pointer means the caller does not need column i.
        const auto null = CmpInst::Create(Instruction::ICmp, ICmpInst::ICMP_EQ, pointer, ConstantPointerNull::get(pointerType), (TString("null_") += ToString(i)).c_str(), block);
        BranchInst::Create(skip, save, null, block);
        block = save;
        // The value getter is only materialized on the path that stores it.
        const auto value = result.second[i](ctx, block);
        ValueUnRef(InputRepresentations[i], pointer, ctx, block);
        new StoreInst(value, pointer, block);
        ValueAddRef(InputRepresentations[i], value, ctx, block);
        BranchInst::Create(skip, block);
        block = skip;
    }
    BranchInst::Create(exit, block);
    block = exit;
    ReturnInst::Create(context, result.first, block);
    return ctx.Func;
}
  1162. #endif
  1163. };
  1164. }
  1165. namespace NStream {
  1166. class TSpillList {
  1167. public:
  1168. TSpillList(TValuePacker& itemPacker, bool singleShot)
  1169. : ItemPacker(itemPacker)
  1170. , Ctx(nullptr)
  1171. , Count(0)
  1172. #ifndef NDEBUG
  1173. , IsSealed(false)
  1174. #endif
  1175. , Index(ui64(-1))
  1176. , SingleShot(singleShot)
  1177. {}
  1178. TSpillList(TSpillList&& rhs) = delete;
  1179. TSpillList(const TSpillList& rhs) = delete;
  1180. void operator=(const TSpillList& rhs) = delete;
  1181. void Init(TComputationContext& ctx) {
  1182. Ctx = &ctx;
  1183. Count = 0;
  1184. #ifndef NDEBUG
  1185. IsSealed = false;
  1186. #endif
  1187. Index = ui64(-1);
  1188. FileState = nullptr;
  1189. Heap.clear();
  1190. LiveStream = NUdf::TUnboxedValue();
  1191. LiveValue = NUdf::TUnboxedValue();
  1192. }
  1193. TComputationContext& GetCtx() const {
  1194. return *Ctx;
  1195. }
  1196. bool Spill() {
  1197. if (FileState) {
  1198. return false;
  1199. }
  1200. FileState.reset(new TFileState);
  1201. OpenWrite();
  1202. for (ui32 i = 0; i < Count; ++i) {
  1203. Write(std::move(InMemory(i)));
  1204. }
  1205. Heap.clear();
  1206. return true;
  1207. }
  1208. void Live(NUdf::TUnboxedValue& stream, NUdf::TUnboxedValue&& liveValue) {
  1209. Y_DEBUG_ABORT_UNLESS(!IsLive());
  1210. Y_DEBUG_ABORT_UNLESS(Count == 0);
  1211. LiveStream = stream;
  1212. LiveValue = std::move(liveValue);
  1213. }
  1214. void Add(NUdf::TUnboxedValue&& value) {
  1215. #ifndef NDEBUG
  1216. Y_DEBUG_ABORT_UNLESS(!IsSealed);
  1217. #endif
  1218. if (SingleShot && Count > 0) {
  1219. MKQL_ENSURE(Count == 1, "Counter inconsistent");
  1220. return;
  1221. }
  1222. if (FileState) {
  1223. Write(std::move(value));
  1224. } else {
  1225. if (Count < DEFAULT_STACK_ITEMS) {
  1226. Stack[Count] = std::move(value);
  1227. }
  1228. else {
  1229. if (Count == DEFAULT_STACK_ITEMS) {
  1230. Y_DEBUG_ABORT_UNLESS(Heap.empty());
  1231. Heap.assign(Stack, Stack + DEFAULT_STACK_ITEMS);
  1232. }
  1233. Heap.push_back(std::move(value));
  1234. }
  1235. }
  1236. ++Count;
  1237. }
  1238. void Seal() {
  1239. #ifndef NDEBUG
  1240. IsSealed = true;
  1241. #endif
  1242. if (FileState) {
  1243. FileState->Output->Finish();
  1244. Cerr << "Spill finished at " << Count << " items" << Endl;
  1245. FileState->Output.reset();
  1246. Cerr << "File size: " << GetFileLength(FileState->File.GetName()) << ", expected: " << FileState->TotalSize << Endl;
  1247. MKQL_INC_STAT(Ctx->Stats, Join_Spill_Count);
  1248. MKQL_SET_MAX_STAT(Ctx->Stats, Join_Spill_MaxFileSize, static_cast<i64>(FileState->TotalSize));
  1249. MKQL_SET_MAX_STAT(Ctx->Stats, Join_Spill_MaxRowsCount, static_cast<i64>(Count));
  1250. }
  1251. }
  1252. bool IsLive() const {
  1253. return bool(LiveStream);
  1254. }
  1255. ui64 GetCount() const {
  1256. Y_DEBUG_ABORT_UNLESS(!IsLive());
  1257. return Count;
  1258. }
  1259. bool Empty() const {
  1260. return !IsLive() && (Count == 0);
  1261. }
  1262. NUdf::EFetchStatus Next(NUdf::TUnboxedValue& result) {
  1263. #ifndef NDEBUG
  1264. Y_DEBUG_ABORT_UNLESS(IsSealed);
  1265. #endif
  1266. if (IsLive()) {
  1267. auto status = NUdf::EFetchStatus::Ok;
  1268. NUdf::TUnboxedValue value;
  1269. if ((Index + 1) == 0) {
  1270. value = std::move(LiveValue);
  1271. } else {
  1272. status = LiveStream.Fetch(value);
  1273. while (SingleShot && status == NUdf::EFetchStatus::Ok) {
  1274. // skip all remaining values
  1275. status = LiveStream.Fetch(value);
  1276. }
  1277. }
  1278. if (status == NUdf::EFetchStatus::Ok) {
  1279. result = std::move(value);
  1280. ++Index;
  1281. }
  1282. return status;
  1283. }
  1284. if ((Index + 1) == Count) {
  1285. return NUdf::EFetchStatus::Finish;
  1286. }
  1287. ++Index;
  1288. if (FileState) {
  1289. if (Index == 0) {
  1290. OpenRead();
  1291. }
  1292. result = Read();
  1293. return NUdf::EFetchStatus::Ok;
  1294. }
  1295. result = InMemory(Index);
  1296. return NUdf::EFetchStatus::Ok;
  1297. }
  1298. void Rewind() {
  1299. Y_DEBUG_ABORT_UNLESS(!IsLive());
  1300. #ifndef NDEBUG
  1301. Y_DEBUG_ABORT_UNLESS(IsSealed);
  1302. #endif
  1303. Index = ui64(-1);
  1304. if (FileState) {
  1305. OpenRead();
  1306. }
  1307. }
  1308. private:
  1309. NUdf::TUnboxedValue& InMemory(ui32 index) {
  1310. return !Heap.empty() ? Heap[index] : Stack[index];
  1311. }
  1312. const NUdf::TUnboxedValue& InMemory(ui32 index) const {
  1313. return !Heap.empty() ? Heap[index] : Stack[index];
  1314. }
  1315. void OpenWrite() {
  1316. Cerr << "Spill started at " << Count << " items to " << FileState->File.GetName() << Endl;
  1317. FileState->Output.reset(new TFixedBufferFileOutput(FileState->File.GetName()));
  1318. FileState->Output->SetFlushPropagateMode(false);
  1319. FileState->Output->SetFinishPropagateMode(false);
  1320. }
  1321. void Write(NUdf::TUnboxedValue&& value) {
  1322. Y_DEBUG_ABORT_UNLESS(FileState->Output);
  1323. TStringBuf serialized = ItemPacker.Pack(value);
  1324. ui32 length = serialized.size();
  1325. FileState->Output->Write(&length, sizeof(length));
  1326. FileState->Output->Write(serialized.data(), length);
  1327. FileState->TotalSize += sizeof(length);
  1328. FileState->TotalSize += length;
  1329. }
  1330. void OpenRead() {
  1331. FileState->Input.reset();
  1332. FileState->Input.reset(new TFileInput(FileState->File.GetName()));
  1333. }
  1334. NUdf::TUnboxedValue Read() {
  1335. ui32 length = 0;
  1336. auto wasRead = FileState->Input->Load(&length, sizeof(length));
  1337. Y_ABORT_UNLESS(wasRead == sizeof(length));
  1338. FileState->Buffer.Reserve(length);
  1339. wasRead = FileState->Input->Load((void*)FileState->Buffer.Data(), length);
  1340. Y_ABORT_UNLESS(wasRead == length);
  1341. return ItemPacker.Unpack(TStringBuf(FileState->Buffer.Data(), length), Ctx->HolderFactory);
  1342. }
  1343. private:
  1344. TValuePacker& ItemPacker;
  1345. TComputationContext* Ctx;
  1346. ui64 Count;
  1347. NUdf::TUnboxedValue Stack[DEFAULT_STACK_ITEMS];
  1348. TUnboxedValueVector Heap;
  1349. #ifndef NDEBUG
  1350. bool IsSealed;
  1351. #endif
  1352. ui64 Index;
  1353. const bool SingleShot;
  1354. struct TFileState {
  1355. TFileState()
  1356. : File(TTempFileHandle::InCurrentDir())
  1357. , TotalSize(0)
  1358. {}
  1359. TTempFileHandle File;
  1360. ui64 TotalSize;
  1361. std::unique_ptr<TFileInput> Input;
  1362. std::unique_ptr<TFixedBufferFileOutput> Output;
  1363. TBuffer Buffer;
  1364. };
  1365. std::unique_ptr<TFileState> FileState;
  1366. NUdf::TUnboxedValue LiveStream;
  1367. NUdf::TUnboxedValue LiveValue;
  1368. };
  1369. template <EJoinKind Kind, bool TTrackRss>
  1370. class TCommonJoinCoreWrapper : public TMutableComputationNode<TCommonJoinCoreWrapper<Kind, TTrackRss>> {
  1371. using TSelf = TCommonJoinCoreWrapper<Kind, TTrackRss>;
  1372. using TBase = TMutableComputationNode<TSelf>;
  1373. typedef TBase TBaseComputation;
  1374. public:
  1375. class TValue : public TComputationValue<TValue> {
  1376. public:
  1377. using TBase = TComputationValue<TValue>;
  1378. TValue(TMemoryUsageInfo* memInfo, NUdf::TUnboxedValue&& stream,
  1379. TComputationContext& ctx, const TSelf* self)
  1380. : TBase(memInfo)
  1381. , Stream(std::move(stream))
  1382. , Ctx(ctx)
  1383. , Self(self)
  1384. , List1(Self->Packer.RefMutableObject(ctx, false, Self->InputStructType), IsAnyJoinLeft(Self->AnyJoinSettings))
  1385. , List2(Self->Packer.RefMutableObject(ctx, false, Self->InputStructType), IsAnyJoinRight(Self->AnyJoinSettings))
  1386. {
  1387. Init();
  1388. }
  1389. void Reset(NUdf::TUnboxedValue&& stream) {
  1390. Stream = std::move(stream);
  1391. Init();
  1392. }
  1393. void Init() {
  1394. List1.Init(Ctx);
  1395. List2.Init(Ctx);
  1396. CrossMove1 = true;
  1397. EatInput = true;
  1398. KeyHasNulls = false;
  1399. OutputMode = EOutputMode::Unknown;
  1400. InitialUsage = std::nullopt;
  1401. }
  1402. private:
// Returns the next joined row for the current input stream in `result`.
//
// Phase 1 (while EatInput): read rows from `Stream` and distribute them by the table-index
// column into List1 (left) or List2 (right), spilling a list when the memory check fires.
// Reading stops at the end of the stream, when the join kind makes further input irrelevant
// (OutputMode = None), or when the remaining rows can be consumed lazily as a live list.
// Phase 2: on the first pass pick an OutputMode from the join kind and the list contents,
// then emit rows in that mode.
// @return Ok with `result` filled, Yield when a source yields, Finish when there is no more output.
NUdf::EFetchStatus Fetch(NUdf::TUnboxedValue& result) override {
    while (EatInput) {
        // The memory baseline for the spill check is taken before the first input row.
        if (!InitialUsage) {
            InitialUsage = Ctx.HolderFactory.GetPagePool().GetUsed();
        }
        NUdf::TUnboxedValue value;
        const auto status = Stream.Fetch(value);
        if (status == NUdf::EFetchStatus::Yield) {
            return status;
        }
        if (status == NUdf::EFetchStatus::Finish) {
            EatInput = false;
        } else {
            // Exclusion and Full joins track whether any row has an empty key column; such rows
            // are later emitted unpaired (see PrepareCrossItem / the Exclusion mode choice).
            if (!KeyHasNulls && (Kind == EJoinKind::Exclusion || Kind == EJoinKind::Full)) {
                for (ui32 i = 0U; i < Self->KeyColumns.size(); ++i) {
                    if (!value.GetElement(Self->KeyColumns[i])) {
                        KeyHasNulls = true;
                        break;
                    }
                }
            }
            switch (const auto tableIndex = value.GetElement(Self->TableIndexPos).template Get<ui32>()) {
            case LeftIndex:
                // A left row means no output for RightOnly, nor for Exclusion once a right row
                // has been seen and no key is empty.
                if (Kind == EJoinKind::RightOnly || (Kind == EJoinKind::Exclusion && !List2.Empty() && !KeyHasNulls)) {
                    EatInput = false;
                    OutputMode = EOutputMode::None;
                    break;
                }
                // Presumably the input is sorted with right rows first, so the rest of the stream
                // holds left rows only: read them lazily instead of buffering.
                if (Self->SortedTableOrder && *Self->SortedTableOrder == RightIndex) {
                    List1.Live(Stream, std::move(value));
                    EatInput = false;
                } else {
                    List1.Add(std::move(value));
                    // Spill when CheckAdjustedMemLimit reports the limit exceeded relative to InitialUsage.
                    if (Ctx.CheckAdjustedMemLimit<TTrackRss>(Self->MemLimit, *InitialUsage)) {
                        List1.Spill();
                    }
                }
                break;
            case RightIndex:
                // Mirror image of the LeftIndex case.
                if (Kind == EJoinKind::LeftOnly || (Kind == EJoinKind::Exclusion && !List1.Empty() && !KeyHasNulls)) {
                    EatInput = false;
                    OutputMode = EOutputMode::None;
                    break;
                }
                if (Self->SortedTableOrder && *Self->SortedTableOrder == LeftIndex) {
                    List2.Live(Stream, std::move(value));
                    EatInput = false;
                } else {
                    List2.Add(std::move(value));
                    if (Ctx.CheckAdjustedMemLimit<TTrackRss>(Self->MemLimit, *InitialUsage)) {
                        List2.Spill();
                    }
                }
                break;
            default: THROW yexception() << "Bad table index: " << tableIndex;
            }
        }
    }
    while (true) {
        switch (OutputMode) {
        case EOutputMode::Unknown: {
            // Input is complete: freeze both lists and decide how to emit rows.
            List1.Seal();
            List2.Seal();
            switch (Kind) {
            case EJoinKind::Cross:
            case EJoinKind::Inner:
                if (List1.Empty() || List2.Empty()) {
                    OutputMode = EOutputMode::None;
                }
                break;
            case EJoinKind::Left:
                if (List1.Empty()) {
                    OutputMode = EOutputMode::None;
                }
                break;
            case EJoinKind::LeftOnly:
                if (List1.Empty() || !List2.Empty()) {
                    OutputMode = EOutputMode::None;
                } else {
                    OutputMode = EOutputMode::RightNull;
                }
                break;
            case EJoinKind::Right:
                if (List2.Empty()) {
                    OutputMode = EOutputMode::None;
                }
                break;
            case EJoinKind::RightOnly:
                if (List2.Empty() || !List1.Empty()) {
                    OutputMode = EOutputMode::None;
                } else {
                    OutputMode = EOutputMode::LeftNull;
                }
                break;
            case EJoinKind::Exclusion:
                if (!List1.Empty() && !List2.Empty() && !KeyHasNulls) {
                    OutputMode = EOutputMode::None;
                } else if (List1.Empty()) {
                    OutputMode = EOutputMode::LeftNull;
                } else if (List2.Empty()) {
                    OutputMode = EOutputMode::RightNull;
                } else {
                    OutputMode = EOutputMode::BothNull;
                }
                break;
            case EJoinKind::Full:
                // Left as Unknown: the generic choice below applies.
                break;
            case EJoinKind::LeftSemi:
                if (List1.Empty() || List2.Empty()) {
                    OutputMode = EOutputMode::None;
                } else {
                    OutputMode = EOutputMode::RightNull;
                }
                break;
            case EJoinKind::RightSemi:
                if (List1.Empty() || List2.Empty()) {
                    OutputMode = EOutputMode::None;
                } else {
                    OutputMode = EOutputMode::LeftNull;
                }
                break;
            default:
                Y_ABORT("Unknown kind");
            }
            // Generic choice: null-pad when one side is empty, otherwise a cross product whose
            // outer list is the live one (live lists cannot be rewound) or else the larger one.
            if (OutputMode == EOutputMode::Unknown) {
                if (List1.Empty()) {
                    OutputMode = EOutputMode::LeftNull;
                } else if (List2.Empty()) {
                    OutputMode = EOutputMode::RightNull;
                } else if (List1.IsLive()) {
                    OutputMode = EOutputMode::Cross;
                } else if (List2.IsLive()) {
                    OutputMode = EOutputMode::CrossSwap;
                } else {
                    OutputMode = List1.GetCount() >= List2.GetCount() ?
                        EOutputMode::Cross : EOutputMode::CrossSwap;
                }
            }
        }
        continue;
        case EOutputMode::LeftNull: {
            // Emit List2 rows with empty left columns.
            NUdf::TUnboxedValue value;
            auto status = List2.Next(value);
            if (status != NUdf::EFetchStatus::Ok) {
                return status;
            }
            result = PrepareNullItem<true>(value);
            return NUdf::EFetchStatus::Ok;
        }
        break;
        case EOutputMode::RightNull: {
            // Emit List1 rows with empty right columns.
            NUdf::TUnboxedValue value;
            auto status = List1.Next(value);
            if (status != NUdf::EFetchStatus::Ok) {
                return status;
            }
            result = PrepareNullItem<false>(value);
            return NUdf::EFetchStatus::Ok;
        }
        break;
        case EOutputMode::BothNull: {
            // Emit all List1 rows (right side empty), then all List2 rows (left side empty).
            NUdf::TUnboxedValue value;
            if (CrossMove1) {
                switch (const auto status = List1.Next(value)) {
                case NUdf::EFetchStatus::Finish: CrossMove1 = false; break;
                case NUdf::EFetchStatus::Yield: return status;
                case NUdf::EFetchStatus::Ok:
                    result = PrepareNullItem<false>(value);
                    return NUdf::EFetchStatus::Ok;
                }
            }
            switch (const auto status = List2.Next(value)) {
            case NUdf::EFetchStatus::Yield:
            case NUdf::EFetchStatus::Finish: return status;
            case NUdf::EFetchStatus::Ok:
                result = PrepareNullItem<true>(value);
                return NUdf::EFetchStatus::Ok;
            }
        }
        break;
        case EOutputMode::Cross:
            // List1 is the outer list.
            return PrepareCrossItem<false>(result);
        case EOutputMode::CrossSwap:
            // List2 is the outer list.
            return PrepareCrossItem<true>(result);
        case EOutputMode::None:
            return NUdf::EFetchStatus::Finish;
        default:
            Y_ABORT("Unknown output mode");
        }
    }
}
  1594. template <bool IsLeftNull>
  1595. NUdf::TUnboxedValue PrepareNullItem(const NUdf::TUnboxedValue& value) {
  1596. const auto structObj = Self->ResStruct.NewArray(Ctx, Self->LeftInputColumns.size() + Self->RightInputColumns.size(), ResItems);
  1597. for (ui32 i = 0; i < Self->LeftInputColumns.size(); ++i) {
  1598. ui32 inIndex = Self->LeftInputColumns[i];
  1599. ui32 outIndex = Self->LeftOutputColumns[i];
  1600. if (IsLeftNull) {
  1601. ResItems[outIndex] = NUdf::TUnboxedValuePod();
  1602. continue;
  1603. }
  1604. auto member = value.GetElement(inIndex);
  1605. if (Self->IsRequiredColumn[inIndex]) {
  1606. ResItems[outIndex] = member.Release().GetOptionalValue();
  1607. } else {
  1608. ResItems[outIndex] = std::move(member);
  1609. }
  1610. }
  1611. for (ui32 i = 0; i < Self->RightInputColumns.size(); ++i) {
  1612. ui32 inIndex = Self->RightInputColumns[i];
  1613. ui32 outIndex = Self->RightOutputColumns[i];
  1614. if (!IsLeftNull) {
  1615. ResItems[outIndex] = NUdf::TUnboxedValuePod();
  1616. continue;
  1617. }
  1618. auto member = value.GetElement(inIndex);
  1619. if (Self->IsRequiredColumn[inIndex]) {
  1620. ResItems[outIndex] = member.Release().GetOptionalValue();
  1621. }
  1622. else {
  1623. ResItems[outIndex] = std::move(member);
  1624. }
  1625. }
  1626. return structObj;
  1627. }
  1628. template <bool SwapLists>
  1629. NUdf::EFetchStatus PrepareCrossItem(NUdf::TUnboxedValue& result) {
  1630. if (KeyHasNulls) {
  1631. for (;;) {
  1632. NUdf::TUnboxedValue value;
  1633. auto status = (CrossMove1 == SwapLists ? List2 : List1).Next(value);
  1634. if (status == NUdf::EFetchStatus::Finish && CrossMove1) {
  1635. CrossMove1 = false;
  1636. continue;
  1637. }
  1638. if (status != NUdf::EFetchStatus::Ok) {
  1639. return status;
  1640. }
  1641. result = (CrossMove1 == SwapLists) ? PrepareNullItem<true>(value) : PrepareNullItem<false>(value);
  1642. return status;
  1643. }
  1644. }
  1645. for (;;) {
  1646. if (CrossMove1) {
  1647. auto status = (SwapLists ? List2 : List1).Next(CrossValue1);
  1648. if (status != NUdf::EFetchStatus::Ok) {
  1649. return status;
  1650. }
  1651. CrossMove1 = false;
  1652. (SwapLists ? List1 : List2).Rewind();
  1653. }
  1654. auto status = (SwapLists ? List1 : List2).Next(CrossValue2);
  1655. MKQL_ENSURE(status != NUdf::EFetchStatus::Yield, "Unexpected stream status");
  1656. if (status == NUdf::EFetchStatus::Finish) {
  1657. CrossMove1 = true;
  1658. continue;
  1659. }
  1660. auto structObj = Self->ResStruct.NewArray(Ctx, Self->LeftInputColumns.size() + Self->RightInputColumns.size(), ResItems);
  1661. for (ui32 i = 0; i < Self->LeftInputColumns.size(); ++i) {
  1662. ui32 inIndex = Self->LeftInputColumns[i];
  1663. ui32 outIndex = Self->LeftOutputColumns[i];
  1664. auto member = (SwapLists ? CrossValue2 : CrossValue1).GetElement(inIndex);
  1665. if (Self->IsRequiredColumn[inIndex]) {
  1666. ResItems[outIndex] = member.Release().GetOptionalValue();
  1667. } else {
  1668. ResItems[outIndex] = std::move(member);
  1669. }
  1670. }
  1671. for (ui32 i = 0; i < Self->RightInputColumns.size(); ++i) {
  1672. ui32 inIndex = Self->RightInputColumns[i];
  1673. ui32 outIndex = Self->RightOutputColumns[i];
  1674. auto member = (SwapLists ? CrossValue1 : CrossValue2).GetElement(inIndex);
  1675. if (Self->IsRequiredColumn[inIndex]) {
  1676. ResItems[outIndex] = member.Release().GetOptionalValue();
  1677. } else {
  1678. ResItems[outIndex] = std::move(member);
  1679. }
  1680. }
  1681. result = std::move(structObj);
  1682. return NUdf::EFetchStatus::Ok;
  1683. }
  1684. }
  1685. private:
  1686. NUdf::TUnboxedValue Stream;
  1687. TComputationContext& Ctx;
  1688. const TSelf* const Self;
  1689. bool EatInput;
  1690. bool KeyHasNulls;
  1691. std::optional<ui64> InitialUsage;
  1692. EOutputMode OutputMode;
  1693. bool CrossMove1;
  1694. NUdf::TUnboxedValue CrossValue1;
  1695. NUdf::TUnboxedValue CrossValue2;
  1696. TSpillList List1;
  1697. TSpillList List2;
  1698. NUdf::TUnboxedValue* ResItems = nullptr;
  1699. };
  1700. TCommonJoinCoreWrapper(TComputationMutables& mutables, IComputationNode* stream, const TType* inputStructType, ui32 inputWidth, ui32 tableIndexPos,
  1701. std::vector<ui32>&& leftInputColumns, std::vector<ui32>&& rightInputColumns, std::vector<ui32>&& requiredColumns,
  1702. std::vector<ui32>&& leftOutputColumns, std::vector<ui32>&& rightOutputColumns, ui64 memLimit,
  1703. std::optional<ui32> sortedTableOrder, std::vector<ui32>&& keyColumns, EAnyJoinSettings anyJoinSettings)
  1704. : TBaseComputation(mutables)
  1705. , Stream(stream)
  1706. , InputStructType(inputStructType)
  1707. , Packer(mutables)
  1708. , TableIndexPos(tableIndexPos)
  1709. , LeftInputColumns(std::move(leftInputColumns))
  1710. , RightInputColumns(std::move(rightInputColumns))
  1711. , RequiredColumns(std::move(requiredColumns))
  1712. , LeftOutputColumns(std::move(leftOutputColumns))
  1713. , RightOutputColumns(std::move(rightOutputColumns))
  1714. , MemLimit(memLimit)
  1715. , SortedTableOrder(sortedTableOrder)
  1716. , KeyColumns(std::move(keyColumns))
  1717. , IsRequiredColumn(FillRequiredStructColumn(inputWidth, RequiredColumns))
  1718. , ResStruct(mutables)
  1719. , ResStreamIndex(mutables.CurValueIndex++)
  1720. , AnyJoinSettings(anyJoinSettings)
  1721. {
  1722. }
  1723. NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
  1724. auto& resStream = ctx.MutableValues[ResStreamIndex];
  1725. if (!resStream || resStream.IsInvalid() || !resStream.UniqueBoxed()) {
  1726. resStream = ctx.HolderFactory.Create<TValue>(Stream->GetValue(ctx), ctx, this);
  1727. } else {
  1728. static_cast<TValue&>(*resStream.AsBoxed()).Reset(Stream->GetValue(ctx));
  1729. }
  1730. return static_cast<const NUdf::TUnboxedValuePod&>(resStream);
  1731. }
  1732. private:
  1733. void RegisterDependencies() const final {
  1734. this->DependsOn(Stream);
  1735. }
  1736. IComputationNode* const Stream;
  1737. const TType* const InputStructType;
  1738. const TMutableObjectOverBoxedValue<TValuePackerBoxed> Packer;
  1739. const ui32 TableIndexPos;
  1740. const std::vector<ui32> LeftInputColumns;
  1741. const std::vector<ui32> RightInputColumns;
  1742. const std::vector<ui32> RequiredColumns;
  1743. const std::vector<ui32> LeftOutputColumns;
  1744. const std::vector<ui32> RightOutputColumns;
  1745. const ui64 MemLimit;
  1746. const std::optional<ui32> SortedTableOrder;
  1747. const std::vector<ui32> KeyColumns;
  1748. const std::vector<bool> IsRequiredColumn;
  1749. const TContainerCacheOnContext ResStruct;
  1750. const ui32 ResStreamIndex;
  1751. const EAnyJoinSettings AnyJoinSettings;
  1752. };
  1753. }
  1754. }
  1755. IComputationNode* WrapCommonJoinCore(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
  1756. MKQL_ENSURE(callable.GetInputsCount() == 11U || callable.GetInputsCount() == 12U, "Expected 12 args");
  1757. const auto type = callable.GetType()->GetReturnType();
  1758. const auto inputRowType = type->IsFlow() ?
  1759. AS_TYPE(TFlowType, callable.GetInput(0))->GetItemType():
  1760. AS_TYPE(TStreamType, callable.GetInput(0))->GetItemType();
  1761. std::vector<EValueRepresentation> inputRepresentations;
  1762. std::vector<TType*> fieldTypes;
  1763. if (inputRowType->IsTuple()) {
  1764. const auto tupleType = AS_TYPE(TTupleType, inputRowType);
  1765. inputRepresentations.reserve(tupleType->GetElementsCount());
  1766. fieldTypes.reserve(tupleType->GetElementsCount());
  1767. for (ui32 i = 0U; i < tupleType->GetElementsCount(); ++i) {
  1768. fieldTypes.emplace_back(tupleType->GetElementType(i));
  1769. inputRepresentations.emplace_back(GetValueRepresentation(fieldTypes.back()));
  1770. }
  1771. } else if (inputRowType->IsMulti()) {
  1772. const auto tupleType = AS_TYPE(TMultiType, inputRowType);
  1773. inputRepresentations.reserve(tupleType->GetElementsCount());
  1774. fieldTypes.reserve(tupleType->GetElementsCount());
  1775. for (ui32 i = 0U; i < tupleType->GetElementsCount(); ++i) {
  1776. fieldTypes.emplace_back(tupleType->GetElementType(i));
  1777. inputRepresentations.emplace_back(GetValueRepresentation(fieldTypes.back()));
  1778. }
  1779. } else if (inputRowType->IsStruct()) {
  1780. const auto structType = AS_TYPE(TStructType, inputRowType);
  1781. inputRepresentations.reserve(structType->GetMembersCount());
  1782. fieldTypes.reserve(structType->GetMembersCount());
  1783. for (ui32 i = 0U; i < structType->GetMembersCount(); ++i) {
  1784. fieldTypes.emplace_back(structType->GetMemberType(i));
  1785. inputRepresentations.emplace_back(GetValueRepresentation(fieldTypes.back()));
  1786. }
  1787. }
  1788. const auto outputRowType = type->IsFlow() ?
  1789. AS_TYPE(TFlowType, type)->GetItemType():
  1790. AS_TYPE(TStreamType, type)->GetItemType();
  1791. std::vector<EValueRepresentation> outputRepresentations;
  1792. if (outputRowType->IsTuple()) {
  1793. const auto tupleType = AS_TYPE(TTupleType, outputRowType);
  1794. outputRepresentations.reserve(tupleType->GetElementsCount());
  1795. for (ui32 i = 0U; i < tupleType->GetElementsCount(); ++i)
  1796. outputRepresentations.emplace_back(GetValueRepresentation(tupleType->GetElementType(i)));
  1797. } else if (outputRowType->IsMulti()) {
  1798. const auto tupleType = AS_TYPE(TMultiType, outputRowType);
  1799. outputRepresentations.reserve(tupleType->GetElementsCount());
  1800. for (ui32 i = 0U; i < tupleType->GetElementsCount(); ++i)
  1801. outputRepresentations.emplace_back(GetValueRepresentation(tupleType->GetElementType(i)));
  1802. } else if (outputRowType->IsStruct()) {
  1803. const auto structType = AS_TYPE(TStructType, outputRowType);
  1804. outputRepresentations.reserve(structType->GetMembersCount());
  1805. for (ui32 i = 0U; i < structType->GetMembersCount(); ++i)
  1806. outputRepresentations.emplace_back(GetValueRepresentation(structType->GetMemberType(i)));
  1807. }
  1808. const auto rawKind = AS_VALUE(TDataLiteral, callable.GetInput(1))->AsValue().Get<ui32>();
  1809. const auto kind = GetJoinKind(rawKind);
  1810. std::vector<ui32> leftInputColumns;
  1811. std::vector<ui32> rightInputColumns;
  1812. std::vector<ui32> requiredColumns;
  1813. std::vector<ui32> leftOutputColumns;
  1814. std::vector<ui32> rightOutputColumns;
  1815. std::vector<ui32> keyColumns;
  1816. const auto leftInputColumnsNode = AS_VALUE(TTupleLiteral, callable.GetInput(2));
  1817. const auto rightInputColumnsNode = AS_VALUE(TTupleLiteral, callable.GetInput(3));
  1818. const auto requiredColumnsNode = AS_VALUE(TTupleLiteral, callable.GetInput(4));
  1819. const auto leftOutputColumnsNode = AS_VALUE(TTupleLiteral, callable.GetInput(5));
  1820. const auto rightOutputColumnsNode = AS_VALUE(TTupleLiteral, callable.GetInput(6));
  1821. const auto keyColumnsNode = AS_VALUE(TTupleLiteral, callable.GetInput(7));
  1822. std::vector<TType*> leftTypes;
  1823. leftTypes.reserve(leftInputColumnsNode->GetValuesCount());
  1824. leftInputColumns.reserve(leftInputColumnsNode->GetValuesCount());
  1825. for (ui32 i = 0; i < leftInputColumnsNode->GetValuesCount(); ++i) {
  1826. leftInputColumns.push_back(AS_VALUE(TDataLiteral, leftInputColumnsNode->GetValue(i))->AsValue().Get<ui32>());
  1827. leftTypes.emplace_back(fieldTypes[leftInputColumns.back()]);
  1828. }
  1829. std::vector<TType*> rightTypes;
  1830. rightTypes.reserve(rightInputColumnsNode->GetValuesCount());
  1831. rightInputColumns.reserve(rightInputColumnsNode->GetValuesCount());
  1832. for (ui32 i = 0; i < rightInputColumnsNode->GetValuesCount(); ++i) {
  1833. rightInputColumns.push_back(AS_VALUE(TDataLiteral, rightInputColumnsNode->GetValue(i))->AsValue().Get<ui32>());
  1834. rightTypes.emplace_back(fieldTypes[rightInputColumns.back()]);
  1835. }
  1836. requiredColumns.reserve(requiredColumnsNode->GetValuesCount());
  1837. for (ui32 i = 0; i < requiredColumnsNode->GetValuesCount(); ++i) {
  1838. requiredColumns.push_back(AS_VALUE(TDataLiteral, requiredColumnsNode->GetValue(i))->AsValue().Get<ui32>());
  1839. }
  1840. leftOutputColumns.reserve(leftOutputColumnsNode->GetValuesCount());
  1841. for (ui32 i = 0; i < leftOutputColumnsNode->GetValuesCount(); ++i) {
  1842. leftOutputColumns.push_back(AS_VALUE(TDataLiteral, leftOutputColumnsNode->GetValue(i))->AsValue().Get<ui32>());
  1843. }
  1844. rightOutputColumns.reserve(rightOutputColumnsNode->GetValuesCount());
  1845. for (ui32 i = 0; i < rightOutputColumnsNode->GetValuesCount(); ++i) {
  1846. rightOutputColumns.push_back(AS_VALUE(TDataLiteral, rightOutputColumnsNode->GetValue(i))->AsValue().Get<ui32>());
  1847. }
  1848. keyColumns.reserve(keyColumnsNode->GetValuesCount());
  1849. for (ui32 i = 0; i < keyColumnsNode->GetValuesCount(); ++i) {
  1850. keyColumns.push_back(AS_VALUE(TDataLiteral, keyColumnsNode->GetValue(i))->AsValue().Get<ui32>());
  1851. }
  1852. const ui64 memLimit = AS_VALUE(TDataLiteral, callable.GetInput(8))->AsValue().Get<ui64>();
  1853. std::optional<ui32> sortedTableOrder;
  1854. if (!callable.GetInput(9).GetStaticType()->IsVoid()) {
  1855. sortedTableOrder = AS_VALUE(TDataLiteral, callable.GetInput(9))->AsValue().Get<ui32>();
  1856. MKQL_ENSURE(*sortedTableOrder < 2, "Bad sorted table order");
  1857. }
  1858. const EAnyJoinSettings anyJoinSettings = GetAnyJoinSettings(AS_VALUE(TDataLiteral, callable.GetInput(10))->AsValue().Get<ui32>());
  1859. const auto tableIndexPos = 12U == callable.GetInputsCount() ?
  1860. AS_VALUE(TDataLiteral, callable.GetInput(11U))->AsValue().Get<ui32>():
  1861. AS_TYPE(TStructType, inputRowType)->GetMemberIndex("_yql_table_index");
  1862. const bool trackRss = EGraphPerProcess::Single == ctx.GraphPerProcess;
  1863. const auto flow = LocateNode(ctx.NodeLocator, callable, 0U);
  1864. const auto leftInputType = TTupleType::Create(leftTypes.size(), leftTypes.data(), ctx.Env);
  1865. const auto rightInputType = TTupleType::Create(rightTypes.size(), rightTypes.data(), ctx.Env);
  1866. #define MAKE_COMMON_JOIN_CORE_WRAPPER(KIND)\
  1867. case EJoinKind::KIND: \
  1868. if (type->IsFlow()) { \
  1869. if (const auto wide = dynamic_cast<IComputationWideFlowNode*>(flow)) \
  1870. if (trackRss) \
  1871. return new NFlow::TWideCommonJoinCoreWrapper<EJoinKind::KIND, true>(ctx.Mutables, wide, leftInputType, rightInputType, std::move(inputRepresentations), std::move(outputRepresentations), tableIndexPos, \
  1872. std::move(leftInputColumns), std::move(rightInputColumns), std::move(requiredColumns), \
  1873. std::move(leftOutputColumns), std::move(rightOutputColumns), memLimit, sortedTableOrder, std::move(keyColumns), anyJoinSettings); \
  1874. else \
  1875. return new NFlow::TWideCommonJoinCoreWrapper<EJoinKind::KIND, false>(ctx.Mutables, wide, leftInputType, rightInputType, std::move(inputRepresentations), std::move(outputRepresentations), tableIndexPos, \
  1876. std::move(leftInputColumns), std::move(rightInputColumns), std::move(requiredColumns), \
  1877. std::move(leftOutputColumns), std::move(rightOutputColumns), memLimit, sortedTableOrder, std::move(keyColumns), anyJoinSettings); \
  1878. else \
  1879. if (trackRss) \
  1880. return new NFlow::TCommonJoinCoreWrapper<EJoinKind::KIND, true>(ctx.Mutables, flow, inputRowType, inputRepresentations.size(), tableIndexPos, \
  1881. std::move(leftInputColumns), std::move(rightInputColumns), std::move(requiredColumns), \
  1882. std::move(leftOutputColumns), std::move(rightOutputColumns), memLimit, sortedTableOrder, std::move(keyColumns), anyJoinSettings); \
  1883. else \
  1884. return new NFlow::TCommonJoinCoreWrapper<EJoinKind::KIND, false>(ctx.Mutables, flow, inputRowType, inputRepresentations.size(), tableIndexPos, \
  1885. std::move(leftInputColumns), std::move(rightInputColumns), std::move(requiredColumns), \
  1886. std::move(leftOutputColumns), std::move(rightOutputColumns), memLimit, sortedTableOrder, std::move(keyColumns), anyJoinSettings); \
  1887. } else { \
  1888. if (trackRss) \
  1889. return new NStream::TCommonJoinCoreWrapper<EJoinKind::KIND, true>(ctx.Mutables, flow, inputRowType, inputRepresentations.size(), tableIndexPos, \
  1890. std::move(leftInputColumns), std::move(rightInputColumns), std::move(requiredColumns), \
  1891. std::move(leftOutputColumns), std::move(rightOutputColumns), memLimit, sortedTableOrder, std::move(keyColumns), anyJoinSettings); \
  1892. else \
  1893. return new NStream::TCommonJoinCoreWrapper<EJoinKind::KIND, false>(ctx.Mutables, flow, inputRowType, inputRepresentations.size(), tableIndexPos, \
  1894. std::move(leftInputColumns), std::move(rightInputColumns), std::move(requiredColumns), \
  1895. std::move(leftOutputColumns), std::move(rightOutputColumns), memLimit, sortedTableOrder, std::move(keyColumns), anyJoinSettings); \
  1896. }
  1897. switch (kind) {
  1898. MAKE_COMMON_JOIN_CORE_WRAPPER(Inner)
  1899. MAKE_COMMON_JOIN_CORE_WRAPPER(Left)
  1900. MAKE_COMMON_JOIN_CORE_WRAPPER(Right)
  1901. MAKE_COMMON_JOIN_CORE_WRAPPER(Full)
  1902. MAKE_COMMON_JOIN_CORE_WRAPPER(LeftOnly)
  1903. MAKE_COMMON_JOIN_CORE_WRAPPER(RightOnly)
  1904. MAKE_COMMON_JOIN_CORE_WRAPPER(Exclusion)
  1905. MAKE_COMMON_JOIN_CORE_WRAPPER(LeftSemi)
  1906. MAKE_COMMON_JOIN_CORE_WRAPPER(RightSemi)
  1907. MAKE_COMMON_JOIN_CORE_WRAPPER(Cross)
  1908. default:
  1909. Y_ABORT("Unknown kind");
  1910. }
  1911. #undef MAKE_COMMON_JOIN_CORE_WRAPPER
  1912. }
  1913. }
  1914. }