mkql_block_map_join.cpp 50 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399
  1. #include "mkql_block_map_join.h"
  2. #include <yql/essentials/minikql/computation/mkql_block_builder.h>
  3. #include <yql/essentials/minikql/computation/mkql_block_impl.h>
  4. #include <yql/essentials/minikql/computation/mkql_block_reader.h>
  5. #include <yql/essentials/minikql/computation/mkql_block_trimmer.h>
  6. #include <yql/essentials/minikql/computation/mkql_computation_node_holders_codegen.h>
  7. #include <yql/essentials/minikql/comp_nodes/mkql_rh_hash.h>
  8. #include <yql/essentials/minikql/invoke_builtins/mkql_builtins.h>
  9. #include <yql/essentials/minikql/mkql_block_map_join_utils.h>
  10. #include <yql/essentials/minikql/mkql_node_cast.h>
  11. #include <yql/essentials/minikql/mkql_program_builder.h>
  12. #include <util/generic/serialized_enum.h>
  13. namespace NKikimr {
  14. namespace NMiniKQL {
  15. namespace {
  16. size_t CalcMaxBlockLength(const TVector<TType*>& items) {
  17. return CalcBlockLen(std::accumulate(items.cbegin(), items.cend(), 0ULL,
  18. [](size_t max, const TType* type) {
  19. const TType* itemType = AS_TYPE(TBlockType, type)->GetItemType();
  20. return std::max(max, CalcMaxBlockItemSize(itemType));
  21. }));
  22. }
  23. ui64 CalculateTupleHash(const std::vector<ui64>& hashes) {
  24. ui64 hash = 0;
  25. for (size_t i = 0; i < hashes.size(); i++) {
  26. if (!hashes[i]) {
  27. return 0;
  28. }
  29. hash = CombineHashes(hash, hashes[i]);
  30. }
  31. return hash;
  32. }
  33. template <bool RightRequired>
  34. class TBlockJoinState : public TBlockState {
  35. public:
  36. TBlockJoinState(TMemoryUsageInfo* memInfo, TComputationContext& ctx,
  37. const TVector<TType*>& inputItems,
  38. const TVector<ui32>& leftIOMap,
  39. const TVector<TType*> outputItems)
  40. : TBlockState(memInfo, outputItems.size())
  41. , InputWidth_(inputItems.size() - 1)
  42. , OutputWidth_(outputItems.size() - 1)
  43. , Inputs_(inputItems.size())
  44. , LeftIOMap_(leftIOMap)
  45. , InputsDescr_(ToValueDescr(inputItems))
  46. {
  47. const auto& pgBuilder = ctx.Builder->GetPgBuilder();
  48. MaxLength_ = CalcMaxBlockLength(outputItems);
  49. TBlockTypeHelper helper;
  50. for (size_t i = 0; i < inputItems.size(); i++) {
  51. TType* blockItemType = AS_TYPE(TBlockType, inputItems[i])->GetItemType();
  52. Readers_.push_back(MakeBlockReader(TTypeInfoHelper(), blockItemType));
  53. Converters_.push_back(MakeBlockItemConverter(TTypeInfoHelper(), blockItemType, pgBuilder));
  54. Hashers_.push_back(helper.MakeHasher(blockItemType));
  55. }
  56. // The last output column (i.e. block length) doesn't require a block builder.
  57. for (size_t i = 0; i < OutputWidth_; i++) {
  58. const TType* blockItemType = AS_TYPE(TBlockType, outputItems[i])->GetItemType();
  59. Builders_.push_back(MakeArrayBuilder(TTypeInfoHelper(), blockItemType, ctx.ArrowMemoryPool, MaxLength_, &pgBuilder, &BuilderAllocatedSize_));
  60. }
  61. MaxBuilderAllocatedSize_ = MaxAllocatedFactor_ * BuilderAllocatedSize_;
  62. }
  63. void CopyRow() {
  64. // Copy items from the "left" stream.
  65. // Use the mapping from input fields to output ones to
  66. // produce a tight loop to copy row items.
  67. for (size_t i = 0; i < LeftIOMap_.size(); i++) {
  68. AddItem(GetItem(LeftIOMap_[i]), i);
  69. }
  70. OutputRows_++;
  71. }
  72. void MakeRow(const NUdf::TUnboxedValuePod& value) {
  73. size_t builderIndex = 0;
  74. // Copy items from the "left" stream.
  75. // Use the mapping from input fields to output ones to
  76. // produce a tight loop to copy row items.
  77. for (size_t i = 0; i < LeftIOMap_.size(); i++, builderIndex++) {
  78. AddItem(GetItem(LeftIOMap_[i]), i);
  79. }
  80. // Convert and append items from the "right" dict.
  81. // Since the keys are copied to the output only from the
  82. // "left" stream, process all values unconditionally.
  83. if constexpr (RightRequired) {
  84. for (size_t i = 0; builderIndex < OutputWidth_; i++) {
  85. AddValue(value.GetElement(i), builderIndex++);
  86. }
  87. } else {
  88. if (value) {
  89. for (size_t i = 0; builderIndex < OutputWidth_; i++) {
  90. AddValue(value.GetElement(i), builderIndex++);
  91. }
  92. } else {
  93. while (builderIndex < OutputWidth_) {
  94. AddValue(value, builderIndex++);
  95. }
  96. }
  97. }
  98. OutputRows_++;
  99. }
  100. void MakeRow(const std::vector<NYql::NUdf::TBlockItem>& rightColumns) {
  101. size_t builderIndex = 0;
  102. for (size_t i = 0; i < LeftIOMap_.size(); i++, builderIndex++) {
  103. AddItem(GetItem(LeftIOMap_[i]), builderIndex);
  104. }
  105. if (!rightColumns.empty()) {
  106. Y_ENSURE(LeftIOMap_.size() + rightColumns.size() == OutputWidth_);
  107. for (size_t i = 0; i < rightColumns.size(); i++) {
  108. AddItem(rightColumns[i], builderIndex++);
  109. }
  110. } else {
  111. while (builderIndex < OutputWidth_) {
  112. AddItem(TBlockItem(), builderIndex++);
  113. }
  114. }
  115. OutputRows_++;
  116. }
  117. void MakeBlocks(const THolderFactory& holderFactory) {
  118. Values.back() = holderFactory.CreateArrowBlock(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(OutputRows_)));
  119. OutputRows_ = 0;
  120. BuilderAllocatedSize_ = 0;
  121. for (size_t i = 0; i < Builders_.size(); i++) {
  122. Values[i] = holderFactory.CreateArrowBlock(Builders_[i]->Build(IsFinished_));
  123. }
  124. FillArrays();
  125. }
  126. TBlockItem GetItem(size_t idx, size_t offset = 0) const {
  127. Y_ENSURE(Current_ + offset < InputRows_);
  128. const auto& datum = TArrowBlock::From(Inputs_[idx]).GetDatum();
  129. ARROW_DEBUG_CHECK_DATUM_TYPES(InputsDescr_[idx], datum.descr());
  130. if (datum.is_scalar()) {
  131. return Readers_[idx]->GetScalarItem(*datum.scalar());
  132. }
  133. MKQL_ENSURE(datum.is_array(), "Expecting array");
  134. return Readers_[idx]->GetItem(*datum.array(), Current_ + offset);
  135. }
  136. std::pair<TBlockItem, ui64> GetItemWithHash(size_t idx, size_t offset) const {
  137. auto item = GetItem(idx, offset);
  138. ui64 hash = Hashers_[idx]->Hash(item);
  139. return std::make_pair(item, hash);
  140. }
  141. NUdf::TUnboxedValuePod GetValue(const THolderFactory& holderFactory, size_t idx) const {
  142. return Converters_[idx]->MakeValue(GetItem(idx), holderFactory);
  143. }
  144. void Reset() {
  145. Current_ = 0;
  146. InputRows_ = GetBlockCount(Inputs_.back());
  147. }
  148. void Finish() {
  149. IsFinished_ = true;
  150. }
  151. void NextRow() {
  152. Current_++;
  153. }
  154. bool HasBlocks() {
  155. return Count > 0;
  156. }
  157. bool IsNotFull() const {
  158. return OutputRows_ < MaxLength_
  159. && BuilderAllocatedSize_ <= MaxBuilderAllocatedSize_;
  160. }
  161. bool IsEmpty() const {
  162. return OutputRows_ == 0;
  163. }
  164. bool IsFinished() const {
  165. return IsFinished_;
  166. }
  167. size_t RemainingRowsCount() const {
  168. Y_ENSURE(InputRows_ >= Current_);
  169. return InputRows_ - Current_;
  170. }
  171. NUdf::TUnboxedValue* GetRawInputFields() {
  172. return Inputs_.data();
  173. }
  174. size_t GetInputWidth() const {
  175. // Mind the last block length column.
  176. return InputWidth_ + 1;
  177. }
  178. size_t GetOutputWidth() const {
  179. // Mind the last block length column.
  180. return OutputWidth_ + 1;
  181. }
  182. private:
  183. void AddItem(const TBlockItem& item, size_t idx) {
  184. Builders_[idx]->Add(item);
  185. }
  186. void AddValue(const NUdf::TUnboxedValuePod& value, size_t idx) {
  187. Builders_[idx]->Add(value);
  188. }
  189. size_t Current_ = 0;
  190. bool IsFinished_ = false;
  191. size_t MaxLength_;
  192. size_t BuilderAllocatedSize_ = 0;
  193. size_t MaxBuilderAllocatedSize_ = 0;
  194. static const size_t MaxAllocatedFactor_ = 4;
  195. size_t InputRows_ = 0;
  196. size_t OutputRows_ = 0;
  197. size_t InputWidth_;
  198. size_t OutputWidth_;
  199. TUnboxedValueVector Inputs_;
  200. const TVector<ui32> LeftIOMap_;
  201. const std::vector<arrow::ValueDescr> InputsDescr_;
  202. TVector<std::unique_ptr<IBlockReader>> Readers_;
  203. TVector<std::unique_ptr<IBlockItemConverter>> Converters_;
  204. TVector<std::unique_ptr<IArrayBuilder>> Builders_;
  205. TVector<NYql::NUdf::IBlockItemHasher::TPtr> Hashers_;
  206. };
  207. template <typename TDerived>
  208. class TBlockStorageBase : public TComputationValue<TDerived> {
  209. using TSelf = TBlockStorageBase<TDerived>;
  210. using TBase = TComputationValue<TDerived>;
  211. public:
  212. struct TBlock {
  213. size_t Size;
  214. std::vector<arrow::Datum> Columns;
  215. TBlock() = default;
  216. TBlock(size_t size, std::vector<arrow::Datum> columns)
  217. : Size(size)
  218. , Columns(std::move(columns))
  219. {}
  220. };
  221. struct TRowEntry {
  222. ui32 BlockOffset;
  223. ui32 ItemOffset;
  224. TRowEntry() = default;
  225. TRowEntry(ui32 blockOffset, ui32 itemOffset)
  226. : BlockOffset(blockOffset)
  227. , ItemOffset(itemOffset)
  228. {}
  229. };
  230. class TRowIterator {
  231. friend class TBlockStorageBase;
  232. public:
  233. TRowIterator() = default;
  234. TRowIterator(const TRowIterator&) = default;
  235. TRowIterator& operator=(const TRowIterator&) = default;
  236. TMaybe<TRowEntry> Next() {
  237. Y_ENSURE(IsValid());
  238. if (IsEmpty()) {
  239. return Nothing();
  240. }
  241. auto entry = TRowEntry(CurrentBlockOffset_, CurrentItemOffset_);
  242. auto& block = BlockStorage_->GetBlock(CurrentBlockOffset_);
  243. CurrentItemOffset_++;
  244. if (CurrentItemOffset_ == block.Size) {
  245. CurrentBlockOffset_++;
  246. CurrentItemOffset_ = 0;
  247. }
  248. return entry;
  249. }
  250. bool IsValid() const {
  251. return BlockStorage_;
  252. }
  253. bool IsEmpty() const {
  254. Y_ENSURE(IsValid());
  255. return CurrentBlockOffset_ >= BlockStorage_->GetBlockCount();
  256. }
  257. private:
  258. TRowIterator(const TSelf* blockStorage)
  259. : BlockStorage_(blockStorage)
  260. {}
  261. private:
  262. size_t CurrentBlockOffset_ = 0;
  263. size_t CurrentItemOffset_ = 0;
  264. const TSelf* BlockStorage_ = nullptr;
  265. };
  266. TBlockStorageBase(
  267. TMemoryUsageInfo* memInfo,
  268. const TVector<TType*>& itemTypes,
  269. NUdf::TUnboxedValue stream,
  270. arrow::MemoryPool* pool
  271. )
  272. : TBase(memInfo)
  273. , InputsDescr_(ToValueDescr(itemTypes))
  274. , Stream_(stream)
  275. , Inputs_(itemTypes.size())
  276. {
  277. TBlockTypeHelper helper;
  278. for (size_t i = 0; i < itemTypes.size(); i++) {
  279. TType* blockItemType = AS_TYPE(TBlockType, itemTypes[i])->GetItemType();
  280. Readers_.push_back(MakeBlockReader(TTypeInfoHelper(), blockItemType));
  281. Hashers_.push_back(helper.MakeHasher(blockItemType));
  282. Comparators_.push_back(helper.MakeComparator(blockItemType));
  283. Trimmers_.push_back(MakeBlockTrimmer(TTypeInfoHelper(), blockItemType, pool));
  284. }
  285. }
  286. NUdf::EFetchStatus FetchStream() {
  287. switch (Stream_.WideFetch(Inputs_.data(), Inputs_.size())) {
  288. case NUdf::EFetchStatus::Yield:
  289. return NUdf::EFetchStatus::Yield;
  290. case NUdf::EFetchStatus::Finish:
  291. return NUdf::EFetchStatus::Finish;
  292. case NUdf::EFetchStatus::Ok:
  293. break;
  294. }
  295. std::vector<arrow::Datum> blockColumns;
  296. for (size_t i = 0; i < Inputs_.size() - 1; i++) {
  297. auto& datum = TArrowBlock::From(Inputs_[i]).GetDatum();
  298. ARROW_DEBUG_CHECK_DATUM_TYPES(InputsDescr_[i], datum.descr());
  299. if (datum.is_scalar()) {
  300. blockColumns.push_back(datum);
  301. } else {
  302. MKQL_ENSURE(datum.is_array(), "Expecting array");
  303. blockColumns.push_back(Trimmers_[i]->Trim(datum.array()));
  304. }
  305. }
  306. auto blockSize = ::GetBlockCount(Inputs_[Inputs_.size() - 1]);
  307. Data_.emplace_back(blockSize, std::move(blockColumns));
  308. RowCount_ += blockSize;
  309. return NUdf::EFetchStatus::Ok;
  310. }
  311. const TBlock& GetBlock(size_t blockOffset) const {
  312. Y_ENSURE(blockOffset < GetBlockCount());
  313. return Data_[blockOffset];
  314. }
  315. size_t GetBlockCount() const {
  316. return Data_.size();
  317. }
  318. TRowEntry GetRowEntry(size_t blockOffset, size_t itemOffset) const {
  319. auto& block = GetBlock(blockOffset);
  320. Y_ENSURE(itemOffset < block.Size);
  321. return TRowEntry(blockOffset, itemOffset);
  322. }
  323. TRowIterator GetRowIterator() const {
  324. return TRowIterator(this);
  325. }
  326. TBlockItem GetItem(TRowEntry entry, ui32 columnIdx) const {
  327. Y_ENSURE(columnIdx < Inputs_.size() - 1);
  328. return GetItemFromBlock(GetBlock(entry.BlockOffset), columnIdx, entry.ItemOffset);
  329. }
  330. void GetRow(TRowEntry entry, const TVector<ui32>& ioMap, std::vector<NYql::NUdf::TBlockItem>& row) const {
  331. Y_ENSURE(row.size() == ioMap.size());
  332. for (size_t i = 0; i < row.size(); i++) {
  333. row[i] = GetItem(entry, ioMap[i]);
  334. }
  335. }
  336. protected:
  337. TBlockItem GetItemFromBlock(const TBlock& block, ui32 columnIdx, size_t offset) const {
  338. Y_ENSURE(offset < block.Size);
  339. const auto& datum = block.Columns[columnIdx];
  340. if (datum.is_scalar()) {
  341. return Readers_[columnIdx]->GetScalarItem(*datum.scalar());
  342. } else {
  343. MKQL_ENSURE(datum.is_array(), "Expecting array");
  344. return Readers_[columnIdx]->GetItem(*datum.array(), offset);
  345. }
  346. }
  347. protected:
  348. const std::vector<arrow::ValueDescr> InputsDescr_;
  349. TVector<std::unique_ptr<IBlockReader>> Readers_;
  350. TVector<NUdf::IBlockItemHasher::TPtr> Hashers_;
  351. TVector<NUdf::IBlockItemComparator::TPtr> Comparators_;
  352. TVector<IBlockTrimmer::TPtr> Trimmers_;
  353. std::vector<TBlock> Data_;
  354. size_t RowCount_ = 0;
  355. NUdf::TUnboxedValue Stream_;
  356. TUnboxedValueVector Inputs_;
  357. };
  358. class TBlockStorage: public TBlockStorageBase<TBlockStorage> {
  359. private:
  360. using TBase = TBlockStorageBase<TBlockStorage>;
  361. public:
  362. using TBase::TBase;
  363. };
  364. class TIndexedBlockStorage : public TBlockStorageBase<TIndexedBlockStorage> {
  365. using TBase = TBlockStorageBase<TIndexedBlockStorage>;
  366. struct TIndexNode {
  367. TRowEntry Entry;
  368. TIndexNode* Next;
  369. TIndexNode() = delete;
  370. TIndexNode(TRowEntry entry, TIndexNode* next = nullptr)
  371. : Entry(entry)
  372. , Next(next)
  373. {}
  374. };
  375. class TIndexMapValue {
  376. public:
  377. TIndexMapValue()
  378. : Raw(0)
  379. {}
  380. TIndexMapValue(TRowEntry entry) {
  381. TIndexEntryUnion un;
  382. un.Entry = entry;
  383. Y_ENSURE(((un.Raw << 1) >> 1) == un.Raw);
  384. Raw = (un.Raw << 1) | 1;
  385. }
  386. TIndexMapValue(TIndexNode* entryList)
  387. : EntryList(entryList)
  388. {}
  389. bool IsInplace() const {
  390. return Raw & 1;
  391. }
  392. TIndexNode* GetList() const {
  393. Y_ENSURE(!IsInplace());
  394. return EntryList;
  395. }
  396. TRowEntry GetEntry() const {
  397. Y_ENSURE(IsInplace());
  398. TIndexEntryUnion un;
  399. un.Raw = Raw >> 1;
  400. return un.Entry;
  401. }
  402. private:
  403. union TIndexEntryUnion {
  404. TRowEntry Entry;
  405. ui64 Raw;
  406. };
  407. union {
  408. TIndexNode* EntryList;
  409. ui64 Raw;
  410. };
  411. };
  412. using TIndexMap = TRobinHoodHashFixedMap<
  413. ui64,
  414. TIndexMapValue,
  415. std::equal_to<ui64>,
  416. std::hash<ui64>,
  417. TMKQLHugeAllocator<char>
  418. >;
  419. static_assert(sizeof(TIndexMapValue) == 8);
  420. static_assert(std::max(TIndexMap::GetCellSize(), static_cast<ui32>(sizeof(TIndexNode))) == BlockMapJoinIndexEntrySize);
  421. public:
  422. class TIterator {
  423. friend class TIndexedBlockStorage;
  424. enum class EIteratorType {
  425. EMPTY,
  426. INPLACE,
  427. LIST
  428. };
  429. public:
  430. TIterator() = default;
  431. TIterator(const TIterator&) = delete;
  432. TIterator& operator=(const TIterator&) = delete;
  433. TIterator(TIterator&& other) {
  434. *this = std::move(other);
  435. }
  436. TIterator& operator=(TIterator&& other) {
  437. if (this != &other) {
  438. Type_ = other.Type_;
  439. BlockIndex_ = other.BlockIndex_;
  440. ItemsToLookup_ = std::move(other.ItemsToLookup_);
  441. switch (Type_) {
  442. case EIteratorType::EMPTY:
  443. break;
  444. case EIteratorType::INPLACE:
  445. Entry_ = other.Entry_;
  446. EntryConsumed_ = other.EntryConsumed_;
  447. break;
  448. case EIteratorType::LIST:
  449. Node_ = other.Node_;
  450. break;
  451. }
  452. other.BlockIndex_ = nullptr;
  453. }
  454. return *this;
  455. }
  456. TMaybe<TRowEntry> Next() {
  457. Y_ENSURE(IsValid());
  458. switch (Type_) {
  459. case EIteratorType::EMPTY:
  460. return Nothing();
  461. case EIteratorType::INPLACE:
  462. if (EntryConsumed_) {
  463. return Nothing();
  464. }
  465. EntryConsumed_ = true;
  466. return BlockIndex_->IsKeyEquals(Entry_, ItemsToLookup_) ? TMaybe<TRowEntry>(Entry_) : Nothing();
  467. case EIteratorType::LIST:
  468. for (; Node_ != nullptr; Node_ = Node_->Next) {
  469. if (BlockIndex_->IsKeyEquals(Node_->Entry, ItemsToLookup_)) {
  470. auto entry = Node_->Entry;
  471. Node_ = Node_->Next;
  472. return entry;
  473. }
  474. }
  475. return Nothing();
  476. }
  477. }
  478. bool IsValid() const {
  479. return BlockIndex_;
  480. }
  481. bool IsEmpty() const {
  482. Y_ENSURE(IsValid());
  483. switch (Type_) {
  484. case EIteratorType::EMPTY:
  485. return true;
  486. case EIteratorType::INPLACE:
  487. return EntryConsumed_;
  488. case EIteratorType::LIST:
  489. return Node_ == nullptr;
  490. }
  491. }
  492. void Reset() {
  493. *this = TIterator();
  494. }
  495. private:
  496. TIterator(const TIndexedBlockStorage* blockIndex)
  497. : Type_(EIteratorType::EMPTY)
  498. , BlockIndex_(blockIndex)
  499. {}
  500. TIterator(const TIndexedBlockStorage* blockIndex, TRowEntry entry, std::vector<NYql::NUdf::TBlockItem> itemsToLookup)
  501. : Type_(EIteratorType::INPLACE)
  502. , BlockIndex_(blockIndex)
  503. , Entry_(entry)
  504. , EntryConsumed_(false)
  505. , ItemsToLookup_(std::move(itemsToLookup))
  506. {}
  507. TIterator(const TIndexedBlockStorage* blockIndex, TIndexNode* node, std::vector<NYql::NUdf::TBlockItem> itemsToLookup)
  508. : Type_(EIteratorType::LIST)
  509. , BlockIndex_(blockIndex)
  510. , Node_(node)
  511. , ItemsToLookup_(std::move(itemsToLookup))
  512. {}
  513. private:
  514. EIteratorType Type_;
  515. const TIndexedBlockStorage* BlockIndex_ = nullptr;
  516. union {
  517. TIndexNode* Node_;
  518. struct {
  519. TRowEntry Entry_;
  520. bool EntryConsumed_;
  521. };
  522. };
  523. std::vector<NYql::NUdf::TBlockItem> ItemsToLookup_;
  524. };
  525. public:
  526. TIndexedBlockStorage(
  527. TMemoryUsageInfo* memInfo,
  528. const TVector<TType*>& itemTypes,
  529. const TVector<ui32>& keyColumns,
  530. NUdf::TUnboxedValue stream,
  531. bool any,
  532. arrow::MemoryPool* pool
  533. )
  534. : TBase(memInfo, itemTypes, stream, pool)
  535. , KeyColumns_(keyColumns)
  536. , Any_(any)
  537. {}
  538. NUdf::EFetchStatus FetchStream() {
  539. Y_ENSURE(!Index_, "Data fetch shouldn't be done after the index has been built");
  540. return TBase::FetchStream();
  541. }
  542. void BuildIndex() {
  543. Index_ = std::make_unique<TIndexMap>(CalculateRHHashTableCapacity(RowCount_));
  544. for (size_t blockOffset = 0; blockOffset < Data_.size(); blockOffset++) {
  545. const auto& block = GetBlock(blockOffset);
  546. auto blockSize = block.Size;
  547. std::array<TRobinHoodBatchRequestItem<ui64>, PrefetchBatchSize> insertBatch;
  548. std::array<TRowEntry, PrefetchBatchSize> insertBatchEntries;
  549. std::array<std::vector<NYql::NUdf::TBlockItem>, PrefetchBatchSize> insertBatchKeys;
  550. ui32 insertBatchLen = 0;
  551. auto processInsertBatch = [&]() {
  552. Index_->BatchInsert({insertBatch.data(), insertBatchLen}, [&](size_t i, TIndexMap::iterator iter, bool isNew) {
  553. auto value = static_cast<TIndexMapValue*>(Index_->GetMutablePayload(iter));
  554. if (isNew) {
  555. // Store single entry inplace
  556. *value = TIndexMapValue(insertBatchEntries[i]);
  557. Index_->CheckGrow();
  558. } else {
  559. if (Any_ && ContainsKey(value, insertBatchKeys[i])) {
  560. return;
  561. }
  562. // Store as list
  563. if (value->IsInplace()) {
  564. *value = TIndexMapValue(InsertIndexNode(value->GetEntry()));
  565. }
  566. *value = TIndexMapValue(InsertIndexNode(insertBatchEntries[i], value->GetList()));
  567. }
  568. });
  569. };
  570. Y_ENSURE(blockOffset <= std::numeric_limits<ui32>::max());
  571. Y_ENSURE(blockSize <= std::numeric_limits<ui32>::max());
  572. for (size_t itemOffset = 0; itemOffset < blockSize; itemOffset++) {
  573. ui64 keyHash = GetKey(block, itemOffset, insertBatchKeys[insertBatchLen]);
  574. if (!keyHash) {
  575. continue;
  576. }
  577. insertBatchEntries[insertBatchLen] = TRowEntry(blockOffset, itemOffset);
  578. insertBatch[insertBatchLen].ConstructKey(keyHash);
  579. insertBatchLen++;
  580. if (insertBatchLen == PrefetchBatchSize) {
  581. processInsertBatch();
  582. insertBatchLen = 0;
  583. }
  584. }
  585. if (insertBatchLen > 0) {
  586. processInsertBatch();
  587. }
  588. }
  589. }
  590. template<typename TGetKey>
  591. void BatchLookup(size_t batchSize, std::array<TIndexedBlockStorage::TIterator, PrefetchBatchSize>& iterators, TGetKey&& getKey) {
  592. Y_ENSURE(batchSize <= PrefetchBatchSize);
  593. std::array<TRobinHoodBatchRequestItem<ui64>, PrefetchBatchSize> lookupBatch;
  594. std::array<std::vector<NYql::NUdf::TBlockItem>, PrefetchBatchSize> itemsBatch;
  595. for (size_t i = 0; i < batchSize; i++) {
  596. const auto& [items, keyHash] = getKey(i);
  597. lookupBatch[i].ConstructKey(keyHash);
  598. itemsBatch[i] = items;
  599. }
  600. Index_->BatchLookup({lookupBatch.data(), batchSize}, [&](size_t i, TIndexMap::iterator iter) {
  601. if (!iter) {
  602. // Empty iterator
  603. iterators[i] = TIterator(this);
  604. return;
  605. }
  606. auto value = static_cast<const TIndexMapValue*>(Index_->GetPayload(iter));
  607. if (value->IsInplace()) {
  608. iterators[i] = TIterator(this, value->GetEntry(), std::move(itemsBatch[i]));
  609. } else {
  610. iterators[i] = TIterator(this, value->GetList(), std::move(itemsBatch[i]));
  611. }
  612. });
  613. }
  614. bool IsKeyEquals(TRowEntry entry, const std::vector<NYql::NUdf::TBlockItem>& keyItems) const {
  615. Y_ENSURE(keyItems.size() == KeyColumns_.size());
  616. for (size_t i = 0; i < KeyColumns_.size(); i++) {
  617. auto indexItem = GetItem(entry, KeyColumns_[i]);
  618. if (Comparators_[KeyColumns_[i]]->Equals(indexItem, keyItems[i])) {
  619. return true;
  620. }
  621. }
  622. return false;
  623. }
  624. private:
  625. ui64 GetKey(const TBlock& block, size_t offset, std::vector<NYql::NUdf::TBlockItem>& keyItems) const {
  626. ui64 keyHash = 0;
  627. keyItems.clear();
  628. for (ui32 keyColumn : KeyColumns_) {
  629. auto item = GetItemFromBlock(block, keyColumn, offset);
  630. if (!item) {
  631. keyItems.clear();
  632. return 0;
  633. }
  634. keyHash = CombineHashes(keyHash, Hashers_[keyColumn]->Hash(item));
  635. keyItems.push_back(std::move(item));
  636. }
  637. return keyHash;
  638. }
  639. TIndexNode* InsertIndexNode(TRowEntry entry, TIndexNode* currentHead = nullptr) {
  640. return &IndexNodes_.emplace_back(entry, currentHead);
  641. }
  642. bool ContainsKey(const TIndexMapValue* chain, const std::vector<NYql::NUdf::TBlockItem>& keyItems) const {
  643. if (chain->IsInplace()) {
  644. return IsKeyEquals(chain->GetEntry(), keyItems);
  645. } else {
  646. for (TIndexNode* node = chain->GetList(); node != nullptr; node = node->Next) {
  647. if (IsKeyEquals(node->Entry, keyItems)) {
  648. return true;
  649. }
  650. node = node->Next;
  651. }
  652. return false;
  653. }
  654. }
  655. private:
  656. const TVector<ui32>& KeyColumns_;
  657. std::unique_ptr<TIndexMap> Index_;
  658. std::deque<TIndexNode> IndexNodes_;
  659. const bool Any_;
  660. };
  661. template <bool WithoutRight, bool RightRequired, bool RightAny>
  662. class TBlockMapJoinCoreWraper : public TMutableComputationNode<TBlockMapJoinCoreWraper<WithoutRight, RightRequired, RightAny>>
  663. {
  664. using TBaseComputation = TMutableComputationNode<TBlockMapJoinCoreWraper<WithoutRight, RightRequired, RightAny>>;
  665. using TJoinState = TBlockJoinState<RightRequired>;
  666. using TIndexState = TIndexedBlockStorage;
  667. public:
  668. TBlockMapJoinCoreWraper(
  669. TComputationMutables& mutables,
  670. const TVector<TType*>&& resultItemTypes,
  671. const TVector<TType*>&& leftItemTypes,
  672. const TVector<ui32>&& leftKeyColumns,
  673. const TVector<ui32>&& leftIOMap,
  674. const TVector<TType*>&& rightItemTypes,
  675. const TVector<ui32>&& rightKeyColumns,
  676. const TVector<ui32>&& rightIOMap,
  677. IComputationNode* leftStream,
  678. IComputationNode* rightStream
  679. )
  680. : TBaseComputation(mutables, EValueRepresentation::Boxed)
  681. , ResultItemTypes_(std::move(resultItemTypes))
  682. , LeftItemTypes_(std::move(leftItemTypes))
  683. , LeftKeyColumns_(std::move(leftKeyColumns))
  684. , LeftIOMap_(std::move(leftIOMap))
  685. , RightItemTypes_(std::move(rightItemTypes))
  686. , RightKeyColumns_(std::move(rightKeyColumns))
  687. , RightIOMap_(std::move(rightIOMap))
  688. , LeftStream_(std::move(leftStream))
  689. , RightStream_(std::move(rightStream))
  690. , KeyTupleCache_(mutables)
  691. {}
  692. NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
  693. const auto joinState = ctx.HolderFactory.Create<TJoinState>(
  694. ctx,
  695. LeftItemTypes_,
  696. LeftIOMap_,
  697. ResultItemTypes_
  698. );
  699. const auto indexState = ctx.HolderFactory.Create<TIndexState>(
  700. RightItemTypes_,
  701. RightKeyColumns_,
  702. std::move(RightStream_->GetValue(ctx)),
  703. RightAny,
  704. &ctx.ArrowMemoryPool
  705. );
  706. return ctx.HolderFactory.Create<TStreamValue>(ctx.HolderFactory,
  707. std::move(joinState),
  708. std::move(indexState),
  709. std::move(LeftStream_->GetValue(ctx)),
  710. LeftKeyColumns_,
  711. std::move(RightStream_->GetValue(ctx)),
  712. RightKeyColumns_,
  713. RightIOMap_
  714. );
  715. }
  716. private:
  717. class TStreamValue : public TComputationValue<TStreamValue> {
  718. using TBase = TComputationValue<TStreamValue>;
  719. public:
  720. TStreamValue(
  721. TMemoryUsageInfo* memInfo,
  722. const THolderFactory& holderFactory,
  723. NUdf::TUnboxedValue&& joinState,
  724. NUdf::TUnboxedValue&& indexState,
  725. NUdf::TUnboxedValue&& leftStream,
  726. const TVector<ui32>& leftKeyColumns,
  727. NUdf::TUnboxedValue&& rightStream,
  728. const TVector<ui32>& rightKeyColumns,
  729. const TVector<ui32>& rightIOMap
  730. )
  731. : TBase(memInfo)
  732. , JoinState_(joinState)
  733. , IndexState_(indexState)
  734. , LeftStream_(leftStream)
  735. , LeftKeyColumns_(leftKeyColumns)
  736. , RightStream_(rightStream)
  737. , RightKeyColumns_(rightKeyColumns)
  738. , RightIOMap_(rightIOMap)
  739. , HolderFactory_(holderFactory)
  740. {}
  741. private:
  742. NUdf::EFetchStatus WideFetch(NUdf::TUnboxedValue* output, ui32 width) {
  743. auto& joinState = *static_cast<TJoinState*>(JoinState_.AsBoxed().Get());
  744. auto& indexState = *static_cast<TIndexState*>(IndexState_.AsBoxed().Get());
  745. if (!RightStreamConsumed_) {
  746. auto fetchStatus = NUdf::EFetchStatus::Ok;
  747. while (fetchStatus != NUdf::EFetchStatus::Finish) {
  748. fetchStatus = indexState.FetchStream();
  749. if (fetchStatus == NUdf::EFetchStatus::Yield) {
  750. return NUdf::EFetchStatus::Yield;
  751. }
  752. }
  753. indexState.BuildIndex();
  754. RightStreamConsumed_ = true;
  755. }
  756. auto* inputFields = joinState.GetRawInputFields();
  757. const size_t inputWidth = joinState.GetInputWidth();
  758. const size_t outputWidth = joinState.GetOutputWidth();
  759. MKQL_ENSURE(width == outputWidth,
  760. "The given width doesn't equal to the result type size");
  761. std::vector<NYql::NUdf::TBlockItem> leftKeyColumns(LeftKeyColumns_.size());
  762. std::vector<ui64> leftKeyColumnHashes(LeftKeyColumns_.size());
  763. std::vector<NYql::NUdf::TBlockItem> rightRow(RightIOMap_.size());
  764. while (!joinState.HasBlocks()) {
  765. while (joinState.IsNotFull() && LookupBatchCurrent_ < LookupBatchSize_) {
  766. auto& iter = LookupBatchIterators_[LookupBatchCurrent_];
  767. if constexpr (WithoutRight) {
  768. if (bool(iter.IsEmpty()) != RightRequired) {
  769. joinState.CopyRow();
  770. }
  771. joinState.NextRow();
  772. LookupBatchCurrent_++;
  773. continue;
  774. } else if constexpr (!RightRequired) {
  775. if (iter.IsEmpty()) {
  776. joinState.MakeRow(std::vector<NYql::NUdf::TBlockItem>());
  777. joinState.NextRow();
  778. LookupBatchCurrent_++;
  779. continue;
  780. }
  781. }
  782. while (joinState.IsNotFull() && !iter.IsEmpty()) {
  783. auto key = iter.Next();
  784. indexState.GetRow(*key, RightIOMap_, rightRow);
  785. joinState.MakeRow(rightRow);
  786. }
  787. if (iter.IsEmpty()) {
  788. joinState.NextRow();
  789. LookupBatchCurrent_++;
  790. }
  791. }
  792. if (joinState.IsNotFull() && joinState.RemainingRowsCount() > 0) {
  793. LookupBatchSize_ = std::min(PrefetchBatchSize, static_cast<ui32>(joinState.RemainingRowsCount()));
  794. indexState.BatchLookup(LookupBatchSize_, LookupBatchIterators_, [&](size_t i) {
  795. MakeLeftKeys(leftKeyColumns, leftKeyColumnHashes, i);
  796. ui64 keyHash = CalculateTupleHash(leftKeyColumnHashes);
  797. return std::make_pair(std::ref(leftKeyColumns), keyHash);
  798. });
  799. LookupBatchCurrent_ = 0;
  800. continue;
  801. }
  802. if (joinState.IsNotFull() && !joinState.IsFinished()) {
  803. switch (LeftStream_.WideFetch(inputFields, inputWidth)) {
  804. case NUdf::EFetchStatus::Yield:
  805. return NUdf::EFetchStatus::Yield;
  806. case NUdf::EFetchStatus::Ok:
  807. joinState.Reset();
  808. continue;
  809. case NUdf::EFetchStatus::Finish:
  810. joinState.Finish();
  811. break;
  812. }
  813. // Leave the loop, if no values left in the stream.
  814. Y_DEBUG_ABORT_UNLESS(joinState.IsFinished());
  815. }
  816. if (joinState.IsEmpty()) {
  817. return NUdf::EFetchStatus::Finish;
  818. }
  819. joinState.MakeBlocks(HolderFactory_);
  820. }
  821. const auto sliceSize = joinState.Slice();
  822. for (size_t i = 0; i < outputWidth; i++) {
  823. output[i] = joinState.Get(sliceSize, HolderFactory_, i);
  824. }
  825. return NUdf::EFetchStatus::Ok;
  826. }
  827. void MakeLeftKeys(std::vector<NYql::NUdf::TBlockItem>& items, std::vector<ui64>& hashes, size_t offset) const {
  828. auto& joinState = *static_cast<TJoinState*>(JoinState_.AsBoxed().Get());
  829. Y_ENSURE(items.size() == LeftKeyColumns_.size());
  830. Y_ENSURE(hashes.size() == LeftKeyColumns_.size());
  831. for (size_t i = 0; i < LeftKeyColumns_.size(); i++) {
  832. std::tie(items[i], hashes[i]) = joinState.GetItemWithHash(LeftKeyColumns_[i], offset);
  833. }
  834. }
  835. NUdf::TUnboxedValue JoinState_;
  836. NUdf::TUnboxedValue IndexState_;
  837. NUdf::TUnboxedValue LeftStream_;
  838. const TVector<ui32>& LeftKeyColumns_;
  839. NUdf::TUnboxedValue RightStream_;
  840. const TVector<ui32>& RightKeyColumns_;
  841. const TVector<ui32>& RightIOMap_;
  842. bool RightStreamConsumed_ = false;
  843. std::array<typename TIndexState::TIterator, PrefetchBatchSize> LookupBatchIterators_;
  844. ui32 LookupBatchCurrent_ = 0;
  845. ui32 LookupBatchSize_ = 0;
  846. const THolderFactory& HolderFactory_;
  847. };
  848. void RegisterDependencies() const final {
  849. this->DependsOn(LeftStream_);
  850. this->DependsOn(RightStream_);
  851. }
  852. private:
// Item types of the resulting stream; passed to TJoinState in DoCalculate.
const TVector<TType*> ResultItemTypes_;
// Item types of the left stream; passed to TJoinState in DoCalculate.
const TVector<TType*> LeftItemTypes_;
// Left key column indices; TStreamValue uses them to fetch key items and hashes.
const TVector<ui32> LeftKeyColumns_;
// Passed to TJoinState together with the left item types
// (presumably the left columns forwarded to the output - confirm in TJoinState).
const TVector<ui32> LeftIOMap_;
// Item types of the right stream; passed to TIndexState in DoCalculate.
const TVector<TType*> RightItemTypes_;
// Right key column indices; passed to TIndexState and to TStreamValue.
const TVector<ui32> RightKeyColumns_;
// Passed to TIndexState::GetRow to select which right columns fill an output row.
const TVector<ui32> RightIOMap_;
// Input nodes; their GetValue(ctx) results are the left and right stream values.
IComputationNode* const LeftStream_;
IComputationNode* const RightStream_;
// Initialized from the computation mutables; not referenced by the visible code.
const TContainerCacheOnContext KeyTupleCache_;
  863. };
  864. class TBlockCrossJoinCoreWraper : public TMutableComputationNode<TBlockCrossJoinCoreWraper>
  865. {
  866. using TBaseComputation = TMutableComputationNode<TBlockCrossJoinCoreWraper>;
  867. using TJoinState = TBlockJoinState<true>;
  868. using TStorageState = TBlockStorage;
  869. public:
  870. TBlockCrossJoinCoreWraper(
  871. TComputationMutables& mutables,
  872. const TVector<TType*>&& resultItemTypes,
  873. const TVector<TType*>&& leftItemTypes,
  874. const TVector<ui32>&& leftKeyColumns,
  875. const TVector<ui32>&& leftIOMap,
  876. const TVector<TType*>&& rightItemTypes,
  877. const TVector<ui32>&& rightKeyColumns,
  878. const TVector<ui32>&& rightIOMap,
  879. IComputationNode* leftStream,
  880. IComputationNode* rightStream
  881. )
  882. : TBaseComputation(mutables, EValueRepresentation::Boxed)
  883. , ResultItemTypes_(std::move(resultItemTypes))
  884. , LeftItemTypes_(std::move(leftItemTypes))
  885. , LeftKeyColumns_(std::move(leftKeyColumns))
  886. , LeftIOMap_(std::move(leftIOMap))
  887. , RightItemTypes_(std::move(rightItemTypes))
  888. , RightKeyColumns_(std::move(rightKeyColumns))
  889. , RightIOMap_(std::move(rightIOMap))
  890. , LeftStream_(std::move(leftStream))
  891. , RightStream_(std::move(rightStream))
  892. , KeyTupleCache_(mutables)
  893. {}
  894. NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
  895. const auto joinState = ctx.HolderFactory.Create<TJoinState>(
  896. ctx,
  897. LeftItemTypes_,
  898. LeftIOMap_,
  899. ResultItemTypes_
  900. );
  901. const auto indexState = ctx.HolderFactory.Create<TStorageState>(
  902. RightItemTypes_,
  903. std::move(RightStream_->GetValue(ctx)),
  904. &ctx.ArrowMemoryPool
  905. );
  906. return ctx.HolderFactory.Create<TStreamValue>(ctx.HolderFactory,
  907. std::move(joinState),
  908. std::move(indexState),
  909. std::move(LeftStream_->GetValue(ctx)),
  910. std::move(RightStream_->GetValue(ctx)),
  911. RightIOMap_
  912. );
  913. }
  914. private:
  915. class TStreamValue : public TComputationValue<TStreamValue> {
  916. using TBase = TComputationValue<TStreamValue>;
  917. public:
  918. TStreamValue(
  919. TMemoryUsageInfo* memInfo,
  920. const THolderFactory& holderFactory,
  921. NUdf::TUnboxedValue&& joinState,
  922. NUdf::TUnboxedValue&& storageState,
  923. NUdf::TUnboxedValue&& leftStream,
  924. NUdf::TUnboxedValue&& rightStream,
  925. const TVector<ui32>& rightIOMap
  926. )
  927. : TBase(memInfo)
  928. , JoinState_(joinState)
  929. , StorageState_(storageState)
  930. , LeftStream_(leftStream)
  931. , RightStream_(rightStream)
  932. , RightIOMap_(rightIOMap)
  933. , HolderFactory_(holderFactory)
  934. {}
  935. private:
  936. NUdf::EFetchStatus WideFetch(NUdf::TUnboxedValue* output, ui32 width) {
  937. auto& joinState = *static_cast<TJoinState*>(JoinState_.AsBoxed().Get());
  938. auto& storageState = *static_cast<TStorageState*>(StorageState_.AsBoxed().Get());
  939. if (!RightStreamConsumed_) {
  940. auto fetchStatus = NUdf::EFetchStatus::Ok;
  941. while (fetchStatus != NUdf::EFetchStatus::Finish) {
  942. fetchStatus = storageState.FetchStream();
  943. if (fetchStatus == NUdf::EFetchStatus::Yield) {
  944. return NUdf::EFetchStatus::Yield;
  945. }
  946. }
  947. RightStreamConsumed_ = true;
  948. RightRowIterator_ = storageState.GetRowIterator();
  949. }
  950. auto* inputFields = joinState.GetRawInputFields();
  951. const size_t inputWidth = joinState.GetInputWidth();
  952. const size_t outputWidth = joinState.GetOutputWidth();
  953. MKQL_ENSURE(width == outputWidth,
  954. "The given width doesn't equal to the result type size");
  955. std::vector<NYql::NUdf::TBlockItem> rightRow(RightIOMap_.size());
  956. while (!joinState.HasBlocks()) {
  957. while (!RightRowIterator_.IsEmpty() && joinState.RemainingRowsCount() > 0 && joinState.IsNotFull()) {
  958. auto rowEntry = *RightRowIterator_.Next();
  959. storageState.GetRow(rowEntry, RightIOMap_, rightRow);
  960. joinState.MakeRow(rightRow);
  961. }
  962. if (joinState.IsNotFull() && joinState.RemainingRowsCount() > 0) {
  963. joinState.NextRow();
  964. RightRowIterator_ = storageState.GetRowIterator();
  965. continue;
  966. }
  967. if (joinState.IsNotFull() && !joinState.IsFinished()) {
  968. switch (LeftStream_.WideFetch(inputFields, inputWidth)) {
  969. case NUdf::EFetchStatus::Yield:
  970. return NUdf::EFetchStatus::Yield;
  971. case NUdf::EFetchStatus::Ok:
  972. joinState.Reset();
  973. continue;
  974. case NUdf::EFetchStatus::Finish:
  975. joinState.Finish();
  976. break;
  977. }
  978. // Leave the loop, if no values left in the stream.
  979. Y_DEBUG_ABORT_UNLESS(joinState.IsFinished());
  980. }
  981. if (joinState.IsEmpty()) {
  982. return NUdf::EFetchStatus::Finish;
  983. }
  984. joinState.MakeBlocks(HolderFactory_);
  985. }
  986. const auto sliceSize = joinState.Slice();
  987. for (size_t i = 0; i < outputWidth; i++) {
  988. output[i] = joinState.Get(sliceSize, HolderFactory_, i);
  989. }
  990. return NUdf::EFetchStatus::Ok;
  991. }
  992. NUdf::TUnboxedValue JoinState_;
  993. NUdf::TUnboxedValue StorageState_;
  994. NUdf::TUnboxedValue LeftStream_;
  995. NUdf::TUnboxedValue RightStream_;
  996. const TVector<ui32>& RightIOMap_;
  997. bool RightStreamConsumed_ = false;
  998. TStorageState::TRowIterator RightRowIterator_;
  999. const THolderFactory& HolderFactory_;
  1000. };
  1001. void RegisterDependencies() const final {
  1002. this->DependsOn(LeftStream_);
  1003. this->DependsOn(RightStream_);
  1004. }
  1005. private:
  1006. const TVector<TType*> ResultItemTypes_;
  1007. const TVector<TType*> LeftItemTypes_;
  1008. const TVector<ui32> LeftKeyColumns_;
  1009. const TVector<ui32> LeftIOMap_;
  1010. const TVector<TType*> RightItemTypes_;
  1011. const TVector<ui32> RightKeyColumns_;
  1012. const TVector<ui32> RightIOMap_;
  1013. IComputationNode* const LeftStream_;
  1014. IComputationNode* const RightStream_;
  1015. const TContainerCacheOnContext KeyTupleCache_;
  1016. };
  1017. } // namespace
  1018. IComputationNode* WrapBlockMapJoinCore(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
  1019. MKQL_ENSURE(callable.GetInputsCount() == 8, "Expected 8 args");
  1020. const auto joinType = callable.GetType()->GetReturnType();
  1021. MKQL_ENSURE(joinType->IsStream(), "Expected WideStream as a resulting stream");
  1022. const auto joinStreamType = AS_TYPE(TStreamType, joinType);
  1023. MKQL_ENSURE(joinStreamType->GetItemType()->IsMulti(),
  1024. "Expected Multi as a resulting item type");
  1025. const auto joinComponents = GetWideComponents(joinStreamType);
  1026. MKQL_ENSURE(joinComponents.size() > 0, "Expected at least one column");
  1027. const TVector<TType*> joinItems(joinComponents.cbegin(), joinComponents.cend());
  1028. const auto leftType = callable.GetInput(0).GetStaticType();
  1029. MKQL_ENSURE(leftType->IsStream(), "Expected WideStream as a left stream");
  1030. const auto leftStreamType = AS_TYPE(TStreamType, leftType);
  1031. MKQL_ENSURE(leftStreamType->GetItemType()->IsMulti(),
  1032. "Expected Multi as a left stream item type");
  1033. const auto leftStreamComponents = GetWideComponents(leftStreamType);
  1034. MKQL_ENSURE(leftStreamComponents.size() > 0, "Expected at least one column");
  1035. const TVector<TType*> leftStreamItems(leftStreamComponents.cbegin(), leftStreamComponents.cend());
  1036. const auto rightType = callable.GetInput(1).GetStaticType();
  1037. MKQL_ENSURE(rightType->IsStream(), "Expected WideStream as a right stream");
  1038. const auto rightStreamType = AS_TYPE(TStreamType, rightType);
  1039. MKQL_ENSURE(rightStreamType->GetItemType()->IsMulti(),
  1040. "Expected Multi as a right stream item type");
  1041. const auto rightStreamComponents = GetWideComponents(rightStreamType);
  1042. MKQL_ENSURE(rightStreamComponents.size() > 0, "Expected at least one column");
  1043. const TVector<TType*> rightStreamItems(rightStreamComponents.cbegin(), rightStreamComponents.cend());
  1044. const auto joinKindNode = callable.GetInput(2);
  1045. const auto rawKind = AS_VALUE(TDataLiteral, joinKindNode)->AsValue().Get<ui32>();
  1046. const auto joinKind = GetJoinKind(rawKind);
  1047. Y_ENSURE(joinKind == EJoinKind::Inner || joinKind == EJoinKind::Left ||
  1048. joinKind == EJoinKind::LeftSemi || joinKind == EJoinKind::LeftOnly || joinKind == EJoinKind::Cross);
  1049. const auto leftKeyColumnsLiteral = callable.GetInput(3);
  1050. const auto leftKeyColumnsTuple = AS_VALUE(TTupleLiteral, leftKeyColumnsLiteral);
  1051. TVector<ui32> leftKeyColumns;
  1052. leftKeyColumns.reserve(leftKeyColumnsTuple->GetValuesCount());
  1053. for (ui32 i = 0; i < leftKeyColumnsTuple->GetValuesCount(); i++) {
  1054. const auto item = AS_VALUE(TDataLiteral, leftKeyColumnsTuple->GetValue(i));
  1055. leftKeyColumns.emplace_back(item->AsValue().Get<ui32>());
  1056. }
  1057. const THashSet<ui32> leftKeySet(leftKeyColumns.cbegin(), leftKeyColumns.cend());
  1058. const auto leftKeyDropsLiteral = callable.GetInput(4);
  1059. const auto leftKeyDropsTuple = AS_VALUE(TTupleLiteral, leftKeyDropsLiteral);
  1060. THashSet<ui32> leftKeyDrops;
  1061. leftKeyDrops.reserve(leftKeyDropsTuple->GetValuesCount());
  1062. for (ui32 i = 0; i < leftKeyDropsTuple->GetValuesCount(); i++) {
  1063. const auto item = AS_VALUE(TDataLiteral, leftKeyDropsTuple->GetValue(i));
  1064. leftKeyDrops.emplace(item->AsValue().Get<ui32>());
  1065. }
  1066. for (const auto& drop : leftKeyDrops) {
  1067. MKQL_ENSURE(leftKeySet.contains(drop),
  1068. "Only key columns has to be specified in drop column set");
  1069. }
  1070. const auto rightKeyColumnsLiteral = callable.GetInput(5);
  1071. const auto rightKeyColumnsTuple = AS_VALUE(TTupleLiteral, rightKeyColumnsLiteral);
  1072. TVector<ui32> rightKeyColumns;
  1073. rightKeyColumns.reserve(rightKeyColumnsTuple->GetValuesCount());
  1074. for (ui32 i = 0; i < rightKeyColumnsTuple->GetValuesCount(); i++) {
  1075. const auto item = AS_VALUE(TDataLiteral, rightKeyColumnsTuple->GetValue(i));
  1076. rightKeyColumns.emplace_back(item->AsValue().Get<ui32>());
  1077. }
  1078. const THashSet<ui32> rightKeySet(rightKeyColumns.cbegin(), rightKeyColumns.cend());
  1079. const auto rightKeyDropsLiteral = callable.GetInput(6);
  1080. const auto rightKeyDropsTuple = AS_VALUE(TTupleLiteral, rightKeyDropsLiteral);
  1081. THashSet<ui32> rightKeyDrops;
  1082. rightKeyDrops.reserve(rightKeyDropsTuple->GetValuesCount());
  1083. for (ui32 i = 0; i < rightKeyDropsTuple->GetValuesCount(); i++) {
  1084. const auto item = AS_VALUE(TDataLiteral, rightKeyDropsTuple->GetValue(i));
  1085. rightKeyDrops.emplace(item->AsValue().Get<ui32>());
  1086. }
  1087. for (const auto& drop : rightKeyDrops) {
  1088. MKQL_ENSURE(rightKeySet.contains(drop),
  1089. "Only key columns has to be specified in drop column set");
  1090. }
  1091. if (joinKind == EJoinKind::Cross) {
  1092. MKQL_ENSURE(leftKeyColumns.empty() && leftKeyDrops.empty() && rightKeyColumns.empty() && rightKeyDrops.empty(),
  1093. "Specifying key columns is not allowed for cross join");
  1094. }
  1095. MKQL_ENSURE(leftKeyColumns.size() == rightKeyColumns.size(), "Key columns mismatch");
  1096. const auto rightAnyNode = callable.GetInput(7);
  1097. const auto rightAny = AS_VALUE(TDataLiteral, rightAnyNode)->AsValue().Get<bool>();
  1098. // XXX: Mind the last wide item, containing block length.
  1099. TVector<ui32> leftIOMap;
  1100. for (size_t i = 0; i < leftStreamItems.size() - 1; i++) {
  1101. if (leftKeyDrops.contains(i)) {
  1102. continue;
  1103. }
  1104. leftIOMap.push_back(i);
  1105. }
  1106. // XXX: Mind the last wide item, containing block length.
  1107. TVector<ui32> rightIOMap;
  1108. if (joinKind == EJoinKind::Inner || joinKind == EJoinKind::Left || joinKind == EJoinKind::Cross) {
  1109. for (size_t i = 0; i < rightStreamItems.size() - 1; i++) {
  1110. if (rightKeyDrops.contains(i)) {
  1111. continue;
  1112. }
  1113. rightIOMap.push_back(i);
  1114. }
  1115. } else {
  1116. MKQL_ENSURE(rightKeyDrops.empty(), "Right key drops are not allowed for semi/only join");
  1117. }
  1118. const auto leftStream = LocateNode(ctx.NodeLocator, callable, 0);
  1119. const auto rightStream = LocateNode(ctx.NodeLocator, callable, 1);
  1120. #define JOIN_WRAPPER(WITHOUT_RIGHT, RIGHT_REQUIRED, RIGHT_ANY) \
  1121. return new TBlockMapJoinCoreWraper<WITHOUT_RIGHT, RIGHT_REQUIRED, RIGHT_ANY>( \
  1122. ctx.Mutables, \
  1123. std::move(joinItems), \
  1124. std::move(leftStreamItems), \
  1125. std::move(leftKeyColumns), \
  1126. std::move(leftIOMap), \
  1127. std::move(rightStreamItems), \
  1128. std::move(rightKeyColumns), \
  1129. std::move(rightIOMap), \
  1130. leftStream, \
  1131. rightStream \
  1132. )
  1133. switch (joinKind) {
  1134. case EJoinKind::Inner:
  1135. if (rightAny) {
  1136. JOIN_WRAPPER(false, true, true);
  1137. } else {
  1138. JOIN_WRAPPER(false, true, false);
  1139. }
  1140. case EJoinKind::Left:
  1141. if (rightAny) {
  1142. JOIN_WRAPPER(false, false, true);
  1143. } else {
  1144. JOIN_WRAPPER(false, false, false);
  1145. }
  1146. case EJoinKind::LeftSemi:
  1147. MKQL_ENSURE(rightIOMap.empty(), "Can't access right table on left semi join");
  1148. if (rightAny) {
  1149. JOIN_WRAPPER(true, true, true);
  1150. } else {
  1151. JOIN_WRAPPER(true, true, false);
  1152. }
  1153. case EJoinKind::LeftOnly:
  1154. MKQL_ENSURE(rightIOMap.empty(), "Can't access right table on left only join");
  1155. if (rightAny) {
  1156. JOIN_WRAPPER(true, false, true);
  1157. } else {
  1158. JOIN_WRAPPER(true, false, false);
  1159. }
  1160. case EJoinKind::Cross:
  1161. MKQL_ENSURE(!rightAny, "rightAny can't be used with cross join");
  1162. return new TBlockCrossJoinCoreWraper(
  1163. ctx.Mutables,
  1164. std::move(joinItems),
  1165. std::move(leftStreamItems),
  1166. std::move(leftKeyColumns),
  1167. std::move(leftIOMap),
  1168. std::move(rightStreamItems),
  1169. std::move(rightKeyColumns),
  1170. std::move(rightIOMap),
  1171. leftStream,
  1172. rightStream
  1173. );
  1174. default:
  1175. /* TODO: Display the human-readable join kind name. */
  1176. MKQL_ENSURE(false, "BlockMapJoinCore doesn't support join type #"
  1177. << static_cast<ui32>(joinKind));
  1178. }
  1179. #undef JOIN_WRAPPER
  1180. }
  1181. } // namespace NMiniKQL
  1182. } // namespace NKikimr