mkql_grace_join_imp.h 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454
  1. #pragma once
  2. #include <yql/essentials/minikql/computation/mkql_vector_spiller_adapter.h>
  3. #include <yql/essentials/public/udf/udf_data_type.h>
  4. #include <yql/essentials/minikql/mkql_program_builder.h>
  5. #include <yql/essentials/public/udf/udf_type_builder.h>
  6. #include <yql/essentials/minikql/computation/mkql_computation_node_pack.h>
  7. namespace NKikimr {
  8. namespace NMiniKQL {
  9. namespace GraceJoin {
  10. class TTableBucketSpiller;
  11. #define GRACEJOIN_DEBUG DEBUG
  12. #define GRACEJOIN_TRACE TRACE
  13. const ui64 BitsForNumberOfBuckets = 6; // 2^6 = 64
  14. const ui64 BucketsMask = (0x00000001 << BitsForNumberOfBuckets) - 1;
  15. const ui64 NumberOfBuckets = (0x00000001 << BitsForNumberOfBuckets); // Number of hashed keys buckets to distribute incoming tables tuples
  16. const ui64 DefaultTuplesNum = 101; // Default initial number of tuples in one bucket to allocate memory
  17. const ui64 DefaultTupleBytes = 64; // Default size of all columns in table row for estimations
  18. const ui64 HashSize = 1; // Using ui64 hash size
  19. const ui64 SpillingSizeLimit = 1_MB; // Don't try to spill if net effect is lower than this size
  20. const ui32 SpillingRowLimit = 1024; // Don't try to invoke spilling more often than 1 in this number of rows
  21. constexpr ui64 CachelineBits = 9;
  22. constexpr ui64 CachelineSize = ui64(1)<<CachelineBits;
  23. template <typename Alloc>
  24. class TBloomfilter {
  25. std::vector<ui64, Alloc> Storage_;
  26. ui64 *Ptr_;
  27. ui64 Bits_;
  28. bool Finalized_ = false;
  29. public:
  30. static constexpr ui64 BlockSize = CachelineSize;
  31. static constexpr ui64 BlockBits = CachelineBits;
  32. TBloomfilter() {}
  33. TBloomfilter(ui64 size) {
  34. Resize(size);
  35. }
  36. void Reserve(ui64 size) {
  37. size = std::max(size, CachelineSize);
  38. Bits_ = 6;
  39. for (; (ui64(1) << Bits_) < size; ++Bits_)
  40. ;
  41. Bits_ += 3; // -> multiply by 8
  42. Storage_.reserve(ComputeStorageSize());
  43. }
  44. void Resize(ui64 size) {
  45. Storage_.clear();
  46. Reserve(size);
  47. Storage_.resize(ComputeStorageSize());
  48. // align Ptr_ up to BlockSize
  49. Ptr_ = (ui64 *)((uintptr_t(Storage_.data()) + BlockSize - 1) & ~(BlockSize - 1));
  50. Finalized_ = false;
  51. }
  52. void Add(ui64 hash) {
  53. Y_DEBUG_ABORT_UNLESS(!Finalized_);
  54. auto bit = (hash >> (64 - Bits_));
  55. Ptr_[bit/64] |= (ui64(1)<<(bit % 64));
  56. // replace low BlockBits with next part of hash
  57. auto low = hash >> (64 - Bits_ - BlockBits);
  58. bit &= ~(BlockSize - 1);
  59. bit ^= low & (BlockSize - 1);
  60. Ptr_[bit/64] |= (ui64(1) << (bit % 64));
  61. }
  62. bool IsMissing(ui64 hash) const {
  63. Y_DEBUG_ABORT_UNLESS(Finalized_);
  64. auto bit = (hash >> (64 - Bits_));
  65. if (!(Ptr_[bit/64] & (ui64(1)<<(bit % 64))))
  66. return true;
  67. // replace low BlockBits with next part of hash
  68. auto low = hash >> (64 - Bits_ - BlockBits);
  69. bit &= ~(BlockSize - 1);
  70. bit ^= low & (BlockSize - 1);
  71. if (!(Ptr_[bit/64] & (ui64(1)<<(bit % 64))))
  72. return true;
  73. return false;
  74. }
  75. constexpr bool IsFinalized() const {
  76. return Finalized_;
  77. }
  78. void Finalize() {
  79. Finalized_ = true;
  80. }
  81. void Shrink() {
  82. Finalized_ = false;
  83. Bits_ = 1;
  84. Storage_.clear();
  85. Storage_.resize(1, ~ui64(0));
  86. Storage_.shrink_to_fit();
  87. Ptr_ = Storage_.data();
  88. }
  89. private:
  90. ui64 ComputeStorageSize() const {
  91. MKQL_ENSURE(Bits_ >= 6, "Internal logic error");
  92. return (1u << (Bits_ - 6)) + CachelineSize / sizeof(ui64) - 1;
  93. }
  94. };
/*
Table data is stored in buckets. Table columns are interpreted as integers, strings, or an interface-based type
that provides IHash, IEquate, IPack and IUnpack functions.
External clients must transform (pack) their data into this representation.
Key columns always come first, followed by int columns, string columns and interface-based columns.
The data layout is chosen for locality, so that most processing of related data
happens within processor caches.
The offsets in the header of a key record could be described by a structure like the following:
struct TKeysHeader {
    ui64 hash_offset = 0; // Offset of the hash value of the key data
    ui64 nulls_offset = sizeof(ui64); // Nulls are stored as a bitmap array placed immediately after the hash
    ui64 int_vals_offset; // Integer values follow the nulls bitmap, starting at nulls_offset + sizeof(nulls bitmap array)
};
*/
  109. struct JoinTuplesIds {
  110. ui32 id1 = 0; // Identifier of first table tuple as index in bucket
  111. ui32 id2 = 0; // Identifier of second table tuple as index in bucket
  112. };
  113. // To store keys values when making join only for unique keys (any join attribute)
  114. struct KeysHashTable {
  115. ui64 SlotSize = 0; // Slot size in hash table
  116. ui64 NSlots = 0; // Total number of slots in table
  117. ui64 FillCount = 0; // Number of ui64 slots which are filled
  118. std::vector<ui64, TMKQLAllocator<ui64>> Table; // Table to store keys data in particular slots
  119. std::vector<ui64, TMKQLAllocator<ui64>> SpillData; // Vector to store long data which cannot be fit in single hash table slot.
  120. };
  121. struct TTableBucket {
  122. std::vector<ui64, TMKQLAllocator<ui64>> KeyIntVals; // Vector to store table key values
  123. std::vector<ui64, TMKQLAllocator<ui64>> DataIntVals; // Vector to store data values in bucket
  124. std::vector<char, TMKQLAllocator<char>> StringsValues; // Vector to store data strings values
  125. std::vector<ui32, TMKQLAllocator<ui32>> StringsOffsets; // Vector to store strings values sizes (offsets in StringsValues are calculated) for particular tuple.
  126. std::vector<char, TMKQLAllocator<char>> InterfaceValues; // Vector to store types to work through external-provided IHash, IEquate interfaces
  127. std::vector<ui32, TMKQLAllocator<ui32>> InterfaceOffsets; // Vector to store sizes of columns to work through IHash, IEquate interfaces
  128. std::vector<JoinTuplesIds, TMKQLAllocator<JoinTuplesIds>> JoinIds; // Results of join operations stored as index of tuples in buckets
  129. // of two tables with the same number
  130. std::vector<ui32, TMKQLAllocator<ui32>> LeftIds; // Left-side ids missing in other table
  131. std::vector<ui64, TMKQLAllocator<ui64>> JoinSlots; // Hashtable
  132. ui64 NSlots = 0; // Hashtable
  133. };
  134. struct TTableBucketStats {
  135. TBloomfilter<TMKQLAllocator<ui64>> BloomFilter;
  136. KeysHashTable AnyHashTable; // Hash table to process join only for unique keys (any join attribute)
  137. ui64 TuplesNum = 0; // Total number of tuples in bucket
  138. ui64 StringValuesTotalSize = 0; // Total size of StringsValues. Used to correctly calculate StringsOffsets.
  139. ui64 KeyIntValsTotalSize = 0; // Total size of KeyIntVals. Used to correctly calculate StringsOffsets.
  140. ui32 SlotSize = 0;
  141. bool HashtableMatches = false;
  142. };
  143. struct TupleData {
  144. ui64 * IntColumns = nullptr; // Array of packed int data of the table. Caller should allocate array of NumberOfIntColumns size
  145. char ** StrColumns = nullptr; // Pointers to values of strings for table. Strings are not null-terminated
  146. ui32 * StrSizes = nullptr; // Sizes of strings for table.
  147. NYql::NUdf::TUnboxedValue * IColumns = nullptr; // Array of TUboxedValues for interface-based columns. Caller should allocate array of required size.
  148. bool AllNulls = false; // If tuple data contains all nulls (it is required for corresponding join types)
  149. };
  150. // Interface to work with complex column types without "simple" byte-serialized representation (which can be used for keys comparison)
  151. struct TColTypeInterface {
  152. NYql::NUdf::IHash::TPtr HashI = nullptr; // Interface to calculate hash of column value
  153. NYql::NUdf::IEquate::TPtr EquateI = nullptr; // Interface to compare two column values
  154. std::shared_ptr<TValuePacker> Packer; // Class to pack and unpack column values
  155. const THolderFactory& HolderFactory; // To use during unpacking
  156. };
  157. // Class that spills bucket data.
  158. // If, after saving, data has accumulated in the bucket again, you can spill it again.
  159. // After restoring the entire bucket, it will contain all the data saved over different iterations.
  160. class TTableBucketSpiller {
  161. public:
  162. TTableBucketSpiller(ISpiller::TPtr spiller, size_t sizeLimit);
  163. // Takes the bucket and immediately starts spilling. Spilling continues until an async operation occurs.
  164. void SpillBucket(TTableBucket&& bucket);
  165. // Starts bucket restoration after spilling. Restores and unites all the buckets from different iterations. Will pause in case of async operation.
  166. void StartBucketRestoration();
  167. // Extracts bucket restored from spilling. This bucket will contain all the data from different iterations of spilling.
  168. TTableBucket&& ExtractBucket();
  169. // Updates the states of spillers. This update should be called after async operation completion to resume spilling/resoration.
  170. void Update();
  171. // Flushes all the data from inner spillers. Should be called when no more data is expected for spilling.
  172. void Finalize();
  173. // Is bucket in memory. False if spilled.
  174. bool IsInMemory() const;
  175. // Is bucket loaded to memory but still owned by spilled.
  176. // ExtractBucket must be called if true.
  177. bool IsExtractionRequired() const;
  178. // Is there any bucket that is being spilled right now.
  179. bool IsProcessingSpilling() const;
  180. // Is spiller ready to start loading new bucket.
  181. bool IsAcceptingDataRequests() const;
  182. // Is there any bucket that is being restored right now.
  183. bool IsRestoring() const;
  184. private:
  185. void ProcessBucketSpilling();
  186. template <class T>
  187. void AppendVector(std::vector<T, TMKQLAllocator<T>>& first, std::vector<T, TMKQLAllocator<T>>&& second) const;
  188. void ProcessBucketRestoration();
  189. void ProcessFinalizing();
  190. private:
  191. enum class EState {
  192. InMemory,
  193. Spilling,
  194. AcceptingData,
  195. Finalizing,
  196. AcceptingDataRequests,
  197. Restoring,
  198. WaitingForExtraction
  199. };
  200. enum class ENextVectorToProcess {
  201. KeyAndVals,
  202. DataIntVals,
  203. StringsValues,
  204. StringsOffsets,
  205. InterfaceValues,
  206. InterfaceOffsets,
  207. None
  208. };
  209. TVectorSpillerAdapter<ui64, TMKQLAllocator<ui64>> StateUi64Adapter;
  210. TVectorSpillerAdapter<ui32, TMKQLAllocator<ui32>> StateUi32Adapter;
  211. TVectorSpillerAdapter<char, TMKQLAllocator<char>> StateCharAdapter;
  212. EState State = EState::InMemory;
  213. ENextVectorToProcess NextVectorToProcess = ENextVectorToProcess::None;
  214. ui64 SpilledBucketsCount = 0;
  215. bool IsFinalizingRequested = false;
  216. TTableBucket CurrentBucket;
  217. };
  218. // Class which represents single table data stored in buckets
  219. class TTable {
  220. ui64 NumberOfKeyIntColumns = 0; // Key int columns always first and padded to sizeof(ui64).
  221. ui64 NumberOfKeyStringColumns = 0; // String key columns go after key int columns
  222. ui64 NumberOfKeyIColumns = 0; // Number of interface - provided key columns
  223. ui64 NumberOfDataIntColumns = 0; //Number of integer data columns in the Table
  224. ui64 NumberOfDataStringColumns = 0; // Number of strings data columns in the Table
  225. ui64 NumberOfDataIColumns = 0; // Number of interface - provided data columns
  226. TColTypeInterface * ColInterfaces = nullptr; // Array of interfaces to work with corresponding columns data
  227. ui64 NumberOfColumns = 0; // Number of columns in the Table
  228. ui64 NumberOfKeyColumns = 0; // Number of key columns in the Table
  229. ui64 NumberOfDataColumns = 0; // Number of data columns in the Table
  230. ui64 NumberOfStringColumns = 0; // Total number of String Columns
  231. ui64 NumberOfIColumns = 0; // Total number of interface-based columns
  232. ui64 NullsBitmapSize_ = 1; // Default size of ui64 values used for null columns bitmap.
  233. // Every bit set means null value. Order of columns is equal to order in AddTuple call.
  234. // First key int column is bit 1 in bit mask, second - bit 2, etc. Bit 0 is least significant in bitmask and tells if key columns contain nulls.
  235. ui64 TotalStringsSize = 0; // Bytes in tuple header reserved to store total strings size key tuple columns
  236. ui64 HeaderSize = HashSize + NullsBitmapSize_ + NumberOfKeyIntColumns + NumberOfKeyIColumns + TotalStringsSize; // Header of all tuples size
  237. ui64 BytesInKeyIntColumns = sizeof(ui64) * NumberOfKeyIntColumns;
  238. // Table data is partitioned in buckets based on key value
  239. std::vector<TTableBucket> TableBuckets;
  240. // Statistics for buckets. Total number of tuples inside a single bucket and offsets.
  241. std::vector<TTableBucketStats> TableBucketsStats;
  242. std::vector<TTableBucketSpiller> TableBucketsSpillers;
  243. // Temporary vector for tuples manipulation;
  244. std::vector<ui64> TempTuple;
  245. // Hashes for interface - based columns values
  246. std::vector<ui64> IColumnsHashes;
  247. // Serialized values for interface-based columns
  248. std::vector<std::vector<char>> IColumnsVals;
  249. // Current iterator index for NextJoinedData iterator
  250. ui64 CurrIterIndex = 0;
  251. // Current bucket for iterators
  252. ui64 CurrIterBucket = 0;
  253. // True if table joined from two other tables
  254. bool IsTableJoined = false;
  255. // Type of the join
  256. EJoinKind JoinKind = EJoinKind::Inner;
  257. // Pointers to the joined tables. Lifetime of source tables to join should be greater than joined table
  258. TTable * JoinTable1 = nullptr;
  259. TTable * JoinTable2 = nullptr;
  260. // Returns tuple data in td from bucket with id bucketNum. Tuple id inside bucket is tupleId.
  261. inline void GetTupleData(ui32 bucketNum, ui32 tupleId, TupleData& td);
  262. // Adds keys to KeysHashTable, return true if added, false if equal key already added
  263. inline bool AddKeysToHashTable(KeysHashTable& t, ui64* keys, NYql::NUdf::TUnboxedValue * iColumns);
  264. ui64 TotalPacked = 0; // Total number of packed tuples
  265. ui64 TotalUnpacked = 0; // Total number of unpacked tuples
  266. bool LeftTableBatch_ = false; // True if left table is processed in batch mode
  267. bool RightTableBatch_ = false; // True if right table is procesed in batch mode
  268. bool HasMoreLeftTuples_ = false; // True if join is not completed, rows from left table are coming
  269. bool HasMoreRightTuples_ = false; // True if join is not completed, rows from right table are coming
  270. bool IsAny_ = false; // True if key duplicates need to be removed from table (any join)
  271. ui64 TuplesFound_ = 0; // Total number of matching keys found during join
  272. public:
  273. // Resets iterators. In case of join results table it also resets iterators for joined tables
  274. void ResetIterator();
  275. // Returns value of next tuple. Returs true if there are more tuples
  276. bool NextTuple(TupleData& td);
  277. bool TryToPreallocateMemoryForJoin(TTable & t1, TTable & t2, EJoinKind joinKind, bool hasMoreLeftTuples, bool hasMoreRightTuples);
  278. // Joins two tables and stores join result in table data. Tuples of joined table could be received by
  279. // joined table iterator. Life time of t1, t2 should be greater than lifetime of joined table
  280. // hasMoreLeftTuples, hasMoreRightTuples is true if join is partial and more rows are coming. For final batch hasMoreLeftTuples = false, hasMoreRightTuples = false
  281. void Join(TTable& t1, TTable& t2, EJoinKind joinKind = EJoinKind::Inner, bool hasMoreLeftTuples = false, bool hasMoreRightTuples = false, ui32 fromBucket = 0, ui32 toBucket = NumberOfBuckets);
  282. // Returns next jointed tuple data. Returs true if there are more tuples
  283. bool NextJoinedData(TupleData& td1, TupleData& td2, ui64 bucketLimit);
  284. bool NextJoinedData(TupleData& td1, TupleData& td2) {
  285. return NextJoinedData(td1, td2, JoinTable1->TableBucketsStats.size());
  286. }
  287. // Creates buckets that support spilling.
  288. void InitializeBucketSpillers(ISpiller::TPtr spiller);
  289. // Calculates approximate size of a bucket. Used for spilling to determine the largest bucket.
  290. ui64 GetSizeOfBucket(ui64 bucket) const;
  291. // This functions wind the largest bucket and spills it to the disk.
  292. bool TryToReduceMemoryAndWait(ui64 bucket);
  293. // Update state of spilling. Must be called during each DoCalculate.
  294. void UpdateSpilling();
  295. // Flushes all the spillers.
  296. void FinalizeSpilling();
  297. // Checks if spilling has any running save operation
  298. bool IsSpillingFinished() const;
  299. // Checks if spilling ready for requesting buckets for restoration.
  300. bool IsSpillingAcceptingDataRequests() const;
  301. // Checks is spilling has any running load operation
  302. bool IsRestoringSpilledBuckets() const;
  303. // Checks if bucket fully loaded to memory and may be joined.
  304. bool IsBucketInMemory(ui32 bucket) const;
  305. // Checks if extraction of bucket is required
  306. bool IsSpilledBucketWaitingForExtraction(ui32 bucket) const;
  307. // Starts loading spilled bucket to memory.
  308. void StartLoadingBucket(ui32 bucket);
  309. // Prepares bucket for joining after spilling and restoring back.
  310. void PrepareBucket(ui64 bucket);
  311. // Clears all the data related to a single bucket
  312. void ClearBucket(ui64 bucket);
  313. // Forces bucket to release the space used for underlying containers.
  314. void ShrinkBucket(ui64 bucket);
  315. // Clears table content
  316. void Clear();
  317. // Creates new table with key columns and data columns
  318. TTable(ui64 numberOfKeyIntColumns = 0, ui64 numberOfKeyStringColumns = 0,
  319. ui64 numberOfDataIntColumns = 0, ui64 numberOfDataStringColumns = 0,
  320. ui64 numberOfKeyIColumns = 0, ui64 numberOfDataIColumns = 0,
  321. ui64 nullsBitmapSize = 1, TColTypeInterface * colInterfaces = nullptr, bool isAny = false);
  322. enum class EAddTupleResult { Added, Unmatched, AnyMatch };
  323. // Adds new tuple to the table. intColumns, stringColumns - data of columns,
  324. // stringsSizes - sizes of strings columns. Indexes of null-value columns
  325. // in the form of bit array should be first values of intColumns.
  326. EAddTupleResult AddTuple(ui64* intColumns, char** stringColumns, ui32* stringsSizes, NYql::NUdf::TUnboxedValue * iColumns = nullptr, const TTable &other = {});
  327. ~TTable();
  328. ui64 InitHashTableCount_ = 0;
  329. ui64 HashLookups_ = 0; // hash lookups
  330. ui64 HashO1Iterations_ = 0; // hash chain
  331. ui64 HashSlotIterations_ = 0; // O(SlotSize) operations
  332. ui64 JoinTable1Total_ = 0;
  333. ui64 JoinTable2Total_ = 0;
  334. ui64 AnyFiltered_ = 0;
  335. ui64 BloomLookups_ = 0;
  336. ui64 BloomHits_ = 0;
  337. ui64 BloomFalsePositives_ = 0;
  338. };
  339. }
  340. }
  341. }