Browse Source

Block map join memory optimizations

* Split block index into separate storage and index parts
* BlockMapJoinCore index memory reservation
* Use custom mkql allocator for the index table in BlockMapJoinCore
commit_hash:aaf0c5c68925a61b602f0c39e56cce67d8be17c2
ziganshinmr 1 month ago
parent
commit
3cfd089f58

+ 219 - 166
yql/essentials/minikql/comp_nodes/mkql_block_map_join.cpp

@@ -239,24 +239,147 @@ private:
     TVector<NYql::NUdf::IBlockItemHasher::TPtr> Hashers_;
 };
 
-class TBlockIndex : public TComputationValue<TBlockIndex> {
-    struct TIndexEntry {
+template <typename TDerived>
+class TBlockStorageBase : public TComputationValue<TDerived> {
+    using TBase = TComputationValue<TDerived>;
+
+public:
+    struct TBlock {
+        size_t Size;
+        std::vector<arrow::Datum> Columns;
+
+        TBlock() = default;
+        TBlock(size_t size, std::vector<arrow::Datum> columns)
+            : Size(size)
+            , Columns(std::move(columns))
+        {}
+    };
+
+    struct TRowEntry {
         ui32 BlockOffset;
         ui32 ItemOffset;
 
-        TIndexEntry() = default;
-        TIndexEntry(ui32 blockOffset, ui32 itemOffset)
+        TRowEntry() = default;
+        TRowEntry(ui32 blockOffset, ui32 itemOffset)
             : BlockOffset(blockOffset)
             , ItemOffset(itemOffset)
         {}
     };
 
+    TBlockStorageBase(
+        TMemoryUsageInfo* memInfo,
+        const TVector<TType*>& itemTypes,
+        NUdf::TUnboxedValue stream,
+        arrow::MemoryPool* pool
+    )
+        : TBase(memInfo)
+        , InputsDescr_(ToValueDescr(itemTypes))
+        , Stream_(stream)
+        , Inputs_(itemTypes.size())
+    {
+        TBlockTypeHelper helper;
+        for (size_t i = 0; i < itemTypes.size(); i++) {
+            TType* blockItemType = AS_TYPE(TBlockType, itemTypes[i])->GetItemType();
+            Readers_.push_back(MakeBlockReader(TTypeInfoHelper(), blockItemType));
+            Hashers_.push_back(helper.MakeHasher(blockItemType));
+            Comparators_.push_back(helper.MakeComparator(blockItemType));
+            Trimmers_.push_back(MakeBlockTrimmer(TTypeInfoHelper(), blockItemType, pool));
+        }
+    }
+
+    NUdf::EFetchStatus FetchStream() {
+        switch (Stream_.WideFetch(Inputs_.data(), Inputs_.size())) {
+        case NUdf::EFetchStatus::Yield:
+            return NUdf::EFetchStatus::Yield;
+        case NUdf::EFetchStatus::Finish:
+            return NUdf::EFetchStatus::Finish;
+        case NUdf::EFetchStatus::Ok:
+            break;
+        }
+
+        std::vector<arrow::Datum> blockColumns;
+        for (size_t i = 0; i < Inputs_.size() - 1; i++) {
+            auto& datum = TArrowBlock::From(Inputs_[i]).GetDatum();
+            ARROW_DEBUG_CHECK_DATUM_TYPES(InputsDescr_[i], datum.descr());
+            if (datum.is_scalar()) {
+                blockColumns.push_back(datum);
+            } else {
+                MKQL_ENSURE(datum.is_array(), "Expecting array");
+                blockColumns.push_back(Trimmers_[i]->Trim(datum.array()));
+            }
+        }
+
+        auto blockSize = ::GetBlockCount(Inputs_[Inputs_.size() - 1]);
+        Data_.emplace_back(blockSize, std::move(blockColumns));
+        RowCount_ += blockSize;
+
+        return NUdf::EFetchStatus::Ok;
+    }
+
+    const TBlock& GetBlock(size_t blockOffset) const {
+        Y_ENSURE(blockOffset < GetBlockCount());
+        return Data_[blockOffset];
+    }
+
+    size_t GetBlockCount() const {
+        return Data_.size();
+    }
+
+    TBlockItem GetItem(TRowEntry entry, ui32 columnIdx) const {
+        Y_ENSURE(columnIdx < Inputs_.size() - 1);
+        return GetItemFromBlock(GetBlock(entry.BlockOffset), columnIdx, entry.ItemOffset);
+    }
+
+    void GetRow(TRowEntry entry, const TVector<ui32>& ioMap, std::vector<NYql::NUdf::TBlockItem>& row) const {
+        Y_ENSURE(row.size() == ioMap.size());
+        for (size_t i = 0; i < row.size(); i++) {
+            row[i] = GetItem(entry, ioMap[i]);
+        }
+    }
+
+protected:
+    TBlockItem GetItemFromBlock(const TBlock& block, ui32 columnIdx, size_t offset) const {
+        Y_ENSURE(offset < block.Size);
+        const auto& datum = block.Columns[columnIdx];
+        if (datum.is_scalar()) {
+            return Readers_[columnIdx]->GetScalarItem(*datum.scalar());
+        } else {
+            MKQL_ENSURE(datum.is_array(), "Expecting array");
+            return Readers_[columnIdx]->GetItem(*datum.array(), offset);
+        }
+    }
+
+protected:
+    const std::vector<arrow::ValueDescr> InputsDescr_;
+
+    TVector<std::unique_ptr<IBlockReader>> Readers_;
+    TVector<NUdf::IBlockItemHasher::TPtr> Hashers_;
+    TVector<NUdf::IBlockItemComparator::TPtr> Comparators_;
+    TVector<IBlockTrimmer::TPtr> Trimmers_;
+
+    std::vector<TBlock> Data_;
+    size_t RowCount_ = 0;
+
+    NUdf::TUnboxedValue Stream_;
+    TUnboxedValueVector Inputs_;
+};
+
+class TBlockStorage: public TBlockStorageBase<TBlockStorage> {
+private:
+    using TBase = TBlockStorageBase<TBlockStorage>;
+public:
+    using TBase::TBase;
+};
+
+class TIndexedBlockStorage : public TBlockStorageBase<TIndexedBlockStorage> {
+    using TBase = TBlockStorageBase<TIndexedBlockStorage>;
+
     struct TIndexNode {
-        TIndexEntry Entry;
+        TRowEntry Entry;
         TIndexNode* Next;
 
         TIndexNode() = delete;
-        TIndexNode(TIndexEntry entry, TIndexNode* next = nullptr)
+        TIndexNode(TRowEntry entry, TIndexNode* next = nullptr)
             : Entry(entry)
             , Next(next)
         {}
@@ -268,7 +391,7 @@ class TBlockIndex : public TComputationValue<TBlockIndex> {
             : Raw(0)
         {}
 
-        TIndexMapValue(TIndexEntry entry) {
+        TIndexMapValue(TRowEntry entry) {
             TIndexEntryUnion un;
             un.Entry = entry;
 
@@ -289,7 +412,7 @@ class TBlockIndex : public TComputationValue<TBlockIndex> {
             return EntryList;
         }
 
-        TIndexEntry GetEntry() const {
+        TRowEntry GetEntry() const {
             Y_ENSURE(IsInplace());
 
             TIndexEntryUnion un;
@@ -299,7 +422,7 @@ class TBlockIndex : public TComputationValue<TBlockIndex> {
 
     private:
         union TIndexEntryUnion {
-            TIndexEntry Entry;
+            TRowEntry Entry;
             ui64 Raw;
         };
 
@@ -309,13 +432,12 @@ class TBlockIndex : public TComputationValue<TBlockIndex> {
         };
     };
 
-    using TBase = TComputationValue<TBlockIndex>;
     using TIndexMap = TRobinHoodHashFixedMap<
         ui64,
         TIndexMapValue,
         std::equal_to<ui64>,
         std::hash<ui64>,
-        TMKQLAllocator<char>
+        TMKQLHugeAllocator<char>
     >;
 
     static_assert(sizeof(TIndexMapValue) == 8);
@@ -323,6 +445,8 @@ class TBlockIndex : public TComputationValue<TBlockIndex> {
 
 public:
     class TIterator {
+        friend class TIndexedBlockStorage;
+
         enum class EIteratorType {
             EMPTY,
             INPLACE,
@@ -332,26 +456,6 @@ public:
     public:
         TIterator() = default;
 
-        TIterator(const TBlockIndex* blockIndex)
-            : Type_(EIteratorType::EMPTY)
-            , BlockIndex_(blockIndex)
-        {}
-
-        TIterator(const TBlockIndex* blockIndex, TIndexEntry entry, std::vector<NYql::NUdf::TBlockItem> itemsToLookup)
-            : Type_(EIteratorType::INPLACE)
-            , BlockIndex_(blockIndex)
-            , Entry_(entry)
-            , EntryConsumed_(false)
-            , ItemsToLookup_(std::move(itemsToLookup))
-        {}
-
-        TIterator(const TBlockIndex* blockIndex, TIndexNode* node, std::vector<NYql::NUdf::TBlockItem> itemsToLookup)
-            : Type_(EIteratorType::LIST)
-            , BlockIndex_(blockIndex)
-            , Node_(node)
-            , ItemsToLookup_(std::move(itemsToLookup))
-        {}
-
         TIterator(const TIterator&) = delete;
         TIterator& operator=(const TIterator&) = delete;
 
@@ -384,7 +488,7 @@ public:
             return *this;
         }
 
-        TMaybe<TIndexEntry> Next() {
+        TMaybe<TRowEntry> Next() {
             Y_ENSURE(IsValid());
 
             switch (Type_) {
@@ -397,7 +501,7 @@ public:
                 }
 
                 EntryConsumed_ = true;
-                return BlockIndex_->IsKeyEquals(Entry_, ItemsToLookup_) ? TMaybe<TIndexEntry>(Entry_) : Nothing();
+                return BlockIndex_->IsKeyEquals(Entry_, ItemsToLookup_) ? TMaybe<TRowEntry>(Entry_) : Nothing();
 
             case EIteratorType::LIST:
                 for (; Node_ != nullptr; Node_ = Node_->Next) {
@@ -433,14 +537,35 @@ public:
             *this = TIterator();
         }
 
+    private:
+        TIterator(const TIndexedBlockStorage* blockIndex)
+            : Type_(EIteratorType::EMPTY)
+            , BlockIndex_(blockIndex)
+        {}
+
+        TIterator(const TIndexedBlockStorage* blockIndex, TRowEntry entry, std::vector<NYql::NUdf::TBlockItem> itemsToLookup)
+            : Type_(EIteratorType::INPLACE)
+            , BlockIndex_(blockIndex)
+            , Entry_(entry)
+            , EntryConsumed_(false)
+            , ItemsToLookup_(std::move(itemsToLookup))
+        {}
+
+        TIterator(const TIndexedBlockStorage* blockIndex, TIndexNode* node, std::vector<NYql::NUdf::TBlockItem> itemsToLookup)
+            : Type_(EIteratorType::LIST)
+            , BlockIndex_(blockIndex)
+            , Node_(node)
+            , ItemsToLookup_(std::move(itemsToLookup))
+        {}
+
     private:
         EIteratorType Type_;
-        const TBlockIndex* BlockIndex_ = nullptr;
+        const TIndexedBlockStorage* BlockIndex_ = nullptr;
 
         union {
             TIndexNode* Node_;
             struct {
-                TIndexEntry Entry_;
+                TRowEntry Entry_;
                 bool EntryConsumed_;
             };
         };
@@ -449,7 +574,7 @@ public:
     };
 
 public:
-    TBlockIndex(
+    TIndexedBlockStorage(
         TMemoryUsageInfo* memInfo,
         const TVector<TType*>& itemTypes,
         const TVector<ui32>& keyColumns,
@@ -457,106 +582,75 @@ public:
         bool any,
         arrow::MemoryPool* pool
     )
-        : TBase(memInfo)
-        , InputsDescr_(ToValueDescr(itemTypes))
+        : TBase(memInfo, itemTypes, stream, pool)
         , KeyColumns_(keyColumns)
-        , Stream_(stream)
-        , Inputs_(itemTypes.size())
         , Any_(any)
-    {
-        TBlockTypeHelper helper;
-        for (size_t i = 0; i < itemTypes.size(); i++) {
-            TType* blockItemType = AS_TYPE(TBlockType, itemTypes[i])->GetItemType();
-            Readers_.push_back(MakeBlockReader(TTypeInfoHelper(), blockItemType));
-            Hashers_.push_back(helper.MakeHasher(blockItemType));
-            Comparators_.push_back(helper.MakeComparator(blockItemType));
-            Trimmers_.push_back(MakeBlockTrimmer(TTypeInfoHelper(), blockItemType, pool));
-        }
-    }
+    {}
 
     NUdf::EFetchStatus FetchStream() {
-        switch (Stream_.WideFetch(Inputs_.data(), Inputs_.size())) {
-        case NUdf::EFetchStatus::Yield:
-            return NUdf::EFetchStatus::Yield;
-        case NUdf::EFetchStatus::Finish:
-            return NUdf::EFetchStatus::Finish;
-        case NUdf::EFetchStatus::Ok:
-            break;
-        }
+        Y_ENSURE(!Index_, "Data fetch shouldn't be done after the index has been built");
+        return TBase::FetchStream();
+    }
 
-        {
-            std::vector<arrow::Datum> block;
-            for (size_t i = 0; i < Inputs_.size() - 1; i++) {
-                auto& datum = TArrowBlock::From(Inputs_[i]).GetDatum();
-                ARROW_DEBUG_CHECK_DATUM_TYPES(InputsDescr_[i], datum.descr());
-                if (datum.is_scalar()) {
-                    block.push_back(datum);
-                } else {
-                    MKQL_ENSURE(datum.is_array(), "Expecting array");
-                    block.push_back(Trimmers_[i]->Trim(datum.array()));
-                }
-            }
-            Data_.push_back(std::move(block));
-        }
+    void BuildIndex() {
+        Index_ = std::make_unique<TIndexMap>(CalculateRHHashTableCapacity(RowCount_));
+        for (size_t blockOffset = 0; blockOffset < Data_.size(); blockOffset++) {
+            const auto& block = GetBlock(blockOffset);
+            auto blockSize = block.Size;
+
+            std::array<TRobinHoodBatchRequestItem<ui64>, PrefetchBatchSize> insertBatch;
+            std::array<TRowEntry, PrefetchBatchSize> insertBatchEntries;
+            std::array<std::vector<NYql::NUdf::TBlockItem>, PrefetchBatchSize> insertBatchKeys;
+            ui32 insertBatchLen = 0;
+
+            auto processInsertBatch = [&]() {
+                Index_->BatchInsert({insertBatch.data(), insertBatchLen}, [&](size_t i, TIndexMap::iterator iter, bool isNew) {
+                    auto value = static_cast<TIndexMapValue*>(Index_->GetMutablePayload(iter));
+                    if (isNew) {
+                        // Store single entry inplace
+                        *value = TIndexMapValue(insertBatchEntries[i]);
+                        Index_->CheckGrow();
+                    } else {
+                        if (Any_ && ContainsKey(value, insertBatchKeys[i])) {
+                            return;
+                        }
 
-        const auto& block = Data_.back();
-        auto blockOffset = Data_.size() - 1;
-        auto blockSize = GetBlockCount(Inputs_[Inputs_.size() - 1]);
-
-        std::array<TRobinHoodBatchRequestItem<ui64>, PrefetchBatchSize> insertBatch;
-        std::array<TIndexEntry, PrefetchBatchSize> insertBatchEntries;
-        std::array<std::vector<NYql::NUdf::TBlockItem>, PrefetchBatchSize> insertBatchKeys;
-        ui32 insertBatchLen = 0;
-
-        auto processInsertBatch = [&]() {
-            Index_.BatchInsert({insertBatch.data(), insertBatchLen}, [&](size_t i, TIndexMap::iterator iter, bool isNew) {
-                auto value = static_cast<TIndexMapValue*>(Index_.GetMutablePayload(iter));
-                if (isNew) {
-                    // Store single entry inplace
-                    *value = TIndexMapValue(insertBatchEntries[i]);
-                    Index_.CheckGrow();
-                } else {
-                    if (Any_ && ContainsKey(value, insertBatchKeys[i])) {
-                        return;
-                    }
+                        // Store as list
+                        if (value->IsInplace()) {
+                            *value = TIndexMapValue(InsertIndexNode(value->GetEntry()));
+                        }
 
-                    // Store as list
-                    if (value->IsInplace()) {
-                        *value = TIndexMapValue(InsertIndexNode(value->GetEntry()));
+                        *value = TIndexMapValue(InsertIndexNode(insertBatchEntries[i], value->GetList()));
                     }
+                });
+            };
 
-                    *value = TIndexMapValue(InsertIndexNode(insertBatchEntries[i], value->GetList()));
+            Y_ENSURE(blockOffset <= std::numeric_limits<ui32>::max());
+            Y_ENSURE(blockSize <= std::numeric_limits<ui32>::max());
+            for (size_t itemOffset = 0; itemOffset < blockSize; itemOffset++) {
+                ui64 keyHash = GetKey(block, itemOffset, insertBatchKeys[insertBatchLen]);
+                if (!keyHash) {
+                    continue;
                 }
-            });
-        };
 
-        Y_ENSURE(blockOffset <= std::numeric_limits<ui32>::max());
-        Y_ENSURE(blockSize <= std::numeric_limits<ui32>::max());
-        for (size_t itemOffset = 0; itemOffset < blockSize; itemOffset++) {
-            ui64 keyHash = GetKey(block, itemOffset, insertBatchKeys[insertBatchLen]);
-            if (!keyHash) {
-                continue;
-            }
+                insertBatchEntries[insertBatchLen] = TRowEntry(blockOffset, itemOffset);
+                insertBatch[insertBatchLen].ConstructKey(keyHash);
+                insertBatchLen++;
 
-            insertBatchEntries[insertBatchLen] = TIndexEntry(blockOffset, itemOffset);
-            insertBatch[insertBatchLen].ConstructKey(keyHash);
-            insertBatchLen++;
+                if (insertBatchLen == PrefetchBatchSize) {
+                    processInsertBatch();
+                    insertBatchLen = 0;
+                }
+            }
 
-            if (insertBatchLen == PrefetchBatchSize) {
+            if (insertBatchLen > 0) {
                 processInsertBatch();
-                insertBatchLen = 0;
             }
         }
-
-        if (insertBatchLen > 0) {
-            processInsertBatch();
-        }
-
-        return NUdf::EFetchStatus::Ok;
     }
 
     template<typename TGetKey>
-    void BatchLookup(size_t batchSize, std::array<TBlockIndex::TIterator, PrefetchBatchSize>& iterators, TGetKey&& getKey) {
+    void BatchLookup(size_t batchSize, std::array<TIndexedBlockStorage::TIterator, PrefetchBatchSize>& iterators, TGetKey&& getKey) {
         Y_ENSURE(batchSize <= PrefetchBatchSize);
 
         std::array<TRobinHoodBatchRequestItem<ui64>, PrefetchBatchSize> lookupBatch;
@@ -568,14 +662,14 @@ public:
             itemsBatch[i] = items;
         }
 
-        Index_.BatchLookup({lookupBatch.data(), batchSize}, [&](size_t i, TIndexMap::iterator iter) {
+        Index_->BatchLookup({lookupBatch.data(), batchSize}, [&](size_t i, TIndexMap::iterator iter) {
             if (!iter) {
                 // Empty iterator
                 iterators[i] = TIterator(this);
                 return;
             }
 
-            auto value = static_cast<const TIndexMapValue*>(Index_.GetPayload(iter));
+            auto value = static_cast<const TIndexMapValue*>(Index_->GetPayload(iter));
             if (value->IsInplace()) {
                 iterators[i] = TIterator(this, value->GetEntry(), std::move(itemsBatch[i]));
             } else {
@@ -584,20 +678,7 @@ public:
         });
     }
 
-    TBlockItem GetItem(TIndexEntry entry, ui32 columnIdx) const {
-        Y_ENSURE(entry.BlockOffset < Data_.size());
-        Y_ENSURE(columnIdx < Inputs_.size() - 1);
-        return GetItemFromBlock(Data_[entry.BlockOffset], columnIdx, entry.ItemOffset);
-    }
-
-    void GetRow(TIndexEntry entry, const TVector<ui32>& ioMap, std::vector<NYql::NUdf::TBlockItem>& row) const {
-        Y_ENSURE(row.size() == ioMap.size());
-        for (size_t i = 0; i < row.size(); i++) {
-            row[i] = GetItem(entry, ioMap[i]);
-        }
-    }
-
-    bool IsKeyEquals(TIndexEntry entry, const std::vector<NYql::NUdf::TBlockItem>& keyItems) const {
+    bool IsKeyEquals(TRowEntry entry, const std::vector<NYql::NUdf::TBlockItem>& keyItems) const {
         Y_ENSURE(keyItems.size() == KeyColumns_.size());
         for (size_t i = 0; i < KeyColumns_.size(); i++) {
             auto indexItem = GetItem(entry, KeyColumns_[i]);
@@ -610,7 +691,7 @@ public:
     }
 
 private:
-    ui64 GetKey(const std::vector<arrow::Datum>& block, size_t offset, std::vector<NYql::NUdf::TBlockItem>& keyItems) const {
+    ui64 GetKey(const TBlock& block, size_t offset, std::vector<NYql::NUdf::TBlockItem>& keyItems) const {
         ui64 keyHash = 0;
         keyItems.clear();
         for (ui32 keyColumn : KeyColumns_) {
@@ -627,17 +708,7 @@ private:
         return keyHash;
     }
 
-    TBlockItem GetItemFromBlock(const std::vector<arrow::Datum>& block, ui32 columnIdx, size_t offset) const {
-        const auto& datum = block[columnIdx];
-        if (datum.is_scalar()) {
-            return Readers_[columnIdx]->GetScalarItem(*datum.scalar());
-        } else {
-            MKQL_ENSURE(datum.is_array(), "Expecting array");
-            return Readers_[columnIdx]->GetItem(*datum.array(), offset);
-        }
-    }
-
-    TIndexNode* InsertIndexNode(TIndexEntry entry, TIndexNode* currentHead = nullptr) {
+    TIndexNode* InsertIndexNode(TRowEntry entry, TIndexNode* currentHead = nullptr) {
         return &IndexNodes_.emplace_back(entry, currentHead);
     }
 
@@ -658,22 +729,11 @@ private:
     }
 
 private:
-    const std::vector<arrow::ValueDescr> InputsDescr_;
     const TVector<ui32>& KeyColumns_;
 
-    TVector<std::unique_ptr<IBlockReader>> Readers_;
-    TVector<NUdf::IBlockItemHasher::TPtr> Hashers_;
-    TVector<NUdf::IBlockItemComparator::TPtr> Comparators_;
-    TVector<IBlockTrimmer::TPtr> Trimmers_;
-
-    std::vector<std::vector<arrow::Datum>> Data_;
-
-    TIndexMap Index_;
+    std::unique_ptr<TIndexMap> Index_;
     std::deque<TIndexNode> IndexNodes_;
 
-    NUdf::TUnboxedValue Stream_;
-    TUnboxedValueVector Inputs_;
-
     const bool Any_;
 };
 
@@ -682,7 +742,7 @@ class TBlockMapJoinCoreWraper : public TMutableComputationNode<TBlockMapJoinCore
 {
 using TBaseComputation = TMutableComputationNode<TBlockMapJoinCoreWraper<WithoutRight, RightRequired, RightAny>>;
 using TJoinState = TBlockJoinState<RightRequired>;
-using TIndexState = TBlockIndex;
+using TIndexState = TIndexedBlockStorage;
 public:
     TBlockMapJoinCoreWraper(
         TComputationMutables& mutables,
@@ -728,10 +788,8 @@ public:
                                                       std::move(joinState),
                                                       std::move(indexState),
                                                       std::move(LeftStream_->GetValue(ctx)),
-                                                      LeftItemTypes_,
                                                       LeftKeyColumns_,
                                                       std::move(RightStream_->GetValue(ctx)),
-                                                      RightItemTypes_,
                                                       RightKeyColumns_,
                                                       RightIOMap_
         );
@@ -747,10 +805,8 @@ private:
             NUdf::TUnboxedValue&& joinState,
             NUdf::TUnboxedValue&& indexState,
             NUdf::TUnboxedValue&& leftStream,
-            const TVector<TType*>& leftTypes,
             const TVector<ui32>& leftKeyColumns,
             NUdf::TUnboxedValue&& rightStream,
-            const TVector<TType*>& rightTypes,
             const TVector<ui32>& rightKeyColumns,
             const TVector<ui32>& rightIOMap
         )
@@ -758,10 +814,8 @@ private:
             , JoinState_(joinState)
             , IndexState_(indexState)
             , LeftStream_(leftStream)
-            , LeftItemTypes_(leftTypes)
             , LeftKeyColumns_(leftKeyColumns)
             , RightStream_(rightStream)
-            , RightItemTypes_(rightTypes)
             , RightKeyColumns_(rightKeyColumns)
             , RightIOMap_(rightIOMap)
             , HolderFactory_(holderFactory)
@@ -781,6 +835,7 @@ private:
                     }
                 }
 
+                indexState.BuildIndex();
                 RightStreamConsumed_ = true;
             }
 
@@ -882,11 +937,9 @@ private:
         NUdf::TUnboxedValue IndexState_;
 
         NUdf::TUnboxedValue LeftStream_;
-        const TVector<TType*>& LeftItemTypes_;
         const TVector<ui32>& LeftKeyColumns_;
 
         NUdf::TUnboxedValue RightStream_;
-        const TVector<TType*>& RightItemTypes_;
         const TVector<ui32>& RightKeyColumns_;
         const TVector<ui32>& RightIOMap_;
         bool RightStreamConsumed_ = false;

+ 33 - 1
yql/essentials/minikql/mkql_alloc.h

@@ -164,7 +164,7 @@ struct TMkqlPAllocHeader {
     ui64 Self; // should be placed right before pointer to allocated area, see GetMemoryChunkContext
 };
 
-static_assert(sizeof(TMkqlPAllocHeader) == 
+static_assert(sizeof(TMkqlPAllocHeader) ==
     sizeof(size_t) +
     sizeof(TAllocState::TListEntry) +
     sizeof(void*), "Padding is not allowed");
@@ -497,6 +497,38 @@ struct TMKQLAllocator
 using TWithDefaultMiniKQLAlloc = TWithMiniKQLAlloc<EMemorySubPool::Default>;
 using TWithTemporaryMiniKQLAlloc = TWithMiniKQLAlloc<EMemorySubPool::Temporary>;
 
+template <typename Type>
+struct TMKQLHugeAllocator
+{
+    typedef Type value_type;
+    typedef Type* pointer;
+    typedef const Type* const_pointer;
+    typedef Type& reference;
+    typedef const Type& const_reference;
+    typedef size_t size_type;
+    typedef ptrdiff_t difference_type;
+
+    TMKQLHugeAllocator() noexcept = default;
+    ~TMKQLHugeAllocator() noexcept = default;
+
+    template<typename U> TMKQLHugeAllocator(const TMKQLHugeAllocator<U>&) noexcept {}
+    template<typename U> struct rebind { typedef TMKQLHugeAllocator<U> other; };
+    template<typename U> bool operator==(const TMKQLHugeAllocator<U>&) const { return true; }
+    template<typename U> bool operator!=(const TMKQLHugeAllocator<U>&) const { return false; }
+
+    static pointer allocate(size_type n, const void* = nullptr)
+    {
+        size_t size = Max(n * sizeof(value_type), TAllocState::POOL_PAGE_SIZE);
+        return static_cast<pointer>(TlsAllocState->GetBlock(size));
+    }
+
+    static void deallocate(const_pointer p, size_type n) noexcept
+    {
+        size_t size = Max(n * sizeof(value_type), TAllocState::POOL_PAGE_SIZE);
+        TlsAllocState->ReturnBlock(const_cast<pointer>(p), size);
+    }
+};
+
 template <typename T>
 class TPagedList
 {