// mkql_block_map_join.cpp (55 KB)
  1. #include "mkql_block_map_join.h"
  2. #include <yql/essentials/minikql/computation/mkql_block_builder.h>
  3. #include <yql/essentials/minikql/computation/mkql_block_impl.h>
  4. #include <yql/essentials/minikql/computation/mkql_block_reader.h>
  5. #include <yql/essentials/minikql/computation/mkql_block_trimmer.h>
  6. #include <yql/essentials/minikql/computation/mkql_computation_node_holders_codegen.h>
  7. #include <yql/essentials/minikql/comp_nodes/mkql_rh_hash.h>
  8. #include <yql/essentials/minikql/invoke_builtins/mkql_builtins.h>
  9. #include <yql/essentials/minikql/mkql_block_map_join_utils.h>
  10. #include <yql/essentials/minikql/mkql_node_cast.h>
  11. #include <yql/essentials/minikql/mkql_program_builder.h>
  12. #include <util/generic/serialized_enum.h>
  13. namespace NKikimr {
  14. namespace NMiniKQL {
  15. namespace {
  16. size_t CalcMaxBlockLength(const TVector<TType*>& items) {
  17. return CalcBlockLen(std::accumulate(items.cbegin(), items.cend(), 0ULL,
  18. [](size_t max, const TType* type) {
  19. const TType* itemType = AS_TYPE(TBlockType, type)->GetItemType();
  20. return std::max(max, CalcMaxBlockItemSize(itemType));
  21. }));
  22. }
  23. ui64 CalculateTupleHash(const std::vector<ui64>& hashes) {
  24. ui64 hash = 0;
  25. for (size_t i = 0; i < hashes.size(); i++) {
  26. if (!hashes[i]) {
  27. return 0;
  28. }
  29. hash = CombineHashes(hash, hashes[i]);
  30. }
  31. return hash;
  32. }
  33. template <bool RightRequired>
  34. class TBlockJoinState : public TBlockState {
  35. public:
  36. TBlockJoinState(TMemoryUsageInfo* memInfo, TComputationContext& ctx,
  37. const TVector<TType*>& inputItems,
  38. const TVector<ui32>& leftIOMap,
  39. const TVector<TType*> outputItems)
  40. : TBlockState(memInfo, outputItems.size())
  41. , InputWidth_(inputItems.size() - 1)
  42. , OutputWidth_(outputItems.size() - 1)
  43. , Inputs_(inputItems.size())
  44. , LeftIOMap_(leftIOMap)
  45. , InputsDescr_(ToValueDescr(inputItems))
  46. {
  47. const auto& pgBuilder = ctx.Builder->GetPgBuilder();
  48. MaxLength_ = CalcMaxBlockLength(outputItems);
  49. TBlockTypeHelper helper;
  50. for (size_t i = 0; i < inputItems.size(); i++) {
  51. TType* blockItemType = AS_TYPE(TBlockType, inputItems[i])->GetItemType();
  52. Readers_.push_back(MakeBlockReader(TTypeInfoHelper(), blockItemType));
  53. Converters_.push_back(MakeBlockItemConverter(TTypeInfoHelper(), blockItemType, pgBuilder));
  54. Hashers_.push_back(helper.MakeHasher(blockItemType));
  55. }
  56. // The last output column (i.e. block length) doesn't require a block builder.
  57. for (size_t i = 0; i < OutputWidth_; i++) {
  58. const TType* blockItemType = AS_TYPE(TBlockType, outputItems[i])->GetItemType();
  59. Builders_.push_back(MakeArrayBuilder(TTypeInfoHelper(), blockItemType, ctx.ArrowMemoryPool, MaxLength_, &pgBuilder, &BuilderAllocatedSize_));
  60. }
  61. MaxBuilderAllocatedSize_ = MaxAllocatedFactor_ * BuilderAllocatedSize_;
  62. }
  63. void CopyRow() {
  64. // Copy items from the "left" stream.
  65. // Use the mapping from input fields to output ones to
  66. // produce a tight loop to copy row items.
  67. for (size_t i = 0; i < LeftIOMap_.size(); i++) {
  68. AddItem(GetItem(LeftIOMap_[i]), i);
  69. }
  70. OutputRows_++;
  71. }
  72. void MakeRow(const NUdf::TUnboxedValuePod& value) {
  73. size_t builderIndex = 0;
  74. // Copy items from the "left" stream.
  75. // Use the mapping from input fields to output ones to
  76. // produce a tight loop to copy row items.
  77. for (size_t i = 0; i < LeftIOMap_.size(); i++, builderIndex++) {
  78. AddItem(GetItem(LeftIOMap_[i]), i);
  79. }
  80. // Convert and append items from the "right" dict.
  81. // Since the keys are copied to the output only from the
  82. // "left" stream, process all values unconditionally.
  83. if constexpr (RightRequired) {
  84. for (size_t i = 0; builderIndex < OutputWidth_; i++) {
  85. AddValue(value.GetElement(i), builderIndex++);
  86. }
  87. } else {
  88. if (value) {
  89. for (size_t i = 0; builderIndex < OutputWidth_; i++) {
  90. AddValue(value.GetElement(i), builderIndex++);
  91. }
  92. } else {
  93. while (builderIndex < OutputWidth_) {
  94. AddValue(value, builderIndex++);
  95. }
  96. }
  97. }
  98. OutputRows_++;
  99. }
  100. void MakeRow(const std::vector<NYql::NUdf::TBlockItem>& rightColumns) {
  101. size_t builderIndex = 0;
  102. for (size_t i = 0; i < LeftIOMap_.size(); i++, builderIndex++) {
  103. AddItem(GetItem(LeftIOMap_[i]), builderIndex);
  104. }
  105. if (!rightColumns.empty()) {
  106. Y_ENSURE(LeftIOMap_.size() + rightColumns.size() == OutputWidth_);
  107. for (size_t i = 0; i < rightColumns.size(); i++) {
  108. AddItem(rightColumns[i], builderIndex++);
  109. }
  110. } else {
  111. while (builderIndex < OutputWidth_) {
  112. AddItem(TBlockItem(), builderIndex++);
  113. }
  114. }
  115. OutputRows_++;
  116. }
  117. void MakeBlocks(const THolderFactory& holderFactory) {
  118. Values.back() = holderFactory.CreateArrowBlock(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(OutputRows_)));
  119. OutputRows_ = 0;
  120. BuilderAllocatedSize_ = 0;
  121. for (size_t i = 0; i < Builders_.size(); i++) {
  122. Values[i] = holderFactory.CreateArrowBlock(Builders_[i]->Build(IsFinished_));
  123. }
  124. FillArrays();
  125. }
  126. TBlockItem GetItem(size_t idx, size_t offset = 0) const {
  127. Y_ENSURE(Current_ + offset < InputRows_);
  128. const auto& datum = TArrowBlock::From(Inputs_[idx]).GetDatum();
  129. ARROW_DEBUG_CHECK_DATUM_TYPES(InputsDescr_[idx], datum.descr());
  130. if (datum.is_scalar()) {
  131. return Readers_[idx]->GetScalarItem(*datum.scalar());
  132. }
  133. MKQL_ENSURE(datum.is_array(), "Expecting array");
  134. return Readers_[idx]->GetItem(*datum.array(), Current_ + offset);
  135. }
  136. std::pair<TBlockItem, ui64> GetItemWithHash(size_t idx, size_t offset) const {
  137. auto item = GetItem(idx, offset);
  138. ui64 hash = Hashers_[idx]->Hash(item);
  139. return std::make_pair(item, hash);
  140. }
  141. NUdf::TUnboxedValuePod GetValue(const THolderFactory& holderFactory, size_t idx) const {
  142. return Converters_[idx]->MakeValue(GetItem(idx), holderFactory);
  143. }
  144. void Reset() {
  145. Current_ = 0;
  146. InputRows_ = GetBlockCount(Inputs_.back());
  147. }
  148. void Finish() {
  149. IsFinished_ = true;
  150. }
  151. void NextRow() {
  152. Current_++;
  153. }
  154. bool HasBlocks() {
  155. return Count > 0;
  156. }
  157. bool IsNotFull() const {
  158. return OutputRows_ < MaxLength_
  159. && BuilderAllocatedSize_ <= MaxBuilderAllocatedSize_;
  160. }
  161. bool IsEmpty() const {
  162. return OutputRows_ == 0;
  163. }
  164. bool IsFinished() const {
  165. return IsFinished_;
  166. }
  167. size_t RemainingRowsCount() const {
  168. Y_ENSURE(InputRows_ >= Current_);
  169. return InputRows_ - Current_;
  170. }
  171. NUdf::TUnboxedValue* GetRawInputFields() {
  172. return Inputs_.data();
  173. }
  174. size_t GetInputWidth() const {
  175. // Mind the last block length column.
  176. return InputWidth_ + 1;
  177. }
  178. size_t GetOutputWidth() const {
  179. // Mind the last block length column.
  180. return OutputWidth_ + 1;
  181. }
  182. private:
  183. void AddItem(const TBlockItem& item, size_t idx) {
  184. Builders_[idx]->Add(item);
  185. }
  186. void AddValue(const NUdf::TUnboxedValuePod& value, size_t idx) {
  187. Builders_[idx]->Add(value);
  188. }
  189. size_t Current_ = 0;
  190. bool IsFinished_ = false;
  191. size_t MaxLength_;
  192. size_t BuilderAllocatedSize_ = 0;
  193. size_t MaxBuilderAllocatedSize_ = 0;
  194. static const size_t MaxAllocatedFactor_ = 4;
  195. size_t InputRows_ = 0;
  196. size_t OutputRows_ = 0;
  197. size_t InputWidth_;
  198. size_t OutputWidth_;
  199. TUnboxedValueVector Inputs_;
  200. const TVector<ui32> LeftIOMap_;
  201. const std::vector<arrow::ValueDescr> InputsDescr_;
  202. TVector<std::unique_ptr<IBlockReader>> Readers_;
  203. TVector<std::unique_ptr<IBlockItemConverter>> Converters_;
  204. TVector<std::unique_ptr<IArrayBuilder>> Builders_;
  205. TVector<NYql::NUdf::IBlockItemHasher::TPtr> Hashers_;
  206. };
  207. class TBlockStorage : public TComputationValue<TBlockStorage> {
  208. using TBase = TComputationValue<TBlockStorage>;
  209. public:
  210. struct TBlock {
  211. size_t Size;
  212. std::vector<arrow::Datum> Columns;
  213. TBlock() = default;
  214. TBlock(size_t size, std::vector<arrow::Datum> columns)
  215. : Size(size)
  216. , Columns(std::move(columns))
  217. {}
  218. };
  219. struct TRowEntry {
  220. ui32 BlockOffset;
  221. ui32 ItemOffset;
  222. TRowEntry() = default;
  223. TRowEntry(ui32 blockOffset, ui32 itemOffset)
  224. : BlockOffset(blockOffset)
  225. , ItemOffset(itemOffset)
  226. {}
  227. };
  228. class TRowIterator {
  229. friend class TBlockStorage;
  230. public:
  231. TRowIterator() = default;
  232. TRowIterator(const TRowIterator&) = default;
  233. TRowIterator& operator=(const TRowIterator&) = default;
  234. TMaybe<TRowEntry> Next() {
  235. Y_ENSURE(IsValid());
  236. if (IsEmpty()) {
  237. return Nothing();
  238. }
  239. auto entry = TRowEntry(CurrentBlockOffset_, CurrentItemOffset_);
  240. auto& block = BlockStorage_->GetBlock(CurrentBlockOffset_);
  241. CurrentItemOffset_++;
  242. if (CurrentItemOffset_ == block.Size) {
  243. CurrentBlockOffset_++;
  244. CurrentItemOffset_ = 0;
  245. }
  246. return entry;
  247. }
  248. bool IsValid() const {
  249. return BlockStorage_;
  250. }
  251. bool IsEmpty() const {
  252. Y_ENSURE(IsValid());
  253. return CurrentBlockOffset_ >= BlockStorage_->GetBlockCount();
  254. }
  255. private:
  256. TRowIterator(const TBlockStorage* blockStorage)
  257. : BlockStorage_(blockStorage)
  258. {}
  259. private:
  260. size_t CurrentBlockOffset_ = 0;
  261. size_t CurrentItemOffset_ = 0;
  262. const TBlockStorage* BlockStorage_ = nullptr;
  263. };
  264. TBlockStorage(
  265. TMemoryUsageInfo* memInfo,
  266. const TVector<TType*>& itemTypes,
  267. NUdf::TUnboxedValue stream,
  268. TStringBuf resourceTag,
  269. arrow::MemoryPool* pool
  270. )
  271. : TBase(memInfo)
  272. , InputsDescr_(ToValueDescr(itemTypes))
  273. , Stream_(std::move(stream))
  274. , Inputs_(itemTypes.size())
  275. , ResourceTag_(std::move(resourceTag))
  276. {
  277. TBlockTypeHelper helper;
  278. for (size_t i = 0; i < itemTypes.size(); i++) {
  279. TType* blockItemType = AS_TYPE(TBlockType, itemTypes[i])->GetItemType();
  280. Readers_.push_back(MakeBlockReader(TTypeInfoHelper(), blockItemType));
  281. Hashers_.push_back(helper.MakeHasher(blockItemType));
  282. Comparators_.push_back(helper.MakeComparator(blockItemType));
  283. Trimmers_.push_back(MakeBlockTrimmer(TTypeInfoHelper(), blockItemType, pool));
  284. }
  285. }
  286. NUdf::EFetchStatus FetchStream() {
  287. switch (Stream_.WideFetch(Inputs_.data(), Inputs_.size())) {
  288. case NUdf::EFetchStatus::Yield:
  289. return NUdf::EFetchStatus::Yield;
  290. case NUdf::EFetchStatus::Finish:
  291. IsFinished_ = true;
  292. return NUdf::EFetchStatus::Finish;
  293. case NUdf::EFetchStatus::Ok:
  294. break;
  295. }
  296. Y_ENSURE(!IsFinished_, "Got data on finished stream");
  297. std::vector<arrow::Datum> blockColumns;
  298. for (size_t i = 0; i < Inputs_.size() - 1; i++) {
  299. auto& datum = TArrowBlock::From(Inputs_[i]).GetDatum();
  300. ARROW_DEBUG_CHECK_DATUM_TYPES(InputsDescr_[i], datum.descr());
  301. if (datum.is_scalar()) {
  302. blockColumns.push_back(datum);
  303. } else {
  304. MKQL_ENSURE(datum.is_array(), "Expecting array");
  305. blockColumns.push_back(Trimmers_[i]->Trim(datum.array()));
  306. }
  307. }
  308. auto blockSize = ::GetBlockCount(Inputs_[Inputs_.size() - 1]);
  309. Data_.emplace_back(blockSize, std::move(blockColumns));
  310. RowCount_ += blockSize;
  311. return NUdf::EFetchStatus::Ok;
  312. }
  313. const TBlock& GetBlock(size_t blockOffset) const {
  314. Y_ENSURE(blockOffset < GetBlockCount());
  315. return Data_[blockOffset];
  316. }
  317. size_t GetBlockCount() const {
  318. return Data_.size();
  319. }
  320. TRowEntry GetRowEntry(size_t blockOffset, size_t itemOffset) const {
  321. auto& block = GetBlock(blockOffset);
  322. Y_ENSURE(itemOffset < block.Size);
  323. return TRowEntry(blockOffset, itemOffset);
  324. }
  325. TRowIterator GetRowIterator() const {
  326. return TRowIterator(this);
  327. }
  328. size_t GetRowCount() const {
  329. return RowCount_;
  330. }
  331. TBlockItem GetItem(TRowEntry entry, ui32 columnIdx) const {
  332. Y_ENSURE(columnIdx < Inputs_.size() - 1);
  333. return GetItemFromBlock(GetBlock(entry.BlockOffset), columnIdx, entry.ItemOffset);
  334. }
  335. TBlockItem GetItemFromBlock(const TBlock& block, ui32 columnIdx, size_t offset) const {
  336. Y_ENSURE(offset < block.Size);
  337. const auto& datum = block.Columns[columnIdx];
  338. if (datum.is_scalar()) {
  339. return Readers_[columnIdx]->GetScalarItem(*datum.scalar());
  340. } else {
  341. MKQL_ENSURE(datum.is_array(), "Expecting array");
  342. return Readers_[columnIdx]->GetItem(*datum.array(), offset);
  343. }
  344. }
  345. void GetRow(TRowEntry entry, const TVector<ui32>& ioMap, std::vector<NYql::NUdf::TBlockItem>& row) const {
  346. Y_ENSURE(row.size() == ioMap.size());
  347. for (size_t i = 0; i < row.size(); i++) {
  348. row[i] = GetItem(entry, ioMap[i]);
  349. }
  350. }
  351. const TVector<NUdf::IBlockItemComparator::TPtr>& GetItemComparators() const {
  352. return Comparators_;
  353. }
  354. const TVector<NUdf::IBlockItemHasher::TPtr>& GetItemHashers() const {
  355. return Hashers_;
  356. }
  357. bool IsFinished() const {
  358. return IsFinished_;
  359. }
  360. private:
  361. NUdf::TStringRef GetResourceTag() const override {
  362. return NUdf::TStringRef(ResourceTag_);
  363. }
  364. void* GetResource() override {
  365. return this;
  366. }
  367. protected:
  368. const std::vector<arrow::ValueDescr> InputsDescr_;
  369. TVector<std::unique_ptr<IBlockReader>> Readers_;
  370. TVector<NUdf::IBlockItemHasher::TPtr> Hashers_;
  371. TVector<NUdf::IBlockItemComparator::TPtr> Comparators_;
  372. TVector<IBlockTrimmer::TPtr> Trimmers_;
  373. std::vector<TBlock> Data_;
  374. size_t RowCount_ = 0;
  375. bool IsFinished_ = false;
  376. NUdf::TUnboxedValue Stream_;
  377. TUnboxedValueVector Inputs_;
  378. const TStringBuf ResourceTag_;
  379. };
  380. class TBlockStorageWrapper : public TMutableComputationNode<TBlockStorageWrapper> {
  381. using TBaseComputation = TMutableComputationNode<TBlockStorageWrapper>;
  382. public:
  383. TBlockStorageWrapper(
  384. TComputationMutables& mutables,
  385. TVector<TType*>&& itemTypes,
  386. IComputationNode* stream,
  387. const TStringBuf& resourceTag
  388. )
  389. : TBaseComputation(mutables, EValueRepresentation::Boxed)
  390. , ItemTypes_(std::move(itemTypes))
  391. , Stream_(stream)
  392. , ResourceTag_(resourceTag)
  393. {}
  394. NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
  395. return ctx.HolderFactory.Create<TBlockStorage>(
  396. ItemTypes_,
  397. std::move(Stream_->GetValue(ctx)),
  398. ResourceTag_,
  399. &ctx.ArrowMemoryPool
  400. );
  401. }
  402. private:
  403. void RegisterDependencies() const final {
  404. DependsOn(Stream_);
  405. }
  406. private:
  407. const TVector<TType*> ItemTypes_;
  408. IComputationNode* const Stream_;
  409. const TString ResourceTag_;
  410. };
  411. class TBlockIndex : public TComputationValue<TBlockIndex> {
  412. using TBase = TComputationValue<TBlockIndex>;
  413. struct TIndexNode {
  414. TBlockStorage::TRowEntry Entry;
  415. TIndexNode* Next;
  416. TIndexNode() = delete;
  417. TIndexNode(TBlockStorage::TRowEntry entry, TIndexNode* next = nullptr)
  418. : Entry(entry)
  419. , Next(next)
  420. {}
  421. };
  422. class TIndexMapValue {
  423. public:
  424. TIndexMapValue()
  425. : Raw(0)
  426. {}
  427. TIndexMapValue(TBlockStorage::TRowEntry entry) {
  428. TIndexEntryUnion un;
  429. un.Entry = entry;
  430. Y_ENSURE(((un.Raw << 1) >> 1) == un.Raw);
  431. Raw = (un.Raw << 1) | 1;
  432. }
  433. TIndexMapValue(TIndexNode* entryList)
  434. : EntryList(entryList)
  435. {}
  436. bool IsInplace() const {
  437. return Raw & 1;
  438. }
  439. TIndexNode* GetList() const {
  440. Y_ENSURE(!IsInplace());
  441. return EntryList;
  442. }
  443. TBlockStorage::TRowEntry GetEntry() const {
  444. Y_ENSURE(IsInplace());
  445. TIndexEntryUnion un;
  446. un.Raw = Raw >> 1;
  447. return un.Entry;
  448. }
  449. private:
  450. union TIndexEntryUnion {
  451. TBlockStorage::TRowEntry Entry;
  452. ui64 Raw;
  453. };
  454. union {
  455. TIndexNode* EntryList;
  456. ui64 Raw;
  457. };
  458. };
  459. using TIndexMap = TRobinHoodHashFixedMap<
  460. ui64,
  461. TIndexMapValue,
  462. std::equal_to<ui64>,
  463. std::hash<ui64>,
  464. TMKQLHugeAllocator<char>
  465. >;
  466. static_assert(sizeof(TIndexMapValue) == 8);
  467. static_assert(std::max(TIndexMap::GetCellSize(), static_cast<ui32>(sizeof(TIndexNode))) == BlockMapJoinIndexEntrySize);
  468. public:
  469. class TIterator {
  470. friend class TBlockIndex;
  471. enum class EIteratorType {
  472. EMPTY,
  473. INPLACE,
  474. LIST
  475. };
  476. public:
  477. TIterator() = default;
  478. TIterator(const TIterator&) = delete;
  479. TIterator& operator=(const TIterator&) = delete;
  480. TIterator(TIterator&& other) {
  481. *this = std::move(other);
  482. }
  483. TIterator& operator=(TIterator&& other) {
  484. if (this != &other) {
  485. Type_ = other.Type_;
  486. BlockIndex_ = other.BlockIndex_;
  487. ItemsToLookup_ = std::move(other.ItemsToLookup_);
  488. switch (Type_) {
  489. case EIteratorType::EMPTY:
  490. break;
  491. case EIteratorType::INPLACE:
  492. Entry_ = other.Entry_;
  493. EntryConsumed_ = other.EntryConsumed_;
  494. break;
  495. case EIteratorType::LIST:
  496. Node_ = other.Node_;
  497. break;
  498. }
  499. other.BlockIndex_ = nullptr;
  500. }
  501. return *this;
  502. }
  503. TMaybe<TBlockStorage::TRowEntry> Next() {
  504. Y_ENSURE(IsValid());
  505. switch (Type_) {
  506. case EIteratorType::EMPTY:
  507. return Nothing();
  508. case EIteratorType::INPLACE:
  509. if (EntryConsumed_) {
  510. return Nothing();
  511. }
  512. EntryConsumed_ = true;
  513. return BlockIndex_->IsKeyEquals(Entry_, ItemsToLookup_) ? TMaybe<TBlockStorage::TRowEntry>(Entry_) : Nothing();
  514. case EIteratorType::LIST:
  515. for (; Node_ != nullptr; Node_ = Node_->Next) {
  516. if (BlockIndex_->IsKeyEquals(Node_->Entry, ItemsToLookup_)) {
  517. auto entry = Node_->Entry;
  518. Node_ = Node_->Next;
  519. return entry;
  520. }
  521. }
  522. return Nothing();
  523. }
  524. }
  525. bool IsValid() const {
  526. return BlockIndex_;
  527. }
  528. bool IsEmpty() const {
  529. Y_ENSURE(IsValid());
  530. switch (Type_) {
  531. case EIteratorType::EMPTY:
  532. return true;
  533. case EIteratorType::INPLACE:
  534. return EntryConsumed_;
  535. case EIteratorType::LIST:
  536. return Node_ == nullptr;
  537. }
  538. }
  539. void Reset() {
  540. *this = TIterator();
  541. }
  542. private:
  543. TIterator(const TBlockIndex* blockIndex)
  544. : Type_(EIteratorType::EMPTY)
  545. , BlockIndex_(blockIndex)
  546. {}
  547. TIterator(const TBlockIndex* blockIndex, TBlockStorage::TRowEntry entry, std::vector<NYql::NUdf::TBlockItem> itemsToLookup)
  548. : Type_(EIteratorType::INPLACE)
  549. , BlockIndex_(blockIndex)
  550. , Entry_(entry)
  551. , EntryConsumed_(false)
  552. , ItemsToLookup_(std::move(itemsToLookup))
  553. {}
  554. TIterator(const TBlockIndex* blockIndex, TIndexNode* node, std::vector<NYql::NUdf::TBlockItem> itemsToLookup)
  555. : Type_(EIteratorType::LIST)
  556. , BlockIndex_(blockIndex)
  557. , Node_(node)
  558. , ItemsToLookup_(std::move(itemsToLookup))
  559. {}
  560. private:
  561. EIteratorType Type_;
  562. const TBlockIndex* BlockIndex_ = nullptr;
  563. union {
  564. TIndexNode* Node_;
  565. struct {
  566. TBlockStorage::TRowEntry Entry_;
  567. bool EntryConsumed_;
  568. };
  569. };
  570. std::vector<NYql::NUdf::TBlockItem> ItemsToLookup_;
  571. };
  572. public:
  573. TBlockIndex(
  574. TMemoryUsageInfo* memInfo,
  575. const TVector<ui32>& keyColumns,
  576. NUdf::TUnboxedValue blockStorage,
  577. bool any,
  578. TStringBuf resourceTag
  579. )
  580. : TBase(memInfo)
  581. , KeyColumns_(keyColumns)
  582. , BlockStorage_(std::move(blockStorage))
  583. , Any_(any)
  584. , ResourceTag_(std::move(resourceTag))
  585. {}
  586. void BuildIndex() {
  587. if (Index_) {
  588. return;
  589. }
  590. auto& blockStorage = *static_cast<TBlockStorage*>(BlockStorage_.GetResource());
  591. Y_ENSURE(blockStorage.IsFinished(), "Index build should be done after all data has been read");
  592. Index_ = std::make_unique<TIndexMap>(CalculateRHHashTableCapacity(blockStorage.GetRowCount()));
  593. for (size_t blockOffset = 0; blockOffset < blockStorage.GetBlockCount(); blockOffset++) {
  594. const auto& block = blockStorage.GetBlock(blockOffset);
  595. auto blockSize = block.Size;
  596. std::array<TRobinHoodBatchRequestItem<ui64>, PrefetchBatchSize> insertBatch;
  597. std::array<TBlockStorage::TRowEntry, PrefetchBatchSize> insertBatchEntries;
  598. std::array<std::vector<NYql::NUdf::TBlockItem>, PrefetchBatchSize> insertBatchKeys;
  599. ui32 insertBatchLen = 0;
  600. auto processInsertBatch = [&]() {
  601. Index_->BatchInsert({insertBatch.data(), insertBatchLen}, [&](size_t i, TIndexMap::iterator iter, bool isNew) {
  602. auto value = static_cast<TIndexMapValue*>(Index_->GetMutablePayload(iter));
  603. if (isNew) {
  604. // Store single entry inplace
  605. *value = TIndexMapValue(insertBatchEntries[i]);
  606. Index_->CheckGrow();
  607. } else {
  608. if (Any_ && ContainsKey(value, insertBatchKeys[i])) {
  609. return;
  610. }
  611. // Store as list
  612. if (value->IsInplace()) {
  613. *value = TIndexMapValue(InsertIndexNode(value->GetEntry()));
  614. }
  615. *value = TIndexMapValue(InsertIndexNode(insertBatchEntries[i], value->GetList()));
  616. }
  617. });
  618. };
  619. Y_ENSURE(blockOffset <= std::numeric_limits<ui32>::max());
  620. Y_ENSURE(blockSize <= std::numeric_limits<ui32>::max());
  621. for (size_t itemOffset = 0; itemOffset < blockSize; itemOffset++) {
  622. ui64 keyHash = GetKey(block, itemOffset, insertBatchKeys[insertBatchLen]);
  623. if (!keyHash) {
  624. continue;
  625. }
  626. insertBatchEntries[insertBatchLen] = TBlockStorage::TRowEntry(blockOffset, itemOffset);
  627. insertBatch[insertBatchLen].ConstructKey(keyHash);
  628. insertBatchLen++;
  629. if (insertBatchLen == PrefetchBatchSize) {
  630. processInsertBatch();
  631. insertBatchLen = 0;
  632. }
  633. }
  634. if (insertBatchLen > 0) {
  635. processInsertBatch();
  636. }
  637. }
  638. }
  639. template<typename TGetKey>
  640. void BatchLookup(size_t batchSize, std::array<TIterator, PrefetchBatchSize>& iterators, TGetKey&& getKey) {
  641. Y_ENSURE(batchSize <= PrefetchBatchSize);
  642. std::array<TRobinHoodBatchRequestItem<ui64>, PrefetchBatchSize> lookupBatch;
  643. std::array<std::vector<NYql::NUdf::TBlockItem>, PrefetchBatchSize> itemsBatch;
  644. for (size_t i = 0; i < batchSize; i++) {
  645. const auto& [items, keyHash] = getKey(i);
  646. lookupBatch[i].ConstructKey(keyHash);
  647. itemsBatch[i] = items;
  648. }
  649. Index_->BatchLookup({lookupBatch.data(), batchSize}, [&](size_t i, TIndexMap::iterator iter) {
  650. if (!iter) {
  651. // Empty iterator
  652. iterators[i] = TIterator(this);
  653. return;
  654. }
  655. auto value = static_cast<const TIndexMapValue*>(Index_->GetPayload(iter));
  656. if (value->IsInplace()) {
  657. iterators[i] = TIterator(this, value->GetEntry(), std::move(itemsBatch[i]));
  658. } else {
  659. iterators[i] = TIterator(this, value->GetList(), std::move(itemsBatch[i]));
  660. }
  661. });
  662. }
  663. bool IsKeyEquals(TBlockStorage::TRowEntry entry, const std::vector<NYql::NUdf::TBlockItem>& keyItems) const {
  664. auto& blockStorage = *static_cast<TBlockStorage*>(BlockStorage_.GetResource());
  665. Y_ENSURE(keyItems.size() == KeyColumns_.size());
  666. for (size_t i = 0; i < KeyColumns_.size(); i++) {
  667. auto indexItem = blockStorage.GetItem(entry, KeyColumns_[i]);
  668. if (blockStorage.GetItemComparators()[KeyColumns_[i]]->Equals(indexItem, keyItems[i])) {
  669. return true;
  670. }
  671. }
  672. return false;
  673. }
  674. const NUdf::TUnboxedValue& GetBlockStorage() const {
  675. return BlockStorage_;
  676. }
  677. private:
  678. ui64 GetKey(const TBlockStorage::TBlock& block, size_t offset, std::vector<NYql::NUdf::TBlockItem>& keyItems) const {
  679. auto& blockStorage = *static_cast<TBlockStorage*>(BlockStorage_.GetResource());
  680. ui64 keyHash = 0;
  681. keyItems.clear();
  682. for (ui32 keyColumn : KeyColumns_) {
  683. auto item = blockStorage.GetItemFromBlock(block, keyColumn, offset);
  684. if (!item) {
  685. keyItems.clear();
  686. return 0;
  687. }
  688. keyHash = CombineHashes(keyHash, blockStorage.GetItemHashers()[keyColumn]->Hash(item));
  689. keyItems.push_back(std::move(item));
  690. }
  691. return keyHash;
  692. }
  693. TIndexNode* InsertIndexNode(TBlockStorage::TRowEntry entry, TIndexNode* currentHead = nullptr) {
  694. return &IndexNodes_.emplace_back(entry, currentHead);
  695. }
  696. bool ContainsKey(const TIndexMapValue* chain, const std::vector<NYql::NUdf::TBlockItem>& keyItems) const {
  697. if (chain->IsInplace()) {
  698. return IsKeyEquals(chain->GetEntry(), keyItems);
  699. } else {
  700. for (TIndexNode* node = chain->GetList(); node != nullptr; node = node->Next) {
  701. if (IsKeyEquals(node->Entry, keyItems)) {
  702. return true;
  703. }
  704. node = node->Next;
  705. }
  706. return false;
  707. }
  708. }
  709. NUdf::TStringRef GetResourceTag() const override {
  710. return NUdf::TStringRef(ResourceTag_);
  711. }
  712. void* GetResource() override {
  713. return this;
  714. }
  715. private:
  716. const TVector<ui32>& KeyColumns_;
  717. NUdf::TUnboxedValue BlockStorage_;
  718. std::unique_ptr<TIndexMap> Index_;
  719. std::deque<TIndexNode> IndexNodes_;
  720. const bool Any_;
  721. const TStringBuf ResourceTag_;
  722. };
  723. class TBlockMapJoinIndexWrapper : public TMutableComputationNode<TBlockMapJoinIndexWrapper> {
  724. using TBaseComputation = TMutableComputationNode<TBlockMapJoinIndexWrapper>;
  725. public:
  726. TBlockMapJoinIndexWrapper(
  727. TComputationMutables& mutables,
  728. TVector<ui32>&& keyColumns,
  729. IComputationNode* blockStorage,
  730. bool any,
  731. const TStringBuf& resourceTag
  732. )
  733. : TBaseComputation(mutables, EValueRepresentation::Boxed)
  734. , KeyColumns_(std::move(keyColumns))
  735. , BlockStorage_(blockStorage)
  736. , Any_(any)
  737. , ResourceTag_(resourceTag)
  738. {}
  739. NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
  740. return ctx.HolderFactory.Create<TBlockIndex>(
  741. KeyColumns_,
  742. std::move(BlockStorage_->GetValue(ctx)),
  743. Any_,
  744. ResourceTag_
  745. );
  746. }
  747. private:
  748. void RegisterDependencies() const final {
  749. DependsOn(BlockStorage_);
  750. }
  751. private:
  752. const TVector<ui32> KeyColumns_;
  753. IComputationNode* const BlockStorage_;
  754. const bool Any_;
  755. const TString ResourceTag_;
  756. };
  757. template <bool WithoutRight, bool RightRequired>
  758. class TBlockMapJoinCoreWraper : public TMutableComputationNode<TBlockMapJoinCoreWraper<WithoutRight, RightRequired>>
  759. {
  760. using TBaseComputation = TMutableComputationNode<TBlockMapJoinCoreWraper<WithoutRight, RightRequired>>;
  761. using TJoinState = TBlockJoinState<RightRequired>;
  762. using TStorageState = TBlockStorage;
  763. using TIndexState = TBlockIndex;
  764. public:
  765. TBlockMapJoinCoreWraper(
  766. TComputationMutables& mutables,
  767. const TVector<TType*>&& resultItemTypes,
  768. const TVector<TType*>&& leftItemTypes,
  769. const TVector<ui32>&& leftKeyColumns,
  770. const TVector<ui32>&& leftIOMap,
  771. const TVector<ui32>&& rightIOMap,
  772. IComputationNode* leftStream,
  773. IComputationNode* rightBlockIndex
  774. )
  775. : TBaseComputation(mutables, EValueRepresentation::Boxed)
  776. , ResultItemTypes_(std::move(resultItemTypes))
  777. , LeftItemTypes_(std::move(leftItemTypes))
  778. , LeftKeyColumns_(std::move(leftKeyColumns))
  779. , LeftIOMap_(std::move(leftIOMap))
  780. , RightIOMap_(std::move(rightIOMap))
  781. , LeftStream_(leftStream)
  782. , RightBlockIndex_(rightBlockIndex)
  783. , KeyTupleCache_(mutables)
  784. {}
  785. NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
  786. const auto joinState = ctx.HolderFactory.Create<TJoinState>(
  787. ctx,
  788. LeftItemTypes_,
  789. LeftIOMap_,
  790. ResultItemTypes_
  791. );
  792. return ctx.HolderFactory.Create<TStreamValue>(ctx.HolderFactory,
  793. std::move(joinState),
  794. LeftKeyColumns_,
  795. RightIOMap_,
  796. std::move(LeftStream_->GetValue(ctx)),
  797. std::move(RightBlockIndex_->GetValue(ctx))
  798. );
  799. }
  800. private:
  801. class TStreamValue : public TComputationValue<TStreamValue> {
  802. using TBase = TComputationValue<TStreamValue>;
  803. public:
  804. TStreamValue(
  805. TMemoryUsageInfo* memInfo,
  806. const THolderFactory& holderFactory,
  807. NUdf::TUnboxedValue&& joinState,
  808. const TVector<ui32>& leftKeyColumns,
  809. const TVector<ui32>& rightIOMap,
  810. NUdf::TUnboxedValue&& leftStream,
  811. NUdf::TUnboxedValue&& rightBlockIndex
  812. )
  813. : TBase(memInfo)
  814. , JoinState_(joinState)
  815. , LeftKeyColumns_(leftKeyColumns)
  816. , RightIOMap_(rightIOMap)
  817. , LeftStream_(leftStream)
  818. , RightBlockIndex_(rightBlockIndex)
  819. , HolderFactory_(holderFactory)
  820. {}
  821. private:
  822. NUdf::EFetchStatus WideFetch(NUdf::TUnboxedValue* output, ui32 width) {
  823. auto& joinState = *static_cast<TJoinState*>(JoinState_.AsBoxed().Get());
  824. auto& indexState = *static_cast<TIndexState*>(RightBlockIndex_.GetResource());
  825. auto& storageState = *static_cast<TStorageState*>(indexState.GetBlockStorage().GetResource());
  826. if (!RightStreamConsumed_) {
  827. auto fetchStatus = NUdf::EFetchStatus::Ok;
  828. while (fetchStatus != NUdf::EFetchStatus::Finish) {
  829. fetchStatus = storageState.FetchStream();
  830. if (fetchStatus == NUdf::EFetchStatus::Yield) {
  831. return NUdf::EFetchStatus::Yield;
  832. }
  833. }
  834. indexState.BuildIndex();
  835. RightStreamConsumed_ = true;
  836. }
  837. auto* inputFields = joinState.GetRawInputFields();
  838. const size_t inputWidth = joinState.GetInputWidth();
  839. const size_t outputWidth = joinState.GetOutputWidth();
  840. MKQL_ENSURE(width == outputWidth,
  841. "The given width doesn't equal to the result type size");
  842. std::vector<NYql::NUdf::TBlockItem> leftKeyColumns(LeftKeyColumns_.size());
  843. std::vector<ui64> leftKeyColumnHashes(LeftKeyColumns_.size());
  844. std::vector<NYql::NUdf::TBlockItem> rightRow(RightIOMap_.size());
  845. while (!joinState.HasBlocks()) {
  846. while (joinState.IsNotFull() && LookupBatchCurrent_ < LookupBatchSize_) {
  847. auto& iter = LookupBatchIterators_[LookupBatchCurrent_];
  848. if constexpr (WithoutRight) {
  849. if (bool(iter.IsEmpty()) != RightRequired) {
  850. joinState.CopyRow();
  851. }
  852. joinState.NextRow();
  853. LookupBatchCurrent_++;
  854. continue;
  855. } else if constexpr (!RightRequired) {
  856. if (iter.IsEmpty()) {
  857. joinState.MakeRow(std::vector<NYql::NUdf::TBlockItem>());
  858. joinState.NextRow();
  859. LookupBatchCurrent_++;
  860. continue;
  861. }
  862. }
  863. while (joinState.IsNotFull() && !iter.IsEmpty()) {
  864. auto key = iter.Next();
  865. storageState.GetRow(*key, RightIOMap_, rightRow);
  866. joinState.MakeRow(rightRow);
  867. }
  868. if (iter.IsEmpty()) {
  869. joinState.NextRow();
  870. LookupBatchCurrent_++;
  871. }
  872. }
  873. if (joinState.IsNotFull() && joinState.RemainingRowsCount() > 0) {
  874. LookupBatchSize_ = std::min(PrefetchBatchSize, static_cast<ui32>(joinState.RemainingRowsCount()));
  875. indexState.BatchLookup(LookupBatchSize_, LookupBatchIterators_, [&](size_t i) {
  876. MakeLeftKeys(leftKeyColumns, leftKeyColumnHashes, i);
  877. ui64 keyHash = CalculateTupleHash(leftKeyColumnHashes);
  878. return std::make_pair(std::ref(leftKeyColumns), keyHash);
  879. });
  880. LookupBatchCurrent_ = 0;
  881. continue;
  882. }
  883. if (joinState.IsNotFull() && !joinState.IsFinished()) {
  884. switch (LeftStream_.WideFetch(inputFields, inputWidth)) {
  885. case NUdf::EFetchStatus::Yield:
  886. return NUdf::EFetchStatus::Yield;
  887. case NUdf::EFetchStatus::Ok:
  888. joinState.Reset();
  889. continue;
  890. case NUdf::EFetchStatus::Finish:
  891. joinState.Finish();
  892. break;
  893. }
  894. // Leave the loop, if no values left in the stream.
  895. Y_DEBUG_ABORT_UNLESS(joinState.IsFinished());
  896. }
  897. if (joinState.IsEmpty()) {
  898. return NUdf::EFetchStatus::Finish;
  899. }
  900. joinState.MakeBlocks(HolderFactory_);
  901. }
  902. const auto sliceSize = joinState.Slice();
  903. for (size_t i = 0; i < outputWidth; i++) {
  904. output[i] = joinState.Get(sliceSize, HolderFactory_, i);
  905. }
  906. return NUdf::EFetchStatus::Ok;
  907. }
  908. void MakeLeftKeys(std::vector<NYql::NUdf::TBlockItem>& items, std::vector<ui64>& hashes, size_t offset) const {
  909. auto& joinState = *static_cast<TJoinState*>(JoinState_.AsBoxed().Get());
  910. Y_ENSURE(items.size() == LeftKeyColumns_.size());
  911. Y_ENSURE(hashes.size() == LeftKeyColumns_.size());
  912. for (size_t i = 0; i < LeftKeyColumns_.size(); i++) {
  913. std::tie(items[i], hashes[i]) = joinState.GetItemWithHash(LeftKeyColumns_[i], offset);
  914. }
  915. }
  916. NUdf::TUnboxedValue JoinState_;
  917. const TVector<ui32>& LeftKeyColumns_;
  918. const TVector<ui32>& RightIOMap_;
  919. bool RightStreamConsumed_ = false;
  920. std::array<typename TIndexState::TIterator, PrefetchBatchSize> LookupBatchIterators_;
  921. ui32 LookupBatchCurrent_ = 0;
  922. ui32 LookupBatchSize_ = 0;
  923. NUdf::TUnboxedValue LeftStream_;
  924. NUdf::TUnboxedValue RightBlockIndex_;
  925. const THolderFactory& HolderFactory_;
  926. };
  927. void RegisterDependencies() const final {
  928. this->DependsOn(LeftStream_);
  929. this->DependsOn(RightBlockIndex_);
  930. }
  931. private:
  932. const TVector<TType*> ResultItemTypes_;
  933. const TVector<TType*> LeftItemTypes_;
  934. const TVector<ui32> LeftKeyColumns_;
  935. const TVector<ui32> LeftIOMap_;
  936. const TVector<ui32> RightIOMap_;
  937. IComputationNode* const LeftStream_;
  938. IComputationNode* const RightBlockIndex_;
  939. const TContainerCacheOnContext KeyTupleCache_;
  940. };
  941. class TBlockCrossJoinCoreWraper : public TMutableComputationNode<TBlockCrossJoinCoreWraper>
  942. {
  943. using TBaseComputation = TMutableComputationNode<TBlockCrossJoinCoreWraper>;
  944. using TJoinState = TBlockJoinState<true>;
  945. using TStorageState = TBlockStorage;
  946. public:
  947. TBlockCrossJoinCoreWraper(
  948. TComputationMutables& mutables,
  949. const TVector<TType*>&& resultItemTypes,
  950. const TVector<TType*>&& leftItemTypes,
  951. const TVector<ui32>&& leftIOMap,
  952. const TVector<ui32>&& rightIOMap,
  953. IComputationNode* leftStream,
  954. IComputationNode* rightBlockStorage
  955. )
  956. : TBaseComputation(mutables, EValueRepresentation::Boxed)
  957. , ResultItemTypes_(std::move(resultItemTypes))
  958. , LeftItemTypes_(std::move(leftItemTypes))
  959. , LeftIOMap_(std::move(leftIOMap))
  960. , RightIOMap_(std::move(rightIOMap))
  961. , LeftStream_(std::move(leftStream))
  962. , RightBlockStorage_(std::move(rightBlockStorage))
  963. , KeyTupleCache_(mutables)
  964. {}
  965. NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
  966. const auto joinState = ctx.HolderFactory.Create<TJoinState>(
  967. ctx,
  968. LeftItemTypes_,
  969. LeftIOMap_,
  970. ResultItemTypes_
  971. );
  972. return ctx.HolderFactory.Create<TStreamValue>(ctx.HolderFactory,
  973. std::move(joinState),
  974. RightIOMap_,
  975. std::move(LeftStream_->GetValue(ctx)),
  976. std::move(RightBlockStorage_->GetValue(ctx))
  977. );
  978. }
  979. private:
  980. class TStreamValue : public TComputationValue<TStreamValue> {
  981. using TBase = TComputationValue<TStreamValue>;
  982. public:
  983. TStreamValue(
  984. TMemoryUsageInfo* memInfo,
  985. const THolderFactory& holderFactory,
  986. NUdf::TUnboxedValue&& joinState,
  987. const TVector<ui32>& rightIOMap,
  988. NUdf::TUnboxedValue&& leftStream,
  989. NUdf::TUnboxedValue&& rightBlockStorage
  990. )
  991. : TBase(memInfo)
  992. , JoinState_(joinState)
  993. , RightIOMap_(rightIOMap)
  994. , LeftStream_(leftStream)
  995. , RightBlockStorage_(rightBlockStorage)
  996. , HolderFactory_(holderFactory)
  997. {}
  998. private:
  999. NUdf::EFetchStatus WideFetch(NUdf::TUnboxedValue* output, ui32 width) {
  1000. auto& joinState = *static_cast<TJoinState*>(JoinState_.AsBoxed().Get());
  1001. auto& storageState = *static_cast<TStorageState*>(RightBlockStorage_.GetResource());
  1002. if (!RightStreamConsumed_) {
  1003. auto fetchStatus = NUdf::EFetchStatus::Ok;
  1004. while (fetchStatus != NUdf::EFetchStatus::Finish) {
  1005. fetchStatus = storageState.FetchStream();
  1006. if (fetchStatus == NUdf::EFetchStatus::Yield) {
  1007. return NUdf::EFetchStatus::Yield;
  1008. }
  1009. }
  1010. RightStreamConsumed_ = true;
  1011. RightRowIterator_ = storageState.GetRowIterator();
  1012. }
  1013. auto* inputFields = joinState.GetRawInputFields();
  1014. const size_t inputWidth = joinState.GetInputWidth();
  1015. const size_t outputWidth = joinState.GetOutputWidth();
  1016. MKQL_ENSURE(width == outputWidth,
  1017. "The given width doesn't equal to the result type size");
  1018. std::vector<NYql::NUdf::TBlockItem> rightRow(RightIOMap_.size());
  1019. while (!joinState.HasBlocks()) {
  1020. while (!RightRowIterator_.IsEmpty() && joinState.RemainingRowsCount() > 0 && joinState.IsNotFull()) {
  1021. auto rowEntry = *RightRowIterator_.Next();
  1022. storageState.GetRow(rowEntry, RightIOMap_, rightRow);
  1023. joinState.MakeRow(rightRow);
  1024. }
  1025. if (joinState.IsNotFull() && joinState.RemainingRowsCount() > 0) {
  1026. joinState.NextRow();
  1027. RightRowIterator_ = storageState.GetRowIterator();
  1028. continue;
  1029. }
  1030. if (joinState.IsNotFull() && !joinState.IsFinished()) {
  1031. switch (LeftStream_.WideFetch(inputFields, inputWidth)) {
  1032. case NUdf::EFetchStatus::Yield:
  1033. return NUdf::EFetchStatus::Yield;
  1034. case NUdf::EFetchStatus::Ok:
  1035. joinState.Reset();
  1036. continue;
  1037. case NUdf::EFetchStatus::Finish:
  1038. joinState.Finish();
  1039. break;
  1040. }
  1041. // Leave the loop, if no values left in the stream.
  1042. Y_DEBUG_ABORT_UNLESS(joinState.IsFinished());
  1043. }
  1044. if (joinState.IsEmpty()) {
  1045. return NUdf::EFetchStatus::Finish;
  1046. }
  1047. joinState.MakeBlocks(HolderFactory_);
  1048. }
  1049. const auto sliceSize = joinState.Slice();
  1050. for (size_t i = 0; i < outputWidth; i++) {
  1051. output[i] = joinState.Get(sliceSize, HolderFactory_, i);
  1052. }
  1053. return NUdf::EFetchStatus::Ok;
  1054. }
  1055. NUdf::TUnboxedValue JoinState_;
  1056. const TVector<ui32>& RightIOMap_;
  1057. bool RightStreamConsumed_ = false;
  1058. TStorageState::TRowIterator RightRowIterator_;
  1059. NUdf::TUnboxedValue LeftStream_;
  1060. NUdf::TUnboxedValue RightBlockStorage_;
  1061. const THolderFactory& HolderFactory_;
  1062. };
  1063. void RegisterDependencies() const final {
  1064. this->DependsOn(LeftStream_);
  1065. this->DependsOn(RightBlockStorage_);
  1066. }
  1067. private:
  1068. const TVector<TType*> ResultItemTypes_;
  1069. const TVector<TType*> LeftItemTypes_;
  1070. const TVector<ui32> LeftIOMap_;
  1071. const TVector<ui32> RightIOMap_;
  1072. IComputationNode* const LeftStream_;
  1073. IComputationNode* const RightBlockStorage_;
  1074. const TContainerCacheOnContext KeyTupleCache_;
  1075. };
  1076. } // namespace
  1077. IComputationNode* WrapBlockStorage(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
  1078. MKQL_ENSURE(callable.GetInputsCount() == 1, "Expected 1 arg");
  1079. const auto resultType = callable.GetType()->GetReturnType();
  1080. MKQL_ENSURE(resultType->IsResource(), "Expected Resource as a result type");
  1081. auto resultResourceType = AS_TYPE(TResourceType, resultType);
  1082. MKQL_ENSURE(resultResourceType->GetTag().StartsWith(BlockStorageResourcePrefix), "Expected block storage resource");
  1083. const auto inputType = callable.GetInput(0).GetStaticType();
  1084. MKQL_ENSURE(inputType->IsStream(), "Expected WideStream as an input stream");
  1085. const auto inputStreamType = AS_TYPE(TStreamType, inputType);
  1086. MKQL_ENSURE(inputStreamType->GetItemType()->IsMulti(),
  1087. "Expected Multi as a left stream item type");
  1088. const auto inputStreamComponents = GetWideComponents(inputStreamType);
  1089. MKQL_ENSURE(inputStreamComponents.size() > 0, "Expected at least one column");
  1090. TVector<TType*> inputStreamItems(inputStreamComponents.cbegin(), inputStreamComponents.cend());
  1091. const auto inputStream = LocateNode(ctx.NodeLocator, callable, 0);
  1092. return new TBlockStorageWrapper(
  1093. ctx.Mutables,
  1094. std::move(inputStreamItems),
  1095. inputStream,
  1096. resultResourceType->GetTag()
  1097. );
  1098. }
  1099. IComputationNode* WrapBlockMapJoinIndex(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
  1100. MKQL_ENSURE(callable.GetInputsCount() == 4, "Expected 4 args");
  1101. const auto resultType = callable.GetType()->GetReturnType();
  1102. MKQL_ENSURE(resultType->IsResource(), "Expected Resource as a result type");
  1103. auto resultResourceType = AS_TYPE(TResourceType, resultType);
  1104. MKQL_ENSURE(resultResourceType->GetTag().StartsWith(BlockMapJoinIndexResourcePrefix), "Expected block map join index resource");
  1105. const auto inputType = callable.GetInput(0).GetStaticType();
  1106. MKQL_ENSURE(inputType->IsResource(), "Expected Resource as an input type");
  1107. auto inputResourceType = AS_TYPE(TResourceType, inputType);
  1108. MKQL_ENSURE(inputResourceType->GetTag().StartsWith(BlockStorageResourcePrefix), "Expected block storage resource");
  1109. auto origInputItemType = AS_VALUE(TTypeType, callable.GetInput(1));
  1110. MKQL_ENSURE(origInputItemType->IsMulti(), "Expected Multi as an input item type");
  1111. const auto streamComponents = AS_TYPE(TMultiType, origInputItemType)->GetElements();
  1112. MKQL_ENSURE(streamComponents.size() > 0, "Expected at least one column");
  1113. const auto keyColumnsLiteral = callable.GetInput(2);
  1114. const auto keyColumnsTuple = AS_VALUE(TTupleLiteral, keyColumnsLiteral);
  1115. TVector<ui32> keyColumns;
  1116. keyColumns.reserve(keyColumnsTuple->GetValuesCount());
  1117. for (ui32 i = 0; i < keyColumnsTuple->GetValuesCount(); i++) {
  1118. const auto item = AS_VALUE(TDataLiteral, keyColumnsTuple->GetValue(i));
  1119. keyColumns.emplace_back(item->AsValue().Get<ui32>());
  1120. }
  1121. for (ui32 keyColumn : keyColumns) {
  1122. MKQL_ENSURE(keyColumn < streamComponents.size() - 1, "Key column out of range");
  1123. }
  1124. const auto anyNode = callable.GetInput(3);
  1125. const auto any = AS_VALUE(TDataLiteral, anyNode)->AsValue().Get<bool>();
  1126. const auto blockStorage = LocateNode(ctx.NodeLocator, callable, 0);
  1127. return new TBlockMapJoinIndexWrapper(
  1128. ctx.Mutables,
  1129. std::move(keyColumns),
  1130. blockStorage,
  1131. any,
  1132. resultResourceType->GetTag()
  1133. );
  1134. }
  1135. IComputationNode* WrapBlockMapJoinCore(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
  1136. MKQL_ENSURE(callable.GetInputsCount() == 8, "Expected 8 args");
  1137. const auto joinType = callable.GetType()->GetReturnType();
  1138. MKQL_ENSURE(joinType->IsStream(), "Expected WideStream as a resulting stream");
  1139. const auto joinStreamType = AS_TYPE(TStreamType, joinType);
  1140. MKQL_ENSURE(joinStreamType->GetItemType()->IsMulti(),
  1141. "Expected Multi as a resulting item type");
  1142. const auto joinComponents = GetWideComponents(joinStreamType);
  1143. MKQL_ENSURE(joinComponents.size() > 0, "Expected at least one column");
  1144. const TVector<TType*> joinItems(joinComponents.cbegin(), joinComponents.cend());
  1145. const auto leftType = callable.GetInput(0).GetStaticType();
  1146. MKQL_ENSURE(leftType->IsStream(), "Expected WideStream as a left stream");
  1147. const auto leftStreamType = AS_TYPE(TStreamType, leftType);
  1148. MKQL_ENSURE(leftStreamType->GetItemType()->IsMulti(),
  1149. "Expected Multi as a left stream item type");
  1150. const auto leftStreamComponents = GetWideComponents(leftStreamType);
  1151. MKQL_ENSURE(leftStreamComponents.size() > 0, "Expected at least one column");
  1152. const TVector<TType*> leftStreamItems(leftStreamComponents.cbegin(), leftStreamComponents.cend());
  1153. const auto joinKindNode = callable.GetInput(3);
  1154. const auto rawKind = AS_VALUE(TDataLiteral, joinKindNode)->AsValue().Get<ui32>();
  1155. const auto joinKind = GetJoinKind(rawKind);
  1156. Y_ENSURE(joinKind == EJoinKind::Inner || joinKind == EJoinKind::Left ||
  1157. joinKind == EJoinKind::LeftSemi || joinKind == EJoinKind::LeftOnly || joinKind == EJoinKind::Cross);
  1158. const auto rightBlockStorageType = callable.GetInput(1).GetStaticType();
  1159. MKQL_ENSURE(rightBlockStorageType->IsResource(), "Expected Resource as a right type");
  1160. auto rightBlockStorageResourceType = AS_TYPE(TResourceType, rightBlockStorageType);
  1161. if (joinKind != EJoinKind::Cross) {
  1162. MKQL_ENSURE(rightBlockStorageResourceType->GetTag().StartsWith(BlockMapJoinIndexResourcePrefix), "Expected block map join index resource");
  1163. } else {
  1164. MKQL_ENSURE(rightBlockStorageResourceType->GetTag().StartsWith(BlockStorageResourcePrefix), "Expected block storage resource");
  1165. }
  1166. auto origRightItemType = AS_VALUE(TTypeType, callable.GetInput(2));
  1167. MKQL_ENSURE(origRightItemType->IsMulti(), "Expected Multi as a right stream item type");
  1168. const auto rightStreamComponents = AS_TYPE(TMultiType, origRightItemType)->GetElements();
  1169. MKQL_ENSURE(rightStreamComponents.size() > 0, "Expected at least one column");
  1170. const TVector<TType*> rightStreamItems(rightStreamComponents.cbegin(), rightStreamComponents.cend());
  1171. const auto leftKeyColumnsLiteral = callable.GetInput(4);
  1172. const auto leftKeyColumnsTuple = AS_VALUE(TTupleLiteral, leftKeyColumnsLiteral);
  1173. TVector<ui32> leftKeyColumns;
  1174. leftKeyColumns.reserve(leftKeyColumnsTuple->GetValuesCount());
  1175. for (ui32 i = 0; i < leftKeyColumnsTuple->GetValuesCount(); i++) {
  1176. const auto item = AS_VALUE(TDataLiteral, leftKeyColumnsTuple->GetValue(i));
  1177. leftKeyColumns.emplace_back(item->AsValue().Get<ui32>());
  1178. }
  1179. const THashSet<ui32> leftKeySet(leftKeyColumns.cbegin(), leftKeyColumns.cend());
  1180. const auto leftKeyDropsLiteral = callable.GetInput(5);
  1181. const auto leftKeyDropsTuple = AS_VALUE(TTupleLiteral, leftKeyDropsLiteral);
  1182. THashSet<ui32> leftKeyDrops;
  1183. leftKeyDrops.reserve(leftKeyDropsTuple->GetValuesCount());
  1184. for (ui32 i = 0; i < leftKeyDropsTuple->GetValuesCount(); i++) {
  1185. const auto item = AS_VALUE(TDataLiteral, leftKeyDropsTuple->GetValue(i));
  1186. leftKeyDrops.emplace(item->AsValue().Get<ui32>());
  1187. }
  1188. for (const auto& drop : leftKeyDrops) {
  1189. MKQL_ENSURE(leftKeySet.contains(drop),
  1190. "Only key columns has to be specified in drop column set");
  1191. }
  1192. const auto rightKeyColumnsLiteral = callable.GetInput(6);
  1193. const auto rightKeyColumnsTuple = AS_VALUE(TTupleLiteral, rightKeyColumnsLiteral);
  1194. TVector<ui32> rightKeyColumns;
  1195. rightKeyColumns.reserve(rightKeyColumnsTuple->GetValuesCount());
  1196. for (ui32 i = 0; i < rightKeyColumnsTuple->GetValuesCount(); i++) {
  1197. const auto item = AS_VALUE(TDataLiteral, rightKeyColumnsTuple->GetValue(i));
  1198. rightKeyColumns.emplace_back(item->AsValue().Get<ui32>());
  1199. }
  1200. const THashSet<ui32> rightKeySet(rightKeyColumns.cbegin(), rightKeyColumns.cend());
  1201. const auto rightKeyDropsLiteral = callable.GetInput(7);
  1202. const auto rightKeyDropsTuple = AS_VALUE(TTupleLiteral, rightKeyDropsLiteral);
  1203. THashSet<ui32> rightKeyDrops;
  1204. rightKeyDrops.reserve(rightKeyDropsTuple->GetValuesCount());
  1205. for (ui32 i = 0; i < rightKeyDropsTuple->GetValuesCount(); i++) {
  1206. const auto item = AS_VALUE(TDataLiteral, rightKeyDropsTuple->GetValue(i));
  1207. rightKeyDrops.emplace(item->AsValue().Get<ui32>());
  1208. }
  1209. for (const auto& drop : rightKeyDrops) {
  1210. MKQL_ENSURE(rightKeySet.contains(drop),
  1211. "Only key columns has to be specified in drop column set");
  1212. }
  1213. if (joinKind == EJoinKind::Cross) {
  1214. MKQL_ENSURE(leftKeyColumns.empty() && leftKeyDrops.empty() && rightKeyColumns.empty() && rightKeyDrops.empty(),
  1215. "Specifying key columns is not allowed for cross join");
  1216. }
  1217. MKQL_ENSURE(leftKeyColumns.size() == rightKeyColumns.size(), "Key columns mismatch");
  1218. // XXX: Mind the last wide item, containing block length.
  1219. TVector<ui32> leftIOMap;
  1220. for (size_t i = 0; i < leftStreamItems.size() - 1; i++) {
  1221. if (leftKeyDrops.contains(i)) {
  1222. continue;
  1223. }
  1224. leftIOMap.push_back(i);
  1225. }
  1226. // XXX: Mind the last wide item, containing block length.
  1227. TVector<ui32> rightIOMap;
  1228. if (joinKind == EJoinKind::Inner || joinKind == EJoinKind::Left || joinKind == EJoinKind::Cross) {
  1229. for (size_t i = 0; i < rightStreamItems.size() - 1; i++) {
  1230. if (rightKeyDrops.contains(i)) {
  1231. continue;
  1232. }
  1233. rightIOMap.push_back(i);
  1234. }
  1235. } else {
  1236. MKQL_ENSURE(rightKeyDrops.empty(), "Right key drops are not allowed for semi/only join");
  1237. }
  1238. const auto leftStream = LocateNode(ctx.NodeLocator, callable, 0);
  1239. const auto rightBlockStorage = LocateNode(ctx.NodeLocator, callable, 1);
  1240. #define JOIN_WRAPPER(WITHOUT_RIGHT, RIGHT_REQUIRED) \
  1241. return new TBlockMapJoinCoreWraper<WITHOUT_RIGHT, RIGHT_REQUIRED>( \
  1242. ctx.Mutables, \
  1243. std::move(joinItems), \
  1244. std::move(leftStreamItems), \
  1245. std::move(leftKeyColumns), \
  1246. std::move(leftIOMap), \
  1247. std::move(rightIOMap), \
  1248. leftStream, \
  1249. rightBlockStorage \
  1250. )
  1251. switch (joinKind) {
  1252. case EJoinKind::Inner:
  1253. JOIN_WRAPPER(false, true);
  1254. case EJoinKind::Left:
  1255. JOIN_WRAPPER(false, false);
  1256. case EJoinKind::LeftSemi:
  1257. MKQL_ENSURE(rightIOMap.empty(), "Can't access right table on left semi join");
  1258. JOIN_WRAPPER(true, true);
  1259. case EJoinKind::LeftOnly:
  1260. MKQL_ENSURE(rightIOMap.empty(), "Can't access right table on left only join");
  1261. JOIN_WRAPPER(true, false);
  1262. case EJoinKind::Cross:
  1263. return new TBlockCrossJoinCoreWraper(
  1264. ctx.Mutables,
  1265. std::move(joinItems),
  1266. std::move(leftStreamItems),
  1267. std::move(leftIOMap),
  1268. std::move(rightIOMap),
  1269. leftStream,
  1270. rightBlockStorage
  1271. );
  1272. default:
  1273. /* TODO: Display the human-readable join kind name. */
  1274. MKQL_ENSURE(false, "BlockMapJoinCore doesn't support join type #"
  1275. << static_cast<ui32>(joinKind));
  1276. }
  1277. #undef JOIN_WRAPPER
  1278. }
  1279. } // namespace NMiniKQL
  1280. } // namespace NKikimr