Browse Source

BlockMapJoinCore: don't keep rows with the same keys in the block index in RightAny mode
commit_hash:da99f555af301f58cac1645447e931a57d28d738

ziganshinmr 2 months ago
parent
commit
a472cb8a8f
1 changed file with 64 additions and 33 deletions
  1. 64 33
      yql/essentials/minikql/comp_nodes/mkql_block_map_join.cpp

+ 64 - 33
yql/essentials/minikql/comp_nodes/mkql_block_map_join.cpp

@@ -394,11 +394,11 @@ public:
                 }
 
                 EntryConsumed_ = true;
-                return CheckEntry(Entry_) ? TMaybe<TIndexEntry>(Entry_) : Nothing();
+                return BlockIndex_->IsKeyEquals(Entry_, ItemsToLookup_) ? TMaybe<TIndexEntry>(Entry_) : Nothing();
 
             case EIteratorType::LIST:
                 for (; Node_ != nullptr; Node_ = Node_->Next) {
-                    if (CheckEntry(Node_->Entry)) {
+                    if (BlockIndex_->IsKeyEquals(Node_->Entry, ItemsToLookup_)) {
                         auto entry = Node_->Entry;
                         Node_ = Node_->Next;
                         return entry;
@@ -430,18 +430,6 @@ public:
             *this = TIterator();
         }
 
-    private:
-        bool CheckEntry(const TIndexEntry& entry) {
-            for (size_t i = 0; i < BlockIndex_->KeyColumns_.size(); i++) {
-                auto indexItem = BlockIndex_->GetItem(entry, BlockIndex_->KeyColumns_[i]);
-                if (BlockIndex_->Comparators_[BlockIndex_->KeyColumns_[i]]->Equals(indexItem, ItemsToLookup_[i])) {
-                    return true;
-                }
-            }
-
-            return false;
-        }
-
     private:
         EIteratorType Type_;
         TBlockIndex* BlockIndex_ = nullptr;
@@ -462,13 +450,15 @@ public:
         TMemoryUsageInfo* memInfo,
         const TVector<TType*>& itemTypes,
         const TVector<ui32>& keyColumns,
-        NUdf::TUnboxedValue stream
+        NUdf::TUnboxedValue stream,
+        bool any
     )
         : TBase(memInfo)
         , InputsDescr_(ToValueDescr(itemTypes))
         , KeyColumns_(keyColumns)
         , Stream_(stream)
         , Inputs_(itemTypes.size())
+        , Any_(any)
     {
         TBlockTypeHelper helper;
         for (size_t i = 0; i < itemTypes.size(); i++) {
@@ -489,17 +479,23 @@ public:
             break;
         }
 
-        std::vector<arrow::Datum> block;
-        for (size_t i = 0; i < Inputs_.size() - 1; i++) {
-            auto& datum = TArrowBlock::From(Inputs_[i]).GetDatum();
-            ARROW_DEBUG_CHECK_DATUM_TYPES(InputsDescr_[i], datum.descr());
-            block.push_back(std::move(datum));
+        {
+            std::vector<arrow::Datum> block;
+            for (size_t i = 0; i < Inputs_.size() - 1; i++) {
+                auto& datum = TArrowBlock::From(Inputs_[i]).GetDatum();
+                ARROW_DEBUG_CHECK_DATUM_TYPES(InputsDescr_[i], datum.descr());
+                block.push_back(std::move(datum));
+            }
+            Data_.push_back(std::move(block));
         }
 
+        const auto& block = Data_.back();
+        auto blockOffset = Data_.size() - 1;
         auto blockSize = GetBlockCount(Inputs_[Inputs_.size() - 1]);
 
         std::array<TRobinHoodBatchRequestItem<ui64>, PrefetchBatchSize> insertBatch;
         std::array<TIndexEntry, PrefetchBatchSize> insertBatchEntries;
+        std::array<std::vector<NYql::NUdf::TBlockItem>, PrefetchBatchSize> insertBatchKeys;
         ui32 insertBatchLen = 0;
 
         auto processInsertBatch = [&]() {
@@ -510,6 +506,10 @@ public:
                     *value = TIndexMapValue(insertBatchEntries[i]);
                     Index_.CheckGrow();
                 } else {
+                    if (Any_ && ContainsKey(value, insertBatchKeys[i])) {
+                        return;
+                    }
+
                     // Store as list
                     if (value->IsInplace()) {
                         *value = TIndexMapValue(InsertIndexNode(value->GetEntry()));
@@ -520,15 +520,15 @@ public:
             });
         };
 
-        Y_ENSURE(Data_.size() <= std::numeric_limits<ui32>::max());
+        Y_ENSURE(blockOffset <= std::numeric_limits<ui32>::max());
         Y_ENSURE(blockSize <= std::numeric_limits<ui32>::max());
-        for (size_t i = 0; i < blockSize; i++) {
-            ui64 keyHash = CalculateKeyHash(block, i);
+        for (size_t itemOffset = 0; itemOffset < blockSize; itemOffset++) {
+            ui64 keyHash = GetKey(block, itemOffset, insertBatchKeys[insertBatchLen]);
             if (!keyHash) {
                 continue;
             }
 
-            insertBatchEntries[insertBatchLen] = TIndexEntry(Data_.size(), i);
+            insertBatchEntries[insertBatchLen] = TIndexEntry(blockOffset, itemOffset);
             insertBatch[insertBatchLen].ConstructKey(keyHash);
             insertBatchLen++;
 
@@ -542,7 +542,6 @@ public:
             processInsertBatch();
         }
 
-        Data_.push_back(std::move(block));
         return NUdf::EFetchStatus::Ok;
     }
 
@@ -576,6 +575,7 @@ public:
     }
 
     TBlockItem GetItem(TIndexEntry entry, ui32 columnIdx) {
+        Y_ENSURE(entry.BlockOffset < Data_.size());
         Y_ENSURE(columnIdx < Inputs_.size() - 1);
 
         auto& datum = Data_[entry.BlockOffset][columnIdx];
@@ -590,20 +590,36 @@ public:
         }
     }
 
+    // True only when the row at `entry` equals `keyItems` on every key column; a single equal column cannot tell a composite-key match from a hash collision.
+    bool IsKeyEquals(TIndexEntry entry, const std::vector<NYql::NUdf::TBlockItem>& keyItems) {
+        Y_ENSURE(keyItems.size() == KeyColumns_.size());
+        for (size_t i = 0; i < KeyColumns_.size(); i++) {
+            auto indexItem = GetItem(entry, KeyColumns_[i]);
+            if (!Comparators_[KeyColumns_[i]]->Equals(indexItem, keyItems[i])) {
+                return false;
+            }
+        }
+        return true;
+    }
+
 private:
-    ui64 CalculateKeyHash(const std::vector<arrow::Datum>& block, size_t offset) const {
+    ui64 GetKey(const std::vector<arrow::Datum>& block, size_t offset, std::vector<NYql::NUdf::TBlockItem>& keyItems) const {
         ui64 keyHash = 0;
+        keyItems.clear(); // start from an empty key: the caller's buffer may still hold a previous row's items
         for (ui32 keyColumn : KeyColumns_) {
             auto& datum = block[keyColumn];
             MKQL_ENSURE(datum.is_array(), "Expecting array");
 
             auto item = Readers_[keyColumn]->GetItem(*datum.array(), offset);
             if (!item) {
+                keyItems.clear(); // a falsy item aborts the row: hand back no partial key, and 0 as the hash (the caller skips such rows)
                 return 0;
             }
 
             keyHash = CombineHashes(keyHash, Hashers_[keyColumn]->Hash(item));
+            keyItems.push_back(std::move(item));
         }
+        // Returns the combined hash of the row's key columns; keyItems now holds one item per entry of KeyColumns_, in that order.
         return keyHash;
     }
 
@@ -611,6 +627,22 @@ private:
         return &IndexNodes_.emplace_back(entry, currentHead);
     }
 
+    // Returns true if an entry stored in `chain` (a single inplace entry or a linked list of nodes) has the key `keyItems`.
+    bool ContainsKey(const TIndexMapValue* chain, const std::vector<NYql::NUdf::TBlockItem>& keyItems) {
+        if (chain->IsInplace()) {
+            return IsKeyEquals(chain->GetEntry(), keyItems);
+        }
+        // The loop header is the only place the cursor advances, so every node is
+        // visited exactly once and Next is never read through a null pointer.
+        for (TIndexNode* node = chain->GetList(); node != nullptr; node = node->Next) {
+            if (IsKeyEquals(node->Entry, keyItems)) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
 private:
     const std::vector<arrow::ValueDescr> InputsDescr_;
     const TVector<ui32>& KeyColumns_;
@@ -626,6 +658,8 @@ private:
 
     NUdf::TUnboxedValue Stream_;
     TUnboxedValueVector Inputs_;
+
+    const bool Any_;
 };
 
 template <bool WithoutRight, bool RightRequired, bool RightAny>
@@ -670,7 +704,8 @@ public:
         const auto indexState = ctx.HolderFactory.Create<TIndexState>(
             RightItemTypes_,
             RightKeyColumns_,
-            std::move(RightStream_->GetValue(ctx))
+            std::move(RightStream_->GetValue(ctx)),
+            RightAny
         );
 
         return ctx.HolderFactory.Create<TStreamValue>(ctx.HolderFactory,
@@ -768,13 +803,9 @@ private:
                         auto key = iter.Next();
                         indexState.GetRow(*key, RightIOMap_, rightRow);
                         joinState.MakeRow(rightRow);
-
-                        if constexpr (RightAny) {
-                            break;
-                        }
                     }
 
-                    if (RightAny || iter.IsEmpty()) {
+                    if (iter.IsEmpty()) {
                         joinState.NextRow();
                         LookupBatchCurrent_++;
                     }
@@ -844,7 +875,7 @@ private:
         const TVector<ui32>& RightIOMap_;
         bool RightStreamConsumed_ = false;
 
-        std::array<TBlockIndex::TIterator, PrefetchBatchSize> LookupBatchIterators_;
+        std::array<typename TIndexState::TIterator, PrefetchBatchSize> LookupBatchIterators_;
         ui32 LookupBatchCurrent_ = 0;
         ui32 LookupBatchSize_ = 0;