mkql_grace_join.cpp 56 KB


  1. #include "mkql_counters.h"
  2. #include "mkql_grace_join.h"
  3. #include "mkql_grace_join_imp.h"
  4. #include <yql/essentials/public/udf/udf_data_type.h>
  5. #include <yql/essentials/public/udf/udf_value.h>
  6. #include <yql/essentials/public/decimal/yql_decimal_serialize.h>
  7. #include <yql/essentials/minikql/computation/mkql_custom_list.h>
  8. #include <yql/essentials/minikql/computation/mkql_computation_node_holders.h>
  9. #include <yql/essentials/minikql/comp_nodes/mkql_factories.h>
  10. #include <yql/essentials/minikql/invoke_builtins/mkql_builtins.h>
  11. #include <yql/essentials/minikql/computation/mkql_computation_node_codegen.h> // Y_IGNORE
  12. #include <yql/essentials/minikql/computation/mkql_computation_node_pack.h>
  13. #include <yql/essentials/minikql/computation/mkql_llvm_base.h> // Y_IGNORE
  14. #include <yql/essentials/minikql/mkql_node_cast.h>
  15. #include <yql/essentials/minikql/mkql_program_builder.h>
  16. #include <yql/essentials/minikql/mkql_string_util.h>
  17. #include <yql/essentials/utils/log/log.h>
  18. #include <yql/essentials/parser/pg_catalog/catalog.h>
  19. #include <chrono>
  20. #include <limits>
  21. namespace NKikimr {
  22. namespace NMiniKQL {
  23. namespace {
  24. const ui32 PartialJoinBatchSize = 100000; // Number of tuples for one join batch
  25. struct TColumnDataPackInfo {
  26. ui32 ColumnIdx = 0; // Column index in tuple
  27. ui32 Bytes = 0; // Size in bytes for fixed size values
  28. TType* MKQLType; // Data type of the column in term of compute nodes data flows
  29. NUdf::EDataSlot DataType = NUdf::EDataSlot::Uint32; // Data type of the column for standard types (TDataType)
  30. TString Name; // Name of the type column
  31. bool IsKeyColumn = false; // True if this columns is key for join
  32. bool IsString = false; // True if value is string
  33. bool IsPgType = false; // True if column is PG type
  34. bool IsPresortSupported = false; // True if pg type supports presort and can be interpreted as string value
  35. bool IsIType = false; // True if column need to be processed via IHash, IEquate interfaces
  36. ui32 Offset = 0; // Offset of column in packed data
  37. // TValuePacker Packer; // Packer for composite data types
  38. };
  39. struct TGraceJoinPacker {
  40. ui64 NullsBitmapSize = 0; // Number of ui64 values for nulls bitmap
  41. ui64 TuplesPacked = 0; // Total number of packed tuples
  42. ui64 TuplesBatchPacked = 0; // Number of tuples packed during current join batch
  43. ui64 TuplesUnpacked = 0; // Total number of unpacked tuples
  44. ui64 BatchSize = PartialJoinBatchSize; // Batch size for partial table packing and join
  45. std::chrono::time_point<std::chrono::system_clock> StartTime; // Start time of execution
  46. std::chrono::time_point<std::chrono::system_clock> EndTime; // End time of execution
  47. std::vector<ui64> TupleIntVals; // Packed value of all fixed length values of table tuple. Keys columns should be packed first.
  48. std::vector<ui32> TupleStrSizes; // Sizes of all packed strings
  49. std::vector<char*> TupleStrings; // All values of tuple strings
  50. std::vector<TType*> ColumnTypes; // Types of all columns
  51. std::vector<std::shared_ptr<TValuePacker>> Packers; // Packers for composite data types
  52. const THolderFactory& HolderFactory; // To use during unpacking
  53. std::vector<TColumnDataPackInfo> ColumnsPackInfo; // Information about columns packing
  54. std::unique_ptr<GraceJoin::TTable> TablePtr; // Table to pack data
  55. std::vector<NUdf::TUnboxedValue> TupleHolder; // Storage for tuple data
  56. std::vector<NUdf::TUnboxedValue*> TuplePtrs; // Storage for tuple data pointers to use in FetchValues
  57. std::vector<std::string> TupleStringHolder; // Storage for complex tuple data types serialized to strings
  58. std::vector<NUdf::TUnboxedValue> IColumnsHolder; // Storage for interface-based types (IHash, IEquate)
  59. GraceJoin::TupleData JoinTupleData; // TupleData to get join results
  60. ui64 TotalColumnsNum = 0; // Total number of columns to pack
  61. ui64 TotalIntColumnsNum = 0; // Total number of int columns
  62. ui64 TotalStrColumnsNum = 0; // Total number of string columns
  63. ui64 TotalIColumnsNum = 0; // Total number of interface-based columns
  64. ui64 KeyIntColumnsNum = 0; // Total number of key int columns in original table
  65. ui64 PackedKeyIntColumnsNum = 0; // Length of ui64 array containing data of all key int columns after packing
  66. ui64 KeyStrColumnsNum = 0; // Total number of key string columns
  67. ui64 KeyIColumnsNum = 0; // Total number of interface-based columns
  68. ui64 DataIntColumnsNum = TotalIntColumnsNum - KeyIntColumnsNum;
  69. ui64 PackedDataIntColumnsNum = 0; // Length of ui64 array containing data of all non-key int columns after packing
  70. ui64 DataStrColumnsNum = TotalStrColumnsNum - KeyStrColumnsNum;
  71. ui64 DataIColumnsNum = TotalIColumnsNum - KeyIColumnsNum;
  72. std::vector<GraceJoin::TColTypeInterface> ColumnInterfaces;
  73. bool IsAny; // Flag to support any join attribute
  74. inline void Pack() ; // Packs new tuple from TupleHolder and TuplePtrs to TupleIntVals, TupleStrSizes, TupleStrings
  75. inline void UnPack(); // Unpacks packed values from TupleIntVals, TupleStrSizes, TupleStrings into TupleHolder and TuplePtrs
  76. TGraceJoinPacker(const std::vector<TType*>& columnTypes, const std::vector<ui32>& keyColumns, const THolderFactory& holderFactory, bool isAny);
  77. };
  78. TColumnDataPackInfo GetPackInfo(TType* type) {
  79. NUdf::TDataTypeId colTypeId;
  80. TColumnDataPackInfo res;
  81. res.MKQLType = type;
  82. TType* colType;
  83. if (type->IsOptional()) {
  84. colType = AS_TYPE(TOptionalType, type)->GetItemType();
  85. } else {
  86. colType = type;
  87. }
  88. if (type->GetKind() == TType::EKind::Pg ) {
  89. TPgType* pgType = AS_TYPE(TPgType, type);
  90. res.IsPgType = true;
  91. if (pgType->IsPresortSupported()) {
  92. res.IsPresortSupported = true;
  93. res.IsString = true;
  94. res.DataType = NUdf::EDataSlot::String;
  95. res.Name = pgType->GetName();
  96. } else {
  97. res.IsIType = true;
  98. }
  99. return res;
  100. }
  101. if (colType->GetKind() != TType::EKind::Data) {
  102. res.IsString = true;
  103. res.DataType = NUdf::EDataSlot::String;
  104. return res;
  105. }
  106. colTypeId = AS_TYPE(TDataType, colType)->GetSchemeType();
  107. NUdf::EDataSlot dataType = NUdf::GetDataSlot(colTypeId);
  108. res.DataType = dataType;
  109. const NYql::NUdf::TDataTypeInfo& ti = GetDataTypeInfo(dataType);
  110. res.Name = ti.Name;
  111. switch (dataType){
  112. case NUdf::EDataSlot::Bool:
  113. res.Bytes = sizeof(bool); break;
  114. case NUdf::EDataSlot::Int8:
  115. res.Bytes = sizeof(i8); break;
  116. case NUdf::EDataSlot::Uint8:
  117. res.Bytes = sizeof(ui8); break;
  118. case NUdf::EDataSlot::Int16:
  119. res.Bytes = sizeof(i16); break;
  120. case NUdf::EDataSlot::Uint16:
  121. res.Bytes = sizeof(ui16); break;
  122. case NUdf::EDataSlot::Int32:
  123. res.Bytes = sizeof(i32); break;
  124. case NUdf::EDataSlot::Uint32:
  125. res.Bytes = sizeof(ui32); break;
  126. case NUdf::EDataSlot::Int64:
  127. res.Bytes = sizeof(i64); break;
  128. case NUdf::EDataSlot::Uint64:
  129. res.Bytes = sizeof(ui64); break;
  130. case NUdf::EDataSlot::Float:
  131. res.Bytes = sizeof(float); break;
  132. case NUdf::EDataSlot::Double:
  133. res.Bytes = sizeof(double); break;
  134. case NUdf::EDataSlot::Date:
  135. res.Bytes = sizeof(ui16); break;
  136. case NUdf::EDataSlot::Datetime:
  137. res.Bytes = sizeof(ui32); break;
  138. case NUdf::EDataSlot::Timestamp:
  139. res.Bytes = sizeof(ui64); break;
  140. case NUdf::EDataSlot::Interval:
  141. res.Bytes = sizeof(i64); break;
  142. case NUdf::EDataSlot::TzDate:
  143. res.Bytes = 4; break;
  144. case NUdf::EDataSlot::TzDatetime:
  145. res.Bytes = 6; break;
  146. case NUdf::EDataSlot::TzTimestamp:
  147. res.Bytes = 10; break;
  148. case NUdf::EDataSlot::Decimal:
  149. res.Bytes = 16; break;
  150. case NUdf::EDataSlot::Date32:
  151. res.Bytes = 4; break;
  152. case NUdf::EDataSlot::Datetime64:
  153. res.Bytes = 8; break;
  154. case NUdf::EDataSlot::Timestamp64:
  155. res.Bytes = 8; break;
  156. case NUdf::EDataSlot::Interval64:
  157. res.Bytes = 8; break;
  158. case NUdf::EDataSlot::Uuid:
  159. case NUdf::EDataSlot::DyNumber:
  160. case NUdf::EDataSlot::JsonDocument:
  161. case NUdf::EDataSlot::String:
  162. case NUdf::EDataSlot::Utf8:
  163. case NUdf::EDataSlot::Yson:
  164. case NUdf::EDataSlot::Json:
  165. res.IsString = true; break;
  166. default:
  167. {
  168. MKQL_ENSURE(false, "Unknown data type.");
  169. res.IsString = true;
  170. }
  171. }
  172. return res;
  173. }
  174. void TGraceJoinPacker::Pack() {
  175. TuplesPacked++;
  176. std::fill(TupleIntVals.begin(), TupleIntVals.end(), 0);
  177. for (ui64 i = 0; i < ColumnsPackInfo.size(); i++) {
  178. const TColumnDataPackInfo &pi = ColumnsPackInfo[i];
  179. ui32 offset = pi.Offset;
  180. NYql::NUdf::TUnboxedValue value = *TuplePtrs[pi.ColumnIdx];
  181. if (!value) { // Null value
  182. ui64 currNullsIdx = (i + 1) / (sizeof(ui64) * 8);
  183. ui64 remShift = ( (i + 1) - currNullsIdx * (sizeof(ui64) * 8) );
  184. ui64 bitMask = ui64(0x1) << remShift;
  185. TupleIntVals[currNullsIdx] |= bitMask;
  186. if (pi.IsKeyColumn) {
  187. TupleIntVals[0] |= ui64(0x1);
  188. }
  189. continue;
  190. }
  191. TType* type = pi.MKQLType;
  192. TType* colType;
  193. if (type->IsOptional()) {
  194. colType = AS_TYPE(TOptionalType, type)->GetItemType();
  195. } else {
  196. colType = type;
  197. }
  198. if (colType->GetKind() != TType::EKind::Data) {
  199. if (pi.IsIType ) { // Interface-based type
  200. IColumnsHolder[offset] = value;
  201. } else {
  202. TStringBuf strBuf = Packers[pi.ColumnIdx]->Pack(value);
  203. TupleStringHolder[i] = strBuf;
  204. TupleStrings[offset] = TupleStringHolder[i].data();
  205. TupleStrSizes[offset] = TupleStringHolder[i].size();
  206. }
  207. continue;
  208. }
  209. char *buffPtr = reinterpret_cast<char *> (TupleIntVals.data()) + offset;
  210. switch (pi.DataType)
  211. {
  212. case NUdf::EDataSlot::Bool:
  213. WriteUnaligned<bool>(buffPtr, value.Get<bool>()); break;
  214. case NUdf::EDataSlot::Int8:
  215. WriteUnaligned<i8>(buffPtr, value.Get<i8>()); break;
  216. case NUdf::EDataSlot::Uint8:
  217. WriteUnaligned<ui8>(buffPtr, value.Get<ui8>()); break;
  218. case NUdf::EDataSlot::Int16:
  219. WriteUnaligned<i16>(buffPtr, value.Get<i16>()); break;
  220. case NUdf::EDataSlot::Uint16:
  221. WriteUnaligned<ui16>(buffPtr, value.Get<ui16>()); break;
  222. case NUdf::EDataSlot::Int32:
  223. WriteUnaligned<i32>(buffPtr, value.Get<i32>()); break;
  224. case NUdf::EDataSlot::Uint32:
  225. WriteUnaligned<ui32>(buffPtr, value.Get<ui32>()); break;
  226. case NUdf::EDataSlot::Int64:
  227. WriteUnaligned<i64>(buffPtr, value.Get<i64>()); break;
  228. case NUdf::EDataSlot::Uint64:
  229. WriteUnaligned<ui64>(buffPtr, value.Get<ui64>()); break;
  230. case NUdf::EDataSlot::Float:
  231. WriteUnaligned<float>(buffPtr, value.Get<float>()); break;
  232. case NUdf::EDataSlot::Double:
  233. WriteUnaligned<double>(buffPtr, value.Get<double>()); break;
  234. case NUdf::EDataSlot::Date:
  235. WriteUnaligned<ui16>(buffPtr, value.Get<ui16>()); break;
  236. case NUdf::EDataSlot::Datetime:
  237. WriteUnaligned<ui32>(buffPtr, value.Get<ui32>()); break;
  238. case NUdf::EDataSlot::Timestamp:
  239. WriteUnaligned<ui64>(buffPtr, value.Get<ui64>()); break;
  240. case NUdf::EDataSlot::Interval:
  241. WriteUnaligned<i64>(buffPtr, value.Get<i64>()); break;
  242. case NUdf::EDataSlot::Date32:
  243. WriteUnaligned<i32>(buffPtr, value.Get<i32>()); break;
  244. case NUdf::EDataSlot::Datetime64:
  245. WriteUnaligned<i64>(buffPtr, value.Get<i64>()); break;
  246. case NUdf::EDataSlot::Timestamp64:
  247. WriteUnaligned<i64>(buffPtr, value.Get<i64>()); break;
  248. case NUdf::EDataSlot::Interval64:
  249. WriteUnaligned<i64>(buffPtr, value.Get<i64>()); break;
  250. case NUdf::EDataSlot::TzDate:
  251. {
  252. WriteUnaligned<ui16>(buffPtr, value.Get<ui16>());
  253. WriteUnaligned<ui16>(buffPtr + sizeof(ui16), value.GetTimezoneId());
  254. break;
  255. }
  256. case NUdf::EDataSlot::TzDatetime:
  257. {
  258. WriteUnaligned<ui32>(buffPtr, value.Get<ui32>());
  259. WriteUnaligned<ui16>(buffPtr + sizeof(ui32), value.GetTimezoneId());
  260. break;
  261. }
  262. case NUdf::EDataSlot::TzTimestamp:
  263. {
  264. WriteUnaligned<ui64>(buffPtr, value.Get<ui64>());
  265. WriteUnaligned<ui16>(buffPtr + sizeof(ui64), value.GetTimezoneId());
  266. break;
  267. }
  268. case NUdf::EDataSlot::Decimal:
  269. {
  270. NYql::NDecimal::Serialize(value.GetInt128(), buffPtr);
  271. break;
  272. }
  273. default:
  274. {
  275. auto str = TuplePtrs[pi.ColumnIdx]->AsStringRef();
  276. TupleStrings[offset] = str.Data();
  277. TupleStrSizes[offset] = str.Size();
  278. }
  279. }
  280. }
  281. }
  282. void TGraceJoinPacker::UnPack() {
  283. TuplesUnpacked++;
  284. for (ui64 i = 0; i < ColumnsPackInfo.size(); i++) {
  285. const TColumnDataPackInfo &pi = ColumnsPackInfo[i];
  286. ui32 offset = pi.Offset;
  287. NYql::NUdf::TUnboxedValue & value = *TuplePtrs[pi.ColumnIdx];
  288. if (JoinTupleData.AllNulls) {
  289. value = NYql::NUdf::TUnboxedValue();
  290. continue;
  291. }
  292. ui64 currNullsIdx = (i + 1) / (sizeof(ui64) * 8);
  293. ui64 remShift = ( (i + 1) - currNullsIdx * (sizeof(ui64) * 8) );
  294. ui64 bitMask = ui64(0x1) << remShift;
  295. if ( TupleIntVals[currNullsIdx] & bitMask ) {
  296. value = NYql::NUdf::TUnboxedValue();
  297. continue;
  298. }
  299. TType * type = pi.MKQLType;
  300. TType * colType;
  301. if (type->IsOptional()) {
  302. colType = AS_TYPE(TOptionalType, type)->GetItemType();
  303. } else {
  304. colType = type;
  305. }
  306. if (colType->GetKind() != TType::EKind::Data) {
  307. if (colType->GetKind() == TType::EKind::Pg) {
  308. if ( pi.IsIType ) { // Interface-based type
  309. value = IColumnsHolder[offset];
  310. continue;
  311. }
  312. }
  313. value = Packers[pi.ColumnIdx]->Unpack(TStringBuf(TupleStrings[offset], TupleStrSizes[offset]), HolderFactory);
  314. continue;
  315. }
  316. char *buffPtr = reinterpret_cast<char *> (TupleIntVals.data()) + offset;
  317. switch (pi.DataType)
  318. {
  319. case NUdf::EDataSlot::Bool:
  320. value = NUdf::TUnboxedValuePod(ReadUnaligned<bool>(buffPtr)); break;
  321. case NUdf::EDataSlot::Int8:
  322. value = NUdf::TUnboxedValuePod(ReadUnaligned<i8>(buffPtr)); break;
  323. case NUdf::EDataSlot::Uint8:
  324. value = NUdf::TUnboxedValuePod(ReadUnaligned<ui8>(buffPtr)); break;
  325. case NUdf::EDataSlot::Int16:
  326. value = NUdf::TUnboxedValuePod(ReadUnaligned<i16>(buffPtr)); break;
  327. case NUdf::EDataSlot::Uint16:
  328. value = NUdf::TUnboxedValuePod(ReadUnaligned<ui16>(buffPtr)); break;
  329. case NUdf::EDataSlot::Int32:
  330. value = NUdf::TUnboxedValuePod(ReadUnaligned<i32>(buffPtr)); break;
  331. case NUdf::EDataSlot::Uint32:
  332. value = NUdf::TUnboxedValuePod(ReadUnaligned<ui32>(buffPtr)); break;
  333. case NUdf::EDataSlot::Int64:
  334. value = NUdf::TUnboxedValuePod(ReadUnaligned<i64>(buffPtr)); break;
  335. case NUdf::EDataSlot::Uint64:
  336. value = NUdf::TUnboxedValuePod(ReadUnaligned<ui64>(buffPtr)); break;
  337. case NUdf::EDataSlot::Float:
  338. value = NUdf::TUnboxedValuePod(ReadUnaligned<float>(buffPtr)); break;
  339. case NUdf::EDataSlot::Double:
  340. value = NUdf::TUnboxedValuePod(ReadUnaligned<double>(buffPtr)); break;
  341. case NUdf::EDataSlot::Date:
  342. value = NUdf::TUnboxedValuePod(ReadUnaligned<ui16>(buffPtr)); break;
  343. case NUdf::EDataSlot::Datetime:
  344. value = NUdf::TUnboxedValuePod(ReadUnaligned<ui32>(buffPtr)); break;
  345. case NUdf::EDataSlot::Timestamp:
  346. value = NUdf::TUnboxedValuePod(ReadUnaligned<ui64>(buffPtr)); break;
  347. case NUdf::EDataSlot::Interval:
  348. value = NUdf::TUnboxedValuePod(ReadUnaligned<i64>(buffPtr)); break;
  349. case NUdf::EDataSlot::Date32:
  350. value = NUdf::TUnboxedValuePod(ReadUnaligned<i32>(buffPtr)); break;
  351. case NUdf::EDataSlot::Datetime64:
  352. value = NUdf::TUnboxedValuePod(ReadUnaligned<i64>(buffPtr)); break;
  353. case NUdf::EDataSlot::Timestamp64:
  354. value = NUdf::TUnboxedValuePod(ReadUnaligned<i64>(buffPtr)); break;
  355. case NUdf::EDataSlot::Interval64:
  356. value = NUdf::TUnboxedValuePod(ReadUnaligned<i64>(buffPtr)); break;
  357. case NUdf::EDataSlot::TzDate:
  358. {
  359. value = NUdf::TUnboxedValuePod(ReadUnaligned<ui16>(buffPtr));
  360. value.SetTimezoneId(ReadUnaligned<ui16>(buffPtr + sizeof(ui16))) ;
  361. break;
  362. }
  363. case NUdf::EDataSlot::TzDatetime:
  364. {
  365. value = NUdf::TUnboxedValuePod(ReadUnaligned<ui32>(buffPtr));
  366. value.SetTimezoneId(ReadUnaligned<ui16>(buffPtr + sizeof(ui32)));
  367. break;
  368. }
  369. case NUdf::EDataSlot::TzTimestamp:
  370. {
  371. value = NUdf::TUnboxedValuePod(ReadUnaligned<ui64>(buffPtr));
  372. value.SetTimezoneId(ReadUnaligned<ui16>(buffPtr + sizeof(ui64))) ;
  373. break;
  374. }
  375. case NUdf::EDataSlot::Decimal:
  376. {
  377. const auto des = NYql::NDecimal::Deserialize(buffPtr, sizeof(NYql::NDecimal::TInt128));
  378. MKQL_ENSURE(!NYql::NDecimal::IsError(des.first), "Bad packed data: invalid decimal.");
  379. value = NUdf::TUnboxedValuePod(des.first);
  380. break;
  381. }
  382. default:
  383. {
  384. value = MakeString(NUdf::TStringRef(TupleStrings[offset], TupleStrSizes[offset]));
  385. }
  386. }
  387. }
  388. }
  389. TGraceJoinPacker::TGraceJoinPacker(const std::vector<TType *> & columnTypes, const std::vector<ui32>& keyColumns, const THolderFactory& holderFactory, bool isAny) :
  390. ColumnTypes(columnTypes)
  391. , HolderFactory(holderFactory)
  392. , IsAny(isAny) {
  393. ui64 nColumns = ColumnTypes.size();
  394. ui64 nKeyColumns = keyColumns.size();
  395. for (ui32 i = 0; i < keyColumns.size(); i++ ) {
  396. auto colType = columnTypes[keyColumns[i]];
  397. auto packInfo = GetPackInfo(colType);
  398. packInfo.ColumnIdx = keyColumns[i];
  399. packInfo.IsKeyColumn = true;
  400. ColumnsPackInfo.push_back(packInfo);
  401. }
  402. for ( ui32 i = 0; i < columnTypes.size(); i++ ) {
  403. auto colType = columnTypes[i];
  404. auto packInfo = GetPackInfo(colType);
  405. packInfo.ColumnIdx = i;
  406. ui32 keyColNums = std::count_if(keyColumns.begin(), keyColumns.end(), [&](ui32 k) {return k == i;});
  407. Packers.push_back(std::make_shared<TValuePacker>(true,colType));
  408. if (keyColNums == 0) {
  409. ColumnsPackInfo.push_back(packInfo);
  410. }
  411. }
  412. nColumns = ColumnsPackInfo.size();
  413. ui64 totalIntColumnsNum = std::count_if(ColumnsPackInfo.begin(), ColumnsPackInfo.end(), [](TColumnDataPackInfo a) { return !a.IsString && !a.IsPgType; });
  414. ui64 totalIColumnsNum = std::count_if(ColumnsPackInfo.begin(), ColumnsPackInfo.end(), [](TColumnDataPackInfo a) { return a.IsIType; });
  415. ui64 totalStrColumnsNum = nColumns - totalIntColumnsNum - totalIColumnsNum;
  416. ui64 keyIntColumnsNum = std::count_if(ColumnsPackInfo.begin(), ColumnsPackInfo.end(), [](TColumnDataPackInfo a) { return (a.IsKeyColumn && !a.IsString && !a.IsPgType);});
  417. ui64 keyIColumnsNum = std::count_if(ColumnsPackInfo.begin(), ColumnsPackInfo.end(), [](TColumnDataPackInfo a) { return (a.IsKeyColumn && a.IsIType);});
  418. ui64 keyStrColumnsNum = nKeyColumns - keyIntColumnsNum - keyIColumnsNum;
  419. TotalColumnsNum = nColumns;
  420. TotalIntColumnsNum = totalIntColumnsNum;
  421. TotalStrColumnsNum = totalStrColumnsNum;
  422. TotalIColumnsNum = totalIColumnsNum;
  423. KeyIntColumnsNum = keyIntColumnsNum;
  424. KeyStrColumnsNum = keyStrColumnsNum;
  425. KeyIColumnsNum = keyIColumnsNum;
  426. DataIntColumnsNum = TotalIntColumnsNum - KeyIntColumnsNum;
  427. DataStrColumnsNum = TotalStrColumnsNum - KeyStrColumnsNum;
  428. DataIColumnsNum = TotalIColumnsNum - KeyIColumnsNum;
  429. NullsBitmapSize = ( (nColumns + 1)/ (8 * sizeof(ui64)) + 1) ;
  430. TupleIntVals.resize(2 * totalIntColumnsNum + NullsBitmapSize);
  431. TupleStrings.resize(totalStrColumnsNum);
  432. TupleStrSizes.resize(totalStrColumnsNum);
  433. JoinTupleData.IntColumns = TupleIntVals.data();
  434. JoinTupleData.StrColumns = TupleStrings.data();
  435. JoinTupleData.StrSizes = TupleStrSizes.data();
  436. TupleHolder.resize(nColumns);
  437. TupleStringHolder.resize(nColumns);
  438. IColumnsHolder.resize(nColumns);
  439. JoinTupleData.IColumns = IColumnsHolder.data();
  440. std::transform(TupleHolder.begin(), TupleHolder.end(), std::back_inserter(TuplePtrs), [](NUdf::TUnboxedValue& v) { return std::addressof(v); });
  441. ui32 currIntOffset = NullsBitmapSize * sizeof(ui64) ;
  442. ui32 currStrOffset = 0;
  443. ui32 currIOffset = 0;
  444. std::vector<GraceJoin::TColTypeInterface> ctiv;
  445. bool prevKeyColumn = false;
  446. ui32 keyIntOffset = currIntOffset;
  447. for( auto & p: ColumnsPackInfo ) {
  448. if ( !p.IsString && !p.IsIType ) {
  449. if (prevKeyColumn && !p.IsKeyColumn) {
  450. currIntOffset = ( (currIntOffset + sizeof(ui64) - 1) / sizeof(ui64) ) * sizeof(ui64);
  451. }
  452. prevKeyColumn = p.IsKeyColumn;
  453. p.Offset = currIntOffset;
  454. currIntOffset += p.Bytes;
  455. if (p.IsKeyColumn) {
  456. keyIntOffset = currIntOffset;
  457. }
  458. } else if ( p.IsString ) {
  459. p.Offset = currStrOffset;
  460. currStrOffset++;
  461. } else if (p.IsIType) {
  462. p.Offset = currIOffset;
  463. currIOffset++;
  464. GraceJoin::TColTypeInterface cti{ MakeHashImpl(p.MKQLType), MakeEquateImpl(p.MKQLType), std::make_shared<TValuePacker>(true, p.MKQLType) , HolderFactory };
  465. ColumnInterfaces.push_back(cti);
  466. }
  467. }
  468. PackedKeyIntColumnsNum = (keyIntOffset + sizeof(ui64) - 1 ) / sizeof(ui64) - NullsBitmapSize;
  469. PackedDataIntColumnsNum = (currIntOffset + sizeof(ui64) - 1) / sizeof(ui64) - PackedKeyIntColumnsNum - NullsBitmapSize;
  470. GraceJoin::TColTypeInterface * cti_p = nullptr;
  471. if (TotalIColumnsNum > 0 ) {
  472. cti_p = ColumnInterfaces.data();
  473. }
  474. TablePtr = std::make_unique<GraceJoin::TTable>(
  475. PackedKeyIntColumnsNum, KeyStrColumnsNum, PackedDataIntColumnsNum,
  476. DataStrColumnsNum, KeyIColumnsNum, DataIColumnsNum, NullsBitmapSize, cti_p, IsAny );
  477. }
  478. class TGraceJoinSpillingSupportState : public TComputationValue<TGraceJoinSpillingSupportState> {
  479. using TBase = TComputationValue<TGraceJoinSpillingSupportState>;
  480. enum class EOperatingMode {
  481. InMemory,
  482. Spilling,
  483. ProcessSpilled
  484. };
  485. public:
  486. TGraceJoinSpillingSupportState(TMemoryUsageInfo* memInfo,
  487. IComputationWideFlowNode* flowLeft, IComputationWideFlowNode* flowRight,
  488. EJoinKind joinKind, EAnyJoinSettings anyJoinSettings, const std::vector<ui32>& leftKeyColumns, const std::vector<ui32>& rightKeyColumns,
  489. const std::vector<ui32>& leftRenames, const std::vector<ui32>& rightRenames,
  490. const std::vector<TType*>& leftColumnsTypes, const std::vector<TType*>& rightColumnsTypes, TComputationContext& ctx,
  491. const bool isSelfJoin, bool isSpillingAllowed)
  492. : TBase(memInfo)
  493. , FlowLeft(flowLeft)
  494. , FlowRight(flowRight)
  495. , JoinKind(joinKind)
  496. , LeftKeyColumns(leftKeyColumns)
  497. , RightKeyColumns(rightKeyColumns)
  498. , LeftRenames(leftRenames)
  499. , RightRenames(rightRenames)
  500. , LeftPacker(std::make_unique<TGraceJoinPacker>(leftColumnsTypes, leftKeyColumns, ctx.HolderFactory, (anyJoinSettings == EAnyJoinSettings::Left || anyJoinSettings == EAnyJoinSettings::Both || joinKind == EJoinKind::RightSemi || joinKind == EJoinKind::RightOnly)))
  501. , RightPacker(std::make_unique<TGraceJoinPacker>(rightColumnsTypes, rightKeyColumns, ctx.HolderFactory, (anyJoinSettings == EAnyJoinSettings::Right || anyJoinSettings == EAnyJoinSettings::Both || joinKind == EJoinKind::LeftSemi || joinKind == EJoinKind::LeftOnly)))
  502. , JoinedTablePtr(std::make_unique<GraceJoin::TTable>())
  503. , JoinCompleted(std::make_unique<bool>(false))
  504. , PartialJoinCompleted(std::make_unique<bool>(false))
  505. , HaveMoreLeftRows(std::make_unique<bool>(true))
  506. , HaveMoreRightRows(std::make_unique<bool>(true))
  507. , IsSelfJoin_(isSelfJoin)
  508. , SelfJoinSameKeys_(isSelfJoin && (leftKeyColumns == rightKeyColumns))
  509. , IsSpillingAllowed(isSpillingAllowed)
  510. {
  511. YQL_LOG(GRACEJOIN_DEBUG) << (const void *)&*JoinedTablePtr << "# AnyJoinSettings=" << (int)anyJoinSettings << " JoinKind=" << (int)joinKind;
  512. if (IsSelfJoin_) {
  513. LeftPacker->BatchSize = std::numeric_limits<ui64>::max();
  514. RightPacker->BatchSize = std::numeric_limits<ui64>::max();
  515. }
  516. if (ctx.CountersProvider) {
  517. // id will be assigned externally in future versions
  518. TString id = TString(Operator_Join) + "0";
  519. CounterOutputRows_ = ctx.CountersProvider->GetCounter(id, Counter_OutputRows, false);
  520. }
  521. }
  522. EFetchResult FetchValues(TComputationContext& ctx, NUdf::TUnboxedValue*const* output) {
  523. while (true) {
  524. switch(GetMode()) {
  525. case EOperatingMode::InMemory: {
  526. auto r = DoCalculateInMemory(ctx, output);
  527. if (GetMode() == EOperatingMode::InMemory) {
  528. return r;
  529. }
  530. break;
  531. }
  532. case EOperatingMode::Spilling: {
  533. auto r = DoCalculateWithSpilling(ctx, output);
  534. if (r == EFetchResult::One)
  535. return r;
  536. if (GetMode() == EOperatingMode::Spilling) {
  537. return EFetchResult::Yield;
  538. }
  539. break;
  540. }
  541. case EOperatingMode::ProcessSpilled: {
  542. return ProcessSpilledData(ctx, output);
  543. }
  544. }
  545. }
  546. Y_UNREACHABLE();
  547. }
  548. private:
  549. EOperatingMode GetMode() const {
  550. return Mode;
  551. }
  552. bool HasMemoryForProcessing() const {
  553. return !TlsAllocState->IsMemoryYellowZoneEnabled();
  554. }
  555. bool IsSwitchToSpillingModeCondition() const {
  556. return !HasMemoryForProcessing();
  557. }
  558. void SwitchMode(EOperatingMode mode, TComputationContext& ctx) {
  559. LogMemoryUsage();
  560. switch(mode) {
  561. case EOperatingMode::InMemory: {
  562. YQL_LOG(INFO) << (const void *)&*JoinedTablePtr << "# switching Memory mode to InMemory";
  563. MKQL_ENSURE(false, "Internal logic error");
  564. break;
  565. }
  566. case EOperatingMode::Spilling: {
  567. YQL_LOG(INFO) << (const void *)&*JoinedTablePtr << "# switching Memory mode to Spilling";
  568. MKQL_ENSURE(EOperatingMode::InMemory == Mode, "Internal logic error");
  569. auto spiller = ctx.SpillerFactory->CreateSpiller();
  570. RightPacker->TablePtr->InitializeBucketSpillers(spiller);
  571. LeftPacker->TablePtr->InitializeBucketSpillers(spiller);
  572. break;
  573. }
  574. case EOperatingMode::ProcessSpilled: {
  575. YQL_LOG(INFO) << (const void *)&*JoinedTablePtr << "# switching Memory mode to ProcessSpilled";
  576. SpilledBucketsJoinOrder.reserve(GraceJoin::NumberOfBuckets);
  577. for (ui32 i = 0; i < GraceJoin::NumberOfBuckets; ++i) SpilledBucketsJoinOrder.push_back(i);
  578. std::sort(SpilledBucketsJoinOrder.begin(), SpilledBucketsJoinOrder.end(), [&](ui32 lhs, ui32 rhs) {
  579. auto lhs_in_memory = LeftPacker->TablePtr->IsBucketInMemory(lhs) + RightPacker->TablePtr->IsBucketInMemory(lhs);
  580. auto rhs_in_memory = LeftPacker->TablePtr->IsBucketInMemory(rhs) + RightPacker->TablePtr->IsBucketInMemory(rhs);
  581. return lhs_in_memory > rhs_in_memory;
  582. });
  583. MKQL_ENSURE(EOperatingMode::Spilling == Mode, "Internal logic error");
  584. break;
  585. }
  586. }
  587. Mode = mode;
  588. }
  589. EFetchResult FetchAndPackData(TComputationContext& ctx, NUdf::TUnboxedValue*const* output) {
  590. const NKikimr::NMiniKQL::EFetchResult resultLeft = FlowLeft->FetchValues(ctx, LeftPacker->TuplePtrs.data());
  591. NKikimr::NMiniKQL::EFetchResult resultRight;
  592. if (resultLeft == EFetchResult::One) {
  593. if (LeftPacker->TuplesPacked == 0) {
  594. LeftPacker->StartTime = std::chrono::system_clock::now();
  595. }
  596. LeftPacker->Pack();
  597. {
  598. auto added = LeftPacker->TablePtr->AddTuple(LeftPacker->TupleIntVals.data(), LeftPacker->TupleStrings.data(), LeftPacker->TupleStrSizes.data(), LeftPacker->IColumnsHolder.data(), *RightPacker->TablePtr);
  599. if (added == GraceJoin::TTable::EAddTupleResult::Added)
  600. ++LeftPacker->TuplesBatchPacked;
  601. else if (added == GraceJoin::TTable::EAddTupleResult::AnyMatch)
  602. ; // row dropped
  603. else if (JoinKind == EJoinKind::Inner || JoinKind == EJoinKind::Right || JoinKind == EJoinKind::RightSemi || JoinKind == EJoinKind::RightOnly || JoinKind == EJoinKind::LeftSemi)
  604. ; // row dropped
  605. else { // Left, LeftOnly, Full, Exclusion: output row
  606. for (size_t i = 0; i < LeftRenames.size() / 2; i++) {
  607. auto & valPtr = output[LeftRenames[2 * i + 1]];
  608. if ( valPtr ) {
  609. *valPtr = *LeftPacker->TuplePtrs[LeftRenames[2 * i]];
  610. }
  611. }
  612. for (size_t i = 0; i < RightRenames.size() / 2; i++) {
  613. auto & valPtr = output[RightRenames[2 * i + 1]];
  614. if ( valPtr ) {
  615. *valPtr = NYql::NUdf::TUnboxedValue();
  616. }
  617. }
  618. CounterOutputRows_.Inc();
  619. return EFetchResult::One;
  620. }
  621. }
  622. }
  623. if (IsSelfJoin_) {
  624. resultRight = resultLeft;
  625. if (!SelfJoinSameKeys_) {
  626. std::copy_n(LeftPacker->TupleHolder.begin(), LeftPacker->TotalColumnsNum, RightPacker->TupleHolder.begin());
  627. }
  628. } else {
  629. resultRight = FlowRight->FetchValues(ctx, RightPacker->TuplePtrs.data());
  630. }
  631. if (resultRight == EFetchResult::One) {
  632. if (RightPacker->TuplesPacked == 0) {
  633. RightPacker->StartTime = std::chrono::system_clock::now();
  634. }
  635. if ( !SelfJoinSameKeys_ ) {
  636. RightPacker->Pack();
  637. auto added = RightPacker->TablePtr->AddTuple(RightPacker->TupleIntVals.data(), RightPacker->TupleStrings.data(), RightPacker->TupleStrSizes.data(), RightPacker->IColumnsHolder.data(), *LeftPacker->TablePtr);
  638. if (added == GraceJoin::TTable::EAddTupleResult::Added)
  639. ++RightPacker->TuplesBatchPacked;
  640. else if (added == GraceJoin::TTable::EAddTupleResult::AnyMatch)
  641. ; // row dropped
  642. else if (JoinKind == EJoinKind::Inner || JoinKind == EJoinKind::Left || JoinKind == EJoinKind::LeftSemi || JoinKind == EJoinKind::LeftOnly || JoinKind == EJoinKind::RightSemi)
  643. ; // row dropped
  644. else { // Right, RightOnly, Full, Exclusion: output row
  645. for (size_t i = 0; i < LeftRenames.size() / 2; i++) {
  646. auto & valPtr = output[LeftRenames[2 * i + 1]];
  647. if ( valPtr ) {
  648. *valPtr = NYql::NUdf::TUnboxedValue();
  649. }
  650. }
  651. for (size_t i = 0; i < RightRenames.size() / 2; i++) {
  652. auto & valPtr = output[RightRenames[2 * i + 1]];
  653. if ( valPtr ) {
  654. *valPtr = *RightPacker->TuplePtrs[RightRenames[2 * i]];
  655. }
  656. }
  657. CounterOutputRows_.Inc();
  658. return EFetchResult::One;
  659. }
  660. }
  661. }
  662. if (resultLeft == EFetchResult::Yield || resultRight == EFetchResult::Yield) {
  663. return EFetchResult::Yield;
  664. }
  665. if (resultLeft == EFetchResult::Finish ) {
  666. *HaveMoreLeftRows = false;
  667. }
  668. if (resultRight == EFetchResult::Finish ) {
  669. *HaveMoreRightRows = false;
  670. }
  671. return EFetchResult::Finish;
  672. }
  673. void UnpackJoinedData(NUdf::TUnboxedValue*const* output) {
  674. LeftPacker->UnPack();
  675. RightPacker->UnPack();
  676. auto &valsLeft = LeftPacker->TupleHolder;
  677. auto &valsRight = RightPacker->TupleHolder;
  678. for (size_t i = 0; i < LeftRenames.size() / 2; i++) {
  679. auto & valPtr = output[LeftRenames[2 * i + 1]];
  680. if ( valPtr ) {
  681. *valPtr = valsLeft[LeftRenames[2 * i]];
  682. }
  683. }
  684. for (size_t i = 0; i < RightRenames.size() / 2; i++) {
  685. auto & valPtr = output[RightRenames[2 * i + 1]];
  686. if ( valPtr ) {
  687. *valPtr = valsRight[RightRenames[2 * i]];
  688. }
  689. }
  690. CounterOutputRows_.Inc();
  691. }
  692. void LogMemoryUsage() const {
  693. const auto used = TlsAllocState->GetUsed();
  694. const auto limit = TlsAllocState->GetLimit();
  695. TStringBuilder logmsg;
  696. logmsg << "Memory usage: ";
  697. if (limit) {
  698. logmsg << (used*100/limit) << "%=";
  699. }
  700. logmsg << (used/1_MB) << "MB/" << (limit/1_MB) << "MB";
  701. YQL_LOG(INFO) << logmsg;
  702. }
  // In-memory mode driver. Alternates between collecting input through FetchAndPackData
  // and emitting rows of the current (batch or full) join held by JoinedTablePtr.
  // Returns:
  //   One    - a row was written to `output` (a joined row, or a row produced by FetchAndPackData);
  //   Yield  - FetchAndPackData yielded, or the state has just been switched to spilling mode;
  //   Finish - JoinCompleted is set, i.e. both inputs are exhausted and nothing is left to emit.
  703. EFetchResult DoCalculateInMemory(TComputationContext& ctx, NUdf::TUnboxedValue*const* output) {
  704. // Collecting data for join and perform join (batch or full)
  705. while (!*JoinCompleted ) {
  706. if ( *PartialJoinCompleted) {
  707. // Returns join results (batch or full)
  // The loop body always returns, so at most one joined row is emitted per call.
  708. while (JoinedTablePtr->NextJoinedData(LeftPacker->JoinTupleData, RightPacker->JoinTupleData)) {
  709. UnpackJoinedData(output);
  710. return EFetchResult::One;
  711. }
  712. // Resets batch state for batch join
  // Right input is exhausted: drop the processed left batch so the next left batch can be collected.
  713. if (!*HaveMoreRightRows) {
  714. *PartialJoinCompleted = false;
  715. LeftPacker->TuplesBatchPacked = 0;
  716. LeftPacker->TablePtr->Clear(); // Clear table content, ready to collect data for next batch
  717. JoinedTablePtr->Clear();
  718. JoinedTablePtr->ResetIterator();
  719. }
  // Left input is exhausted: symmetric reset of the right batch.
  720. if (!*HaveMoreLeftRows ) {
  721. *PartialJoinCompleted = false;
  722. RightPacker->TuplesBatchPacked = 0;
  723. RightPacker->TablePtr->Clear(); // Clear table content, ready to collect data for next batch
  724. JoinedTablePtr->Clear();
  725. JoinedTablePtr->ResetIterator();
  726. }
  727. }
  // Both inputs are exhausted and the pending join results (if any) were emitted above.
  728. if (!*HaveMoreRightRows && !*HaveMoreLeftRows) {
  729. *JoinCompleted = true;
  730. break;
  731. }
  732. auto isYield = FetchAndPackData(ctx, output);
  // FetchAndPackData already filled `output` with a row: pass it to the caller as is.
  733. if (isYield == EFetchResult::One)
  734. return isYield;
  // Spilling is entered only when it is allowed, a spiller factory exists and the switch condition holds.
  735. if (IsSpillingAllowed && ctx.SpillerFactory && IsSwitchToSpillingModeCondition()) {
  736. SwitchMode(EOperatingMode::Spilling, ctx);
  737. return EFetchResult::Yield;
  738. }
  739. if (isYield != EFetchResult::Finish) return isYield;
  // Start a join when none is pending and either
  //  - the right input is exhausted and the left input is exhausted or its batch is full, or
  //  - the left input is exhausted and the right batch is full.
  740. if (!*PartialJoinCompleted && (
  741. (!*HaveMoreRightRows && (!*HaveMoreLeftRows || LeftPacker->TuplesBatchPacked >= LeftPacker->BatchSize )) ||
  742. (!*HaveMoreLeftRows && RightPacker->TuplesBatchPacked >= RightPacker->BatchSize))) {
  743. YQL_LOG(GRACEJOIN_TRACE)
  744. << (const void *)&*JoinedTablePtr << '#'
  745. << " HaveLeft " << *HaveMoreLeftRows << " LeftPacked " << LeftPacker->TuplesBatchPacked << " LeftBatch " << LeftPacker->BatchSize
  746. << " HaveRight " << *HaveMoreRightRows << " RightPacked " << RightPacker->TuplesBatchPacked << " RightBatch " << RightPacker->BatchSize
  747. ;
  748. auto& leftTable = *LeftPacker->TablePtr;
  // With SelfJoinSameKeys_ the left table is used as both sides of the join.
  749. auto& rightTable = SelfJoinSameKeys_ ? *LeftPacker->TablePtr : *RightPacker->TablePtr;
  // If memory for the join cannot be preallocated and spilling is available, switch to spilling instead of joining.
  750. if (IsSpillingAllowed && ctx.SpillerFactory && !JoinedTablePtr->TryToPreallocateMemoryForJoin(leftTable, rightTable, JoinKind, *HaveMoreLeftRows, *HaveMoreRightRows)) {
  751. SwitchMode(EOperatingMode::Spilling, ctx);
  752. return EFetchResult::Yield;
  753. }
  754. *PartialJoinCompleted = true;
  // StartTime/EndTime on both packers bracket the Join call.
  755. LeftPacker->StartTime = std::chrono::system_clock::now();
  756. RightPacker->StartTime = std::chrono::system_clock::now();
  757. JoinedTablePtr->Join(leftTable, rightTable, JoinKind, *HaveMoreLeftRows, *HaveMoreRightRows);
  758. JoinedTablePtr->ResetIterator();
  759. LeftPacker->EndTime = std::chrono::system_clock::now();
  760. RightPacker->EndTime = std::chrono::system_clock::now();
  761. }
  762. }
  763. return EFetchResult::Finish;
  764. }
  765. bool TryToReduceMemoryAndWait() {
  766. if (!IsSpillingFinished()) return true;
  767. i32 largestBucketsPairIndex = 0;
  768. ui64 largestBucketsPairSize = 0;
  769. for (ui32 bucket = 0; bucket < GraceJoin::NumberOfBuckets; ++bucket) {
  770. ui64 leftBucketSize = LeftPacker->TablePtr->GetSizeOfBucket(bucket);
  771. ui64 rightBucketSize = RightPacker->TablePtr->GetSizeOfBucket(bucket);
  772. ui64 totalSize = leftBucketSize + rightBucketSize;
  773. if (totalSize > largestBucketsPairSize) {
  774. largestBucketsPairSize = totalSize;
  775. largestBucketsPairIndex = bucket;
  776. }
  777. }
  778. bool isWaitingLeftForReduce = LeftPacker->TablePtr->TryToReduceMemoryAndWait(largestBucketsPairIndex);
  779. bool isWaitingRightForReduce = RightPacker->TablePtr->TryToReduceMemoryAndWait(largestBucketsPairIndex);
  780. return isWaitingLeftForReduce || isWaitingRightForReduce;
  781. }
  782. void UpdateSpilling() {
  783. LeftPacker->TablePtr->UpdateSpilling();
  784. RightPacker->TablePtr->UpdateSpilling();
  785. }
  786. bool IsSpillingFinished() const {
  787. return LeftPacker->TablePtr->IsSpillingFinished() && RightPacker->TablePtr->IsSpillingFinished();
  788. }
  789. bool IsReadyForSpilledDataProcessing() const {
  790. return LeftPacker->TablePtr->IsSpillingAcceptingDataRequests() && RightPacker->TablePtr->IsSpillingAcceptingDataRequests();
  791. }
  792. bool IsRestoringSpilledBuckets() const {
  793. return LeftPacker->TablePtr->IsRestoringSpilledBuckets() || RightPacker->TablePtr->IsRestoringSpilledBuckets();
  794. }
  795. EFetchResult DoCalculateWithSpilling(TComputationContext& ctx, NUdf::TUnboxedValue*const* output) {
  796. UpdateSpilling();
  797. ui32 cnt = 0;
  798. while (*HaveMoreLeftRows || *HaveMoreRightRows) {
  799. if ((cnt++ % GraceJoin::SpillingRowLimit) == 0) {
  800. if (!HasMemoryForProcessing() && !IsSpillingFinalized) {
  801. bool isWaitingForReduce = TryToReduceMemoryAndWait();
  802. if (isWaitingForReduce) return EFetchResult::Yield;
  803. }
  804. }
  805. auto isYield = FetchAndPackData(ctx, output);
  806. if (isYield != EFetchResult::Finish) return isYield;
  807. }
  808. if (!*HaveMoreLeftRows && !*HaveMoreRightRows) {
  809. if (!IsSpillingFinished()) return EFetchResult::Yield;
  810. if (!IsSpillingFinalized) {
  811. LeftPacker->TablePtr->FinalizeSpilling();
  812. RightPacker->TablePtr->FinalizeSpilling();
  813. IsSpillingFinalized = true;
  814. UpdateSpilling();
  815. }
  816. if (!IsReadyForSpilledDataProcessing()) return EFetchResult::Yield;
  817. SwitchMode(EOperatingMode::ProcessSpilled, ctx);
  818. return EFetchResult::Finish;
  819. }
  820. return EFetchResult::Yield;
  821. }
  822. EFetchResult ProcessSpilledData(TComputationContext&, NUdf::TUnboxedValue*const* output) {
  823. while (SpilledBucketsJoinOrderCurrentIndex != GraceJoin::NumberOfBuckets) {
  824. UpdateSpilling();
  825. if (IsRestoringSpilledBuckets()) return EFetchResult::Yield;
  826. ui32 nextBucketToJoin = SpilledBucketsJoinOrder[SpilledBucketsJoinOrderCurrentIndex];
  827. if (LeftPacker->TablePtr->IsSpilledBucketWaitingForExtraction(nextBucketToJoin)) {
  828. LeftPacker->TablePtr->PrepareBucket(nextBucketToJoin);
  829. }
  830. if (RightPacker->TablePtr->IsSpilledBucketWaitingForExtraction(nextBucketToJoin)) {
  831. RightPacker->TablePtr->PrepareBucket(nextBucketToJoin);
  832. }
  833. if (!LeftPacker->TablePtr->IsBucketInMemory(nextBucketToJoin)) {
  834. LeftPacker->TablePtr->StartLoadingBucket(nextBucketToJoin);
  835. }
  836. if (!RightPacker->TablePtr->IsBucketInMemory(nextBucketToJoin)) {
  837. RightPacker->TablePtr->StartLoadingBucket(nextBucketToJoin);
  838. }
  839. if (LeftPacker->TablePtr->IsBucketInMemory(nextBucketToJoin) && RightPacker->TablePtr->IsBucketInMemory(nextBucketToJoin)) {
  840. if (*PartialJoinCompleted) {
  841. while (JoinedTablePtr->NextJoinedData(LeftPacker->JoinTupleData, RightPacker->JoinTupleData, nextBucketToJoin + 1)) {
  842. UnpackJoinedData(output);
  843. return EFetchResult::One;
  844. }
  845. LeftPacker->TuplesBatchPacked = 0;
  846. LeftPacker->TablePtr->ClearBucket(nextBucketToJoin); // Clear content of returned bucket
  847. LeftPacker->TablePtr->ShrinkBucket(nextBucketToJoin);
  848. RightPacker->TuplesBatchPacked = 0;
  849. RightPacker->TablePtr->ClearBucket(nextBucketToJoin); // Clear content of returned bucket
  850. RightPacker->TablePtr->ShrinkBucket(nextBucketToJoin);
  851. JoinedTablePtr->Clear();
  852. JoinedTablePtr->ResetIterator();
  853. *PartialJoinCompleted = false;
  854. SpilledBucketsJoinOrderCurrentIndex++;
  855. } else {
  856. *PartialJoinCompleted = true;
  857. LeftPacker->StartTime = std::chrono::system_clock::now();
  858. RightPacker->StartTime = std::chrono::system_clock::now();
  859. if ( SelfJoinSameKeys_ ) {
  860. JoinedTablePtr->Join(*LeftPacker->TablePtr, *LeftPacker->TablePtr, JoinKind, *HaveMoreLeftRows, *HaveMoreRightRows, nextBucketToJoin, nextBucketToJoin+1);
  861. } else {
  862. JoinedTablePtr->Join(*LeftPacker->TablePtr, *RightPacker->TablePtr, JoinKind, *HaveMoreLeftRows, *HaveMoreRightRows, nextBucketToJoin, nextBucketToJoin+1);
  863. }
  864. JoinedTablePtr->ResetIterator();
  865. LeftPacker->EndTime = std::chrono::system_clock::now();
  866. RightPacker->EndTime = std::chrono::system_clock::now();
  867. }
  868. }
  869. }
  870. return EFetchResult::Finish;
  871. }
  872. private:
  // Operating mode; the initial value is InMemory. SwitchMode is called elsewhere in this class
  // with Spilling and ProcessSpilled - presumably it updates this field (SwitchMode is not visible here).
  873. EOperatingMode Mode = EOperatingMode::InMemory;
  // Input flows of the join.
  874. IComputationWideFlowNode* const FlowLeft;
  875. IComputationWideFlowNode* const FlowRight;
  876. const EJoinKind JoinKind;
  877. const std::vector<ui32> LeftKeyColumns;
  878. const std::vector<ui32> RightKeyColumns;
  // Rename lists are read as consecutive pairs: element 2*i is the source column index,
  // element 2*i+1 is the index of the output slot (see UnpackJoinedData).
  879. const std::vector<ui32> LeftRenames;
  880. const std::vector<ui32> RightRenames;
  881. const std::vector<TType *> LeftColumnsTypes;
  882. const std::vector<TType *> RightColumnsTypes;
  // Per-side packers; each one owns the table (TablePtr) its tuples are added to.
  883. const std::unique_ptr<TGraceJoinPacker> LeftPacker;
  884. const std::unique_ptr<TGraceJoinPacker> RightPacker;
  // Table that performs the join of the two packers' tables and iterates over the joined rows.
  885. const std::unique_ptr<GraceJoin::TTable> JoinedTablePtr;
  // Set by DoCalculateInMemory once both inputs are exhausted.
  886. const std::unique_ptr<bool> JoinCompleted;
  // True between a Join call and the moment its results have been fully emitted.
  887. const std::unique_ptr<bool> PartialJoinCompleted;
  // Cleared when the corresponding input fetch result is Finish.
  888. const std::unique_ptr<bool> HaveMoreLeftRows;
  889. const std::unique_ptr<bool> HaveMoreRightRows;
  890. const bool IsSelfJoin_;
  // When true the left table is passed to Join as both sides and right rows are not packed separately.
  891. const bool SelfJoinSameKeys_;
  // One of the preconditions for switching from in-memory mode to spilling mode.
  892. const bool IsSpillingAllowed;
  // Set after FinalizeSpilling has been called on both tables (see DoCalculateWithSpilling).
  893. bool IsSpillingFinalized = false;
  // Incremented once per row written to the output.
  894. NYql::NUdf::TCounter CounterOutputRows_;
  // Position inside SpilledBucketsJoinOrder of the bucket ProcessSpilledData is working on.
  895. ui32 SpilledBucketsJoinOrderCurrentIndex = 0;
  // Bucket numbers in the order ProcessSpilledData joins them.
  896. std::vector<ui32> SpilledBucketsJoinOrder;
  897. };
  // Wide-flow computation node for grace join.
  // The node itself only stores the immutable join configuration; all run-time work is done by
  // a TGraceJoinSpillingSupportState object that is created on first use and kept in the node state.
  898. class TGraceJoinWrapper : public TStatefulWideFlowCodegeneratorNode<TGraceJoinWrapper> {
  899. using TBaseComputation = TStatefulWideFlowCodegeneratorNode<TGraceJoinWrapper>;
  900. public:
  // Takes ownership of the column/rename/type vectors by moving them into the node.
  901. TGraceJoinWrapper(TComputationMutables& mutables, IComputationWideFlowNode* flowLeft, IComputationWideFlowNode* flowRight,
  902. EJoinKind joinKind, EAnyJoinSettings anyJoinSettings, std::vector<ui32>&& leftKeyColumns, std::vector<ui32>&& rightKeyColumns,
  903. std::vector<ui32>&& leftRenames, std::vector<ui32>&& rightRenames,
  904. std::vector<TType*>&& leftColumnsTypes, std::vector<TType*>&& rightColumnsTypes,
  905. std::vector<EValueRepresentation>&& outputRepresentations, bool isSelfJoin, bool isSpillingAllowed)
  906. : TBaseComputation(mutables, nullptr, EValueRepresentation::Boxed)
  907. , FlowLeft(flowLeft)
  908. , FlowRight(flowRight)
  909. , JoinKind(joinKind)
  910. , AnyJoinSettings_(anyJoinSettings)
  911. , LeftKeyColumns(std::move(leftKeyColumns))
  912. , RightKeyColumns(std::move(rightKeyColumns))
  913. , LeftRenames(std::move(leftRenames))
  914. , RightRenames(std::move(rightRenames))
  915. , LeftColumnsTypes(std::move(leftColumnsTypes))
  916. , RightColumnsTypes(std::move(rightColumnsTypes))
  917. , OutputRepresentations(std::move(outputRepresentations))
  918. , IsSelfJoin_(isSelfJoin)
  919. , IsSpillingAllowed(isSpillingAllowed)
  920. {}
  // Non-codegen path: creates the state when `state` is still invalid, then delegates
  // to FetchValues of the boxed TGraceJoinSpillingSupportState.
  921. EFetchResult DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const {
  922. if (state.IsInvalid()) {
  923. MakeSpillingSupportState(ctx, state);
  924. }
  925. return static_cast<TGraceJoinSpillingSupportState*>(state.AsBoxed().Get())->FetchValues(ctx, output);
  926. }
  927. #ifndef MKQL_DISABLE_CODEGEN
  // Codegen counterpart of DoCalculate. Emits IR that
  //  1) reserves an array of output values and an array of pointers to them,
  //  2) creates the state through MakeSpillingSupportState when the state value is invalid,
  //  3) calls TGraceJoinSpillingSupportState::FetchValues with the pointer array as output.
  // Returns the result of that call together with one getter per output column.
  928. ICodegeneratorInlineWideNode::TGenerateResult DoGenGetValues(const TCodegenContext& ctx, Value* statePtr, BasicBlock*& block) const {
  929. auto& context = ctx.Codegen.GetContext();
  930. const auto valueType = Type::getInt128Ty(context);
  931. const auto indexType = Type::getInt32Ty(context);
  932. const auto arrayType = ArrayType::get(valueType, OutputRepresentations.size());
  933. const auto fieldsType = ArrayType::get(PointerType::getUnqual(valueType), OutputRepresentations.size());
  // Setup instructions below are inserted before the last instruction of the function's entry block.
  934. const auto atTop = &ctx.Func->getEntryBlock().back();
  // `values` holds the output values; `fields` holds pointers to the elements of `values`.
  935. const auto values = new AllocaInst(arrayType, 0U, "values", atTop);
  936. const auto fields = new AllocaInst(fieldsType, 0U, "fields", atTop);
  937. ICodegeneratorInlineWideNode::TGettersList getters(OutputRepresentations.size());
  938. Value* initV = UndefValue::get(arrayType);
  939. Value* initF = UndefValue::get(fieldsType);
  940. std::vector<Value*> pointers;
  941. pointers.reserve(getters.size());
  // Per output column: remember a pointer to its slot, build the zero-initialized value array
  // and the pointer array, and register a getter that loads the slot on demand.
  942. for (auto i = 0U; i < getters.size(); ++i) {
  943. pointers.emplace_back(GetElementPtrInst::CreateInBounds(arrayType, values, {ConstantInt::get(indexType, 0), ConstantInt::get(indexType, i)}, (TString("ptr_") += ToString(i)).c_str(), atTop));
  944. initV = InsertValueInst::Create(initV, ConstantInt::get(valueType, 0), {i}, (TString("zero_") += ToString(i)).c_str(), atTop);
  945. initF = InsertValueInst::Create(initF, pointers.back(), {i}, (TString("insert_") += ToString(i)).c_str(), atTop);
  946. getters[i] = [i, values, indexType, arrayType, valueType](const TCodegenContext& ctx, BasicBlock*& block) {
  947. Y_UNUSED(ctx);
  948. const auto pointer = GetElementPtrInst::CreateInBounds(arrayType, values, {ConstantInt::get(indexType, 0), ConstantInt::get(indexType, i)}, (TString("ptr_") += ToString(i)).c_str(), block);
  949. return new LoadInst(valueType, pointer, (TString("load_") += ToString(i)).c_str(), block);
  950. };
  951. }
  952. new StoreInst(initV, values, atTop);
  953. new StoreInst(initF, fields, atTop);
  954. TLLVMFieldsStructure<TComputationValue<TNull>> fieldsStruct(context);
  955. const auto stateType = StructType::get(context, fieldsStruct.GetFieldsArray());
  956. const auto statePtrType = PointerType::getUnqual(stateType);
  // Branch: "make" creates the state when it is invalid, "main" does the actual fetch.
  957. const auto make = BasicBlock::Create(context, "make", ctx.Func);
  958. const auto main = BasicBlock::Create(context, "main", ctx.Func);
  959. BranchInst::Create(make, main, IsInvalid(statePtr, block, context), block);
  960. block = make;
  // `this` and the member function address are embedded into the IR as integer constants
  // and cast to pointers, so the generated code calls MakeSpillingSupportState(this, ctx, statePtr).
  961. const auto ptrType = PointerType::getUnqual(StructType::get(context));
  962. const auto self = CastInst::Create(Instruction::IntToPtr, ConstantInt::get(Type::getInt64Ty(context), uintptr_t(this)), ptrType, "self", block);
  963. const auto makeFunc = ConstantInt::get(Type::getInt64Ty(context), GetMethodPtr(&TGraceJoinWrapper::MakeSpillingSupportState));
  964. const auto makeType = FunctionType::get(Type::getVoidTy(context), {self->getType(), ctx.Ctx->getType(), statePtr->getType()}, false);
  965. const auto makeFuncPtr = CastInst::Create(Instruction::IntToPtr, makeFunc, PointerType::getUnqual(makeType), "function", block);
  966. CallInst::Create(makeType, makeFuncPtr, {self, ctx.Ctx, statePtr}, "", block);
  967. BranchInst::Create(main, block);
  968. block = main;
  // Clean up the output slots and reset them to zero before the next fetch.
  969. for (ui32 i = 0U; i < OutputRepresentations.size(); ++i) {
  970. ValueCleanup(OutputRepresentations[i], pointers[i], ctx, block);
  971. }
  972. new StoreInst(initV, values, block);
  // The 128-bit state value is truncated to 64 bits and used as the pointer to the state object
  // (presumably the boxed pointer that DoCalculate obtains via AsBoxed().Get()).
  973. const auto state = new LoadInst(valueType, statePtr, "state", block);
  974. const auto half = CastInst::Create(Instruction::Trunc, state, Type::getInt64Ty(context), "half", block);
  975. const auto stateArg = CastInst::Create(Instruction::IntToPtr, half, statePtrType, "state_arg", block);
  // Call FetchValues(state, ctx, fields) through its raw address; the i32 result is the fetch status.
  976. const auto func = ConstantInt::get(Type::getInt64Ty(context), GetMethodPtr(&TGraceJoinSpillingSupportState::FetchValues));
  977. const auto funcType = FunctionType::get(Type::getInt32Ty(context), { statePtrType, ctx.Ctx->getType(), fields->getType() }, false);
  978. const auto funcPtr = CastInst::Create(Instruction::IntToPtr, func, PointerType::getUnqual(funcType), "fetch_func", block);
  979. const auto result = CallInst::Create(funcType, funcPtr, { stateArg, ctx.Ctx, fields }, "fetch", block);
  980. for (ui32 i = 0U; i < OutputRepresentations.size(); ++i) {
  981. ValueRelease(OutputRepresentations[i], pointers[i], ctx, block);
  982. }
  983. return {result, std::move(getters)};
  984. }
  985. #endif
  986. private:
  // Registers both input flows as dependencies of this node.
  987. void RegisterDependencies() const final {
  988. FlowDependsOnBoth(FlowLeft, FlowRight);
  989. }
  // Creates the run-time state through the holder factory and stores it into `state`.
  // Also called from generated code (see DoGenGetValues).
  990. void MakeSpillingSupportState(TComputationContext& ctx, NUdf::TUnboxedValue& state) const {
  991. state = ctx.HolderFactory.Create<TGraceJoinSpillingSupportState>(
  992. FlowLeft, FlowRight, JoinKind, AnyJoinSettings_, LeftKeyColumns, RightKeyColumns,
  993. LeftRenames, RightRenames, LeftColumnsTypes, RightColumnsTypes,
  994. ctx, IsSelfJoin_, IsSpillingAllowed);
  995. }
  // Join configuration captured at construction; it is passed unchanged to every created state.
  996. IComputationWideFlowNode *const FlowLeft;
  997. IComputationWideFlowNode *const FlowRight;
  998. const EJoinKind JoinKind;
  999. const EAnyJoinSettings AnyJoinSettings_;
  1000. const std::vector<ui32> LeftKeyColumns;
  1001. const std::vector<ui32> RightKeyColumns;
  1002. const std::vector<ui32> LeftRenames;
  1003. const std::vector<ui32> RightRenames;
  1004. const std::vector<TType *> LeftColumnsTypes;
  1005. const std::vector<TType *> RightColumnsTypes;
  // Value representation of every output column; used only by the codegen path.
  1006. const std::vector<EValueRepresentation> OutputRepresentations;
  1007. const bool IsSelfJoin_;
  1008. const bool IsSpillingAllowed;
  1009. };
  1010. }
  1011. IComputationNode* WrapGraceJoinCommon(TCallable& callable, const TComputationNodeFactoryContext& ctx, bool isSelfJoin, bool isSpillingAllowed) {
  1012. const auto leftFlowNodeIndex = 0;
  1013. const auto rightFlowNodeIndex = 1;
  1014. const auto joinKindNodeIndex = isSelfJoin ? 1 : 2;
  1015. const auto leftKeyColumnsNodeIndex = joinKindNodeIndex + 1;
  1016. const auto rightKeyColumnsNodeIndex = leftKeyColumnsNodeIndex + 1;
  1017. const auto leftRenamesNodeIndex = rightKeyColumnsNodeIndex + 1;
  1018. const auto rightRenamesNodeIndex = leftRenamesNodeIndex + 1;
  1019. const auto anyJoinSettingsIndex = rightRenamesNodeIndex + 1;
  1020. const auto leftFlowNode = callable.GetInput(leftFlowNodeIndex);
  1021. const auto joinKindNode = callable.GetInput(joinKindNodeIndex);
  1022. const auto leftKeyColumnsNode = AS_VALUE(TTupleLiteral, callable.GetInput(leftKeyColumnsNodeIndex));
  1023. const auto rightKeyColumnsNode = AS_VALUE(TTupleLiteral, callable.GetInput(rightKeyColumnsNodeIndex));
  1024. const auto leftRenamesNode = AS_VALUE(TTupleLiteral, callable.GetInput(leftRenamesNodeIndex));
  1025. const auto rightRenamesNode = AS_VALUE(TTupleLiteral, callable.GetInput(rightRenamesNodeIndex));
  1026. const EAnyJoinSettings anyJoinSettings = GetAnyJoinSettings(AS_VALUE(TDataLiteral, callable.GetInput(anyJoinSettingsIndex))->AsValue().Get<ui32>());
  1027. const auto leftFlowComponents = GetWideComponents(AS_TYPE(TFlowType, leftFlowNode));
  1028. const ui32 rawJoinKind = AS_VALUE(TDataLiteral, joinKindNode)->AsValue().Get<ui32>();
  1029. const auto flowLeft = dynamic_cast<IComputationWideFlowNode*> (LocateNode(ctx.NodeLocator, callable, 0));
  1030. IComputationWideFlowNode* flowRight = nullptr;
  1031. if (!isSelfJoin) {
  1032. flowRight = dynamic_cast<IComputationWideFlowNode*> (LocateNode(ctx.NodeLocator, callable, 1));
  1033. }
  1034. const auto outputFlowComponents = GetWideComponents(AS_TYPE(TFlowType, callable.GetType()->GetReturnType()));
  1035. std::vector<EValueRepresentation> outputRepresentations;
  1036. outputRepresentations.reserve(outputFlowComponents.size());
  1037. for (ui32 i = 0U; i < outputFlowComponents.size(); ++i) {
  1038. outputRepresentations.emplace_back(GetValueRepresentation(outputFlowComponents[i]));
  1039. }
  1040. std::vector<ui32> leftKeyColumns, leftRenames, rightKeyColumns, rightRenames;
  1041. std::vector<TType*> leftColumnsTypes(leftFlowComponents.begin(), leftFlowComponents.end());
  1042. std::vector<TType*> rightColumnsTypes;
  1043. if (isSelfJoin) {
  1044. rightColumnsTypes = {leftColumnsTypes};
  1045. } else {
  1046. const auto rightFlowNode = callable.GetInput(rightFlowNodeIndex);
  1047. const auto rightFlowComponents = GetWideComponents(AS_TYPE(TFlowType, rightFlowNode));
  1048. rightColumnsTypes = {rightFlowComponents.begin(), rightFlowComponents.end()};
  1049. }
  1050. leftKeyColumns.reserve(leftKeyColumnsNode->GetValuesCount());
  1051. for (ui32 i = 0; i < leftKeyColumnsNode->GetValuesCount(); ++i) {
  1052. leftKeyColumns.emplace_back(AS_VALUE(TDataLiteral, leftKeyColumnsNode->GetValue(i))->AsValue().Get<ui32>());
  1053. }
  1054. leftRenames.reserve(leftRenamesNode->GetValuesCount());
  1055. for (ui32 i = 0; i < leftRenamesNode->GetValuesCount(); ++i) {
  1056. leftRenames.emplace_back(AS_VALUE(TDataLiteral, leftRenamesNode->GetValue(i))->AsValue().Get<ui32>());
  1057. }
  1058. rightKeyColumns.reserve(rightKeyColumnsNode->GetValuesCount());
  1059. for (ui32 i = 0; i < rightKeyColumnsNode->GetValuesCount(); ++i) {
  1060. rightKeyColumns.emplace_back(AS_VALUE(TDataLiteral, rightKeyColumnsNode->GetValue(i))->AsValue().Get<ui32>());
  1061. }
  1062. if (isSelfJoin) {
  1063. MKQL_ENSURE(leftKeyColumns.size() == rightKeyColumns.size(), "Number of key columns for self join should be equal");
  1064. }
  1065. rightRenames.reserve(rightRenamesNode->GetValuesCount());
  1066. for (ui32 i = 0; i < rightRenamesNode->GetValuesCount(); ++i) {
  1067. rightRenames.emplace_back(AS_VALUE(TDataLiteral, rightRenamesNode->GetValue(i))->AsValue().Get<ui32>());
  1068. }
  1069. return new TGraceJoinWrapper(
  1070. ctx.Mutables, flowLeft, flowRight, GetJoinKind(rawJoinKind), anyJoinSettings,
  1071. std::move(leftKeyColumns), std::move(rightKeyColumns), std::move(leftRenames), std::move(rightRenames),
  1072. std::move(leftColumnsTypes), std::move(rightColumnsTypes), std::move(outputRepresentations), isSelfJoin, isSpillingAllowed);
  1073. }
  1074. IComputationNode* WrapGraceJoin(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
  1075. MKQL_ENSURE(callable.GetInputsCount() == 8, "Expected 8 args");
  1076. return WrapGraceJoinCommon(callable, ctx, false, HasSpillingFlag(callable));
  1077. }
  1078. IComputationNode* WrapGraceSelfJoin(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
  1079. MKQL_ENSURE(callable.GetInputsCount() == 7, "Expected 7 args");
  1080. return WrapGraceJoinCommon(callable, ctx, true, HasSpillingFlag(callable));
  1081. }
  1082. }
  1083. }