mkql_grace_join_imp.h 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443
  1. #pragma once
  2. #include <yql/essentials/minikql/computation/mkql_vector_spiller_adapter.h>
  3. #include <yql/essentials/public/udf/udf_data_type.h>
  4. #include <yql/essentials/minikql/mkql_program_builder.h>
  5. #include <yql/essentials/public/udf/udf_type_builder.h>
  6. #include <yql/essentials/minikql/computation/mkql_computation_node_pack.h>
  7. namespace NKikimr {
  8. namespace NMiniKQL {
  9. namespace GraceJoin {
  10. class TTableBucketSpiller;
  11. #define GRACEJOIN_DEBUG DEBUG
  12. #define GRACEJOIN_TRACE TRACE
  13. const ui64 BitsForNumberOfBuckets = 5; // 2^5 = 32
  14. const ui64 BucketsMask = (0x00000001 << BitsForNumberOfBuckets) - 1;
  15. const ui64 NumberOfBuckets = (0x00000001 << BitsForNumberOfBuckets); // Number of hashed keys buckets to distribute incoming tables tuples
  16. const ui64 DefaultTuplesNum = 101; // Default initial number of tuples in one bucket to allocate memory
  17. const ui64 DefaultTupleBytes = 64; // Default size of all columns in table row for estimations
  18. const ui64 HashSize = 1; // Using ui64 hash size
  19. const ui64 SpillingSizeLimit = 1_MB; // Don't try to spill if net effect is lower than this size
  20. const ui32 SpillingRowLimit = 1024; // Don't try to invoke spilling more often than 1 in this number of rows
  21. constexpr ui64 CachelineBits = 9;
  22. constexpr ui64 CachelineSize = ui64(1)<<CachelineBits;
  23. template <typename Alloc>
  24. class TBloomfilter {
  25. std::vector<ui64, Alloc> Storage_;
  26. ui64 *Ptr_;
  27. ui64 Bits_;
  28. bool Finalized_ = false;
  29. public:
  30. static constexpr ui64 BlockSize = CachelineSize;
  31. static constexpr ui64 BlockBits = CachelineBits;
  32. TBloomfilter() {}
  33. TBloomfilter(ui64 size) {
  34. Resize(size);
  35. }
  36. void Resize(ui64 size) {
  37. size = std::max(size, CachelineSize);
  38. Bits_ = 6;
  39. for (; (ui64(1)<<Bits_) < size; ++Bits_)
  40. ;
  41. Bits_ += 3; // -> multiply by 8
  42. size = 1u<<(Bits_ - 6);
  43. Storage_.clear();
  44. Storage_.resize(size + CachelineSize/sizeof(ui64) - 1);
  45. // align Ptr_ up to BlockSize
  46. Ptr_ = (ui64 *)((uintptr_t(Storage_.data()) + BlockSize - 1) & ~(BlockSize - 1));
  47. Finalized_ = false;
  48. }
  49. void Add(ui64 hash) {
  50. Y_DEBUG_ABORT_UNLESS(!Finalized_);
  51. auto bit = (hash >> (64 - Bits_));
  52. Ptr_[bit/64] |= (ui64(1)<<(bit % 64));
  53. // replace low BlockBits with next part of hash
  54. auto low = hash >> (64 - Bits_ - BlockBits);
  55. bit &= ~(BlockSize - 1);
  56. bit ^= low & (BlockSize - 1);
  57. Ptr_[bit/64] |= (ui64(1) << (bit % 64));
  58. }
  59. bool IsMissing(ui64 hash) const {
  60. Y_DEBUG_ABORT_UNLESS(Finalized_);
  61. auto bit = (hash >> (64 - Bits_));
  62. if (!(Ptr_[bit/64] & (ui64(1)<<(bit % 64))))
  63. return true;
  64. // replace low BlockBits with next part of hash
  65. auto low = hash >> (64 - Bits_ - BlockBits);
  66. bit &= ~(BlockSize - 1);
  67. bit ^= low & (BlockSize - 1);
  68. if (!(Ptr_[bit/64] & (ui64(1)<<(bit % 64))))
  69. return true;
  70. return false;
  71. }
  72. constexpr bool IsFinalized() const {
  73. return Finalized_;
  74. }
  75. void Finalize() {
  76. Finalized_ = true;
  77. }
  78. void Shrink() {
  79. Finalized_ = false;
  80. Bits_ = 1;
  81. Storage_.clear();
  82. Storage_.resize(1, ~ui64(0));
  83. Storage_.shrink_to_fit();
  84. Ptr_ = Storage_.data();
  85. }
  86. };
  /*
  Table data is stored in buckets. Table columns are interpreted as integers, strings or an interface-based type
  that provides IHash, IEquate, IPack and IUnpack functions.
  External clients should transform (pack) data into the appropriate representation.
  Key columns always come first, followed by int columns, string columns and interface-based columns.
  The layout of table data is chosen for locality, so that most processing of related data
  happens within processor caches.
  A structure representing offsets in the header of key records could look like this:
  struct TKeysHeader {
      ui64 hash_offset = 0; // hash value of the keys data
      ui64 nulls_offset = sizeof(ui64); // Nulls are represented as a bitmap array placed immediately after the hash
      ui64 int_vals_offset; // Integer values follow the nulls bitmap, starting at nulls_offset + sizeof(nulls bitmap array)
  };
  */
  101. struct JoinTuplesIds {
  102. ui32 id1 = 0; // Identifier of first table tuple as index in bucket
  103. ui32 id2 = 0; // Identifier of second table tuple as index in bucket
  104. };
  105. // To store keys values when making join only for unique keys (any join attribute)
  106. struct KeysHashTable {
  107. ui64 SlotSize = 0; // Slot size in hash table
  108. ui64 NSlots = 0; // Total number of slots in table
  109. ui64 FillCount = 0; // Number of ui64 slots which are filled
  110. std::vector<ui64, TMKQLAllocator<ui64>> Table; // Table to store keys data in particular slots
  111. std::vector<ui64, TMKQLAllocator<ui64>> SpillData; // Vector to store long data which cannot be fit in single hash table slot.
  112. };
  113. struct TTableBucket {
  114. std::vector<ui64, TMKQLAllocator<ui64>> KeyIntVals; // Vector to store table key values
  115. std::vector<ui64, TMKQLAllocator<ui64>> DataIntVals; // Vector to store data values in bucket
  116. std::vector<char, TMKQLAllocator<char>> StringsValues; // Vector to store data strings values
  117. std::vector<ui32, TMKQLAllocator<ui32>> StringsOffsets; // Vector to store strings values sizes (offsets in StringsValues are calculated) for particular tuple.
  118. std::vector<char, TMKQLAllocator<char>> InterfaceValues; // Vector to store types to work through external-provided IHash, IEquate interfaces
  119. std::vector<ui32, TMKQLAllocator<ui32>> InterfaceOffsets; // Vector to store sizes of columns to work through IHash, IEquate interfaces
  120. std::vector<JoinTuplesIds, TMKQLAllocator<JoinTuplesIds>> JoinIds; // Results of join operations stored as index of tuples in buckets
  121. // of two tables with the same number
  122. std::vector<ui32, TMKQLAllocator<ui32>> LeftIds; // Left-side ids missing in other table
  123. std::vector<ui64, TMKQLAllocator<ui64>> JoinSlots; // Hashtable
  124. ui64 NSlots = 0; // Hashtable
  125. };
  126. struct TTableBucketStats {
  127. TBloomfilter<TMKQLAllocator<ui64>> BloomFilter;
  128. KeysHashTable AnyHashTable; // Hash table to process join only for unique keys (any join attribute)
  129. ui64 TuplesNum = 0; // Total number of tuples in bucket
  130. ui64 StringValuesTotalSize = 0; // Total size of StringsValues. Used to correctly calculate StringsOffsets.
  131. ui64 KeyIntValsTotalSize = 0; // Total size of KeyIntVals. Used to correctly calculate StringsOffsets.
  132. ui32 SlotSize = 0;
  133. bool HashtableMatches = false;
  134. };
  135. struct TupleData {
  136. ui64 * IntColumns = nullptr; // Array of packed int data of the table. Caller should allocate array of NumberOfIntColumns size
  137. char ** StrColumns = nullptr; // Pointers to values of strings for table. Strings are not null-terminated
  138. ui32 * StrSizes = nullptr; // Sizes of strings for table.
  139. NYql::NUdf::TUnboxedValue * IColumns = nullptr; // Array of TUboxedValues for interface-based columns. Caller should allocate array of required size.
  140. bool AllNulls = false; // If tuple data contains all nulls (it is required for corresponding join types)
  141. };
  142. // Interface to work with complex column types without "simple" byte-serialized representation (which can be used for keys comparison)
  143. struct TColTypeInterface {
  144. NYql::NUdf::IHash::TPtr HashI = nullptr; // Interface to calculate hash of column value
  145. NYql::NUdf::IEquate::TPtr EquateI = nullptr; // Interface to compare two column values
  146. std::shared_ptr<TValuePacker> Packer; // Class to pack and unpack column values
  147. const THolderFactory& HolderFactory; // To use during unpacking
  148. };
  149. // Class that spills bucket data.
  150. // If, after saving, data has accumulated in the bucket again, you can spill it again.
  151. // After restoring the entire bucket, it will contain all the data saved over different iterations.
  152. class TTableBucketSpiller {
  153. public:
  154. TTableBucketSpiller(ISpiller::TPtr spiller, size_t sizeLimit);
  155. // Takes the bucket and immediately starts spilling. Spilling continues until an async operation occurs.
  156. void SpillBucket(TTableBucket&& bucket);
  157. // Starts bucket restoration after spilling. Restores and unites all the buckets from different iterations. Will pause in case of async operation.
  158. void StartBucketRestoration();
  159. // Extracts bucket restored from spilling. This bucket will contain all the data from different iterations of spilling.
  160. TTableBucket&& ExtractBucket();
  161. // Updates the states of spillers. This update should be called after async operation completion to resume spilling/resoration.
  162. void Update();
  163. // Flushes all the data from inner spillers. Should be called when no more data is expected for spilling.
  164. void Finalize();
  165. // Is bucket in memory. False if spilled.
  166. bool IsInMemory() const;
  167. // Is bucket loaded to memory but still owned by spilled.
  168. // ExtractBucket must be called if true.
  169. bool IsExtractionRequired() const;
  170. // Is there any bucket that is being spilled right now.
  171. bool IsProcessingSpilling() const;
  172. // Is spiller ready to start loading new bucket.
  173. bool IsAcceptingDataRequests() const;
  174. // Is there any bucket that is being restored right now.
  175. bool IsRestoring() const;
  176. private:
  177. void ProcessBucketSpilling();
  178. template <class T>
  179. void AppendVector(std::vector<T, TMKQLAllocator<T>>& first, std::vector<T, TMKQLAllocator<T>>&& second) const;
  180. void ProcessBucketRestoration();
  181. void ProcessFinalizing();
  182. private:
  183. enum class EState {
  184. InMemory,
  185. Spilling,
  186. AcceptingData,
  187. Finalizing,
  188. AcceptingDataRequests,
  189. Restoring,
  190. WaitingForExtraction
  191. };
  192. enum class ENextVectorToProcess {
  193. KeyAndVals,
  194. DataIntVals,
  195. StringsValues,
  196. StringsOffsets,
  197. InterfaceValues,
  198. InterfaceOffsets,
  199. None
  200. };
  201. TVectorSpillerAdapter<ui64, TMKQLAllocator<ui64>> StateUi64Adapter;
  202. TVectorSpillerAdapter<ui32, TMKQLAllocator<ui32>> StateUi32Adapter;
  203. TVectorSpillerAdapter<char, TMKQLAllocator<char>> StateCharAdapter;
  204. EState State = EState::InMemory;
  205. ENextVectorToProcess NextVectorToProcess = ENextVectorToProcess::None;
  206. ui64 SpilledBucketsCount = 0;
  207. bool IsFinalizingRequested = false;
  208. TTableBucket CurrentBucket;
  209. };
  210. // Class which represents single table data stored in buckets
  211. class TTable {
  212. ui64 NumberOfKeyIntColumns = 0; // Key int columns always first and padded to sizeof(ui64).
  213. ui64 NumberOfKeyStringColumns = 0; // String key columns go after key int columns
  214. ui64 NumberOfKeyIColumns = 0; // Number of interface - provided key columns
  215. ui64 NumberOfDataIntColumns = 0; //Number of integer data columns in the Table
  216. ui64 NumberOfDataStringColumns = 0; // Number of strings data columns in the Table
  217. ui64 NumberOfDataIColumns = 0; // Number of interface - provided data columns
  218. TColTypeInterface * ColInterfaces = nullptr; // Array of interfaces to work with corresponding columns data
  219. ui64 NumberOfColumns = 0; // Number of columns in the Table
  220. ui64 NumberOfKeyColumns = 0; // Number of key columns in the Table
  221. ui64 NumberOfDataColumns = 0; // Number of data columns in the Table
  222. ui64 NumberOfStringColumns = 0; // Total number of String Columns
  223. ui64 NumberOfIColumns = 0; // Total number of interface-based columns
  224. ui64 NullsBitmapSize_ = 1; // Default size of ui64 values used for null columns bitmap.
  225. // Every bit set means null value. Order of columns is equal to order in AddTuple call.
  226. // First key int column is bit 1 in bit mask, second - bit 2, etc. Bit 0 is least significant in bitmask and tells if key columns contain nulls.
  227. ui64 TotalStringsSize = 0; // Bytes in tuple header reserved to store total strings size key tuple columns
  228. ui64 HeaderSize = HashSize + NullsBitmapSize_ + NumberOfKeyIntColumns + NumberOfKeyIColumns + TotalStringsSize; // Header of all tuples size
  229. ui64 BytesInKeyIntColumns = sizeof(ui64) * NumberOfKeyIntColumns;
  230. // Table data is partitioned in buckets based on key value
  231. std::vector<TTableBucket> TableBuckets;
  232. // Statistics for buckets. Total number of tuples inside a single bucket and offsets.
  233. std::vector<TTableBucketStats> TableBucketsStats;
  234. std::vector<TTableBucketSpiller> TableBucketsSpillers;
  235. // Temporary vector for tuples manipulation;
  236. std::vector<ui64> TempTuple;
  237. // Hashes for interface - based columns values
  238. std::vector<ui64> IColumnsHashes;
  239. // Serialized values for interface-based columns
  240. std::vector<std::vector<char>> IColumnsVals;
  241. // Current iterator index for NextJoinedData iterator
  242. ui64 CurrIterIndex = 0;
  243. // Current bucket for iterators
  244. ui64 CurrIterBucket = 0;
  245. // True if table joined from two other tables
  246. bool IsTableJoined = false;
  247. // Type of the join
  248. EJoinKind JoinKind = EJoinKind::Inner;
  249. // Pointers to the joined tables. Lifetime of source tables to join should be greater than joined table
  250. TTable * JoinTable1 = nullptr;
  251. TTable * JoinTable2 = nullptr;
  252. // Returns tuple data in td from bucket with id bucketNum. Tuple id inside bucket is tupleId.
  253. inline void GetTupleData(ui32 bucketNum, ui32 tupleId, TupleData& td);
  254. // Adds keys to KeysHashTable, return true if added, false if equal key already added
  255. inline bool AddKeysToHashTable(KeysHashTable& t, ui64* keys, NYql::NUdf::TUnboxedValue * iColumns);
  256. ui64 TotalPacked = 0; // Total number of packed tuples
  257. ui64 TotalUnpacked = 0; // Total number of unpacked tuples
  258. bool LeftTableBatch_ = false; // True if left table is processed in batch mode
  259. bool RightTableBatch_ = false; // True if right table is procesed in batch mode
  260. bool HasMoreLeftTuples_ = false; // True if join is not completed, rows from left table are coming
  261. bool HasMoreRightTuples_ = false; // True if join is not completed, rows from right table are coming
  262. bool IsAny_ = false; // True if key duplicates need to be removed from table (any join)
  263. ui64 TuplesFound_ = 0; // Total number of matching keys found during join
  264. public:
  265. // Resets iterators. In case of join results table it also resets iterators for joined tables
  266. void ResetIterator();
  267. // Returns value of next tuple. Returs true if there are more tuples
  268. bool NextTuple(TupleData& td);
  269. bool TryToPreallocateMemoryForJoin(TTable & t1, TTable & t2, EJoinKind joinKind, bool hasMoreLeftTuples, bool hasMoreRightTuples);
  270. // Joins two tables and stores join result in table data. Tuples of joined table could be received by
  271. // joined table iterator. Life time of t1, t2 should be greater than lifetime of joined table
  272. // hasMoreLeftTuples, hasMoreRightTuples is true if join is partial and more rows are coming. For final batch hasMoreLeftTuples = false, hasMoreRightTuples = false
  273. void Join(TTable& t1, TTable& t2, EJoinKind joinKind = EJoinKind::Inner, bool hasMoreLeftTuples = false, bool hasMoreRightTuples = false, ui32 fromBucket = 0, ui32 toBucket = NumberOfBuckets);
  274. // Returns next jointed tuple data. Returs true if there are more tuples
  275. bool NextJoinedData(TupleData& td1, TupleData& td2, ui64 bucketLimit);
  276. bool NextJoinedData(TupleData& td1, TupleData& td2) {
  277. return NextJoinedData(td1, td2, JoinTable1->TableBucketsStats.size());
  278. }
  279. // Creates buckets that support spilling.
  280. void InitializeBucketSpillers(ISpiller::TPtr spiller);
  281. // Calculates approximate size of a bucket. Used for spilling to determine the largest bucket.
  282. ui64 GetSizeOfBucket(ui64 bucket) const;
  283. // This functions wind the largest bucket and spills it to the disk.
  284. bool TryToReduceMemoryAndWait(ui64 bucket);
  285. // Update state of spilling. Must be called during each DoCalculate.
  286. void UpdateSpilling();
  287. // Flushes all the spillers.
  288. void FinalizeSpilling();
  289. // Checks if spilling has any running save operation
  290. bool IsSpillingFinished() const;
  291. // Checks if spilling ready for requesting buckets for restoration.
  292. bool IsSpillingAcceptingDataRequests() const;
  293. // Checks is spilling has any running load operation
  294. bool IsRestoringSpilledBuckets() const;
  295. // Checks if bucket fully loaded to memory and may be joined.
  296. bool IsBucketInMemory(ui32 bucket) const;
  297. // Checks if extraction of bucket is required
  298. bool IsSpilledBucketWaitingForExtraction(ui32 bucket) const;
  299. // Starts loading spilled bucket to memory.
  300. void StartLoadingBucket(ui32 bucket);
  301. // Prepares bucket for joining after spilling and restoring back.
  302. void PrepareBucket(ui64 bucket);
  303. // Clears all the data related to a single bucket
  304. void ClearBucket(ui64 bucket);
  305. // Forces bucket to release the space used for underlying containers.
  306. void ShrinkBucket(ui64 bucket);
  307. // Clears table content
  308. void Clear();
  309. // Creates new table with key columns and data columns
  310. TTable(ui64 numberOfKeyIntColumns = 0, ui64 numberOfKeyStringColumns = 0,
  311. ui64 numberOfDataIntColumns = 0, ui64 numberOfDataStringColumns = 0,
  312. ui64 numberOfKeyIColumns = 0, ui64 numberOfDataIColumns = 0,
  313. ui64 nullsBitmapSize = 1, TColTypeInterface * colInterfaces = nullptr, bool isAny = false);
  314. enum class EAddTupleResult { Added, Unmatched, AnyMatch };
  315. // Adds new tuple to the table. intColumns, stringColumns - data of columns,
  316. // stringsSizes - sizes of strings columns. Indexes of null-value columns
  317. // in the form of bit array should be first values of intColumns.
  318. EAddTupleResult AddTuple(ui64* intColumns, char** stringColumns, ui32* stringsSizes, NYql::NUdf::TUnboxedValue * iColumns = nullptr, const TTable &other = {});
  319. ~TTable();
  320. ui64 InitHashTableCount_ = 0;
  321. ui64 HashLookups_ = 0; // hash lookups
  322. ui64 HashO1Iterations_ = 0; // hash chain
  323. ui64 HashSlotIterations_ = 0; // O(SlotSize) operations
  324. ui64 JoinTable1Total_ = 0;
  325. ui64 JoinTable2Total_ = 0;
  326. ui64 AnyFiltered_ = 0;
  327. ui64 BloomLookups_ = 0;
  328. ui64 BloomHits_ = 0;
  329. ui64 BloomFalsePositives_ = 0;
  330. };
  331. }
  332. }
  333. }