mkql_todict.cpp 84 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130
  1. #include "mkql_todict.h"
  2. #include <yql/essentials/minikql/computation/mkql_computation_list_adapter.h>
  3. #include <yql/essentials/minikql/computation/mkql_computation_node_codegen.h> // Y_IGNORE
  4. #include <yql/essentials/minikql/computation/mkql_computation_node_holders.h>
  5. #include <yql/essentials/minikql/computation/mkql_computation_node_pack.h>
  6. #include <yql/essentials/minikql/computation/mkql_llvm_base.h> // Y_IGNORE
  7. #include <yql/essentials/minikql/computation/presort.h>
  8. #include <yql/essentials/minikql/mkql_node_cast.h>
  9. #include <yql/essentials/minikql/mkql_string_util.h>
  10. #include <yql/essentials/public/udf/udf_types.h>
  11. #include <yql/essentials/utils/cast.h>
  12. #include <yql/essentials/utils/hash.h>
  13. #include <algorithm>
  14. #include <unordered_map>
  15. #include <optional>
  16. #include <vector>
  17. namespace NKikimr {
  18. namespace NMiniKQL {
  19. using NYql::EnsureDynamicCast;
  20. namespace {
  21. class ISetAccumulator {
  22. public:
  23. virtual ~ISetAccumulator() = default;
  24. virtual void Add(NUdf::TUnboxedValue&& key) = 0;
  25. virtual NUdf::TUnboxedValue Build() = 0;
  26. };
  27. class ISetAccumulatorFactory {
  28. public:
  29. virtual ~ISetAccumulatorFactory() = default;
  30. virtual bool IsSorted() const = 0;
  31. virtual std::unique_ptr<ISetAccumulator> Create(TType* keyType, const TKeyTypes& keyTypes, bool isTuple, bool encoded,
  32. const NUdf::ICompare* compare, const NUdf::IEquate* equate, const NUdf::IHash* hash, TComputationContext& ctx,
  33. ui64 itemsCountHint) const = 0;
  34. };
  35. class IMapAccumulator {
  36. public:
  37. virtual ~IMapAccumulator() = default;
  38. virtual void Add(NUdf::TUnboxedValue&& key, NUdf::TUnboxedValue&& payload) = 0;
  39. virtual NUdf::TUnboxedValue Build() = 0;
  40. };
  41. class IMapAccumulatorFactory {
  42. public:
  43. virtual ~IMapAccumulatorFactory() = default;
  44. virtual bool IsSorted() const = 0;
  45. virtual std::unique_ptr<IMapAccumulator> Create(TType* keyType, TType* payloadType, const TKeyTypes& keyTypes, bool isTuple, bool encoded,
  46. const NUdf::ICompare* compare, const NUdf::IEquate* equate, const NUdf::IHash* hash, TComputationContext& ctx, ui64 itemsCountHint) const = 0;
  47. };
  48. template <typename T>
  49. class TSetAccumulatorFactory : public ISetAccumulatorFactory {
  50. public:
  51. bool IsSorted() const final {
  52. return T::IsSorted;
  53. }
  54. std::unique_ptr<ISetAccumulator> Create(TType* keyType, const TKeyTypes& keyTypes, bool isTuple, bool encoded,
  55. const NUdf::ICompare* compare, const NUdf::IEquate* equate, const NUdf::IHash* hash, TComputationContext& ctx,
  56. ui64 itemsCountHint) const {
  57. return std::make_unique<T>(keyType, keyTypes, isTuple, encoded, compare, equate, hash, ctx, itemsCountHint);
  58. }
  59. };
  60. template <typename T>
  61. class TMapAccumulatorFactory : public IMapAccumulatorFactory {
  62. public:
  63. bool IsSorted() const final {
  64. return T::IsSorted;
  65. }
  66. std::unique_ptr<IMapAccumulator> Create(TType* keyType, TType* payloadType, const TKeyTypes& keyTypes, bool isTuple, bool encoded,
  67. const NUdf::ICompare* compare, const NUdf::IEquate* equate, const NUdf::IHash* hash, TComputationContext& ctx,
  68. ui64 itemsCountHint) const {
  69. return std::make_unique<T>(keyType, payloadType, keyTypes, isTuple, encoded, compare, equate, hash, ctx, itemsCountHint);
  70. }
  71. };
  72. class THashedMultiMapAccumulator : public IMapAccumulator {
  73. using TMapType = TValuesDictHashMap;
  74. TComputationContext& Ctx;
  75. TType* KeyType;
  76. const TKeyTypes& KeyTypes;
  77. bool IsTuple;
  78. std::shared_ptr<TValuePacker> Packer;
  79. const NUdf::IHash* Hash;
  80. const NUdf::IEquate* Equate;
  81. TMapType Map;
  82. public:
  83. static constexpr bool IsSorted = false;
  84. THashedMultiMapAccumulator(TType* keyType, TType* payloadType, const TKeyTypes& keyTypes, bool isTuple, bool encoded,
  85. const NUdf::ICompare* compare, const NUdf::IEquate* equate, const NUdf::IHash* hash, TComputationContext& ctx, ui64 itemsCountHint)
  86. : Ctx(ctx), KeyType(keyType), KeyTypes(keyTypes), IsTuple(isTuple), Hash(hash), Equate(equate)
  87. , Map(0, TValueHasher(KeyTypes, isTuple, hash), TValueEqual(KeyTypes, isTuple, equate))
  88. {
  89. Y_UNUSED(compare);
  90. if (encoded) {
  91. Packer = std::make_shared<TValuePacker>(true, keyType);
  92. }
  93. Y_UNUSED(payloadType);
  94. Map.reserve(itemsCountHint);
  95. }
  96. void Add(NUdf::TUnboxedValue&& key, NUdf::TUnboxedValue&& payload) final
  97. {
  98. if (Packer) {
  99. key = MakeString(Packer->Pack(key));
  100. }
  101. auto it = Map.find(key);
  102. if (it == Map.end()) {
  103. it = Map.emplace(std::move(key), Ctx.HolderFactory.NewVectorHolder()).first;
  104. }
  105. it->second.Push(std::move(payload));
  106. }
  107. NUdf::TUnboxedValue Build() final
  108. {
  109. const auto filler = [this](TValuesDictHashMap& targetMap) {
  110. targetMap = std::move(Map);
  111. };
  112. return Ctx.HolderFactory.CreateDirectHashedDictHolder(filler, KeyTypes, IsTuple, true, Packer ? KeyType : nullptr, Hash, Equate);
  113. }
  114. };
  115. class THashedMapAccumulator : public IMapAccumulator {
  116. using TMapType = TValuesDictHashMap;
  117. TComputationContext& Ctx;
  118. TType* KeyType;
  119. const TKeyTypes& KeyTypes;
  120. const bool IsTuple;
  121. std::shared_ptr<TValuePacker> Packer;
  122. const NUdf::IHash* Hash;
  123. const NUdf::IEquate* Equate;
  124. TMapType Map;
  125. public:
  126. static constexpr bool IsSorted = false;
  127. THashedMapAccumulator(TType* keyType, TType* payloadType, const TKeyTypes& keyTypes, bool isTuple, bool encoded,
  128. const NUdf::ICompare* compare, const NUdf::IEquate* equate, const NUdf::IHash* hash, TComputationContext& ctx, ui64 itemsCountHint)
  129. : Ctx(ctx), KeyType(keyType), KeyTypes(keyTypes), IsTuple(isTuple), Hash(hash), Equate(equate)
  130. , Map(0, TValueHasher(KeyTypes, isTuple, hash), TValueEqual(KeyTypes, isTuple, equate))
  131. {
  132. Y_UNUSED(compare);
  133. if (encoded) {
  134. Packer = std::make_shared<TValuePacker>(true, keyType);
  135. }
  136. Y_UNUSED(payloadType);
  137. Map.reserve(itemsCountHint);
  138. }
  139. void Add(NUdf::TUnboxedValue&& key, NUdf::TUnboxedValue&& payload) final
  140. {
  141. if (Packer) {
  142. key = MakeString(Packer->Pack(key));
  143. }
  144. Map.emplace(std::move(key), std::move(payload));
  145. }
  146. NUdf::TUnboxedValue Build() final
  147. {
  148. const auto filler = [this](TMapType& targetMap) {
  149. targetMap = std::move(Map);
  150. };
  151. return Ctx.HolderFactory.CreateDirectHashedDictHolder(filler, KeyTypes, IsTuple, true, Packer ? KeyType : nullptr, Hash, Equate);
  152. }
  153. };
  154. template<typename T, bool OptionalKey>
  155. class THashedSingleFixedMultiMapAccumulator : public IMapAccumulator {
  156. using TMapType = TValuesDictHashSingleFixedMap<T>;
  157. TComputationContext& Ctx;
  158. const TKeyTypes& KeyTypes;
  159. TMapType Map;
  160. TUnboxedValueVector NullPayloads;
  161. NUdf::TUnboxedValue CurrentEmptyVectorForInsert;
  162. public:
  163. static constexpr bool IsSorted = false;
  164. THashedSingleFixedMultiMapAccumulator(TType* keyType, TType* payloadType, const TKeyTypes& keyTypes, bool isTuple, bool encoded,
  165. const NUdf::ICompare* compare, const NUdf::IEquate* equate, const NUdf::IHash* hash, TComputationContext& ctx, ui64 itemsCountHint)
  166. : Ctx(ctx), KeyTypes(keyTypes), Map(0, TMyHash<T>(), TMyEquals<T>()) {
  167. Y_UNUSED(keyType);
  168. Y_UNUSED(payloadType);
  169. Y_UNUSED(isTuple);
  170. Y_UNUSED(encoded);
  171. Y_UNUSED(compare);
  172. Y_UNUSED(equate);
  173. Y_UNUSED(hash);
  174. Map.reserve(itemsCountHint);
  175. CurrentEmptyVectorForInsert = Ctx.HolderFactory.NewVectorHolder();
  176. }
  177. void Add(NUdf::TUnboxedValue&& key, NUdf::TUnboxedValue&& payload) final {
  178. if constexpr (OptionalKey) {
  179. if (!key) {
  180. NullPayloads.emplace_back(std::move(payload));
  181. return;
  182. }
  183. }
  184. auto insertInfo = Map.emplace(key.Get<T>(), CurrentEmptyVectorForInsert);
  185. if (insertInfo.second) {
  186. CurrentEmptyVectorForInsert = Ctx.HolderFactory.NewVectorHolder();
  187. }
  188. insertInfo.first->second.Push(payload.Release());
  189. }
  190. NUdf::TUnboxedValue Build() final {
  191. std::optional<NUdf::TUnboxedValue> nullPayload;
  192. if (NullPayloads.size()) {
  193. nullPayload = Ctx.HolderFactory.VectorAsVectorHolder(std::move(NullPayloads));
  194. }
  195. return Ctx.HolderFactory.CreateDirectHashedSingleFixedMapHolder<T, OptionalKey>(std::move(Map), std::move(nullPayload));
  196. }
  197. };
  198. template<typename T, bool OptionalKey>
  199. class THashedSingleFixedMapAccumulator : public IMapAccumulator {
  200. using TMapType = TValuesDictHashSingleFixedMap<T>;
  201. TComputationContext& Ctx;
  202. TMapType Map;
  203. std::optional<NUdf::TUnboxedValue> NullPayload;
  204. public:
  205. static constexpr bool IsSorted = false;
  206. THashedSingleFixedMapAccumulator(TType* keyType, TType* payloadType, const TKeyTypes& keyTypes, bool isTuple, bool encoded,
  207. const NUdf::ICompare* compare, const NUdf::IEquate* equate, const NUdf::IHash* hash, TComputationContext& ctx, ui64 itemsCountHint)
  208. : Ctx(ctx), Map(0, TMyHash<T>(), TMyEquals<T>())
  209. {
  210. Y_UNUSED(keyType);
  211. Y_UNUSED(payloadType);
  212. Y_UNUSED(keyTypes);
  213. Y_UNUSED(isTuple);
  214. Y_UNUSED(encoded);
  215. Y_UNUSED(compare);
  216. Y_UNUSED(equate);
  217. Y_UNUSED(hash);
  218. Map.reserve(itemsCountHint);
  219. }
  220. void Add(NUdf::TUnboxedValue&& key, NUdf::TUnboxedValue&& payload) final
  221. {
  222. if constexpr (OptionalKey) {
  223. if (!key) {
  224. NullPayload.emplace(std::move(payload));
  225. return;
  226. }
  227. }
  228. Map.emplace(key.Get<T>(), std::move(payload));
  229. }
  230. NUdf::TUnboxedValue Build() final
  231. {
  232. return Ctx.HolderFactory.CreateDirectHashedSingleFixedMapHolder<T, OptionalKey>(std::move(Map), std::move(NullPayload));
  233. }
  234. };
  235. class THashedSetAccumulator : public ISetAccumulator {
  236. using TSetType = TValuesDictHashSet;
  237. TComputationContext& Ctx;
  238. TType* KeyType;
  239. const TKeyTypes& KeyTypes;
  240. bool IsTuple;
  241. std::shared_ptr<TValuePacker> Packer;
  242. TSetType Set;
  243. const NUdf::IHash* Hash;
  244. const NUdf::IEquate* Equate;
  245. public:
  246. static constexpr bool IsSorted = false;
  247. THashedSetAccumulator(TType* keyType, const TKeyTypes& keyTypes, bool isTuple, bool encoded,
  248. const NUdf::ICompare* compare, const NUdf::IEquate* equate, const NUdf::IHash* hash, TComputationContext& ctx, ui64 itemsCountHint)
  249. : Ctx(ctx), KeyType(keyType), KeyTypes(keyTypes), IsTuple(isTuple), Set(0, TValueHasher(KeyTypes, isTuple, hash),
  250. TValueEqual(KeyTypes, isTuple, equate)), Hash(hash), Equate(equate)
  251. {
  252. Y_UNUSED(compare);
  253. if (encoded) {
  254. Packer = std::make_shared<TValuePacker>(true, keyType);
  255. }
  256. Set.reserve(itemsCountHint);
  257. }
  258. void Add(NUdf::TUnboxedValue&& key) final
  259. {
  260. if (Packer) {
  261. key = MakeString(Packer->Pack(key));
  262. }
  263. Set.emplace(std::move(key));
  264. }
  265. NUdf::TUnboxedValue Build() final
  266. {
  267. const auto filler = [this](TSetType& targetSet) {
  268. targetSet = std::move(Set);
  269. };
  270. return Ctx.HolderFactory.CreateDirectHashedSetHolder(filler, KeyTypes, IsTuple, true, Packer ? KeyType : nullptr, Hash, Equate);
  271. }
  272. };
  273. template <typename T, bool OptionalKey>
  274. class THashedSingleFixedSetAccumulator : public ISetAccumulator{
  275. using TSetType = TValuesDictHashSingleFixedSet<T>;
  276. TComputationContext& Ctx;
  277. TSetType Set;
  278. bool HasNull = false;
  279. public:
  280. static constexpr bool IsSorted = false;
  281. THashedSingleFixedSetAccumulator(TType* keyType, const TKeyTypes& keyTypes, bool isTuple, bool encoded,
  282. const NUdf::ICompare* compare, const NUdf::IEquate* equate, const NUdf::IHash* hash, TComputationContext& ctx, ui64 itemsCountHint)
  283. : Ctx(ctx), Set(0, TMyHash<T>(), TMyEquals<T>())
  284. {
  285. Y_UNUSED(keyType);
  286. Y_UNUSED(keyTypes);
  287. Y_UNUSED(isTuple);
  288. Y_UNUSED(encoded);
  289. Y_UNUSED(compare);
  290. Y_UNUSED(equate);
  291. Y_UNUSED(hash);
  292. Set.reserve(itemsCountHint);
  293. }
  294. void Add(NUdf::TUnboxedValue&& key) final
  295. {
  296. if constexpr (OptionalKey) {
  297. if (!key) {
  298. HasNull = true;
  299. return;
  300. }
  301. }
  302. Set.emplace(key.Get<T>());
  303. }
  304. NUdf::TUnboxedValue Build() final
  305. {
  306. return Ctx.HolderFactory.CreateDirectHashedSingleFixedSetHolder<T, OptionalKey>(std::move(Set), HasNull);
  307. }
  308. };
  309. template <typename T, bool OptionalKey>
  310. class THashedSingleFixedCompactSetAccumulator : public ISetAccumulator {
  311. using TSetType = TValuesDictHashSingleFixedCompactSet<T>;
  312. TComputationContext& Ctx;
  313. TPagedArena Pool;
  314. TSetType Set;
  315. bool HasNull = false;
  316. public:
  317. static constexpr bool IsSorted = false;
  318. THashedSingleFixedCompactSetAccumulator(TType* keyType, const TKeyTypes& keyTypes, bool isTuple, bool encoded,
  319. const NUdf::ICompare* compare, const NUdf::IEquate* equate, const NUdf::IHash* hash, TComputationContext& ctx, ui64 itemsCountHint)
  320. : Ctx(ctx), Pool(&Ctx.HolderFactory.GetPagePool()), Set(Ctx.HolderFactory.GetPagePool(), itemsCountHint / COMPACT_HASH_MAX_LOAD_FACTOR)
  321. {
  322. Y_UNUSED(keyType);
  323. Y_UNUSED(keyTypes);
  324. Y_UNUSED(isTuple);
  325. Y_UNUSED(encoded);
  326. Y_UNUSED(compare);
  327. Y_UNUSED(equate);
  328. Y_UNUSED(hash);
  329. Set.SetMaxLoadFactor(COMPACT_HASH_MAX_LOAD_FACTOR);
  330. }
  331. void Add(NUdf::TUnboxedValue&& key) final
  332. {
  333. if constexpr (OptionalKey) {
  334. if (!key) {
  335. HasNull = true;
  336. return;
  337. }
  338. }
  339. Set.Insert(key.Get<T>());
  340. }
  341. NUdf::TUnboxedValue Build() final
  342. {
  343. return Ctx.HolderFactory.CreateDirectHashedSingleFixedCompactSetHolder<T, OptionalKey>(std::move(Set), HasNull);
  344. }
  345. };
  346. class THashedCompactSetAccumulator : public ISetAccumulator {
  347. using TSetType = TValuesDictHashCompactSet;
  348. TComputationContext& Ctx;
  349. TPagedArena Pool;
  350. TSetType Set;
  351. TType *KeyType;
  352. std::shared_ptr<TValuePacker> KeyPacker;
  353. public:
  354. static constexpr bool IsSorted = false;
  355. THashedCompactSetAccumulator(TType* keyType, const TKeyTypes& keyTypes, bool isTuple, bool encoded,
  356. const NUdf::ICompare* compare, const NUdf::IEquate* equate, const NUdf::IHash* hash, TComputationContext& ctx, ui64 itemsCountHint)
  357. : Ctx(ctx), Pool(&Ctx.HolderFactory.GetPagePool()), Set(Ctx.HolderFactory.GetPagePool(), itemsCountHint / COMPACT_HASH_MAX_LOAD_FACTOR, TSmallValueHash(), TSmallValueEqual())
  358. , KeyType(keyType), KeyPacker(std::make_shared<TValuePacker>(true, keyType))
  359. {
  360. Y_UNUSED(keyTypes);
  361. Y_UNUSED(isTuple);
  362. Y_UNUSED(encoded);
  363. Y_UNUSED(compare);
  364. Y_UNUSED(equate);
  365. Y_UNUSED(hash);
  366. Set.SetMaxLoadFactor(COMPACT_HASH_MAX_LOAD_FACTOR);
  367. }
  368. void Add(NUdf::TUnboxedValue&& key) final
  369. {
  370. Set.Insert(AddSmallValue(Pool, KeyPacker->Pack(key)));
  371. }
  372. NUdf::TUnboxedValue Build() final
  373. {
  374. return Ctx.HolderFactory.CreateDirectHashedCompactSetHolder(std::move(Set), std::move(Pool), KeyType, &Ctx);
  375. }
  376. };
  377. template <bool Multi>
  378. class THashedCompactMapAccumulator;
  379. template <>
  380. class THashedCompactMapAccumulator<false> : public IMapAccumulator {
  381. using TMapType = TValuesDictHashCompactMap;
  382. TComputationContext& Ctx;
  383. TPagedArena Pool;
  384. TMapType Map;
  385. TType *KeyType, *PayloadType;
  386. std::shared_ptr<TValuePacker> KeyPacker, PayloadPacker;
  387. public:
  388. static constexpr bool IsSorted = false;
  389. THashedCompactMapAccumulator(TType* keyType, TType* payloadType, const TKeyTypes& keyTypes, bool isTuple, bool encoded,
  390. const NUdf::ICompare* compare, const NUdf::IEquate* equate, const NUdf::IHash* hash, TComputationContext& ctx, ui64 itemsCountHint)
  391. : Ctx(ctx), Pool(&Ctx.HolderFactory.GetPagePool()), Map(Ctx.HolderFactory.GetPagePool(), itemsCountHint / COMPACT_HASH_MAX_LOAD_FACTOR)
  392. , KeyType(keyType), PayloadType(payloadType)
  393. , KeyPacker(std::make_shared<TValuePacker>(true, keyType))
  394. , PayloadPacker(std::make_shared<TValuePacker>(false, payloadType))
  395. {
  396. Y_UNUSED(keyTypes);
  397. Y_UNUSED(isTuple);
  398. Y_UNUSED(encoded);
  399. Y_UNUSED(compare);
  400. Y_UNUSED(equate);
  401. Y_UNUSED(hash);
  402. Map.SetMaxLoadFactor(COMPACT_HASH_MAX_LOAD_FACTOR);
  403. }
  404. void Add(NUdf::TUnboxedValue&& key, NUdf::TUnboxedValue&& payload) final
  405. {
  406. Map.InsertNew(AddSmallValue(Pool, KeyPacker->Pack(key)), AddSmallValue(Pool, PayloadPacker->Pack(payload)));
  407. }
  408. NUdf::TUnboxedValue Build() final
  409. {
  410. return Ctx.HolderFactory.CreateDirectHashedCompactMapHolder(std::move(Map), std::move(Pool), KeyType, PayloadType, &Ctx);
  411. }
  412. };
  413. template <>
  414. class THashedCompactMapAccumulator<true> : public IMapAccumulator {
  415. using TMapType = TValuesDictHashCompactMultiMap;
  416. TComputationContext& Ctx;
  417. TPagedArena Pool;
  418. TMapType Map;
  419. TType *KeyType, *PayloadType;
  420. std::shared_ptr<TValuePacker> KeyPacker, PayloadPacker;
  421. public:
  422. static constexpr bool IsSorted = false;
  423. THashedCompactMapAccumulator(TType* keyType, TType* payloadType, const TKeyTypes& keyTypes, bool isTuple, bool encoded,
  424. const NUdf::ICompare* compare, const NUdf::IEquate* equate, const NUdf::IHash* hash, TComputationContext& ctx, ui64 itemsCountHint)
  425. : Ctx(ctx), Pool(&Ctx.HolderFactory.GetPagePool()), Map(Ctx.HolderFactory.GetPagePool(), itemsCountHint / COMPACT_HASH_MAX_LOAD_FACTOR)
  426. , KeyType(keyType), PayloadType(payloadType)
  427. , KeyPacker(std::make_shared<TValuePacker>(true, keyType))
  428. , PayloadPacker(std::make_shared<TValuePacker>(false, payloadType))
  429. {
  430. Y_UNUSED(keyTypes);
  431. Y_UNUSED(isTuple);
  432. Y_UNUSED(encoded);
  433. Y_UNUSED(compare);
  434. Y_UNUSED(equate);
  435. Y_UNUSED(hash);
  436. Map.SetMaxLoadFactor(COMPACT_HASH_MAX_LOAD_FACTOR);
  437. }
  438. void Add(NUdf::TUnboxedValue&& key, NUdf::TUnboxedValue&& payload) final
  439. {
  440. Map.Insert(AddSmallValue(Pool, KeyPacker->Pack(key)), AddSmallValue(Pool, PayloadPacker->Pack(payload)));
  441. }
  442. NUdf::TUnboxedValue Build() final
  443. {
  444. return Ctx.HolderFactory.CreateDirectHashedCompactMultiMapHolder(std::move(Map), std::move(Pool), KeyType, PayloadType, &Ctx);
  445. }
  446. };
  447. template <typename T, bool OptionalKey, bool Multi>
  448. class THashedSingleFixedCompactMapAccumulator;
  449. template <typename T, bool OptionalKey>
  450. class THashedSingleFixedCompactMapAccumulator<T, OptionalKey, false> : public IMapAccumulator {
  451. using TMapType = TValuesDictHashSingleFixedCompactMap<T>;
  452. TComputationContext& Ctx;
  453. TPagedArena Pool;
  454. TMapType Map;
  455. std::optional<ui64> NullPayload;
  456. TType* PayloadType;
  457. std::shared_ptr<TValuePacker> PayloadPacker;
  458. public:
  459. static constexpr bool IsSorted = false;
  460. THashedSingleFixedCompactMapAccumulator(TType* keyType, TType* payloadType, const TKeyTypes& keyTypes, bool isTuple, bool encoded,
  461. const NUdf::ICompare* compare, const NUdf::IEquate* equate, const NUdf::IHash* hash, TComputationContext& ctx, ui64 itemsCountHint)
  462. : Ctx(ctx), Pool(&Ctx.HolderFactory.GetPagePool()), Map(Ctx.HolderFactory.GetPagePool(), itemsCountHint / COMPACT_HASH_MAX_LOAD_FACTOR)
  463. , PayloadType(payloadType), PayloadPacker(std::make_shared<TValuePacker>(false, payloadType))
  464. {
  465. Y_UNUSED(keyType);
  466. Y_UNUSED(keyTypes);
  467. Y_UNUSED(isTuple);
  468. Y_UNUSED(encoded);
  469. Y_UNUSED(compare);
  470. Y_UNUSED(equate);
  471. Y_UNUSED(hash);
  472. Map.SetMaxLoadFactor(COMPACT_HASH_MAX_LOAD_FACTOR);
  473. }
  474. void Add(NUdf::TUnboxedValue&& key, NUdf::TUnboxedValue&& payload) final
  475. {
  476. if constexpr (OptionalKey) {
  477. if (!key) {
  478. NullPayload = AddSmallValue(Pool, PayloadPacker->Pack(payload));
  479. return;
  480. }
  481. }
  482. Map.InsertNew(key.Get<T>(), AddSmallValue(Pool, PayloadPacker->Pack(payload)));
  483. }
  484. NUdf::TUnboxedValue Build() final
  485. {
  486. return Ctx.HolderFactory.CreateDirectHashedSingleFixedCompactMapHolder<T, OptionalKey>(std::move(Map), std::move(NullPayload), std::move(Pool), PayloadType, &Ctx);
  487. }
  488. };
  489. template <typename T, bool OptionalKey>
  490. class THashedSingleFixedCompactMapAccumulator<T, OptionalKey, true> : public IMapAccumulator {
  491. using TMapType = TValuesDictHashSingleFixedCompactMultiMap<T>;
  492. TComputationContext& Ctx;
  493. TPagedArena Pool;
  494. TMapType Map;
  495. std::vector<ui64> NullPayloads;
  496. TType* PayloadType;
  497. std::shared_ptr<TValuePacker> PayloadPacker;
  498. public:
  499. static constexpr bool IsSorted = false;
  500. THashedSingleFixedCompactMapAccumulator(TType* keyType, TType* payloadType, const TKeyTypes& keyTypes, bool isTuple, bool encoded,
  501. const NUdf::ICompare* compare, const NUdf::IEquate* equate, const NUdf::IHash* hash, TComputationContext& ctx, ui64 itemsCountHint)
  502. : Ctx(ctx), Pool(&Ctx.HolderFactory.GetPagePool()), Map(Ctx.HolderFactory.GetPagePool(), itemsCountHint / COMPACT_HASH_MAX_LOAD_FACTOR)
  503. , PayloadType(payloadType), PayloadPacker(std::make_shared<TValuePacker>(false, payloadType))
  504. {
  505. Y_UNUSED(keyTypes);
  506. Y_UNUSED(keyType);
  507. Y_UNUSED(isTuple);
  508. Y_UNUSED(encoded);
  509. Y_UNUSED(compare);
  510. Y_UNUSED(equate);
  511. Y_UNUSED(hash);
  512. Map.SetMaxLoadFactor(COMPACT_HASH_MAX_LOAD_FACTOR);
  513. }
  514. void Add(NUdf::TUnboxedValue&& key, NUdf::TUnboxedValue&& payload) final
  515. {
  516. if constexpr (OptionalKey) {
  517. if (!key) {
  518. NullPayloads.push_back(AddSmallValue(Pool, PayloadPacker->Pack(payload)));
  519. return;
  520. }
  521. }
  522. Map.Insert(key.Get<T>(), AddSmallValue(Pool, PayloadPacker->Pack(payload)));
  523. }
  524. NUdf::TUnboxedValue Build() final
  525. {
  526. return Ctx.HolderFactory.CreateDirectHashedSingleFixedCompactMultiMapHolder<T, OptionalKey>(std::move(Map), std::move(NullPayloads), std::move(Pool), PayloadType, &Ctx);
  527. }
  528. };
  529. class TSortedSetAccumulator : public ISetAccumulator {
  530. TComputationContext& Ctx;
  531. TType* KeyType;
  532. const TKeyTypes& KeyTypes;
  533. bool IsTuple;
  534. const NUdf::ICompare* Compare;
  535. const NUdf::IEquate* Equate;
  536. std::optional<TGenericPresortEncoder> Packer;
  537. TUnboxedValueVector Items;
  538. public:
  539. static constexpr bool IsSorted = true;
  540. TSortedSetAccumulator(TType* keyType, const TKeyTypes& keyTypes, bool isTuple, bool encoded,
  541. const NUdf::ICompare* compare, const NUdf::IEquate* equate, const NUdf::IHash* hash, TComputationContext& ctx, ui64 itemsCountHint)
  542. : Ctx(ctx), KeyType(keyType), KeyTypes(keyTypes), IsTuple(isTuple), Compare(compare), Equate(equate)
  543. {
  544. Y_UNUSED(hash);
  545. if (encoded) {
  546. Packer.emplace(KeyType);
  547. }
  548. Items.reserve(itemsCountHint);
  549. }
  550. void Add(NUdf::TUnboxedValue&& key) final
  551. {
  552. if (Packer) {
  553. key = MakeString(Packer->Encode(key, false));
  554. }
  555. Items.emplace_back(std::move(key));
  556. }
  557. NUdf::TUnboxedValue Build() final
  558. {
  559. const TSortedSetFiller filler = [this](TUnboxedValueVector& values) {
  560. std::stable_sort(Items.begin(), Items.end(), TValueLess(KeyTypes, IsTuple, Compare));
  561. Items.erase(std::unique(Items.begin(), Items.end(), TValueEqual(KeyTypes, IsTuple, Equate)), Items.end());
  562. values = std::move(Items);
  563. };
  564. return Ctx.HolderFactory.CreateDirectSortedSetHolder(filler, KeyTypes, IsTuple,
  565. EDictSortMode::SortedUniqueAscending, true, Packer ? KeyType : nullptr, Compare, Equate);
  566. }
  567. };
  568. template<bool IsMulti>
  569. class TSortedMapAccumulator;
  570. template<>
  571. class TSortedMapAccumulator<false> : public IMapAccumulator {
  572. TComputationContext& Ctx;
  573. TType* KeyType;
  574. const TKeyTypes& KeyTypes;
  575. bool IsTuple;
  576. const NUdf::ICompare* Compare;
  577. const NUdf::IEquate* Equate;
  578. std::optional<TGenericPresortEncoder> Packer;
  579. TKeyPayloadPairVector Items;
  580. public:
  581. static constexpr bool IsSorted = true;
  582. TSortedMapAccumulator(TType* keyType, TType* payloadType, const TKeyTypes& keyTypes, bool isTuple, bool encoded,
  583. const NUdf::ICompare* compare, const NUdf::IEquate* equate, const NUdf::IHash* hash, TComputationContext& ctx, ui64 itemsCountHint)
  584. : Ctx(ctx)
  585. , KeyType(keyType)
  586. , KeyTypes(keyTypes)
  587. , IsTuple(isTuple)
  588. , Compare(compare)
  589. , Equate(equate)
  590. {
  591. Y_UNUSED(hash);
  592. if (encoded) {
  593. Packer.emplace(KeyType);
  594. }
  595. Y_UNUSED(payloadType);
  596. Items.reserve(itemsCountHint);
  597. }
  598. void Add(NUdf::TUnboxedValue&& key, NUdf::TUnboxedValue&& payload) final
  599. {
  600. if (Packer) {
  601. key = MakeString(Packer->Encode(key, false));
  602. }
  603. Items.emplace_back(std::move(key), std::move(payload));
  604. }
  605. NUdf::TUnboxedValue Build() final
  606. {
  607. const TSortedDictFiller filler = [this](TKeyPayloadPairVector& values) {
  608. values = std::move(Items);
  609. };
  610. return Ctx.HolderFactory.CreateDirectSortedDictHolder(filler, KeyTypes, IsTuple, EDictSortMode::RequiresSorting,
  611. true, Packer ? KeyType : nullptr, Compare, Equate);
  612. }
  613. };
  614. template<>
  615. class TSortedMapAccumulator<true> : public IMapAccumulator {
  616. TComputationContext& Ctx;
  617. TType* KeyType;
  618. const TKeyTypes& KeyTypes;
  619. bool IsTuple;
  620. const NUdf::ICompare* Compare;
  621. const NUdf::IEquate* Equate;
  622. std::optional<TGenericPresortEncoder> Packer;
  623. TKeyPayloadPairVector Items;
  624. public:
  625. static constexpr bool IsSorted = true;
  626. TSortedMapAccumulator(TType* keyType, TType* payloadType, const TKeyTypes& keyTypes, bool isTuple, bool encoded,
  627. const NUdf::ICompare* compare, const NUdf::IEquate* equate, const NUdf::IHash* hash, TComputationContext& ctx, ui64 itemsCountHint)
  628. : Ctx(ctx), KeyType(keyType), KeyTypes(keyTypes), IsTuple(isTuple), Compare(compare), Equate(equate)
  629. {
  630. Y_UNUSED(hash);
  631. if (encoded) {
  632. Packer.emplace(KeyType);
  633. }
  634. Y_UNUSED(payloadType);
  635. Items.reserve(itemsCountHint);
  636. }
  637. void Add(NUdf::TUnboxedValue&& key, NUdf::TUnboxedValue&& payload) final
  638. {
  639. if (Packer) {
  640. key = MakeString(Packer->Encode(key, false));
  641. }
  642. Items.emplace_back(std::move(key), std::move(payload));
  643. }
  644. NUdf::TUnboxedValue Build() final
  645. {
  646. const TSortedDictFiller filler = [this](TKeyPayloadPairVector& values) {
  647. std::stable_sort(Items.begin(), Items.end(), TKeyPayloadPairLess(KeyTypes, IsTuple, Compare));
  648. TKeyPayloadPairVector groups;
  649. groups.reserve(Items.size());
  650. if (!Items.empty()) {
  651. TDefaultListRepresentation currentList(std::move(Items.begin()->second));
  652. auto lastKey = std::move(Items.begin()->first);
  653. TValueEqual eqPredicate(KeyTypes, IsTuple, Equate);
  654. for (auto it = Items.begin() + 1; it != Items.end(); ++it) {
  655. if (eqPredicate(lastKey, it->first)) {
  656. currentList = currentList.Append(std::move(it->second));
  657. }
  658. else {
  659. auto payload = Ctx.HolderFactory.CreateDirectListHolder(std::move(currentList));
  660. groups.emplace_back(std::move(lastKey), std::move(payload));
  661. currentList = TDefaultListRepresentation(std::move(it->second));
  662. lastKey = std::move(it->first);
  663. }
  664. }
  665. auto payload = Ctx.HolderFactory.CreateDirectListHolder(std::move(currentList));
  666. groups.emplace_back(std::move(lastKey), std::move(payload));
  667. }
  668. values = std::move(groups);
  669. };
  670. return Ctx.HolderFactory.CreateDirectSortedDictHolder(filler, KeyTypes, IsTuple,
  671. EDictSortMode::SortedUniqueAscending, true, Packer ? KeyType : nullptr, Compare, Equate);
  672. }
  673. };
  674. class TSetWrapper : public TMutableComputationNode<TSetWrapper> {
  675. typedef TMutableComputationNode<TSetWrapper> TBaseComputation;
  676. public:
  677. class TStreamValue : public TComputationValue<TStreamValue> {
  678. public:
  679. TStreamValue(TMemoryUsageInfo* memInfo, NUdf::TUnboxedValue&& input, IComputationExternalNode* const item,
  680. IComputationNode* const key, std::unique_ptr<ISetAccumulator>&& setAccum, TComputationContext& ctx)
  681. : TComputationValue<TStreamValue>(memInfo)
  682. , Input(std::move(input))
  683. , Item(item)
  684. , Key(key)
  685. , SetAccum(std::move(setAccum))
  686. , Ctx(ctx) {}
  687. private:
  688. NUdf::EFetchStatus Fetch(NUdf::TUnboxedValue& result) override {
  689. if (Finished) {
  690. return NUdf::EFetchStatus::Finish;
  691. }
  692. for (;;) {
  693. NUdf::TUnboxedValue item;
  694. switch (auto status = Input.Fetch(item)) {
  695. case NUdf::EFetchStatus::Ok: {
  696. Item->SetValue(Ctx, std::move(item));
  697. SetAccum->Add(Key->GetValue(Ctx));
  698. break; // and continue
  699. }
  700. case NUdf::EFetchStatus::Finish: {
  701. result = SetAccum->Build();
  702. Finished = true;
  703. return NUdf::EFetchStatus::Ok;
  704. }
  705. case NUdf::EFetchStatus::Yield: {
  706. return NUdf::EFetchStatus::Yield;
  707. }
  708. }
  709. }
  710. }
  711. NUdf::TUnboxedValue Input;
  712. IComputationExternalNode* const Item;
  713. IComputationNode* const Key;
  714. const std::unique_ptr<ISetAccumulator> SetAccum;
  715. TComputationContext& Ctx;
  716. bool Finished = false;
  717. };
  718. TSetWrapper(TComputationMutables& mutables, TType* keyType, IComputationNode* list, IComputationExternalNode* item,
  719. IComputationNode* key, ui64 itemsCountHint, bool isStream, std::unique_ptr<ISetAccumulatorFactory> factory)
  720. : TBaseComputation(mutables, EValueRepresentation::Boxed)
  721. , KeyType(keyType)
  722. , List(list)
  723. , Item(item)
  724. , Key(key)
  725. , ItemsCountHint(itemsCountHint)
  726. , IsStream(isStream)
  727. , Factory(std::move(factory))
  728. {
  729. GetDictionaryKeyTypes(KeyType, KeyTypes, IsTuple, Encoded, UseIHash);
  730. Compare = UseIHash && Factory->IsSorted() ? MakeCompareImpl(KeyType) : nullptr;
  731. Equate = UseIHash ? MakeEquateImpl(KeyType) : nullptr;
  732. Hash = UseIHash && !Factory->IsSorted() ? MakeHashImpl(KeyType) : nullptr;
  733. }
  734. NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
  735. if (IsStream) {
  736. return ctx.HolderFactory.Create<TStreamValue>(List->GetValue(ctx), Item, Key,
  737. Factory->Create(KeyType, KeyTypes, IsTuple, Encoded, Compare.Get(), Equate.Get(), Hash.Get(),
  738. ctx, ItemsCountHint), ctx);
  739. }
  740. const auto& list = List->GetValue(ctx);
  741. auto itemsCountHint = ItemsCountHint;
  742. if (list.HasFastListLength()) {
  743. if (const auto size = list.GetListLength())
  744. itemsCountHint = size;
  745. else
  746. return ctx.HolderFactory.GetEmptyContainerLazy();
  747. }
  748. auto acc = Factory->Create(KeyType, KeyTypes, IsTuple, Encoded, Compare.Get(), Equate.Get(), Hash.Get(),
  749. ctx, itemsCountHint);
  750. TThresher<false>::DoForEachItem(list,
  751. [this, &acc, &ctx] (NUdf::TUnboxedValue&& item) {
  752. Item->SetValue(ctx, std::move(item));
  753. acc->Add(Key->GetValue(ctx));
  754. }
  755. );
  756. return acc->Build().Release();
  757. }
  758. private:
  759. void RegisterDependencies() const final {
  760. this->DependsOn(List);
  761. this->Own(Item);
  762. this->DependsOn(Key);
  763. }
  764. TType* const KeyType;
  765. IComputationNode* const List;
  766. IComputationExternalNode* const Item;
  767. IComputationNode* const Key;
  768. const ui64 ItemsCountHint;
  769. const bool IsStream;
  770. const std::unique_ptr<ISetAccumulatorFactory> Factory;
  771. TKeyTypes KeyTypes;
  772. bool IsTuple;
  773. bool Encoded;
  774. bool UseIHash;
  775. NUdf::ICompare::TPtr Compare;
  776. NUdf::IEquate::TPtr Equate;
  777. NUdf::IHash::TPtr Hash;
  778. };
  779. #ifndef MKQL_DISABLE_CODEGEN
  780. template <class TLLVMBase>
  781. class TLLVMFieldsStructureStateWithAccum: public TLLVMBase {
  782. private:
  783. using TBase = TLLVMBase;
  784. llvm::PointerType* StructPtrType;
  785. protected:
  786. using TBase::Context;
  787. public:
  788. std::vector<llvm::Type*> GetFieldsArray() {
  789. std::vector<llvm::Type*> result = TBase::GetFields();
  790. result.emplace_back(StructPtrType); //accumulator
  791. return result;
  792. }
  793. llvm::Constant* GetAccumulator() {
  794. return ConstantInt::get(Type::getInt32Ty(Context), TBase::GetFieldsCount() + 0);
  795. }
  796. TLLVMFieldsStructureStateWithAccum(llvm::LLVMContext& context)
  797. : TBase(context)
  798. , StructPtrType(PointerType::getUnqual(StructType::get(context))) {
  799. }
  800. };
  801. #endif
  802. class TSqueezeSetFlowWrapper : public TStatefulFlowCodegeneratorNode<TSqueezeSetFlowWrapper> {
  803. using TBase = TStatefulFlowCodegeneratorNode<TSqueezeSetFlowWrapper>;
  804. public:
  805. class TState : public TComputationValue<TState> {
  806. using TBase = TComputationValue<TState>;
  807. public:
  808. TState(TMemoryUsageInfo* memInfo, std::unique_ptr<ISetAccumulator>&& setAccum)
  809. : TBase(memInfo), SetAccum(std::move(setAccum)) {}
  810. NUdf::TUnboxedValuePod Build() {
  811. return SetAccum->Build().Release();
  812. }
  813. void Insert(NUdf::TUnboxedValuePod value) {
  814. SetAccum->Add(value);
  815. }
  816. private:
  817. const std::unique_ptr<ISetAccumulator> SetAccum;
  818. };
  819. TSqueezeSetFlowWrapper(TComputationMutables& mutables, TType* keyType,
  820. IComputationNode* flow, IComputationExternalNode* item, IComputationNode* key, ui64 itemsCountHint,
  821. std::unique_ptr<ISetAccumulatorFactory> factory)
  822. : TBase(mutables, flow, EValueRepresentation::Boxed, EValueRepresentation::Any)
  823. , KeyType(keyType)
  824. , Flow(flow)
  825. , Item(item)
  826. , Key(key)
  827. , ItemsCountHint(itemsCountHint)
  828. , Factory(std::move(factory))
  829. {
  830. GetDictionaryKeyTypes(KeyType, KeyTypes, IsTuple, Encoded, UseIHash);
  831. Compare = UseIHash && Factory->IsSorted() ? MakeCompareImpl(KeyType) : nullptr;
  832. Equate = UseIHash ? MakeEquateImpl(KeyType) : nullptr;
  833. Hash = UseIHash && !Factory->IsSorted() ? MakeHashImpl(KeyType) : nullptr;
  834. }
  835. NUdf::TUnboxedValuePod DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx) const {
  836. if (state.IsFinish()) {
  837. return state.Release();
  838. } else if (state.IsInvalid()) {
  839. MakeState(ctx, state);
  840. }
  841. while (const auto statePtr = static_cast<TState*>(state.AsBoxed().Get())) {
  842. if (auto item = Flow->GetValue(ctx); item.IsYield()) {
  843. return item.Release();
  844. } else if (item.IsFinish()) {
  845. const auto dict = statePtr->Build();
  846. state = std::move(item);
  847. return dict;
  848. } else {
  849. Item->SetValue(ctx, std::move(item));
  850. statePtr->Insert(Key->GetValue(ctx).Release());
  851. }
  852. }
  853. Y_UNREACHABLE();
  854. }
  855. #ifndef MKQL_DISABLE_CODEGEN
  856. Value* DoGenerateGetValue(const TCodegenContext& ctx, Value* statePtr, BasicBlock*& block) const {
  857. auto& context = ctx.Codegen.GetContext();
  858. const auto codegenItemArg = dynamic_cast<ICodegeneratorExternalNode*>(Item);
  859. MKQL_ENSURE(codegenItemArg, "Item must be codegenerator node.");
  860. const auto valueType = Type::getInt128Ty(context);
  861. TLLVMFieldsStructureStateWithAccum<TLLVMFieldsStructure<TComputationValue<TState>>> fieldsStruct(context);
  862. const auto stateType = StructType::get(context, fieldsStruct.GetFieldsArray());
  863. const auto statePtrType = PointerType::getUnqual(stateType);
  864. const auto make = BasicBlock::Create(context, "make", ctx.Func);
  865. const auto main = BasicBlock::Create(context, "main", ctx.Func);
  866. BranchInst::Create(make, main, IsInvalid(statePtr, block, context), block);
  867. block = make;
  868. const auto ptrType = PointerType::getUnqual(StructType::get(context));
  869. const auto self = CastInst::Create(Instruction::IntToPtr, ConstantInt::get(Type::getInt64Ty(context), uintptr_t(this)), ptrType, "self", block);
  870. const auto makeFunc = ConstantInt::get(Type::getInt64Ty(context), GetMethodPtr(&TSqueezeSetFlowWrapper::MakeState));
  871. const auto makeType = FunctionType::get(Type::getVoidTy(context), {self->getType(), ctx.Ctx->getType(), statePtr->getType()}, false);
  872. const auto makeFuncPtr = CastInst::Create(Instruction::IntToPtr, makeFunc, PointerType::getUnqual(makeType), "function", block);
  873. CallInst::Create(makeType, makeFuncPtr, {self, ctx.Ctx, statePtr}, "", block);
  874. BranchInst::Create(main, block);
  875. block = main;
  876. const auto more = BasicBlock::Create(context, "more", ctx.Func);
  877. const auto done = BasicBlock::Create(context, "done", ctx.Func);
  878. const auto plus = BasicBlock::Create(context, "plus", ctx.Func);
  879. const auto over = BasicBlock::Create(context, "over", ctx.Func);
  880. const auto result = PHINode::Create(valueType, 3U, "result", over);
  881. const auto state = new LoadInst(valueType, statePtr, "state", block);
  882. const auto half = CastInst::Create(Instruction::Trunc, state, Type::getInt64Ty(context), "half", block);
  883. const auto stateArg = CastInst::Create(Instruction::IntToPtr, half, statePtrType, "state_arg", block);
  884. result->addIncoming(GetFinish(context), block);
  885. BranchInst::Create(over, more, IsFinish(state, block, context), block);
  886. block = more;
  887. const auto item = GetNodeValue(Flow, ctx, block);
  888. result->addIncoming(GetYield(context), block);
  889. const auto choise = SwitchInst::Create(item, plus, 2U, block);
  890. choise->addCase(GetFinish(context), done);
  891. choise->addCase(GetYield(context), over);
  892. block = plus;
  893. codegenItemArg->CreateSetValue(ctx, block, item);
  894. const auto key = GetNodeValue(Key, ctx, block);
  895. const auto insert = ConstantInt::get(Type::getInt64Ty(context), GetMethodPtr(&TState::Insert));
  896. const auto keyArg = key;
  897. const auto insType = FunctionType::get(Type::getVoidTy(context), {stateArg->getType(), keyArg->getType()}, false);
  898. const auto insPtr = CastInst::Create(Instruction::IntToPtr, insert, PointerType::getUnqual(insType), "insert", block);
  899. CallInst::Create(insType, insPtr, {stateArg, keyArg}, "", block);
  900. BranchInst::Create(more, block);
  901. block = done;
  902. const auto build = ConstantInt::get(Type::getInt64Ty(context), GetMethodPtr(&TState::Build));
  903. const auto funType = FunctionType::get(valueType, {stateArg->getType()}, false);
  904. const auto funcPtr = CastInst::Create(Instruction::IntToPtr, build, PointerType::getUnqual(funType), "build", block);
  905. const auto dict = CallInst::Create(funType, funcPtr, {stateArg}, "dict", block);
  906. UnRefBoxed(state, ctx, block);
  907. result->addIncoming(dict, block);
  908. new StoreInst(item, statePtr, block);
  909. BranchInst::Create(over, block);
  910. block = over;
  911. return result;
  912. }
  913. #endif
  914. private:
  915. void MakeState(TComputationContext& ctx, NUdf::TUnboxedValue& state) const {
  916. state = ctx.HolderFactory.Create<TState>(Factory->Create(KeyType, KeyTypes, IsTuple, Encoded,
  917. Compare.Get(), Equate.Get(), Hash.Get(), ctx, ItemsCountHint));
  918. }
  919. void RegisterDependencies() const final {
  920. if (const auto flow = this->FlowDependsOn(Flow)) {
  921. this->Own(flow, Item);
  922. this->DependsOn(flow, Key);
  923. }
  924. }
  925. TType* const KeyType;
  926. IComputationNode* const Flow;
  927. IComputationExternalNode* const Item;
  928. IComputationNode* const Key;
  929. const ui64 ItemsCountHint;
  930. const std::unique_ptr<ISetAccumulatorFactory> Factory;
  931. TKeyTypes KeyTypes;
  932. bool IsTuple;
  933. bool Encoded;
  934. bool UseIHash;
  935. NUdf::ICompare::TPtr Compare;
  936. NUdf::IEquate::TPtr Equate;
  937. NUdf::IHash::TPtr Hash;
  938. };
  939. class TSqueezeSetWideWrapper : public TStatefulFlowCodegeneratorNode<TSqueezeSetWideWrapper> {
  940. using TBase = TStatefulFlowCodegeneratorNode<TSqueezeSetWideWrapper>;
  941. public:
  942. class TState : public TComputationValue<TState> {
  943. using TBase = TComputationValue<TState>;
  944. public:
  945. TState(TMemoryUsageInfo* memInfo, std::unique_ptr<ISetAccumulator>&& setAccum)
  946. : TBase(memInfo), SetAccum(std::move(setAccum)) {}
  947. NUdf::TUnboxedValuePod Build() {
  948. return SetAccum->Build().Release();
  949. }
  950. void Insert(NUdf::TUnboxedValuePod value) {
  951. SetAccum->Add(value);
  952. }
  953. private:
  954. const std::unique_ptr<ISetAccumulator> SetAccum;
  955. };
  956. TSqueezeSetWideWrapper(TComputationMutables& mutables, TType* keyType,
  957. IComputationWideFlowNode* flow, TComputationExternalNodePtrVector&& items, IComputationNode* key,
  958. ui64 itemsCountHint, std::unique_ptr<ISetAccumulatorFactory> factory)
  959. : TBase(mutables, flow, EValueRepresentation::Boxed, EValueRepresentation::Any)
  960. , KeyType(keyType)
  961. , Flow(flow)
  962. , Items(std::move(items))
  963. , Key(key)
  964. , ItemsCountHint(itemsCountHint)
  965. , Factory(std::move(factory))
  966. , PasstroughKey(GetPasstroughtMap(TComputationNodePtrVector{Key}, Items).front())
  967. , WideFieldsIndex(mutables.IncrementWideFieldsIndex(Items.size()))
  968. {
  969. GetDictionaryKeyTypes(KeyType, KeyTypes, IsTuple, Encoded, UseIHash);
  970. Compare = UseIHash && Factory->IsSorted() ? MakeCompareImpl(KeyType) : nullptr;
  971. Equate = UseIHash ? MakeEquateImpl(KeyType) : nullptr;
  972. Hash = UseIHash && !Factory->IsSorted() ? MakeHashImpl(KeyType) : nullptr;
  973. }
  974. NUdf::TUnboxedValuePod DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx) const {
  975. if (state.IsFinish()) {
  976. return state.Release();
  977. } else if (state.IsInvalid()) {
  978. MakeState(ctx, state);
  979. }
  980. auto** fields = ctx.WideFields.data() + WideFieldsIndex;
  981. while (const auto statePtr = static_cast<TState*>(state.AsBoxed().Get())) {
  982. for (auto i = 0U; i < Items.size(); ++i)
  983. if (Key == Items[i] || Items[i]->GetDependencesCount() > 0U)
  984. fields[i] = &Items[i]->RefValue(ctx);
  985. switch (const auto result = Flow->FetchValues(ctx, fields)) {
  986. case EFetchResult::One:
  987. statePtr->Insert(Key->GetValue(ctx).Release());
  988. continue;
  989. case EFetchResult::Yield:
  990. return NUdf::TUnboxedValuePod::MakeYield();
  991. case EFetchResult::Finish: {
  992. const auto dict = statePtr->Build();
  993. state = NUdf::TUnboxedValuePod::MakeFinish();
  994. return dict;
  995. }
  996. }
  997. }
  998. Y_UNREACHABLE();
  999. }
  1000. #ifndef MKQL_DISABLE_CODEGEN
  1001. Value* DoGenerateGetValue(const TCodegenContext& ctx, Value* statePtr, BasicBlock*& block) const {
  1002. auto& context = ctx.Codegen.GetContext();
  1003. const auto valueType = Type::getInt128Ty(context);
  1004. TLLVMFieldsStructureStateWithAccum<TLLVMFieldsStructure<TComputationValue<TState>>> fieldsStruct(context);
  1005. const auto stateType = StructType::get(context, fieldsStruct.GetFieldsArray());
  1006. const auto statePtrType = PointerType::getUnqual(stateType);
  1007. const auto make = BasicBlock::Create(context, "make", ctx.Func);
  1008. const auto main = BasicBlock::Create(context, "main", ctx.Func);
  1009. BranchInst::Create(make, main, IsInvalid(statePtr, block, context), block);
  1010. block = make;
  1011. const auto ptrType = PointerType::getUnqual(StructType::get(context));
  1012. const auto self = CastInst::Create(Instruction::IntToPtr, ConstantInt::get(Type::getInt64Ty(context), uintptr_t(this)), ptrType, "self", block);
  1013. const auto makeFunc = ConstantInt::get(Type::getInt64Ty(context), GetMethodPtr(&TSqueezeSetWideWrapper::MakeState));
  1014. const auto makeType = FunctionType::get(Type::getVoidTy(context), {self->getType(), ctx.Ctx->getType(), statePtr->getType()}, false);
  1015. const auto makeFuncPtr = CastInst::Create(Instruction::IntToPtr, makeFunc, PointerType::getUnqual(makeType), "function", block);
  1016. CallInst::Create(makeType, makeFuncPtr, {self, ctx.Ctx, statePtr}, "", block);
  1017. BranchInst::Create(main, block);
  1018. block = main;
  1019. const auto more = BasicBlock::Create(context, "more", ctx.Func);
  1020. const auto done = BasicBlock::Create(context, "done", ctx.Func);
  1021. const auto plus = BasicBlock::Create(context, "plus", ctx.Func);
  1022. const auto over = BasicBlock::Create(context, "over", ctx.Func);
  1023. const auto result = PHINode::Create(valueType, 3U, "result", over);
  1024. const auto state = new LoadInst(valueType, statePtr, "state", block);
  1025. const auto half = CastInst::Create(Instruction::Trunc, state, Type::getInt64Ty(context), "half", block);
  1026. const auto stateArg = CastInst::Create(Instruction::IntToPtr, half, statePtrType, "state_arg", block);
  1027. result->addIncoming(GetFinish(context), block);
  1028. BranchInst::Create(over, more, IsFinish(state, block, context), block);
  1029. block = more;
  1030. const auto getres = GetNodeValues(Flow, ctx, block);
  1031. result->addIncoming(GetYield(context), block);
  1032. const auto action = SwitchInst::Create(getres.first, plus, 2U, block);
  1033. action->addCase(ConstantInt::get(Type::getInt32Ty(context), i32(EFetchResult::Finish)), done);
  1034. action->addCase(ConstantInt::get(Type::getInt32Ty(context), i32(EFetchResult::Yield)), over);
  1035. block = plus;
  1036. if (!PasstroughKey) {
  1037. for (auto i = 0U; i < Items.size(); ++i)
  1038. if (Items[i]->GetDependencesCount() > 0U)
  1039. EnsureDynamicCast<ICodegeneratorExternalNode*>(Items[i])->CreateSetValue(ctx, block, getres.second[i](ctx, block));
  1040. }
  1041. const auto key = PasstroughKey ? getres.second[*PasstroughKey](ctx, block) : GetNodeValue(Key, ctx, block);
  1042. const auto insert = ConstantInt::get(Type::getInt64Ty(context), GetMethodPtr(&TState::Insert));
  1043. const auto keyArg = key;
  1044. const auto insType = FunctionType::get(Type::getVoidTy(context), {stateArg->getType(), keyArg->getType()}, false);
  1045. const auto insPtr = CastInst::Create(Instruction::IntToPtr, insert, PointerType::getUnqual(insType), "insert", block);
  1046. CallInst::Create(insType, insPtr, {stateArg, keyArg}, "", block);
  1047. BranchInst::Create(more, block);
  1048. block = done;
  1049. const auto build = ConstantInt::get(Type::getInt64Ty(context), GetMethodPtr(&TState::Build));
  1050. const auto funType = FunctionType::get(valueType, {stateArg->getType()}, false);
  1051. const auto funcPtr = CastInst::Create(Instruction::IntToPtr, build, PointerType::getUnqual(funType), "build", block);
  1052. const auto dict = CallInst::Create(funType, funcPtr, {stateArg}, "dict", block);
  1053. UnRefBoxed(state, ctx, block);
  1054. result->addIncoming(dict, block);
  1055. new StoreInst(GetFinish(context), statePtr, block);
  1056. BranchInst::Create(over, block);
  1057. block = over;
  1058. return result;
  1059. }
  1060. #endif
  1061. private:
  1062. void MakeState(TComputationContext& ctx, NUdf::TUnboxedValue& state) const {
  1063. state = ctx.HolderFactory.Create<TState>(Factory->Create(KeyType, KeyTypes, IsTuple, Encoded,
  1064. Compare.Get(), Equate.Get(), Hash.Get(), ctx, ItemsCountHint));
  1065. }
  1066. void RegisterDependencies() const final {
  1067. if (const auto flow = this->FlowDependsOn(Flow)) {
  1068. std::for_each(Items.cbegin(), Items.cend(), std::bind(&TSqueezeSetWideWrapper::Own, flow, std::placeholders::_1));
  1069. this->DependsOn(flow, Key);
  1070. }
  1071. }
  1072. TType* const KeyType;
  1073. IComputationWideFlowNode* const Flow;
  1074. const TComputationExternalNodePtrVector Items;
  1075. IComputationNode* const Key;
  1076. const ui64 ItemsCountHint;
  1077. const std::unique_ptr<ISetAccumulatorFactory> Factory;
  1078. TKeyTypes KeyTypes;
  1079. bool IsTuple;
  1080. bool Encoded;
  1081. bool UseIHash;
  1082. const std::optional<size_t> PasstroughKey;
  1083. const ui32 WideFieldsIndex;
  1084. NUdf::ICompare::TPtr Compare;
  1085. NUdf::IEquate::TPtr Equate;
  1086. NUdf::IHash::TPtr Hash;
  1087. };
  1088. class TMapWrapper : public TMutableComputationNode<TMapWrapper> {
  1089. typedef TMutableComputationNode<TMapWrapper> TBaseComputation;
  1090. public:
  1091. class TStreamValue : public TComputationValue<TStreamValue> {
  1092. public:
  1093. TStreamValue(TMemoryUsageInfo* memInfo, NUdf::TUnboxedValue&& input, IComputationExternalNode* const item,
  1094. IComputationNode* const key, IComputationNode* const payload, std::unique_ptr<IMapAccumulator>&& mapAccum, TComputationContext& ctx)
  1095. : TComputationValue<TStreamValue>(memInfo)
  1096. , Input(std::move(input))
  1097. , Item(item)
  1098. , Key(key)
  1099. , Payload(payload)
  1100. , MapAccum(std::move(mapAccum))
  1101. , Ctx(ctx) {}
  1102. private:
  1103. NUdf::EFetchStatus Fetch(NUdf::TUnboxedValue& result) override {
  1104. if (Finished) {
  1105. return NUdf::EFetchStatus::Finish;
  1106. }
  1107. for (;;) {
  1108. NUdf::TUnboxedValue item;
  1109. switch (auto status = Input.Fetch(item)) {
  1110. case NUdf::EFetchStatus::Ok: {
  1111. Item->SetValue(Ctx, std::move(item));
  1112. MapAccum->Add(Key->GetValue(Ctx), Payload->GetValue(Ctx));
  1113. break; // and continue
  1114. }
  1115. case NUdf::EFetchStatus::Finish: {
  1116. result = MapAccum->Build();
  1117. Finished = true;
  1118. return NUdf::EFetchStatus::Ok;
  1119. }
  1120. case NUdf::EFetchStatus::Yield: {
  1121. return NUdf::EFetchStatus::Yield;
  1122. }
  1123. }
  1124. }
  1125. }
  1126. NUdf::TUnboxedValue Input;
  1127. IComputationExternalNode* const Item;
  1128. IComputationNode* const Key;
  1129. IComputationNode* const Payload;
  1130. const std::unique_ptr<IMapAccumulator> MapAccum;
  1131. TComputationContext& Ctx;
  1132. bool Finished = false;
  1133. };
  1134. TMapWrapper(TComputationMutables& mutables, TType* keyType, TType* payloadType, IComputationNode* list, IComputationExternalNode* item,
  1135. IComputationNode* key, IComputationNode* payload, ui64 itemsCountHint, bool isStream, std::unique_ptr<IMapAccumulatorFactory> factory)
  1136. : TBaseComputation(mutables, EValueRepresentation::Boxed)
  1137. , KeyType(keyType)
  1138. , PayloadType(payloadType)
  1139. , List(list)
  1140. , Item(item)
  1141. , Key(key)
  1142. , Payload(payload)
  1143. , ItemsCountHint(itemsCountHint)
  1144. , IsStream(isStream)
  1145. , Factory(std::move(factory))
  1146. {
  1147. GetDictionaryKeyTypes(KeyType, KeyTypes, IsTuple, Encoded, UseIHash);
  1148. Compare = UseIHash && Factory->IsSorted() ? MakeCompareImpl(KeyType) : nullptr;
  1149. Equate = UseIHash ? MakeEquateImpl(KeyType) : nullptr;
  1150. Hash = UseIHash && !Factory->IsSorted() ? MakeHashImpl(KeyType) : nullptr;
  1151. }
  1152. NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
  1153. if (IsStream) {
  1154. return ctx.HolderFactory.Create<TStreamValue>(List->GetValue(ctx), Item, Key, Payload,
  1155. Factory->Create(KeyType, PayloadType, KeyTypes, IsTuple, Encoded, Compare.Get(), Equate.Get(), Hash.Get(),
  1156. ctx, ItemsCountHint), ctx);
  1157. }
  1158. const auto& list = List->GetValue(ctx);
  1159. auto itemsCountHint = ItemsCountHint;
  1160. if (list.HasFastListLength()) {
  1161. if (const auto size = list.GetListLength())
  1162. itemsCountHint = size;
  1163. else
  1164. return ctx.HolderFactory.GetEmptyContainerLazy();
  1165. }
  1166. auto acc = Factory->Create(KeyType, PayloadType, KeyTypes, IsTuple, Encoded,
  1167. Compare.Get(), Equate.Get(), Hash.Get(), ctx, itemsCountHint);
  1168. TThresher<false>::DoForEachItem(list,
  1169. [this, &acc, &ctx] (NUdf::TUnboxedValue&& item) {
  1170. Item->SetValue(ctx, std::move(item));
  1171. acc->Add(Key->GetValue(ctx), Payload->GetValue(ctx));
  1172. }
  1173. );
  1174. return acc->Build().Release();
  1175. }
  1176. private:
  1177. void RegisterDependencies() const final {
  1178. this->DependsOn(List);
  1179. this->Own(Item);
  1180. this->DependsOn(Key);
  1181. this->DependsOn(Payload);
  1182. }
  1183. TType* const KeyType;
  1184. TType* PayloadType;
  1185. IComputationNode* const List;
  1186. IComputationExternalNode* const Item;
  1187. IComputationNode* const Key;
  1188. IComputationNode* const Payload;
  1189. const ui64 ItemsCountHint;
  1190. const bool IsStream;
  1191. const std::unique_ptr<IMapAccumulatorFactory> Factory;
  1192. TKeyTypes KeyTypes;
  1193. bool IsTuple;
  1194. bool Encoded;
  1195. bool UseIHash;
  1196. NUdf::ICompare::TPtr Compare;
  1197. NUdf::IEquate::TPtr Equate;
  1198. NUdf::IHash::TPtr Hash;
  1199. };
  1200. class TSqueezeMapFlowWrapper : public TStatefulFlowCodegeneratorNode<TSqueezeMapFlowWrapper> {
  1201. using TBase = TStatefulFlowCodegeneratorNode<TSqueezeMapFlowWrapper>;
  1202. public:
  1203. class TState : public TComputationValue<TState> {
  1204. using TBase = TComputationValue<TState>;
  1205. public:
  1206. TState(TMemoryUsageInfo* memInfo, std::unique_ptr<IMapAccumulator>&& mapAccum)
  1207. : TBase(memInfo), MapAccum(std::move(mapAccum)) {}
  1208. NUdf::TUnboxedValuePod Build() {
  1209. return MapAccum->Build().Release();
  1210. }
  1211. void Insert(NUdf::TUnboxedValuePod key, NUdf::TUnboxedValuePod value) {
  1212. MapAccum->Add(key, value);
  1213. }
  1214. private:
  1215. const std::unique_ptr<IMapAccumulator> MapAccum;
  1216. };
  1217. TSqueezeMapFlowWrapper(TComputationMutables& mutables, TType* keyType, TType* payloadType,
  1218. IComputationNode* flow, IComputationExternalNode* item, IComputationNode* key, IComputationNode* payload,
  1219. ui64 itemsCountHint, std::unique_ptr<IMapAccumulatorFactory> factory)
  1220. : TBase(mutables, flow, EValueRepresentation::Boxed, EValueRepresentation::Any)
  1221. , KeyType(keyType)
  1222. , PayloadType(payloadType)
  1223. , Flow(flow)
  1224. , Item(item)
  1225. , Key(key)
  1226. , Payload(payload)
  1227. , ItemsCountHint(itemsCountHint)
  1228. , Factory(std::move(factory))
  1229. {
  1230. GetDictionaryKeyTypes(KeyType, KeyTypes, IsTuple, Encoded, UseIHash);
  1231. Compare = UseIHash && Factory->IsSorted() ? MakeCompareImpl(KeyType) : nullptr;
  1232. Equate = UseIHash ? MakeEquateImpl(KeyType) : nullptr;
  1233. Hash = UseIHash && !Factory->IsSorted() ? MakeHashImpl(KeyType) : nullptr;
  1234. }
  1235. NUdf::TUnboxedValuePod DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx) const {
  1236. if (state.IsFinish()) {
  1237. return state;
  1238. } else if (state.IsInvalid()) {
  1239. MakeState(ctx, state);
  1240. }
  1241. while (const auto statePtr = static_cast<TState*>(state.AsBoxed().Get())) {
  1242. if (auto item = Flow->GetValue(ctx); item.IsYield()) {
  1243. return item.Release();
  1244. } else if (item.IsFinish()) {
  1245. const auto dict = statePtr->Build();
  1246. state = std::move(item);
  1247. return dict;
  1248. } else {
  1249. Item->SetValue(ctx, std::move(item));
  1250. statePtr->Insert(Key->GetValue(ctx).Release(), Payload->GetValue(ctx).Release());
  1251. }
  1252. }
  1253. Y_UNREACHABLE();
  1254. }
  1255. #ifndef MKQL_DISABLE_CODEGEN
  // LLVM code generation counterpart of DoCalculate.
  // Emits IR into ctx.Func starting at `block` and returns the value produced by the
  // "result" PHI node; on return `block` points at the final "over" block.
  // The emitted code creates the state when the slot is invalid, pulls values from Flow,
  // calls TState::Insert for every regular item and TState::Build when Flow finishes.
  1256. Value* DoGenerateGetValue(const TCodegenContext& ctx, Value* statePtr, BasicBlock*& block) const {
  1257. auto& context = ctx.Codegen.GetContext();
  // The item node must support codegen because its value is set from generated code below.
  1258. const auto codegenItemArg = dynamic_cast<ICodegeneratorExternalNode*>(Item);
  1259. MKQL_ENSURE(codegenItemArg, "Item must be codegenerator node.");
  // Values are handled as 128-bit integers in the generated code.
  1260. const auto valueType = Type::getInt128Ty(context);
  // LLVM struct type describing the TState object, used only to type the state pointer.
  1261. TLLVMFieldsStructureStateWithAccum<TLLVMFieldsStructure<TComputationValue<TState>>> fieldsStruct(context);
  1262. const auto stateType = StructType::get(context, fieldsStruct.GetFieldsArray());
  1263. const auto statePtrType = PointerType::getUnqual(stateType);
  // "make" runs only when the state slot is invalid; "main" is the common continuation.
  1264. const auto make = BasicBlock::Create(context, "make", ctx.Func);
  1265. const auto main = BasicBlock::Create(context, "main", ctx.Func);
  1266. BranchInst::Create(make, main, IsInvalid(statePtr, block, context), block);
  1267. block = make;
  // Call this->MakeState(ctx, state): both `this` and the method address are embedded
  // into the IR as 64-bit integer constants and cast to pointers.
  1268. const auto ptrType = PointerType::getUnqual(StructType::get(context));
  1269. const auto self = CastInst::Create(Instruction::IntToPtr, ConstantInt::get(Type::getInt64Ty(context), uintptr_t(this)), ptrType, "self", block);
  1270. const auto makeFunc = ConstantInt::get(Type::getInt64Ty(context), GetMethodPtr(&TSqueezeMapFlowWrapper::MakeState));
  1271. const auto makeType = FunctionType::get(Type::getVoidTy(context), {self->getType(), ctx.Ctx->getType(), statePtr->getType()}, false);
  1272. const auto makeFuncPtr = CastInst::Create(Instruction::IntToPtr, makeFunc, PointerType::getUnqual(makeType), "function", block);
  1273. CallInst::Create(makeType, makeFuncPtr, {self, ctx.Ctx, statePtr}, "", block);
  1274. BranchInst::Create(main, block);
  1275. block = main;
  // more: fetch the next item; plus: regular item; done: input finished; over: common exit.
  1276. const auto more = BasicBlock::Create(context, "more", ctx.Func);
  1277. const auto done = BasicBlock::Create(context, "done", ctx.Func);
  1278. const auto plus = BasicBlock::Create(context, "plus", ctx.Func);
  1279. const auto over = BasicBlock::Create(context, "over", ctx.Func);
  // The result has three sources: Finish (from "main"), Yield (from "more"), the dict (from "done").
  1280. const auto result = PHINode::Create(valueType, 3U, "result", over);
  // Load the state value, truncate it to 64 bits and treat that as the TState pointer.
  1281. const auto state = new LoadInst(valueType, statePtr, "state", block);
  1282. const auto half = CastInst::Create(Instruction::Trunc, state, Type::getInt64Ty(context), "half", block);
  1283. const auto stateArg = CastInst::Create(Instruction::IntToPtr, half, statePtrType, "state_arg", block);
  // A state that already holds the finish marker short-circuits to the exit with Finish.
  1284. result->addIncoming(GetFinish(context), block);
  1285. BranchInst::Create(over, more, IsFinish(state, block, context), block);
  1286. block = more;
  // Dispatch on the fetched item: Finish -> done, Yield -> over, anything else -> plus.
  1287. const auto item = GetNodeValue(Flow, ctx, block);
  1288. result->addIncoming(GetYield(context), block);
  1289. const auto choise = SwitchInst::Create(item, plus, 2U, block);
  1290. choise->addCase(GetFinish(context), done);
  1291. choise->addCase(GetYield(context), over);
  1292. block = plus;
  // Publish the item through the external node, evaluate key and payload,
  // call TState::Insert(state, key, payload) and loop back for the next item.
  1293. codegenItemArg->CreateSetValue(ctx, block, item);
  1294. const auto key = GetNodeValue(Key, ctx, block);
  1295. const auto payload = GetNodeValue(Payload, ctx, block);
  1296. const auto insert = ConstantInt::get(Type::getInt64Ty(context), GetMethodPtr(&TState::Insert));
  1297. const auto keyArg = key;
  1298. const auto payloadArg = payload;
  1299. const auto insType = FunctionType::get(Type::getVoidTy(context), {stateArg->getType(), keyArg->getType(), payloadArg->getType()}, false);
  1300. const auto insPtr = CastInst::Create(Instruction::IntToPtr, insert, PointerType::getUnqual(insType), "insert", block);
  1301. CallInst::Create(insType, insPtr, {stateArg, keyArg, payloadArg}, "", block);
  1302. BranchInst::Create(more, block);
  1303. block = done;
  // Input finished: call TState::Build(state) to obtain the dictionary.
  1304. const auto build = ConstantInt::get(Type::getInt64Ty(context), GetMethodPtr(&TState::Build));
  1305. const auto funType = FunctionType::get(valueType, {stateArg->getType()}, false);
  1306. const auto funcPtr = CastInst::Create(Instruction::IntToPtr, build, PointerType::getUnqual(funType), "build", block);
  1307. const auto dict = CallInst::Create(funType, funcPtr, {stateArg}, "dict", block);
  // Drop the reference to the state object, then overwrite the state slot with `item`,
  // which equals the finish marker here because "done" is reached only via the Finish case.
  1308. UnRefBoxed(state, ctx, block);
  1309. result->addIncoming(dict, block);
  1310. new StoreInst(item, statePtr, block);
  1311. BranchInst::Create(over, block);
  1312. block = over;
  1313. return result;
  1314. }
  1315. #endif
  1316. private:
  1317. void MakeState(TComputationContext& ctx, NUdf::TUnboxedValue& state) const {
  1318. state = ctx.HolderFactory.Create<TState>(Factory->Create(KeyType, PayloadType, KeyTypes, IsTuple, Encoded,
  1319. Compare.Get(), Equate.Get(), Hash.Get(), ctx, ItemsCountHint));
  1320. }
  1321. void RegisterDependencies() const final {
  1322. if (const auto flow = this->FlowDependsOn(Flow)) {
  1323. this->Own(flow, Item);
  1324. this->DependsOn(flow, Key);
  1325. this->DependsOn(flow, Payload);
  1326. }
  1327. }
  1328. TType* const KeyType;
  1329. TType* PayloadType;
  1330. IComputationNode* const Flow;
  1331. IComputationExternalNode* const Item;
  1332. IComputationNode* const Key;
  1333. IComputationNode* const Payload;
  1334. const ui64 ItemsCountHint;
  1335. const std::unique_ptr<IMapAccumulatorFactory> Factory;
  1336. TKeyTypes KeyTypes;
  1337. bool IsTuple;
  1338. bool Encoded;
  1339. bool UseIHash;
  1340. NUdf::ICompare::TPtr Compare;
  1341. NUdf::IEquate::TPtr Equate;
  1342. NUdf::IHash::TPtr Hash;
  1343. };
  1344. class TSqueezeMapWideWrapper : public TStatefulFlowCodegeneratorNode<TSqueezeMapWideWrapper> {
  1345. using TBase = TStatefulFlowCodegeneratorNode<TSqueezeMapWideWrapper>;
  1346. public:
  1347. class TState : public TComputationValue<TState> {
  1348. using TBase = TComputationValue<TState>;
  1349. public:
  1350. TState(TMemoryUsageInfo* memInfo, std::unique_ptr<IMapAccumulator>&& mapAccum)
  1351. : TBase(memInfo), MapAccum(std::move(mapAccum)) {}
  1352. NUdf::TUnboxedValuePod Build() {
  1353. return MapAccum->Build().Release();
  1354. }
  1355. void Insert(NUdf::TUnboxedValuePod key, NUdf::TUnboxedValuePod value) {
  1356. MapAccum->Add(key, value);
  1357. }
  1358. private:
  1359. const std::unique_ptr<IMapAccumulator> MapAccum;
  1360. };
  1361. TSqueezeMapWideWrapper(TComputationMutables& mutables, TType* keyType, TType* payloadType,
  1362. IComputationWideFlowNode* flow, TComputationExternalNodePtrVector&& items, IComputationNode* key, IComputationNode* payload,
  1363. ui64 itemsCountHint, std::unique_ptr<IMapAccumulatorFactory> factory)
  1364. : TBase(mutables, flow, EValueRepresentation::Boxed, EValueRepresentation::Any)
  1365. , KeyType(keyType)
  1366. , PayloadType(payloadType)
  1367. , Flow(flow)
  1368. , Items(std::move(items))
  1369. , Key(key)
  1370. , Payload(payload)
  1371. , ItemsCountHint(itemsCountHint)
  1372. , Factory(std::move(factory))
  1373. , PasstroughKey(GetPasstroughtMap(TComputationNodePtrVector{Key}, Items).front())
  1374. , PasstroughPayload(GetPasstroughtMap(TComputationNodePtrVector{Payload}, Items).front())
  1375. , WideFieldsIndex(mutables.IncrementWideFieldsIndex(Items.size()))
  1376. {
  1377. GetDictionaryKeyTypes(KeyType, KeyTypes, IsTuple, Encoded, UseIHash);
  1378. Compare = UseIHash && Factory->IsSorted() ? MakeCompareImpl(KeyType) : nullptr;
  1379. Equate = UseIHash ? MakeEquateImpl(KeyType) : nullptr;
  1380. Hash = UseIHash && !Factory->IsSorted() ? MakeHashImpl(KeyType) : nullptr;
  1381. }
  1382. NUdf::TUnboxedValuePod DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx) const {
  1383. if (state.IsFinish()) {
  1384. return state;
  1385. } else if (state.IsInvalid()) {
  1386. MakeState(ctx, state);
  1387. }
  1388. auto** fields = ctx.WideFields.data() + WideFieldsIndex;
  1389. while (const auto statePtr = static_cast<TState*>(state.AsBoxed().Get())) {
  1390. for (auto i = 0U; i < Items.size(); ++i)
  1391. if (Key == Items[i] || Payload == Items[i] || Items[i]->GetDependencesCount() > 0U)
  1392. fields[i] = &Items[i]->RefValue(ctx);
  1393. switch (const auto result = Flow->FetchValues(ctx, fields)) {
  1394. case EFetchResult::One:
  1395. statePtr->Insert(Key->GetValue(ctx).Release(), Payload->GetValue(ctx).Release());
  1396. continue;
  1397. case EFetchResult::Yield:
  1398. return NUdf::TUnboxedValuePod::MakeYield();
  1399. case EFetchResult::Finish: {
  1400. const auto dict = statePtr->Build();
  1401. state = NUdf::TUnboxedValuePod::MakeFinish();
  1402. return dict;
  1403. }
  1404. }
  1405. }
  1406. Y_UNREACHABLE();
  1407. }
  1408. #ifndef MKQL_DISABLE_CODEGEN
  1409. Value* DoGenerateGetValue(const TCodegenContext& ctx, Value* statePtr, BasicBlock*& block) const {
  1410. auto& context = ctx.Codegen.GetContext();
  1411. const auto valueType = Type::getInt128Ty(context);
  1412. TLLVMFieldsStructureStateWithAccum<TLLVMFieldsStructure<TComputationValue<TState>>> fieldsStruct(context);
  1413. const auto stateType = StructType::get(context, fieldsStruct.GetFieldsArray());
  1414. const auto statePtrType = PointerType::getUnqual(stateType);
  1415. const auto make = BasicBlock::Create(context, "make", ctx.Func);
  1416. const auto main = BasicBlock::Create(context, "main", ctx.Func);
  1417. BranchInst::Create(make, main, IsInvalid(statePtr, block, context), block);
  1418. block = make;
  1419. const auto ptrType = PointerType::getUnqual(StructType::get(context));
  1420. const auto self = CastInst::Create(Instruction::IntToPtr, ConstantInt::get(Type::getInt64Ty(context), uintptr_t(this)), ptrType, "self", block);
  1421. const auto makeFunc = ConstantInt::get(Type::getInt64Ty(context), GetMethodPtr(&TSqueezeMapWideWrapper::MakeState));
  1422. const auto makeType = FunctionType::get(Type::getVoidTy(context), {self->getType(), ctx.Ctx->getType(), statePtr->getType()}, false);
  1423. const auto makeFuncPtr = CastInst::Create(Instruction::IntToPtr, makeFunc, PointerType::getUnqual(makeType), "function", block);
  1424. CallInst::Create(makeType, makeFuncPtr, {self, ctx.Ctx, statePtr}, "", block);
  1425. BranchInst::Create(main, block);
  1426. block = main;
  1427. const auto more = BasicBlock::Create(context, "more", ctx.Func);
  1428. const auto done = BasicBlock::Create(context, "done", ctx.Func);
  1429. const auto plus = BasicBlock::Create(context, "plus", ctx.Func);
  1430. const auto over = BasicBlock::Create(context, "over", ctx.Func);
  1431. const auto result = PHINode::Create(valueType, 3U, "result", over);
  1432. const auto state = new LoadInst(valueType, statePtr, "state", block);
  1433. const auto half = CastInst::Create(Instruction::Trunc, state, Type::getInt64Ty(context), "half", block);
  1434. const auto stateArg = CastInst::Create(Instruction::IntToPtr, half, statePtrType, "state_arg", block);
  1435. result->addIncoming(GetFinish(context), block);
  1436. BranchInst::Create(over, more, IsFinish(state, block, context), block);
  1437. block = more;
  1438. const auto getres = GetNodeValues(Flow, ctx, block);
  1439. result->addIncoming(GetYield(context), block);
  1440. const auto action = SwitchInst::Create(getres.first, plus, 2U, block);
  1441. action->addCase(ConstantInt::get(Type::getInt32Ty(context), i32(EFetchResult::Finish)), done);
  1442. action->addCase(ConstantInt::get(Type::getInt32Ty(context), i32(EFetchResult::Yield)), over);
  1443. block = plus;
  1444. if (!(PasstroughKey && PasstroughPayload)) {
  1445. for (auto i = 0U; i < Items.size(); ++i)
  1446. if (Items[i]->GetDependencesCount() > 0U)
  1447. EnsureDynamicCast<ICodegeneratorExternalNode*>(Items[i])->CreateSetValue(ctx, block, getres.second[i](ctx, block));
  1448. }
  1449. const auto key = PasstroughKey ? getres.second[*PasstroughKey](ctx, block) : GetNodeValue(Key, ctx, block);
  1450. const auto payload = PasstroughPayload ? getres.second[*PasstroughPayload](ctx, block) : GetNodeValue(Payload, ctx, block);
  1451. const auto insert = ConstantInt::get(Type::getInt64Ty(context), GetMethodPtr(&TState::Insert));
  1452. const auto keyArg = key;
  1453. const auto payloadArg = payload;
  1454. const auto insType = FunctionType::get(Type::getVoidTy(context), {stateArg->getType(), keyArg->getType(), payloadArg->getType()}, false);
  1455. const auto insPtr = CastInst::Create(Instruction::IntToPtr, insert, PointerType::getUnqual(insType), "insert", block);
  1456. CallInst::Create(insType, insPtr, {stateArg, keyArg, payloadArg}, "", block);
  1457. BranchInst::Create(more, block);
  1458. block = done;
  1459. const auto build = ConstantInt::get(Type::getInt64Ty(context), GetMethodPtr(&TState::Build));
  1460. const auto funType = FunctionType::get(valueType, {stateArg->getType()}, false);
  1461. const auto funcPtr = CastInst::Create(Instruction::IntToPtr, build, PointerType::getUnqual(funType), "build", block);
  1462. const auto dict = CallInst::Create(funType, funcPtr, {stateArg}, "dict", block);
  1463. UnRefBoxed(state, ctx, block);
  1464. result->addIncoming(dict, block);
  1465. new StoreInst(GetFinish(context), statePtr, block);
  1466. BranchInst::Create(over, block);
  1467. block = over;
  1468. return result;
  1469. }
  1470. #endif
  1471. private:
  1472. void MakeState(TComputationContext& ctx, NUdf::TUnboxedValue& state) const {
  1473. state = ctx.HolderFactory.Create<TState>(Factory->Create(KeyType, PayloadType, KeyTypes, IsTuple, Encoded,
  1474. Compare.Get(), Equate.Get(), Hash.Get(), ctx, ItemsCountHint));
  1475. }
  1476. void RegisterDependencies() const final {
  1477. if (const auto flow = this->FlowDependsOn(Flow)) {
  1478. std::for_each(Items.cbegin(), Items.cend(), std::bind(&TSqueezeMapWideWrapper::Own, flow, std::placeholders::_1));
  1479. this->DependsOn(flow, Key);
  1480. this->DependsOn(flow, Payload);
  1481. }
  1482. }
  1483. TType* const KeyType;
  1484. TType* PayloadType;
  1485. IComputationWideFlowNode* const Flow;
  1486. const TComputationExternalNodePtrVector Items;
  1487. IComputationNode* const Key;
  1488. IComputationNode* const Payload;
  1489. const ui64 ItemsCountHint;
  1490. const std::unique_ptr<IMapAccumulatorFactory> Factory;
  1491. TKeyTypes KeyTypes;
  1492. bool IsTuple;
  1493. bool Encoded;
  1494. bool UseIHash;
  1495. const std::optional<size_t> PasstroughKey;
  1496. const std::optional<size_t> PasstroughPayload;
  1497. mutable std::vector<NUdf::TUnboxedValue*> Fields;
  1498. const ui32 WideFieldsIndex;
  1499. NUdf::ICompare::TPtr Compare;
  1500. NUdf::IEquate::TPtr Equate;
  1501. NUdf::IHash::TPtr Hash;
  1502. };
  1503. template <typename TAccumulator>
  1504. IComputationNode* WrapToSet(TCallable& callable, const TNodeLocator& nodeLocator, TComputationMutables& mutables) {
  1505. const auto keyType = callable.GetInput(callable.GetInputsCount() - 5U).GetStaticType();
  1506. const auto itemsCountHint = AS_VALUE(TDataLiteral, callable.GetInput(callable.GetInputsCount() - 1U))->AsValue().Get<ui64>();
  1507. const auto flow = LocateNode(nodeLocator, callable, 0U);
  1508. const auto keySelector = LocateNode(nodeLocator, callable, callable.GetInputsCount() - 5U);
  1509. auto factory = std::make_unique<TSetAccumulatorFactory<TAccumulator>>();
  1510. if (const auto wide = dynamic_cast<IComputationWideFlowNode*>(flow)) {
  1511. const auto width = callable.GetInputsCount() - 6U;
  1512. TComputationExternalNodePtrVector args(width, nullptr);
  1513. auto index = 0U;
  1514. std::generate_n(args.begin(), width, [&](){ return LocateExternalNode(nodeLocator, callable, ++index); });
  1515. return new TSqueezeSetWideWrapper(mutables, keyType, wide, std::move(args), keySelector, itemsCountHint, std::move(factory));
  1516. }
  1517. const auto itemArg = LocateExternalNode(nodeLocator, callable, 1U);
  1518. const auto type = callable.GetInput(0U).GetStaticType();
  1519. if (type->IsList()) {
  1520. return new TSetWrapper(mutables, keyType, flow, itemArg, keySelector, itemsCountHint, false, std::move(factory));
  1521. }
  1522. if (type->IsFlow()) {
  1523. return new TSqueezeSetFlowWrapper(mutables, keyType, flow, itemArg, keySelector, itemsCountHint, std::move(factory));
  1524. }
  1525. if (type->IsStream()) {
  1526. return new TSetWrapper(mutables, keyType, flow, itemArg, keySelector, itemsCountHint, true, std::move(factory));
  1527. }
  1528. THROW yexception() << "Expected list, flow or stream.";
  1529. }
  1530. template <typename TAccumulator>
  1531. IComputationNode* WrapToMap(TCallable& callable, const TNodeLocator& nodeLocator, TComputationMutables& mutables) {
  1532. const auto keyType = callable.GetInput(callable.GetInputsCount() - 5U).GetStaticType();
  1533. const auto payloadType = callable.GetInput(callable.GetInputsCount() - 4U).GetStaticType();
  1534. const auto itemsCountHint = AS_VALUE(TDataLiteral, callable.GetInput(callable.GetInputsCount() - 1U))->AsValue().Get<ui64>();
  1535. const auto flow = LocateNode(nodeLocator, callable, 0U);
  1536. const auto keySelector = LocateNode(nodeLocator, callable, callable.GetInputsCount() - 5U);
  1537. const auto payloadSelector = LocateNode(nodeLocator, callable, callable.GetInputsCount() - 4U);
  1538. auto factory = std::make_unique<TMapAccumulatorFactory<TAccumulator>>();
  1539. if (const auto wide = dynamic_cast<IComputationWideFlowNode*>(flow)) {
  1540. const auto width = callable.GetInputsCount() - 6U;
  1541. TComputationExternalNodePtrVector args(width, nullptr);
  1542. auto index = 0U;
  1543. std::generate(args.begin(), args.end(), [&](){ return LocateExternalNode(nodeLocator, callable, ++index); });
  1544. return new TSqueezeMapWideWrapper(mutables, keyType, payloadType, wide, std::move(args), keySelector, payloadSelector, itemsCountHint, std::move(factory));
  1545. }
  1546. const auto itemArg = LocateExternalNode(nodeLocator, callable, 1U);
  1547. const auto type = callable.GetInput(0U).GetStaticType();
  1548. if (type->IsList()) {
  1549. return new TMapWrapper(mutables, keyType, payloadType, flow, itemArg, keySelector, payloadSelector, itemsCountHint, false, std::move(factory));
  1550. }
  1551. if (type->IsFlow()) {
  1552. return new TSqueezeMapFlowWrapper(mutables, keyType, payloadType, flow, itemArg, keySelector, payloadSelector, itemsCountHint, std::move(factory));
  1553. }
  1554. if (type->IsStream()) {
  1555. return new TMapWrapper(mutables, keyType, payloadType, flow, itemArg, keySelector, payloadSelector, itemsCountHint, true, std::move(factory));
  1556. }
  1557. THROW yexception() << "Expected list, flow or stream.";
  1558. }
  1559. IComputationNode* WrapToSortedDictInternal(TCallable& callable, const TComputationNodeFactoryContext& ctx, bool isList) {
  1560. MKQL_ENSURE(callable.GetInputsCount() >= 6U, "Expected six or more args.");
  1561. const auto type = callable.GetInput(0U).GetStaticType();
  1562. if (isList) {
  1563. MKQL_ENSURE(type->IsList(), "Expected list.");
  1564. } else {
  1565. MKQL_ENSURE(type->IsFlow() || type->IsStream(), "Expected flow or stream.");
  1566. }
  1567. const auto keyType = callable.GetInput(callable.GetInputsCount() - 5U).GetStaticType();
  1568. const auto payloadType = callable.GetInput(callable.GetInputsCount() - 4U).GetStaticType();
  1569. const auto multiData = AS_VALUE(TDataLiteral, callable.GetInput(callable.GetInputsCount() - 3U));
  1570. const bool isMulti = multiData->AsValue().Get<bool>();
  1571. const auto itemsCountHint = AS_VALUE(TDataLiteral, callable.GetInput(callable.GetInputsCount() - 1U))->AsValue().Get<ui64>();
  1572. const auto flow = LocateNode(ctx.NodeLocator, callable, 0U);
  1573. const auto keySelector = LocateNode(ctx.NodeLocator, callable, callable.GetInputsCount() -5U);
  1574. const auto payloadSelector = LocateNode(ctx.NodeLocator, callable, callable.GetInputsCount() -4U);
  1575. if (const auto wide = dynamic_cast<IComputationWideFlowNode*>(flow)) {
  1576. const auto width = callable.GetInputsCount() - 6U;
  1577. TComputationExternalNodePtrVector args(width, nullptr);
  1578. auto index = 0U;
  1579. std::generate(args.begin(), args.end(), [&](){ return LocateExternalNode(ctx.NodeLocator, callable, ++index); });
  1580. if (!isMulti && payloadType->IsVoid()) {
  1581. return new TSqueezeSetWideWrapper(ctx.Mutables, keyType, wide, std::move(args), keySelector, itemsCountHint,
  1582. std::make_unique<TSetAccumulatorFactory<TSortedSetAccumulator>>());
  1583. } else if (isMulti) {
  1584. return new TSqueezeMapWideWrapper(ctx.Mutables, keyType, payloadType, wide, std::move(args), keySelector, payloadSelector, itemsCountHint,
  1585. std::make_unique<TMapAccumulatorFactory<TSortedMapAccumulator<true>>>());
  1586. } else {
  1587. return new TSqueezeMapWideWrapper(ctx.Mutables, keyType, payloadType, wide, std::move(args), keySelector, payloadSelector, itemsCountHint,
  1588. std::make_unique<TMapAccumulatorFactory<TSortedMapAccumulator<false>>>());
  1589. }
  1590. }
  1591. const auto itemArg = LocateExternalNode(ctx.NodeLocator, callable, 1U);
  1592. if (!isMulti && payloadType->IsVoid()) {
  1593. auto factory = std::make_unique<TSetAccumulatorFactory<TSortedSetAccumulator>>();
  1594. if (type->IsList()) {
  1595. return new TSetWrapper(ctx.Mutables, keyType, flow, itemArg, keySelector, itemsCountHint,
  1596. false, std::move(factory));
  1597. }
  1598. if (type->IsFlow()) {
  1599. return new TSqueezeSetFlowWrapper(ctx.Mutables, keyType, flow, itemArg, keySelector,
  1600. itemsCountHint, std::move(factory));
  1601. }
  1602. if (type->IsStream()) {
  1603. return new TSetWrapper(ctx.Mutables, keyType, flow, itemArg, keySelector, itemsCountHint,
  1604. true, std::move(factory));
  1605. }
  1606. } else if (isMulti) {
  1607. auto factory = std::make_unique<TMapAccumulatorFactory<TSortedMapAccumulator<true>>>();
  1608. if (type->IsList()) {
  1609. return new TMapWrapper(ctx.Mutables, keyType, payloadType, flow, itemArg, keySelector, payloadSelector, itemsCountHint,
  1610. false, std::move(factory));
  1611. }
  1612. if (type->IsFlow()) {
  1613. return new TSqueezeMapFlowWrapper(ctx.Mutables, keyType, payloadType, flow, itemArg, keySelector, payloadSelector,
  1614. itemsCountHint, std::move(factory));
  1615. }
  1616. if (type->IsStream()) {
  1617. return new TMapWrapper(ctx.Mutables, keyType, payloadType, flow, itemArg, keySelector, payloadSelector, itemsCountHint,
  1618. true, std::move(factory));
  1619. }
  1620. } else {
  1621. auto factory = std::make_unique<TMapAccumulatorFactory<TSortedMapAccumulator<false>>>();
  1622. if (type->IsList()) {
  1623. return new TMapWrapper(ctx.Mutables, keyType, payloadType, flow, itemArg, keySelector, payloadSelector, itemsCountHint,
  1624. false, std::move(factory));
  1625. }
  1626. if (type->IsFlow()) {
  1627. return new TSqueezeMapFlowWrapper(ctx.Mutables, keyType, payloadType, flow, itemArg, keySelector, payloadSelector,
  1628. itemsCountHint, std::move(factory));
  1629. }
  1630. if (type->IsStream()) {
  1631. return new TMapWrapper(ctx.Mutables, keyType, payloadType, flow, itemArg, keySelector, payloadSelector, itemsCountHint,
  1632. true, std::move(factory));
  1633. }
  1634. }
  1635. THROW yexception() << "Expected list, flow or stream.";
  1636. }
  1637. IComputationNode* WrapToHashedDictInternal(TCallable& callable, const TComputationNodeFactoryContext& ctx, bool isList) {
  1638. MKQL_ENSURE(callable.GetInputsCount() >= 6U, "Expected six or more args.");
  1639. const auto type = callable.GetInput(0U).GetStaticType();
  1640. if (isList) {
  1641. MKQL_ENSURE(type->IsList(), "Expected list.");
  1642. } else {
  1643. MKQL_ENSURE(type->IsFlow() || type->IsStream(), "Expected flow or stream.");
  1644. }
  1645. const auto keyType = callable.GetInput(callable.GetInputsCount() - 5U).GetStaticType();
  1646. const auto payloadType = callable.GetInput(callable.GetInputsCount() - 4U).GetStaticType();
  1647. const bool multi = AS_VALUE(TDataLiteral, callable.GetInput(callable.GetInputsCount() - 3U))->AsValue().Get<bool>();
  1648. const bool isCompact = AS_VALUE(TDataLiteral, callable.GetInput(callable.GetInputsCount() - 2U))->AsValue().Get<bool>();
  1649. const auto payloadSelectorNode = callable.GetInput(callable.GetInputsCount() - 4U);
  1650. const bool isOptional = keyType->IsOptional();
  1651. const auto unwrappedKeyType = isOptional ? AS_TYPE(TOptionalType, keyType)->GetItemType() : keyType;
  1652. if (!multi && payloadType->IsVoid()) {
  1653. if (isCompact) {
  1654. if (unwrappedKeyType->IsData()) {
  1655. #define USE_HASHED_SINGLE_FIXED_COMPACT_SET(xType, xLayoutType) \
  1656. case NUdf::TDataType<xType>::Id: \
  1657. if (isOptional) { \
  1658. return WrapToSet< \
  1659. THashedSingleFixedCompactSetAccumulator<xLayoutType, true>>(callable, ctx.NodeLocator, ctx.Mutables); \
  1660. } else { \
  1661. return WrapToSet< \
  1662. THashedSingleFixedCompactSetAccumulator<xLayoutType, false>>(callable, ctx.NodeLocator, ctx.Mutables); \
  1663. }
  1664. switch (AS_TYPE(TDataType, unwrappedKeyType)->GetSchemeType()) {
  1665. KNOWN_FIXED_VALUE_TYPES(USE_HASHED_SINGLE_FIXED_COMPACT_SET)
  1666. }
  1667. #undef USE_HASHED_SINGLE_FIXED_COMPACT_SET
  1668. }
  1669. return WrapToSet<THashedCompactSetAccumulator>(callable, ctx.NodeLocator, ctx.Mutables);
  1670. }
  1671. if (unwrappedKeyType->IsData()) {
  1672. #define USE_HASHED_SINGLE_FIXED_SET(xType, xLayoutType) \
  1673. case NUdf::TDataType<xType>::Id: \
  1674. if (isOptional) { \
  1675. return WrapToSet< \
  1676. THashedSingleFixedSetAccumulator<xLayoutType, true>>(callable, ctx.NodeLocator, ctx.Mutables); \
  1677. } else { \
  1678. return WrapToSet< \
  1679. THashedSingleFixedSetAccumulator<xLayoutType, false>>(callable, ctx.NodeLocator, ctx.Mutables); \
  1680. }
  1681. switch (AS_TYPE(TDataType, unwrappedKeyType)->GetSchemeType()) {
  1682. KNOWN_FIXED_VALUE_TYPES(USE_HASHED_SINGLE_FIXED_SET)
  1683. }
  1684. #undef USE_HASHED_SINGLE_FIXED_SET
  1685. }
  1686. return WrapToSet<THashedSetAccumulator>(callable, ctx.NodeLocator, ctx.Mutables);
  1687. }
  1688. if (isCompact) {
  1689. if (unwrappedKeyType->IsData()) {
  1690. #define USE_HASHED_SINGLE_FIXED_COMPACT_MAP(xType, xLayoutType) \
  1691. case NUdf::TDataType<xType>::Id: \
  1692. if (multi) { \
  1693. if (isOptional) { \
  1694. return WrapToMap< \
  1695. THashedSingleFixedCompactMapAccumulator<xLayoutType, true, true>>(callable, ctx.NodeLocator, ctx.Mutables); \
  1696. } else { \
  1697. return WrapToMap< \
  1698. THashedSingleFixedCompactMapAccumulator<xLayoutType, false, true>>(callable, ctx.NodeLocator, ctx.Mutables); \
  1699. } \
  1700. } else { \
  1701. if (isOptional) { \
  1702. return WrapToMap< \
  1703. THashedSingleFixedCompactMapAccumulator<xLayoutType, true, false>>(callable, ctx.NodeLocator, ctx.Mutables); \
  1704. } else { \
  1705. return WrapToMap< \
  1706. THashedSingleFixedCompactMapAccumulator<xLayoutType, false, false>>(callable, ctx.NodeLocator, ctx.Mutables); \
  1707. } \
  1708. }
  1709. switch (AS_TYPE(TDataType, unwrappedKeyType)->GetSchemeType()) {
  1710. KNOWN_FIXED_VALUE_TYPES(USE_HASHED_SINGLE_FIXED_COMPACT_MAP)
  1711. }
  1712. #undef USE_HASHED_SINGLE_FIXED_COMPACT_MAP
  1713. }
  1714. if (multi) {
  1715. return WrapToMap<THashedCompactMapAccumulator<true>>(callable, ctx.NodeLocator, ctx.Mutables);
  1716. } else {
  1717. return WrapToMap<THashedCompactMapAccumulator<false>>(callable, ctx.NodeLocator, ctx.Mutables);
  1718. }
  1719. }
  1720. if (unwrappedKeyType->IsData()) {
  1721. #define USE_HASHED_SINGLE_FIXED_MAP(xType, xLayoutType) \
  1722. case NUdf::TDataType<xType>::Id: \
  1723. if (multi) { \
  1724. if (isOptional) { \
  1725. return WrapToMap< \
  1726. THashedSingleFixedMultiMapAccumulator<xLayoutType, true>>(callable, ctx.NodeLocator, ctx.Mutables); \
  1727. } else { \
  1728. return WrapToMap< \
  1729. THashedSingleFixedMultiMapAccumulator<xLayoutType, false>>(callable, ctx.NodeLocator, ctx.Mutables); \
  1730. } \
  1731. } else { \
  1732. if (isOptional) { \
  1733. return WrapToMap< \
  1734. THashedSingleFixedMapAccumulator<xLayoutType, true>>(callable, ctx.NodeLocator, ctx.Mutables); \
  1735. } else { \
  1736. return WrapToMap< \
  1737. THashedSingleFixedMapAccumulator<xLayoutType, false>>(callable, ctx.NodeLocator, ctx.Mutables); \
  1738. } \
  1739. }
  1740. switch (AS_TYPE(TDataType, unwrappedKeyType)->GetSchemeType()) {
  1741. KNOWN_FIXED_VALUE_TYPES(USE_HASHED_SINGLE_FIXED_MAP)
  1742. }
  1743. #undef USE_HASHED_SINGLE_FIXED_MAP
  1744. }
  1745. if (multi) {
  1746. return WrapToMap<THashedMultiMapAccumulator>(callable, ctx.NodeLocator, ctx.Mutables);
  1747. } else {
  1748. return WrapToMap<THashedMapAccumulator>(callable, ctx.NodeLocator, ctx.Mutables);
  1749. }
  1750. }
  1751. }
  1752. IComputationNode* WrapToSortedDict(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
  1753. return WrapToSortedDictInternal(callable, ctx, true);
  1754. }
  1755. IComputationNode* WrapToHashedDict(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
  1756. return WrapToHashedDictInternal(callable, ctx, true);
  1757. }
  1758. IComputationNode* WrapSqueezeToSortedDict(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
  1759. return WrapToSortedDictInternal(callable, ctx, false);
  1760. }
  1761. IComputationNode* WrapSqueezeToHashedDict(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
  1762. return WrapToHashedDictInternal(callable, ctx, false);
  1763. }
  1764. }
  1765. }