mkql_grace_join_imp.cpp 52 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365
  1. #include "mkql_grace_join_imp.h"
  2. #include <yql/essentials/public/udf/udf_data_type.h>
  3. #include <yql/essentials/utils/log/log.h>
  4. #include <contrib/libs/xxhash/xxhash.h>
  5. #include <string_view>
  6. namespace NKikimr {
  7. namespace NMiniKQL {
  8. namespace GraceJoin {
// Packs one input tuple into the per-bucket storage of this table.
//   intColumns    - nulls bitmap words followed by key and data integer columns
//   stringColumns - key string columns first, then data string columns
//   stringsSizes  - byte sizes matching stringColumns
//   iColumns      - interface-typed columns (hashed/packed via ColInterfaces)
//   other         - opposite join table, used for bloom-filter pre-filtering
// Returns Added on success, Unmatched when the tuple can never match (NULL in
// the key, or rejected by the other side's bloom filter), AnyMatch when ANY-join
// deduplication dropped it as a repeated key.
TTable::EAddTupleResult TTable::AddTuple( ui64 * intColumns, char ** stringColumns, ui32 * stringsSizes, NYql::NUdf::TUnboxedValue * iColumns, const TTable &other) {

    // Bit 0 of the nulls bitmap flags a key containing NULL; such keys never join.
    if ((intColumns[0] & 1))
        return EAddTupleResult::Unmatched;

    TotalPacked++;

    // TempTuple layout: [nulls bitmap][key ints][key IColumn hashes]
    //                   [strings size in ui64 words][packed strings payload]
    TempTuple.clear();
    TempTuple.insert(TempTuple.end(), intColumns, intColumns + NullsBitmapSize_ + NumberOfKeyIntColumns);

    if ( NumberOfKeyIColumns > 0 ) {
        // Interface-typed key columns contribute to the key via their custom hashers.
        for (ui32 i = 0; i < NumberOfKeyIColumns; i++) {
            TempTuple.push_back((ColInterfaces + i)->HashI->Hash(*(iColumns+i)));
        }
    }

    ui64 totalBytesForStrings = 0;
    ui64 totalIntsForStrings = 0;

    // Processing variable length string columns
    if ( NumberOfKeyStringColumns != 0 || NumberOfKeyIColumns != 0) {
        // Every string/IColumn value is stored as a ui32 size prefix + payload bytes.
        totalBytesForStrings += sizeof(ui32)*NumberOfKeyStringColumns;
        totalBytesForStrings += sizeof(ui32)*NumberOfKeyIColumns;

        for( ui64 i = 0; i < NumberOfKeyStringColumns; i++ ) {
            totalBytesForStrings += stringsSizes[i];
        }

        // Serialize interface-typed key columns; keep the bytes in IColumnsVals
        // because the TStringBuf returned by Pack may be transient.
        for ( ui64 i = 0; i < NumberOfKeyIColumns; i++) {
            TStringBuf val = (ColInterfaces + i)->Packer->Pack(*(iColumns+i));
            IColumnsVals[i].clear();
            IColumnsVals[i].insert(IColumnsVals[i].begin(), val.cbegin(), val.end());
            totalBytesForStrings += val.size();
        }

        // Round the payload up to whole ui64 words; zero the last word so the
        // padding bytes are deterministic for hashing and bytewise comparison.
        totalIntsForStrings = (totalBytesForStrings + sizeof(ui64) - 1) / sizeof(ui64);
        TempTuple.push_back(totalIntsForStrings);
        TempTuple.resize(TempTuple.size() + totalIntsForStrings);
        TempTuple.back() = 0;

        ui64 * startPtr = (TempTuple.data() + TempTuple.size() - totalIntsForStrings );
        char * currStrPtr = reinterpret_cast< char* > (startPtr);

        // Key string columns: [ui32 size][bytes] back to back.
        for( ui64 i = 0; i < NumberOfKeyStringColumns; i++) {
            WriteUnaligned<ui32>(currStrPtr, stringsSizes[i] );
            currStrPtr+=sizeof(ui32);
            std::memcpy(currStrPtr, stringColumns[i], stringsSizes[i] );
            currStrPtr+=stringsSizes[i];
        }

        // Key IColumns follow the string columns in the same [size][bytes] form.
        for( ui64 i = 0; i < NumberOfKeyIColumns; i++) {
            WriteUnaligned<ui32>(currStrPtr, IColumnsVals[i].size() );
            currStrPtr+=sizeof(ui32);
            std::memcpy(currStrPtr, IColumnsVals[i].data(), IColumnsVals[i].size() );
            currStrPtr+=IColumnsVals[i].size();
        }
    }

    // Hash everything after the nulls bitmap; hash value 0 is reserved as the
    // "empty slot" marker in the join hash table, so remap it to 1.
    XXH64_hash_t hash = XXH64(TempTuple.data() + NullsBitmapSize_, (TempTuple.size() - NullsBitmapSize_) * sizeof(ui64), 0);

    if (!hash) hash = 1;

    ui64 bucket = hash & BucketsMask;

    // If the other side already finalized its bloom filter for this bucket,
    // drop tuples that cannot match. (Under IsAny_ the check is deferred until
    // after deduplication below.)
    if (!IsAny_ && other.TableBucketsStats[bucket].BloomFilter.IsFinalized()) {
        auto bucket2 = &other.TableBucketsStats[bucket];
        auto &bloomFilter = bucket2->BloomFilter;
        ++BloomLookups_;
        if (bloomFilter.IsMissing(hash)) {
            ++BloomHits_;
            return EAddTupleResult::Unmatched;
        }
    }

    std::vector<ui64, TMKQLAllocator<ui64>> & keyIntVals = TableBuckets[bucket].KeyIntVals;
    std::vector<ui32, TMKQLAllocator<ui32>> & stringsOffsets = TableBuckets[bucket].StringsOffsets;
    std::vector<ui64, TMKQLAllocator<ui64>> & dataIntVals = TableBuckets[bucket].DataIntVals;
    std::vector<char, TMKQLAllocator<char>> & stringVals = TableBuckets[bucket].StringsValues;
    KeysHashTable & kh = TableBucketsStats[bucket].AnyHashTable;

    ui32 offset = keyIntVals.size(); // Offset of tuple inside the keyIntVals vector

    // Speculatively append [hash][TempTuple]; rolled back below if rejected.
    keyIntVals.push_back(hash);
    keyIntVals.insert(keyIntVals.end(), TempTuple.begin(), TempTuple.end());

    if (IsAny_) {
        // ANY join: keep only the first tuple per distinct key.
        if ( !AddKeysToHashTable(kh, keyIntVals.begin() + offset, iColumns) ) {
            keyIntVals.resize(offset); // undo the speculative append
            ++AnyFiltered_;
            return EAddTupleResult::AnyMatch;
        }

        // Deferred bloom-filter check for the ANY path.
        if (other.TableBucketsStats[bucket].BloomFilter.IsFinalized()) {
            auto bucket2 = &other.TableBucketsStats[bucket];
            auto &bloomFilter = bucket2->BloomFilter;
            ++BloomLookups_;
            if (bloomFilter.IsMissing(hash)) {
                keyIntVals.resize(offset); // undo the speculative append
                ++BloomHits_;
                return EAddTupleResult::Unmatched;
            }
        }
    }

    TableBucketsStats[bucket].TuplesNum++;

    if (NumberOfStringColumns || NumberOfIColumns ) {
        stringsOffsets.push_back(TableBucketsStats[bucket].KeyIntValsTotalSize); // Adding offset to tuple in keyIntVals vector
        stringsOffsets.push_back(TableBucketsStats[bucket].StringValuesTotalSize); // Adding offset to string values

        // Adding strings sizes for keys and data
        if ( NumberOfStringColumns ) {
            stringsOffsets.insert( stringsOffsets.end(), stringsSizes, stringsSizes+NumberOfStringColumns );
        }

        if ( NumberOfIColumns ) {
            // Pack the non-key (data) IColumns; key IColumns were packed above.
            for ( ui64 i = NumberOfKeyIColumns; i < NumberOfIColumns; i++) {
                TStringBuf val = (ColInterfaces + i)->Packer->Pack(*(iColumns+i));
                IColumnsVals[i].clear();
                IColumnsVals[i].insert(IColumnsVals[i].begin(), val.cbegin(), val.end());
            }
            for (ui64 i = 0; i < NumberOfIColumns; i++ ) {
                stringsOffsets.push_back(IColumnsVals[i].size());
            }
        }
    }

    // Adding data values
    ui64 * dataColumns = intColumns + NullsBitmapSize_ + NumberOfKeyIntColumns;
    dataIntVals.insert(dataIntVals.end(), dataColumns, dataColumns + NumberOfDataIntColumns);

    // Adding strings values for data columns
    char ** dataStringsColumns = stringColumns + NumberOfKeyStringColumns;
    ui32 * dataStringsSizes = stringsSizes + NumberOfKeyStringColumns;

    ui64 initialStringsSize = stringVals.size();
    for( ui64 i = 0; i < NumberOfDataStringColumns; i++) {
        ui32 currStringSize = *(dataStringsSizes + i);
        stringVals.insert(stringVals.end(), *(dataStringsColumns + i), *(dataStringsColumns + i) + currStringSize);
    }

    // Serialized data IColumns are appended to the same StringsValues buffer.
    for ( ui64 i = 0; i < NumberOfDataIColumns; i++) {
        stringVals.insert( stringVals.end(), IColumnsVals[NumberOfKeyIColumns + i].begin(), IColumnsVals[NumberOfKeyIColumns + i].end());
    }

    TableBucketsStats[bucket].KeyIntValsTotalSize += keyIntVals.size() - offset;
    TableBucketsStats[bucket].StringValuesTotalSize += stringVals.size() - initialStringsSize;

    return EAddTupleResult::Added;
}
  128. void TTable::ResetIterator() {
  129. CurrIterIndex = 0;
  130. CurrIterBucket = 0;
  131. if (IsTableJoined) {
  132. JoinTable1->ResetIterator();
  133. JoinTable2->ResetIterator();
  134. }
  135. TotalUnpacked = 0;
  136. }
  137. // Checks if there are more tuples and sets bucketId and tupleId to next valid.
  138. inline bool HasMoreTuples(std::vector<TTableBucketStats> & tableBucketsStats, ui64 & bucketId, ui64 & tupleId, ui64 bucketLimit ) {
  139. if (bucketId >= bucketLimit) return false;
  140. if ( tupleId >= tableBucketsStats[bucketId].TuplesNum ) {
  141. tupleId = 0;
  142. bucketId ++;
  143. if (bucketId == bucketLimit) {
  144. return false;
  145. }
  146. while( tableBucketsStats[bucketId].TuplesNum == 0 ) {
  147. bucketId ++;
  148. if (bucketId == bucketLimit) {
  149. return false;
  150. }
  151. }
  152. }
  153. return true;
  154. }
// Returns value of next tuple. Returns true if there are more tuples
  156. bool TTable::NextTuple(TupleData & td){
  157. if (HasMoreTuples(TableBucketsStats, CurrIterBucket, CurrIterIndex, TableBucketsStats.size())) {
  158. GetTupleData(CurrIterBucket, CurrIterIndex, td);
  159. CurrIterIndex++;
  160. return true;
  161. } else {
  162. td.AllNulls = true;
  163. return false;
  164. }
  165. }
// Compares the variable-length key sections of two stored tuples.
//   stringSizes1/2 - per-column ui32 sizes: nStringColumns string sizes first,
//                    then nIColumns serialized-IColumn sizes
//   vals1/2        - packed payload: for each column a ui32 size prefix
//                    followed by the value bytes
//   colInterfaces  - packers/equality interfaces for the IColumns
// Returns true iff all string columns are bytewise equal and all IColumns
// compare equal via their EquateI interface.
inline bool CompareIColumns( const ui32* stringSizes1, const char * vals1,
    const ui32* stringSizes2, const char * vals2,
    TColTypeInterface * colInterfaces, ui64 nStringColumns, ui64 nIColumns) {
    ui32 currOffset1 = 0;
    ui32 currOffset2 = 0;
    ui32 currSize1 = 0;
    ui32 currSize2 = 0;
    NYql::NUdf::TUnboxedValue val1, val2;
    TStringBuf str1, str2;

    // First pass: sizes must match column-by-column; accumulate the total
    // packed length (size prefix + payload) of the string section.
    for (ui32 i = 0; i < nStringColumns; i ++) {
        currSize1 = *(stringSizes1 + i);
        currSize2 = *(stringSizes2 + i);
        if (currSize1 != currSize2)
            return false;
        currOffset1 += currSize1 + sizeof(ui32);
        currOffset2 += currSize2 + sizeof(ui32);
    }
    // All sizes matched, so one memcmp covers the entire string section
    // (including the embedded size prefixes).
    if (0 != std::memcmp(vals1, vals2, currOffset1))
        return false;

    // IColumns cannot be compared bytewise: unpack both sides and use EquateI.
    // Note: HolderFactory is taken from the first interface entry by design.
    for (ui32 i = 0; i < nIColumns; i ++) {
        currSize1 = *(stringSizes1 + nStringColumns + i );
        currSize2 = *(stringSizes2 + nStringColumns + i );
        currOffset1 += sizeof(ui32); // step over the ui32 size prefix
        currOffset2 += sizeof(ui32);
        str1 = TStringBuf(vals1 + currOffset1, currSize1);
        val1 = (colInterfaces + i)->Packer->Unpack(str1, colInterfaces->HolderFactory);
        str2 = TStringBuf(vals2 + currOffset2, currSize2 );
        val2 = (colInterfaces + i)->Packer->Unpack(str2, colInterfaces->HolderFactory);
        if ( ! ((colInterfaces + i)->EquateI->Equals(val1,val2)) ) {
            return false;
        }
        currOffset1 += currSize1;
        currOffset2 += currSize2;
    }
    return true;
}
// Variant of CompareIColumns where the second tuple's IColumns are already
// unboxed (iColumns), as when probing with a live input tuple.
//   vals1/vals2 - packed payloads ([ui32 size][bytes] per column); for vals2
//                 only the string-column section is read
//   iColumns    - unboxed IColumn values for the second tuple
// Returns true iff all string columns are bytewise equal and all IColumns
// compare equal via EquateI.
inline bool CompareIColumns(    const char * vals1,
    const char * vals2,
    NYql::NUdf::TUnboxedValue * iColumns,
    TColTypeInterface * colInterfaces,
    ui64 nStringColumns, ui64 nIColumns) {
    ui32 currOffset1 = 0;
    NYql::NUdf::TUnboxedValue val1;
    TStringBuf str1;

    // Sizes are read from the embedded ui32 prefixes; both payloads use the
    // same offsets while sizes keep matching.
    for (ui32 i = 0; i < nStringColumns; i ++) {
        auto currSize1 = ReadUnaligned<ui32>(vals1 + currOffset1);
        auto currSize2 = ReadUnaligned<ui32>(vals2 + currOffset1);
        if (currSize1 != currSize2)
            return false;
        currOffset1 += currSize1 + sizeof(ui32);
    }
    // One memcmp over the whole string section (sizes already matched).
    if (0 != std::memcmp(vals1, vals2, currOffset1))
        return false;

    // Unpack side 1 and compare against the already-unboxed side 2.
    // Note: HolderFactory is taken from the first interface entry by design.
    for (ui32 i = 0; i < nIColumns; i ++) {
        auto currSize1 = ReadUnaligned<ui32>(vals1 + currOffset1);
        currOffset1 += sizeof(ui32); // step over the ui32 size prefix
        str1 = TStringBuf(vals1 + currOffset1, currSize1);
        val1 = (colInterfaces + i)->Packer->Unpack(str1, colInterfaces->HolderFactory);
        auto &val2 = iColumns[i];
        if ( ! ((colInterfaces + i)->EquateI->Equals(val1,val2)) ) {
            return false;
        }
        currOffset1 += currSize1;
    }
    return true;
}
  232. // Resizes KeysHashTable to new slots, keeps old content.
  233. void ResizeHashTable(KeysHashTable &t, ui64 newSlots){
  234. std::vector<ui64, TMKQLAllocator<ui64>> newTable(newSlots * t.SlotSize , 0);
  235. for ( auto it = t.Table.begin(); it != t.Table.end(); it += t.SlotSize ) {
  236. if ( *it == 0)
  237. continue;
  238. ui64 hash = *it;
  239. ui64 newSlotNum = hash % (newSlots);
  240. auto newIt = newTable.begin() + t.SlotSize * newSlotNum;
  241. while (*newIt != 0) {
  242. newIt += t.SlotSize;
  243. if (newIt == newTable.end()) {
  244. newIt = newTable.begin();
  245. }
  246. }
  247. std::copy_n(it, t.SlotSize, newIt);
  248. }
  249. t.NSlots = newSlots;
  250. t.Table = std::move(newTable);
  251. }
  252. bool IsTablesSwapRequired(ui64 tuplesNum1, ui64 tuplesNum2, bool table1Batch, bool table2Batch) {
  253. return tuplesNum2 > tuplesNum1 && !table1Batch || table2Batch;
  254. }
  255. ui64 ComputeJoinSlotsSizeForBucket(const TTableBucket& bucket, const TTableBucketStats& bucketStats, ui64 headerSize, bool tableHasKeyStringColumns, bool tableHasKeyIColumns) {
  256. ui64 tuplesNum = bucketStats.TuplesNum;
  257. ui64 avgStringsSize = (3 * (bucket.KeyIntVals.size() - tuplesNum * headerSize) ) / ( 2 * tuplesNum + 1) + 1;
  258. ui64 slotSize = headerSize + 1; // Header [Short Strings] SlotIdx
  259. if (tableHasKeyStringColumns || tableHasKeyIColumns) {
  260. slotSize = slotSize + avgStringsSize;
  261. }
  262. return slotSize;
  263. }
  264. ui64 ComputeNumberOfSlots(ui64 tuplesNum) {
  265. return (3 * tuplesNum + 1) | 1;
  266. }
  267. bool TTable::TryToPreallocateMemoryForJoin(TTable & t1, TTable & t2, EJoinKind /* joinKind */, bool hasMoreLeftTuples, bool hasMoreRightTuples) {
  268. // If the batch is final or the only one, then the buckets are processed sequentially, the memory for the hash tables is freed immediately after processing.
  269. // So, no preallocation is required.
  270. if (!hasMoreLeftTuples && !hasMoreRightTuples) return true;
  271. for (ui64 bucket = 0; bucket < GraceJoin::NumberOfBuckets; bucket++) {
  272. ui64 tuplesNum1 = t1.TableBucketsStats[bucket].TuplesNum;
  273. ui64 tuplesNum2 = t2.TableBucketsStats[bucket].TuplesNum;
  274. TTable& tableForPreallocation = IsTablesSwapRequired(tuplesNum1, tuplesNum2, hasMoreLeftTuples || LeftTableBatch_, hasMoreRightTuples || RightTableBatch_) ? t1 : t2;
  275. if (!tableForPreallocation.TableBucketsStats[bucket].TuplesNum || tableForPreallocation.TableBuckets[bucket].NSlots) continue;
  276. TTableBucket& bucketForPreallocation = tableForPreallocation.TableBuckets[bucket];
  277. TTableBucketStats& bucketForPreallocationStats = tableForPreallocation.TableBucketsStats[bucket];
  278. const auto nSlots = ComputeJoinSlotsSizeForBucket(bucketForPreallocation, bucketForPreallocationStats, tableForPreallocation.HeaderSize,
  279. tableForPreallocation.NumberOfKeyStringColumns != 0, tableForPreallocation.NumberOfKeyIColumns != 0);
  280. const auto slotSize = ComputeNumberOfSlots(tableForPreallocation.TableBucketsStats[bucket].TuplesNum);
  281. try {
  282. bucketForPreallocation.JoinSlots.reserve(nSlots*slotSize);
  283. bucketForPreallocationStats.BloomFilter.Reserve(bucketForPreallocationStats.TuplesNum);
  284. } catch (TMemoryLimitExceededException) {
  285. for (ui64 i = 0; i < bucket; ++i) {
  286. auto& b1 = t1.TableBuckets[i];
  287. b1.JoinSlots.resize(0);
  288. b1.JoinSlots.shrink_to_fit();
  289. auto& s1 = t1.TableBucketsStats[i];
  290. s1.BloomFilter.Shrink();
  291. auto& b2 = t2.TableBuckets[i];
  292. b2.JoinSlots.resize(0);
  293. b2.JoinSlots.shrink_to_fit();
  294. auto& s2 = t2.TableBucketsStats[i];
  295. s2.BloomFilter.Shrink();
  296. }
  297. return false;
  298. }
  299. }
  300. return true;
  301. }
  302. // Joins two tables and returns join result in joined table. Tuples of joined table could be received by
  303. // joined table iterator
// Joins buckets [fromBucket, toBucket) of t1 and t2 into this table.
// For each bucket: the smaller (or non-batched) side becomes the hash-table
// ("build") side, the other side probes it. Matches are recorded as id pairs
// in JoinIds; unmatched probe-side ids go to LeftIds; unmatched build-side ids
// are harvested from the hash-table match bits on the final batch.
// hasMoreLeftTuples/hasMoreRightTuples indicate further input batches; build
// hash tables and bloom filters persist across batched calls.
void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind, bool hasMoreLeftTuples, bool hasMoreRightTuples, ui32 fromBucket, ui32 toBucket ) {
    // Batch flags are sticky: once a side was batched it stays batched.
    if ( hasMoreLeftTuples )
        LeftTableBatch_ = true;

    if( hasMoreRightTuples )
        RightTableBatch_ = true;

    auto table1Batch = LeftTableBatch_;
    auto table2Batch = RightTableBatch_;

    JoinTable1 = &t1;
    JoinTable2 = &t2;

    JoinKind = joinKind;

    IsTableJoined = true;

    MKQL_ENSURE(joinKind != EJoinKind::Cross, "Cross Join is not allowed in Grace Join");

    // Kinds that must emit matched (id1, id2) pairs.
    const bool needCrossIds = JoinKind == EJoinKind::Inner || JoinKind == EJoinKind::Full || JoinKind == EJoinKind::Left || JoinKind == EJoinKind::Right;

    ui64 tuplesFound = 0;

    for (ui64 bucket = fromBucket; bucket < toBucket; bucket++) {
        auto &joinResults = TableBuckets[bucket].JoinIds;
        joinResults.clear();
        TTableBucket * bucket1 = &JoinTable1->TableBuckets[bucket];
        TTableBucket * bucket2 = &JoinTable2->TableBuckets[bucket];
        TTableBucketStats * bucketStats1 = &JoinTable1->TableBucketsStats[bucket];
        TTableBucketStats * bucketStats2 = &JoinTable2->TableBucketsStats[bucket];

        ui64 tuplesNum1 = JoinTable1->TableBucketsStats[bucket].TuplesNum;
        ui64 tuplesNum2 = JoinTable2->TableBucketsStats[bucket].TuplesNum;

        ui64 headerSize1 = JoinTable1->HeaderSize;
        ui64 headerSize2 = JoinTable2->HeaderSize;
        ui64 nullsSize1 = JoinTable1->NullsBitmapSize_;
        ui64 nullsSize2 = JoinTable2->NullsBitmapSize_;
        ui64 keyIntOffset1 = HashSize + nullsSize1;
        ui64 keyIntOffset2 = HashSize + nullsSize2;
        bool table1HasKeyStringColumns = (JoinTable1->NumberOfKeyStringColumns != 0);
        bool table2HasKeyStringColumns = (JoinTable2->NumberOfKeyStringColumns != 0);
        bool table1HasKeyIColumns = (JoinTable1->NumberOfKeyIColumns != 0);
        bool table2HasKeyIColumns = (JoinTable2->NumberOfKeyIColumns != 0);

        bool swapTables = IsTablesSwapRequired(tuplesNum1, tuplesNum2, table1Batch, table2Batch);

        // After this swap, suffix "1" means the probe side and "2" the build
        // (hash-table) side for the rest of the bucket processing.
        if (swapTables) {
            std::swap(bucket1, bucket2);
            std::swap(bucketStats1, bucketStats2);
            std::swap(headerSize1, headerSize2);
            std::swap(nullsSize1, nullsSize2);
            std::swap(keyIntOffset1, keyIntOffset2);
            std::swap(table1HasKeyStringColumns, table2HasKeyStringColumns);
            std::swap(table1HasKeyIColumns, table2HasKeyIColumns);
            std::swap(tuplesNum1, tuplesNum2);
        }

        auto &leftIds = bucket1->LeftIds;
        leftIds.clear();

        const bool selfJoinSameKeys = (JoinTable1 == JoinTable2);
        // Whether unmatched probe-side tuples must be collected for this kind.
        const bool needLeftIds = ((swapTables ? (JoinKind == EJoinKind::Right || JoinKind == EJoinKind::RightOnly) : (JoinKind == EJoinKind::Left || JoinKind == EJoinKind::LeftOnly)) || JoinKind == EJoinKind::Full || JoinKind == EJoinKind::Exclusion) && !selfJoinSameKeys;
        const bool isLeftSemi = swapTables ? JoinKind == EJoinKind::RightSemi : JoinKind == EJoinKind::LeftSemi;
        //const bool isRightSemi = swapTables ? JoinKind == EJoinKind::LeftSemi : JoinKind == EJoinKind::RightSemi;
        // Whether the build side must track matched/unmatched tuples (via the
        // per-slot match bit) so they can be harvested on the final batch.
        bucketStats2->HashtableMatches = ((swapTables ? (JoinKind == EJoinKind::Left || JoinKind == EJoinKind::LeftOnly || JoinKind == EJoinKind::LeftSemi) : (JoinKind == EJoinKind::Right || JoinKind == EJoinKind::RightOnly || JoinKind == EJoinKind::RightSemi)) || JoinKind == EJoinKind::Full || JoinKind == EJoinKind::Exclusion) && !selfJoinSameKeys;

        // In this case, all keys except for NULLs have matched key on other side, and NULLs are handled by AddTuple
        if (tuplesNum2 == 0) {
            // Empty build side: every probe tuple is unmatched.
            if (needLeftIds) {
                for (ui32 leftId = 0; leftId != tuplesNum1; ++leftId)
                    leftIds.push_back(leftId);
            }
            continue;
        }

        // Empty probe side: skip unless this is the final batch and build-side
        // unmatched tuples still need to be harvested below.
        if (tuplesNum1 == 0 && (hasMoreRightTuples || hasMoreLeftTuples || !bucketStats2->HashtableMatches))
            continue;

        ui64 slotSize = ComputeJoinSlotsSizeForBucket(*bucket2, *bucketStats2, headerSize2, table2HasKeyStringColumns, table2HasKeyIColumns);

        ui64 &nSlots = bucket2->NSlots;
        auto &joinSlots = bucket2->JoinSlots;
        auto &bloomFilter = bucketStats2->BloomFilter;
        bool initHashTable = false;

        Y_DEBUG_ABORT_UNLESS(bucketStats2->SlotSize == 0 || bucketStats2->SlotSize == slotSize);

        // Build the hash table once; later batches reuse it (nSlots != 0).
        if (!nSlots) {
            nSlots = ComputeNumberOfSlots(tuplesNum2);
            joinSlots.resize(nSlots*slotSize, 0);
            bloomFilter.Resize(tuplesNum2);
            initHashTable = true;
            bucketStats2->SlotSize = slotSize;
            ++InitHashTableCount_;
        }

        // Open-addressing helpers: home slot by hash, then linear probing
        // with wrap-around.
        auto firstSlot = [begin = joinSlots.begin(), slotSize, nSlots](auto hash) {
            ui64 slotNum = hash % nSlots;
            return begin + slotNum * slotSize;
        };

        auto nextSlot = [begin = joinSlots.begin(), end = joinSlots.end(), slotSize](auto it) {
            it += slotSize;
            if (it == end)
                it = begin;
            return it;
        };

        if (initHashTable) {
            // Insert every build-side tuple into the hash table and bloom filter.
            ui32 tuple2Idx = 0;
            auto it2 = bucket2->KeyIntVals.begin();
            for (ui64 keysValSize = headerSize2; it2 != bucket2->KeyIntVals.end(); it2 += keysValSize, ++tuple2Idx) {
                // Variable-size keys store their payload word count at the end of the header.
                if ( table2HasKeyStringColumns || table2HasKeyIColumns) {
                    keysValSize = headerSize2 + *(it2 + headerSize2 - 1) ;
                }

                ui64 hash = *it2;
                // Note: if hashtable is re-created after being spilled
                // (*(it2 + HashSize) & 1) may be true (even though key does NOT contain NULL)
                bloomFilter.Add(hash);

                auto slotIt = firstSlot(hash);
                ++HashLookups_;

                // Find the first free slot (first word == 0).
                for (; *slotIt != 0; slotIt = nextSlot(slotIt))
                {
                    ++HashO1Iterations_;
                }
                ++HashSlotIterations_;
                if (keysValSize <= slotSize - 1)
                {
                    // Tuple fits inline (last word is reserved for the tuple index).
                    std::copy_n(it2, keysValSize, slotIt);
                }
                else
                {
                    // Too large: store the header inline plus the offset of the
                    // string payload back in KeyIntVals.
                    std::copy_n(it2, headerSize2, slotIt);
                    *(slotIt + headerSize2) = it2 + headerSize2 - bucket2->KeyIntVals.begin();
                }
                slotIt[slotSize - 1] = tuple2Idx;
            }
            bloomFilter.Finalize();
            if (swapTables) JoinTable1Total_ += tuplesNum2; else JoinTable2Total_ += tuplesNum2;
        }

        if (swapTables) JoinTable2Total_ += tuplesNum1; else JoinTable1Total_ += tuplesNum1;

        // Probe phase: stream all probe-side tuples against the hash table.
        ui32 tuple1Idx = 0;
        auto it1 = bucket1->KeyIntVals.begin();
        // /-------headerSize---------------------------\
        // hash nulls-bitmap keyInt[] KeyIHash[] strSize| [strPos | strs] slotIdx
        // \---------------------------------------slotSize ---------------------/
        // bit0 of nulls bitmap denotes key-with-nulls
        // strSize only present if HasKeyStrCol || HasKeyICol
        // strPos is only present if (HasKeyStrCol || HasKeyICol) && strSize + headerSize >= slotSize
        // slotSize, slotIdx and strPos is only for hashtable (table2)
        ui64 bloomHits = 0;
        ui64 bloomLookups = 0;

        for (ui64 keysValSize = headerSize1; it1 != bucket1->KeyIntVals.end(); it1 += keysValSize, ++tuple1Idx ) {
            if ( table1HasKeyStringColumns || table1HasKeyIColumns ) {
                keysValSize = headerSize1 + *(it1 + headerSize1 - 1) ;
            }

            ui64 hash = *it1;

            Y_DEBUG_ABORT_UNLESS((*(it1 + HashSize) & 1) == 0); // Keys with NULL never reaches Join

            // The bloom filter is only consulted while the hash table is fresh
            // (same batch in which it was built).
            if (initHashTable) {
                bloomLookups++;
                if (bloomFilter.IsMissing(hash)) {
                    if (needLeftIds)
                        leftIds.push_back(tuple1Idx);
                    bloomHits++;
                    continue;
                }
            }

            ++HashLookups_;

            auto saveTuplesFound = tuplesFound;
            auto slotIt = firstSlot(hash);
            // Walk the probe chain until a free slot terminates it.
            for (; *slotIt != 0; slotIt = nextSlot(slotIt) )
            {
                ++HashO1Iterations_;
                if (*slotIt != hash)
                    continue;

                auto tuple2Idx = slotIt[slotSize - 1];

                ++HashSlotIterations_;
                if (table1HasKeyIColumns || !(keysValSize - nullsSize1 <= slotSize - 1 - nullsSize2)) {
                    // 2nd condition cannot be true unless HasKeyStringColumns or HasKeyIColumns, hence size at the end of header is present
                    // Compare the fixed-size key ints first (cheap reject).
                    if (!std::equal(it1 + keyIntOffset1, it1 + headerSize1 - 1, slotIt + keyIntOffset2))
                        continue;

                    // Locate the build-side string payload: inline in the slot,
                    // or back in KeyIntVals when it overflowed the slot.
                    auto slotStringsStart = slotIt + headerSize2;
                    ui64 slotStringsSize = *(slotIt + headerSize2 - 1);

                    if (headerSize2 + slotStringsSize + 1 > slotSize)
                    {
                        ui64 stringsPos = *(slotIt + headerSize2);
                        slotStringsStart = bucket2->KeyIntVals.begin() + stringsPos;
                    }

                    if (table1HasKeyIColumns)
                    {
                        // IColumns require interface-based comparison of the payload.
                        ui64 stringsOffsetsIdx1 = tuple1Idx * (JoinTable1->NumberOfStringColumns + JoinTable1->NumberOfIColumns + 2);
                        ui64 stringsOffsetsIdx2 = tuple2Idx * (JoinTable2->NumberOfStringColumns + JoinTable2->NumberOfIColumns + 2);
                        ui32 * stringsSizesPtr1 = bucket1->StringsOffsets.data() + stringsOffsetsIdx1 + 2;
                        ui32 * stringsSizesPtr2 = bucket2->StringsOffsets.data() + stringsOffsetsIdx2 + 2;

                        if (!CompareIColumns( stringsSizesPtr1 ,
                            (char *) (it1 + headerSize1 ),
                            stringsSizesPtr2,
                            (char *) (slotStringsStart),
                            JoinTable1 -> ColInterfaces, JoinTable1->NumberOfStringColumns, JoinTable1 -> NumberOfKeyIColumns ))
                            continue;
                    } else {
                        // Strings only: bytewise compare of the packed payload.
                        ui64 stringsSize = *(it1 + headerSize1 - 1);
                        if (stringsSize != slotStringsSize || !std::equal(it1 + headerSize1, it1 + headerSize1 + stringsSize, slotStringsStart))
                            continue;
                    }
                } else {
                    // Fully inline fixed-size key: single range compare.
                    if (!std::equal(it1 + keyIntOffset1, it1 + keysValSize, slotIt + keyIntOffset2))
                        continue;
                }

                *(slotIt + HashSize) |= 1; // mark right slot as matched

                tuplesFound++;
                if (needCrossIds) {
                    JoinTuplesIds joinIds;
                    joinIds.id1 = swapTables ? tuple2Idx : tuple1Idx;
                    joinIds.id2 = swapTables ? tuple1Idx : tuple2Idx;
                    joinResults.emplace_back(joinIds);
                }
            }
            if (saveTuplesFound == tuplesFound) {
                // No build-side match for this probe tuple (bloom filter let it through).
                ++BloomFalsePositives_;
                if (needLeftIds)
                    leftIds.push_back(tuple1Idx);
            } else if (isLeftSemi) {
                // Semi join: emit the probe id itself once it matched.
                leftIds.push_back(tuple1Idx);
            }
        }

        // Final batch: harvest (un)matched build-side tuples and release the
        // hash table — it will not be probed again.
        if (!hasMoreLeftTuples && !hasMoreRightTuples) {
            bloomFilter.Shrink();
            if (bucketStats2->HashtableMatches) {
                auto slotIt = joinSlots.cbegin();
                auto end = joinSlots.cend();
                auto isSemi = JoinKind == EJoinKind::LeftSemi || JoinKind == EJoinKind::RightSemi;
                auto &leftIds2 = bucket2->LeftIds;

                // Semi joins want matched build-side ids (match bit set);
                // outer/only/exclusion joins want the unmatched ones.
                for (; slotIt != end; slotIt += slotSize) {
                    if ((*(slotIt + HashSize) & 1) == isSemi && *slotIt != 0) {
                        auto id2 = *(slotIt + slotSize - 1);
                        Y_DEBUG_ABORT_UNLESS(id2 < bucketStats2->TuplesNum);
                        leftIds2.push_back(id2);
                    }
                }
                // Restore tuple order (hash table iteration order is arbitrary).
                std::sort(leftIds2.begin(), leftIds2.end());
            }
            joinSlots.clear();
            joinSlots.shrink_to_fit();
            nSlots = 0;
        }
        if (bloomHits < bloomLookups/8) {
            // Bloomfilter was inefficient, drop it
            bloomFilter.Shrink();
        }
        BloomHits_ += bloomHits;
        BloomLookups_ += bloomLookups;

        YQL_LOG(GRACEJOIN_TRACE)
            << (const void *)this << '#'
            << bucket
            << " Table1 " << JoinTable1->TableBucketsStats[bucket].TuplesNum
            << " Table2 " << JoinTable2->TableBucketsStats[bucket].TuplesNum
            << " LeftTableBatch " << LeftTableBatch_
            << " RightTableBatch " << RightTableBatch_
            << " leftIds " << leftIds.size()
            << " joinIds " << joinResults.size()
            << " joinKind " << (int)JoinKind
            << " swapTables " << swapTables
            << " initHashTable " << initHashTable
            ;
    }

    HasMoreLeftTuples_ = hasMoreLeftTuples;
    HasMoreRightTuples_ = hasMoreRightTuples;
    TuplesFound_ += tuplesFound;
}
// Unpacks the tuple (bucketNum, tupleId) from the bucket's packed storage into
// td (IntColumns, StrColumns/StrSizes, IColumns). The offsets mirror the
// layout written by AddTuple: key data lives in KeyIntVals (with a
// StringsOffsets side table when variable-size key parts exist), non-key data
// in DataIntVals/StringsValues.
inline void TTable::GetTupleData(ui32 bucketNum, ui32 tupleId, TupleData & td) {

    ui64 keyIntsOffset = 0;
    ui64 dataIntsOffset = 0;
    ui64 keyStringsOffset = 0;
    ui64 dataStringsOffset = 0;

    td.AllNulls = false;

    TotalUnpacked++;

    TTableBucket & tb = TableBuckets[bucketNum];
    // StringsOffsets row layout per tuple:
    // [keyIntVals offset][StringsValues offset][per-column sizes...]
    ui64 stringsOffsetsIdx = tupleId * (NumberOfStringColumns + NumberOfIColumns + 2);

    // Variable-size keys: tuple start is recorded in StringsOffsets; otherwise
    // tuples are fixed HeaderSize words apart.
    if(NumberOfKeyStringColumns != 0 || NumberOfKeyIColumns !=0 ) {
        keyIntsOffset = tb.StringsOffsets[stringsOffsetsIdx];
    } else {
        keyIntsOffset = HeaderSize * tupleId;
    }

    // Key integer columns (the stored hash word is skipped via HashSize).
    for ( ui64 i = 0; i < NumberOfKeyIntColumns + NullsBitmapSize_; ++i) {
        td.IntColumns[i] = tb.KeyIntVals[keyIntsOffset + HashSize + i];
    }

    // Data integer columns follow the key ints in td.IntColumns.
    dataIntsOffset = NumberOfDataIntColumns * tupleId;
    for ( ui64 i = 0; i < NumberOfDataIntColumns; ++i) {
        td.IntColumns[NumberOfKeyIntColumns + NullsBitmapSize_ + i] = tb.DataIntVals[dataIntsOffset + i];
    }

    char *strPtr = nullptr;
    // Key strings / key IColumns: packed as [ui32 size][bytes] right after the
    // tuple header inside KeyIntVals.
    if(NumberOfKeyStringColumns != 0 || NumberOfKeyIColumns != 0) {
        keyStringsOffset = tb.StringsOffsets[stringsOffsetsIdx] + HeaderSize;

        strPtr = reinterpret_cast<char *>(tb.KeyIntVals.data() + keyStringsOffset);

        for (ui64 i = 0; i < NumberOfKeyStringColumns; ++i)
        {
            td.StrSizes[i] = tb.StringsOffsets[stringsOffsetsIdx + 2 + i];
            Y_DEBUG_ABORT_UNLESS(ReadUnaligned<ui32>(strPtr) == td.StrSizes[i]);
            strPtr += sizeof(ui32); // skip the embedded size prefix
            td.StrColumns[i] = strPtr;
            strPtr += td.StrSizes[i];
        }

        // Key IColumns are unpacked back to TUnboxedValues.
        for ( ui64 i = 0; i < NumberOfKeyIColumns; i++) {
            ui32 currSize = tb.StringsOffsets[stringsOffsetsIdx + 2 + NumberOfKeyStringColumns + i];
            Y_DEBUG_ABORT_UNLESS(ReadUnaligned<ui32>(strPtr) == currSize);
            strPtr += sizeof(ui32);
            *(td.IColumns + i) = (ColInterfaces + i)->Packer->Unpack(TStringBuf(strPtr, currSize), ColInterfaces->HolderFactory);
            strPtr += currSize;
        }
    }

    // Data strings / data IColumns live contiguously in StringsValues starting
    // at the per-tuple offset (no size prefixes — sizes come from StringsOffsets).
    if(NumberOfDataStringColumns || NumberOfDataIColumns != 0) {
        dataStringsOffset = tb.StringsOffsets[stringsOffsetsIdx + 1];
    }

    strPtr = (tb.StringsValues.data() + dataStringsOffset);

    for ( ui64 i = 0; i < NumberOfDataStringColumns; ++i ) {
        ui32 currIdx = NumberOfKeyStringColumns + i;
        td.StrColumns[currIdx] = strPtr;
        td.StrSizes[currIdx] = tb.StringsOffsets[stringsOffsetsIdx + 2 + currIdx];
        strPtr += td.StrSizes[currIdx];
    }

    for (ui64 i = 0; i < NumberOfDataIColumns; i++ ) {
        // Size index: after the 2 offsets, all string sizes, then key IColumn sizes.
        ui32 currIdx = NumberOfStringColumns + NumberOfKeyIColumns + i;
        ui32 currSize = tb.StringsOffsets[stringsOffsetsIdx + 2 + currIdx];
        *(td.IColumns + NumberOfKeyIColumns + i) = (ColInterfaces + NumberOfKeyIColumns + i)->Packer->Unpack(TStringBuf(strPtr, currSize), ColInterfaces->HolderFactory);
        strPtr += currSize;
    }

}
// Inserts a packed key into the open-addressing hash table `t` used for ANY
// deduplication. Returns true if the key was newly added (or is null and
// therefore always accepted), false if an equal key already exists.
// Slot layout: [hash][nulls bitmap][key ints][strings size / spill offset...].
inline bool TTable::AddKeysToHashTable(KeysHashTable& t, ui64* keys, NYql::NUdf::TUnboxedValue * iColumns) {
    // Lazy initialization of the slot array on first insert.
    if (t.NSlots == 0) {
        t.SlotSize = HeaderSize + NumberOfKeyStringColumns * 2;
        t.Table.resize(DefaultTuplesNum * t.SlotSize, 0);
        t.NSlots = DefaultTuplesNum;
    }
    // Keep load factor under 1/2; grow before probing.
    if ( t.FillCount > t.NSlots/2 ) {
        ResizeHashTable(t, 2 * t.NSlots + 1);
    }
    if ( (*(keys + HashSize) & 1) ) // Keys with null value
        return true;
    ui64 hash = *keys;
    ui64 slot = hash % t.NSlots;
    auto it = t.Table.begin() + slot * t.SlotSize;
    ui64 keyIntOffset = HashSize + NullsBitmapSize_;
    ui64 keysSize = HeaderSize;
    ui64 keyStringsSize = 0;
    if ( NumberOfKeyStringColumns > 0 || NumberOfKeyIColumns > 0) {
        // Last header word holds the total size of the variable part.
        keyStringsSize = *(keys + HeaderSize - 1);
        keysSize = HeaderSize + keyStringsSize;
    }
    // Linear probing with wrap-around.
    auto nextSlot = [begin = t.Table.begin(), end = t.Table.end(), slotSize = t.SlotSize](auto it) {
        it += slotSize;
        if (it == end)
            it = begin;
        return it;
    };
    // Probe until an empty slot (hash word == 0) is found; on any full match
    // return false (duplicate).
    for (auto itValSize = HeaderSize; *it != 0; it = nextSlot(it)) {
        if (*it != hash)
            continue;
        if ( NumberOfKeyIColumns == 0 && (itValSize <= t.SlotSize)) {
            // Fixed-size (or inline) key: compare words in place.
            if (!std::equal(it + keyIntOffset, it + itValSize, keys + keyIntOffset))
                continue;
            return false;
        }
        Y_DEBUG_ABORT_UNLESS( NumberOfKeyStringColumns > 0 || NumberOfKeyIColumns > 0);
        itValSize = HeaderSize + *(it + HeaderSize - 1);
        auto slotStringsStart = it + HeaderSize;
        // Compare the fixed part first (nulls bitmap + key ints).
        if (!std::equal(it + keyIntOffset, it + HeaderSize - 1, keys + keyIntOffset))
            continue;
        if (NumberOfKeyIColumns > 0) {
            // Interface columns need packer-aware comparison.
            if (!CompareIColumns(
                    (char *) (slotStringsStart),
                    (char *) (keys + HeaderSize ),
                    iColumns,
                    JoinTable1 -> ColInterfaces, JoinTable1->NumberOfStringColumns, JoinTable1 -> NumberOfKeyIColumns ))
                continue;
            return false;
        }
        // Oversized keys were spilled out of the slot; *(it + HeaderSize)
        // is an offset into t.SpillData.
        Y_DEBUG_ABORT_UNLESS(!(itValSize <= t.SlotSize));
        ui64 stringsPos = *(it + HeaderSize);
        slotStringsStart = t.SpillData.begin() + stringsPos;
        if (keysSize != itValSize || !std::equal(slotStringsStart, slotStringsStart + itValSize, keys + HeaderSize))
            continue;
        return false;
    }
    // No duplicate found: store at the empty slot, spilling the variable
    // part out of line if it does not fit.
    if (keysSize > t.SlotSize) {
        ui64 spillDataOffset = t.SpillData.size();
        t.SpillData.insert(t.SpillData.end(), keys + HeaderSize, keys + keysSize);
        std::copy_n(keys, HeaderSize, it);
        *(it + HeaderSize) = spillDataOffset;
    } else {
        std::copy_n(keys, keysSize, it);
    }
    t.FillCount++;
    return true;
}
// Yields the next joined tuple pair into td1/td2, iterating buckets
// [CurrIterBucket, bucketLimit). Matched pairs (JoinIds) are produced first;
// then, for Left/Right/Full joins, unmatched rows (LeftIds) of either side
// are produced with the opposite side marked AllNulls. Returns false when
// all buckets below bucketLimit are exhausted.
bool TTable::NextJoinedData( TupleData & td1, TupleData & td2, ui64 bucketLimit) {
    while (CurrIterBucket < bucketLimit) {
        if (auto &joinIds = TableBuckets[CurrIterBucket].JoinIds; CurrIterIndex != joinIds.size()) {
            Y_DEBUG_ABORT_UNLESS(JoinKind == EJoinKind::Inner || JoinKind == EJoinKind::Left || JoinKind == EJoinKind::Right || JoinKind == EJoinKind::Full);
            auto ids = joinIds[CurrIterIndex++];
            JoinTable1->GetTupleData(CurrIterBucket, ids.id1, td1);
            JoinTable2->GetTupleData(CurrIterBucket, ids.id2, td2);
            return true;
        }
        // Emit unmatched rows of one side; the other side becomes all-null.
        // Each side table keeps its own CurrIterIndex over its LeftIds.
        auto leftSide = [this](auto sideTable, auto &tdL, auto &tdR) {
            const auto &bucket = sideTable->TableBuckets[CurrIterBucket];
            auto &currIterIndex = sideTable->CurrIterIndex;
            const auto &leftIds = bucket.LeftIds;
            if (currIterIndex != leftIds.size()) {
                auto id = leftIds[currIterIndex++];
                sideTable->GetTupleData(CurrIterBucket, id, tdL);
                tdR.AllNulls = true;
                return true;
            }
            return false;
        };
        if (leftSide(JoinTable1, td1, td2))
            return true;
        if (leftSide(JoinTable2, td2, td1))
            return true;
        // Bucket exhausted: advance and reset all per-bucket cursors.
        ++CurrIterBucket;
        CurrIterIndex = 0;
        JoinTable1->CurrIterIndex = 0;
        JoinTable2->CurrIterIndex = 0;
    }
    return false;
}
  708. void TTable::Clear() {
  709. for (ui64 bucket = 0; bucket < NumberOfBuckets; bucket++) {
  710. ClearBucket(bucket);
  711. }
  712. }
  713. void TTable::ClearBucket(ui64 bucket) {
  714. TTableBucket & tb = TableBuckets[bucket];
  715. tb.KeyIntVals.clear();
  716. tb.DataIntVals.clear();
  717. tb.StringsOffsets.clear();
  718. tb.StringsValues.clear();
  719. tb.InterfaceValues.clear();
  720. tb.InterfaceOffsets.clear();
  721. tb.JoinIds.clear();
  722. tb.LeftIds.clear();
  723. tb.JoinSlots.clear();
  724. tb.NSlots = 0;
  725. TTableBucketStats & tbs = TableBucketsStats[bucket];
  726. tbs.TuplesNum = 0;
  727. tbs.KeyIntValsTotalSize = 0;
  728. tbs.StringValuesTotalSize = 0;
  729. }
  730. void TTable::ShrinkBucket(ui64 bucket) {
  731. TTableBucket & tb = TableBuckets[bucket];
  732. tb.KeyIntVals.shrink_to_fit();
  733. tb.DataIntVals.shrink_to_fit();
  734. tb.StringsOffsets.shrink_to_fit();
  735. tb.StringsValues.shrink_to_fit();
  736. tb.InterfaceValues.shrink_to_fit();
  737. tb.InterfaceOffsets.shrink_to_fit();
  738. tb.JoinIds.shrink_to_fit();
  739. tb.LeftIds.shrink_to_fit();
  740. tb.JoinSlots.shrink_to_fit();
  741. }
  742. void TTable::InitializeBucketSpillers(ISpiller::TPtr spiller) {
  743. for (size_t i = 0; i < NumberOfBuckets; ++i) {
  744. TableBucketsSpillers.emplace_back(spiller, 5_MB);
  745. }
  746. }
  747. ui64 TTable::GetSizeOfBucket(ui64 bucket) const {
  748. return TableBuckets[bucket].KeyIntVals.size() * sizeof(ui64)
  749. + TableBuckets[bucket].JoinSlots.size() * sizeof(ui64)
  750. + TableBuckets[bucket].DataIntVals.size() * sizeof(ui64)
  751. + TableBuckets[bucket].StringsValues.size()
  752. + TableBuckets[bucket].StringsOffsets.size() * sizeof(ui32)
  753. + TableBuckets[bucket].InterfaceValues.size()
  754. + TableBuckets[bucket].InterfaceOffsets.size() * sizeof(ui32);
  755. }
// Spills `bucket` to disk if it exceeds its share of SpillingSizeLimit.
// Returns true while spilling is still in progress (caller should wait),
// false if the bucket was small enough to keep in memory.
bool TTable::TryToReduceMemoryAndWait(ui64 bucket) {
    if (GetSizeOfBucket(bucket) < SpillingSizeLimit/NumberOfBuckets) return false;
    if (const auto &tbs = TableBucketsStats[bucket]; tbs.HashtableMatches) {
        auto &tb = TableBuckets[bucket];
        if (tb.JoinSlots.size()) {
            // The hash table itself is not spilled. Before dropping it,
            // persist per-tuple "matched" marks from the slots back into the
            // low bit of the tuple's nulls-bitmap word in KeyIntVals, so the
            // information survives the spill/restore round trip.
            const auto slotSize = tbs.SlotSize;
            Y_DEBUG_ABORT_UNLESS(slotSize);
            auto it = tb.JoinSlots.cbegin();
            const auto end = tb.JoinSlots.cend();
            for (; it != end; it += slotSize) {
                // Note: we need not check if *it is 0
                if ((*(it + HashSize) & 1)) {
                    ui64 keyIntsOffset;
                    // Last slot word holds the tuple id this slot refers to.
                    auto tupleId = *(it + slotSize - 1);
                    Y_DEBUG_ABORT_UNLESS(tupleId < tbs.TuplesNum);
                    if (NumberOfKeyStringColumns != 0 || NumberOfKeyIColumns != 0) {
                        ui64 stringsOffsetsIdx = tupleId * (NumberOfStringColumns + NumberOfIColumns + 2);
                        keyIntsOffset = tb.StringsOffsets[stringsOffsetsIdx];
                    } else {
                        keyIntsOffset = HeaderSize * tupleId;
                    }
                    tb.KeyIntVals[keyIntsOffset + HashSize] |= 1;
                }
            }
            tb.JoinSlots.clear();
            tb.JoinSlots.shrink_to_fit();
        }
    }
    // Hand the bucket's vectors over to the spiller and leave an empty
    // bucket in its place.
    TableBucketsSpillers[bucket].SpillBucket(std::move(TableBuckets[bucket]));
    TableBuckets[bucket] = TTableBucket{};
    return TableBucketsSpillers[bucket].IsProcessingSpilling();
}
  788. void TTable::UpdateSpilling() {
  789. for (ui64 i = 0; i < NumberOfBuckets; ++i) {
  790. TableBucketsSpillers[i].Update();
  791. }
  792. }
  793. bool TTable::IsSpillingFinished() const {
  794. for (ui64 i = 0; i < NumberOfBuckets; ++i) {
  795. if (TableBucketsSpillers[i].IsProcessingSpilling()) return false;
  796. }
  797. return true;
  798. }
  799. bool TTable::IsSpillingAcceptingDataRequests() const {
  800. for (ui64 i = 0; i < NumberOfBuckets; ++i) {
  801. if (TableBucketsSpillers[i].IsInMemory()) continue;
  802. if (!TableBucketsSpillers[i].IsAcceptingDataRequests()) return false;
  803. }
  804. return true;
  805. }
  806. bool TTable::IsRestoringSpilledBuckets() const {
  807. for (ui64 i = 0; i < NumberOfBuckets; ++i) {
  808. if (TableBucketsSpillers[i].IsRestoring()) return true;
  809. }
  810. return false;
  811. }
  812. void TTable::FinalizeSpilling() {
  813. for (ui32 bucket = 0; bucket < NumberOfBuckets; ++bucket) {
  814. if (!TableBucketsSpillers[bucket].IsInMemory()) {
  815. TableBucketsSpillers[bucket].Finalize();
  816. TableBucketsSpillers[bucket].SpillBucket(std::move(TableBuckets[bucket]));
  817. TableBuckets[bucket] = TTableBucket{};
  818. }
  819. }
  820. }
// True if the bucket's data is held in memory (never spilled, or fully restored).
bool TTable::IsBucketInMemory(ui32 bucket) const {
    return TableBucketsSpillers[bucket].IsInMemory();
}
// True if the bucket has been fully restored and awaits ExtractBucket
// (see PrepareBucket).
bool TTable::IsSpilledBucketWaitingForExtraction(ui32 bucket) const {
    return TableBucketsSpillers[bucket].IsExtractionRequired();
}
// Kicks off restoration of a spilled bucket from disk. The bucket must not
// already be in memory.
void TTable::StartLoadingBucket(ui32 bucket) {
    MKQL_ENSURE(!TableBucketsSpillers[bucket].IsInMemory(), "Internal logic error");
    TableBucketsSpillers[bucket].StartBucketRestoration();
}
  831. void TTable::PrepareBucket(ui64 bucket) {
  832. if (!TableBucketsSpillers[bucket].IsExtractionRequired()) return;
  833. TableBuckets[bucket] = std::move(TableBucketsSpillers[bucket].ExtractBucket());
  834. }
// Creates new table with key columns and data columns.
// Column counts are split by storage class: fixed-width ints, strings, and
// "interface" (I) columns that go through a TColTypeInterface packer.
// `nullsBitmapSize` is the number of ui64 words reserved for the nulls
// bitmap; `isAny` enables ANY-join deduplication.
TTable::TTable( ui64 numberOfKeyIntColumns, ui64 numberOfKeyStringColumns,
    ui64 numberOfDataIntColumns, ui64 numberOfDataStringColumns,
    ui64 numberOfKeyIColumns, ui64 numberOfDataIColumns,
    ui64 nullsBitmapSize, TColTypeInterface * colInterfaces, bool isAny ) :
    NumberOfKeyIntColumns(numberOfKeyIntColumns),
    NumberOfKeyStringColumns(numberOfKeyStringColumns),
    NumberOfKeyIColumns(numberOfKeyIColumns),
    NumberOfDataIntColumns(numberOfDataIntColumns),
    NumberOfDataStringColumns(numberOfDataStringColumns),
    NumberOfDataIColumns(numberOfDataIColumns),
    ColInterfaces(colInterfaces),
    NullsBitmapSize_(nullsBitmapSize),
    IsAny_(isAny) {
    // Derived column-count totals.
    NumberOfKeyColumns = NumberOfKeyIntColumns + NumberOfKeyStringColumns + NumberOfKeyIColumns;
    NumberOfDataColumns = NumberOfDataIntColumns + NumberOfDataStringColumns + NumberOfDataIColumns;
    NumberOfColumns = NumberOfKeyColumns + NumberOfDataColumns;
    NumberOfStringColumns = NumberOfKeyStringColumns + NumberOfDataStringColumns;
    NumberOfIColumns = NumberOfKeyIColumns + NumberOfDataIColumns;
    BytesInKeyIntColumns = NumberOfKeyIntColumns * sizeof(ui64);
    // One extra header word stores the total size of variable-length key data,
    // present only when there are string or interface key columns.
    TotalStringsSize = (numberOfKeyStringColumns > 0 || NumberOfKeyIColumns > 0 ) ? 1 : 0;
    // Packed key record layout: hash word(s), nulls bitmap, int keys,
    // interface keys, optional strings-size word.
    HeaderSize = HashSize + NullsBitmapSize_ + NumberOfKeyIntColumns + NumberOfKeyIColumns + TotalStringsSize;
    TableBuckets.resize(NumberOfBuckets);
    TableBucketsStats.resize(NumberOfBuckets);
    const ui64 reservedSizePerTuple = (2 * DefaultTupleBytes) / sizeof(ui64);
    TempTuple.reserve( reservedSizePerTuple );
    IColumnsHashes.resize(NumberOfKeyIColumns);
    IColumnsVals.resize(NumberOfIColumns);
    // Pre-size each bucket's vectors proportionally to the expected share of
    // each column class (heuristic; +1 avoids division by zero).
    const ui64 totalForTuples = DefaultTuplesNum * reservedSizePerTuple;
    for ( auto & b: TableBuckets ) {
        b.KeyIntVals.reserve( (totalForTuples * NumberOfKeyColumns) / (NumberOfColumns + 1) );
        b.StringsOffsets.reserve((totalForTuples * NumberOfStringColumns) / (NumberOfColumns + 1));
        b.DataIntVals.reserve( (totalForTuples * NumberOfDataIntColumns) / (NumberOfColumns + 1));
        b.StringsValues.reserve( (totalForTuples * NumberOfStringColumns) / (NumberOfColumns + 1) );
        b.InterfaceOffsets.reserve( (totalForTuples * NumberOfIColumns) / (NumberOfColumns + 1) );
        b.InterfaceValues.reserve( (totalForTuples * NumberOfIColumns) / (NumberOfColumns + 1));
    }
}
  873. TTable::~TTable() {
  874. YQL_LOG_IF(GRACEJOIN_DEBUG, InitHashTableCount_)
  875. << (const void *)this << '#' << "InitHashTableCount " << InitHashTableCount_
  876. << " BloomLookups " << BloomLookups_ << " BloomHits " << BloomHits_ << " BloomFalsePositives " << BloomFalsePositives_
  877. << " HashLookups " << HashLookups_ << " HashChainTraversal " << HashO1Iterations_/(double)HashLookups_ << " HashSlotOperations " << HashSlotIterations_/(double)HashLookups_
  878. << " Table1 " << JoinTable1Total_ << " Table2 " << JoinTable2Total_ << " TuplesFound " << TuplesFound_
  879. ;
  880. YQL_LOG_IF(GRACEJOIN_DEBUG, JoinTable1 && JoinTable1->AnyFiltered_) << (const void *)this << '#' << "L AnyFiltered " << JoinTable1->AnyFiltered_;
  881. YQL_LOG_IF(GRACEJOIN_DEBUG, JoinTable1 && JoinTable1->BloomLookups_) << (const void *)this << '#' << "L BloomLookups " << JoinTable1->BloomLookups_ << " BloomHits " << JoinTable1->BloomHits_;
  882. YQL_LOG_IF(GRACEJOIN_DEBUG, JoinTable2 && JoinTable2->AnyFiltered_) << (const void *)this << '#' << "R AnyFiltered " << JoinTable2->AnyFiltered_;
  883. YQL_LOG_IF(GRACEJOIN_DEBUG, JoinTable2 && JoinTable2->BloomLookups_) << (const void *)this << '#' << "R BloomLookups " << JoinTable2->BloomLookups_ << " BloomHits " << JoinTable2->BloomHits_;
  884. };
// One spiller per bucket: three typed adapters share the same ISpiller and
// size limit, covering the ui64, ui32 and char vectors a bucket contains.
TTableBucketSpiller::TTableBucketSpiller(ISpiller::TPtr spiller, size_t sizeLimit)
    : StateUi64Adapter(spiller, sizeLimit)
    , StateUi32Adapter(spiller, sizeLimit)
    , StateCharAdapter(spiller, sizeLimit)
{
}
  891. void TTableBucketSpiller::Update() {
  892. StateUi64Adapter.Update();
  893. StateUi32Adapter.Update();
  894. StateCharAdapter.Update();
  895. if (State == EState::Spilling) {
  896. ProcessBucketSpilling();
  897. } else if (State == EState::Finalizing) {
  898. ProcessFinalizing();
  899. } else if (State == EState::Restoring) {
  900. ProcessBucketRestoration();
  901. }
  902. }
// Requests finalization; the actual transition happens in
// ProcessBucketSpilling once all pending writes are accepted.
void TTableBucketSpiller::Finalize() {
    IsFinalizingRequested = true;
}
// Takes ownership of `bucket` and starts streaming its vectors to the
// spiller, beginning with KeyIntVals. Must not be called while a previous
// spill/restore sequence is mid-flight.
void TTableBucketSpiller::SpillBucket(TTableBucket&& bucket) {
    MKQL_ENSURE(NextVectorToProcess == ENextVectorToProcess::None, "Internal logic error");
    State = EState::Spilling;
    CurrentBucket = std::move(bucket);
    NextVectorToProcess = ENextVectorToProcess::KeyAndVals;
    ProcessBucketSpilling();
}
// Hands the fully restored bucket back to the caller as an rvalue reference
// to the internal CurrentBucket (caller must move from it immediately).
// Valid only after restoration completed and every spilled chunk was consumed.
TTableBucket&& TTableBucketSpiller::ExtractBucket() {
    MKQL_ENSURE(State == EState::WaitingForExtraction, "Internal logic error");
    MKQL_ENSURE(SpilledBucketsCount == 0, "Internal logic error");
    State = EState::InMemory;
    return std::move(CurrentBucket);
}
// The bucket's data currently lives in memory (nothing pending on disk).
bool TTableBucketSpiller::IsInMemory() const {
    return State == EState::InMemory;
}
// Restoration finished; the caller must call ExtractBucket to take the data.
bool TTableBucketSpiller::IsExtractionRequired() const {
    return State == EState::WaitingForExtraction;
}
// A spill (write-out) is still in progress.
bool TTableBucketSpiller::IsProcessingSpilling() const {
    return State == EState::Spilling;
}
// Finalized and ready to serve read-back (restoration) requests.
bool TTableBucketSpiller::IsAcceptingDataRequests() const {
    return State == EState::AcceptingDataRequests;
}
// A restore (read-back) is still in progress.
bool TTableBucketSpiller::IsRestoring() const {
    return State == EState::Restoring;
}
// Begins reading the spilled bucket back, starting from KeyIntVals.
// Only legal once the spiller has been finalized (AcceptingDataRequests)
// and no other vector sequence is in flight.
void TTableBucketSpiller::StartBucketRestoration() {
    MKQL_ENSURE(State == EState::AcceptingDataRequests, "Internal logic error");
    MKQL_ENSURE(NextVectorToProcess == ENextVectorToProcess::None, "Internal logic error");
    NextVectorToProcess = ENextVectorToProcess::KeyAndVals;
    State = EState::Restoring;
    ProcessBucketRestoration();
}
// Streams CurrentBucket's six vectors to the typed adapters in a fixed
// order (KeyIntVals, DataIntVals, StringsValues, StringsOffsets,
// InterfaceValues, InterfaceOffsets). Each step may bail out early when the
// target adapter is busy; Update() re-enters here to resume from
// NextVectorToProcess. After the last vector, either finalization starts
// (if requested) or the spiller goes back to AcceptingData.
void TTableBucketSpiller::ProcessBucketSpilling() {
    while (NextVectorToProcess != ENextVectorToProcess::None) {
        switch (NextVectorToProcess) {
            case ENextVectorToProcess::KeyAndVals:
                if (!StateUi64Adapter.IsAcceptingData()) return;
                StateUi64Adapter.AddData(std::move(CurrentBucket.KeyIntVals));
                NextVectorToProcess = ENextVectorToProcess::DataIntVals;
                break;
            case ENextVectorToProcess::DataIntVals:
                if (!StateUi64Adapter.IsAcceptingData()) return;
                StateUi64Adapter.AddData(std::move(CurrentBucket.DataIntVals));
                NextVectorToProcess = ENextVectorToProcess::StringsValues;
                break;
            case ENextVectorToProcess::StringsValues:
                if (!StateCharAdapter.IsAcceptingData()) return;
                StateCharAdapter.AddData(std::move(CurrentBucket.StringsValues));
                NextVectorToProcess = ENextVectorToProcess::StringsOffsets;
                break;
            case ENextVectorToProcess::StringsOffsets:
                if (!StateUi32Adapter.IsAcceptingData()) return;
                StateUi32Adapter.AddData(std::move(CurrentBucket.StringsOffsets));
                NextVectorToProcess = ENextVectorToProcess::InterfaceValues;
                break;
            case ENextVectorToProcess::InterfaceValues:
                if (!StateCharAdapter.IsAcceptingData()) return;
                StateCharAdapter.AddData(std::move(CurrentBucket.InterfaceValues));
                NextVectorToProcess = ENextVectorToProcess::InterfaceOffsets;
                break;
            case ENextVectorToProcess::InterfaceOffsets:
                if (!StateUi32Adapter.IsAcceptingData()) return;
                StateUi32Adapter.AddData(std::move(CurrentBucket.InterfaceOffsets));
                NextVectorToProcess = ENextVectorToProcess::None;
                // One complete bucket snapshot is now on disk.
                SpilledBucketsCount++;
                break;
            default:
                return;
        }
    }
    if (IsFinalizingRequested) {
        // Finalize only once all three adapters can take the request.
        if (!StateCharAdapter.IsAcceptingData() || !StateUi32Adapter.IsAcceptingData() || !StateUi64Adapter.IsAcceptingData()) return;
        State = EState::Finalizing;
        StateUi64Adapter.Finalize();
        StateUi32Adapter.Finalize();
        StateCharAdapter.Finalize();
        ProcessFinalizing();
        return;
    }
    State = EState::AcceptingData;
}
  990. void TTableBucketSpiller::ProcessFinalizing() {
  991. if (StateCharAdapter.IsAcceptingDataRequests() && StateUi32Adapter.IsAcceptingDataRequests() && StateUi64Adapter.IsAcceptingDataRequests()) {
  992. State = EState::AcceptingDataRequests;
  993. }
  994. }
  995. template <class T>
  996. void TTableBucketSpiller::AppendVector(std::vector<T, TMKQLAllocator<T>>& first, std::vector<T, TMKQLAllocator<T>>&& second) const {
  997. if (first.empty()) {
  998. first = std::move(second);
  999. return;
  1000. }
  1001. first.insert(first.end(), second.begin(), second.end());
  1002. second.clear();
  1003. }
// Reads spilled vectors back into CurrentBucket in the same fixed order they
// were written (see ProcessBucketSpilling). Each step: if the adapter has a
// vector ready, append it and advance; else if the adapter can take a
// request, ask for the next vector; otherwise return and wait for Update()
// to re-enter. Multiple spilled snapshots of the same bucket are
// concatenated until SpilledBucketsCount reaches zero, after which the
// spiller waits for extraction.
void TTableBucketSpiller::ProcessBucketRestoration() {
    while (NextVectorToProcess != ENextVectorToProcess::None) {
        switch (NextVectorToProcess) {
            case ENextVectorToProcess::KeyAndVals:
                if (StateUi64Adapter.IsDataReady()) {
                    AppendVector(CurrentBucket.KeyIntVals, StateUi64Adapter.ExtractVector());
                    NextVectorToProcess = ENextVectorToProcess::DataIntVals;
                    break;
                }
                if (StateUi64Adapter.IsAcceptingDataRequests()) {
                    StateUi64Adapter.RequestNextVector();
                    break;
                }
                return;
            case ENextVectorToProcess::DataIntVals:
                if (StateUi64Adapter.IsDataReady()) {
                    AppendVector(CurrentBucket.DataIntVals, StateUi64Adapter.ExtractVector());
                    NextVectorToProcess = ENextVectorToProcess::StringsValues;
                    break;
                }
                if (StateUi64Adapter.IsAcceptingDataRequests()) {
                    StateUi64Adapter.RequestNextVector();
                    break;
                }
                return;
            case ENextVectorToProcess::StringsValues:
                if (StateCharAdapter.IsDataReady()) {
                    AppendVector(CurrentBucket.StringsValues, StateCharAdapter.ExtractVector());
                    NextVectorToProcess = ENextVectorToProcess::StringsOffsets;
                    break;
                }
                if (StateCharAdapter.IsAcceptingDataRequests()) {
                    StateCharAdapter.RequestNextVector();
                    break;
                }
                return;
            case ENextVectorToProcess::StringsOffsets:
                if (StateUi32Adapter.IsDataReady()) {
                    AppendVector(CurrentBucket.StringsOffsets, StateUi32Adapter.ExtractVector());
                    NextVectorToProcess = ENextVectorToProcess::InterfaceValues;
                    break;
                }
                if (StateUi32Adapter.IsAcceptingDataRequests()) {
                    StateUi32Adapter.RequestNextVector();
                    break;
                }
                return;
            case ENextVectorToProcess::InterfaceValues:
                if (StateCharAdapter.IsDataReady()) {
                    AppendVector(CurrentBucket.InterfaceValues, StateCharAdapter.ExtractVector());
                    NextVectorToProcess = ENextVectorToProcess::InterfaceOffsets;
                    break;
                }
                if (StateCharAdapter.IsAcceptingDataRequests()) {
                    StateCharAdapter.RequestNextVector();
                    break;
                }
                return;
            case ENextVectorToProcess::InterfaceOffsets:
                if (StateUi32Adapter.IsDataReady()) {
                    AppendVector(CurrentBucket.InterfaceOffsets, StateUi32Adapter.ExtractVector());
                    // One full snapshot restored; loop again if more remain.
                    SpilledBucketsCount--;
                    if (SpilledBucketsCount == 0) {
                        NextVectorToProcess = ENextVectorToProcess::None;
                        State = EState::WaitingForExtraction;
                    } else {
                        NextVectorToProcess = ENextVectorToProcess::KeyAndVals;
                    }
                    break;
                }
                if (StateUi32Adapter.IsAcceptingDataRequests()) {
                    StateUi32Adapter.RequestNextVector();
                    break;
                }
                return;
            default:
                return;
        }
    }
}
  1084. }
  1085. }
  1086. }