mkql_todict.cpp 83 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
7177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975197619771978197919801981198219831984198519861987198819891990199119921993199419951996199719981999200020012002200320042005200620072008200920102011201220132014201520162017201820192020202120222023202420252026202720282029203020312032203320342035203620372038203920402041204220432044204520462047204820492050205120522053205420552056205720582059206020612062206320642065206620672068206920702071207220732074207520762077207820792080208120822083
  1. #include "mkql_todict.h"
  2. #include <yql/essentials/minikql/computation/mkql_computation_list_adapter.h>
  3. #include <yql/essentials/minikql/computation/mkql_computation_node_codegen.h> // Y_IGNORE
  4. #include <yql/essentials/minikql/computation/mkql_computation_node_holders.h>
  5. #include <yql/essentials/minikql/computation/mkql_computation_node_pack.h>
  6. #include <yql/essentials/minikql/computation/mkql_llvm_base.h> // Y_IGNORE
  7. #include <yql/essentials/minikql/computation/presort.h>
  8. #include <yql/essentials/minikql/mkql_node_cast.h>
  9. #include <yql/essentials/minikql/mkql_string_util.h>
  10. #include <yql/essentials/public/udf/udf_types.h>
  11. #include <yql/essentials/utils/cast.h>
  12. #include <yql/essentials/utils/hash.h>
  13. #include <algorithm>
  14. #include <unordered_map>
  15. #include <optional>
  16. #include <vector>
  17. namespace NKikimr {
  18. namespace NMiniKQL {
  19. using NYql::EnsureDynamicCast;
  20. namespace {
  21. class THashedMultiMapAccumulator {
  22. using TMapType = TValuesDictHashMap;
  23. TComputationContext& Ctx;
  24. TType* KeyType;
  25. const TKeyTypes& KeyTypes;
  26. bool IsTuple;
  27. std::shared_ptr<TValuePacker> Packer;
  28. const NUdf::IHash* Hash;
  29. const NUdf::IEquate* Equate;
  30. TMapType Map;
  31. public:
  32. static constexpr bool IsSorted = false;
  33. THashedMultiMapAccumulator(TType* keyType, TType* payloadType, const TKeyTypes& keyTypes, bool isTuple, bool encoded,
  34. const NUdf::ICompare* compare, const NUdf::IEquate* equate, const NUdf::IHash* hash, TComputationContext& ctx, ui64 itemsCountHint)
  35. : Ctx(ctx), KeyType(keyType), KeyTypes(keyTypes), IsTuple(isTuple), Hash(hash), Equate(equate)
  36. , Map(0, TValueHasher(KeyTypes, isTuple, hash), TValueEqual(KeyTypes, isTuple, equate))
  37. {
  38. Y_UNUSED(compare);
  39. if (encoded) {
  40. Packer = std::make_shared<TValuePacker>(true, keyType);
  41. }
  42. Y_UNUSED(payloadType);
  43. Map.reserve(itemsCountHint);
  44. }
  45. void Add(NUdf::TUnboxedValue&& key, NUdf::TUnboxedValue&& payload)
  46. {
  47. if (Packer) {
  48. key = MakeString(Packer->Pack(key));
  49. }
  50. auto it = Map.find(key);
  51. if (it == Map.end()) {
  52. it = Map.emplace(std::move(key), Ctx.HolderFactory.NewVectorHolder()).first;
  53. }
  54. it->second.Push(std::move(payload));
  55. }
  56. NUdf::TUnboxedValue Build()
  57. {
  58. const auto filler = [this](TValuesDictHashMap& targetMap) {
  59. targetMap = std::move(Map);
  60. };
  61. return Ctx.HolderFactory.CreateDirectHashedDictHolder(filler, KeyTypes, IsTuple, true, Packer ? KeyType : nullptr, Hash, Equate);
  62. }
  63. };
  64. class THashedMapAccumulator {
  65. using TMapType = TValuesDictHashMap;
  66. TComputationContext& Ctx;
  67. TType* KeyType;
  68. const TKeyTypes& KeyTypes;
  69. const bool IsTuple;
  70. std::shared_ptr<TValuePacker> Packer;
  71. const NUdf::IHash* Hash;
  72. const NUdf::IEquate* Equate;
  73. TMapType Map;
  74. public:
  75. static constexpr bool IsSorted = false;
  76. THashedMapAccumulator(TType* keyType, TType* payloadType, const TKeyTypes& keyTypes, bool isTuple, bool encoded,
  77. const NUdf::ICompare* compare, const NUdf::IEquate* equate, const NUdf::IHash* hash, TComputationContext& ctx, ui64 itemsCountHint)
  78. : Ctx(ctx), KeyType(keyType), KeyTypes(keyTypes), IsTuple(isTuple), Hash(hash), Equate(equate)
  79. , Map(0, TValueHasher(KeyTypes, isTuple, hash), TValueEqual(KeyTypes, isTuple, equate))
  80. {
  81. Y_UNUSED(compare);
  82. if (encoded) {
  83. Packer = std::make_shared<TValuePacker>(true, keyType);
  84. }
  85. Y_UNUSED(payloadType);
  86. Map.reserve(itemsCountHint);
  87. }
  88. void Add(NUdf::TUnboxedValue&& key, NUdf::TUnboxedValue&& payload)
  89. {
  90. if (Packer) {
  91. key = MakeString(Packer->Pack(key));
  92. }
  93. Map.emplace(std::move(key), std::move(payload));
  94. }
  95. NUdf::TUnboxedValue Build()
  96. {
  97. const auto filler = [this](TMapType& targetMap) {
  98. targetMap = std::move(Map);
  99. };
  100. return Ctx.HolderFactory.CreateDirectHashedDictHolder(filler, KeyTypes, IsTuple, true, Packer ? KeyType : nullptr, Hash, Equate);
  101. }
  102. };
  103. template<typename T, bool OptionalKey>
  104. class THashedSingleFixedMultiMapAccumulator {
  105. using TMapType = TValuesDictHashSingleFixedMap<T>;
  106. TComputationContext& Ctx;
  107. const TKeyTypes& KeyTypes;
  108. TMapType Map;
  109. TUnboxedValueVector NullPayloads;
  110. NUdf::TUnboxedValue CurrentEmptyVectorForInsert;
  111. public:
  112. static constexpr bool IsSorted = false;
  113. THashedSingleFixedMultiMapAccumulator(TType* keyType, TType* payloadType, const TKeyTypes& keyTypes, bool isTuple, bool encoded,
  114. const NUdf::ICompare* compare, const NUdf::IEquate* equate, const NUdf::IHash* hash, TComputationContext& ctx, ui64 itemsCountHint)
  115. : Ctx(ctx), KeyTypes(keyTypes), Map(0, TMyHash<T>(), TMyEquals<T>()) {
  116. Y_UNUSED(keyType);
  117. Y_UNUSED(payloadType);
  118. Y_UNUSED(isTuple);
  119. Y_UNUSED(encoded);
  120. Y_UNUSED(compare);
  121. Y_UNUSED(equate);
  122. Y_UNUSED(hash);
  123. Map.reserve(itemsCountHint);
  124. CurrentEmptyVectorForInsert = Ctx.HolderFactory.NewVectorHolder();
  125. }
  126. void Add(NUdf::TUnboxedValue&& key, NUdf::TUnboxedValue&& payload) {
  127. if constexpr (OptionalKey) {
  128. if (!key) {
  129. NullPayloads.emplace_back(std::move(payload));
  130. return;
  131. }
  132. }
  133. auto insertInfo = Map.emplace(key.Get<T>(), CurrentEmptyVectorForInsert);
  134. if (insertInfo.second) {
  135. CurrentEmptyVectorForInsert = Ctx.HolderFactory.NewVectorHolder();
  136. }
  137. insertInfo.first->second.Push(payload.Release());
  138. }
  139. NUdf::TUnboxedValue Build() {
  140. std::optional<NUdf::TUnboxedValue> nullPayload;
  141. if (NullPayloads.size()) {
  142. nullPayload = Ctx.HolderFactory.VectorAsVectorHolder(std::move(NullPayloads));
  143. }
  144. return Ctx.HolderFactory.CreateDirectHashedSingleFixedMapHolder<T, OptionalKey>(std::move(Map), std::move(nullPayload));
  145. }
  146. };
  147. template<typename T, bool OptionalKey>
  148. class THashedSingleFixedMapAccumulator {
  149. using TMapType = TValuesDictHashSingleFixedMap<T>;
  150. TComputationContext& Ctx;
  151. TMapType Map;
  152. std::optional<NUdf::TUnboxedValue> NullPayload;
  153. public:
  154. static constexpr bool IsSorted = false;
  155. THashedSingleFixedMapAccumulator(TType* keyType, TType* payloadType, const TKeyTypes& keyTypes, bool isTuple, bool encoded,
  156. const NUdf::ICompare* compare, const NUdf::IEquate* equate, const NUdf::IHash* hash, TComputationContext& ctx, ui64 itemsCountHint)
  157. : Ctx(ctx), Map(0, TMyHash<T>(), TMyEquals<T>())
  158. {
  159. Y_UNUSED(keyType);
  160. Y_UNUSED(payloadType);
  161. Y_UNUSED(keyTypes);
  162. Y_UNUSED(isTuple);
  163. Y_UNUSED(encoded);
  164. Y_UNUSED(compare);
  165. Y_UNUSED(equate);
  166. Y_UNUSED(hash);
  167. Map.reserve(itemsCountHint);
  168. }
  169. void Add(NUdf::TUnboxedValue&& key, NUdf::TUnboxedValue&& payload)
  170. {
  171. if constexpr (OptionalKey) {
  172. if (!key) {
  173. NullPayload.emplace(std::move(payload));
  174. return;
  175. }
  176. }
  177. Map.emplace(key.Get<T>(), std::move(payload));
  178. }
  179. NUdf::TUnboxedValue Build()
  180. {
  181. return Ctx.HolderFactory.CreateDirectHashedSingleFixedMapHolder<T, OptionalKey>(std::move(Map), std::move(NullPayload));
  182. }
  183. };
  184. class THashedSetAccumulator {
  185. using TSetType = TValuesDictHashSet;
  186. TComputationContext& Ctx;
  187. TType* KeyType;
  188. const TKeyTypes& KeyTypes;
  189. bool IsTuple;
  190. std::shared_ptr<TValuePacker> Packer;
  191. TSetType Set;
  192. const NUdf::IHash* Hash;
  193. const NUdf::IEquate* Equate;
  194. public:
  195. static constexpr bool IsSorted = false;
  196. THashedSetAccumulator(TType* keyType, const TKeyTypes& keyTypes, bool isTuple, bool encoded,
  197. const NUdf::ICompare* compare, const NUdf::IEquate* equate, const NUdf::IHash* hash, TComputationContext& ctx, ui64 itemsCountHint)
  198. : Ctx(ctx), KeyType(keyType), KeyTypes(keyTypes), IsTuple(isTuple), Set(0, TValueHasher(KeyTypes, isTuple, hash),
  199. TValueEqual(KeyTypes, isTuple, equate)), Hash(hash), Equate(equate)
  200. {
  201. Y_UNUSED(compare);
  202. if (encoded) {
  203. Packer = std::make_shared<TValuePacker>(true, keyType);
  204. }
  205. Set.reserve(itemsCountHint);
  206. }
  207. void Add(NUdf::TUnboxedValue&& key)
  208. {
  209. if (Packer) {
  210. key = MakeString(Packer->Pack(key));
  211. }
  212. Set.emplace(std::move(key));
  213. }
  214. NUdf::TUnboxedValue Build()
  215. {
  216. const auto filler = [this](TSetType& targetSet) {
  217. targetSet = std::move(Set);
  218. };
  219. return Ctx.HolderFactory.CreateDirectHashedSetHolder(filler, KeyTypes, IsTuple, true, Packer ? KeyType : nullptr, Hash, Equate);
  220. }
  221. };
  222. template <typename T, bool OptionalKey>
  223. class THashedSingleFixedSetAccumulator {
  224. using TSetType = TValuesDictHashSingleFixedSet<T>;
  225. TComputationContext& Ctx;
  226. TSetType Set;
  227. bool HasNull = false;
  228. public:
  229. static constexpr bool IsSorted = false;
  230. THashedSingleFixedSetAccumulator(TType* keyType, const TKeyTypes& keyTypes, bool isTuple, bool encoded,
  231. const NUdf::ICompare* compare, const NUdf::IEquate* equate, const NUdf::IHash* hash, TComputationContext& ctx, ui64 itemsCountHint)
  232. : Ctx(ctx), Set(0, TMyHash<T>(), TMyEquals<T>())
  233. {
  234. Y_UNUSED(keyType);
  235. Y_UNUSED(keyTypes);
  236. Y_UNUSED(isTuple);
  237. Y_UNUSED(encoded);
  238. Y_UNUSED(compare);
  239. Y_UNUSED(equate);
  240. Y_UNUSED(hash);
  241. Set.reserve(itemsCountHint);
  242. }
  243. void Add(NUdf::TUnboxedValue&& key)
  244. {
  245. if constexpr (OptionalKey) {
  246. if (!key) {
  247. HasNull = true;
  248. return;
  249. }
  250. }
  251. Set.emplace(key.Get<T>());
  252. }
  253. NUdf::TUnboxedValue Build()
  254. {
  255. return Ctx.HolderFactory.CreateDirectHashedSingleFixedSetHolder<T, OptionalKey>(std::move(Set), HasNull);
  256. }
  257. };
  258. template <typename T, bool OptionalKey>
  259. class THashedSingleFixedCompactSetAccumulator {
  260. using TSetType = TValuesDictHashSingleFixedCompactSet<T>;
  261. TComputationContext& Ctx;
  262. TPagedArena Pool;
  263. TSetType Set;
  264. bool HasNull = false;
  265. public:
  266. static constexpr bool IsSorted = false;
  267. THashedSingleFixedCompactSetAccumulator(TType* keyType, const TKeyTypes& keyTypes, bool isTuple, bool encoded,
  268. const NUdf::ICompare* compare, const NUdf::IEquate* equate, const NUdf::IHash* hash, TComputationContext& ctx, ui64 itemsCountHint)
  269. : Ctx(ctx), Pool(&Ctx.HolderFactory.GetPagePool()), Set(Ctx.HolderFactory.GetPagePool(), itemsCountHint / COMPACT_HASH_MAX_LOAD_FACTOR)
  270. {
  271. Y_UNUSED(keyType);
  272. Y_UNUSED(keyTypes);
  273. Y_UNUSED(isTuple);
  274. Y_UNUSED(encoded);
  275. Y_UNUSED(compare);
  276. Y_UNUSED(equate);
  277. Y_UNUSED(hash);
  278. Set.SetMaxLoadFactor(COMPACT_HASH_MAX_LOAD_FACTOR);
  279. }
  280. void Add(NUdf::TUnboxedValue&& key)
  281. {
  282. if constexpr (OptionalKey) {
  283. if (!key) {
  284. HasNull = true;
  285. return;
  286. }
  287. }
  288. Set.Insert(key.Get<T>());
  289. }
  290. NUdf::TUnboxedValue Build()
  291. {
  292. return Ctx.HolderFactory.CreateDirectHashedSingleFixedCompactSetHolder<T, OptionalKey>(std::move(Set), HasNull);
  293. }
  294. };
  295. class THashedCompactSetAccumulator {
  296. using TSetType = TValuesDictHashCompactSet;
  297. TComputationContext& Ctx;
  298. TPagedArena Pool;
  299. TSetType Set;
  300. TType *KeyType;
  301. std::shared_ptr<TValuePacker> KeyPacker;
  302. public:
  303. static constexpr bool IsSorted = false;
  304. THashedCompactSetAccumulator(TType* keyType, const TKeyTypes& keyTypes, bool isTuple, bool encoded,
  305. const NUdf::ICompare* compare, const NUdf::IEquate* equate, const NUdf::IHash* hash, TComputationContext& ctx, ui64 itemsCountHint)
  306. : Ctx(ctx), Pool(&Ctx.HolderFactory.GetPagePool()), Set(Ctx.HolderFactory.GetPagePool(), itemsCountHint / COMPACT_HASH_MAX_LOAD_FACTOR, TSmallValueHash(), TSmallValueEqual())
  307. , KeyType(keyType), KeyPacker(std::make_shared<TValuePacker>(true, keyType))
  308. {
  309. Y_UNUSED(keyTypes);
  310. Y_UNUSED(isTuple);
  311. Y_UNUSED(encoded);
  312. Y_UNUSED(compare);
  313. Y_UNUSED(equate);
  314. Y_UNUSED(hash);
  315. Set.SetMaxLoadFactor(COMPACT_HASH_MAX_LOAD_FACTOR);
  316. }
  317. void Add(NUdf::TUnboxedValue&& key)
  318. {
  319. Set.Insert(AddSmallValue(Pool, KeyPacker->Pack(key)));
  320. }
  321. NUdf::TUnboxedValue Build()
  322. {
  323. return Ctx.HolderFactory.CreateDirectHashedCompactSetHolder(std::move(Set), std::move(Pool), KeyType, &Ctx);
  324. }
  325. };
  326. template <bool Multi>
  327. class THashedCompactMapAccumulator;
  328. template <>
  329. class THashedCompactMapAccumulator<false> {
  330. using TMapType = TValuesDictHashCompactMap;
  331. TComputationContext& Ctx;
  332. TPagedArena Pool;
  333. TMapType Map;
  334. TType *KeyType, *PayloadType;
  335. std::shared_ptr<TValuePacker> KeyPacker, PayloadPacker;
  336. public:
  337. static constexpr bool IsSorted = false;
  338. THashedCompactMapAccumulator(TType* keyType, TType* payloadType, const TKeyTypes& keyTypes, bool isTuple, bool encoded,
  339. const NUdf::ICompare* compare, const NUdf::IEquate* equate, const NUdf::IHash* hash, TComputationContext& ctx, ui64 itemsCountHint)
  340. : Ctx(ctx), Pool(&Ctx.HolderFactory.GetPagePool()), Map(Ctx.HolderFactory.GetPagePool(), itemsCountHint / COMPACT_HASH_MAX_LOAD_FACTOR)
  341. , KeyType(keyType), PayloadType(payloadType)
  342. , KeyPacker(std::make_shared<TValuePacker>(true, keyType))
  343. , PayloadPacker(std::make_shared<TValuePacker>(false, payloadType))
  344. {
  345. Y_UNUSED(keyTypes);
  346. Y_UNUSED(isTuple);
  347. Y_UNUSED(encoded);
  348. Y_UNUSED(compare);
  349. Y_UNUSED(equate);
  350. Y_UNUSED(hash);
  351. Map.SetMaxLoadFactor(COMPACT_HASH_MAX_LOAD_FACTOR);
  352. }
  353. void Add(NUdf::TUnboxedValue&& key, NUdf::TUnboxedValue&& payload)
  354. {
  355. Map.InsertNew(AddSmallValue(Pool, KeyPacker->Pack(key)), AddSmallValue(Pool, PayloadPacker->Pack(payload)));
  356. }
  357. NUdf::TUnboxedValue Build()
  358. {
  359. return Ctx.HolderFactory.CreateDirectHashedCompactMapHolder(std::move(Map), std::move(Pool), KeyType, PayloadType, &Ctx);
  360. }
  361. };
  362. template <>
  363. class THashedCompactMapAccumulator<true> {
  364. using TMapType = TValuesDictHashCompactMultiMap;
  365. TComputationContext& Ctx;
  366. TPagedArena Pool;
  367. TMapType Map;
  368. TType *KeyType, *PayloadType;
  369. std::shared_ptr<TValuePacker> KeyPacker, PayloadPacker;
  370. public:
  371. static constexpr bool IsSorted = false;
  372. THashedCompactMapAccumulator(TType* keyType, TType* payloadType, const TKeyTypes& keyTypes, bool isTuple, bool encoded,
  373. const NUdf::ICompare* compare, const NUdf::IEquate* equate, const NUdf::IHash* hash, TComputationContext& ctx, ui64 itemsCountHint)
  374. : Ctx(ctx), Pool(&Ctx.HolderFactory.GetPagePool()), Map(Ctx.HolderFactory.GetPagePool(), itemsCountHint / COMPACT_HASH_MAX_LOAD_FACTOR)
  375. , KeyType(keyType), PayloadType(payloadType)
  376. , KeyPacker(std::make_shared<TValuePacker>(true, keyType))
  377. , PayloadPacker(std::make_shared<TValuePacker>(false, payloadType))
  378. {
  379. Y_UNUSED(keyTypes);
  380. Y_UNUSED(isTuple);
  381. Y_UNUSED(encoded);
  382. Y_UNUSED(compare);
  383. Y_UNUSED(equate);
  384. Y_UNUSED(hash);
  385. Map.SetMaxLoadFactor(COMPACT_HASH_MAX_LOAD_FACTOR);
  386. }
  387. void Add(NUdf::TUnboxedValue&& key, NUdf::TUnboxedValue&& payload)
  388. {
  389. Map.Insert(AddSmallValue(Pool, KeyPacker->Pack(key)), AddSmallValue(Pool, PayloadPacker->Pack(payload)));
  390. }
  391. NUdf::TUnboxedValue Build()
  392. {
  393. return Ctx.HolderFactory.CreateDirectHashedCompactMultiMapHolder(std::move(Map), std::move(Pool), KeyType, PayloadType, &Ctx);
  394. }
  395. };
  396. template <typename T, bool OptionalKey, bool Multi>
  397. class THashedSingleFixedCompactMapAccumulator;
  398. template <typename T, bool OptionalKey>
  399. class THashedSingleFixedCompactMapAccumulator<T, OptionalKey, false> {
  400. using TMapType = TValuesDictHashSingleFixedCompactMap<T>;
  401. TComputationContext& Ctx;
  402. TPagedArena Pool;
  403. TMapType Map;
  404. std::optional<ui64> NullPayload;
  405. TType* PayloadType;
  406. std::shared_ptr<TValuePacker> PayloadPacker;
  407. public:
  408. static constexpr bool IsSorted = false;
  409. THashedSingleFixedCompactMapAccumulator(TType* keyType, TType* payloadType, const TKeyTypes& keyTypes, bool isTuple, bool encoded,
  410. const NUdf::ICompare* compare, const NUdf::IEquate* equate, const NUdf::IHash* hash, TComputationContext& ctx, ui64 itemsCountHint)
  411. : Ctx(ctx), Pool(&Ctx.HolderFactory.GetPagePool()), Map(Ctx.HolderFactory.GetPagePool(), itemsCountHint / COMPACT_HASH_MAX_LOAD_FACTOR)
  412. , PayloadType(payloadType), PayloadPacker(std::make_shared<TValuePacker>(false, payloadType))
  413. {
  414. Y_UNUSED(keyType);
  415. Y_UNUSED(keyTypes);
  416. Y_UNUSED(isTuple);
  417. Y_UNUSED(encoded);
  418. Y_UNUSED(compare);
  419. Y_UNUSED(equate);
  420. Y_UNUSED(hash);
  421. Map.SetMaxLoadFactor(COMPACT_HASH_MAX_LOAD_FACTOR);
  422. }
  423. void Add(NUdf::TUnboxedValue&& key, NUdf::TUnboxedValue&& payload)
  424. {
  425. if constexpr (OptionalKey) {
  426. if (!key) {
  427. NullPayload = AddSmallValue(Pool, PayloadPacker->Pack(payload));
  428. return;
  429. }
  430. }
  431. Map.InsertNew(key.Get<T>(), AddSmallValue(Pool, PayloadPacker->Pack(payload)));
  432. }
  433. NUdf::TUnboxedValue Build()
  434. {
  435. return Ctx.HolderFactory.CreateDirectHashedSingleFixedCompactMapHolder<T, OptionalKey>(std::move(Map), std::move(NullPayload), std::move(Pool), PayloadType, &Ctx);
  436. }
  437. };
  438. template <typename T, bool OptionalKey>
  439. class THashedSingleFixedCompactMapAccumulator<T, OptionalKey, true> {
  440. using TMapType = TValuesDictHashSingleFixedCompactMultiMap<T>;
  441. TComputationContext& Ctx;
  442. TPagedArena Pool;
  443. TMapType Map;
  444. std::vector<ui64> NullPayloads;
  445. TType* PayloadType;
  446. std::shared_ptr<TValuePacker> PayloadPacker;
  447. public:
  448. static constexpr bool IsSorted = false;
  449. THashedSingleFixedCompactMapAccumulator(TType* keyType, TType* payloadType, const TKeyTypes& keyTypes, bool isTuple, bool encoded,
  450. const NUdf::ICompare* compare, const NUdf::IEquate* equate, const NUdf::IHash* hash, TComputationContext& ctx, ui64 itemsCountHint)
  451. : Ctx(ctx), Pool(&Ctx.HolderFactory.GetPagePool()), Map(Ctx.HolderFactory.GetPagePool(), itemsCountHint / COMPACT_HASH_MAX_LOAD_FACTOR)
  452. , PayloadType(payloadType), PayloadPacker(std::make_shared<TValuePacker>(false, payloadType))
  453. {
  454. Y_UNUSED(keyTypes);
  455. Y_UNUSED(keyType);
  456. Y_UNUSED(isTuple);
  457. Y_UNUSED(encoded);
  458. Y_UNUSED(compare);
  459. Y_UNUSED(equate);
  460. Y_UNUSED(hash);
  461. Map.SetMaxLoadFactor(COMPACT_HASH_MAX_LOAD_FACTOR);
  462. }
  463. void Add(NUdf::TUnboxedValue&& key, NUdf::TUnboxedValue&& payload)
  464. {
  465. if constexpr (OptionalKey) {
  466. if (!key) {
  467. NullPayloads.push_back(AddSmallValue(Pool, PayloadPacker->Pack(payload)));
  468. return;
  469. }
  470. }
  471. Map.Insert(key.Get<T>(), AddSmallValue(Pool, PayloadPacker->Pack(payload)));
  472. }
  473. NUdf::TUnboxedValue Build()
  474. {
  475. return Ctx.HolderFactory.CreateDirectHashedSingleFixedCompactMultiMapHolder<T, OptionalKey>(std::move(Map), std::move(NullPayloads), std::move(Pool), PayloadType, &Ctx);
  476. }
  477. };
  478. class TSortedSetAccumulator {
  479. TComputationContext& Ctx;
  480. TType* KeyType;
  481. const TKeyTypes& KeyTypes;
  482. bool IsTuple;
  483. const NUdf::ICompare* Compare;
  484. const NUdf::IEquate* Equate;
  485. std::optional<TGenericPresortEncoder> Packer;
  486. TUnboxedValueVector Items;
  487. public:
  488. static constexpr bool IsSorted = true;
  489. TSortedSetAccumulator(TType* keyType, const TKeyTypes& keyTypes, bool isTuple, bool encoded,
  490. const NUdf::ICompare* compare, const NUdf::IEquate* equate, const NUdf::IHash* hash, TComputationContext& ctx, ui64 itemsCountHint)
  491. : Ctx(ctx), KeyType(keyType), KeyTypes(keyTypes), IsTuple(isTuple), Compare(compare), Equate(equate)
  492. {
  493. Y_UNUSED(hash);
  494. if (encoded) {
  495. Packer.emplace(KeyType);
  496. }
  497. Items.reserve(itemsCountHint);
  498. }
  499. void Add(NUdf::TUnboxedValue&& key)
  500. {
  501. if (Packer) {
  502. key = MakeString(Packer->Encode(key, false));
  503. }
  504. Items.emplace_back(std::move(key));
  505. }
  506. NUdf::TUnboxedValue Build()
  507. {
  508. const TSortedSetFiller filler = [this](TUnboxedValueVector& values) {
  509. std::stable_sort(Items.begin(), Items.end(), TValueLess(KeyTypes, IsTuple, Compare));
  510. Items.erase(std::unique(Items.begin(), Items.end(), TValueEqual(KeyTypes, IsTuple, Equate)), Items.end());
  511. values = std::move(Items);
  512. };
  513. return Ctx.HolderFactory.CreateDirectSortedSetHolder(filler, KeyTypes, IsTuple,
  514. EDictSortMode::SortedUniqueAscending, true, Packer ? KeyType : nullptr, Compare, Equate);
  515. }
  516. };
  517. template<bool IsMulti>
  518. class TSortedMapAccumulator;
  519. template<>
  520. class TSortedMapAccumulator<false> {
  521. TComputationContext& Ctx;
  522. TType* KeyType;
  523. const TKeyTypes& KeyTypes;
  524. bool IsTuple;
  525. const NUdf::ICompare* Compare;
  526. const NUdf::IEquate* Equate;
  527. std::optional<TGenericPresortEncoder> Packer;
  528. TKeyPayloadPairVector Items;
  529. public:
  530. static constexpr bool IsSorted = true;
  531. TSortedMapAccumulator(TType* keyType, TType* payloadType, const TKeyTypes& keyTypes, bool isTuple, bool encoded,
  532. const NUdf::ICompare* compare, const NUdf::IEquate* equate, const NUdf::IHash* hash, TComputationContext& ctx, ui64 itemsCountHint)
  533. : Ctx(ctx)
  534. , KeyType(keyType)
  535. , KeyTypes(keyTypes)
  536. , IsTuple(isTuple)
  537. , Compare(compare)
  538. , Equate(equate)
  539. {
  540. Y_UNUSED(hash);
  541. if (encoded) {
  542. Packer.emplace(KeyType);
  543. }
  544. Y_UNUSED(payloadType);
  545. Items.reserve(itemsCountHint);
  546. }
  547. void Add(NUdf::TUnboxedValue&& key, NUdf::TUnboxedValue&& payload)
  548. {
  549. if (Packer) {
  550. key = MakeString(Packer->Encode(key, false));
  551. }
  552. Items.emplace_back(std::move(key), std::move(payload));
  553. }
  554. NUdf::TUnboxedValue Build()
  555. {
  556. const TSortedDictFiller filler = [this](TKeyPayloadPairVector& values) {
  557. values = std::move(Items);
  558. };
  559. return Ctx.HolderFactory.CreateDirectSortedDictHolder(filler, KeyTypes, IsTuple, EDictSortMode::RequiresSorting,
  560. true, Packer ? KeyType : nullptr, Compare, Equate);
  561. }
  562. };
  563. template<>
  564. class TSortedMapAccumulator<true> {
  565. TComputationContext& Ctx;
  566. TType* KeyType;
  567. const TKeyTypes& KeyTypes;
  568. bool IsTuple;
  569. const NUdf::ICompare* Compare;
  570. const NUdf::IEquate* Equate;
  571. std::optional<TGenericPresortEncoder> Packer;
  572. TKeyPayloadPairVector Items;
  573. public:
  574. static constexpr bool IsSorted = true;
  575. TSortedMapAccumulator(TType* keyType, TType* payloadType, const TKeyTypes& keyTypes, bool isTuple, bool encoded,
  576. const NUdf::ICompare* compare, const NUdf::IEquate* equate, const NUdf::IHash* hash, TComputationContext& ctx, ui64 itemsCountHint)
  577. : Ctx(ctx), KeyType(keyType), KeyTypes(keyTypes), IsTuple(isTuple), Compare(compare), Equate(equate)
  578. {
  579. Y_UNUSED(hash);
  580. if (encoded) {
  581. Packer.emplace(KeyType);
  582. }
  583. Y_UNUSED(payloadType);
  584. Items.reserve(itemsCountHint);
  585. }
  586. void Add(NUdf::TUnboxedValue&& key, NUdf::TUnboxedValue&& payload)
  587. {
  588. if (Packer) {
  589. key = MakeString(Packer->Encode(key, false));
  590. }
  591. Items.emplace_back(std::move(key), std::move(payload));
  592. }
  593. NUdf::TUnboxedValue Build()
  594. {
  595. const TSortedDictFiller filler = [this](TKeyPayloadPairVector& values) {
  596. std::stable_sort(Items.begin(), Items.end(), TKeyPayloadPairLess(KeyTypes, IsTuple, Compare));
  597. TKeyPayloadPairVector groups;
  598. groups.reserve(Items.size());
  599. if (!Items.empty()) {
  600. TDefaultListRepresentation currentList(std::move(Items.begin()->second));
  601. auto lastKey = std::move(Items.begin()->first);
  602. TValueEqual eqPredicate(KeyTypes, IsTuple, Equate);
  603. for (auto it = Items.begin() + 1; it != Items.end(); ++it) {
  604. if (eqPredicate(lastKey, it->first)) {
  605. currentList = currentList.Append(std::move(it->second));
  606. }
  607. else {
  608. auto payload = Ctx.HolderFactory.CreateDirectListHolder(std::move(currentList));
  609. groups.emplace_back(std::move(lastKey), std::move(payload));
  610. currentList = TDefaultListRepresentation(std::move(it->second));
  611. lastKey = std::move(it->first);
  612. }
  613. }
  614. auto payload = Ctx.HolderFactory.CreateDirectListHolder(std::move(currentList));
  615. groups.emplace_back(std::move(lastKey), std::move(payload));
  616. }
  617. values = std::move(groups);
  618. };
  619. return Ctx.HolderFactory.CreateDirectSortedDictHolder(filler, KeyTypes, IsTuple,
  620. EDictSortMode::SortedUniqueAscending, true, Packer ? KeyType : nullptr, Compare, Equate);
  621. }
  622. };
  623. template <typename TSetAccumulator, bool IsStream>
  624. class TSetWrapper : public TMutableComputationNode<TSetWrapper<TSetAccumulator, IsStream>> {
  625. typedef TMutableComputationNode<TSetWrapper<TSetAccumulator, IsStream>> TBaseComputation;
  626. public:
  627. class TStreamValue : public TComputationValue<TStreamValue> {
  628. public:
  629. TStreamValue(TMemoryUsageInfo* memInfo, NUdf::TUnboxedValue&& input, IComputationExternalNode* const item,
  630. IComputationNode* const key, TSetAccumulator&& setAccum, TComputationContext& ctx)
  631. : TComputationValue<TStreamValue>(memInfo)
  632. , Input(std::move(input))
  633. , Item(item)
  634. , Key(key)
  635. , SetAccum(std::move(setAccum))
  636. , Ctx(ctx) {}
  637. private:
  638. NUdf::EFetchStatus Fetch(NUdf::TUnboxedValue& result) override {
  639. if (Finished) {
  640. return NUdf::EFetchStatus::Finish;
  641. }
  642. for (;;) {
  643. NUdf::TUnboxedValue item;
  644. switch (auto status = Input.Fetch(item)) {
  645. case NUdf::EFetchStatus::Ok: {
  646. Item->SetValue(Ctx, std::move(item));
  647. SetAccum.Add(Key->GetValue(Ctx));
  648. break; // and continue
  649. }
  650. case NUdf::EFetchStatus::Finish: {
  651. result = SetAccum.Build();
  652. Finished = true;
  653. return NUdf::EFetchStatus::Ok;
  654. }
  655. case NUdf::EFetchStatus::Yield: {
  656. return NUdf::EFetchStatus::Yield;
  657. }
  658. }
  659. }
  660. }
  661. NUdf::TUnboxedValue Input;
  662. IComputationExternalNode* const Item;
  663. IComputationNode* const Key;
  664. TSetAccumulator SetAccum;
  665. TComputationContext& Ctx;
  666. bool Finished = false;
  667. };
  668. TSetWrapper(TComputationMutables& mutables, TType* keyType, IComputationNode* list, IComputationExternalNode* item,
  669. IComputationNode* key, ui64 itemsCountHint)
  670. : TBaseComputation(mutables, EValueRepresentation::Boxed)
  671. , KeyType(keyType)
  672. , List(list)
  673. , Item(item)
  674. , Key(key)
  675. , ItemsCountHint(itemsCountHint)
  676. {
  677. GetDictionaryKeyTypes(KeyType, KeyTypes, IsTuple, Encoded, UseIHash);
  678. Compare = UseIHash && TSetAccumulator::IsSorted ? MakeCompareImpl(KeyType) : nullptr;
  679. Equate = UseIHash ? MakeEquateImpl(KeyType) : nullptr;
  680. Hash = UseIHash && !TSetAccumulator::IsSorted ? MakeHashImpl(KeyType) : nullptr;
  681. }
  682. NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
  683. if constexpr (IsStream) {
  684. return ctx.HolderFactory.Create<TStreamValue>(List->GetValue(ctx), Item, Key,
  685. TSetAccumulator(KeyType, KeyTypes, IsTuple, Encoded, Compare.Get(), Equate.Get(), Hash.Get(),
  686. ctx, ItemsCountHint), ctx);
  687. }
  688. const auto& list = List->GetValue(ctx);
  689. auto itemsCountHint = ItemsCountHint;
  690. if (list.HasFastListLength()) {
  691. if (const auto size = list.GetListLength())
  692. itemsCountHint = size;
  693. else
  694. return ctx.HolderFactory.GetEmptyContainerLazy();
  695. }
  696. TSetAccumulator accumulator(KeyType, KeyTypes, IsTuple, Encoded, Compare.Get(), Equate.Get(), Hash.Get(),
  697. ctx, itemsCountHint);
  698. TThresher<false>::DoForEachItem(list,
  699. [this, &accumulator, &ctx] (NUdf::TUnboxedValue&& item) {
  700. Item->SetValue(ctx, std::move(item));
  701. accumulator.Add(Key->GetValue(ctx));
  702. }
  703. );
  704. return accumulator.Build().Release();
  705. }
  706. private:
  707. void RegisterDependencies() const final {
  708. this->DependsOn(List);
  709. this->Own(Item);
  710. this->DependsOn(Key);
  711. }
  712. TType* const KeyType;
  713. IComputationNode* const List;
  714. IComputationExternalNode* const Item;
  715. IComputationNode* const Key;
  716. const ui64 ItemsCountHint;
  717. TKeyTypes KeyTypes;
  718. bool IsTuple;
  719. bool Encoded;
  720. bool UseIHash;
  721. NUdf::ICompare::TPtr Compare;
  722. NUdf::IEquate::TPtr Equate;
  723. NUdf::IHash::TPtr Hash;
  724. };
  725. #ifndef MKQL_DISABLE_CODEGEN
  726. template <class TLLVMBase>
  727. class TLLVMFieldsStructureStateWithAccum: public TLLVMBase {
  728. private:
  729. using TBase = TLLVMBase;
  730. llvm::PointerType* StructPtrType;
  731. protected:
  732. using TBase::Context;
  733. public:
  734. std::vector<llvm::Type*> GetFieldsArray() {
  735. std::vector<llvm::Type*> result = TBase::GetFields();
  736. result.emplace_back(StructPtrType); //accumulator
  737. return result;
  738. }
  739. llvm::Constant* GetAccumulator() {
  740. return ConstantInt::get(Type::getInt32Ty(Context), TBase::GetFieldsCount() + 0);
  741. }
  742. TLLVMFieldsStructureStateWithAccum(llvm::LLVMContext& context)
  743. : TBase(context)
  744. , StructPtrType(PointerType::getUnqual(StructType::get(context))) {
  745. }
  746. };
  747. #endif
  748. template <typename TSetAccumulator>
  749. class TSqueezeSetFlowWrapper : public TStatefulFlowCodegeneratorNode<TSqueezeSetFlowWrapper<TSetAccumulator>> {
  750. using TBase = TStatefulFlowCodegeneratorNode<TSqueezeSetFlowWrapper<TSetAccumulator>>;
  751. public:
  752. class TState : public TComputationValue<TState> {
  753. using TBase = TComputationValue<TState>;
  754. public:
  755. TState(TMemoryUsageInfo* memInfo, TSetAccumulator&& setAccum)
  756. : TBase(memInfo), SetAccum(std::move(setAccum)) {}
  757. NUdf::TUnboxedValuePod Build() {
  758. return SetAccum.Build().Release();
  759. }
  760. void Insert(NUdf::TUnboxedValuePod value) {
  761. SetAccum.Add(value);
  762. }
  763. private:
  764. TSetAccumulator SetAccum;
  765. };
  766. TSqueezeSetFlowWrapper(TComputationMutables& mutables, TType* keyType,
  767. IComputationNode* flow, IComputationExternalNode* item, IComputationNode* key, ui64 itemsCountHint)
  768. : TBase(mutables, flow, EValueRepresentation::Boxed, EValueRepresentation::Any)
  769. , KeyType(keyType)
  770. , Flow(flow)
  771. , Item(item)
  772. , Key(key)
  773. , ItemsCountHint(itemsCountHint)
  774. {
  775. GetDictionaryKeyTypes(KeyType, KeyTypes, IsTuple, Encoded, UseIHash);
  776. Compare = UseIHash && TSetAccumulator::IsSorted ? MakeCompareImpl(KeyType) : nullptr;
  777. Equate = UseIHash ? MakeEquateImpl(KeyType) : nullptr;
  778. Hash = UseIHash && !TSetAccumulator::IsSorted ? MakeHashImpl(KeyType) : nullptr;
  779. }
  780. NUdf::TUnboxedValuePod DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx) const {
  781. if (state.IsFinish()) {
  782. return state.Release();
  783. } else if (state.IsInvalid()) {
  784. MakeState(ctx, state);
  785. }
  786. while (const auto statePtr = static_cast<TState*>(state.AsBoxed().Get())) {
  787. if (auto item = Flow->GetValue(ctx); item.IsYield()) {
  788. return item.Release();
  789. } else if (item.IsFinish()) {
  790. const auto dict = statePtr->Build();
  791. state = std::move(item);
  792. return dict;
  793. } else {
  794. Item->SetValue(ctx, std::move(item));
  795. statePtr->Insert(Key->GetValue(ctx).Release());
  796. }
  797. }
  798. Y_UNREACHABLE();
  799. }
  800. #ifndef MKQL_DISABLE_CODEGEN
  801. Value* DoGenerateGetValue(const TCodegenContext& ctx, Value* statePtr, BasicBlock*& block) const {
  802. auto& context = ctx.Codegen.GetContext();
  803. const auto codegenItemArg = dynamic_cast<ICodegeneratorExternalNode*>(Item);
  804. MKQL_ENSURE(codegenItemArg, "Item must be codegenerator node.");
  805. const auto valueType = Type::getInt128Ty(context);
  806. TLLVMFieldsStructureStateWithAccum<TLLVMFieldsStructure<TComputationValue<TState>>> fieldsStruct(context);
  807. const auto stateType = StructType::get(context, fieldsStruct.GetFieldsArray());
  808. const auto statePtrType = PointerType::getUnqual(stateType);
  809. const auto make = BasicBlock::Create(context, "make", ctx.Func);
  810. const auto main = BasicBlock::Create(context, "main", ctx.Func);
  811. BranchInst::Create(make, main, IsInvalid(statePtr, block), block);
  812. block = make;
  813. const auto ptrType = PointerType::getUnqual(StructType::get(context));
  814. const auto self = CastInst::Create(Instruction::IntToPtr, ConstantInt::get(Type::getInt64Ty(context), uintptr_t(this)), ptrType, "self", block);
  815. const auto makeFunc = ConstantInt::get(Type::getInt64Ty(context), GetMethodPtr(&TSqueezeSetFlowWrapper<TSetAccumulator>::MakeState));
  816. const auto makeType = FunctionType::get(Type::getVoidTy(context), {self->getType(), ctx.Ctx->getType(), statePtr->getType()}, false);
  817. const auto makeFuncPtr = CastInst::Create(Instruction::IntToPtr, makeFunc, PointerType::getUnqual(makeType), "function", block);
  818. CallInst::Create(makeType, makeFuncPtr, {self, ctx.Ctx, statePtr}, "", block);
  819. BranchInst::Create(main, block);
  820. block = main;
  821. const auto more = BasicBlock::Create(context, "more", ctx.Func);
  822. const auto done = BasicBlock::Create(context, "done", ctx.Func);
  823. const auto plus = BasicBlock::Create(context, "plus", ctx.Func);
  824. const auto over = BasicBlock::Create(context, "over", ctx.Func);
  825. const auto result = PHINode::Create(valueType, 3U, "result", over);
  826. const auto state = new LoadInst(valueType, statePtr, "state", block);
  827. const auto half = CastInst::Create(Instruction::Trunc, state, Type::getInt64Ty(context), "half", block);
  828. const auto stateArg = CastInst::Create(Instruction::IntToPtr, half, statePtrType, "state_arg", block);
  829. result->addIncoming(GetFinish(context), block);
  830. BranchInst::Create(over, more, IsFinish(state, block), block);
  831. block = more;
  832. const auto item = GetNodeValue(Flow, ctx, block);
  833. result->addIncoming(GetYield(context), block);
  834. const auto choise = SwitchInst::Create(item, plus, 2U, block);
  835. choise->addCase(GetFinish(context), done);
  836. choise->addCase(GetYield(context), over);
  837. block = plus;
  838. codegenItemArg->CreateSetValue(ctx, block, item);
  839. const auto key = GetNodeValue(Key, ctx, block);
  840. const auto insert = ConstantInt::get(Type::getInt64Ty(context), GetMethodPtr(&TState::Insert));
  841. const auto keyArg = WrapArgumentForWindows(key, ctx, block);
  842. const auto insType = FunctionType::get(Type::getVoidTy(context), {stateArg->getType(), keyArg->getType()}, false);
  843. const auto insPtr = CastInst::Create(Instruction::IntToPtr, insert, PointerType::getUnqual(insType), "insert", block);
  844. CallInst::Create(insType, insPtr, {stateArg, keyArg}, "", block);
  845. BranchInst::Create(more, block);
  846. block = done;
  847. const auto build = ConstantInt::get(Type::getInt64Ty(context), GetMethodPtr(&TState::Build));
  848. if (NYql::NCodegen::ETarget::Windows != ctx.Codegen.GetEffectiveTarget()) {
  849. const auto funType = FunctionType::get(valueType, {stateArg->getType()}, false);
  850. const auto funcPtr = CastInst::Create(Instruction::IntToPtr, build, PointerType::getUnqual(funType), "build", block);
  851. const auto dict = CallInst::Create(funType, funcPtr, {stateArg}, "dict", block);
  852. UnRefBoxed(state, ctx, block);
  853. result->addIncoming(dict, block);
  854. } else {
  855. const auto ptr = new AllocaInst(valueType, 0U, "ptr", block);
  856. const auto funType = FunctionType::get(Type::getVoidTy(context), {stateArg->getType(), ptr->getType()}, false);
  857. const auto funcPtr = CastInst::Create(Instruction::IntToPtr, build, PointerType::getUnqual(funType), "build", block);
  858. CallInst::Create(funType, funcPtr, {stateArg, ptr}, "", block);
  859. const auto dict = new LoadInst(valueType, ptr, "dict", block);
  860. UnRefBoxed(state, ctx, block);
  861. result->addIncoming(dict, block);
  862. }
  863. new StoreInst(item, statePtr, block);
  864. BranchInst::Create(over, block);
  865. block = over;
  866. return result;
  867. }
  868. #endif
  869. private:
  870. void MakeState(TComputationContext& ctx, NUdf::TUnboxedValue& state) const {
  871. state = ctx.HolderFactory.Create<TState>(TSetAccumulator(KeyType, KeyTypes, IsTuple, Encoded,
  872. Compare.Get(), Equate.Get(), Hash.Get(), ctx, ItemsCountHint));
  873. }
  874. void RegisterDependencies() const final {
  875. if (const auto flow = this->FlowDependsOn(Flow)) {
  876. this->Own(flow, Item);
  877. this->DependsOn(flow, Key);
  878. }
  879. }
  880. TType* const KeyType;
  881. IComputationNode* const Flow;
  882. IComputationExternalNode* const Item;
  883. IComputationNode* const Key;
  884. const ui64 ItemsCountHint;
  885. TKeyTypes KeyTypes;
  886. bool IsTuple;
  887. bool Encoded;
  888. bool UseIHash;
  889. NUdf::ICompare::TPtr Compare;
  890. NUdf::IEquate::TPtr Equate;
  891. NUdf::IHash::TPtr Hash;
  892. };
  893. template <typename TSetAccumulator>
  894. class TSqueezeSetWideWrapper : public TStatefulFlowCodegeneratorNode<TSqueezeSetWideWrapper<TSetAccumulator>> {
  895. using TBase = TStatefulFlowCodegeneratorNode<TSqueezeSetWideWrapper<TSetAccumulator>>;
  896. public:
  897. class TState : public TComputationValue<TState> {
  898. using TBase = TComputationValue<TState>;
  899. public:
  900. TState(TMemoryUsageInfo* memInfo, TSetAccumulator&& setAccum)
  901. : TBase(memInfo), SetAccum(std::move(setAccum)) {}
  902. NUdf::TUnboxedValuePod Build() {
  903. return SetAccum.Build().Release();
  904. }
  905. void Insert(NUdf::TUnboxedValuePod value) {
  906. SetAccum.Add(value);
  907. }
  908. private:
  909. TSetAccumulator SetAccum;
  910. };
  911. TSqueezeSetWideWrapper(TComputationMutables& mutables, TType* keyType,
  912. IComputationWideFlowNode* flow, TComputationExternalNodePtrVector&& items, IComputationNode* key, ui64 itemsCountHint)
  913. : TBase(mutables, flow, EValueRepresentation::Boxed, EValueRepresentation::Any)
  914. , KeyType(keyType)
  915. , Flow(flow)
  916. , Items(std::move(items))
  917. , Key(key)
  918. , ItemsCountHint(itemsCountHint)
  919. , PasstroughKey(GetPasstroughtMap(TComputationNodePtrVector{Key}, Items).front())
  920. , WideFieldsIndex(mutables.IncrementWideFieldsIndex(Items.size()))
  921. {
  922. GetDictionaryKeyTypes(KeyType, KeyTypes, IsTuple, Encoded, UseIHash);
  923. Compare = UseIHash && TSetAccumulator::IsSorted ? MakeCompareImpl(KeyType) : nullptr;
  924. Equate = UseIHash ? MakeEquateImpl(KeyType) : nullptr;
  925. Hash = UseIHash && !TSetAccumulator::IsSorted ? MakeHashImpl(KeyType) : nullptr;
  926. }
  927. NUdf::TUnboxedValuePod DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx) const {
  928. if (state.IsFinish()) {
  929. return state.Release();
  930. } else if (state.IsInvalid()) {
  931. MakeState(ctx, state);
  932. }
  933. auto** fields = ctx.WideFields.data() + WideFieldsIndex;
  934. while (const auto statePtr = static_cast<TState*>(state.AsBoxed().Get())) {
  935. for (auto i = 0U; i < Items.size(); ++i)
  936. if (Key == Items[i] || Items[i]->GetDependencesCount() > 0U)
  937. fields[i] = &Items[i]->RefValue(ctx);
  938. switch (const auto result = Flow->FetchValues(ctx, fields)) {
  939. case EFetchResult::One:
  940. statePtr->Insert(Key->GetValue(ctx).Release());
  941. continue;
  942. case EFetchResult::Yield:
  943. return NUdf::TUnboxedValuePod::MakeYield();
  944. case EFetchResult::Finish: {
  945. const auto dict = statePtr->Build();
  946. state = NUdf::TUnboxedValuePod::MakeFinish();
  947. return dict;
  948. }
  949. }
  950. }
  951. Y_UNREACHABLE();
  952. }
  953. #ifndef MKQL_DISABLE_CODEGEN
  954. Value* DoGenerateGetValue(const TCodegenContext& ctx, Value* statePtr, BasicBlock*& block) const {
  955. auto& context = ctx.Codegen.GetContext();
  956. const auto valueType = Type::getInt128Ty(context);
  957. TLLVMFieldsStructureStateWithAccum<TLLVMFieldsStructure<TComputationValue<TState>>> fieldsStruct(context);
  958. const auto stateType = StructType::get(context, fieldsStruct.GetFieldsArray());
  959. const auto statePtrType = PointerType::getUnqual(stateType);
  960. const auto make = BasicBlock::Create(context, "make", ctx.Func);
  961. const auto main = BasicBlock::Create(context, "main", ctx.Func);
  962. BranchInst::Create(make, main, IsInvalid(statePtr, block), block);
  963. block = make;
  964. const auto ptrType = PointerType::getUnqual(StructType::get(context));
  965. const auto self = CastInst::Create(Instruction::IntToPtr, ConstantInt::get(Type::getInt64Ty(context), uintptr_t(this)), ptrType, "self", block);
  966. const auto makeFunc = ConstantInt::get(Type::getInt64Ty(context), GetMethodPtr(&TSqueezeSetWideWrapper<TSetAccumulator>::MakeState));
  967. const auto makeType = FunctionType::get(Type::getVoidTy(context), {self->getType(), ctx.Ctx->getType(), statePtr->getType()}, false);
  968. const auto makeFuncPtr = CastInst::Create(Instruction::IntToPtr, makeFunc, PointerType::getUnqual(makeType), "function", block);
  969. CallInst::Create(makeType, makeFuncPtr, {self, ctx.Ctx, statePtr}, "", block);
  970. BranchInst::Create(main, block);
  971. block = main;
  972. const auto more = BasicBlock::Create(context, "more", ctx.Func);
  973. const auto done = BasicBlock::Create(context, "done", ctx.Func);
  974. const auto plus = BasicBlock::Create(context, "plus", ctx.Func);
  975. const auto over = BasicBlock::Create(context, "over", ctx.Func);
  976. const auto result = PHINode::Create(valueType, 3U, "result", over);
  977. const auto state = new LoadInst(valueType, statePtr, "state", block);
  978. const auto half = CastInst::Create(Instruction::Trunc, state, Type::getInt64Ty(context), "half", block);
  979. const auto stateArg = CastInst::Create(Instruction::IntToPtr, half, statePtrType, "state_arg", block);
  980. result->addIncoming(GetFinish(context), block);
  981. BranchInst::Create(over, more, IsFinish(state, block), block);
  982. block = more;
  983. const auto getres = GetNodeValues(Flow, ctx, block);
  984. result->addIncoming(GetYield(context), block);
  985. const auto action = SwitchInst::Create(getres.first, plus, 2U, block);
  986. action->addCase(ConstantInt::get(Type::getInt32Ty(context), i32(EFetchResult::Finish)), done);
  987. action->addCase(ConstantInt::get(Type::getInt32Ty(context), i32(EFetchResult::Yield)), over);
  988. block = plus;
  989. if (!PasstroughKey) {
  990. for (auto i = 0U; i < Items.size(); ++i)
  991. if (Items[i]->GetDependencesCount() > 0U)
  992. EnsureDynamicCast<ICodegeneratorExternalNode*>(Items[i])->CreateSetValue(ctx, block, getres.second[i](ctx, block));
  993. }
  994. const auto key = PasstroughKey ? getres.second[*PasstroughKey](ctx, block) : GetNodeValue(Key, ctx, block);
  995. const auto insert = ConstantInt::get(Type::getInt64Ty(context), GetMethodPtr(&TState::Insert));
  996. const auto keyArg = WrapArgumentForWindows(key, ctx, block);
  997. const auto insType = FunctionType::get(Type::getVoidTy(context), {stateArg->getType(), keyArg->getType()}, false);
  998. const auto insPtr = CastInst::Create(Instruction::IntToPtr, insert, PointerType::getUnqual(insType), "insert", block);
  999. CallInst::Create(insType, insPtr, {stateArg, keyArg}, "", block);
  1000. BranchInst::Create(more, block);
  1001. block = done;
  1002. const auto build = ConstantInt::get(Type::getInt64Ty(context), GetMethodPtr(&TState::Build));
  1003. if (NYql::NCodegen::ETarget::Windows != ctx.Codegen.GetEffectiveTarget()) {
  1004. const auto funType = FunctionType::get(valueType, {stateArg->getType()}, false);
  1005. const auto funcPtr = CastInst::Create(Instruction::IntToPtr, build, PointerType::getUnqual(funType), "build", block);
  1006. const auto dict = CallInst::Create(funType, funcPtr, {stateArg}, "dict", block);
  1007. UnRefBoxed(state, ctx, block);
  1008. result->addIncoming(dict, block);
  1009. } else {
  1010. const auto ptr = new AllocaInst(valueType, 0U, "ptr", block);
  1011. const auto funType = FunctionType::get(Type::getVoidTy(context), {stateArg->getType(), ptr->getType()}, false);
  1012. const auto funcPtr = CastInst::Create(Instruction::IntToPtr, build, PointerType::getUnqual(funType), "build", block);
  1013. CallInst::Create(funType, funcPtr, {stateArg, ptr}, "", block);
  1014. const auto dict = new LoadInst(valueType, ptr, "dict", block);
  1015. UnRefBoxed(state, ctx, block);
  1016. result->addIncoming(dict, block);
  1017. }
  1018. new StoreInst(GetFinish(context), statePtr, block);
  1019. BranchInst::Create(over, block);
  1020. block = over;
  1021. return result;
  1022. }
  1023. #endif
  1024. private:
  1025. void MakeState(TComputationContext& ctx, NUdf::TUnboxedValue& state) const {
  1026. state = ctx.HolderFactory.Create<TState>(TSetAccumulator(KeyType, KeyTypes, IsTuple, Encoded,
  1027. Compare.Get(), Equate.Get(), Hash.Get(), ctx, ItemsCountHint));
  1028. }
  1029. void RegisterDependencies() const final {
  1030. if (const auto flow = this->FlowDependsOn(Flow)) {
  1031. std::for_each(Items.cbegin(), Items.cend(), std::bind(&TSqueezeSetWideWrapper::Own, flow, std::placeholders::_1));
  1032. this->DependsOn(flow, Key);
  1033. }
  1034. }
  1035. TType* const KeyType;
  1036. IComputationWideFlowNode* const Flow;
  1037. const TComputationExternalNodePtrVector Items;
  1038. IComputationNode* const Key;
  1039. const ui64 ItemsCountHint;
  1040. TKeyTypes KeyTypes;
  1041. bool IsTuple;
  1042. bool Encoded;
  1043. bool UseIHash;
  1044. const std::optional<size_t> PasstroughKey;
  1045. const ui32 WideFieldsIndex;
  1046. NUdf::ICompare::TPtr Compare;
  1047. NUdf::IEquate::TPtr Equate;
  1048. NUdf::IHash::TPtr Hash;
  1049. };
  1050. template <typename TMapAccumulator, bool IsStream>
  1051. class TMapWrapper : public TMutableComputationNode<TMapWrapper<TMapAccumulator, IsStream>> {
  1052. typedef TMutableComputationNode<TMapWrapper<TMapAccumulator, IsStream>> TBaseComputation;
  1053. public:
  1054. class TStreamValue : public TComputationValue<TStreamValue> {
  1055. public:
  1056. TStreamValue(TMemoryUsageInfo* memInfo, NUdf::TUnboxedValue&& input, IComputationExternalNode* const item,
  1057. IComputationNode* const key, IComputationNode* const payload, TMapAccumulator&& mapAccum, TComputationContext& ctx)
  1058. : TComputationValue<TStreamValue>(memInfo)
  1059. , Input(std::move(input))
  1060. , Item(item)
  1061. , Key(key)
  1062. , Payload(payload)
  1063. , MapAccum(std::move(mapAccum))
  1064. , Ctx(ctx) {}
  1065. private:
  1066. NUdf::EFetchStatus Fetch(NUdf::TUnboxedValue& result) override {
  1067. if (Finished) {
  1068. return NUdf::EFetchStatus::Finish;
  1069. }
  1070. for (;;) {
  1071. NUdf::TUnboxedValue item;
  1072. switch (auto status = Input.Fetch(item)) {
  1073. case NUdf::EFetchStatus::Ok: {
  1074. Item->SetValue(Ctx, std::move(item));
  1075. MapAccum.Add(Key->GetValue(Ctx), Payload->GetValue(Ctx));
  1076. break; // and continue
  1077. }
  1078. case NUdf::EFetchStatus::Finish: {
  1079. result = MapAccum.Build();
  1080. Finished = true;
  1081. return NUdf::EFetchStatus::Ok;
  1082. }
  1083. case NUdf::EFetchStatus::Yield: {
  1084. return NUdf::EFetchStatus::Yield;
  1085. }
  1086. }
  1087. }
  1088. }
  1089. NUdf::TUnboxedValue Input;
  1090. IComputationExternalNode* const Item;
  1091. IComputationNode* const Key;
  1092. IComputationNode* const Payload;
  1093. TMapAccumulator MapAccum;
  1094. TComputationContext& Ctx;
  1095. bool Finished = false;
  1096. };
  1097. TMapWrapper(TComputationMutables& mutables, TType* keyType, TType* payloadType, IComputationNode* list, IComputationExternalNode* item,
  1098. IComputationNode* key, IComputationNode* payload, ui64 itemsCountHint)
  1099. : TBaseComputation(mutables, EValueRepresentation::Boxed)
  1100. , KeyType(keyType)
  1101. , PayloadType(payloadType)
  1102. , List(list)
  1103. , Item(item)
  1104. , Key(key)
  1105. , Payload(payload)
  1106. , ItemsCountHint(itemsCountHint)
  1107. {
  1108. GetDictionaryKeyTypes(KeyType, KeyTypes, IsTuple, Encoded, UseIHash);
  1109. Compare = UseIHash && TMapAccumulator::IsSorted ? MakeCompareImpl(KeyType) : nullptr;
  1110. Equate = UseIHash ? MakeEquateImpl(KeyType) : nullptr;
  1111. Hash = UseIHash && !TMapAccumulator::IsSorted ? MakeHashImpl(KeyType) : nullptr;
  1112. }
  1113. NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
  1114. if constexpr (IsStream) {
  1115. return ctx.HolderFactory.Create<TStreamValue>(List->GetValue(ctx), Item, Key, Payload,
  1116. TMapAccumulator(KeyType, PayloadType, KeyTypes, IsTuple, Encoded, Compare.Get(), Equate.Get(), Hash.Get(),
  1117. ctx, ItemsCountHint), ctx);
  1118. }
  1119. const auto& list = List->GetValue(ctx);
  1120. auto itemsCountHint = ItemsCountHint;
  1121. if (list.HasFastListLength()) {
  1122. if (const auto size = list.GetListLength())
  1123. itemsCountHint = size;
  1124. else
  1125. return ctx.HolderFactory.GetEmptyContainerLazy();
  1126. }
  1127. TMapAccumulator accumulator(KeyType, PayloadType, KeyTypes, IsTuple, Encoded,
  1128. Compare.Get(), Equate.Get(), Hash.Get(), ctx, itemsCountHint);
  1129. TThresher<false>::DoForEachItem(list,
  1130. [this, &accumulator, &ctx] (NUdf::TUnboxedValue&& item) {
  1131. Item->SetValue(ctx, std::move(item));
  1132. accumulator.Add(Key->GetValue(ctx), Payload->GetValue(ctx));
  1133. }
  1134. );
  1135. return accumulator.Build().Release();
  1136. }
  1137. private:
  1138. void RegisterDependencies() const final {
  1139. this->DependsOn(List);
  1140. this->Own(Item);
  1141. this->DependsOn(Key);
  1142. this->DependsOn(Payload);
  1143. }
  1144. TType* const KeyType;
  1145. TType* PayloadType;
  1146. IComputationNode* const List;
  1147. IComputationExternalNode* const Item;
  1148. IComputationNode* const Key;
  1149. IComputationNode* const Payload;
  1150. const ui64 ItemsCountHint;
  1151. TKeyTypes KeyTypes;
  1152. bool IsTuple;
  1153. bool Encoded;
  1154. bool UseIHash;
  1155. NUdf::ICompare::TPtr Compare;
  1156. NUdf::IEquate::TPtr Equate;
  1157. NUdf::IHash::TPtr Hash;
  1158. };
  1159. template <typename TMapAccumulator>
  1160. class TSqueezeMapFlowWrapper : public TStatefulFlowCodegeneratorNode<TSqueezeMapFlowWrapper<TMapAccumulator>> {
  1161. using TBase = TStatefulFlowCodegeneratorNode<TSqueezeMapFlowWrapper<TMapAccumulator>>;
  1162. public:
  1163. class TState : public TComputationValue<TState> {
  1164. using TBase = TComputationValue<TState>;
  1165. public:
  1166. TState(TMemoryUsageInfo* memInfo, TMapAccumulator&& mapAccum)
  1167. : TBase(memInfo), MapAccum(std::move(mapAccum)) {}
  1168. NUdf::TUnboxedValuePod Build() {
  1169. return MapAccum.Build().Release();
  1170. }
  1171. void Insert(NUdf::TUnboxedValuePod key, NUdf::TUnboxedValuePod value) {
  1172. MapAccum.Add(key, value);
  1173. }
  1174. private:
  1175. TMapAccumulator MapAccum;
  1176. };
  1177. TSqueezeMapFlowWrapper(TComputationMutables& mutables, TType* keyType, TType* payloadType,
  1178. IComputationNode* flow, IComputationExternalNode* item, IComputationNode* key, IComputationNode* payload,
  1179. ui64 itemsCountHint)
  1180. : TBase(mutables, flow, EValueRepresentation::Boxed, EValueRepresentation::Any)
  1181. , KeyType(keyType)
  1182. , PayloadType(payloadType)
  1183. , Flow(flow)
  1184. , Item(item)
  1185. , Key(key)
  1186. , Payload(payload)
  1187. , ItemsCountHint(itemsCountHint)
  1188. {
  1189. GetDictionaryKeyTypes(KeyType, KeyTypes, IsTuple, Encoded, UseIHash);
  1190. Compare = UseIHash && TMapAccumulator::IsSorted ? MakeCompareImpl(KeyType) : nullptr;
  1191. Equate = UseIHash ? MakeEquateImpl(KeyType) : nullptr;
  1192. Hash = UseIHash && !TMapAccumulator::IsSorted ? MakeHashImpl(KeyType) : nullptr;
  1193. }
  1194. NUdf::TUnboxedValuePod DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx) const {
  1195. if (state.IsFinish()) {
  1196. return state;
  1197. } else if (state.IsInvalid()) {
  1198. MakeState(ctx, state);
  1199. }
  1200. while (const auto statePtr = static_cast<TState*>(state.AsBoxed().Get())) {
  1201. if (auto item = Flow->GetValue(ctx); item.IsYield()) {
  1202. return item.Release();
  1203. } else if (item.IsFinish()) {
  1204. const auto dict = statePtr->Build();
  1205. state = std::move(item);
  1206. return dict;
  1207. } else {
  1208. Item->SetValue(ctx, std::move(item));
  1209. statePtr->Insert(Key->GetValue(ctx).Release(), Payload->GetValue(ctx).Release());
  1210. }
  1211. }
  1212. Y_UNREACHABLE();
  1213. }
  1214. #ifndef MKQL_DISABLE_CODEGEN
  1215. Value* DoGenerateGetValue(const TCodegenContext& ctx, Value* statePtr, BasicBlock*& block) const {
  1216. auto& context = ctx.Codegen.GetContext();
  1217. const auto codegenItemArg = dynamic_cast<ICodegeneratorExternalNode*>(Item);
  1218. MKQL_ENSURE(codegenItemArg, "Item must be codegenerator node.");
  1219. const auto valueType = Type::getInt128Ty(context);
  1220. TLLVMFieldsStructureStateWithAccum<TLLVMFieldsStructure<TComputationValue<TState>>> fieldsStruct(context);
  1221. const auto stateType = StructType::get(context, fieldsStruct.GetFieldsArray());
  1222. const auto statePtrType = PointerType::getUnqual(stateType);
  1223. const auto make = BasicBlock::Create(context, "make", ctx.Func);
  1224. const auto main = BasicBlock::Create(context, "main", ctx.Func);
  1225. BranchInst::Create(make, main, IsInvalid(statePtr, block), block);
  1226. block = make;
  1227. const auto ptrType = PointerType::getUnqual(StructType::get(context));
  1228. const auto self = CastInst::Create(Instruction::IntToPtr, ConstantInt::get(Type::getInt64Ty(context), uintptr_t(this)), ptrType, "self", block);
  1229. const auto makeFunc = ConstantInt::get(Type::getInt64Ty(context), GetMethodPtr(&TSqueezeMapFlowWrapper<TMapAccumulator>::MakeState));
  1230. const auto makeType = FunctionType::get(Type::getVoidTy(context), {self->getType(), ctx.Ctx->getType(), statePtr->getType()}, false);
  1231. const auto makeFuncPtr = CastInst::Create(Instruction::IntToPtr, makeFunc, PointerType::getUnqual(makeType), "function", block);
  1232. CallInst::Create(makeType, makeFuncPtr, {self, ctx.Ctx, statePtr}, "", block);
  1233. BranchInst::Create(main, block);
  1234. block = main;
  1235. const auto more = BasicBlock::Create(context, "more", ctx.Func);
  1236. const auto done = BasicBlock::Create(context, "done", ctx.Func);
  1237. const auto plus = BasicBlock::Create(context, "plus", ctx.Func);
  1238. const auto over = BasicBlock::Create(context, "over", ctx.Func);
  1239. const auto result = PHINode::Create(valueType, 3U, "result", over);
  1240. const auto state = new LoadInst(valueType, statePtr, "state", block);
  1241. const auto half = CastInst::Create(Instruction::Trunc, state, Type::getInt64Ty(context), "half", block);
  1242. const auto stateArg = CastInst::Create(Instruction::IntToPtr, half, statePtrType, "state_arg", block);
  1243. result->addIncoming(GetFinish(context), block);
  1244. BranchInst::Create(over, more, IsFinish(state, block), block);
  1245. block = more;
  1246. const auto item = GetNodeValue(Flow, ctx, block);
  1247. result->addIncoming(GetYield(context), block);
  1248. const auto choise = SwitchInst::Create(item, plus, 2U, block);
  1249. choise->addCase(GetFinish(context), done);
  1250. choise->addCase(GetYield(context), over);
  1251. block = plus;
  1252. codegenItemArg->CreateSetValue(ctx, block, item);
  1253. const auto key = GetNodeValue(Key, ctx, block);
  1254. const auto payload = GetNodeValue(Payload, ctx, block);
  1255. const auto insert = ConstantInt::get(Type::getInt64Ty(context), GetMethodPtr(&TState::Insert));
  1256. const auto keyArg = WrapArgumentForWindows(key, ctx, block);
  1257. const auto payloadArg = WrapArgumentForWindows(payload, ctx, block);
  1258. const auto insType = FunctionType::get(Type::getVoidTy(context), {stateArg->getType(), keyArg->getType(), payloadArg->getType()}, false);
  1259. const auto insPtr = CastInst::Create(Instruction::IntToPtr, insert, PointerType::getUnqual(insType), "insert", block);
  1260. CallInst::Create(insType, insPtr, {stateArg, keyArg, payloadArg}, "", block);
  1261. BranchInst::Create(more, block);
  1262. block = done;
  1263. const auto build = ConstantInt::get(Type::getInt64Ty(context), GetMethodPtr(&TState::Build));
  1264. if (NYql::NCodegen::ETarget::Windows != ctx.Codegen.GetEffectiveTarget()) {
  1265. const auto funType = FunctionType::get(valueType, {stateArg->getType()}, false);
  1266. const auto funcPtr = CastInst::Create(Instruction::IntToPtr, build, PointerType::getUnqual(funType), "build", block);
  1267. const auto dict = CallInst::Create(funType, funcPtr, {stateArg}, "dict", block);
  1268. UnRefBoxed(state, ctx, block);
  1269. result->addIncoming(dict, block);
  1270. } else {
  1271. const auto ptr = new AllocaInst(valueType, 0U, "ptr", block);
  1272. const auto funType = FunctionType::get(Type::getVoidTy(context), {stateArg->getType(), ptr->getType()}, false);
  1273. const auto funcPtr = CastInst::Create(Instruction::IntToPtr, build, PointerType::getUnqual(funType), "build", block);
  1274. CallInst::Create(funType, funcPtr, {stateArg, ptr}, "", block);
  1275. const auto dict = new LoadInst(valueType, ptr, "dict", block);
  1276. UnRefBoxed(state, ctx, block);
  1277. result->addIncoming(dict, block);
  1278. }
  1279. new StoreInst(item, statePtr, block);
  1280. BranchInst::Create(over, block);
  1281. block = over;
  1282. return result;
  1283. }
  1284. #endif
  1285. private:
  1286. void MakeState(TComputationContext& ctx, NUdf::TUnboxedValue& state) const {
  1287. state = ctx.HolderFactory.Create<TState>(TMapAccumulator(KeyType, PayloadType, KeyTypes, IsTuple, Encoded,
  1288. Compare.Get(), Equate.Get(), Hash.Get(), ctx, ItemsCountHint));
  1289. }
  1290. void RegisterDependencies() const final {
  1291. if (const auto flow = this->FlowDependsOn(Flow)) {
  1292. this->Own(flow, Item);
  1293. this->DependsOn(flow, Key);
  1294. this->DependsOn(flow, Payload);
  1295. }
  1296. }
// Key type descriptor; passed to the accumulator constructor in MakeState.
TType* const KeyType;
// Payload type descriptor; passed to the accumulator constructor in MakeState.
TType* PayloadType;
// Upstream flow whose items are accumulated.
IComputationNode* const Flow;
// External node that receives the current item of the flow (set via CreateSetValue in codegen).
IComputationExternalNode* const Item;
// Node evaluated per item to obtain the dictionary key.
IComputationNode* const Key;
// Node evaluated per item to obtain the dictionary payload.
IComputationNode* const Payload;
// Size hint forwarded to the accumulator constructor.
const ui64 ItemsCountHint;
// Key layout description forwarded to the accumulator constructor.
// NOTE(review): presumably filled in the constructor (not visible here) — confirm.
TKeyTypes KeyTypes;
bool IsTuple;
bool Encoded;
// NOTE(review): not read in the visible part of this class; presumably selects whether
// Compare/Equate/Hash below are created — confirm against the constructor.
bool UseIHash;
// Optional comparison/equality/hash helpers; their raw pointers are passed to the accumulator.
NUdf::ICompare::TPtr Compare;
NUdf::IEquate::TPtr Equate;
NUdf::IHash::TPtr Hash;
  1311. };
  1312. template <typename TMapAccumulator>
  1313. class TSqueezeMapWideWrapper : public TStatefulFlowCodegeneratorNode<TSqueezeMapWideWrapper<TMapAccumulator>> {
  1314. using TBase = TStatefulFlowCodegeneratorNode<TSqueezeMapWideWrapper<TMapAccumulator>>;
  1315. public:
  1316. class TState : public TComputationValue<TState> {
  1317. using TBase = TComputationValue<TState>;
  1318. public:
  1319. TState(TMemoryUsageInfo* memInfo, TMapAccumulator&& mapAccum)
  1320. : TBase(memInfo), MapAccum(std::move(mapAccum)) {}
  1321. NUdf::TUnboxedValuePod Build() {
  1322. return MapAccum.Build().Release();
  1323. }
  1324. void Insert(NUdf::TUnboxedValuePod key, NUdf::TUnboxedValuePod value) {
  1325. MapAccum.Add(key, value);
  1326. }
  1327. private:
  1328. TMapAccumulator MapAccum;
  1329. };
  1330. TSqueezeMapWideWrapper(TComputationMutables& mutables, TType* keyType, TType* payloadType,
  1331. IComputationWideFlowNode* flow, TComputationExternalNodePtrVector&& items, IComputationNode* key, IComputationNode* payload,
  1332. ui64 itemsCountHint)
  1333. : TBase(mutables, flow, EValueRepresentation::Boxed, EValueRepresentation::Any)
  1334. , KeyType(keyType)
  1335. , PayloadType(payloadType)
  1336. , Flow(flow)
  1337. , Items(std::move(items))
  1338. , Key(key)
  1339. , Payload(payload)
  1340. , ItemsCountHint(itemsCountHint)
  1341. , PasstroughKey(GetPasstroughtMap(TComputationNodePtrVector{Key}, Items).front())
  1342. , PasstroughPayload(GetPasstroughtMap(TComputationNodePtrVector{Payload}, Items).front())
  1343. , WideFieldsIndex(mutables.IncrementWideFieldsIndex(Items.size()))
  1344. {
  1345. GetDictionaryKeyTypes(KeyType, KeyTypes, IsTuple, Encoded, UseIHash);
  1346. Compare = UseIHash && TMapAccumulator::IsSorted ? MakeCompareImpl(KeyType) : nullptr;
  1347. Equate = UseIHash ? MakeEquateImpl(KeyType) : nullptr;
  1348. Hash = UseIHash && !TMapAccumulator::IsSorted ? MakeHashImpl(KeyType) : nullptr;
  1349. }
  1350. NUdf::TUnboxedValuePod DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx) const {
  1351. if (state.IsFinish()) {
  1352. return state;
  1353. } else if (state.IsInvalid()) {
  1354. MakeState(ctx, state);
  1355. }
  1356. auto** fields = ctx.WideFields.data() + WideFieldsIndex;
  1357. while (const auto statePtr = static_cast<TState*>(state.AsBoxed().Get())) {
  1358. for (auto i = 0U; i < Items.size(); ++i)
  1359. if (Key == Items[i] || Payload == Items[i] || Items[i]->GetDependencesCount() > 0U)
  1360. fields[i] = &Items[i]->RefValue(ctx);
  1361. switch (const auto result = Flow->FetchValues(ctx, fields)) {
  1362. case EFetchResult::One:
  1363. statePtr->Insert(Key->GetValue(ctx).Release(), Payload->GetValue(ctx).Release());
  1364. continue;
  1365. case EFetchResult::Yield:
  1366. return NUdf::TUnboxedValuePod::MakeYield();
  1367. case EFetchResult::Finish: {
  1368. const auto dict = statePtr->Build();
  1369. state = NUdf::TUnboxedValuePod::MakeFinish();
  1370. return dict;
  1371. }
  1372. }
  1373. }
  1374. Y_UNREACHABLE();
  1375. }
  1376. #ifndef MKQL_DISABLE_CODEGEN
  1377. Value* DoGenerateGetValue(const TCodegenContext& ctx, Value* statePtr, BasicBlock*& block) const {
  1378. auto& context = ctx.Codegen.GetContext();
  1379. const auto valueType = Type::getInt128Ty(context);
  1380. TLLVMFieldsStructureStateWithAccum<TLLVMFieldsStructure<TComputationValue<TState>>> fieldsStruct(context);
  1381. const auto stateType = StructType::get(context, fieldsStruct.GetFieldsArray());
  1382. const auto statePtrType = PointerType::getUnqual(stateType);
  1383. const auto make = BasicBlock::Create(context, "make", ctx.Func);
  1384. const auto main = BasicBlock::Create(context, "main", ctx.Func);
  1385. BranchInst::Create(make, main, IsInvalid(statePtr, block), block);
  1386. block = make;
  1387. const auto ptrType = PointerType::getUnqual(StructType::get(context));
  1388. const auto self = CastInst::Create(Instruction::IntToPtr, ConstantInt::get(Type::getInt64Ty(context), uintptr_t(this)), ptrType, "self", block);
  1389. const auto makeFunc = ConstantInt::get(Type::getInt64Ty(context), GetMethodPtr(&TSqueezeMapWideWrapper<TMapAccumulator>::MakeState));
  1390. const auto makeType = FunctionType::get(Type::getVoidTy(context), {self->getType(), ctx.Ctx->getType(), statePtr->getType()}, false);
  1391. const auto makeFuncPtr = CastInst::Create(Instruction::IntToPtr, makeFunc, PointerType::getUnqual(makeType), "function", block);
  1392. CallInst::Create(makeType, makeFuncPtr, {self, ctx.Ctx, statePtr}, "", block);
  1393. BranchInst::Create(main, block);
  1394. block = main;
  1395. const auto more = BasicBlock::Create(context, "more", ctx.Func);
  1396. const auto done = BasicBlock::Create(context, "done", ctx.Func);
  1397. const auto plus = BasicBlock::Create(context, "plus", ctx.Func);
  1398. const auto over = BasicBlock::Create(context, "over", ctx.Func);
  1399. const auto result = PHINode::Create(valueType, 3U, "result", over);
  1400. const auto state = new LoadInst(valueType, statePtr, "state", block);
  1401. const auto half = CastInst::Create(Instruction::Trunc, state, Type::getInt64Ty(context), "half", block);
  1402. const auto stateArg = CastInst::Create(Instruction::IntToPtr, half, statePtrType, "state_arg", block);
  1403. result->addIncoming(GetFinish(context), block);
  1404. BranchInst::Create(over, more, IsFinish(state, block), block);
  1405. block = more;
  1406. const auto getres = GetNodeValues(Flow, ctx, block);
  1407. result->addIncoming(GetYield(context), block);
  1408. const auto action = SwitchInst::Create(getres.first, plus, 2U, block);
  1409. action->addCase(ConstantInt::get(Type::getInt32Ty(context), i32(EFetchResult::Finish)), done);
  1410. action->addCase(ConstantInt::get(Type::getInt32Ty(context), i32(EFetchResult::Yield)), over);
  1411. block = plus;
  1412. if (!(PasstroughKey && PasstroughPayload)) {
  1413. for (auto i = 0U; i < Items.size(); ++i)
  1414. if (Items[i]->GetDependencesCount() > 0U)
  1415. EnsureDynamicCast<ICodegeneratorExternalNode*>(Items[i])->CreateSetValue(ctx, block, getres.second[i](ctx, block));
  1416. }
  1417. const auto key = PasstroughKey ? getres.second[*PasstroughKey](ctx, block) : GetNodeValue(Key, ctx, block);
  1418. const auto payload = PasstroughPayload ? getres.second[*PasstroughPayload](ctx, block) : GetNodeValue(Payload, ctx, block);
  1419. const auto insert = ConstantInt::get(Type::getInt64Ty(context), GetMethodPtr(&TState::Insert));
  1420. const auto keyArg = WrapArgumentForWindows(key, ctx, block);
  1421. const auto payloadArg = WrapArgumentForWindows(payload, ctx, block);
  1422. const auto insType = FunctionType::get(Type::getVoidTy(context), {stateArg->getType(), keyArg->getType(), payloadArg->getType()}, false);
  1423. const auto insPtr = CastInst::Create(Instruction::IntToPtr, insert, PointerType::getUnqual(insType), "insert", block);
  1424. CallInst::Create(insType, insPtr, {stateArg, keyArg, payloadArg}, "", block);
  1425. BranchInst::Create(more, block);
  1426. block = done;
  1427. const auto build = ConstantInt::get(Type::getInt64Ty(context), GetMethodPtr(&TState::Build));
  1428. if (NYql::NCodegen::ETarget::Windows != ctx.Codegen.GetEffectiveTarget()) {
  1429. const auto funType = FunctionType::get(valueType, {stateArg->getType()}, false);
  1430. const auto funcPtr = CastInst::Create(Instruction::IntToPtr, build, PointerType::getUnqual(funType), "build", block);
  1431. const auto dict = CallInst::Create(funType, funcPtr, {stateArg}, "dict", block);
  1432. UnRefBoxed(state, ctx, block);
  1433. result->addIncoming(dict, block);
  1434. } else {
  1435. const auto ptr = new AllocaInst(valueType, 0U, "ptr", block);
  1436. const auto funType = FunctionType::get(Type::getVoidTy(context), {stateArg->getType(), ptr->getType()}, false);
  1437. const auto funcPtr = CastInst::Create(Instruction::IntToPtr, build, PointerType::getUnqual(funType), "build", block);
  1438. CallInst::Create(funType, funcPtr, {stateArg, ptr}, "", block);
  1439. const auto dict = new LoadInst(valueType, ptr, "dict", block);
  1440. UnRefBoxed(state, ctx, block);
  1441. result->addIncoming(dict, block);
  1442. }
  1443. new StoreInst(GetFinish(context), statePtr, block);
  1444. BranchInst::Create(over, block);
  1445. block = over;
  1446. return result;
  1447. }
  1448. #endif
  1449. private:
  1450. void MakeState(TComputationContext& ctx, NUdf::TUnboxedValue& state) const {
  1451. state = ctx.HolderFactory.Create<TState>(TMapAccumulator(KeyType, PayloadType, KeyTypes, IsTuple, Encoded,
  1452. Compare.Get(), Equate.Get(), Hash.Get(), ctx, ItemsCountHint));
  1453. }
  1454. void RegisterDependencies() const final {
  1455. if (const auto flow = this->FlowDependsOn(Flow)) {
  1456. std::for_each(Items.cbegin(), Items.cend(), std::bind(&TSqueezeMapWideWrapper::Own, flow, std::placeholders::_1));
  1457. this->DependsOn(flow, Key);
  1458. this->DependsOn(flow, Payload);
  1459. }
  1460. }
  1461. TType* const KeyType;
  1462. TType* PayloadType;
  1463. IComputationWideFlowNode* const Flow;
  1464. const TComputationExternalNodePtrVector Items;
  1465. IComputationNode* const Key;
  1466. IComputationNode* const Payload;
  1467. const ui64 ItemsCountHint;
  1468. TKeyTypes KeyTypes;
  1469. bool IsTuple;
  1470. bool Encoded;
  1471. bool UseIHash;
  1472. const std::optional<size_t> PasstroughKey;
  1473. const std::optional<size_t> PasstroughPayload;
  1474. mutable std::vector<NUdf::TUnboxedValue*> Fields;
  1475. const ui32 WideFieldsIndex;
  1476. NUdf::ICompare::TPtr Compare;
  1477. NUdf::IEquate::TPtr Equate;
  1478. NUdf::IHash::TPtr Hash;
  1479. };
  1480. template <typename TAccumulator>
  1481. IComputationNode* WrapToSet(TCallable& callable, const TNodeLocator& nodeLocator, TComputationMutables& mutables) {
  1482. const auto keyType = callable.GetInput(callable.GetInputsCount() - 5U).GetStaticType();
  1483. const auto itemsCountHint = AS_VALUE(TDataLiteral, callable.GetInput(callable.GetInputsCount() - 1U))->AsValue().Get<ui64>();
  1484. const auto flow = LocateNode(nodeLocator, callable, 0U);
  1485. const auto keySelector = LocateNode(nodeLocator, callable, callable.GetInputsCount() - 5U);
  1486. if (const auto wide = dynamic_cast<IComputationWideFlowNode*>(flow)) {
  1487. const auto width = callable.GetInputsCount() - 6U;
  1488. TComputationExternalNodePtrVector args(width, nullptr);
  1489. auto index = 0U;
  1490. std::generate_n(args.begin(), width, [&](){ return LocateExternalNode(nodeLocator, callable, ++index); });
  1491. return new TSqueezeSetWideWrapper<TAccumulator>(mutables, keyType, wide, std::move(args), keySelector, itemsCountHint);
  1492. }
  1493. const auto itemArg = LocateExternalNode(nodeLocator, callable, 1U);
  1494. const auto type = callable.GetInput(0U).GetStaticType();
  1495. if (type->IsList()) {
  1496. return new TSetWrapper<TAccumulator, false>(mutables, keyType, flow, itemArg, keySelector, itemsCountHint);
  1497. }
  1498. if (type->IsFlow()) {
  1499. return new TSqueezeSetFlowWrapper<TAccumulator>(mutables, keyType, flow, itemArg, keySelector, itemsCountHint);
  1500. }
  1501. if (type->IsStream()) {
  1502. return new TSetWrapper<TAccumulator, true>(mutables, keyType, flow, itemArg, keySelector, itemsCountHint);
  1503. }
  1504. THROW yexception() << "Expected list, flow or stream.";
  1505. }
  1506. template <typename TAccumulator>
  1507. IComputationNode* WrapToMap(TCallable& callable, const TNodeLocator& nodeLocator, TComputationMutables& mutables) {
  1508. const auto keyType = callable.GetInput(callable.GetInputsCount() - 5U).GetStaticType();
  1509. const auto payloadType = callable.GetInput(callable.GetInputsCount() - 4U).GetStaticType();
  1510. const auto itemsCountHint = AS_VALUE(TDataLiteral, callable.GetInput(callable.GetInputsCount() - 1U))->AsValue().Get<ui64>();
  1511. const auto flow = LocateNode(nodeLocator, callable, 0U);
  1512. const auto keySelector = LocateNode(nodeLocator, callable, callable.GetInputsCount() - 5U);
  1513. const auto payloadSelector = LocateNode(nodeLocator, callable, callable.GetInputsCount() - 4U);
  1514. if (const auto wide = dynamic_cast<IComputationWideFlowNode*>(flow)) {
  1515. const auto width = callable.GetInputsCount() - 6U;
  1516. TComputationExternalNodePtrVector args(width, nullptr);
  1517. auto index = 0U;
  1518. std::generate(args.begin(), args.end(), [&](){ return LocateExternalNode(nodeLocator, callable, ++index); });
  1519. return new TSqueezeMapWideWrapper<TAccumulator>(mutables, keyType, payloadType, wide, std::move(args), keySelector, payloadSelector, itemsCountHint);
  1520. }
  1521. const auto itemArg = LocateExternalNode(nodeLocator, callable, 1U);
  1522. const auto type = callable.GetInput(0U).GetStaticType();
  1523. if (type->IsList()) {
  1524. return new TMapWrapper<TAccumulator, false>(mutables, keyType, payloadType, flow, itemArg, keySelector, payloadSelector, itemsCountHint);
  1525. }
  1526. if (type->IsFlow()) {
  1527. return new TSqueezeMapFlowWrapper<TAccumulator>(mutables, keyType, payloadType, flow, itemArg, keySelector, payloadSelector, itemsCountHint);
  1528. }
  1529. if (type->IsStream()) {
  1530. return new TMapWrapper<TAccumulator, true>(mutables, keyType, payloadType, flow, itemArg, keySelector, payloadSelector, itemsCountHint);
  1531. }
  1532. THROW yexception() << "Expected list, flow or stream.";
  1533. }
  1534. template <bool IsList>
  1535. IComputationNode* WrapToSortedDictInternal(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
  1536. MKQL_ENSURE(callable.GetInputsCount() >= 6U, "Expected six or more args.");
  1537. const auto type = callable.GetInput(0U).GetStaticType();
  1538. if constexpr (IsList) {
  1539. MKQL_ENSURE(type->IsList(), "Expected list.");
  1540. } else {
  1541. MKQL_ENSURE(type->IsFlow() || type->IsStream(), "Expected flow or stream.");
  1542. }
  1543. const auto keyType = callable.GetInput(callable.GetInputsCount() - 5U).GetStaticType();
  1544. const auto payloadType = callable.GetInput(callable.GetInputsCount() - 4U).GetStaticType();
  1545. const auto multiData = AS_VALUE(TDataLiteral, callable.GetInput(callable.GetInputsCount() - 3U));
  1546. const bool isMulti = multiData->AsValue().Get<bool>();
  1547. const auto itemsCountHint = AS_VALUE(TDataLiteral, callable.GetInput(callable.GetInputsCount() - 1U))->AsValue().Get<ui64>();
  1548. const auto flow = LocateNode(ctx.NodeLocator, callable, 0U);
  1549. const auto keySelector = LocateNode(ctx.NodeLocator, callable, callable.GetInputsCount() -5U);
  1550. const auto payloadSelector = LocateNode(ctx.NodeLocator, callable, callable.GetInputsCount() -4U);
  1551. if (const auto wide = dynamic_cast<IComputationWideFlowNode*>(flow)) {
  1552. const auto width = callable.GetInputsCount() - 6U;
  1553. TComputationExternalNodePtrVector args(width, nullptr);
  1554. auto index = 0U;
  1555. std::generate(args.begin(), args.end(), [&](){ return LocateExternalNode(ctx.NodeLocator, callable, ++index); });
  1556. if (!isMulti && payloadType->IsVoid()) {
  1557. return new TSqueezeSetWideWrapper<TSortedSetAccumulator>(ctx.Mutables, keyType, wide, std::move(args), keySelector, itemsCountHint);
  1558. } else if (isMulti) {
  1559. return new TSqueezeMapWideWrapper<TSortedMapAccumulator<true>>(ctx.Mutables, keyType, payloadType, wide, std::move(args), keySelector, payloadSelector, itemsCountHint);
  1560. } else {
  1561. return new TSqueezeMapWideWrapper<TSortedMapAccumulator<false>>(ctx.Mutables, keyType, payloadType, wide, std::move(args), keySelector, payloadSelector, itemsCountHint);
  1562. }
  1563. }
  1564. const auto itemArg = LocateExternalNode(ctx.NodeLocator, callable, 1U);
  1565. if (!isMulti && payloadType->IsVoid()) {
  1566. if (type->IsList()) {
  1567. return new TSetWrapper<TSortedSetAccumulator, false>(ctx.Mutables, keyType, flow, itemArg, keySelector, itemsCountHint);
  1568. }
  1569. if (type->IsFlow()) {
  1570. return new TSqueezeSetFlowWrapper<TSortedSetAccumulator>(ctx.Mutables, keyType, flow, itemArg, keySelector, itemsCountHint);
  1571. }
  1572. if (type->IsStream()) {
  1573. return new TSetWrapper<TSortedSetAccumulator, true>(ctx.Mutables, keyType, flow, itemArg, keySelector, itemsCountHint);
  1574. }
  1575. } else if (isMulti) {
  1576. if (type->IsList()) {
  1577. return new TMapWrapper<TSortedMapAccumulator<true>, false>(ctx.Mutables, keyType, payloadType, flow, itemArg, keySelector, payloadSelector, itemsCountHint);
  1578. }
  1579. if (type->IsFlow()) {
  1580. return new TSqueezeMapFlowWrapper<TSortedMapAccumulator<true>>(ctx.Mutables, keyType, payloadType, flow, itemArg, keySelector, payloadSelector, itemsCountHint);
  1581. }
  1582. if (type->IsStream()) {
  1583. return new TMapWrapper<TSortedMapAccumulator<true>, true>(ctx.Mutables, keyType, payloadType, flow, itemArg, keySelector, payloadSelector, itemsCountHint);
  1584. }
  1585. } else {
  1586. if (type->IsList()) {
  1587. return new TMapWrapper<TSortedMapAccumulator<false>, false>(ctx.Mutables, keyType, payloadType, flow, itemArg, keySelector, payloadSelector, itemsCountHint);
  1588. }
  1589. if (type->IsFlow()) {
  1590. return new TSqueezeMapFlowWrapper<TSortedMapAccumulator<false>>(ctx.Mutables, keyType, payloadType, flow, itemArg, keySelector, payloadSelector, itemsCountHint);
  1591. }
  1592. if (type->IsStream()) {
  1593. return new TMapWrapper<TSortedMapAccumulator<false>, true>(ctx.Mutables, keyType, payloadType, flow, itemArg, keySelector, payloadSelector, itemsCountHint);
  1594. }
  1595. }
  1596. THROW yexception() << "Expected list, flow or stream.";
  1597. }
  1598. template <bool IsList>
  1599. IComputationNode* WrapToHashedDictInternal(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
  1600. MKQL_ENSURE(callable.GetInputsCount() >= 6U, "Expected six or more args.");
  1601. const auto type = callable.GetInput(0U).GetStaticType();
  1602. if constexpr (IsList) {
  1603. MKQL_ENSURE(type->IsList(), "Expected list.");
  1604. } else {
  1605. MKQL_ENSURE(type->IsFlow() || type->IsStream(), "Expected flow or stream.");
  1606. }
  1607. const auto keyType = callable.GetInput(callable.GetInputsCount() - 5U).GetStaticType();
  1608. const auto payloadType = callable.GetInput(callable.GetInputsCount() - 4U).GetStaticType();
  1609. const bool multi = AS_VALUE(TDataLiteral, callable.GetInput(callable.GetInputsCount() - 3U))->AsValue().Get<bool>();
  1610. const bool isCompact = AS_VALUE(TDataLiteral, callable.GetInput(callable.GetInputsCount() - 2U))->AsValue().Get<bool>();
  1611. const auto payloadSelectorNode = callable.GetInput(callable.GetInputsCount() - 4U);
  1612. const bool isOptional = keyType->IsOptional();
  1613. const auto unwrappedKeyType = isOptional ? AS_TYPE(TOptionalType, keyType)->GetItemType() : keyType;
  1614. if (!multi && payloadType->IsVoid()) {
  1615. if (isCompact) {
  1616. if (unwrappedKeyType->IsData()) {
  1617. #define USE_HASHED_SINGLE_FIXED_COMPACT_SET(xType, xLayoutType) \
  1618. case NUdf::TDataType<xType>::Id: \
  1619. if (isOptional) { \
  1620. return WrapToSet< \
  1621. THashedSingleFixedCompactSetAccumulator<xLayoutType, true>>(callable, ctx.NodeLocator, ctx.Mutables); \
  1622. } else { \
  1623. return WrapToSet< \
  1624. THashedSingleFixedCompactSetAccumulator<xLayoutType, false>>(callable, ctx.NodeLocator, ctx.Mutables); \
  1625. }
  1626. switch (AS_TYPE(TDataType, unwrappedKeyType)->GetSchemeType()) {
  1627. KNOWN_FIXED_VALUE_TYPES(USE_HASHED_SINGLE_FIXED_COMPACT_SET)
  1628. }
  1629. #undef USE_HASHED_SINGLE_FIXED_COMPACT_SET
  1630. }
  1631. return WrapToSet<THashedCompactSetAccumulator>(callable, ctx.NodeLocator, ctx.Mutables);
  1632. }
  1633. if (unwrappedKeyType->IsData()) {
  1634. #define USE_HASHED_SINGLE_FIXED_SET(xType, xLayoutType) \
  1635. case NUdf::TDataType<xType>::Id: \
  1636. if (isOptional) { \
  1637. return WrapToSet< \
  1638. THashedSingleFixedSetAccumulator<xLayoutType, true>>(callable, ctx.NodeLocator, ctx.Mutables); \
  1639. } else { \
  1640. return WrapToSet< \
  1641. THashedSingleFixedSetAccumulator<xLayoutType, false>>(callable, ctx.NodeLocator, ctx.Mutables); \
  1642. }
  1643. switch (AS_TYPE(TDataType, unwrappedKeyType)->GetSchemeType()) {
  1644. KNOWN_FIXED_VALUE_TYPES(USE_HASHED_SINGLE_FIXED_SET)
  1645. }
  1646. #undef USE_HASHED_SINGLE_FIXED_SET
  1647. }
  1648. return WrapToSet<THashedSetAccumulator>(callable, ctx.NodeLocator, ctx.Mutables);
  1649. }
  1650. if (isCompact) {
  1651. if (unwrappedKeyType->IsData()) {
  1652. #define USE_HASHED_SINGLE_FIXED_COMPACT_MAP(xType, xLayoutType) \
  1653. case NUdf::TDataType<xType>::Id: \
  1654. if (multi) { \
  1655. if (isOptional) { \
  1656. return WrapToMap< \
  1657. THashedSingleFixedCompactMapAccumulator<xLayoutType, true, true>>(callable, ctx.NodeLocator, ctx.Mutables); \
  1658. } else { \
  1659. return WrapToMap< \
  1660. THashedSingleFixedCompactMapAccumulator<xLayoutType, false, true>>(callable, ctx.NodeLocator, ctx.Mutables); \
  1661. } \
  1662. } else { \
  1663. if (isOptional) { \
  1664. return WrapToMap< \
  1665. THashedSingleFixedCompactMapAccumulator<xLayoutType, true, false>>(callable, ctx.NodeLocator, ctx.Mutables); \
  1666. } else { \
  1667. return WrapToMap< \
  1668. THashedSingleFixedCompactMapAccumulator<xLayoutType, false, false>>(callable, ctx.NodeLocator, ctx.Mutables); \
  1669. } \
  1670. }
  1671. switch (AS_TYPE(TDataType, unwrappedKeyType)->GetSchemeType()) {
  1672. KNOWN_FIXED_VALUE_TYPES(USE_HASHED_SINGLE_FIXED_COMPACT_MAP)
  1673. }
  1674. #undef USE_HASHED_SINGLE_FIXED_COMPACT_MAP
  1675. }
  1676. if (multi) {
  1677. return WrapToMap<THashedCompactMapAccumulator<true>>(callable, ctx.NodeLocator, ctx.Mutables);
  1678. } else {
  1679. return WrapToMap<THashedCompactMapAccumulator<false>>(callable, ctx.NodeLocator, ctx.Mutables);
  1680. }
  1681. }
  1682. if (unwrappedKeyType->IsData()) {
  1683. #define USE_HASHED_SINGLE_FIXED_MAP(xType, xLayoutType) \
  1684. case NUdf::TDataType<xType>::Id: \
  1685. if (multi) { \
  1686. if (isOptional) { \
  1687. return WrapToMap< \
  1688. THashedSingleFixedMultiMapAccumulator<xLayoutType, true>>(callable, ctx.NodeLocator, ctx.Mutables); \
  1689. } else { \
  1690. return WrapToMap< \
  1691. THashedSingleFixedMultiMapAccumulator<xLayoutType, false>>(callable, ctx.NodeLocator, ctx.Mutables); \
  1692. } \
  1693. } else { \
  1694. if (isOptional) { \
  1695. return WrapToMap< \
  1696. THashedSingleFixedMapAccumulator<xLayoutType, true>>(callable, ctx.NodeLocator, ctx.Mutables); \
  1697. } else { \
  1698. return WrapToMap< \
  1699. THashedSingleFixedMapAccumulator<xLayoutType, false>>(callable, ctx.NodeLocator, ctx.Mutables); \
  1700. } \
  1701. }
  1702. switch (AS_TYPE(TDataType, unwrappedKeyType)->GetSchemeType()) {
  1703. KNOWN_FIXED_VALUE_TYPES(USE_HASHED_SINGLE_FIXED_MAP)
  1704. }
  1705. #undef USE_HASHED_SINGLE_FIXED_MAP
  1706. }
  1707. if (multi) {
  1708. return WrapToMap<THashedMultiMapAccumulator>(callable, ctx.NodeLocator, ctx.Mutables);
  1709. } else {
  1710. return WrapToMap<THashedMapAccumulator>(callable, ctx.NodeLocator, ctx.Mutables);
  1711. }
  1712. }
  1713. }
  1714. IComputationNode* WrapToSortedDict(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
  1715. return WrapToSortedDictInternal<true>(callable, ctx);
  1716. }
  1717. IComputationNode* WrapToHashedDict(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
  1718. return WrapToHashedDictInternal<true>(callable, ctx);
  1719. }
  1720. IComputationNode* WrapSqueezeToSortedDict(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
  1721. return WrapToSortedDictInternal<false>(callable, ctx);
  1722. }
  1723. IComputationNode* WrapSqueezeToHashedDict(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
  1724. return WrapToHashedDictInternal<false>(callable, ctx);
  1725. }
  1726. }
  1727. }