#include "mkql_grace_join_imp.h" #include #include #include #include namespace NKikimr { namespace NMiniKQL { namespace GraceJoin { TTable::EAddTupleResult TTable::AddTuple( ui64 * intColumns, char ** stringColumns, ui32 * stringsSizes, NYql::NUdf::TUnboxedValue * iColumns, const TTable &other) { if ((intColumns[0] & 1)) return EAddTupleResult::Unmatched; TotalPacked++; TempTuple.clear(); TempTuple.insert(TempTuple.end(), intColumns, intColumns + NullsBitmapSize_ + NumberOfKeyIntColumns); if ( NumberOfKeyIColumns > 0 ) { for (ui32 i = 0; i < NumberOfKeyIColumns; i++) { TempTuple.push_back((ColInterfaces + i)->HashI->Hash(*(iColumns+i))); } } ui64 totalBytesForStrings = 0; ui64 totalIntsForStrings = 0; // Processing variable length string columns if ( NumberOfKeyStringColumns != 0 || NumberOfKeyIColumns != 0) { totalBytesForStrings += sizeof(ui32)*NumberOfKeyStringColumns; totalBytesForStrings += sizeof(ui32)*NumberOfKeyIColumns; for( ui64 i = 0; i < NumberOfKeyStringColumns; i++ ) { totalBytesForStrings += stringsSizes[i]; } for ( ui64 i = 0; i < NumberOfKeyIColumns; i++) { TStringBuf val = (ColInterfaces + i)->Packer->Pack(*(iColumns+i)); IColumnsVals[i].clear(); IColumnsVals[i].insert(IColumnsVals[i].begin(), val.cbegin(), val.end()); totalBytesForStrings += val.size(); } totalIntsForStrings = (totalBytesForStrings + sizeof(ui64) - 1) / sizeof(ui64); TempTuple.push_back(totalIntsForStrings); TempTuple.resize(TempTuple.size() + totalIntsForStrings); TempTuple.back() = 0; ui64 * startPtr = (TempTuple.data() + TempTuple.size() - totalIntsForStrings ); char * currStrPtr = reinterpret_cast< char* > (startPtr); for( ui64 i = 0; i < NumberOfKeyStringColumns; i++) { WriteUnaligned(currStrPtr, stringsSizes[i] ); currStrPtr+=sizeof(ui32); std::memcpy(currStrPtr, stringColumns[i], stringsSizes[i] ); currStrPtr+=stringsSizes[i]; } for( ui64 i = 0; i < NumberOfKeyIColumns; i++) { WriteUnaligned(currStrPtr, IColumnsVals[i].size() ); currStrPtr+=sizeof(ui32); std::memcpy(currStrPtr, IColumnsVals[i].data(), IColumnsVals[i].size() ); currStrPtr+=IColumnsVals[i].size(); } } XXH64_hash_t hash = XXH64(TempTuple.data() + NullsBitmapSize_, (TempTuple.size() - NullsBitmapSize_) * sizeof(ui64), 0); if (!hash) hash = 1; ui64 bucket = hash & BucketsMask; if (!IsAny_ && other.TableBucketsStats[bucket].BloomFilter.IsFinalized()) { auto bucket2 = &other.TableBucketsStats[bucket]; auto &bloomFilter = bucket2->BloomFilter; ++BloomLookups_; if (bloomFilter.IsMissing(hash)) { ++BloomHits_; return EAddTupleResult::Unmatched; } } std::vector> & keyIntVals = TableBuckets[bucket].KeyIntVals; std::vector> & stringsOffsets = TableBuckets[bucket].StringsOffsets; std::vector> & dataIntVals = TableBuckets[bucket].DataIntVals; std::vector> & stringVals = TableBuckets[bucket].StringsValues; KeysHashTable & kh = TableBucketsStats[bucket].AnyHashTable; ui32 offset = keyIntVals.size(); // Offset of tuple inside the keyIntVals vector keyIntVals.push_back(hash); keyIntVals.insert(keyIntVals.end(), TempTuple.begin(), TempTuple.end()); if (IsAny_) { if ( !AddKeysToHashTable(kh, keyIntVals.begin() + offset, iColumns) ) { keyIntVals.resize(offset); ++AnyFiltered_; return EAddTupleResult::AnyMatch; } if (other.TableBucketsStats[bucket].BloomFilter.IsFinalized()) { auto bucket2 = &other.TableBucketsStats[bucket]; auto &bloomFilter = bucket2->BloomFilter; ++BloomLookups_; if (bloomFilter.IsMissing(hash)) { keyIntVals.resize(offset); ++BloomHits_; return EAddTupleResult::Unmatched; } } } TableBucketsStats[bucket].TuplesNum++; if (NumberOfStringColumns || NumberOfIColumns ) { stringsOffsets.push_back(TableBucketsStats[bucket].KeyIntValsTotalSize); // Adding offset to tuple in keyIntVals vector stringsOffsets.push_back(TableBucketsStats[bucket].StringValuesTotalSize); // Adding offset to string values // Adding strings sizes for keys and data if ( NumberOfStringColumns ) { stringsOffsets.insert( stringsOffsets.end(), stringsSizes, stringsSizes+NumberOfStringColumns ); } if ( NumberOfIColumns ) { for ( ui64 i = NumberOfKeyIColumns; i < NumberOfIColumns; i++) { TStringBuf val = (ColInterfaces + i)->Packer->Pack(*(iColumns+i)); IColumnsVals[i].clear(); IColumnsVals[i].insert(IColumnsVals[i].begin(), val.cbegin(), val.end()); } for (ui64 i = 0; i < NumberOfIColumns; i++ ) { stringsOffsets.push_back(IColumnsVals[i].size()); } } } // Adding data values ui64 * dataColumns = intColumns + NullsBitmapSize_ + NumberOfKeyIntColumns; dataIntVals.insert(dataIntVals.end(), dataColumns, dataColumns + NumberOfDataIntColumns); // Adding strings values for data columns char ** dataStringsColumns = stringColumns + NumberOfKeyStringColumns; ui32 * dataStringsSizes = stringsSizes + NumberOfKeyStringColumns; ui64 initialStringsSize = stringVals.size(); for( ui64 i = 0; i < NumberOfDataStringColumns; i++) { ui32 currStringSize = *(dataStringsSizes + i); stringVals.insert(stringVals.end(), *(dataStringsColumns + i), *(dataStringsColumns + i) + currStringSize); } for ( ui64 i = 0; i < NumberOfDataIColumns; i++) { stringVals.insert( stringVals.end(), IColumnsVals[NumberOfKeyIColumns + i].begin(), IColumnsVals[NumberOfKeyIColumns + i].end()); } TableBucketsStats[bucket].KeyIntValsTotalSize += keyIntVals.size() - offset; TableBucketsStats[bucket].StringValuesTotalSize += stringVals.size() - initialStringsSize; return EAddTupleResult::Added; } void TTable::ResetIterator() { CurrIterIndex = 0; CurrIterBucket = 0; if (IsTableJoined) { JoinTable1->ResetIterator(); JoinTable2->ResetIterator(); } TotalUnpacked = 0; } // Checks if there are more tuples and sets bucketId and tupleId to next valid. inline bool HasMoreTuples(std::vector & tableBucketsStats, ui64 & bucketId, ui64 & tupleId, ui64 bucketLimit ) { if (bucketId >= bucketLimit) return false; if ( tupleId >= tableBucketsStats[bucketId].TuplesNum ) { tupleId = 0; bucketId ++; if (bucketId == bucketLimit) { return false; } while( tableBucketsStats[bucketId].TuplesNum == 0 ) { bucketId ++; if (bucketId == bucketLimit) { return false; } } } return true; } // Returns value of next tuple. Returs true if there are more tuples bool TTable::NextTuple(TupleData & td){ if (HasMoreTuples(TableBucketsStats, CurrIterBucket, CurrIterIndex, TableBucketsStats.size())) { GetTupleData(CurrIterBucket, CurrIterIndex, td); CurrIterIndex++; return true; } else { td.AllNulls = true; return false; } } inline bool CompareIColumns( const ui32* stringSizes1, const char * vals1, const ui32* stringSizes2, const char * vals2, TColTypeInterface * colInterfaces, ui64 nStringColumns, ui64 nIColumns) { ui32 currOffset1 = 0; ui32 currOffset2 = 0; ui32 currSize1 = 0; ui32 currSize2 = 0; NYql::NUdf::TUnboxedValue val1, val2; TStringBuf str1, str2; for (ui32 i = 0; i < nStringColumns; i ++) { currSize1 = *(stringSizes1 + i); currSize2 = *(stringSizes2 + i); if (currSize1 != currSize2) return false; currOffset1 += currSize1 + sizeof(ui32); currOffset2 += currSize2 + sizeof(ui32); } if (0 != std::memcmp(vals1, vals2, currOffset1)) return false; for (ui32 i = 0; i < nIColumns; i ++) { currSize1 = *(stringSizes1 + nStringColumns + i ); currSize2 = *(stringSizes2 + nStringColumns + i ); currOffset1 += sizeof(ui32); currOffset2 += sizeof(ui32); str1 = TStringBuf(vals1 + currOffset1, currSize1); val1 = (colInterfaces + i)->Packer->Unpack(str1, colInterfaces->HolderFactory); str2 = TStringBuf(vals2 + currOffset2, currSize2 ); val2 = (colInterfaces + i)->Packer->Unpack(str2, colInterfaces->HolderFactory); if ( ! ((colInterfaces + i)->EquateI->Equals(val1,val2)) ) { return false; } currOffset1 += currSize1; currOffset2 += currSize2; } return true; } inline bool CompareIColumns( const char * vals1, const char * vals2, NYql::NUdf::TUnboxedValue * iColumns, TColTypeInterface * colInterfaces, ui64 nStringColumns, ui64 nIColumns) { ui32 currOffset1 = 0; NYql::NUdf::TUnboxedValue val1; TStringBuf str1; for (ui32 i = 0; i < nStringColumns; i ++) { auto currSize1 = ReadUnaligned(vals1 + currOffset1); auto currSize2 = ReadUnaligned(vals2 + currOffset1); if (currSize1 != currSize2) return false; currOffset1 += currSize1 + sizeof(ui32); } if (0 != std::memcmp(vals1, vals2, currOffset1)) return false; for (ui32 i = 0; i < nIColumns; i ++) { auto currSize1 = ReadUnaligned(vals1 + currOffset1); currOffset1 += sizeof(ui32); str1 = TStringBuf(vals1 + currOffset1, currSize1); val1 = (colInterfaces + i)->Packer->Unpack(str1, colInterfaces->HolderFactory); auto &val2 = iColumns[i]; if ( ! ((colInterfaces + i)->EquateI->Equals(val1,val2)) ) { return false; } currOffset1 += currSize1; } return true; } // Resizes KeysHashTable to new slots, keeps old content. void ResizeHashTable(KeysHashTable &t, ui64 newSlots){ std::vector> newTable(newSlots * t.SlotSize , 0); for ( auto it = t.Table.begin(); it != t.Table.end(); it += t.SlotSize ) { if ( *it == 0) continue; ui64 hash = *it; ui64 newSlotNum = hash % (newSlots); auto newIt = newTable.begin() + t.SlotSize * newSlotNum; while (*newIt != 0) { newIt += t.SlotSize; if (newIt == newTable.end()) { newIt = newTable.begin(); } } std::copy_n(it, t.SlotSize, newIt); } t.NSlots = newSlots; t.Table = std::move(newTable); } bool IsTablesSwapRequired(ui64 tuplesNum1, ui64 tuplesNum2, bool table1Batch, bool table2Batch) { return tuplesNum2 > tuplesNum1 && !table1Batch || table2Batch; } ui64 ComputeJoinSlotsSizeForBucket(const TTableBucket& bucket, const TTableBucketStats& bucketStats, ui64 headerSize, bool tableHasKeyStringColumns, bool tableHasKeyIColumns) { ui64 tuplesNum = bucketStats.TuplesNum; ui64 avgStringsSize = (3 * (bucket.KeyIntVals.size() - tuplesNum * headerSize) ) / ( 2 * tuplesNum + 1) + 1; ui64 slotSize = headerSize + 1; // Header [Short Strings] SlotIdx if (tableHasKeyStringColumns || tableHasKeyIColumns) { slotSize = slotSize + avgStringsSize; } return slotSize; } ui64 ComputeNumberOfSlots(ui64 tuplesNum) { return (3 * tuplesNum + 1) | 1; } bool TTable::TryToPreallocateMemoryForJoin(TTable & t1, TTable & t2, EJoinKind /* joinKind */, bool hasMoreLeftTuples, bool hasMoreRightTuples) { // If the batch is final or the only one, then the buckets are processed sequentially, the memory for the hash tables is freed immediately after processing. // So, no preallocation is required. if (!hasMoreLeftTuples && !hasMoreRightTuples) return true; for (ui64 bucket = 0; bucket < GraceJoin::NumberOfBuckets; bucket++) { ui64 tuplesNum1 = t1.TableBucketsStats[bucket].TuplesNum; ui64 tuplesNum2 = t2.TableBucketsStats[bucket].TuplesNum; TTable& tableForPreallocation = IsTablesSwapRequired(tuplesNum1, tuplesNum2, hasMoreLeftTuples || LeftTableBatch_, hasMoreRightTuples || RightTableBatch_) ? t1 : t2; if (!tableForPreallocation.TableBucketsStats[bucket].TuplesNum || tableForPreallocation.TableBuckets[bucket].NSlots) continue; TTableBucket& bucketForPreallocation = tableForPreallocation.TableBuckets[bucket]; const TTableBucketStats& bucketForPreallocationStats = tableForPreallocation.TableBucketsStats[bucket]; const auto nSlots = ComputeJoinSlotsSizeForBucket(bucketForPreallocation, bucketForPreallocationStats, tableForPreallocation.HeaderSize, tableForPreallocation.NumberOfKeyStringColumns != 0, tableForPreallocation.NumberOfKeyIColumns != 0); const auto slotSize = ComputeNumberOfSlots(tableForPreallocation.TableBucketsStats[bucket].TuplesNum); try { bucketForPreallocation.JoinSlots.reserve(nSlots*slotSize); } catch (TMemoryLimitExceededException) { for (ui64 i = 0; i < bucket; ++i) { GraceJoin::TTableBucket * b1 = &JoinTable1->TableBuckets[i]; b1->JoinSlots.resize(0); b1->JoinSlots.shrink_to_fit(); GraceJoin::TTableBucket * b2 = &JoinTable2->TableBuckets[i]; b2->JoinSlots.resize(0); b2->JoinSlots.shrink_to_fit(); } return false; } } return true; } // Joins two tables and returns join result in joined table. Tuples of joined table could be received by // joined table iterator void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind, bool hasMoreLeftTuples, bool hasMoreRightTuples, ui32 fromBucket, ui32 toBucket ) { if ( hasMoreLeftTuples ) LeftTableBatch_ = true; if( hasMoreRightTuples ) RightTableBatch_ = true; auto table1Batch = LeftTableBatch_; auto table2Batch = RightTableBatch_; JoinTable1 = &t1; JoinTable2 = &t2; JoinKind = joinKind; IsTableJoined = true; MKQL_ENSURE(joinKind != EJoinKind::Cross, "Cross Join is not allowed in Grace Join"); const bool needCrossIds = JoinKind == EJoinKind::Inner || JoinKind == EJoinKind::Full || JoinKind == EJoinKind::Left || JoinKind == EJoinKind::Right; ui64 tuplesFound = 0; for (ui64 bucket = fromBucket; bucket < toBucket; bucket++) { auto &joinResults = TableBuckets[bucket].JoinIds; joinResults.clear(); TTableBucket * bucket1 = &JoinTable1->TableBuckets[bucket]; TTableBucket * bucket2 = &JoinTable2->TableBuckets[bucket]; TTableBucketStats * bucketStats1 = &JoinTable1->TableBucketsStats[bucket]; TTableBucketStats * bucketStats2 = &JoinTable2->TableBucketsStats[bucket]; ui64 tuplesNum1 = JoinTable1->TableBucketsStats[bucket].TuplesNum; ui64 tuplesNum2 = JoinTable2->TableBucketsStats[bucket].TuplesNum; ui64 headerSize1 = JoinTable1->HeaderSize; ui64 headerSize2 = JoinTable2->HeaderSize; ui64 nullsSize1 = JoinTable1->NullsBitmapSize_; ui64 nullsSize2 = JoinTable2->NullsBitmapSize_; ui64 keyIntOffset1 = HashSize + nullsSize1; ui64 keyIntOffset2 = HashSize + nullsSize2; bool table1HasKeyStringColumns = (JoinTable1->NumberOfKeyStringColumns != 0); bool table2HasKeyStringColumns = (JoinTable2->NumberOfKeyStringColumns != 0); bool table1HasKeyIColumns = (JoinTable1->NumberOfKeyIColumns != 0); bool table2HasKeyIColumns = (JoinTable2->NumberOfKeyIColumns != 0); bool swapTables = IsTablesSwapRequired(tuplesNum1, tuplesNum2, table1Batch, table2Batch); if (swapTables) { std::swap(bucket1, bucket2); std::swap(bucketStats1, bucketStats2); std::swap(headerSize1, headerSize2); std::swap(nullsSize1, nullsSize2); std::swap(keyIntOffset1, keyIntOffset2); std::swap(table1HasKeyStringColumns, table2HasKeyStringColumns); std::swap(table1HasKeyIColumns, table2HasKeyIColumns); std::swap(tuplesNum1, tuplesNum2); } auto &leftIds = bucket1->LeftIds; leftIds.clear(); const bool selfJoinSameKeys = (JoinTable1 == JoinTable2); const bool needLeftIds = ((swapTables ? (JoinKind == EJoinKind::Right || JoinKind == EJoinKind::RightOnly) : (JoinKind == EJoinKind::Left || JoinKind == EJoinKind::LeftOnly)) || JoinKind == EJoinKind::Full || JoinKind == EJoinKind::Exclusion) && !selfJoinSameKeys; const bool isLeftSemi = swapTables ? JoinKind == EJoinKind::RightSemi : JoinKind == EJoinKind::LeftSemi; //const bool isRightSemi = swapTables ? JoinKind == EJoinKind::LeftSemi : JoinKind == EJoinKind::RightSemi; bucketStats2->HashtableMatches = ((swapTables ? (JoinKind == EJoinKind::Left || JoinKind == EJoinKind::LeftOnly || JoinKind == EJoinKind::LeftSemi) : (JoinKind == EJoinKind::Right || JoinKind == EJoinKind::RightOnly || JoinKind == EJoinKind::RightSemi)) || JoinKind == EJoinKind::Full || JoinKind == EJoinKind::Exclusion) && !selfJoinSameKeys; // In this case, all keys except for NULLs have matched key on other side, and NULLs are handled by AddTuple if (tuplesNum2 == 0) { if (needLeftIds) { for (ui32 leftId = 0; leftId != tuplesNum1; ++leftId) leftIds.push_back(leftId); } continue; } if (tuplesNum1 == 0 && (hasMoreRightTuples || hasMoreLeftTuples || !bucketStats2->HashtableMatches)) continue; ui64 slotSize = ComputeJoinSlotsSizeForBucket(*bucket2, *bucketStats2, headerSize2, table2HasKeyStringColumns, table2HasKeyIColumns); ui64 &nSlots = bucket2->NSlots; auto &joinSlots = bucket2->JoinSlots; auto &bloomFilter = bucketStats2->BloomFilter; bool initHashTable = false; Y_DEBUG_ABORT_UNLESS(bucketStats2->SlotSize == 0 || bucketStats2->SlotSize == slotSize); if (!nSlots) { nSlots = ComputeNumberOfSlots(tuplesNum2); joinSlots.resize(nSlots*slotSize, 0); bloomFilter.Resize(tuplesNum2); initHashTable = true; bucketStats2->SlotSize = slotSize; ++InitHashTableCount_; } auto firstSlot = [begin = joinSlots.begin(), slotSize, nSlots](auto hash) { ui64 slotNum = hash % nSlots; return begin + slotNum * slotSize; }; auto nextSlot = [begin = joinSlots.begin(), end = joinSlots.end(), slotSize](auto it) { it += slotSize; if (it == end) it = begin; return it; }; if (initHashTable) { ui32 tuple2Idx = 0; auto it2 = bucket2->KeyIntVals.begin(); for (ui64 keysValSize = headerSize2; it2 != bucket2->KeyIntVals.end(); it2 += keysValSize, ++tuple2Idx) { if ( table2HasKeyStringColumns || table2HasKeyIColumns) { keysValSize = headerSize2 + *(it2 + headerSize2 - 1) ; } ui64 hash = *it2; // Note: if hashtable is re-created after being spilled // (*(it2 + HashSize) & 1) may be true (even though key does NOT contain NULL) bloomFilter.Add(hash); auto slotIt = firstSlot(hash); ++HashLookups_; for (; *slotIt != 0; slotIt = nextSlot(slotIt)) { ++HashO1Iterations_; } ++HashSlotIterations_; if (keysValSize <= slotSize - 1) { std::copy_n(it2, keysValSize, slotIt); } else { std::copy_n(it2, headerSize2, slotIt); *(slotIt + headerSize2) = it2 + headerSize2 - bucket2->KeyIntVals.begin(); } slotIt[slotSize - 1] = tuple2Idx; } bloomFilter.Finalize(); if (swapTables) JoinTable1Total_ += tuplesNum2; else JoinTable2Total_ += tuplesNum2; } if (swapTables) JoinTable2Total_ += tuplesNum1; else JoinTable1Total_ += tuplesNum1; ui32 tuple1Idx = 0; auto it1 = bucket1->KeyIntVals.begin(); // /-------headerSize---------------------------\ // hash nulls-bitmap keyInt[] KeyIHash[] strSize| [strPos | strs] slotIdx // \---------------------------------------slotSize ---------------------/ // bit0 of nulls bitmap denotes key-with-nulls // strSize only present if HasKeyStrCol || HasKeyICol // strPos is only present if (HasKeyStrCol || HasKeyICol) && strSize + headerSize >= slotSize // slotSize, slotIdx and strPos is only for hashtable (table2) ui64 bloomHits = 0; ui64 bloomLookups = 0; for (ui64 keysValSize = headerSize1; it1 != bucket1->KeyIntVals.end(); it1 += keysValSize, ++tuple1Idx ) { if ( table1HasKeyStringColumns || table1HasKeyIColumns ) { keysValSize = headerSize1 + *(it1 + headerSize1 - 1) ; } ui64 hash = *it1; Y_DEBUG_ABORT_UNLESS((*(it1 + HashSize) & 1) == 0); // Keys with NULL never reaches Join if (initHashTable) { bloomLookups++; if (bloomFilter.IsMissing(hash)) { if (needLeftIds) leftIds.push_back(tuple1Idx); bloomHits++; continue; } } ++HashLookups_; auto saveTuplesFound = tuplesFound; auto slotIt = firstSlot(hash); for (; *slotIt != 0; slotIt = nextSlot(slotIt) ) { ++HashO1Iterations_; if (*slotIt != hash) continue; auto tuple2Idx = slotIt[slotSize - 1]; ++HashSlotIterations_; if (table1HasKeyIColumns || !(keysValSize - nullsSize1 <= slotSize - 1 - nullsSize2)) { // 2nd condition cannot be true unless HasKeyStringColumns or HasKeyIColumns, hence size at the end of header is present if (!std::equal(it1 + keyIntOffset1, it1 + headerSize1 - 1, slotIt + keyIntOffset2)) continue; auto slotStringsStart = slotIt + headerSize2; ui64 slotStringsSize = *(slotIt + headerSize2 - 1); if (headerSize2 + slotStringsSize + 1 > slotSize) { ui64 stringsPos = *(slotIt + headerSize2); slotStringsStart = bucket2->KeyIntVals.begin() + stringsPos; } if (table1HasKeyIColumns) { ui64 stringsOffsetsIdx1 = tuple1Idx * (JoinTable1->NumberOfStringColumns + JoinTable1->NumberOfIColumns + 2); ui64 stringsOffsetsIdx2 = tuple2Idx * (JoinTable2->NumberOfStringColumns + JoinTable2->NumberOfIColumns + 2); ui32 * stringsSizesPtr1 = bucket1->StringsOffsets.data() + stringsOffsetsIdx1 + 2; ui32 * stringsSizesPtr2 = bucket2->StringsOffsets.data() + stringsOffsetsIdx2 + 2; if (!CompareIColumns( stringsSizesPtr1 , (char *) (it1 + headerSize1 ), stringsSizesPtr2, (char *) (slotStringsStart), JoinTable1 -> ColInterfaces, JoinTable1->NumberOfStringColumns, JoinTable1 -> NumberOfKeyIColumns )) continue; } else { ui64 stringsSize = *(it1 + headerSize1 - 1); if (stringsSize != slotStringsSize || !std::equal(it1 + headerSize1, it1 + headerSize1 + stringsSize, slotStringsStart)) continue; } } else { if (!std::equal(it1 + keyIntOffset1, it1 + keysValSize, slotIt + keyIntOffset2)) continue; } *(slotIt + HashSize) |= 1; // mark right slot as matched tuplesFound++; if (needCrossIds) { JoinTuplesIds joinIds; joinIds.id1 = swapTables ? tuple2Idx : tuple1Idx; joinIds.id2 = swapTables ? tuple1Idx : tuple2Idx; joinResults.emplace_back(joinIds); } } if (saveTuplesFound == tuplesFound) { ++BloomFalsePositives_; if (needLeftIds) leftIds.push_back(tuple1Idx); } else if (isLeftSemi) { leftIds.push_back(tuple1Idx); } } if (!hasMoreLeftTuples && !hasMoreRightTuples) { bloomFilter.Shrink(); if (bucketStats2->HashtableMatches) { auto slotIt = joinSlots.cbegin(); auto end = joinSlots.cend(); auto isSemi = JoinKind == EJoinKind::LeftSemi || JoinKind == EJoinKind::RightSemi; auto &leftIds2 = bucket2->LeftIds; for (; slotIt != end; slotIt += slotSize) { if ((*(slotIt + HashSize) & 1) == isSemi && *slotIt != 0) { auto id2 = *(slotIt + slotSize - 1); Y_DEBUG_ABORT_UNLESS(id2 < bucketStats2->TuplesNum); leftIds2.push_back(id2); } } std::sort(leftIds2.begin(), leftIds2.end()); } joinSlots.clear(); joinSlots.shrink_to_fit(); nSlots = 0; } if (bloomHits < bloomLookups/8) { // Bloomfilter was inefficient, drop it bloomFilter.Shrink(); } BloomHits_ += bloomHits; BloomLookups_ += bloomLookups; YQL_LOG(GRACEJOIN_TRACE) << (const void *)this << '#' << bucket << " Table1 " << JoinTable1->TableBucketsStats[bucket].TuplesNum << " Table2 " << JoinTable2->TableBucketsStats[bucket].TuplesNum << " LeftTableBatch " << LeftTableBatch_ << " RightTableBatch " << RightTableBatch_ << " leftIds " << leftIds.size() << " joinIds " << joinResults.size() << " joinKind " << (int)JoinKind << " swapTables " << swapTables << " initHashTable " << initHashTable ; } HasMoreLeftTuples_ = hasMoreLeftTuples; HasMoreRightTuples_ = hasMoreRightTuples; TuplesFound_ += tuplesFound; } inline void TTable::GetTupleData(ui32 bucketNum, ui32 tupleId, TupleData & td) { ui64 keyIntsOffset = 0; ui64 dataIntsOffset = 0; ui64 keyStringsOffset = 0; ui64 dataStringsOffset = 0; td.AllNulls = false; TotalUnpacked++; TTableBucket & tb = TableBuckets[bucketNum]; ui64 stringsOffsetsIdx = tupleId * (NumberOfStringColumns + NumberOfIColumns + 2); if(NumberOfKeyStringColumns != 0 || NumberOfKeyIColumns !=0 ) { keyIntsOffset = tb.StringsOffsets[stringsOffsetsIdx]; } else { keyIntsOffset = HeaderSize * tupleId; } for ( ui64 i = 0; i < NumberOfKeyIntColumns + NullsBitmapSize_; ++i) { td.IntColumns[i] = tb.KeyIntVals[keyIntsOffset + HashSize + i]; } dataIntsOffset = NumberOfDataIntColumns * tupleId; for ( ui64 i = 0; i < NumberOfDataIntColumns; ++i) { td.IntColumns[NumberOfKeyIntColumns + NullsBitmapSize_ + i] = tb.DataIntVals[dataIntsOffset + i]; } char *strPtr = nullptr; if(NumberOfKeyStringColumns != 0 || NumberOfKeyIColumns != 0) { keyStringsOffset = tb.StringsOffsets[stringsOffsetsIdx] + HeaderSize; strPtr = reinterpret_cast(tb.KeyIntVals.data() + keyStringsOffset); for (ui64 i = 0; i < NumberOfKeyStringColumns; ++i) { td.StrSizes[i] = tb.StringsOffsets[stringsOffsetsIdx + 2 + i]; Y_DEBUG_ABORT_UNLESS(ReadUnaligned(strPtr) == td.StrSizes[i]); strPtr += sizeof(ui32); td.StrColumns[i] = strPtr; strPtr += td.StrSizes[i]; } for ( ui64 i = 0; i < NumberOfKeyIColumns; i++) { ui32 currSize = tb.StringsOffsets[stringsOffsetsIdx + 2 + NumberOfKeyStringColumns + i]; Y_DEBUG_ABORT_UNLESS(ReadUnaligned(strPtr) == currSize); strPtr += sizeof(ui32); *(td.IColumns + i) = (ColInterfaces + i)->Packer->Unpack(TStringBuf(strPtr, currSize), ColInterfaces->HolderFactory); strPtr += currSize; } } if(NumberOfDataStringColumns || NumberOfDataIColumns != 0) { dataStringsOffset = tb.StringsOffsets[stringsOffsetsIdx + 1]; } strPtr = (tb.StringsValues.data() + dataStringsOffset); for ( ui64 i = 0; i < NumberOfDataStringColumns; ++i ) { ui32 currIdx = NumberOfKeyStringColumns + i; td.StrColumns[currIdx] = strPtr; td.StrSizes[currIdx] = tb.StringsOffsets[stringsOffsetsIdx + 2 + currIdx]; strPtr += td.StrSizes[currIdx]; } for (ui64 i = 0; i < NumberOfDataIColumns; i++ ) { ui32 currIdx = NumberOfStringColumns + NumberOfKeyIColumns + i; ui32 currSize = tb.StringsOffsets[stringsOffsetsIdx + 2 + currIdx]; *(td.IColumns + NumberOfKeyIColumns + i) = (ColInterfaces + NumberOfKeyIColumns + i)->Packer->Unpack(TStringBuf(strPtr, currSize), ColInterfaces->HolderFactory); strPtr += currSize; } } inline bool TTable::AddKeysToHashTable(KeysHashTable& t, ui64* keys, NYql::NUdf::TUnboxedValue * iColumns) { if (t.NSlots == 0) { t.SlotSize = HeaderSize + NumberOfKeyStringColumns * 2; t.Table.resize(DefaultTuplesNum * t.SlotSize, 0); t.NSlots = DefaultTuplesNum; } if ( t.FillCount > t.NSlots/2 ) { ResizeHashTable(t, 2 * t.NSlots + 1); } if ( (*(keys + HashSize) & 1) ) // Keys with null value return true; ui64 hash = *keys; ui64 slot = hash % t.NSlots; auto it = t.Table.begin() + slot * t.SlotSize; ui64 keyIntOffset = HashSize + NullsBitmapSize_; ui64 keysSize = HeaderSize; ui64 keyStringsSize = 0; if ( NumberOfKeyStringColumns > 0 || NumberOfKeyIColumns > 0) { keyStringsSize = *(keys + HeaderSize - 1); keysSize = HeaderSize + keyStringsSize; } auto nextSlot = [begin = t.Table.begin(), end = t.Table.end(), slotSize = t.SlotSize](auto it) { it += slotSize; if (it == end) it = begin; return it; }; for (auto itValSize = HeaderSize; *it != 0; it = nextSlot(it)) { if (*it != hash) continue; if ( NumberOfKeyIColumns == 0 && (itValSize <= t.SlotSize)) { if (!std::equal(it + keyIntOffset, it + itValSize, keys + keyIntOffset)) continue; return false; } Y_DEBUG_ABORT_UNLESS( NumberOfKeyStringColumns > 0 || NumberOfKeyIColumns > 0); itValSize = HeaderSize + *(it + HeaderSize - 1); auto slotStringsStart = it + HeaderSize; if (!std::equal(it + keyIntOffset, it + HeaderSize - 1, keys + keyIntOffset)) continue; if (NumberOfKeyIColumns > 0) { if (!CompareIColumns( (char *) (slotStringsStart), (char *) (keys + HeaderSize ), iColumns, JoinTable1 -> ColInterfaces, JoinTable1->NumberOfStringColumns, JoinTable1 -> NumberOfKeyIColumns )) continue; return false; } Y_DEBUG_ABORT_UNLESS(!(itValSize <= t.SlotSize)); ui64 stringsPos = *(it + HeaderSize); slotStringsStart = t.SpillData.begin() + stringsPos; if (keysSize != itValSize || !std::equal(slotStringsStart, slotStringsStart + itValSize, keys + HeaderSize)) continue; return false; } if (keysSize > t.SlotSize) { ui64 spillDataOffset = t.SpillData.size(); t.SpillData.insert(t.SpillData.end(), keys + HeaderSize, keys + keysSize); std::copy_n(keys, HeaderSize, it); *(it + HeaderSize) = spillDataOffset; } else { std::copy_n(keys, keysSize, it); } t.FillCount++; return true; } bool TTable::NextJoinedData( TupleData & td1, TupleData & td2, ui64 bucketLimit) { while (CurrIterBucket < bucketLimit) { if (auto &joinIds = TableBuckets[CurrIterBucket].JoinIds; CurrIterIndex != joinIds.size()) { Y_DEBUG_ABORT_UNLESS(JoinKind == EJoinKind::Inner || JoinKind == EJoinKind::Left || JoinKind == EJoinKind::Right || JoinKind == EJoinKind::Full); auto ids = joinIds[CurrIterIndex++]; JoinTable1->GetTupleData(CurrIterBucket, ids.id1, td1); JoinTable2->GetTupleData(CurrIterBucket, ids.id2, td2); return true; } auto leftSide = [this](auto sideTable, auto &tdL, auto &tdR) { const auto &bucket = sideTable->TableBuckets[CurrIterBucket]; auto &currIterIndex = sideTable->CurrIterIndex; const auto &leftIds = bucket.LeftIds; if (currIterIndex != leftIds.size()) { auto id = leftIds[currIterIndex++]; sideTable->GetTupleData(CurrIterBucket, id, tdL); tdR.AllNulls = true; return true; } return false; }; if (leftSide(JoinTable1, td1, td2)) return true; if (leftSide(JoinTable2, td2, td1)) return true; ++CurrIterBucket; CurrIterIndex = 0; JoinTable1->CurrIterIndex = 0; JoinTable2->CurrIterIndex = 0; } return false; } void TTable::Clear() { for (ui64 bucket = 0; bucket < NumberOfBuckets; bucket++) { ClearBucket(bucket); } } void TTable::ClearBucket(ui64 bucket) { TTableBucket & tb = TableBuckets[bucket]; tb.KeyIntVals.clear(); tb.DataIntVals.clear(); tb.StringsOffsets.clear(); tb.StringsValues.clear(); tb.InterfaceValues.clear(); tb.InterfaceOffsets.clear(); tb.JoinIds.clear(); tb.LeftIds.clear(); tb.JoinSlots.clear(); tb.NSlots = 0; TTableBucketStats & tbs = TableBucketsStats[bucket]; tbs.TuplesNum = 0; tbs.KeyIntValsTotalSize = 0; tbs.StringValuesTotalSize = 0; } void TTable::ShrinkBucket(ui64 bucket) { TTableBucket & tb = TableBuckets[bucket]; tb.KeyIntVals.shrink_to_fit(); tb.DataIntVals.shrink_to_fit(); tb.StringsOffsets.shrink_to_fit(); tb.StringsValues.shrink_to_fit(); tb.InterfaceValues.shrink_to_fit(); tb.InterfaceOffsets.shrink_to_fit(); tb.JoinIds.shrink_to_fit(); tb.LeftIds.shrink_to_fit(); tb.JoinSlots.shrink_to_fit(); } void TTable::InitializeBucketSpillers(ISpiller::TPtr spiller) { for (size_t i = 0; i < NumberOfBuckets; ++i) { TableBucketsSpillers.emplace_back(spiller, 5_MB); } } ui64 TTable::GetSizeOfBucket(ui64 bucket) const { return TableBuckets[bucket].KeyIntVals.size() * sizeof(ui64) + TableBuckets[bucket].JoinSlots.size() * sizeof(ui64) + TableBuckets[bucket].DataIntVals.size() * sizeof(ui64) + TableBuckets[bucket].StringsValues.size() + TableBuckets[bucket].StringsOffsets.size() * sizeof(ui32) + TableBuckets[bucket].InterfaceValues.size() + TableBuckets[bucket].InterfaceOffsets.size() * sizeof(ui32); } bool TTable::TryToReduceMemoryAndWait(ui64 bucket) { if (GetSizeOfBucket(bucket) < SpillingSizeLimit/NumberOfBuckets) return false; if (const auto &tbs = TableBucketsStats[bucket]; tbs.HashtableMatches) { auto &tb = TableBuckets[bucket]; if (tb.JoinSlots.size()) { const auto slotSize = tbs.SlotSize; Y_DEBUG_ABORT_UNLESS(slotSize); auto it = tb.JoinSlots.cbegin(); const auto end = tb.JoinSlots.cend(); for (; it != end; it += slotSize) { // Note: we need not check if *it is 0 if ((*(it + HashSize) & 1)) { ui64 keyIntsOffset; auto tupleId = *(it + slotSize - 1); Y_DEBUG_ABORT_UNLESS(tupleId < tbs.TuplesNum); if (NumberOfKeyStringColumns != 0 || NumberOfKeyIColumns != 0) { ui64 stringsOffsetsIdx = tupleId * (NumberOfStringColumns + NumberOfIColumns + 2); keyIntsOffset = tb.StringsOffsets[stringsOffsetsIdx]; } else { keyIntsOffset = HeaderSize * tupleId; } tb.KeyIntVals[keyIntsOffset + HashSize] |= 1; } } tb.JoinSlots.clear(); tb.JoinSlots.shrink_to_fit(); } } TableBucketsSpillers[bucket].SpillBucket(std::move(TableBuckets[bucket])); TableBuckets[bucket] = TTableBucket{}; return TableBucketsSpillers[bucket].IsProcessingSpilling(); } void TTable::UpdateSpilling() { for (ui64 i = 0; i < NumberOfBuckets; ++i) { TableBucketsSpillers[i].Update(); } } bool TTable::IsSpillingFinished() const { for (ui64 i = 0; i < NumberOfBuckets; ++i) { if (TableBucketsSpillers[i].IsProcessingSpilling()) return false; } return true; } bool TTable::IsSpillingAcceptingDataRequests() const { for (ui64 i = 0; i < NumberOfBuckets; ++i) { if (TableBucketsSpillers[i].IsInMemory()) continue; if (!TableBucketsSpillers[i].IsAcceptingDataRequests()) return false; } return true; } bool TTable::IsRestoringSpilledBuckets() const { for (ui64 i = 0; i < NumberOfBuckets; ++i) { if (TableBucketsSpillers[i].IsRestoring()) return true; } return false; } void TTable::FinalizeSpilling() { for (ui32 bucket = 0; bucket < NumberOfBuckets; ++bucket) { if (!TableBucketsSpillers[bucket].IsInMemory()) { TableBucketsSpillers[bucket].Finalize(); TableBucketsSpillers[bucket].SpillBucket(std::move(TableBuckets[bucket])); TableBuckets[bucket] = TTableBucket{}; } } } bool TTable::IsBucketInMemory(ui32 bucket) const { return TableBucketsSpillers[bucket].IsInMemory(); } bool TTable::IsSpilledBucketWaitingForExtraction(ui32 bucket) const { return TableBucketsSpillers[bucket].IsExtractionRequired(); } void TTable::StartLoadingBucket(ui32 bucket) { MKQL_ENSURE(!TableBucketsSpillers[bucket].IsInMemory(), "Internal logic error"); TableBucketsSpillers[bucket].StartBucketRestoration(); } void TTable::PrepareBucket(ui64 bucket) { if (!TableBucketsSpillers[bucket].IsExtractionRequired()) return; TableBuckets[bucket] = std::move(TableBucketsSpillers[bucket].ExtractBucket()); } // Creates new table with key columns and data columns TTable::TTable( ui64 numberOfKeyIntColumns, ui64 numberOfKeyStringColumns, ui64 numberOfDataIntColumns, ui64 numberOfDataStringColumns, ui64 numberOfKeyIColumns, ui64 numberOfDataIColumns, ui64 nullsBitmapSize, TColTypeInterface * colInterfaces, bool isAny ) : NumberOfKeyIntColumns(numberOfKeyIntColumns), NumberOfKeyStringColumns(numberOfKeyStringColumns), NumberOfKeyIColumns(numberOfKeyIColumns), NumberOfDataIntColumns(numberOfDataIntColumns), NumberOfDataStringColumns(numberOfDataStringColumns), NumberOfDataIColumns(numberOfDataIColumns), ColInterfaces(colInterfaces), NullsBitmapSize_(nullsBitmapSize), IsAny_(isAny) { NumberOfKeyColumns = NumberOfKeyIntColumns + NumberOfKeyStringColumns + NumberOfKeyIColumns; NumberOfDataColumns = NumberOfDataIntColumns + NumberOfDataStringColumns + NumberOfDataIColumns; NumberOfColumns = NumberOfKeyColumns + NumberOfDataColumns; NumberOfStringColumns = NumberOfKeyStringColumns + NumberOfDataStringColumns; NumberOfIColumns = NumberOfKeyIColumns + NumberOfDataIColumns; BytesInKeyIntColumns = NumberOfKeyIntColumns * sizeof(ui64); TotalStringsSize = (numberOfKeyStringColumns > 0 || NumberOfKeyIColumns > 0 ) ? 1 : 0; HeaderSize = HashSize + NullsBitmapSize_ + NumberOfKeyIntColumns + NumberOfKeyIColumns + TotalStringsSize; TableBuckets.resize(NumberOfBuckets); TableBucketsStats.resize(NumberOfBuckets); const ui64 reservedSizePerTuple = (2 * DefaultTupleBytes) / sizeof(ui64); TempTuple.reserve( reservedSizePerTuple ); IColumnsHashes.resize(NumberOfKeyIColumns); IColumnsVals.resize(NumberOfIColumns); const ui64 totalForTuples = DefaultTuplesNum * reservedSizePerTuple; for ( auto & b: TableBuckets ) { b.KeyIntVals.reserve( (totalForTuples * NumberOfKeyColumns) / (NumberOfColumns + 1) ); b.StringsOffsets.reserve((totalForTuples * NumberOfStringColumns) / (NumberOfColumns + 1)); b.DataIntVals.reserve( (totalForTuples * NumberOfDataIntColumns) / (NumberOfColumns + 1)); b.StringsValues.reserve( (totalForTuples * NumberOfStringColumns) / (NumberOfColumns + 1) ); b.InterfaceOffsets.reserve( (totalForTuples * NumberOfIColumns) / (NumberOfColumns + 1) ); b.InterfaceValues.reserve( (totalForTuples * NumberOfIColumns) / (NumberOfColumns + 1)); } } TTable::~TTable() { YQL_LOG_IF(GRACEJOIN_DEBUG, InitHashTableCount_) << (const void *)this << '#' << "InitHashTableCount " << InitHashTableCount_ << " BloomLookups " << BloomLookups_ << " BloomHits " << BloomHits_ << " BloomFalsePositives " << BloomFalsePositives_ << " HashLookups " << HashLookups_ << " HashChainTraversal " << HashO1Iterations_/(double)HashLookups_ << " HashSlotOperations " << HashSlotIterations_/(double)HashLookups_ << " Table1 " << JoinTable1Total_ << " Table2 " << JoinTable2Total_ << " TuplesFound " << TuplesFound_ ; YQL_LOG_IF(GRACEJOIN_DEBUG, JoinTable1 && JoinTable1->AnyFiltered_) << (const void *)this << '#' << "L AnyFiltered " << JoinTable1->AnyFiltered_; YQL_LOG_IF(GRACEJOIN_DEBUG, JoinTable1 && JoinTable1->BloomLookups_) << (const void *)this << '#' << "L BloomLookups " << JoinTable1->BloomLookups_ << " BloomHits " << JoinTable1->BloomHits_; YQL_LOG_IF(GRACEJOIN_DEBUG, JoinTable2 && JoinTable2->AnyFiltered_) << (const void *)this << '#' << "R AnyFiltered " << JoinTable2->AnyFiltered_; YQL_LOG_IF(GRACEJOIN_DEBUG, JoinTable2 && JoinTable2->BloomLookups_) << (const void *)this << '#' << "R BloomLookups " << JoinTable2->BloomLookups_ << " BloomHits " << JoinTable2->BloomHits_; }; TTableBucketSpiller::TTableBucketSpiller(ISpiller::TPtr spiller, size_t sizeLimit) : StateUi64Adapter(spiller, sizeLimit) , StateUi32Adapter(spiller, sizeLimit) , StateCharAdapter(spiller, sizeLimit) { } void TTableBucketSpiller::Update() { StateUi64Adapter.Update(); StateUi32Adapter.Update(); StateCharAdapter.Update(); if (State == EState::Spilling) { ProcessBucketSpilling(); } else if (State == EState::Finalizing) { ProcessFinalizing(); } else if (State == EState::Restoring) { ProcessBucketRestoration(); } } void TTableBucketSpiller::Finalize() { IsFinalizingRequested = true; } void TTableBucketSpiller::SpillBucket(TTableBucket&& bucket) { MKQL_ENSURE(NextVectorToProcess == ENextVectorToProcess::None, "Internal logic error"); State = EState::Spilling; CurrentBucket = std::move(bucket); NextVectorToProcess = ENextVectorToProcess::KeyAndVals; ProcessBucketSpilling(); } TTableBucket&& TTableBucketSpiller::ExtractBucket() { MKQL_ENSURE(State == EState::WaitingForExtraction, "Internal logic error"); MKQL_ENSURE(SpilledBucketsCount == 0, "Internal logic error"); State = EState::InMemory; return std::move(CurrentBucket); } bool TTableBucketSpiller::IsInMemory() const { return State == EState::InMemory; } bool TTableBucketSpiller::IsExtractionRequired() const { return State == EState::WaitingForExtraction; } bool TTableBucketSpiller::IsProcessingSpilling() const { return State == EState::Spilling; } bool TTableBucketSpiller::IsAcceptingDataRequests() const { return State == EState::AcceptingDataRequests; } bool TTableBucketSpiller::IsRestoring() const { return State == EState::Restoring; } void TTableBucketSpiller::StartBucketRestoration() { MKQL_ENSURE(State == EState::AcceptingDataRequests, "Internal logic error"); MKQL_ENSURE(NextVectorToProcess == ENextVectorToProcess::None, "Internal logic error"); NextVectorToProcess = ENextVectorToProcess::KeyAndVals; State = EState::Restoring; ProcessBucketRestoration(); } void TTableBucketSpiller::ProcessBucketSpilling() { while (NextVectorToProcess != ENextVectorToProcess::None) { switch (NextVectorToProcess) { case ENextVectorToProcess::KeyAndVals: if (!StateUi64Adapter.IsAcceptingData()) return; StateUi64Adapter.AddData(std::move(CurrentBucket.KeyIntVals)); NextVectorToProcess = ENextVectorToProcess::DataIntVals; break; case ENextVectorToProcess::DataIntVals: if (!StateUi64Adapter.IsAcceptingData()) return; StateUi64Adapter.AddData(std::move(CurrentBucket.DataIntVals)); NextVectorToProcess = ENextVectorToProcess::StringsValues; break; case ENextVectorToProcess::StringsValues: if (!StateCharAdapter.IsAcceptingData()) return; StateCharAdapter.AddData(std::move(CurrentBucket.StringsValues)); NextVectorToProcess = ENextVectorToProcess::StringsOffsets; break; case ENextVectorToProcess::StringsOffsets: if (!StateUi32Adapter.IsAcceptingData()) return; StateUi32Adapter.AddData(std::move(CurrentBucket.StringsOffsets)); NextVectorToProcess = ENextVectorToProcess::InterfaceValues; break; case ENextVectorToProcess::InterfaceValues: if (!StateCharAdapter.IsAcceptingData()) return; StateCharAdapter.AddData(std::move(CurrentBucket.InterfaceValues)); NextVectorToProcess = ENextVectorToProcess::InterfaceOffsets; break; case ENextVectorToProcess::InterfaceOffsets: if (!StateUi32Adapter.IsAcceptingData()) return; StateUi32Adapter.AddData(std::move(CurrentBucket.InterfaceOffsets)); NextVectorToProcess = ENextVectorToProcess::None; SpilledBucketsCount++; break; default: return; } } if (IsFinalizingRequested) { if (!StateCharAdapter.IsAcceptingData() || !StateUi32Adapter.IsAcceptingData() || !StateUi64Adapter.IsAcceptingData()) return; State = EState::Finalizing; StateUi64Adapter.Finalize(); StateUi32Adapter.Finalize(); StateCharAdapter.Finalize(); ProcessFinalizing(); return; } State = EState::AcceptingData; } void TTableBucketSpiller::ProcessFinalizing() { if (StateCharAdapter.IsAcceptingDataRequests() && StateUi32Adapter.IsAcceptingDataRequests() && StateUi64Adapter.IsAcceptingDataRequests()) { State = EState::AcceptingDataRequests; } } template void TTableBucketSpiller::AppendVector(std::vector>& first, std::vector>&& second) const { if (first.empty()) { first = std::move(second); return; } first.insert(first.end(), second.begin(), second.end()); second.clear(); } void TTableBucketSpiller::ProcessBucketRestoration() { while (NextVectorToProcess != ENextVectorToProcess::None) { switch (NextVectorToProcess) { case ENextVectorToProcess::KeyAndVals: if (StateUi64Adapter.IsDataReady()) { AppendVector(CurrentBucket.KeyIntVals, StateUi64Adapter.ExtractVector()); NextVectorToProcess = ENextVectorToProcess::DataIntVals; break; } if (StateUi64Adapter.IsAcceptingDataRequests()) { StateUi64Adapter.RequestNextVector(); break; } return; case ENextVectorToProcess::DataIntVals: if (StateUi64Adapter.IsDataReady()) { AppendVector(CurrentBucket.DataIntVals, StateUi64Adapter.ExtractVector()); NextVectorToProcess = ENextVectorToProcess::StringsValues; break; } if (StateUi64Adapter.IsAcceptingDataRequests()) { StateUi64Adapter.RequestNextVector(); break; } return; case ENextVectorToProcess::StringsValues: if (StateCharAdapter.IsDataReady()) { AppendVector(CurrentBucket.StringsValues, StateCharAdapter.ExtractVector()); NextVectorToProcess = ENextVectorToProcess::StringsOffsets; break; } if (StateCharAdapter.IsAcceptingDataRequests()) { StateCharAdapter.RequestNextVector(); break; } return; case ENextVectorToProcess::StringsOffsets: if (StateUi32Adapter.IsDataReady()) { AppendVector(CurrentBucket.StringsOffsets, StateUi32Adapter.ExtractVector()); NextVectorToProcess = ENextVectorToProcess::InterfaceValues; break; } if (StateUi32Adapter.IsAcceptingDataRequests()) { StateUi32Adapter.RequestNextVector(); break; } return; case ENextVectorToProcess::InterfaceValues: if (StateCharAdapter.IsDataReady()) { AppendVector(CurrentBucket.InterfaceValues, StateCharAdapter.ExtractVector()); NextVectorToProcess = ENextVectorToProcess::InterfaceOffsets; break; } if (StateCharAdapter.IsAcceptingDataRequests()) { StateCharAdapter.RequestNextVector(); break; } return; case ENextVectorToProcess::InterfaceOffsets: if (StateUi32Adapter.IsDataReady()) { AppendVector(CurrentBucket.InterfaceOffsets, StateUi32Adapter.ExtractVector()); SpilledBucketsCount--; if (SpilledBucketsCount == 0) { NextVectorToProcess = ENextVectorToProcess::None; State = EState::WaitingForExtraction; } else { NextVectorToProcess = ENextVectorToProcess::KeyAndVals; } break; } if (StateUi32Adapter.IsAcceptingDataRequests()) { StateUi32Adapter.RequestNextVector(); break; } return; default: return; } } } } } }