// mkql_grace_join_imp.cpp — Grace Join implementation for MiniKQL.
  1. #include "mkql_grace_join_imp.h"
  2. #include <yql/essentials/public/udf/udf_data_type.h>
  3. #include <yql/essentials/utils/log/log.h>
  4. #include <contrib/libs/xxhash/xxhash.h>
  5. #include <string_view>
  6. namespace NKikimr {
  7. namespace NMiniKQL {
  8. namespace GraceJoin {
// Packs one input tuple into this table's per-bucket storage.
//
// Layout of intColumns: [nulls bitmap (NullsBitmapSize_ words)]
//                       [key int columns][data int columns].
// stringColumns/stringsSizes hold key string columns first, then data strings.
// iColumns are interface-typed values handled through ColInterfaces
// (HashI / Packer). `other` is the opposite join side; only its per-bucket
// bloom filters are consulted here.
//
// Returns:
//   Unmatched - key contains NULL (bit 0 of the bitmap) or the other side's
//               finalized bloom filter proves there is no match;
//   AnyMatch  - ANY-join mode and this key was already stored;
//   Added     - tuple stored in its bucket.
TTable::EAddTupleResult TTable::AddTuple( ui64 * intColumns, char ** stringColumns, ui32 * stringsSizes, NYql::NUdf::TUnboxedValue * iColumns, const TTable &other) {

    // Bit 0 of the nulls bitmap flags a NULL inside the key: it can never join.
    if ((intColumns[0] & 1))
        return EAddTupleResult::Unmatched;

    TotalPacked++;

    // TempTuple := nulls bitmap + key int columns (+ IColumn hashes below).
    TempTuple.clear();
    TempTuple.insert(TempTuple.end(), intColumns, intColumns + NullsBitmapSize_ + NumberOfKeyIntColumns);

    if ( NumberOfKeyIColumns > 0 ) {
        // Interface-typed key columns contribute through their hash values.
        for (ui32 i = 0; i < NumberOfKeyIColumns; i++) {
            TempTuple.push_back((ColInterfaces + i)->HashI->Hash(*(iColumns+i)));
        }
    }

    ui64 totalBytesForStrings = 0;
    ui64 totalIntsForStrings = 0;

    // Processing variable length string columns
    if ( NumberOfKeyStringColumns != 0 || NumberOfKeyIColumns != 0) {
        // Each serialized value carries a ui32 length prefix.
        totalBytesForStrings += sizeof(ui32)*NumberOfKeyStringColumns;
        totalBytesForStrings += sizeof(ui32)*NumberOfKeyIColumns;
        for( ui64 i = 0; i < NumberOfKeyStringColumns; i++ ) {
            totalBytesForStrings += stringsSizes[i];
        }
        // Pack interface key values once; cache the bytes in IColumnsVals.
        for ( ui64 i = 0; i < NumberOfKeyIColumns; i++) {
            TStringBuf val = (ColInterfaces + i)->Packer->Pack(*(iColumns+i));
            IColumnsVals[i].clear();
            IColumnsVals[i].insert(IColumnsVals[i].begin(), val.cbegin(), val.end());
            totalBytesForStrings += val.size();
        }
        // Round the byte payload up to whole ui64 words; the word count is
        // stored in TempTuple immediately before the payload.
        totalIntsForStrings = (totalBytesForStrings + sizeof(ui64) - 1) / sizeof(ui64);
        TempTuple.push_back(totalIntsForStrings);
        TempTuple.resize(TempTuple.size() + totalIntsForStrings);
        // Zero the final word so padding bytes hash deterministically.
        TempTuple.back() = 0;
        ui64 * startPtr = (TempTuple.data() + TempTuple.size() - totalIntsForStrings );
        char * currStrPtr = reinterpret_cast< char* > (startPtr);
        // Serialize key strings as [ui32 size][bytes]...
        for( ui64 i = 0; i < NumberOfKeyStringColumns; i++) {
            WriteUnaligned<ui32>(currStrPtr, stringsSizes[i] );
            currStrPtr+=sizeof(ui32);
            std::memcpy(currStrPtr, stringColumns[i], stringsSizes[i] );
            currStrPtr+=stringsSizes[i];
        }
        // ...followed by the packed interface values in the same format.
        for( ui64 i = 0; i < NumberOfKeyIColumns; i++) {
            WriteUnaligned<ui32>(currStrPtr, IColumnsVals[i].size() );
            currStrPtr+=sizeof(ui32);
            std::memcpy(currStrPtr, IColumnsVals[i].data(), IColumnsVals[i].size() );
            currStrPtr+=IColumnsVals[i].size();
        }
    }

    // Hash everything after the nulls bitmap. 0 marks an empty hash-table
    // slot, so a zero hash is remapped to 1.
    XXH64_hash_t hash = XXH64(TempTuple.data() + NullsBitmapSize_, (TempTuple.size() - NullsBitmapSize_) * sizeof(ui64), 0);
    if (!hash) hash = 1;
    ui64 bucket = hash & BucketsMask;

    // Early reject via the other side's bloom filter (non-ANY mode only; the
    // ANY path must deduplicate first, see below).
    if (!IsAny_ && other.TableBucketsStats[bucket].BloomFilter.IsFinalized()) {
        auto bucket2 = &other.TableBucketsStats[bucket];
        auto &bloomFilter = bucket2->BloomFilter;
        ++BloomLookups_;
        if (bloomFilter.IsMissing(hash)) {
            ++BloomHits_;
            return EAddTupleResult::Unmatched;
        }
    }

    std::vector<ui64, TMKQLAllocator<ui64>> & keyIntVals = TableBuckets[bucket].KeyIntVals;
    std::vector<ui32, TMKQLAllocator<ui32>> & stringsOffsets = TableBuckets[bucket].StringsOffsets;
    std::vector<ui64, TMKQLAllocator<ui64>> & dataIntVals = TableBuckets[bucket].DataIntVals;
    std::vector<char, TMKQLAllocator<char>> & stringVals = TableBuckets[bucket].StringsValues;
    KeysHashTable & kh = TableBucketsStats[bucket].AnyHashTable;

    ui32 offset = keyIntVals.size(); // Offset of tuple inside the keyIntVals vector

    keyIntVals.push_back(hash);
    keyIntVals.insert(keyIntVals.end(), TempTuple.begin(), TempTuple.end());

    if (IsAny_) {
        // ANY join: keep only the first occurrence of each key.
        if ( !AddKeysToHashTable(kh, keyIntVals.begin() + offset, iColumns) ) {
            keyIntVals.resize(offset); // roll back the appended key
            ++AnyFiltered_;
            return EAddTupleResult::AnyMatch;
        }
        // Bloom check was deferred until after deduplication.
        if (other.TableBucketsStats[bucket].BloomFilter.IsFinalized()) {
            auto bucket2 = &other.TableBucketsStats[bucket];
            auto &bloomFilter = bucket2->BloomFilter;
            ++BloomLookups_;
            if (bloomFilter.IsMissing(hash)) {
                keyIntVals.resize(offset);
                ++BloomHits_;
                return EAddTupleResult::Unmatched;
            }
        }
    }

    TableBucketsStats[bucket].TuplesNum++;

    if (NumberOfStringColumns || NumberOfIColumns ) {
        stringsOffsets.push_back(TableBucketsStats[bucket].KeyIntValsTotalSize); // Adding offset to tuple in keyIntVals vector
        stringsOffsets.push_back(TableBucketsStats[bucket].StringValuesTotalSize); // Adding offset to string values

        // Adding strings sizes for keys and data
        if ( NumberOfStringColumns ) {
            stringsOffsets.insert( stringsOffsets.end(), stringsSizes, stringsSizes+NumberOfStringColumns );
        }
        if ( NumberOfIColumns ) {
            // Pack the non-key interface columns (key ones were packed above).
            for ( ui64 i = NumberOfKeyIColumns; i < NumberOfIColumns; i++) {
                TStringBuf val = (ColInterfaces + i)->Packer->Pack(*(iColumns+i));
                IColumnsVals[i].clear();
                IColumnsVals[i].insert(IColumnsVals[i].begin(), val.cbegin(), val.end());
            }
            for (ui64 i = 0; i < NumberOfIColumns; i++ ) {
                stringsOffsets.push_back(IColumnsVals[i].size());
            }
        }
    }

    // Adding data values
    ui64 * dataColumns = intColumns + NullsBitmapSize_ + NumberOfKeyIntColumns;
    dataIntVals.insert(dataIntVals.end(), dataColumns, dataColumns + NumberOfDataIntColumns);

    // Adding strings values for data columns
    char ** dataStringsColumns = stringColumns + NumberOfKeyStringColumns;
    ui32 * dataStringsSizes = stringsSizes + NumberOfKeyStringColumns;
    ui64 initialStringsSize = stringVals.size();
    for( ui64 i = 0; i < NumberOfDataStringColumns; i++) {
        ui32 currStringSize = *(dataStringsSizes + i);
        stringVals.insert(stringVals.end(), *(dataStringsColumns + i), *(dataStringsColumns + i) + currStringSize);
    }
    // Packed non-key interface values are stored alongside the data strings.
    for ( ui64 i = 0; i < NumberOfDataIColumns; i++) {
        stringVals.insert( stringVals.end(), IColumnsVals[NumberOfKeyIColumns + i].begin(), IColumnsVals[NumberOfKeyIColumns + i].end());
    }

    TableBucketsStats[bucket].KeyIntValsTotalSize += keyIntVals.size() - offset;
    TableBucketsStats[bucket].StringValuesTotalSize += stringVals.size() - initialStringsSize;
    return EAddTupleResult::Added;
}
  128. void TTable::ResetIterator() {
  129. CurrIterIndex = 0;
  130. CurrIterBucket = 0;
  131. if (IsTableJoined) {
  132. JoinTable1->ResetIterator();
  133. JoinTable2->ResetIterator();
  134. }
  135. TotalUnpacked = 0;
  136. }
  137. // Checks if there are more tuples and sets bucketId and tupleId to next valid.
  138. inline bool HasMoreTuples(std::vector<TTableBucketStats> & tableBucketsStats, ui64 & bucketId, ui64 & tupleId, ui64 bucketLimit ) {
  139. if (bucketId >= bucketLimit) return false;
  140. if ( tupleId >= tableBucketsStats[bucketId].TuplesNum ) {
  141. tupleId = 0;
  142. bucketId ++;
  143. if (bucketId == bucketLimit) {
  144. return false;
  145. }
  146. while( tableBucketsStats[bucketId].TuplesNum == 0 ) {
  147. bucketId ++;
  148. if (bucketId == bucketLimit) {
  149. return false;
  150. }
  151. }
  152. }
  153. return true;
  154. }
  155. // Returns value of next tuple. Returs true if there are more tuples
  156. bool TTable::NextTuple(TupleData & td){
  157. if (HasMoreTuples(TableBucketsStats, CurrIterBucket, CurrIterIndex, TableBucketsStats.size())) {
  158. GetTupleData(CurrIterBucket, CurrIterIndex, td);
  159. CurrIterIndex++;
  160. return true;
  161. } else {
  162. td.AllNulls = true;
  163. return false;
  164. }
  165. }
// Compares the serialized key payloads of two stored tuples.
//
// vals1/vals2 point at the [ui32 size][bytes] sequences produced by AddTuple;
// stringSizes1/stringSizes2 hold the per-column sizes (first nStringColumns
// plain strings, then nIColumns packed interface values). Plain strings are
// compared bytewise — sizes first, then one memcmp over the whole prefix —
// while interface columns are unpacked and compared via EquateI, since equal
// values may have different packed representations.
inline bool CompareIColumns( const ui32* stringSizes1, const char * vals1,
        const ui32* stringSizes2, const char * vals2,
        TColTypeInterface * colInterfaces, ui64 nStringColumns, ui64 nIColumns) {
    ui32 currOffset1 = 0;
    ui32 currOffset2 = 0;
    ui32 currSize1 = 0;
    ui32 currSize2 = 0;
    NYql::NUdf::TUnboxedValue val1, val2;
    TStringBuf str1, str2;
    for (ui32 i = 0; i < nStringColumns; i ++) {
        currSize1 = *(stringSizes1 + i);
        currSize2 = *(stringSizes2 + i);
        if (currSize1 != currSize2)
            return false;
        // Skip the ui32 length prefix plus the payload.
        currOffset1 += currSize1 + sizeof(ui32);
        currOffset2 += currSize2 + sizeof(ui32);
    }
    // All sizes matched, so a single memcmp covers every plain string column.
    if (0 != std::memcmp(vals1, vals2, currOffset1))
        return false;
    for (ui32 i = 0; i < nIColumns; i ++) {
        currSize1 = *(stringSizes1 + nStringColumns + i );
        currSize2 = *(stringSizes2 + nStringColumns + i );
        currOffset1 += sizeof(ui32);
        currOffset2 += sizeof(ui32);
        str1 = TStringBuf(vals1 + currOffset1, currSize1);
        val1 = (colInterfaces + i)->Packer->Unpack(str1, colInterfaces->HolderFactory);
        str2 = TStringBuf(vals2 + currOffset2, currSize2 );
        val2 = (colInterfaces + i)->Packer->Unpack(str2, colInterfaces->HolderFactory);
        if ( ! ((colInterfaces + i)->EquateI->Equals(val1,val2)) ) {
            return false;
        }
        currOffset1 += currSize1;
        currOffset2 += currSize2;
    }
    return true;
}
// Variant of CompareIColumns that compares a stored serialized tuple against
// live unboxed values. vals1 carries the full [ui32 size][bytes] payload;
// vals2 is consulted only for the plain-string size check and memcmp, while
// interface columns are taken directly from iColumns (no unpack needed on
// the right-hand side).
inline bool CompareIColumns( const char * vals1,
        const char * vals2,
        NYql::NUdf::TUnboxedValue * iColumns,
        TColTypeInterface * colInterfaces,
        ui64 nStringColumns, ui64 nIColumns) {
    ui32 currOffset1 = 0;
    NYql::NUdf::TUnboxedValue val1;
    TStringBuf str1;
    for (ui32 i = 0; i < nStringColumns; i ++) {
        // Both buffers share the same offsets while the sizes keep matching.
        auto currSize1 = ReadUnaligned<ui32>(vals1 + currOffset1);
        auto currSize2 = ReadUnaligned<ui32>(vals2 + currOffset1);
        if (currSize1 != currSize2)
            return false;
        currOffset1 += currSize1 + sizeof(ui32);
    }
    // One memcmp covers all plain string columns (sizes already matched).
    if (0 != std::memcmp(vals1, vals2, currOffset1))
        return false;
    for (ui32 i = 0; i < nIColumns; i ++) {
        auto currSize1 = ReadUnaligned<ui32>(vals1 + currOffset1);
        currOffset1 += sizeof(ui32);
        str1 = TStringBuf(vals1 + currOffset1, currSize1);
        // Unpack the stored value and compare against the live one via EquateI.
        val1 = (colInterfaces + i)->Packer->Unpack(str1, colInterfaces->HolderFactory);
        auto &val2 = iColumns[i];
        if ( ! ((colInterfaces + i)->EquateI->Equals(val1,val2)) ) {
            return false;
        }
        currOffset1 += currSize1;
    }
    return true;
}
  232. // Resizes KeysHashTable to new slots, keeps old content.
  233. void ResizeHashTable(KeysHashTable &t, ui64 newSlots){
  234. std::vector<ui64, TMKQLAllocator<ui64>> newTable(newSlots * t.SlotSize , 0);
  235. for ( auto it = t.Table.begin(); it != t.Table.end(); it += t.SlotSize ) {
  236. if ( *it == 0)
  237. continue;
  238. ui64 hash = *it;
  239. ui64 newSlotNum = hash % (newSlots);
  240. auto newIt = newTable.begin() + t.SlotSize * newSlotNum;
  241. while (*newIt != 0) {
  242. newIt += t.SlotSize;
  243. if (newIt == newTable.end()) {
  244. newIt = newTable.begin();
  245. }
  246. }
  247. std::copy_n(it, t.SlotSize, newIt);
  248. }
  249. t.NSlots = newSlots;
  250. t.Table = std::move(newTable);
  251. }
  252. bool IsTablesSwapRequired(ui64 tuplesNum1, ui64 tuplesNum2, bool table1Batch, bool table2Batch) {
  253. return tuplesNum2 > tuplesNum1 && !table1Batch || table2Batch;
  254. }
  255. ui64 ComputeJoinSlotsSizeForBucket(const TTableBucket& bucket, const TTableBucketStats& bucketStats, ui64 headerSize, bool tableHasKeyStringColumns, bool tableHasKeyIColumns) {
  256. ui64 tuplesNum = bucketStats.TuplesNum;
  257. ui64 avgStringsSize = (3 * (bucket.KeyIntVals.size() - tuplesNum * headerSize) ) / ( 2 * tuplesNum + 1) + 1;
  258. ui64 slotSize = headerSize + 1; // Header [Short Strings] SlotIdx
  259. if (tableHasKeyStringColumns || tableHasKeyIColumns) {
  260. slotSize = slotSize + avgStringsSize;
  261. }
  262. return slotSize;
  263. }
  264. ui64 ComputeNumberOfSlots(ui64 tuplesNum) {
  265. return (3 * tuplesNum + 1) | 1;
  266. }
  267. bool TTable::TryToPreallocateMemoryForJoin(TTable & t1, TTable & t2, EJoinKind /* joinKind */, bool hasMoreLeftTuples, bool hasMoreRightTuples) {
  268. // If the batch is final or the only one, then the buckets are processed sequentially, the memory for the hash tables is freed immediately after processing.
  269. // So, no preallocation is required.
  270. if (!hasMoreLeftTuples && !hasMoreRightTuples) return true;
  271. for (ui64 bucket = 0; bucket < GraceJoin::NumberOfBuckets; bucket++) {
  272. ui64 tuplesNum1 = t1.TableBucketsStats[bucket].TuplesNum;
  273. ui64 tuplesNum2 = t2.TableBucketsStats[bucket].TuplesNum;
  274. TTable& tableForPreallocation = IsTablesSwapRequired(tuplesNum1, tuplesNum2, hasMoreLeftTuples || LeftTableBatch_, hasMoreRightTuples || RightTableBatch_) ? t1 : t2;
  275. if (!tableForPreallocation.TableBucketsStats[bucket].TuplesNum || tableForPreallocation.TableBuckets[bucket].NSlots) continue;
  276. TTableBucket& bucketForPreallocation = tableForPreallocation.TableBuckets[bucket];
  277. const TTableBucketStats& bucketForPreallocationStats = tableForPreallocation.TableBucketsStats[bucket];
  278. const auto nSlots = ComputeJoinSlotsSizeForBucket(bucketForPreallocation, bucketForPreallocationStats, tableForPreallocation.HeaderSize,
  279. tableForPreallocation.NumberOfKeyStringColumns != 0, tableForPreallocation.NumberOfKeyIColumns != 0);
  280. const auto slotSize = ComputeNumberOfSlots(tableForPreallocation.TableBucketsStats[bucket].TuplesNum);
  281. try {
  282. bucketForPreallocation.JoinSlots.reserve(nSlots*slotSize);
  283. } catch (TMemoryLimitExceededException) {
  284. for (ui64 i = 0; i < bucket; ++i) {
  285. GraceJoin::TTableBucket * b1 = &JoinTable1->TableBuckets[i];
  286. b1->JoinSlots.resize(0);
  287. b1->JoinSlots.shrink_to_fit();
  288. GraceJoin::TTableBucket * b2 = &JoinTable2->TableBuckets[i];
  289. b2->JoinSlots.resize(0);
  290. b2->JoinSlots.shrink_to_fit();
  291. }
  292. return false;
  293. }
  294. }
  295. return true;
  296. }
// Joins two tables and returns join result in joined table. Tuples of joined table could be received by
// joined table iterator.
// For each bucket in [fromBucket, toBucket) one side is chosen as the
// hash-table (build) side, the other is probed against it; matches go to
// JoinIds, unmatched/semi-matched probe tuples to LeftIds.
void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind, bool hasMoreLeftTuples, bool hasMoreRightTuples, ui32 fromBucket, ui32 toBucket ) {
    // Batch flags are sticky across calls: once a side was batched it stays so.
    if ( hasMoreLeftTuples )
        LeftTableBatch_ = true;
    if( hasMoreRightTuples )
        RightTableBatch_ = true;
    auto table1Batch = LeftTableBatch_;
    auto table2Batch = RightTableBatch_;
    JoinTable1 = &t1;
    JoinTable2 = &t2;
    JoinKind = joinKind;
    IsTableJoined = true;
    MKQL_ENSURE(joinKind != EJoinKind::Cross, "Cross Join is not allowed in Grace Join");
    // These kinds need explicit (id1, id2) pairs for every matched tuple.
    const bool needCrossIds = JoinKind == EJoinKind::Inner || JoinKind == EJoinKind::Full || JoinKind == EJoinKind::Left || JoinKind == EJoinKind::Right;
    ui64 tuplesFound = 0;
    for (ui64 bucket = fromBucket; bucket < toBucket; bucket++) {
        auto &joinResults = TableBuckets[bucket].JoinIds;
        joinResults.clear();
        TTableBucket * bucket1 = &JoinTable1->TableBuckets[bucket];
        TTableBucket * bucket2 = &JoinTable2->TableBuckets[bucket];
        TTableBucketStats * bucketStats1 = &JoinTable1->TableBucketsStats[bucket];
        TTableBucketStats * bucketStats2 = &JoinTable2->TableBucketsStats[bucket];
        ui64 tuplesNum1 = JoinTable1->TableBucketsStats[bucket].TuplesNum;
        ui64 tuplesNum2 = JoinTable2->TableBucketsStats[bucket].TuplesNum;
        ui64 headerSize1 = JoinTable1->HeaderSize;
        ui64 headerSize2 = JoinTable2->HeaderSize;
        ui64 nullsSize1 = JoinTable1->NullsBitmapSize_;
        ui64 nullsSize2 = JoinTable2->NullsBitmapSize_;
        ui64 keyIntOffset1 = HashSize + nullsSize1;
        ui64 keyIntOffset2 = HashSize + nullsSize2;
        bool table1HasKeyStringColumns = (JoinTable1->NumberOfKeyStringColumns != 0);
        bool table2HasKeyStringColumns = (JoinTable2->NumberOfKeyStringColumns != 0);
        bool table1HasKeyIColumns = (JoinTable1->NumberOfKeyIColumns != 0);
        bool table2HasKeyIColumns = (JoinTable2->NumberOfKeyIColumns != 0);
        // After a swap, "1" means the probe side and "2" the build side.
        bool swapTables = IsTablesSwapRequired(tuplesNum1, tuplesNum2, table1Batch, table2Batch);
        if (swapTables) {
            std::swap(bucket1, bucket2);
            std::swap(bucketStats1, bucketStats2);
            std::swap(headerSize1, headerSize2);
            std::swap(nullsSize1, nullsSize2);
            std::swap(keyIntOffset1, keyIntOffset2);
            std::swap(table1HasKeyStringColumns, table2HasKeyStringColumns);
            std::swap(table1HasKeyIColumns, table2HasKeyIColumns);
            std::swap(tuplesNum1, tuplesNum2);
        }
        // leftIds collects probe-side tuple ids that must be emitted without a
        // match (outer/only/exclusion kinds) or with one (semi kinds).
        auto &leftIds = bucket1->LeftIds;
        leftIds.clear();
        const bool selfJoinSameKeys = (JoinTable1 == JoinTable2);
        const bool needLeftIds = ((swapTables ? (JoinKind == EJoinKind::Right || JoinKind == EJoinKind::RightOnly) : (JoinKind == EJoinKind::Left || JoinKind == EJoinKind::LeftOnly)) || JoinKind == EJoinKind::Full || JoinKind == EJoinKind::Exclusion) && !selfJoinSameKeys;
        const bool isLeftSemi = swapTables ? JoinKind == EJoinKind::RightSemi : JoinKind == EJoinKind::LeftSemi;
        //const bool isRightSemi = swapTables ? JoinKind == EJoinKind::LeftSemi : JoinKind == EJoinKind::RightSemi;
        // Whether build-side slots must be scanned afterwards for their
        // matched/unmatched status.
        bucketStats2->HashtableMatches = ((swapTables ? (JoinKind == EJoinKind::Left || JoinKind == EJoinKind::LeftOnly || JoinKind == EJoinKind::LeftSemi) : (JoinKind == EJoinKind::Right || JoinKind == EJoinKind::RightOnly || JoinKind == EJoinKind::RightSemi)) || JoinKind == EJoinKind::Full || JoinKind == EJoinKind::Exclusion) && !selfJoinSameKeys;
        // In this case, all keys except for NULLs have matched key on other side, and NULLs are handled by AddTuple
        if (tuplesNum2 == 0) {
            if (needLeftIds) {
                // Empty build side: every probe tuple is unmatched.
                for (ui32 leftId = 0; leftId != tuplesNum1; ++leftId)
                    leftIds.push_back(leftId);
            }
            continue;
        }
        if (tuplesNum1 == 0 && (hasMoreRightTuples || hasMoreLeftTuples || !bucketStats2->HashtableMatches))
            continue;
        ui64 slotSize = ComputeJoinSlotsSizeForBucket(*bucket2, *bucketStats2, headerSize2, table2HasKeyStringColumns, table2HasKeyIColumns);
        ui64 &nSlots = bucket2->NSlots;
        auto &joinSlots = bucket2->JoinSlots;
        auto &bloomFilter = bucketStats2->BloomFilter;
        bool initHashTable = false;
        Y_DEBUG_ABORT_UNLESS(bucketStats2->SlotSize == 0 || bucketStats2->SlotSize == slotSize);
        if (!nSlots) {
            // First touch of this bucket: build hash table and bloom filter.
            nSlots = ComputeNumberOfSlots(tuplesNum2);
            joinSlots.resize(nSlots*slotSize, 0);
            bloomFilter.Resize(tuplesNum2);
            initHashTable = true;
            bucketStats2->SlotSize = slotSize;
            ++InitHashTableCount_;
        }
        // Open addressing with linear probing over fixed-size slots;
        // a slot whose first word is 0 is free.
        auto firstSlot = [begin = joinSlots.begin(), slotSize, nSlots](auto hash) {
            ui64 slotNum = hash % nSlots;
            return begin + slotNum * slotSize;
        };
        auto nextSlot = [begin = joinSlots.begin(), end = joinSlots.end(), slotSize](auto it) {
            it += slotSize;
            if (it == end)
                it = begin;
            return it;
        };
        if (initHashTable) {
            // Build phase: insert every build-side tuple into the hash table.
            ui32 tuple2Idx = 0;
            auto it2 = bucket2->KeyIntVals.begin();
            for (ui64 keysValSize = headerSize2; it2 != bucket2->KeyIntVals.end(); it2 += keysValSize, ++tuple2Idx) {
                // Variable-size keys record their string-word count in the
                // last header word.
                if ( table2HasKeyStringColumns || table2HasKeyIColumns) {
                    keysValSize = headerSize2 + *(it2 + headerSize2 - 1) ;
                }
                ui64 hash = *it2;
                // Note: if hashtable is re-created after being spilled
                // (*(it2 + HashSize) & 1) may be true (even though key does NOT contain NULL)
                bloomFilter.Add(hash);
                auto slotIt = firstSlot(hash);
                ++HashLookups_;
                for (; *slotIt != 0; slotIt = nextSlot(slotIt))
                {
                    ++HashO1Iterations_;
                }
                ++HashSlotIterations_;
                if (keysValSize <= slotSize - 1)
                {
                    // Key fits inside the slot (last word holds the tuple index).
                    std::copy_n(it2, keysValSize, slotIt);
                }
                else
                {
                    // Oversized key: store the header plus the offset of the
                    // string payload within KeyIntVals.
                    std::copy_n(it2, headerSize2, slotIt);
                    *(slotIt + headerSize2) = it2 + headerSize2 - bucket2->KeyIntVals.begin();
                }
                slotIt[slotSize - 1] = tuple2Idx;
            }
            bloomFilter.Finalize();
            if (swapTables) JoinTable1Total_ += tuplesNum2; else JoinTable2Total_ += tuplesNum2;
        }
        if (swapTables) JoinTable2Total_ += tuplesNum1; else JoinTable1Total_ += tuplesNum1;
        ui32 tuple1Idx = 0;
        auto it1 = bucket1->KeyIntVals.begin();
        // /-------headerSize---------------------------\
        // hash nulls-bitmap keyInt[] KeyIHash[] strSize| [strPos | strs] slotIdx
        // \---------------------------------------slotSize ---------------------/
        // bit0 of nulls bitmap denotes key-with-nulls
        // strSize only present if HasKeyStrCol || HasKeyICol
        // strPos is only present if (HasKeyStrCol || HasKeyICol) && strSize + headerSize >= slotSize
        // slotSize, slotIdx and strPos is only for hashtable (table2)
        ui64 bloomHits = 0;
        ui64 bloomLookups = 0;
        // Probe phase: look up every probe-side tuple in the hash table.
        for (ui64 keysValSize = headerSize1; it1 != bucket1->KeyIntVals.end(); it1 += keysValSize, ++tuple1Idx ) {
            if ( table1HasKeyStringColumns || table1HasKeyIColumns ) {
                keysValSize = headerSize1 + *(it1 + headerSize1 - 1) ;
            }
            ui64 hash = *it1;
            Y_DEBUG_ABORT_UNLESS((*(it1 + HashSize) & 1) == 0); // Keys with NULL never reaches Join
            // The bloom filter is consulted only on the pass that built it.
            if (initHashTable) {
                bloomLookups++;
                if (bloomFilter.IsMissing(hash)) {
                    if (needLeftIds)
                        leftIds.push_back(tuple1Idx);
                    bloomHits++;
                    continue;
                }
            }
            ++HashLookups_;
            auto saveTuplesFound = tuplesFound;
            auto slotIt = firstSlot(hash);
            for (; *slotIt != 0; slotIt = nextSlot(slotIt) )
            {
                ++HashO1Iterations_;
                if (*slotIt != hash)
                    continue;
                auto tuple2Idx = slotIt[slotSize - 1];
                ++HashSlotIterations_;
                if (table1HasKeyIColumns || !(keysValSize - nullsSize1 <= slotSize - 1 - nullsSize2)) {
                    // 2nd condition cannot be true unless HasKeyStringColumns or HasKeyIColumns, hence size at the end of header is present
                    if (!std::equal(it1 + keyIntOffset1, it1 + headerSize1 - 1, slotIt + keyIntOffset2))
                        continue;
                    auto slotStringsStart = slotIt + headerSize2;
                    ui64 slotStringsSize = *(slotIt + headerSize2 - 1);
                    if (headerSize2 + slotStringsSize + 1 > slotSize)
                    {
                        // Strings did not fit into the slot: follow strPos.
                        ui64 stringsPos = *(slotIt + headerSize2);
                        slotStringsStart = bucket2->KeyIntVals.begin() + stringsPos;
                    }
                    if (table1HasKeyIColumns)
                    {
                        // Interface columns need Unpack + EquateI comparison.
                        ui64 stringsOffsetsIdx1 = tuple1Idx * (JoinTable1->NumberOfStringColumns + JoinTable1->NumberOfIColumns + 2);
                        ui64 stringsOffsetsIdx2 = tuple2Idx * (JoinTable2->NumberOfStringColumns + JoinTable2->NumberOfIColumns + 2);
                        ui32 * stringsSizesPtr1 = bucket1->StringsOffsets.data() + stringsOffsetsIdx1 + 2;
                        ui32 * stringsSizesPtr2 = bucket2->StringsOffsets.data() + stringsOffsetsIdx2 + 2;
                        if (!CompareIColumns( stringsSizesPtr1 ,
                                (char *) (it1 + headerSize1 ),
                                stringsSizesPtr2,
                                (char *) (slotStringsStart),
                                JoinTable1 -> ColInterfaces, JoinTable1->NumberOfStringColumns, JoinTable1 -> NumberOfKeyIColumns ))
                            continue;
                    } else {
                        // Plain strings: bytewise comparison of the payload.
                        ui64 stringsSize = *(it1 + headerSize1 - 1);
                        if (stringsSize != slotStringsSize || !std::equal(it1 + headerSize1, it1 + headerSize1 + stringsSize, slotStringsStart))
                            continue;
                    }
                } else {
                    // Fixed-size key: compare the int key words directly.
                    if (!std::equal(it1 + keyIntOffset1, it1 + keysValSize, slotIt + keyIntOffset2))
                        continue;
                }
                *(slotIt + HashSize) |= 1; // mark right slot as matched
                tuplesFound++;
                if (needCrossIds) {
                    JoinTuplesIds joinIds;
                    joinIds.id1 = swapTables ? tuple2Idx : tuple1Idx;
                    joinIds.id2 = swapTables ? tuple1Idx : tuple2Idx;
                    joinResults.emplace_back(joinIds);
                }
            }
            if (saveTuplesFound == tuplesFound) {
                // No match found for this probe tuple.
                ++BloomFalsePositives_;
                if (needLeftIds)
                    leftIds.push_back(tuple1Idx);
            } else if (isLeftSemi) {
                // Semi join collects exactly the matched probe ids.
                leftIds.push_back(tuple1Idx);
            }
        }
        if (!hasMoreLeftTuples && !hasMoreRightTuples) {
            // Final batch: harvest build-side slots and free hash-table memory.
            bloomFilter.Shrink();
            if (bucketStats2->HashtableMatches) {
                auto slotIt = joinSlots.cbegin();
                auto end = joinSlots.cend();
                auto isSemi = JoinKind == EJoinKind::LeftSemi || JoinKind == EJoinKind::RightSemi;
                auto &leftIds2 = bucket2->LeftIds;
                for (; slotIt != end; slotIt += slotSize) {
                    // Semi joins collect matched slots; other kinds the unmatched ones.
                    if ((*(slotIt + HashSize) & 1) == isSemi && *slotIt != 0) {
                        auto id2 = *(slotIt + slotSize - 1);
                        Y_DEBUG_ABORT_UNLESS(id2 < bucketStats2->TuplesNum);
                        leftIds2.push_back(id2);
                    }
                }
                std::sort(leftIds2.begin(), leftIds2.end());
            }
            joinSlots.clear();
            joinSlots.shrink_to_fit();
            nSlots = 0;
        }
        if (bloomHits < bloomLookups/8) {
            // Bloomfilter was inefficient, drop it
            bloomFilter.Shrink();
        }
        BloomHits_ += bloomHits;
        BloomLookups_ += bloomLookups;
        YQL_LOG(GRACEJOIN_TRACE)
            << (const void *)this << '#'
            << bucket
            << " Table1 " << JoinTable1->TableBucketsStats[bucket].TuplesNum
            << " Table2 " << JoinTable2->TableBucketsStats[bucket].TuplesNum
            << " LeftTableBatch " << LeftTableBatch_
            << " RightTableBatch " << RightTableBatch_
            << " leftIds " << leftIds.size()
            << " joinIds " << joinResults.size()
            << " joinKind " << (int)JoinKind
            << " swapTables " << swapTables
            << " initHashTable " << initHashTable
            ;
    }
    HasMoreLeftTuples_ = hasMoreLeftTuples;
    HasMoreRightTuples_ = hasMoreRightTuples;
    TuplesFound_ += tuplesFound;
}
// Unpacks the tuple (bucketNum, tupleId) from bucket storage into td,
// reversing the packing done by AddTuple: int columns come from
// KeyIntVals/DataIntVals, strings and interface values from the serialized
// payloads, with per-tuple offsets and sizes recorded in StringsOffsets.
inline void TTable::GetTupleData(ui32 bucketNum, ui32 tupleId, TupleData & td) {

    ui64 keyIntsOffset = 0;
    ui64 dataIntsOffset = 0;
    ui64 keyStringsOffset = 0;
    ui64 dataStringsOffset = 0;

    td.AllNulls = false;

    TotalUnpacked++;

    TTableBucket & tb = TableBuckets[bucketNum];
    // StringsOffsets layout per tuple: [keyIntVals offset][stringValues offset]
    // followed by one size per string/interface column.
    ui64 stringsOffsetsIdx = tupleId * (NumberOfStringColumns + NumberOfIColumns + 2);

    if(NumberOfKeyStringColumns != 0 || NumberOfKeyIColumns !=0 ) {
        // Variable-size keys: each tuple's start was recorded explicitly.
        keyIntsOffset = tb.StringsOffsets[stringsOffsetsIdx];
    } else {
        // Fixed-size keys: tuples are laid out back to back.
        keyIntsOffset = HeaderSize * tupleId;
    }

    // Nulls bitmap + key int columns (skipping the leading hash word).
    for ( ui64 i = 0; i < NumberOfKeyIntColumns + NullsBitmapSize_; ++i) {
        td.IntColumns[i] = tb.KeyIntVals[keyIntsOffset + HashSize + i];
    }

    dataIntsOffset = NumberOfDataIntColumns * tupleId;
    for ( ui64 i = 0; i < NumberOfDataIntColumns; ++i) {
        td.IntColumns[NumberOfKeyIntColumns + NullsBitmapSize_ + i] = tb.DataIntVals[dataIntsOffset + i];
    }

    char *strPtr = nullptr;
    if(NumberOfKeyStringColumns != 0 || NumberOfKeyIColumns != 0) {
        // The key string payload sits right after the tuple header.
        keyStringsOffset = tb.StringsOffsets[stringsOffsetsIdx] + HeaderSize;
        strPtr = reinterpret_cast<char *>(tb.KeyIntVals.data() + keyStringsOffset);
        // Key strings: [ui32 size][bytes]; td.StrColumns points into storage.
        for (ui64 i = 0; i < NumberOfKeyStringColumns; ++i)
        {
            td.StrSizes[i] = tb.StringsOffsets[stringsOffsetsIdx + 2 + i];
            Y_DEBUG_ABORT_UNLESS(ReadUnaligned<ui32>(strPtr) == td.StrSizes[i]);
            strPtr += sizeof(ui32);
            td.StrColumns[i] = strPtr;
            strPtr += td.StrSizes[i];
        }
        // Key interface columns are unpacked back into unboxed values.
        for ( ui64 i = 0; i < NumberOfKeyIColumns; i++) {
            ui32 currSize = tb.StringsOffsets[stringsOffsetsIdx + 2 + NumberOfKeyStringColumns + i];
            Y_DEBUG_ABORT_UNLESS(ReadUnaligned<ui32>(strPtr) == currSize);
            strPtr += sizeof(ui32);
            *(td.IColumns + i) = (ColInterfaces + i)->Packer->Unpack(TStringBuf(strPtr, currSize), ColInterfaces->HolderFactory);
            strPtr += currSize;
        }
    }

    if(NumberOfDataStringColumns || NumberOfDataIColumns != 0) {
        dataStringsOffset = tb.StringsOffsets[stringsOffsetsIdx + 1];
    }

    strPtr = (tb.StringsValues.data() + dataStringsOffset);
    // Data strings carry no length prefixes; their sizes live in StringsOffsets.
    for ( ui64 i = 0; i < NumberOfDataStringColumns; ++i ) {
        ui32 currIdx = NumberOfKeyStringColumns + i;
        td.StrColumns[currIdx] = strPtr;
        td.StrSizes[currIdx] = tb.StringsOffsets[stringsOffsetsIdx + 2 + currIdx];
        strPtr += td.StrSizes[currIdx];
    }
    // Data interface columns follow the data strings in StringsValues.
    for (ui64 i = 0; i < NumberOfDataIColumns; i++ ) {
        ui32 currIdx = NumberOfStringColumns + NumberOfKeyIColumns + i;
        ui32 currSize = tb.StringsOffsets[stringsOffsetsIdx + 2 + currIdx];
        *(td.IColumns + NumberOfKeyIColumns + i) = (ColInterfaces + NumberOfKeyIColumns + i)->Packer->Unpack(TStringBuf(strPtr, currSize), ColInterfaces->HolderFactory);
        strPtr += currSize;
    }
}
  604. inline bool TTable::AddKeysToHashTable(KeysHashTable& t, ui64* keys, NYql::NUdf::TUnboxedValue * iColumns) {
  605. if (t.NSlots == 0) {
  606. t.SlotSize = HeaderSize + NumberOfKeyStringColumns * 2;
  607. t.Table.resize(DefaultTuplesNum * t.SlotSize, 0);
  608. t.NSlots = DefaultTuplesNum;
  609. }
  610. if ( t.FillCount > t.NSlots/2 ) {
  611. ResizeHashTable(t, 2 * t.NSlots + 1);
  612. }
  613. if ( (*(keys + HashSize) & 1) ) // Keys with null value
  614. return true;
  615. ui64 hash = *keys;
  616. ui64 slot = hash % t.NSlots;
  617. auto it = t.Table.begin() + slot * t.SlotSize;
  618. ui64 keyIntOffset = HashSize + NullsBitmapSize_;
  619. ui64 keysSize = HeaderSize;
  620. ui64 keyStringsSize = 0;
  621. if ( NumberOfKeyStringColumns > 0 || NumberOfKeyIColumns > 0) {
  622. keyStringsSize = *(keys + HeaderSize - 1);
  623. keysSize = HeaderSize + keyStringsSize;
  624. }
  625. auto nextSlot = [begin = t.Table.begin(), end = t.Table.end(), slotSize = t.SlotSize](auto it) {
  626. it += slotSize;
  627. if (it == end)
  628. it = begin;
  629. return it;
  630. };
  631. for (auto itValSize = HeaderSize; *it != 0; it = nextSlot(it)) {
  632. if (*it != hash)
  633. continue;
  634. if ( NumberOfKeyIColumns == 0 && (itValSize <= t.SlotSize)) {
  635. if (!std::equal(it + keyIntOffset, it + itValSize, keys + keyIntOffset))
  636. continue;
  637. return false;
  638. }
  639. Y_DEBUG_ABORT_UNLESS( NumberOfKeyStringColumns > 0 || NumberOfKeyIColumns > 0);
  640. itValSize = HeaderSize + *(it + HeaderSize - 1);
  641. auto slotStringsStart = it + HeaderSize;
  642. if (!std::equal(it + keyIntOffset, it + HeaderSize - 1, keys + keyIntOffset))
  643. continue;
  644. if (NumberOfKeyIColumns > 0) {
  645. if (!CompareIColumns(
  646. (char *) (slotStringsStart),
  647. (char *) (keys + HeaderSize ),
  648. iColumns,
  649. JoinTable1 -> ColInterfaces, JoinTable1->NumberOfStringColumns, JoinTable1 -> NumberOfKeyIColumns ))
  650. continue;
  651. return false;
  652. }
  653. Y_DEBUG_ABORT_UNLESS(!(itValSize <= t.SlotSize));
  654. ui64 stringsPos = *(it + HeaderSize);
  655. slotStringsStart = t.SpillData.begin() + stringsPos;
  656. if (keysSize != itValSize || !std::equal(slotStringsStart, slotStringsStart + itValSize, keys + HeaderSize))
  657. continue;
  658. return false;
  659. }
  660. if (keysSize > t.SlotSize) {
  661. ui64 spillDataOffset = t.SpillData.size();
  662. t.SpillData.insert(t.SpillData.end(), keys + HeaderSize, keys + keysSize);
  663. std::copy_n(keys, HeaderSize, it);
  664. *(it + HeaderSize) = spillDataOffset;
  665. } else {
  666. std::copy_n(keys, keysSize, it);
  667. }
  668. t.FillCount++;
  669. return true;
  670. }
// Produces the next joined pair of tuples into td1/td2, walking buckets in
// [CurrIterBucket, bucketLimit). For each bucket it first emits matched pairs
// from JoinIds, then unmatched rows of either side (LeftIds) with the opposite
// side flagged AllNulls. Returns false once all buckets up to the limit are
// exhausted.
bool TTable::NextJoinedData( TupleData & td1, TupleData & td2, ui64 bucketLimit) {
    while (CurrIterBucket < bucketLimit) {
        // Matched pairs (inner part of the join result).
        if (auto &joinIds = TableBuckets[CurrIterBucket].JoinIds; CurrIterIndex != joinIds.size()) {
            Y_DEBUG_ABORT_UNLESS(JoinKind == EJoinKind::Inner || JoinKind == EJoinKind::Left || JoinKind == EJoinKind::Right || JoinKind == EJoinKind::Full);
            auto ids = joinIds[CurrIterIndex++];
            JoinTable1->GetTupleData(CurrIterBucket, ids.id1, td1);
            JoinTable2->GetTupleData(CurrIterBucket, ids.id2, td2);
            return true;
        }
        // Unmatched rows of one side; the opposite side is emitted as all-null.
        auto leftSide = [this](auto sideTable, auto &tdL, auto &tdR) {
            const auto &bucket = sideTable->TableBuckets[CurrIterBucket];
            auto &currIterIndex = sideTable->CurrIterIndex;  // per-side cursor
            const auto &leftIds = bucket.LeftIds;
            if (currIterIndex != leftIds.size()) {
                auto id = leftIds[currIterIndex++];
                sideTable->GetTupleData(CurrIterBucket, id, tdL);
                tdR.AllNulls = true;
                return true;
            }
            return false;
        };
        if (leftSide(JoinTable1, td1, td2))
            return true;
        if (leftSide(JoinTable2, td2, td1))
            return true;
        // Bucket exhausted: advance and reset all iteration cursors.
        ++CurrIterBucket;
        CurrIterIndex = 0;
        JoinTable1->CurrIterIndex = 0;
        JoinTable2->CurrIterIndex = 0;
    }
    return false;
}
  703. void TTable::Clear() {
  704. for (ui64 bucket = 0; bucket < NumberOfBuckets; bucket++) {
  705. ClearBucket(bucket);
  706. }
  707. }
  708. void TTable::ClearBucket(ui64 bucket) {
  709. TTableBucket & tb = TableBuckets[bucket];
  710. tb.KeyIntVals.clear();
  711. tb.DataIntVals.clear();
  712. tb.StringsOffsets.clear();
  713. tb.StringsValues.clear();
  714. tb.InterfaceValues.clear();
  715. tb.InterfaceOffsets.clear();
  716. tb.JoinIds.clear();
  717. tb.LeftIds.clear();
  718. tb.JoinSlots.clear();
  719. tb.NSlots = 0;
  720. TTableBucketStats & tbs = TableBucketsStats[bucket];
  721. tbs.TuplesNum = 0;
  722. tbs.KeyIntValsTotalSize = 0;
  723. tbs.StringValuesTotalSize = 0;
  724. }
  725. void TTable::ShrinkBucket(ui64 bucket) {
  726. TTableBucket & tb = TableBuckets[bucket];
  727. tb.KeyIntVals.shrink_to_fit();
  728. tb.DataIntVals.shrink_to_fit();
  729. tb.StringsOffsets.shrink_to_fit();
  730. tb.StringsValues.shrink_to_fit();
  731. tb.InterfaceValues.shrink_to_fit();
  732. tb.InterfaceOffsets.shrink_to_fit();
  733. tb.JoinIds.shrink_to_fit();
  734. tb.LeftIds.shrink_to_fit();
  735. tb.JoinSlots.shrink_to_fit();
  736. }
  737. void TTable::InitializeBucketSpillers(ISpiller::TPtr spiller) {
  738. for (size_t i = 0; i < NumberOfBuckets; ++i) {
  739. TableBucketsSpillers.emplace_back(spiller, 5_MB);
  740. }
  741. }
  742. ui64 TTable::GetSizeOfBucket(ui64 bucket) const {
  743. return TableBuckets[bucket].KeyIntVals.size() * sizeof(ui64)
  744. + TableBuckets[bucket].JoinSlots.size() * sizeof(ui64)
  745. + TableBuckets[bucket].DataIntVals.size() * sizeof(ui64)
  746. + TableBuckets[bucket].StringsValues.size()
  747. + TableBuckets[bucket].StringsOffsets.size() * sizeof(ui32)
  748. + TableBuckets[bucket].InterfaceValues.size()
  749. + TableBuckets[bucket].InterfaceOffsets.size() * sizeof(ui32);
  750. }
// Spills the bucket when it exceeds its share of SpillingSizeLimit. Before
// handing the bucket to the spiller, match flags that live only in the
// in-memory JoinSlots hash table are copied into the tuple records in
// KeyIntVals (the slots themselves are dropped and not spilled). Returns true
// while the spiller is still busy writing, i.e. the caller should wait.
bool TTable::TryToReduceMemoryAndWait(ui64 bucket) {
    if (GetSizeOfBucket(bucket) < SpillingSizeLimit/NumberOfBuckets) return false;

    if (const auto &tbs = TableBucketsStats[bucket]; tbs.HashtableMatches) {
        auto &tb = TableBuckets[bucket];
        if (tb.JoinSlots.size()) {
            const auto slotSize = tbs.SlotSize;
            Y_DEBUG_ABORT_UNLESS(slotSize);
            auto it = tb.JoinSlots.cbegin();
            const auto end = tb.JoinSlots.cend();
            for (; it != end; it += slotSize) {
                // Note: we need not check if *it is 0
                // Bit 0 of the word after the hash — presumably the "matched"
                // marker tracked by HashtableMatches; TODO confirm.
                if ((*(it + HashSize) & 1)) {
                    ui64 keyIntsOffset;
                    // The slot's last word stores the tuple id.
                    auto tupleId = *(it + slotSize - 1);
                    Y_DEBUG_ABORT_UNLESS(tupleId < tbs.TuplesNum);
                    if (NumberOfKeyStringColumns != 0 || NumberOfKeyIColumns != 0) {
                        // Variable-length records: locate via the per-tuple offsets table.
                        ui64 stringsOffsetsIdx = tupleId * (NumberOfStringColumns + NumberOfIColumns + 2);
                        keyIntsOffset = tb.StringsOffsets[stringsOffsetsIdx];
                    } else {
                        // Fixed-size records are HeaderSize words apart.
                        keyIntsOffset = HeaderSize * tupleId;
                    }
                    // Persist the flag into the tuple record itself.
                    tb.KeyIntVals[keyIntsOffset + HashSize] |= 1;
                }
            }
            // The hash table can be rebuilt after restore; free it now.
            tb.JoinSlots.clear();
            tb.JoinSlots.shrink_to_fit();
        }
    }

    TableBucketsSpillers[bucket].SpillBucket(std::move(TableBuckets[bucket]));
    TableBuckets[bucket] = TTableBucket{};

    return TableBucketsSpillers[bucket].IsProcessingSpilling();
}
  783. void TTable::UpdateSpilling() {
  784. for (ui64 i = 0; i < NumberOfBuckets; ++i) {
  785. TableBucketsSpillers[i].Update();
  786. }
  787. }
  788. bool TTable::IsSpillingFinished() const {
  789. for (ui64 i = 0; i < NumberOfBuckets; ++i) {
  790. if (TableBucketsSpillers[i].IsProcessingSpilling()) return false;
  791. }
  792. return true;
  793. }
  794. bool TTable::IsSpillingAcceptingDataRequests() const {
  795. for (ui64 i = 0; i < NumberOfBuckets; ++i) {
  796. if (TableBucketsSpillers[i].IsInMemory()) continue;
  797. if (!TableBucketsSpillers[i].IsAcceptingDataRequests()) return false;
  798. }
  799. return true;
  800. }
  801. bool TTable::IsRestoringSpilledBuckets() const {
  802. for (ui64 i = 0; i < NumberOfBuckets; ++i) {
  803. if (TableBucketsSpillers[i].IsRestoring()) return true;
  804. }
  805. return false;
  806. }
  807. void TTable::FinalizeSpilling() {
  808. for (ui32 bucket = 0; bucket < NumberOfBuckets; ++bucket) {
  809. if (!TableBucketsSpillers[bucket].IsInMemory()) {
  810. TableBucketsSpillers[bucket].Finalize();
  811. TableBucketsSpillers[bucket].SpillBucket(std::move(TableBuckets[bucket]));
  812. TableBuckets[bucket] = TTableBucket{};
  813. }
  814. }
  815. }
  816. bool TTable::IsBucketInMemory(ui32 bucket) const {
  817. return TableBucketsSpillers[bucket].IsInMemory();
  818. }
  819. bool TTable::IsSpilledBucketWaitingForExtraction(ui32 bucket) const {
  820. return TableBucketsSpillers[bucket].IsExtractionRequired();
  821. }
  822. void TTable::StartLoadingBucket(ui32 bucket) {
  823. MKQL_ENSURE(!TableBucketsSpillers[bucket].IsInMemory(), "Internal logic error");
  824. TableBucketsSpillers[bucket].StartBucketRestoration();
  825. }
  826. void TTable::PrepareBucket(ui64 bucket) {
  827. if (!TableBucketsSpillers[bucket].IsExtractionRequired()) return;
  828. TableBuckets[bucket] = std::move(TableBucketsSpillers[bucket].ExtractBucket());
  829. }
// Creates new table with key columns and data columns
TTable::TTable( ui64 numberOfKeyIntColumns, ui64 numberOfKeyStringColumns,
                ui64 numberOfDataIntColumns, ui64 numberOfDataStringColumns,
                ui64 numberOfKeyIColumns, ui64 numberOfDataIColumns,
                ui64 nullsBitmapSize, TColTypeInterface * colInterfaces, bool isAny ) :
    NumberOfKeyIntColumns(numberOfKeyIntColumns),
    NumberOfKeyStringColumns(numberOfKeyStringColumns),
    NumberOfKeyIColumns(numberOfKeyIColumns),
    NumberOfDataIntColumns(numberOfDataIntColumns),
    NumberOfDataStringColumns(numberOfDataStringColumns),
    NumberOfDataIColumns(numberOfDataIColumns),
    ColInterfaces(colInterfaces),
    NullsBitmapSize_(nullsBitmapSize),
    IsAny_(isAny) {

    // Derived column counts.
    NumberOfKeyColumns = NumberOfKeyIntColumns + NumberOfKeyStringColumns + NumberOfKeyIColumns;
    NumberOfDataColumns = NumberOfDataIntColumns + NumberOfDataStringColumns + NumberOfDataIColumns;
    NumberOfColumns = NumberOfKeyColumns + NumberOfDataColumns;
    NumberOfStringColumns = NumberOfKeyStringColumns + NumberOfDataStringColumns;
    NumberOfIColumns = NumberOfKeyIColumns + NumberOfDataIColumns;

    BytesInKeyIntColumns = NumberOfKeyIntColumns * sizeof(ui64);

    // One extra header word stores total serialized strings size when the key
    // has any variable-length part (string or interface columns).
    TotalStringsSize = (numberOfKeyStringColumns > 0 || NumberOfKeyIColumns > 0 ) ? 1 : 0;

    // Packed tuple header (in ui64 words): hash + nulls bitmap + int keys +
    // interface keys + optional strings-size word.
    HeaderSize = HashSize + NullsBitmapSize_ + NumberOfKeyIntColumns + NumberOfKeyIColumns + TotalStringsSize;

    TableBuckets.resize(NumberOfBuckets);
    TableBucketsStats.resize(NumberOfBuckets);

    const ui64 reservedSizePerTuple = (2 * DefaultTupleBytes) / sizeof(ui64);

    TempTuple.reserve( reservedSizePerTuple );

    IColumnsHashes.resize(NumberOfKeyIColumns);
    IColumnsVals.resize(NumberOfIColumns);

    // Pre-reserve bucket storage; each vector gets a share proportional to its
    // column family's fraction of all columns (+1 avoids division by zero).
    const ui64 totalForTuples = DefaultTuplesNum * reservedSizePerTuple;
    for ( auto & b: TableBuckets ) {
        b.KeyIntVals.reserve( (totalForTuples * NumberOfKeyColumns) / (NumberOfColumns + 1) );
        b.StringsOffsets.reserve((totalForTuples * NumberOfStringColumns) / (NumberOfColumns + 1));
        b.DataIntVals.reserve( (totalForTuples * NumberOfDataIntColumns) / (NumberOfColumns + 1));
        b.StringsValues.reserve( (totalForTuples * NumberOfStringColumns) / (NumberOfColumns + 1) );
        b.InterfaceOffsets.reserve( (totalForTuples * NumberOfIColumns) / (NumberOfColumns + 1) );
        b.InterfaceValues.reserve( (totalForTuples * NumberOfIColumns) / (NumberOfColumns + 1));
    }
}
// Destructor only emits accumulated debug statistics (hash-table, bloom and
// join counters) for this table and its two child join tables, when the
// GRACEJOIN_DEBUG log level is enabled and the counters are non-zero.
TTable::~TTable() {
    YQL_LOG_IF(GRACEJOIN_DEBUG, InitHashTableCount_)
        << (const void *)this << '#' << "InitHashTableCount " << InitHashTableCount_
        << " BloomLookups " << BloomLookups_ << " BloomHits " << BloomHits_ << " BloomFalsePositives " << BloomFalsePositives_
        << " HashLookups " << HashLookups_ << " HashChainTraversal " << HashO1Iterations_/(double)HashLookups_ << " HashSlotOperations " << HashSlotIterations_/(double)HashLookups_
        << " Table1 " << JoinTable1Total_ << " Table2 " << JoinTable2Total_ << " TuplesFound " << TuplesFound_
        ;
    YQL_LOG_IF(GRACEJOIN_DEBUG, JoinTable1 && JoinTable1->AnyFiltered_) << (const void *)this << '#' << "L AnyFiltered " << JoinTable1->AnyFiltered_;
    YQL_LOG_IF(GRACEJOIN_DEBUG, JoinTable1 && JoinTable1->BloomLookups_) << (const void *)this << '#' << "L BloomLookups " << JoinTable1->BloomLookups_ << " BloomHits " << JoinTable1->BloomHits_;
    YQL_LOG_IF(GRACEJOIN_DEBUG, JoinTable2 && JoinTable2->AnyFiltered_) << (const void *)this << '#' << "R AnyFiltered " << JoinTable2->AnyFiltered_;
    YQL_LOG_IF(GRACEJOIN_DEBUG, JoinTable2 && JoinTable2->BloomLookups_) << (const void *)this << '#' << "R BloomLookups " << JoinTable2->BloomLookups_ << " BloomHits " << JoinTable2->BloomHits_;
};
// A bucket is spilled through three typed adapters (ui64, ui32 and char
// vectors); all three are constructed over the same spiller and size limit.
TTableBucketSpiller::TTableBucketSpiller(ISpiller::TPtr spiller, size_t sizeLimit)
    : StateUi64Adapter(spiller, sizeLimit)
    , StateUi32Adapter(spiller, sizeLimit)
    , StateCharAdapter(spiller, sizeLimit)
{
}
  886. void TTableBucketSpiller::Update() {
  887. StateUi64Adapter.Update();
  888. StateUi32Adapter.Update();
  889. StateCharAdapter.Update();
  890. if (State == EState::Spilling) {
  891. ProcessBucketSpilling();
  892. } else if (State == EState::Finalizing) {
  893. ProcessFinalizing();
  894. } else if (State == EState::Restoring) {
  895. ProcessBucketRestoration();
  896. }
  897. }
  898. void TTableBucketSpiller::Finalize() {
  899. IsFinalizingRequested = true;
  900. }
  901. void TTableBucketSpiller::SpillBucket(TTableBucket&& bucket) {
  902. MKQL_ENSURE(NextVectorToProcess == ENextVectorToProcess::None, "Internal logic error");
  903. State = EState::Spilling;
  904. CurrentBucket = std::move(bucket);
  905. NextVectorToProcess = ENextVectorToProcess::KeyAndVals;
  906. ProcessBucketSpilling();
  907. }
  908. TTableBucket&& TTableBucketSpiller::ExtractBucket() {
  909. MKQL_ENSURE(State == EState::WaitingForExtraction, "Internal logic error");
  910. MKQL_ENSURE(SpilledBucketsCount == 0, "Internal logic error");
  911. State = EState::InMemory;
  912. return std::move(CurrentBucket);
  913. }
  914. bool TTableBucketSpiller::IsInMemory() const {
  915. return State == EState::InMemory;
  916. }
  917. bool TTableBucketSpiller::IsExtractionRequired() const {
  918. return State == EState::WaitingForExtraction;
  919. }
  920. bool TTableBucketSpiller::IsProcessingSpilling() const {
  921. return State == EState::Spilling;
  922. }
  923. bool TTableBucketSpiller::IsAcceptingDataRequests() const {
  924. return State == EState::AcceptingDataRequests;
  925. }
  926. bool TTableBucketSpiller::IsRestoring() const {
  927. return State == EState::Restoring;
  928. }
  929. void TTableBucketSpiller::StartBucketRestoration() {
  930. MKQL_ENSURE(State == EState::AcceptingDataRequests, "Internal logic error");
  931. MKQL_ENSURE(NextVectorToProcess == ENextVectorToProcess::None, "Internal logic error");
  932. NextVectorToProcess = ENextVectorToProcess::KeyAndVals;
  933. State = EState::Restoring;
  934. ProcessBucketRestoration();
  935. }
// Streams the six vectors of CurrentBucket into the typed adapters, one
// vector per value of NextVectorToProcess. Returns early (preserving state)
// whenever an adapter cannot accept data yet; Update() re-enters later. Once
// the whole bucket is queued, either starts finalization (if requested and
// all adapters are idle) or goes back to AcceptingData.
void TTableBucketSpiller::ProcessBucketSpilling() {
    while (NextVectorToProcess != ENextVectorToProcess::None) {
        switch (NextVectorToProcess) {
            case ENextVectorToProcess::KeyAndVals:
                if (!StateUi64Adapter.IsAcceptingData()) return;
                StateUi64Adapter.AddData(std::move(CurrentBucket.KeyIntVals));
                NextVectorToProcess = ENextVectorToProcess::DataIntVals;
                break;
            case ENextVectorToProcess::DataIntVals:
                if (!StateUi64Adapter.IsAcceptingData()) return;
                StateUi64Adapter.AddData(std::move(CurrentBucket.DataIntVals));
                NextVectorToProcess = ENextVectorToProcess::StringsValues;
                break;
            case ENextVectorToProcess::StringsValues:
                if (!StateCharAdapter.IsAcceptingData()) return;
                StateCharAdapter.AddData(std::move(CurrentBucket.StringsValues));
                NextVectorToProcess = ENextVectorToProcess::StringsOffsets;
                break;
            case ENextVectorToProcess::StringsOffsets:
                if (!StateUi32Adapter.IsAcceptingData()) return;
                StateUi32Adapter.AddData(std::move(CurrentBucket.StringsOffsets));
                NextVectorToProcess = ENextVectorToProcess::InterfaceValues;
                break;
            case ENextVectorToProcess::InterfaceValues:
                if (!StateCharAdapter.IsAcceptingData()) return;
                StateCharAdapter.AddData(std::move(CurrentBucket.InterfaceValues));
                NextVectorToProcess = ENextVectorToProcess::InterfaceOffsets;
                break;
            case ENextVectorToProcess::InterfaceOffsets:
                if (!StateUi32Adapter.IsAcceptingData()) return;
                StateUi32Adapter.AddData(std::move(CurrentBucket.InterfaceOffsets));
                NextVectorToProcess = ENextVectorToProcess::None;
                // The same bucket may be spilled multiple times; count copies
                // so restoration knows how many to read back.
                SpilledBucketsCount++;
                break;
            default:
                return;
        }
    }
    if (IsFinalizingRequested) {
        // Finalize only after all adapters finished accepting data.
        if (!StateCharAdapter.IsAcceptingData() || !StateUi32Adapter.IsAcceptingData() || !StateUi64Adapter.IsAcceptingData()) return;
        State = EState::Finalizing;

        StateUi64Adapter.Finalize();
        StateUi32Adapter.Finalize();
        StateCharAdapter.Finalize();

        ProcessFinalizing();
        return;
    }
    State = EState::AcceptingData;
}
  985. void TTableBucketSpiller::ProcessFinalizing() {
  986. if (StateCharAdapter.IsAcceptingDataRequests() && StateUi32Adapter.IsAcceptingDataRequests() && StateUi64Adapter.IsAcceptingDataRequests()) {
  987. State = EState::AcceptingDataRequests;
  988. }
  989. }
  990. template <class T>
  991. void TTableBucketSpiller::AppendVector(std::vector<T, TMKQLAllocator<T>>& first, std::vector<T, TMKQLAllocator<T>>&& second) const {
  992. if (first.empty()) {
  993. first = std::move(second);
  994. return;
  995. }
  996. first.insert(first.end(), second.begin(), second.end());
  997. second.clear();
  998. }
// Reverse of ProcessBucketSpilling: requests and appends the six spilled
// vectors back into CurrentBucket, in the same order they were written. Each
// state first consumes ready data, otherwise issues a request if the adapter
// allows it, otherwise returns and resumes on the next Update(). Because the
// bucket may have been spilled several times (SpilledBucketsCount), the cycle
// restarts at KeyAndVals until every spilled copy has been appended.
void TTableBucketSpiller::ProcessBucketRestoration() {
    while (NextVectorToProcess != ENextVectorToProcess::None) {
        switch (NextVectorToProcess) {
            case ENextVectorToProcess::KeyAndVals:
                if (StateUi64Adapter.IsDataReady()) {
                    AppendVector(CurrentBucket.KeyIntVals, StateUi64Adapter.ExtractVector());
                    NextVectorToProcess = ENextVectorToProcess::DataIntVals;
                    break;
                }
                if (StateUi64Adapter.IsAcceptingDataRequests()) {
                    StateUi64Adapter.RequestNextVector();
                    break;
                }
                // Adapter busy: resume from this state later.
                return;
            case ENextVectorToProcess::DataIntVals:
                if (StateUi64Adapter.IsDataReady()) {
                    AppendVector(CurrentBucket.DataIntVals, StateUi64Adapter.ExtractVector());
                    NextVectorToProcess = ENextVectorToProcess::StringsValues;
                    break;
                }
                if (StateUi64Adapter.IsAcceptingDataRequests()) {
                    StateUi64Adapter.RequestNextVector();
                    break;
                }
                return;
            case ENextVectorToProcess::StringsValues:
                if (StateCharAdapter.IsDataReady()) {
                    AppendVector(CurrentBucket.StringsValues, StateCharAdapter.ExtractVector());
                    NextVectorToProcess = ENextVectorToProcess::StringsOffsets;
                    break;
                }
                if (StateCharAdapter.IsAcceptingDataRequests()) {
                    StateCharAdapter.RequestNextVector();
                    break;
                }
                return;
            case ENextVectorToProcess::StringsOffsets:
                if (StateUi32Adapter.IsDataReady()) {
                    AppendVector(CurrentBucket.StringsOffsets, StateUi32Adapter.ExtractVector());
                    NextVectorToProcess = ENextVectorToProcess::InterfaceValues;
                    break;
                }
                if (StateUi32Adapter.IsAcceptingDataRequests()) {
                    StateUi32Adapter.RequestNextVector();
                    break;
                }
                return;
            case ENextVectorToProcess::InterfaceValues:
                if (StateCharAdapter.IsDataReady()) {
                    AppendVector(CurrentBucket.InterfaceValues, StateCharAdapter.ExtractVector());
                    NextVectorToProcess = ENextVectorToProcess::InterfaceOffsets;
                    break;
                }
                if (StateCharAdapter.IsAcceptingDataRequests()) {
                    StateCharAdapter.RequestNextVector();
                    break;
                }
                return;
            case ENextVectorToProcess::InterfaceOffsets:
                if (StateUi32Adapter.IsDataReady()) {
                    AppendVector(CurrentBucket.InterfaceOffsets, StateUi32Adapter.ExtractVector());
                    // One spilled copy fully consumed.
                    SpilledBucketsCount--;
                    if (SpilledBucketsCount == 0) {
                        NextVectorToProcess = ENextVectorToProcess::None;
                        State = EState::WaitingForExtraction;
                    } else {
                        // More copies remain: restart the cycle.
                        NextVectorToProcess = ENextVectorToProcess::KeyAndVals;
                    }
                    break;
                }
                if (StateUi32Adapter.IsAcceptingDataRequests()) {
                    StateUi32Adapter.RequestNextVector();
                    break;
                }
                return;
            default:
                return;
        }
    }
}
  1079. }
  1080. }
  1081. }