12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
7177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975197619771978197919801981198219831984198519861987198819891990199119921993199419951996199719981999200020012002200320042005200620072008200920102011201220132014201520162017201820192020202120222023202420252026202720282029203020312032203320342035203620372038203920402041204220432044204520462047204820492050205120522053205420552056205720582059206020612062206320642065206620672068206920702071207220732074207520762077207820792080208120822083 |
- #include "mkql_todict.h"
- #include <yql/essentials/minikql/computation/mkql_computation_list_adapter.h>
- #include <yql/essentials/minikql/computation/mkql_computation_node_codegen.h> // Y_IGNORE
- #include <yql/essentials/minikql/computation/mkql_computation_node_holders.h>
- #include <yql/essentials/minikql/computation/mkql_computation_node_pack.h>
- #include <yql/essentials/minikql/computation/mkql_llvm_base.h> // Y_IGNORE
- #include <yql/essentials/minikql/computation/presort.h>
- #include <yql/essentials/minikql/mkql_node_cast.h>
- #include <yql/essentials/minikql/mkql_string_util.h>
- #include <yql/essentials/public/udf/udf_types.h>
- #include <yql/essentials/utils/cast.h>
- #include <yql/essentials/utils/hash.h>
- #include <algorithm>
- #include <unordered_map>
- #include <optional>
- #include <vector>
- namespace NKikimr {
- namespace NMiniKQL {
- using NYql::EnsureDynamicCast;
- namespace {
- // Accumulates (key, payload) pairs for a hashed multi-dict: every distinct key
- // owns a vector holder that receives each payload added under that key.
- class THashedMultiMapAccumulator {
-     using TMapType = TValuesDictHashMap;
-     TComputationContext& Ctx;
-     TType* KeyType;
-     const TKeyTypes& KeyTypes;
-     bool IsTuple;
-     std::shared_ptr<TValuePacker> Packer; // created only when `encoded` is requested
-     const NUdf::IHash* Hash;
-     const NUdf::IEquate* Equate;
-     TMapType Map;
- public:
-     static constexpr bool IsSorted = false;
-     // `compare` and `payloadType` are accepted but not used by this accumulator.
-     THashedMultiMapAccumulator(
-         TType* keyType, TType* payloadType, const TKeyTypes& keyTypes, bool isTuple, bool encoded,
-         const NUdf::ICompare* compare, const NUdf::IEquate* equate, const NUdf::IHash* hash,
-         TComputationContext& ctx, ui64 itemsCountHint)
-         : Ctx(ctx)
-         , KeyType(keyType)
-         , KeyTypes(keyTypes)
-         , IsTuple(isTuple)
-         , Hash(hash)
-         , Equate(equate)
-         , Map(0, TValueHasher(KeyTypes, isTuple, hash), TValueEqual(KeyTypes, isTuple, equate))
-     {
-         Y_UNUSED(compare, payloadType);
-         if (encoded) {
-             Packer = std::make_shared<TValuePacker>(true, keyType);
-         }
-         Map.reserve(itemsCountHint);
-     }
-     // Appends `payload` to the vector stored under `key`, creating the vector on first use.
-     // With a packer present the key is first replaced by its packed string form.
-     void Add(NUdf::TUnboxedValue&& key, NUdf::TUnboxedValue&& payload) {
-         if (Packer) {
-             key = MakeString(Packer->Pack(key));
-         }
-         auto slot = Map.find(key);
-         if (Map.end() == slot) {
-             slot = Map.emplace(std::move(key), Ctx.HolderFactory.NewVectorHolder()).first;
-         }
-         slot->second.Push(std::move(payload));
-     }
-     // Hands the collected map over to a hashed dict holder via a filler callback.
-     NUdf::TUnboxedValue Build() {
-         const auto moveMapInto = [this](TMapType& destination) {
-             destination = std::move(Map);
-         };
-         TType* const encodedKeyType = Packer ? KeyType : nullptr;
-         return Ctx.HolderFactory.CreateDirectHashedDictHolder(moveMapInto, KeyTypes, IsTuple, true, encodedKeyType, Hash, Equate);
-     }
- };
- // Accumulates (key, payload) pairs for a hashed dict with a single payload per key.
- class THashedMapAccumulator {
-     using TMapType = TValuesDictHashMap;
-     TComputationContext& Ctx;
-     TType* KeyType;
-     const TKeyTypes& KeyTypes;
-     const bool IsTuple;
-     std::shared_ptr<TValuePacker> Packer; // created only when `encoded` is requested
-     const NUdf::IHash* Hash;
-     const NUdf::IEquate* Equate;
-     TMapType Map;
- public:
-     static constexpr bool IsSorted = false;
-     // `compare` and `payloadType` are accepted but not used by this accumulator.
-     THashedMapAccumulator(
-         TType* keyType, TType* payloadType, const TKeyTypes& keyTypes, bool isTuple, bool encoded,
-         const NUdf::ICompare* compare, const NUdf::IEquate* equate, const NUdf::IHash* hash,
-         TComputationContext& ctx, ui64 itemsCountHint)
-         : Ctx(ctx)
-         , KeyType(keyType)
-         , KeyTypes(keyTypes)
-         , IsTuple(isTuple)
-         , Hash(hash)
-         , Equate(equate)
-         , Map(0, TValueHasher(KeyTypes, isTuple, hash), TValueEqual(KeyTypes, isTuple, equate))
-     {
-         Y_UNUSED(compare, payloadType);
-         if (encoded) {
-             Packer = std::make_shared<TValuePacker>(true, keyType);
-         }
-         Map.reserve(itemsCountHint);
-     }
-     // Emplaces the pair into the map; with a packer present the key is packed first.
-     void Add(NUdf::TUnboxedValue&& key, NUdf::TUnboxedValue&& payload) {
-         if (Packer) {
-             key = MakeString(Packer->Pack(key));
-         }
-         Map.emplace(std::move(key), std::move(payload));
-     }
-     // Hands the collected map over to a hashed dict holder via a filler callback.
-     NUdf::TUnboxedValue Build() {
-         const auto moveMapInto = [this](TMapType& destination) {
-             destination = std::move(Map);
-         };
-         TType* const encodedKeyType = Packer ? KeyType : nullptr;
-         return Ctx.HolderFactory.CreateDirectHashedDictHolder(moveMapInto, KeyTypes, IsTuple, true, encodedKeyType, Hash, Equate);
-     }
- };
- // Multi-dict accumulator keyed by a fixed-size value of type T.
- // When OptionalKey is set, payloads of empty keys are gathered separately in NullPayloads.
- template<typename T, bool OptionalKey>
- class THashedSingleFixedMultiMapAccumulator {
-     using TMapType = TValuesDictHashSingleFixedMap<T>;
-     TComputationContext& Ctx;
-     const TKeyTypes& KeyTypes;
-     TMapType Map;
-     TUnboxedValueVector NullPayloads;
-     // Spare empty vector holder offered to Map.emplace; replaced once it has been consumed.
-     NUdf::TUnboxedValue CurrentEmptyVectorForInsert;
- public:
-     static constexpr bool IsSorted = false;
-     // Only `keyTypes`, `ctx` and `itemsCountHint` are used; the rest is ignored.
-     THashedSingleFixedMultiMapAccumulator(
-         TType* keyType, TType* payloadType, const TKeyTypes& keyTypes, bool isTuple, bool encoded,
-         const NUdf::ICompare* compare, const NUdf::IEquate* equate, const NUdf::IHash* hash,
-         TComputationContext& ctx, ui64 itemsCountHint)
-         : Ctx(ctx)
-         , KeyTypes(keyTypes)
-         , Map(0, TMyHash<T>(), TMyEquals<T>())
-     {
-         Y_UNUSED(keyType, payloadType, isTuple, encoded, compare, equate, hash);
-         Map.reserve(itemsCountHint);
-         CurrentEmptyVectorForInsert = Ctx.HolderFactory.NewVectorHolder();
-     }
-     // Pushes `payload` into the vector of `key`; empty keys (OptionalKey only) go to NullPayloads.
-     void Add(NUdf::TUnboxedValue&& key, NUdf::TUnboxedValue&& payload) {
-         if constexpr (OptionalKey) {
-             if (!key) {
-                 NullPayloads.emplace_back(std::move(payload));
-                 return;
-             }
-         }
-         auto [position, inserted] = Map.emplace(key.Get<T>(), CurrentEmptyVectorForInsert);
-         if (inserted) {
-             // The spare vector now belongs to the map entry, so prepare a new spare.
-             CurrentEmptyVectorForInsert = Ctx.HolderFactory.NewVectorHolder();
-         }
-         position->second.Push(payload.Release());
-     }
-     // Builds the holder from the map plus, if any were seen, the vector of null-key payloads.
-     NUdf::TUnboxedValue Build() {
-         std::optional<NUdf::TUnboxedValue> nullPayload;
-         if (!NullPayloads.empty()) {
-             nullPayload = Ctx.HolderFactory.VectorAsVectorHolder(std::move(NullPayloads));
-         }
-         return Ctx.HolderFactory.CreateDirectHashedSingleFixedMapHolder<T, OptionalKey>(std::move(Map), std::move(nullPayload));
-     }
- };
- // Dict accumulator keyed by a fixed-size value of type T with one payload per key.
- // When OptionalKey is set, the payload of an empty key is kept in NullPayload.
- template<typename T, bool OptionalKey>
- class THashedSingleFixedMapAccumulator {
-     using TMapType = TValuesDictHashSingleFixedMap<T>;
-     TComputationContext& Ctx;
-     TMapType Map;
-     std::optional<NUdf::TUnboxedValue> NullPayload;
- public:
-     static constexpr bool IsSorted = false;
-     // Only `ctx` and `itemsCountHint` are used; the rest is ignored.
-     THashedSingleFixedMapAccumulator(
-         TType* keyType, TType* payloadType, const TKeyTypes& keyTypes, bool isTuple, bool encoded,
-         const NUdf::ICompare* compare, const NUdf::IEquate* equate, const NUdf::IHash* hash,
-         TComputationContext& ctx, ui64 itemsCountHint)
-         : Ctx(ctx)
-         , Map(0, TMyHash<T>(), TMyEquals<T>())
-     {
-         Y_UNUSED(keyType, payloadType, keyTypes, isTuple, encoded, compare, equate, hash);
-         Map.reserve(itemsCountHint);
-     }
-     // Emplaces (key, payload) into the map; an empty key (OptionalKey only) sets NullPayload.
-     void Add(NUdf::TUnboxedValue&& key, NUdf::TUnboxedValue&& payload) {
-         if constexpr (OptionalKey) {
-             if (!key) {
-                 // NOTE(review): a repeated empty key replaces the stored payload here.
-                 NullPayload.emplace(std::move(payload));
-                 return;
-             }
-         }
-         Map.emplace(key.Get<T>(), std::move(payload));
-     }
-     // Moves the map and the optional null payload into the resulting holder.
-     NUdf::TUnboxedValue Build() {
-         return Ctx.HolderFactory.CreateDirectHashedSingleFixedMapHolder<T, OptionalKey>(
-             std::move(Map), std::move(NullPayload));
-     }
- };
- // Accumulates keys for a hashed set.
- class THashedSetAccumulator {
-     using TSetType = TValuesDictHashSet;
-     TComputationContext& Ctx;
-     TType* KeyType;
-     const TKeyTypes& KeyTypes;
-     bool IsTuple;
-     std::shared_ptr<TValuePacker> Packer; // created only when `encoded` is requested
-     TSetType Set;
-     const NUdf::IHash* Hash;
-     const NUdf::IEquate* Equate;
- public:
-     static constexpr bool IsSorted = false;
-     // `compare` is accepted but not used by this accumulator.
-     THashedSetAccumulator(
-         TType* keyType, const TKeyTypes& keyTypes, bool isTuple, bool encoded,
-         const NUdf::ICompare* compare, const NUdf::IEquate* equate, const NUdf::IHash* hash,
-         TComputationContext& ctx, ui64 itemsCountHint)
-         : Ctx(ctx)
-         , KeyType(keyType)
-         , KeyTypes(keyTypes)
-         , IsTuple(isTuple)
-         , Set(0, TValueHasher(KeyTypes, isTuple, hash), TValueEqual(KeyTypes, isTuple, equate))
-         , Hash(hash)
-         , Equate(equate)
-     {
-         Y_UNUSED(compare);
-         if (encoded) {
-             Packer = std::make_shared<TValuePacker>(true, keyType);
-         }
-         Set.reserve(itemsCountHint);
-     }
-     // Emplaces the key into the set; with a packer present the key is packed first.
-     void Add(NUdf::TUnboxedValue&& key) {
-         if (Packer) {
-             key = MakeString(Packer->Pack(key));
-         }
-         Set.emplace(std::move(key));
-     }
-     // Hands the collected set over to a hashed set holder via a filler callback.
-     NUdf::TUnboxedValue Build() {
-         const auto moveSetInto = [this](TSetType& destination) {
-             destination = std::move(Set);
-         };
-         TType* const encodedKeyType = Packer ? KeyType : nullptr;
-         return Ctx.HolderFactory.CreateDirectHashedSetHolder(moveSetInto, KeyTypes, IsTuple, true, encodedKeyType, Hash, Equate);
-     }
- };
- // Set accumulator for keys of a fixed-size type T.
- // When OptionalKey is set, an empty key is remembered through the HasNull flag.
- template <typename T, bool OptionalKey>
- class THashedSingleFixedSetAccumulator {
-     using TSetType = TValuesDictHashSingleFixedSet<T>;
-     TComputationContext& Ctx;
-     TSetType Set;
-     bool HasNull = false;
- public:
-     static constexpr bool IsSorted = false;
-     // Only `ctx` and `itemsCountHint` are used; the rest is ignored.
-     THashedSingleFixedSetAccumulator(
-         TType* keyType, const TKeyTypes& keyTypes, bool isTuple, bool encoded,
-         const NUdf::ICompare* compare, const NUdf::IEquate* equate, const NUdf::IHash* hash,
-         TComputationContext& ctx, ui64 itemsCountHint)
-         : Ctx(ctx)
-         , Set(0, TMyHash<T>(), TMyEquals<T>())
-     {
-         Y_UNUSED(keyType, keyTypes, isTuple, encoded, compare, equate, hash);
-         Set.reserve(itemsCountHint);
-     }
-     // Emplaces the raw T value of `key`; an empty key (OptionalKey only) just sets HasNull.
-     void Add(NUdf::TUnboxedValue&& key) {
-         if constexpr (OptionalKey) {
-             if (!key) {
-                 HasNull = true;
-                 return;
-             }
-         }
-         Set.emplace(key.Get<T>());
-     }
-     // Moves the set, together with the null flag, into the resulting holder.
-     NUdf::TUnboxedValue Build() {
-         return Ctx.HolderFactory.CreateDirectHashedSingleFixedSetHolder<T, OptionalKey>(
-             std::move(Set), HasNull);
-     }
- };
- // Compact set accumulator for keys of a fixed-size type T, backed by the holder
- // factory's page pool. When OptionalKey is set, an empty key is tracked via HasNull.
- template <typename T, bool OptionalKey>
- class THashedSingleFixedCompactSetAccumulator {
-     using TSetType = TValuesDictHashSingleFixedCompactSet<T>;
-     TComputationContext& Ctx;
-     TPagedArena Pool;
-     TSetType Set;
-     bool HasNull = false;
- public:
-     static constexpr bool IsSorted = false;
-     // Only `ctx` and `itemsCountHint` are used; the initial set size is the hint
-     // divided by COMPACT_HASH_MAX_LOAD_FACTOR.
-     THashedSingleFixedCompactSetAccumulator(
-         TType* keyType, const TKeyTypes& keyTypes, bool isTuple, bool encoded,
-         const NUdf::ICompare* compare, const NUdf::IEquate* equate, const NUdf::IHash* hash,
-         TComputationContext& ctx, ui64 itemsCountHint)
-         : Ctx(ctx)
-         , Pool(&Ctx.HolderFactory.GetPagePool())
-         , Set(Ctx.HolderFactory.GetPagePool(), itemsCountHint / COMPACT_HASH_MAX_LOAD_FACTOR)
-     {
-         Y_UNUSED(keyType, keyTypes, isTuple, encoded, compare, equate, hash);
-         Set.SetMaxLoadFactor(COMPACT_HASH_MAX_LOAD_FACTOR);
-     }
-     // Inserts the raw T value of `key`; an empty key (OptionalKey only) just sets HasNull.
-     void Add(NUdf::TUnboxedValue&& key) {
-         if constexpr (OptionalKey) {
-             if (!key) {
-                 HasNull = true;
-                 return;
-             }
-         }
-         Set.Insert(key.Get<T>());
-     }
-     // Moves the set, together with the null flag, into the resulting holder.
-     NUdf::TUnboxedValue Build() {
-         return Ctx.HolderFactory.CreateDirectHashedSingleFixedCompactSetHolder<T, OptionalKey>(
-             std::move(Set), HasNull);
-     }
- };
- // Compact set accumulator: every key is packed with KeyPacker and the packed
- // bytes are stored through AddSmallValue using Pool.
- class THashedCompactSetAccumulator {
-     using TSetType = TValuesDictHashCompactSet;
-     TComputationContext& Ctx;
-     TPagedArena Pool;
-     TSetType Set;
-     TType *KeyType;
-     std::shared_ptr<TValuePacker> KeyPacker;
- public:
-     static constexpr bool IsSorted = false;
-     // Only `keyType`, `ctx` and `itemsCountHint` are used; the initial set size is
-     // the hint divided by COMPACT_HASH_MAX_LOAD_FACTOR.
-     THashedCompactSetAccumulator(
-         TType* keyType, const TKeyTypes& keyTypes, bool isTuple, bool encoded,
-         const NUdf::ICompare* compare, const NUdf::IEquate* equate, const NUdf::IHash* hash,
-         TComputationContext& ctx, ui64 itemsCountHint)
-         : Ctx(ctx)
-         , Pool(&Ctx.HolderFactory.GetPagePool())
-         , Set(Ctx.HolderFactory.GetPagePool(), itemsCountHint / COMPACT_HASH_MAX_LOAD_FACTOR, TSmallValueHash(), TSmallValueEqual())
-         , KeyType(keyType)
-         , KeyPacker(std::make_shared<TValuePacker>(true, keyType))
-     {
-         Y_UNUSED(keyTypes, isTuple, encoded, compare, equate, hash);
-         Set.SetMaxLoadFactor(COMPACT_HASH_MAX_LOAD_FACTOR);
-     }
-     // Packs `key`, stores the packed form via AddSmallValue and inserts the result into the set.
-     void Add(NUdf::TUnboxedValue&& key) {
-         const auto storedKey = AddSmallValue(Pool, KeyPacker->Pack(key));
-         Set.Insert(storedKey);
-     }
-     // Moves the set and its arena into the resulting holder.
-     NUdf::TUnboxedValue Build() {
-         return Ctx.HolderFactory.CreateDirectHashedCompactSetHolder(
-             std::move(Set), std::move(Pool), KeyType, &Ctx);
-     }
- };
- // Compact hashed map accumulator; specialized below for single (false) and multi (true) dicts.
- template <bool Multi>
- class THashedCompactMapAccumulator;
- // Single-payload variant: packed keys and packed payloads are stored via
- // AddSmallValue and registered in the map with InsertNew.
- template <>
- class THashedCompactMapAccumulator<false> {
-     using TMapType = TValuesDictHashCompactMap;
-     TComputationContext& Ctx;
-     TPagedArena Pool;
-     TMapType Map;
-     TType *KeyType, *PayloadType;
-     std::shared_ptr<TValuePacker> KeyPacker, PayloadPacker;
- public:
-     static constexpr bool IsSorted = false;
-     // Only the key/payload types, `ctx` and `itemsCountHint` are used; the initial
-     // map size is the hint divided by COMPACT_HASH_MAX_LOAD_FACTOR.
-     THashedCompactMapAccumulator(
-         TType* keyType, TType* payloadType, const TKeyTypes& keyTypes, bool isTuple, bool encoded,
-         const NUdf::ICompare* compare, const NUdf::IEquate* equate, const NUdf::IHash* hash,
-         TComputationContext& ctx, ui64 itemsCountHint)
-         : Ctx(ctx)
-         , Pool(&Ctx.HolderFactory.GetPagePool())
-         , Map(Ctx.HolderFactory.GetPagePool(), itemsCountHint / COMPACT_HASH_MAX_LOAD_FACTOR)
-         , KeyType(keyType)
-         , PayloadType(payloadType)
-         , KeyPacker(std::make_shared<TValuePacker>(true, keyType))
-         , PayloadPacker(std::make_shared<TValuePacker>(false, payloadType))
-     {
-         Y_UNUSED(keyTypes, isTuple, encoded, compare, equate, hash);
-         Map.SetMaxLoadFactor(COMPACT_HASH_MAX_LOAD_FACTOR);
-     }
-     // Packs and stores key and payload, then registers the pair with InsertNew.
-     void Add(NUdf::TUnboxedValue&& key, NUdf::TUnboxedValue&& payload) {
-         const auto storedKey = AddSmallValue(Pool, KeyPacker->Pack(key));
-         const auto storedPayload = AddSmallValue(Pool, PayloadPacker->Pack(payload));
-         Map.InsertNew(storedKey, storedPayload);
-     }
-     // Moves the map and its arena into the resulting holder.
-     NUdf::TUnboxedValue Build() {
-         return Ctx.HolderFactory.CreateDirectHashedCompactMapHolder(
-             std::move(Map), std::move(Pool), KeyType, PayloadType, &Ctx);
-     }
- };
- // Multi-payload variant of the compact hashed map accumulator: packed keys and
- // packed payloads are stored via AddSmallValue and registered with Insert.
- template <>
- class THashedCompactMapAccumulator<true> {
-     using TMapType = TValuesDictHashCompactMultiMap;
-     TComputationContext& Ctx;
-     TPagedArena Pool;
-     TMapType Map;
-     TType *KeyType, *PayloadType;
-     std::shared_ptr<TValuePacker> KeyPacker, PayloadPacker;
- public:
-     static constexpr bool IsSorted = false;
-     // Only the key/payload types, `ctx` and `itemsCountHint` are used; the initial
-     // map size is the hint divided by COMPACT_HASH_MAX_LOAD_FACTOR.
-     THashedCompactMapAccumulator(
-         TType* keyType, TType* payloadType, const TKeyTypes& keyTypes, bool isTuple, bool encoded,
-         const NUdf::ICompare* compare, const NUdf::IEquate* equate, const NUdf::IHash* hash,
-         TComputationContext& ctx, ui64 itemsCountHint)
-         : Ctx(ctx)
-         , Pool(&Ctx.HolderFactory.GetPagePool())
-         , Map(Ctx.HolderFactory.GetPagePool(), itemsCountHint / COMPACT_HASH_MAX_LOAD_FACTOR)
-         , KeyType(keyType)
-         , PayloadType(payloadType)
-         , KeyPacker(std::make_shared<TValuePacker>(true, keyType))
-         , PayloadPacker(std::make_shared<TValuePacker>(false, payloadType))
-     {
-         Y_UNUSED(keyTypes, isTuple, encoded, compare, equate, hash);
-         Map.SetMaxLoadFactor(COMPACT_HASH_MAX_LOAD_FACTOR);
-     }
-     // Packs and stores key and payload, then registers the pair with Insert.
-     void Add(NUdf::TUnboxedValue&& key, NUdf::TUnboxedValue&& payload) {
-         const auto storedKey = AddSmallValue(Pool, KeyPacker->Pack(key));
-         const auto storedPayload = AddSmallValue(Pool, PayloadPacker->Pack(payload));
-         Map.Insert(storedKey, storedPayload);
-     }
-     // Moves the multimap and its arena into the resulting holder.
-     NUdf::TUnboxedValue Build() {
-         return Ctx.HolderFactory.CreateDirectHashedCompactMultiMapHolder(
-             std::move(Map), std::move(Pool), KeyType, PayloadType, &Ctx);
-     }
- };
- // Compact map accumulator keyed by a fixed-size T; specialized below on Multi.
- template <typename T, bool OptionalKey, bool Multi>
- class THashedSingleFixedCompactMapAccumulator;
- // Single-payload variant: payloads are packed and stored via AddSmallValue;
- // with OptionalKey, the stored payload of an empty key is kept in NullPayload.
- template <typename T, bool OptionalKey>
- class THashedSingleFixedCompactMapAccumulator<T, OptionalKey, false> {
-     using TMapType = TValuesDictHashSingleFixedCompactMap<T>;
-     TComputationContext& Ctx;
-     TPagedArena Pool;
-     TMapType Map;
-     std::optional<ui64> NullPayload;
-     TType* PayloadType;
-     std::shared_ptr<TValuePacker> PayloadPacker;
- public:
-     static constexpr bool IsSorted = false;
-     // Only `payloadType`, `ctx` and `itemsCountHint` are used; the initial map size
-     // is the hint divided by COMPACT_HASH_MAX_LOAD_FACTOR.
-     THashedSingleFixedCompactMapAccumulator(
-         TType* keyType, TType* payloadType, const TKeyTypes& keyTypes, bool isTuple, bool encoded,
-         const NUdf::ICompare* compare, const NUdf::IEquate* equate, const NUdf::IHash* hash,
-         TComputationContext& ctx, ui64 itemsCountHint)
-         : Ctx(ctx)
-         , Pool(&Ctx.HolderFactory.GetPagePool())
-         , Map(Ctx.HolderFactory.GetPagePool(), itemsCountHint / COMPACT_HASH_MAX_LOAD_FACTOR)
-         , PayloadType(payloadType)
-         , PayloadPacker(std::make_shared<TValuePacker>(false, payloadType))
-     {
-         Y_UNUSED(keyType, keyTypes, isTuple, encoded, compare, equate, hash);
-         Map.SetMaxLoadFactor(COMPACT_HASH_MAX_LOAD_FACTOR);
-     }
-     // Stores the packed payload and registers it under the raw T key with InsertNew;
-     // an empty key (OptionalKey only) assigns NullPayload instead.
-     void Add(NUdf::TUnboxedValue&& key, NUdf::TUnboxedValue&& payload) {
-         if constexpr (OptionalKey) {
-             if (!key) {
-                 NullPayload = AddSmallValue(Pool, PayloadPacker->Pack(payload));
-                 return;
-             }
-         }
-         const auto rawKey = key.Get<T>();
-         const auto storedPayload = AddSmallValue(Pool, PayloadPacker->Pack(payload));
-         Map.InsertNew(rawKey, storedPayload);
-     }
-     // Moves the map, the optional null payload and the arena into the resulting holder.
-     NUdf::TUnboxedValue Build() {
-         return Ctx.HolderFactory.CreateDirectHashedSingleFixedCompactMapHolder<T, OptionalKey>(
-             std::move(Map), std::move(NullPayload), std::move(Pool), PayloadType, &Ctx);
-     }
- };
- // Multi-payload variant of the fixed-key compact map accumulator: payloads are
- // packed and stored via AddSmallValue; with OptionalKey, stored payloads of empty
- // keys are collected in NullPayloads.
- template <typename T, bool OptionalKey>
- class THashedSingleFixedCompactMapAccumulator<T, OptionalKey, true> {
-     using TMapType = TValuesDictHashSingleFixedCompactMultiMap<T>;
-     TComputationContext& Ctx;
-     TPagedArena Pool;
-     TMapType Map;
-     std::vector<ui64> NullPayloads;
-     TType* PayloadType;
-     std::shared_ptr<TValuePacker> PayloadPacker;
- public:
-     static constexpr bool IsSorted = false;
-     // Only `payloadType`, `ctx` and `itemsCountHint` are used; the initial map size
-     // is the hint divided by COMPACT_HASH_MAX_LOAD_FACTOR.
-     THashedSingleFixedCompactMapAccumulator(
-         TType* keyType, TType* payloadType, const TKeyTypes& keyTypes, bool isTuple, bool encoded,
-         const NUdf::ICompare* compare, const NUdf::IEquate* equate, const NUdf::IHash* hash,
-         TComputationContext& ctx, ui64 itemsCountHint)
-         : Ctx(ctx)
-         , Pool(&Ctx.HolderFactory.GetPagePool())
-         , Map(Ctx.HolderFactory.GetPagePool(), itemsCountHint / COMPACT_HASH_MAX_LOAD_FACTOR)
-         , PayloadType(payloadType)
-         , PayloadPacker(std::make_shared<TValuePacker>(false, payloadType))
-     {
-         Y_UNUSED(keyTypes, keyType, isTuple, encoded, compare, equate, hash);
-         Map.SetMaxLoadFactor(COMPACT_HASH_MAX_LOAD_FACTOR);
-     }
-     // Stores the packed payload and registers it under the raw T key with Insert;
-     // an empty key (OptionalKey only) appends to NullPayloads instead.
-     void Add(NUdf::TUnboxedValue&& key, NUdf::TUnboxedValue&& payload) {
-         if constexpr (OptionalKey) {
-             if (!key) {
-                 NullPayloads.push_back(AddSmallValue(Pool, PayloadPacker->Pack(payload)));
-                 return;
-             }
-         }
-         const auto rawKey = key.Get<T>();
-         const auto storedPayload = AddSmallValue(Pool, PayloadPacker->Pack(payload));
-         Map.Insert(rawKey, storedPayload);
-     }
-     // Moves the multimap, the null-key payloads and the arena into the resulting holder.
-     NUdf::TUnboxedValue Build() {
-         return Ctx.HolderFactory.CreateDirectHashedSingleFixedCompactMultiMapHolder<T, OptionalKey>(
-             std::move(Map), std::move(NullPayloads), std::move(Pool), PayloadType, &Ctx);
-     }
- };
- // Accumulates keys for a sorted set; sorting and de-duplication happen inside
- // the filler callback created by Build().
- class TSortedSetAccumulator {
-     TComputationContext& Ctx;
-     TType* KeyType;
-     const TKeyTypes& KeyTypes;
-     bool IsTuple;
-     const NUdf::ICompare* Compare;
-     const NUdf::IEquate* Equate;
-     std::optional<TGenericPresortEncoder> Packer; // engaged only when `encoded` is requested
-     TUnboxedValueVector Items;
- public:
-     static constexpr bool IsSorted = true;
-     // `hash` is accepted but not used by this accumulator.
-     TSortedSetAccumulator(
-         TType* keyType, const TKeyTypes& keyTypes, bool isTuple, bool encoded,
-         const NUdf::ICompare* compare, const NUdf::IEquate* equate, const NUdf::IHash* hash,
-         TComputationContext& ctx, ui64 itemsCountHint)
-         : Ctx(ctx)
-         , KeyType(keyType)
-         , KeyTypes(keyTypes)
-         , IsTuple(isTuple)
-         , Compare(compare)
-         , Equate(equate)
-     {
-         Y_UNUSED(hash);
-         if (encoded) {
-             Packer.emplace(KeyType);
-         }
-         Items.reserve(itemsCountHint);
-     }
-     // Appends the key; with an encoder present the key is replaced by its encoded string first.
-     void Add(NUdf::TUnboxedValue&& key) {
-         if (Packer) {
-             key = MakeString(Packer->Encode(key, false));
-         }
-         Items.emplace_back(std::move(key));
-     }
-     // Creates a sorted set holder whose filler stable-sorts the items, drops
-     // adjacent equal keys and hands the vector over.
-     NUdf::TUnboxedValue Build() {
-         const TSortedSetFiller filler = [this](TUnboxedValueVector& destination) {
-             std::stable_sort(Items.begin(), Items.end(), TValueLess(KeyTypes, IsTuple, Compare));
-             const auto duplicatesBegin = std::unique(Items.begin(), Items.end(), TValueEqual(KeyTypes, IsTuple, Equate));
-             Items.erase(duplicatesBegin, Items.end());
-             destination = std::move(Items);
-         };
-         TType* const encodedKeyType = Packer ? KeyType : nullptr;
-         return Ctx.HolderFactory.CreateDirectSortedSetHolder(filler, KeyTypes, IsTuple,
-             EDictSortMode::SortedUniqueAscending, true, encodedKeyType, Compare, Equate);
-     }
- };
- // Sorted dict accumulator; specialized below for single (false) and multi (true) dicts.
- template<bool IsMulti>
- class TSortedMapAccumulator;
- // Single-payload variant: pairs are collected as-is and passed to the holder
- // with EDictSortMode::RequiresSorting.
- template<>
- class TSortedMapAccumulator<false> {
-     TComputationContext& Ctx;
-     TType* KeyType;
-     const TKeyTypes& KeyTypes;
-     bool IsTuple;
-     const NUdf::ICompare* Compare;
-     const NUdf::IEquate* Equate;
-     std::optional<TGenericPresortEncoder> Packer; // engaged only when `encoded` is requested
-     TKeyPayloadPairVector Items;
- public:
-     static constexpr bool IsSorted = true;
-     // `hash` and `payloadType` are accepted but not used by this accumulator.
-     TSortedMapAccumulator(
-         TType* keyType, TType* payloadType, const TKeyTypes& keyTypes, bool isTuple, bool encoded,
-         const NUdf::ICompare* compare, const NUdf::IEquate* equate, const NUdf::IHash* hash,
-         TComputationContext& ctx, ui64 itemsCountHint)
-         : Ctx(ctx), KeyType(keyType), KeyTypes(keyTypes), IsTuple(isTuple), Compare(compare), Equate(equate)
-     {
-         Y_UNUSED(hash, payloadType);
-         if (encoded) {
-             Packer.emplace(KeyType);
-         }
-         Items.reserve(itemsCountHint);
-     }
-     // Appends the pair; with an encoder present the key is replaced by its encoded string first.
-     void Add(NUdf::TUnboxedValue&& key, NUdf::TUnboxedValue&& payload) {
-         if (Packer) {
-             key = MakeString(Packer->Encode(key, false));
-         }
-         Items.emplace_back(std::move(key), std::move(payload));
-     }
-     // Creates a sorted dict holder whose filler simply hands over the collected pairs.
-     NUdf::TUnboxedValue Build() {
-         const TSortedDictFiller filler = [this](TKeyPayloadPairVector& destination) {
-             destination = std::move(Items);
-         };
-         TType* const encodedKeyType = Packer ? KeyType : nullptr;
-         return Ctx.HolderFactory.CreateDirectSortedDictHolder(filler, KeyTypes, IsTuple,
-             EDictSortMode::RequiresSorting, true, encodedKeyType, Compare, Equate);
-     }
- };
- // Multi-payload variant of the sorted dict accumulator: the filler sorts the
- // pairs by key and folds each run of equal keys into one list-valued entry.
- template<>
- class TSortedMapAccumulator<true> {
-     TComputationContext& Ctx;
-     TType* KeyType;
-     const TKeyTypes& KeyTypes;
-     bool IsTuple;
-     const NUdf::ICompare* Compare;
-     const NUdf::IEquate* Equate;
-     std::optional<TGenericPresortEncoder> Packer; // engaged only when `encoded` is requested
-     TKeyPayloadPairVector Items;
- public:
-     static constexpr bool IsSorted = true;
-     // `hash` and `payloadType` are accepted but not used by this accumulator.
-     TSortedMapAccumulator(
-         TType* keyType, TType* payloadType, const TKeyTypes& keyTypes, bool isTuple, bool encoded,
-         const NUdf::ICompare* compare, const NUdf::IEquate* equate, const NUdf::IHash* hash,
-         TComputationContext& ctx, ui64 itemsCountHint)
-         : Ctx(ctx)
-         , KeyType(keyType)
-         , KeyTypes(keyTypes)
-         , IsTuple(isTuple)
-         , Compare(compare)
-         , Equate(equate)
-     {
-         Y_UNUSED(hash, payloadType);
-         if (encoded) {
-             Packer.emplace(KeyType);
-         }
-         Items.reserve(itemsCountHint);
-     }
-     // Appends the pair; with an encoder present the key is replaced by its encoded string first.
-     void Add(NUdf::TUnboxedValue&& key, NUdf::TUnboxedValue&& payload) {
-         if (Packer) {
-             key = MakeString(Packer->Encode(key, false));
-         }
-         Items.emplace_back(std::move(key), std::move(payload));
-     }
-     // Creates a sorted dict holder; its filler stable-sorts the pairs and emits one
-     // (key, list holder) entry per run of equal keys.
-     NUdf::TUnboxedValue Build() {
-         const TSortedDictFiller filler = [this](TKeyPayloadPairVector& destination) {
-             std::stable_sort(Items.begin(), Items.end(), TKeyPayloadPairLess(KeyTypes, IsTuple, Compare));
-             TKeyPayloadPairVector grouped;
-             grouped.reserve(Items.size());
-             if (Items.empty()) {
-                 destination = std::move(grouped);
-                 return;
-             }
-             // Seed the first run from the first pair, then walk the remaining pairs.
-             auto cursor = Items.begin();
-             TDefaultListRepresentation run(std::move(cursor->second));
-             auto runKey = std::move(cursor->first);
-             TValueEqual sameKey(KeyTypes, IsTuple, Equate);
-             for (++cursor; cursor != Items.end(); ++cursor) {
-                 if (!sameKey(runKey, cursor->first)) {
-                     // The key changed: close the current run and start a new one.
-                     auto finishedList = Ctx.HolderFactory.CreateDirectListHolder(std::move(run));
-                     grouped.emplace_back(std::move(runKey), std::move(finishedList));
-                     run = TDefaultListRepresentation(std::move(cursor->second));
-                     runKey = std::move(cursor->first);
-                 } else {
-                     run = run.Append(std::move(cursor->second));
-                 }
-             }
-             // Close the trailing run.
-             auto finishedList = Ctx.HolderFactory.CreateDirectListHolder(std::move(run));
-             grouped.emplace_back(std::move(runKey), std::move(finishedList));
-             destination = std::move(grouped);
-         };
-         TType* const encodedKeyType = Packer ? KeyType : nullptr;
-         return Ctx.HolderFactory.CreateDirectSortedDictHolder(filler, KeyTypes, IsTuple,
-             EDictSortMode::SortedUniqueAscending, true, encodedKeyType, Compare, Equate);
-     }
- };
- // Computation node that feeds the key computed for every input item into a
- // TSetAccumulator. With IsStream the result is a stream value that yields the
- // built set once its input finishes; otherwise the set is built immediately.
- template <typename TSetAccumulator, bool IsStream>
- class TSetWrapper : public TMutableComputationNode<TSetWrapper<TSetAccumulator, IsStream>> {
-     using TBaseComputation = TMutableComputationNode<TSetWrapper<TSetAccumulator, IsStream>>;
- public:
-     // Stream value that drains `Input`, accumulates keys and emits the built set exactly once.
-     class TStreamValue : public TComputationValue<TStreamValue> {
-     public:
-         TStreamValue(TMemoryUsageInfo* memInfo, NUdf::TUnboxedValue&& input, IComputationExternalNode* const item,
-             IComputationNode* const key, TSetAccumulator&& setAccum, TComputationContext& ctx)
-             : TComputationValue<TStreamValue>(memInfo)
-             , Input(std::move(input))
-             , Item(item)
-             , Key(key)
-             , SetAccum(std::move(setAccum))
-             , Ctx(ctx)
-         {
-         }
-     private:
-         // Returns Ok with the built set when the input finishes, Yield when the input
-         // yields, and Finish on every call after the set has been emitted.
-         NUdf::EFetchStatus Fetch(NUdf::TUnboxedValue& result) override {
-             if (Finished) {
-                 return NUdf::EFetchStatus::Finish;
-             }
-             while (true) {
-                 NUdf::TUnboxedValue item;
-                 const auto status = Input.Fetch(item);
-                 switch (status) {
-                     case NUdf::EFetchStatus::Ok: {
-                         Item->SetValue(Ctx, std::move(item));
-                         SetAccum.Add(Key->GetValue(Ctx));
-                         break; // keep pulling items
-                     }
-                     case NUdf::EFetchStatus::Finish: {
-                         result = SetAccum.Build();
-                         Finished = true;
-                         return NUdf::EFetchStatus::Ok;
-                     }
-                     case NUdf::EFetchStatus::Yield: {
-                         return NUdf::EFetchStatus::Yield;
-                     }
-                 }
-             }
-         }
-         NUdf::TUnboxedValue Input;
-         IComputationExternalNode* const Item;
-         IComputationNode* const Key;
-         TSetAccumulator SetAccum;
-         TComputationContext& Ctx;
-         bool Finished = false;
-     };
-     // Derives the key layout via GetDictionaryKeyTypes and, when UseIHash is set,
-     // creates the equate implementation plus either a comparator (sorted
-     // accumulators) or a hasher (unsorted accumulators).
-     TSetWrapper(TComputationMutables& mutables, TType* keyType, IComputationNode* list, IComputationExternalNode* item,
-         IComputationNode* key, ui64 itemsCountHint)
-         : TBaseComputation(mutables, EValueRepresentation::Boxed)
-         , KeyType(keyType)
-         , List(list)
-         , Item(item)
-         , Key(key)
-         , ItemsCountHint(itemsCountHint)
-     {
-         GetDictionaryKeyTypes(KeyType, KeyTypes, IsTuple, Encoded, UseIHash);
-         if (UseIHash) {
-             if (TSetAccumulator::IsSorted) {
-                 Compare = MakeCompareImpl(KeyType);
-             }
-             Equate = MakeEquateImpl(KeyType);
-             if (!TSetAccumulator::IsSorted) {
-                 Hash = MakeHashImpl(KeyType);
-             }
-         }
-     }
-     // Stream mode: wraps the input into a TStreamValue. List mode: returns the lazy
-     // empty container for a list with a known zero length, otherwise accumulates
-     // every item's key and returns the built set.
-     NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
-         if constexpr (IsStream) {
-             return ctx.HolderFactory.Create<TStreamValue>(
-                 List->GetValue(ctx), Item, Key,
-                 TSetAccumulator(KeyType, KeyTypes, IsTuple, Encoded, Compare.Get(), Equate.Get(), Hash.Get(), ctx, ItemsCountHint),
-                 ctx);
-         }
-         const auto& list = List->GetValue(ctx);
-         ui64 expectedItems = ItemsCountHint;
-         if (list.HasFastListLength()) {
-             // A cheaply known length overrides the static hint.
-             const auto knownLength = list.GetListLength();
-             if (!knownLength) {
-                 return ctx.HolderFactory.GetEmptyContainerLazy();
-             }
-             expectedItems = knownLength;
-         }
-         TSetAccumulator setBuilder(KeyType, KeyTypes, IsTuple, Encoded, Compare.Get(), Equate.Get(), Hash.Get(),
-             ctx, expectedItems);
-         const auto addKeyOf = [this, &setBuilder, &ctx](NUdf::TUnboxedValue&& element) {
-             Item->SetValue(ctx, std::move(element));
-             setBuilder.Add(Key->GetValue(ctx));
-         };
-         TThresher<false>::DoForEachItem(list, addKeyOf);
-         return setBuilder.Build().Release();
-     }
- private:
-     void RegisterDependencies() const final {
-         this->DependsOn(List);
-         this->Own(Item);
-         this->DependsOn(Key);
-     }
-     TType* const KeyType;
-     IComputationNode* const List;
-     IComputationExternalNode* const Item;
-     IComputationNode* const Key;
-     const ui64 ItemsCountHint;
-     // Filled in by GetDictionaryKeyTypes in the constructor.
-     TKeyTypes KeyTypes;
-     bool IsTuple;
-     bool Encoded;
-     bool UseIHash;
-     NUdf::ICompare::TPtr Compare;
-     NUdf::IEquate::TPtr Equate;
-     NUdf::IHash::TPtr Hash;
- };
- #ifndef MKQL_DISABLE_CODEGEN
// Field-layout helper: takes the field list described by TLLVMBase and appends one
// pointer-to-empty-struct slot that stands for the accumulator.
- template <class TLLVMBase>
- class TLLVMFieldsStructureStateWithAccum: public TLLVMBase {
- using TParent = TLLVMBase;
- protected:
- using TParent::Context;
- public:
- TLLVMFieldsStructureStateWithAccum(llvm::LLVMContext& context)
- : TParent(context)
- , StructPtrType(PointerType::getUnqual(StructType::get(context)))
- {}
// Base fields followed by the accumulator pointer slot.
- std::vector<llvm::Type*> GetFieldsArray() {
- std::vector<llvm::Type*> fields = TParent::GetFields();
- fields.push_back(StructPtrType); //accumulator
- return fields;
- }
// i32 constant with the index of the accumulator slot: the first slot after the base fields.
- llvm::Constant* GetAccumulator() {
- const auto index = TParent::GetFieldsCount() + 0;
- return ConstantInt::get(Type::getInt32Ty(Context), index);
- }
- private:
// Type used for the accumulator slot (unqualified pointer to an empty struct type).
- llvm::PointerType* StructPtrType;
- };
- #endif
// Stateful flow node that consumes the whole input `Flow` and emits a single value - the set of
// keys computed for every item - followed by Finish.
// The per-node state goes through three phases (see DoCalculate):
//   Invalid      -> a boxed TState holding a fresh accumulator is created;
//   boxed TState -> items are being accumulated (a Yield from the input is passed through);
//   Finish       -> the set has already been emitted.
// DoGenerateGetValue emits LLVM IR that follows the same protocol.
- template <typename TSetAccumulator>
- class TSqueezeSetFlowWrapper : public TStatefulFlowCodegeneratorNode<TSqueezeSetFlowWrapper<TSetAccumulator>> {
- using TBase = TStatefulFlowCodegeneratorNode<TSqueezeSetFlowWrapper<TSetAccumulator>>;
- public:
// Boxed state object that owns the accumulator between calls.
- class TState : public TComputationValue<TState> {
- using TBase = TComputationValue<TState>;
- public:
- TState(TMemoryUsageInfo* memInfo, TSetAccumulator&& setAccum)
- : TBase(memInfo), SetAccum(std::move(setAccum)) {}
// Build() and Insert() are also called from generated code through raw method pointers
// (see DoGenerateGetValue).
- NUdf::TUnboxedValuePod Build() {
- return SetAccum.Build().Release();
- }
- void Insert(NUdf::TUnboxedValuePod value) {
- SetAccum.Add(value);
- }
- private:
- TSetAccumulator SetAccum;
- };
// keyType        - type of the set keys; drives the choice of comparison/hash helpers.
// flow           - input flow node.
// item           - external node that is assigned each input item.
// key            - node computing the key for the current item.
// itemsCountHint - size hint forwarded to the accumulator.
- TSqueezeSetFlowWrapper(TComputationMutables& mutables, TType* keyType,
- IComputationNode* flow, IComputationExternalNode* item, IComputationNode* key, ui64 itemsCountHint)
- : TBase(mutables, flow, EValueRepresentation::Boxed, EValueRepresentation::Any)
- , KeyType(keyType)
- , Flow(flow)
- , Item(item)
- , Key(key)
- , ItemsCountHint(itemsCountHint)
- {
// KeyTypes/IsTuple/Encoded/UseIHash are passed as arguments here and read afterwards, so this
// call is expected to fill them in from KeyType.
- GetDictionaryKeyTypes(KeyType, KeyTypes, IsTuple, Encoded, UseIHash);
// Helpers exist only when UseIHash is set: Equate always, Compare for sorted accumulators,
// Hash for unsorted ones.
- Compare = UseIHash && TSetAccumulator::IsSorted ? MakeCompareImpl(KeyType) : nullptr;
- Equate = UseIHash ? MakeEquateImpl(KeyType) : nullptr;
- Hash = UseIHash && !TSetAccumulator::IsSorted ? MakeHashImpl(KeyType) : nullptr;
- }
// Interpreted implementation. Returns Yield while the input yields, the built set when the
// input finishes, and Finish once the set has been emitted.
- NUdf::TUnboxedValuePod DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx) const {
// NOTE(review): this path returns state.Release(), whereas TSqueezeMapFlowWrapper::DoCalculate
// returns `state` as is and the generated code below does not write to the state on this path.
// If Release() resets `state`, a further call would create a new accumulator - confirm intent.
- if (state.IsFinish()) {
- return state.Release();
- } else if (state.IsInvalid()) {
- MakeState(ctx, state);
- }
// The loop is left only through the returns inside it; statePtr is expected to be non-null,
// the fall-through is marked unreachable.
- while (const auto statePtr = static_cast<TState*>(state.AsBoxed().Get())) {
- if (auto item = Flow->GetValue(ctx); item.IsYield()) {
- return item.Release();
- } else if (item.IsFinish()) {
// Build the result first, then overwrite the state with the Finish marker carried by `item`.
- const auto dict = statePtr->Build();
- state = std::move(item);
- return dict;
- } else {
- Item->SetValue(ctx, std::move(item));
- statePtr->Insert(Key->GetValue(ctx).Release());
- }
- }
- Y_UNREACHABLE();
- }
- #ifndef MKQL_DISABLE_CODEGEN
// Emits the IR equivalent of DoCalculate. Basic blocks:
//   make - state is Invalid: call MakeState(this, ctx, statePtr);
//   main - load the state; if it is Finish jump to `over` with Finish;
//   more - fetch the next item: Finish -> done, Yield -> over (result Yield), otherwise -> plus;
//   plus - publish the item, evaluate Key, call TState::Insert, loop back to `more`;
//   done - call TState::Build, drop the reference to the state, store Finish into the state;
//   over - PHI `result` merges the three outcomes (Finish / Yield / built set).
- Value* DoGenerateGetValue(const TCodegenContext& ctx, Value* statePtr, BasicBlock*& block) const {
- auto& context = ctx.Codegen.GetContext();
- const auto codegenItemArg = dynamic_cast<ICodegeneratorExternalNode*>(Item);
- MKQL_ENSURE(codegenItemArg, "Item must be codegenerator node.");
// Values are handled as 128-bit integers in the generated code.
- const auto valueType = Type::getInt128Ty(context);
// State layout: the fields described for TComputationValue<TState> plus one pointer slot for
// the accumulator (appended by TLLVMFieldsStructureStateWithAccum).
- TLLVMFieldsStructureStateWithAccum<TLLVMFieldsStructure<TComputationValue<TState>>> fieldsStruct(context);
- const auto stateType = StructType::get(context, fieldsStruct.GetFieldsArray());
- const auto statePtrType = PointerType::getUnqual(stateType);
- const auto make = BasicBlock::Create(context, "make", ctx.Func);
- const auto main = BasicBlock::Create(context, "main", ctx.Func);
- BranchInst::Create(make, main, IsInvalid(statePtr, block), block);
- block = make;
// The address of this node and the value returned by GetMethodPtr are baked into the IR as
// 64-bit constants and cast to pointers to perform the call.
- const auto ptrType = PointerType::getUnqual(StructType::get(context));
- const auto self = CastInst::Create(Instruction::IntToPtr, ConstantInt::get(Type::getInt64Ty(context), uintptr_t(this)), ptrType, "self", block);
- const auto makeFunc = ConstantInt::get(Type::getInt64Ty(context), GetMethodPtr(&TSqueezeSetFlowWrapper<TSetAccumulator>::MakeState));
- const auto makeType = FunctionType::get(Type::getVoidTy(context), {self->getType(), ctx.Ctx->getType(), statePtr->getType()}, false);
- const auto makeFuncPtr = CastInst::Create(Instruction::IntToPtr, makeFunc, PointerType::getUnqual(makeType), "function", block);
- CallInst::Create(makeType, makeFuncPtr, {self, ctx.Ctx, statePtr}, "", block);
- BranchInst::Create(main, block);
- block = main;
- const auto more = BasicBlock::Create(context, "more", ctx.Func);
- const auto done = BasicBlock::Create(context, "done", ctx.Func);
- const auto plus = BasicBlock::Create(context, "plus", ctx.Func);
- const auto over = BasicBlock::Create(context, "over", ctx.Func);
- const auto result = PHINode::Create(valueType, 3U, "result", over);
// The low 64 bits of the loaded state value are reinterpreted as the TState pointer.
- const auto state = new LoadInst(valueType, statePtr, "state", block);
- const auto half = CastInst::Create(Instruction::Trunc, state, Type::getInt64Ty(context), "half", block);
- const auto stateArg = CastInst::Create(Instruction::IntToPtr, half, statePtrType, "state_arg", block);
// Incoming edge from `main`: the set was already emitted, answer Finish.
- result->addIncoming(GetFinish(context), block);
- BranchInst::Create(over, more, IsFinish(state, block), block);
- block = more;
- const auto item = GetNodeValue(Flow, ctx, block);
// Incoming edge from `more`: taken only by the Yield case of the switch below.
- result->addIncoming(GetYield(context), block);
- const auto choise = SwitchInst::Create(item, plus, 2U, block);
- choise->addCase(GetFinish(context), done);
- choise->addCase(GetYield(context), over);
- block = plus;
- codegenItemArg->CreateSetValue(ctx, block, item);
- const auto key = GetNodeValue(Key, ctx, block);
- const auto insert = ConstantInt::get(Type::getInt64Ty(context), GetMethodPtr(&TState::Insert));
// NOTE(review): WrapArgumentForWindows presumably adapts how the 128-bit argument is passed on
// Windows targets - confirm against its definition.
- const auto keyArg = WrapArgumentForWindows(key, ctx, block);
- const auto insType = FunctionType::get(Type::getVoidTy(context), {stateArg->getType(), keyArg->getType()}, false);
- const auto insPtr = CastInst::Create(Instruction::IntToPtr, insert, PointerType::getUnqual(insType), "insert", block);
- CallInst::Create(insType, insPtr, {stateArg, keyArg}, "", block);
- BranchInst::Create(more, block);
- block = done;
- const auto build = ConstantInt::get(Type::getInt64Ty(context), GetMethodPtr(&TState::Build));
// Non-Windows targets: Build is called as a function returning the value directly.
// Windows target: Build is called with an extra pointer argument and the value is loaded
// from that stack slot afterwards.
- if (NYql::NCodegen::ETarget::Windows != ctx.Codegen.GetEffectiveTarget()) {
- const auto funType = FunctionType::get(valueType, {stateArg->getType()}, false);
- const auto funcPtr = CastInst::Create(Instruction::IntToPtr, build, PointerType::getUnqual(funType), "build", block);
- const auto dict = CallInst::Create(funType, funcPtr, {stateArg}, "dict", block);
- UnRefBoxed(state, ctx, block);
- result->addIncoming(dict, block);
- } else {
- const auto ptr = new AllocaInst(valueType, 0U, "ptr", block);
- const auto funType = FunctionType::get(Type::getVoidTy(context), {stateArg->getType(), ptr->getType()}, false);
- const auto funcPtr = CastInst::Create(Instruction::IntToPtr, build, PointerType::getUnqual(funType), "build", block);
- CallInst::Create(funType, funcPtr, {stateArg, ptr}, "", block);
- const auto dict = new LoadInst(valueType, ptr, "dict", block);
- UnRefBoxed(state, ctx, block);
- result->addIncoming(dict, block);
- }
// `done` is reached only through the Finish case, so `item` is the Finish marker here; storing
// it makes later calls take the Finish path in `main`.
- new StoreInst(item, statePtr, block);
- BranchInst::Create(over, block);
- block = over;
- return result;
- }
- #endif
- private:
// Replaces `state` with a new boxed TState wrapping a fresh accumulator. Also called from
// generated code (block `make`).
- void MakeState(TComputationContext& ctx, NUdf::TUnboxedValue& state) const {
- state = ctx.HolderFactory.Create<TState>(TSetAccumulator(KeyType, KeyTypes, IsTuple, Encoded,
- Compare.Get(), Equate.Get(), Hash.Get(), ctx, ItemsCountHint));
- }
// Registers the flow dependency; if FlowDependsOn returns a non-null flow, Item is owned and
// Key is depended upon through it.
- void RegisterDependencies() const final {
- if (const auto flow = this->FlowDependsOn(Flow)) {
- this->Own(flow, Item);
- this->DependsOn(flow, Key);
- }
- }
- TType* const KeyType;
- IComputationNode* const Flow;
- IComputationExternalNode* const Item;
- IComputationNode* const Key;
- const ui64 ItemsCountHint;
// The next four members are handed to GetDictionaryKeyTypes() in the constructor.
- TKeyTypes KeyTypes;
- bool IsTuple;
- bool Encoded;
- bool UseIHash;
// Assigned in the constructor; null unless UseIHash is set.
- NUdf::ICompare::TPtr Compare;
- NUdf::IEquate::TPtr Equate;
- NUdf::IHash::TPtr Hash;
- };
// Wide-flow variant of the squeeze-to-set node: consumes a wide flow (several columns per row,
// represented by the external nodes in `Items`), adds the key computed for every row to an
// accumulator and emits the built set once, followed by Finish.
// State protocol: Invalid -> boxed TState is created; boxed TState -> accumulating;
// Finish -> the set has already been emitted.
- template <typename TSetAccumulator>
- class TSqueezeSetWideWrapper : public TStatefulFlowCodegeneratorNode<TSqueezeSetWideWrapper<TSetAccumulator>> {
- using TBase = TStatefulFlowCodegeneratorNode<TSqueezeSetWideWrapper<TSetAccumulator>>;
- public:
// Boxed state object that owns the accumulator between calls.
- class TState : public TComputationValue<TState> {
- using TBase = TComputationValue<TState>;
- public:
- TState(TMemoryUsageInfo* memInfo, TSetAccumulator&& setAccum)
- : TBase(memInfo), SetAccum(std::move(setAccum)) {}
// Build() and Insert() are also called from generated code through raw method pointers
// (see DoGenerateGetValue).
- NUdf::TUnboxedValuePod Build() {
- return SetAccum.Build().Release();
- }
- void Insert(NUdf::TUnboxedValuePod value) {
- SetAccum.Add(value);
- }
- private:
- TSetAccumulator SetAccum;
- };
// keyType        - type of the set keys; drives the choice of comparison/hash helpers.
// flow           - wide input flow node.
// items          - external nodes, one per input column; moved into this object.
// key            - node computing the key for the current row.
// itemsCountHint - size hint forwarded to the accumulator.
- TSqueezeSetWideWrapper(TComputationMutables& mutables, TType* keyType,
- IComputationWideFlowNode* flow, TComputationExternalNodePtrVector&& items, IComputationNode* key, ui64 itemsCountHint)
- : TBase(mutables, flow, EValueRepresentation::Boxed, EValueRepresentation::Any)
- , KeyType(keyType)
- , Flow(flow)
- , Items(std::move(items))
- , Key(key)
- , ItemsCountHint(itemsCountHint)
// Optional column index for Key: when set, the generated code reads the key straight from that
// fetched column instead of evaluating the Key node.
- , PasstroughKey(GetPasstroughtMap(TComputationNodePtrVector{Key}, Items).front())
// Offset of this node's slots inside ctx.WideFields; Items.size() slots are requested.
- , WideFieldsIndex(mutables.IncrementWideFieldsIndex(Items.size()))
- {
// KeyTypes/IsTuple/Encoded/UseIHash are passed as arguments here and read afterwards, so this
// call is expected to fill them in from KeyType.
- GetDictionaryKeyTypes(KeyType, KeyTypes, IsTuple, Encoded, UseIHash);
// Helpers exist only when UseIHash is set: Equate always, Compare for sorted accumulators,
// Hash for unsorted ones.
- Compare = UseIHash && TSetAccumulator::IsSorted ? MakeCompareImpl(KeyType) : nullptr;
- Equate = UseIHash ? MakeEquateImpl(KeyType) : nullptr;
- Hash = UseIHash && !TSetAccumulator::IsSorted ? MakeHashImpl(KeyType) : nullptr;
- }
// Interpreted implementation. Returns Yield while the input yields, the built set when the
// input finishes, and Finish once the set has been emitted.
- NUdf::TUnboxedValuePod DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx) const {
// NOTE(review): state.Release() is used here, while the generated code below does not write
// to the state on the Finish path; if Release() resets `state`, a further call would create a
// new accumulator - confirm intent.
- if (state.IsFinish()) {
- return state.Release();
- } else if (state.IsInvalid()) {
- MakeState(ctx, state);
- }
- auto** fields = ctx.WideFields.data() + WideFieldsIndex;
// The loop is left only through the returns inside it; the fall-through is marked unreachable.
- while (const auto statePtr = static_cast<TState*>(state.AsBoxed().Get())) {
// Before every fetch, point the slot of each needed column (it is the key node itself, or
// something depends on it) at the storage of the matching Items node. Other slots are not
// written here. FetchValues presumably stores the row through the provided slots.
- for (auto i = 0U; i < Items.size(); ++i)
- if (Key == Items[i] || Items[i]->GetDependencesCount() > 0U)
- fields[i] = &Items[i]->RefValue(ctx);
- switch (const auto result = Flow->FetchValues(ctx, fields)) {
- case EFetchResult::One:
- statePtr->Insert(Key->GetValue(ctx).Release());
- continue;
- case EFetchResult::Yield:
- return NUdf::TUnboxedValuePod::MakeYield();
- case EFetchResult::Finish: {
// Build the result first, then mark the state as finished.
- const auto dict = statePtr->Build();
- state = NUdf::TUnboxedValuePod::MakeFinish();
- return dict;
- }
- }
- }
- Y_UNREACHABLE();
- }
- #ifndef MKQL_DISABLE_CODEGEN
// Emits the IR equivalent of DoCalculate. Basic blocks:
//   make - state is Invalid: call MakeState(this, ctx, statePtr);
//   main - load the state; if it is Finish jump to `over` with Finish;
//   more - fetch the next row: Finish -> done, Yield -> over (result Yield), otherwise -> plus;
//   plus - obtain the key, call TState::Insert, loop back to `more`;
//   done - call TState::Build, drop the reference to the state, store Finish into the state;
//   over - PHI `result` merges the three outcomes (Finish / Yield / built set).
- Value* DoGenerateGetValue(const TCodegenContext& ctx, Value* statePtr, BasicBlock*& block) const {
- auto& context = ctx.Codegen.GetContext();
// Values are handled as 128-bit integers in the generated code.
- const auto valueType = Type::getInt128Ty(context);
// State layout: the fields described for TComputationValue<TState> plus one pointer slot for
// the accumulator (appended by TLLVMFieldsStructureStateWithAccum).
- TLLVMFieldsStructureStateWithAccum<TLLVMFieldsStructure<TComputationValue<TState>>> fieldsStruct(context);
- const auto stateType = StructType::get(context, fieldsStruct.GetFieldsArray());
- const auto statePtrType = PointerType::getUnqual(stateType);
- const auto make = BasicBlock::Create(context, "make", ctx.Func);
- const auto main = BasicBlock::Create(context, "main", ctx.Func);
- BranchInst::Create(make, main, IsInvalid(statePtr, block), block);
- block = make;
// The address of this node and the value returned by GetMethodPtr are baked into the IR as
// 64-bit constants and cast to pointers to perform the call.
- const auto ptrType = PointerType::getUnqual(StructType::get(context));
- const auto self = CastInst::Create(Instruction::IntToPtr, ConstantInt::get(Type::getInt64Ty(context), uintptr_t(this)), ptrType, "self", block);
- const auto makeFunc = ConstantInt::get(Type::getInt64Ty(context), GetMethodPtr(&TSqueezeSetWideWrapper<TSetAccumulator>::MakeState));
- const auto makeType = FunctionType::get(Type::getVoidTy(context), {self->getType(), ctx.Ctx->getType(), statePtr->getType()}, false);
- const auto makeFuncPtr = CastInst::Create(Instruction::IntToPtr, makeFunc, PointerType::getUnqual(makeType), "function", block);
- CallInst::Create(makeType, makeFuncPtr, {self, ctx.Ctx, statePtr}, "", block);
- BranchInst::Create(main, block);
- block = main;
- const auto more = BasicBlock::Create(context, "more", ctx.Func);
- const auto done = BasicBlock::Create(context, "done", ctx.Func);
- const auto plus = BasicBlock::Create(context, "plus", ctx.Func);
- const auto over = BasicBlock::Create(context, "over", ctx.Func);
- const auto result = PHINode::Create(valueType, 3U, "result", over);
// The low 64 bits of the loaded state value are reinterpreted as the TState pointer.
- const auto state = new LoadInst(valueType, statePtr, "state", block);
- const auto half = CastInst::Create(Instruction::Trunc, state, Type::getInt64Ty(context), "half", block);
- const auto stateArg = CastInst::Create(Instruction::IntToPtr, half, statePtrType, "state_arg", block);
// Incoming edge from `main`: the set was already emitted, answer Finish.
- result->addIncoming(GetFinish(context), block);
- BranchInst::Create(over, more, IsFinish(state, block), block);
- block = more;
// getres.first is the fetch status; getres.second holds per-column value getters.
- const auto getres = GetNodeValues(Flow, ctx, block);
// Incoming edge from `more`: taken only by the Yield case of the switch below.
- result->addIncoming(GetYield(context), block);
- const auto action = SwitchInst::Create(getres.first, plus, 2U, block);
- action->addCase(ConstantInt::get(Type::getInt32Ty(context), i32(EFetchResult::Finish)), done);
- action->addCase(ConstantInt::get(Type::getInt32Ty(context), i32(EFetchResult::Yield)), over);
- block = plus;
// Without a passthrough key the Key node has to be evaluated, so every column that has
// dependents is first written into its Items node.
- if (!PasstroughKey) {
- for (auto i = 0U; i < Items.size(); ++i)
- if (Items[i]->GetDependencesCount() > 0U)
- EnsureDynamicCast<ICodegeneratorExternalNode*>(Items[i])->CreateSetValue(ctx, block, getres.second[i](ctx, block));
- }
- const auto key = PasstroughKey ? getres.second[*PasstroughKey](ctx, block) : GetNodeValue(Key, ctx, block);
- const auto insert = ConstantInt::get(Type::getInt64Ty(context), GetMethodPtr(&TState::Insert));
// NOTE(review): WrapArgumentForWindows presumably adapts how the 128-bit argument is passed on
// Windows targets - confirm against its definition.
- const auto keyArg = WrapArgumentForWindows(key, ctx, block);
- const auto insType = FunctionType::get(Type::getVoidTy(context), {stateArg->getType(), keyArg->getType()}, false);
- const auto insPtr = CastInst::Create(Instruction::IntToPtr, insert, PointerType::getUnqual(insType), "insert", block);
- CallInst::Create(insType, insPtr, {stateArg, keyArg}, "", block);
- BranchInst::Create(more, block);
- block = done;
- const auto build = ConstantInt::get(Type::getInt64Ty(context), GetMethodPtr(&TState::Build));
// Non-Windows targets: Build is called as a function returning the value directly.
// Windows target: Build is called with an extra pointer argument and the value is loaded
// from that stack slot afterwards.
- if (NYql::NCodegen::ETarget::Windows != ctx.Codegen.GetEffectiveTarget()) {
- const auto funType = FunctionType::get(valueType, {stateArg->getType()}, false);
- const auto funcPtr = CastInst::Create(Instruction::IntToPtr, build, PointerType::getUnqual(funType), "build", block);
- const auto dict = CallInst::Create(funType, funcPtr, {stateArg}, "dict", block);
- UnRefBoxed(state, ctx, block);
- result->addIncoming(dict, block);
- } else {
- const auto ptr = new AllocaInst(valueType, 0U, "ptr", block);
- const auto funType = FunctionType::get(Type::getVoidTy(context), {stateArg->getType(), ptr->getType()}, false);
- const auto funcPtr = CastInst::Create(Instruction::IntToPtr, build, PointerType::getUnqual(funType), "build", block);
- CallInst::Create(funType, funcPtr, {stateArg, ptr}, "", block);
- const auto dict = new LoadInst(valueType, ptr, "dict", block);
- UnRefBoxed(state, ctx, block);
- result->addIncoming(dict, block);
- }
// Mark the state as finished so later calls take the Finish path in `main`.
- new StoreInst(GetFinish(context), statePtr, block);
- BranchInst::Create(over, block);
- block = over;
- return result;
- }
- #endif
- private:
// Replaces `state` with a new boxed TState wrapping a fresh accumulator. Also called from
// generated code (block `make`).
- void MakeState(TComputationContext& ctx, NUdf::TUnboxedValue& state) const {
- state = ctx.HolderFactory.Create<TState>(TSetAccumulator(KeyType, KeyTypes, IsTuple, Encoded,
- Compare.Get(), Equate.Get(), Hash.Get(), ctx, ItemsCountHint));
- }
// Registers the flow dependency; if FlowDependsOn returns a non-null flow, every Items node
// is owned and Key is depended upon through it.
- void RegisterDependencies() const final {
- if (const auto flow = this->FlowDependsOn(Flow)) {
- std::for_each(Items.cbegin(), Items.cend(), std::bind(&TSqueezeSetWideWrapper::Own, flow, std::placeholders::_1));
- this->DependsOn(flow, Key);
- }
- }
- TType* const KeyType;
- IComputationWideFlowNode* const Flow;
- const TComputationExternalNodePtrVector Items;
- IComputationNode* const Key;
- const ui64 ItemsCountHint;
// The next four members are handed to GetDictionaryKeyTypes() in the constructor.
- TKeyTypes KeyTypes;
- bool IsTuple;
- bool Encoded;
- bool UseIHash;
// Column index the key can be read from directly, if any (see DoGenerateGetValue).
- const std::optional<size_t> PasstroughKey;
// Offset of this node's slots in ctx.WideFields.
- const ui32 WideFieldsIndex;
// Assigned in the constructor; null unless UseIHash is set.
- NUdf::ICompare::TPtr Compare;
- NUdf::IEquate::TPtr Equate;
- NUdf::IHash::TPtr Hash;
- };
// Computation node that turns the items of `List` into a dictionary. For each input item the
// external node `Item` is assigned that item, `Key` and `Payload` are evaluated, and the pair is
// added to a TMapAccumulator; the accumulator's Build() result is the value of the node.
// With IsStream == true the work is deferred: DoCalculate returns a TStreamValue which performs
// the same accumulation inside Fetch() and emits the built dictionary as its only element.
- template <typename TMapAccumulator, bool IsStream>
- class TMapWrapper : public TMutableComputationNode<TMapWrapper<TMapAccumulator, IsStream>> {
- typedef TMutableComputationNode<TMapWrapper<TMapAccumulator, IsStream>> TBaseComputation;
- public:
// Stream value that drains `Input`, collects a key/payload pair for every item and produces
// exactly one element (the built dictionary), after which it reports Finish.
- class TStreamValue : public TComputationValue<TStreamValue> {
- public:
// input    - upstream stream value, moved into this object.
// item     - external node that receives each fetched item.
// key      - node evaluated after `item` has been set; gives the dictionary key.
// payload  - node evaluated after `item` has been set; gives the dictionary payload.
// mapAccum - accumulator, moved into this object.
// ctx      - computation context, kept by reference.
- TStreamValue(TMemoryUsageInfo* memInfo, NUdf::TUnboxedValue&& input, IComputationExternalNode* const item,
- IComputationNode* const key, IComputationNode* const payload, TMapAccumulator&& mapAccum, TComputationContext& ctx)
- : TComputationValue<TStreamValue>(memInfo)
- , Input(std::move(input))
- , Item(item)
- , Key(key)
- , Payload(payload)
- , MapAccum(std::move(mapAccum))
- , Ctx(ctx) {}
- private:
// Returns Yield whenever the input yields (pairs collected so far stay in MapAccum, so a later
// call resumes), Ok with the built dictionary once the input is exhausted, and Finish on every
// call after the dictionary has been handed out.
- NUdf::EFetchStatus Fetch(NUdf::TUnboxedValue& result) override {
- if (Finished) {
- return NUdf::EFetchStatus::Finish;
- }
- for (;;) {
- NUdf::TUnboxedValue item;
- switch (auto status = Input.Fetch(item)) {
- case NUdf::EFetchStatus::Ok: {
// Publish the item first; Key and Payload are then evaluated as call arguments, so their
// relative evaluation order is unspecified by the language.
- Item->SetValue(Ctx, std::move(item));
- MapAccum.Add(Key->GetValue(Ctx), Payload->GetValue(Ctx));
- break; // and continue
- }
- case NUdf::EFetchStatus::Finish: {
// Input exhausted: emit the dictionary as the single element of this stream.
- result = MapAccum.Build();
- Finished = true;
- return NUdf::EFetchStatus::Ok;
- }
- case NUdf::EFetchStatus::Yield: {
- return NUdf::EFetchStatus::Yield;
- }
- }
- }
- }
- NUdf::TUnboxedValue Input;
- IComputationExternalNode* const Item;
- IComputationNode* const Key;
- IComputationNode* const Payload;
- TMapAccumulator MapAccum;
// NOTE(review): held by reference, so this value presumably must not outlive the context - confirm.
- TComputationContext& Ctx;
// Set once the built dictionary has been returned from Fetch().
- bool Finished = false;
- };
// keyType        - type of the dictionary keys; drives the choice of comparison/hash helpers.
// payloadType    - type of the payloads; forwarded to the accumulator.
// list           - node producing the input (a list, or a stream when IsStream is true).
// item           - external node that is assigned each input item.
// key / payload  - nodes computing the key and payload for the current item.
// itemsCountHint - size hint forwarded to the accumulator.
- TMapWrapper(TComputationMutables& mutables, TType* keyType, TType* payloadType, IComputationNode* list, IComputationExternalNode* item,
- IComputationNode* key, IComputationNode* payload, ui64 itemsCountHint)
- : TBaseComputation(mutables, EValueRepresentation::Boxed)
- , KeyType(keyType)
- , PayloadType(payloadType)
- , List(list)
- , Item(item)
- , Key(key)
- , Payload(payload)
- , ItemsCountHint(itemsCountHint)
- {
// KeyTypes/IsTuple/Encoded/UseIHash are passed as arguments here and read afterwards, so this
// call is expected to fill them in from KeyType.
- GetDictionaryKeyTypes(KeyType, KeyTypes, IsTuple, Encoded, UseIHash);
// Helper objects are created only when UseIHash is set: Equate always, plus Compare for sorted
// accumulators or Hash for unsorted ones. Otherwise the pointers stay null.
- Compare = UseIHash && TMapAccumulator::IsSorted ? MakeCompareImpl(KeyType) : nullptr;
- Equate = UseIHash ? MakeEquateImpl(KeyType) : nullptr;
- Hash = UseIHash && !TMapAccumulator::IsSorted ? MakeHashImpl(KeyType) : nullptr;
- }
// Produces the node value: the lazy TStreamValue in stream mode, otherwise the dictionary built
// by traversing the list right here.
- NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
- if constexpr (IsStream) {
- return ctx.HolderFactory.Create<TStreamValue>(List->GetValue(ctx), Item, Key, Payload,
- TMapAccumulator(KeyType, PayloadType, KeyTypes, IsTuple, Encoded, Compare.Get(), Equate.Get(), Hash.Get(),
- ctx, ItemsCountHint), ctx);
- }
// Only reached when IsStream is false: the branch above always returns.
- const auto& list = List->GetValue(ctx);
- auto itemsCountHint = ItemsCountHint;
// If the list reports a fast length, that length replaces the configured hint; a zero length
// returns the holder factory's empty container without creating an accumulator.
- if (list.HasFastListLength()) {
- if (const auto size = list.GetListLength())
- itemsCountHint = size;
- else
- return ctx.HolderFactory.GetEmptyContainerLazy();
- }
- TMapAccumulator accumulator(KeyType, PayloadType, KeyTypes, IsTuple, Encoded,
- Compare.Get(), Equate.Get(), Hash.Get(), ctx, itemsCountHint);
// The callback publishes each item through Item and adds the resulting key/payload pair;
// DoForEachItem is presumably invoking it once per list element.
- TThresher<false>::DoForEachItem(list,
- [this, &accumulator, &ctx] (NUdf::TUnboxedValue&& item) {
- Item->SetValue(ctx, std::move(item));
- accumulator.Add(Key->GetValue(ctx), Payload->GetValue(ctx));
- }
- );
- return accumulator.Build().Release();
- }
- private:
// Declares graph relations: this node depends on List, Key and Payload and owns the external
// Item node.
- void RegisterDependencies() const final {
- this->DependsOn(List);
- this->Own(Item);
- this->DependsOn(Key);
- this->DependsOn(Payload);
- }
- TType* const KeyType;
// NOTE(review): unlike KeyType this pointer is not declared const, although nothing in this
// class reassigns it after construction.
- TType* PayloadType;
- IComputationNode* const List;
- IComputationExternalNode* const Item;
- IComputationNode* const Key;
- IComputationNode* const Payload;
- const ui64 ItemsCountHint;
// The next four members are handed to GetDictionaryKeyTypes() in the constructor.
- TKeyTypes KeyTypes;
- bool IsTuple;
- bool Encoded;
- bool UseIHash;
// Assigned in the constructor; null unless UseIHash is set.
- NUdf::ICompare::TPtr Compare;
- NUdf::IEquate::TPtr Equate;
- NUdf::IHash::TPtr Hash;
- };
// Stateful flow node that consumes the whole input `Flow` and emits a single value - the
// dictionary of key/payload pairs computed for every item - followed by Finish.
// The per-node state goes through three phases (see DoCalculate):
//   Invalid      -> a boxed TState holding a fresh accumulator is created;
//   boxed TState -> items are being accumulated (a Yield from the input is passed through);
//   Finish       -> the dictionary has already been emitted.
// DoGenerateGetValue emits LLVM IR that follows the same protocol.
- template <typename TMapAccumulator>
- class TSqueezeMapFlowWrapper : public TStatefulFlowCodegeneratorNode<TSqueezeMapFlowWrapper<TMapAccumulator>> {
- using TBase = TStatefulFlowCodegeneratorNode<TSqueezeMapFlowWrapper<TMapAccumulator>>;
- public:
// Boxed state object that owns the accumulator between calls.
- class TState : public TComputationValue<TState> {
- using TBase = TComputationValue<TState>;
- public:
- TState(TMemoryUsageInfo* memInfo, TMapAccumulator&& mapAccum)
- : TBase(memInfo), MapAccum(std::move(mapAccum)) {}
// Build() and Insert() are also called from generated code through raw method pointers
// (see DoGenerateGetValue).
- NUdf::TUnboxedValuePod Build() {
- return MapAccum.Build().Release();
- }
- void Insert(NUdf::TUnboxedValuePod key, NUdf::TUnboxedValuePod value) {
- MapAccum.Add(key, value);
- }
- private:
- TMapAccumulator MapAccum;
- };
// keyType        - type of the dictionary keys; drives the choice of comparison/hash helpers.
// payloadType    - type of the payloads; forwarded to the accumulator.
// flow           - input flow node.
// item           - external node that is assigned each input item.
// key / payload  - nodes computing the key and payload for the current item.
// itemsCountHint - size hint forwarded to the accumulator.
- TSqueezeMapFlowWrapper(TComputationMutables& mutables, TType* keyType, TType* payloadType,
- IComputationNode* flow, IComputationExternalNode* item, IComputationNode* key, IComputationNode* payload,
- ui64 itemsCountHint)
- : TBase(mutables, flow, EValueRepresentation::Boxed, EValueRepresentation::Any)
- , KeyType(keyType)
- , PayloadType(payloadType)
- , Flow(flow)
- , Item(item)
- , Key(key)
- , Payload(payload)
- , ItemsCountHint(itemsCountHint)
- {
// KeyTypes/IsTuple/Encoded/UseIHash are passed as arguments here and read afterwards, so this
// call is expected to fill them in from KeyType.
- GetDictionaryKeyTypes(KeyType, KeyTypes, IsTuple, Encoded, UseIHash);
// Helpers exist only when UseIHash is set: Equate always, Compare for sorted accumulators,
// Hash for unsorted ones.
- Compare = UseIHash && TMapAccumulator::IsSorted ? MakeCompareImpl(KeyType) : nullptr;
- Equate = UseIHash ? MakeEquateImpl(KeyType) : nullptr;
- Hash = UseIHash && !TMapAccumulator::IsSorted ? MakeHashImpl(KeyType) : nullptr;
- }
// Interpreted implementation. Returns Yield while the input yields, the built dictionary when
// the input finishes, and Finish once the dictionary has been emitted.
- NUdf::TUnboxedValuePod DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx) const {
// Already emitted: return the Finish marker and leave `state` as it is.
- if (state.IsFinish()) {
- return state;
- } else if (state.IsInvalid()) {
- MakeState(ctx, state);
- }
// The loop is left only through the returns inside it; statePtr is expected to be non-null,
// the fall-through is marked unreachable.
- while (const auto statePtr = static_cast<TState*>(state.AsBoxed().Get())) {
- if (auto item = Flow->GetValue(ctx); item.IsYield()) {
- return item.Release();
- } else if (item.IsFinish()) {
// Build the result first, then overwrite the state with the Finish marker carried by `item`.
- const auto dict = statePtr->Build();
- state = std::move(item);
- return dict;
- } else {
// Key and Payload are evaluated as call arguments, so their relative evaluation order is
// unspecified by the language.
- Item->SetValue(ctx, std::move(item));
- statePtr->Insert(Key->GetValue(ctx).Release(), Payload->GetValue(ctx).Release());
- }
- }
- Y_UNREACHABLE();
- }
- #ifndef MKQL_DISABLE_CODEGEN
// Emits the IR equivalent of DoCalculate. Basic blocks:
//   make - state is Invalid: call MakeState(this, ctx, statePtr);
//   main - load the state; if it is Finish jump to `over` with Finish;
//   more - fetch the next item: Finish -> done, Yield -> over (result Yield), otherwise -> plus;
//   plus - publish the item, evaluate Key then Payload, call TState::Insert, loop to `more`;
//   done - call TState::Build, drop the reference to the state, store Finish into the state;
//   over - PHI `result` merges the three outcomes (Finish / Yield / built dictionary).
- Value* DoGenerateGetValue(const TCodegenContext& ctx, Value* statePtr, BasicBlock*& block) const {
- auto& context = ctx.Codegen.GetContext();
- const auto codegenItemArg = dynamic_cast<ICodegeneratorExternalNode*>(Item);
- MKQL_ENSURE(codegenItemArg, "Item must be codegenerator node.");
// Values are handled as 128-bit integers in the generated code.
- const auto valueType = Type::getInt128Ty(context);
// State layout: the fields described for TComputationValue<TState> plus one pointer slot for
// the accumulator (appended by TLLVMFieldsStructureStateWithAccum).
- TLLVMFieldsStructureStateWithAccum<TLLVMFieldsStructure<TComputationValue<TState>>> fieldsStruct(context);
- const auto stateType = StructType::get(context, fieldsStruct.GetFieldsArray());
- const auto statePtrType = PointerType::getUnqual(stateType);
- const auto make = BasicBlock::Create(context, "make", ctx.Func);
- const auto main = BasicBlock::Create(context, "main", ctx.Func);
- BranchInst::Create(make, main, IsInvalid(statePtr, block), block);
- block = make;
// The address of this node and the value returned by GetMethodPtr are baked into the IR as
// 64-bit constants and cast to pointers to perform the call.
- const auto ptrType = PointerType::getUnqual(StructType::get(context));
- const auto self = CastInst::Create(Instruction::IntToPtr, ConstantInt::get(Type::getInt64Ty(context), uintptr_t(this)), ptrType, "self", block);
- const auto makeFunc = ConstantInt::get(Type::getInt64Ty(context), GetMethodPtr(&TSqueezeMapFlowWrapper<TMapAccumulator>::MakeState));
- const auto makeType = FunctionType::get(Type::getVoidTy(context), {self->getType(), ctx.Ctx->getType(), statePtr->getType()}, false);
- const auto makeFuncPtr = CastInst::Create(Instruction::IntToPtr, makeFunc, PointerType::getUnqual(makeType), "function", block);
- CallInst::Create(makeType, makeFuncPtr, {self, ctx.Ctx, statePtr}, "", block);
- BranchInst::Create(main, block);
- block = main;
- const auto more = BasicBlock::Create(context, "more", ctx.Func);
- const auto done = BasicBlock::Create(context, "done", ctx.Func);
- const auto plus = BasicBlock::Create(context, "plus", ctx.Func);
- const auto over = BasicBlock::Create(context, "over", ctx.Func);
- const auto result = PHINode::Create(valueType, 3U, "result", over);
// The low 64 bits of the loaded state value are reinterpreted as the TState pointer.
- const auto state = new LoadInst(valueType, statePtr, "state", block);
- const auto half = CastInst::Create(Instruction::Trunc, state, Type::getInt64Ty(context), "half", block);
- const auto stateArg = CastInst::Create(Instruction::IntToPtr, half, statePtrType, "state_arg", block);
// Incoming edge from `main`: the dictionary was already emitted, answer Finish.
- result->addIncoming(GetFinish(context), block);
- BranchInst::Create(over, more, IsFinish(state, block), block);
- block = more;
- const auto item = GetNodeValue(Flow, ctx, block);
// Incoming edge from `more`: taken only by the Yield case of the switch below.
- result->addIncoming(GetYield(context), block);
- const auto choise = SwitchInst::Create(item, plus, 2U, block);
- choise->addCase(GetFinish(context), done);
- choise->addCase(GetYield(context), over);
- block = plus;
- codegenItemArg->CreateSetValue(ctx, block, item);
- const auto key = GetNodeValue(Key, ctx, block);
- const auto payload = GetNodeValue(Payload, ctx, block);
- const auto insert = ConstantInt::get(Type::getInt64Ty(context), GetMethodPtr(&TState::Insert));
// NOTE(review): WrapArgumentForWindows presumably adapts how the 128-bit arguments are passed
// on Windows targets - confirm against its definition.
- const auto keyArg = WrapArgumentForWindows(key, ctx, block);
- const auto payloadArg = WrapArgumentForWindows(payload, ctx, block);
- const auto insType = FunctionType::get(Type::getVoidTy(context), {stateArg->getType(), keyArg->getType(), payloadArg->getType()}, false);
- const auto insPtr = CastInst::Create(Instruction::IntToPtr, insert, PointerType::getUnqual(insType), "insert", block);
- CallInst::Create(insType, insPtr, {stateArg, keyArg, payloadArg}, "", block);
- BranchInst::Create(more, block);
- block = done;
- const auto build = ConstantInt::get(Type::getInt64Ty(context), GetMethodPtr(&TState::Build));
// Non-Windows targets: Build is called as a function returning the value directly.
// Windows target: Build is called with an extra pointer argument and the value is loaded
// from that stack slot afterwards.
- if (NYql::NCodegen::ETarget::Windows != ctx.Codegen.GetEffectiveTarget()) {
- const auto funType = FunctionType::get(valueType, {stateArg->getType()}, false);
- const auto funcPtr = CastInst::Create(Instruction::IntToPtr, build, PointerType::getUnqual(funType), "build", block);
- const auto dict = CallInst::Create(funType, funcPtr, {stateArg}, "dict", block);
- UnRefBoxed(state, ctx, block);
- result->addIncoming(dict, block);
- } else {
- const auto ptr = new AllocaInst(valueType, 0U, "ptr", block);
- const auto funType = FunctionType::get(Type::getVoidTy(context), {stateArg->getType(), ptr->getType()}, false);
- const auto funcPtr = CastInst::Create(Instruction::IntToPtr, build, PointerType::getUnqual(funType), "build", block);
- CallInst::Create(funType, funcPtr, {stateArg, ptr}, "", block);
- const auto dict = new LoadInst(valueType, ptr, "dict", block);
- UnRefBoxed(state, ctx, block);
- result->addIncoming(dict, block);
- }
// `done` is reached only through the Finish case, so `item` is the Finish marker here; storing
// it makes later calls take the Finish path in `main`.
- new StoreInst(item, statePtr, block);
- BranchInst::Create(over, block);
- block = over;
- return result;
- }
- #endif
- private:
// Replaces `state` with a new boxed TState wrapping a fresh accumulator. Also called from
// generated code (block `make`).
- void MakeState(TComputationContext& ctx, NUdf::TUnboxedValue& state) const {
- state = ctx.HolderFactory.Create<TState>(TMapAccumulator(KeyType, PayloadType, KeyTypes, IsTuple, Encoded,
- Compare.Get(), Equate.Get(), Hash.Get(), ctx, ItemsCountHint));
- }
// Registers the flow dependency; if FlowDependsOn returns a non-null flow, Item is owned and
// Key and Payload are depended upon through it.
- void RegisterDependencies() const final {
- if (const auto flow = this->FlowDependsOn(Flow)) {
- this->Own(flow, Item);
- this->DependsOn(flow, Key);
- this->DependsOn(flow, Payload);
- }
- }
- TType* const KeyType;
// NOTE(review): unlike KeyType this pointer is not declared const, although nothing in this
// class reassigns it after construction.
- TType* PayloadType;
- IComputationNode* const Flow;
- IComputationExternalNode* const Item;
- IComputationNode* const Key;
- IComputationNode* const Payload;
- const ui64 ItemsCountHint;
// The next four members are handed to GetDictionaryKeyTypes() in the constructor.
- TKeyTypes KeyTypes;
- bool IsTuple;
- bool Encoded;
- bool UseIHash;
// Assigned in the constructor; null unless UseIHash is set.
- NUdf::ICompare::TPtr Compare;
- NUdf::IEquate::TPtr Equate;
- NUdf::IHash::TPtr Hash;
- };
- template <typename TMapAccumulator>
- class TSqueezeMapWideWrapper : public TStatefulFlowCodegeneratorNode<TSqueezeMapWideWrapper<TMapAccumulator>> {
- using TBase = TStatefulFlowCodegeneratorNode<TSqueezeMapWideWrapper<TMapAccumulator>>;
- public:
- class TState : public TComputationValue<TState> {
- using TBase = TComputationValue<TState>;
- public:
- TState(TMemoryUsageInfo* memInfo, TMapAccumulator&& mapAccum)
- : TBase(memInfo), MapAccum(std::move(mapAccum)) {}
- NUdf::TUnboxedValuePod Build() {
- return MapAccum.Build().Release();
- }
- void Insert(NUdf::TUnboxedValuePod key, NUdf::TUnboxedValuePod value) {
- MapAccum.Add(key, value);
- }
- private:
- TMapAccumulator MapAccum;
- };
- TSqueezeMapWideWrapper(TComputationMutables& mutables, TType* keyType, TType* payloadType,
- IComputationWideFlowNode* flow, TComputationExternalNodePtrVector&& items, IComputationNode* key, IComputationNode* payload,
- ui64 itemsCountHint)
- : TBase(mutables, flow, EValueRepresentation::Boxed, EValueRepresentation::Any)
- , KeyType(keyType)
- , PayloadType(payloadType)
- , Flow(flow)
- , Items(std::move(items))
- , Key(key)
- , Payload(payload)
- , ItemsCountHint(itemsCountHint)
- , PasstroughKey(GetPasstroughtMap(TComputationNodePtrVector{Key}, Items).front())
- , PasstroughPayload(GetPasstroughtMap(TComputationNodePtrVector{Payload}, Items).front())
- , WideFieldsIndex(mutables.IncrementWideFieldsIndex(Items.size()))
- {
- GetDictionaryKeyTypes(KeyType, KeyTypes, IsTuple, Encoded, UseIHash);
- Compare = UseIHash && TMapAccumulator::IsSorted ? MakeCompareImpl(KeyType) : nullptr;
- Equate = UseIHash ? MakeEquateImpl(KeyType) : nullptr;
- Hash = UseIHash && !TMapAccumulator::IsSorted ? MakeHashImpl(KeyType) : nullptr;
- }
- NUdf::TUnboxedValuePod DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx) const {
- if (state.IsFinish()) {
- return state;
- } else if (state.IsInvalid()) {
- MakeState(ctx, state);
- }
- auto** fields = ctx.WideFields.data() + WideFieldsIndex;
- while (const auto statePtr = static_cast<TState*>(state.AsBoxed().Get())) {
- for (auto i = 0U; i < Items.size(); ++i)
- if (Key == Items[i] || Payload == Items[i] || Items[i]->GetDependencesCount() > 0U)
- fields[i] = &Items[i]->RefValue(ctx);
- switch (const auto result = Flow->FetchValues(ctx, fields)) {
- case EFetchResult::One:
- statePtr->Insert(Key->GetValue(ctx).Release(), Payload->GetValue(ctx).Release());
- continue;
- case EFetchResult::Yield:
- return NUdf::TUnboxedValuePod::MakeYield();
- case EFetchResult::Finish: {
- const auto dict = statePtr->Build();
- state = NUdf::TUnboxedValuePod::MakeFinish();
- return dict;
- }
- }
- }
- Y_UNREACHABLE();
- }
- #ifndef MKQL_DISABLE_CODEGEN
- Value* DoGenerateGetValue(const TCodegenContext& ctx, Value* statePtr, BasicBlock*& block) const { // LLVM IR counterpart of DoCalculate: create the state when invalid, drain Flow into it, then build the dictionary; returns the PHI holding the result
- auto& context = ctx.Codegen.GetContext();
- const auto valueType = Type::getInt128Ty(context); // the state and the result are handled as i128 values in the emitted IR
- TLLVMFieldsStructureStateWithAccum<TLLVMFieldsStructure<TComputationValue<TState>>> fieldsStruct(context);
- const auto stateType = StructType::get(context, fieldsStruct.GetFieldsArray());
- const auto statePtrType = PointerType::getUnqual(stateType); // pointer type used for the TState object argument of Insert/Build
- const auto make = BasicBlock::Create(context, "make", ctx.Func);
- const auto main = BasicBlock::Create(context, "main", ctx.Func);
- BranchInst::Create(make, main, IsInvalid(statePtr, block), block); // invalid state slot -> "make", otherwise straight to "main"
- block = make;
- const auto ptrType = PointerType::getUnqual(StructType::get(context));
- const auto self = CastInst::Create(Instruction::IntToPtr, ConstantInt::get(Type::getInt64Ty(context), uintptr_t(this)), ptrType, "self", block); // address of this node embedded as a 64-bit constant
- const auto makeFunc = ConstantInt::get(Type::getInt64Ty(context), GetMethodPtr(&TSqueezeMapWideWrapper<TMapAccumulator>::MakeState)); // address of MakeState, called indirectly below
- const auto makeType = FunctionType::get(Type::getVoidTy(context), {self->getType(), ctx.Ctx->getType(), statePtr->getType()}, false);
- const auto makeFuncPtr = CastInst::Create(Instruction::IntToPtr, makeFunc, PointerType::getUnqual(makeType), "function", block);
- CallInst::Create(makeType, makeFuncPtr, {self, ctx.Ctx, statePtr}, "", block); // MakeState(this, ctx, statePtr)
- BranchInst::Create(main, block);
- block = main;
- const auto more = BasicBlock::Create(context, "more", ctx.Func);
- const auto done = BasicBlock::Create(context, "done", ctx.Func);
- const auto plus = BasicBlock::Create(context, "plus", ctx.Func);
- const auto over = BasicBlock::Create(context, "over", ctx.Func);
- const auto result = PHINode::Create(valueType, 3U, "result", over); // value leaving through "over": Finish, Yield or the built dictionary
- const auto state = new LoadInst(valueType, statePtr, "state", block);
- const auto half = CastInst::Create(Instruction::Trunc, state, Type::getInt64Ty(context), "half", block); // truncate the state value to 64 bits...
- const auto stateArg = CastInst::Create(Instruction::IntToPtr, half, statePtrType, "state_arg", block); // ...and treat them as the pointer to the TState object
- result->addIncoming(GetFinish(context), block);
- BranchInst::Create(over, more, IsFinish(state, block), block); // state already Finish -> leave with Finish, otherwise start fetching
- block = more;
- const auto getres = GetNodeValues(Flow, ctx, block); // fetch from Flow: first = fetch status, second = per-column value getters
- result->addIncoming(GetYield(context), block); // incoming value for the Yield edge of the switch below
- const auto action = SwitchInst::Create(getres.first, plus, 2U, block); // default target "plus" handles a fetched row
- action->addCase(ConstantInt::get(Type::getInt32Ty(context), i32(EFetchResult::Finish)), done);
- action->addCase(ConstantInt::get(Type::getInt32Ty(context), i32(EFetchResult::Yield)), over);
- block = plus;
- if (!(PasstroughKey && PasstroughPayload)) { // skipped only when both key and payload are taken directly from fetched columns
- for (auto i = 0U; i < Items.size(); ++i)
- if (Items[i]->GetDependencesCount() > 0U)
- EnsureDynamicCast<ICodegeneratorExternalNode*>(Items[i])->CreateSetValue(ctx, block, getres.second[i](ctx, block)); // store the fetched column into the item node that has dependents
- }
- const auto key = PasstroughKey ? getres.second[*PasstroughKey](ctx, block) : GetNodeValue(Key, ctx, block); // key: fetched column when passed through, otherwise computed by the Key node
- const auto payload = PasstroughPayload ? getres.second[*PasstroughPayload](ctx, block) : GetNodeValue(Payload, ctx, block); // payload: same choice as for the key
- const auto insert = ConstantInt::get(Type::getInt64Ty(context), GetMethodPtr(&TState::Insert));
- const auto keyArg = WrapArgumentForWindows(key, ctx, block);
- const auto payloadArg = WrapArgumentForWindows(payload, ctx, block);
- const auto insType = FunctionType::get(Type::getVoidTy(context), {stateArg->getType(), keyArg->getType(), payloadArg->getType()}, false);
- const auto insPtr = CastInst::Create(Instruction::IntToPtr, insert, PointerType::getUnqual(insType), "insert", block);
- CallInst::Create(insType, insPtr, {stateArg, keyArg, payloadArg}, "", block); // TState::Insert(state, key, payload)
- BranchInst::Create(more, block); // loop back and fetch the next row
- block = done;
- const auto build = ConstantInt::get(Type::getInt64Ty(context), GetMethodPtr(&TState::Build));
- if (NYql::NCodegen::ETarget::Windows != ctx.Codegen.GetEffectiveTarget()) { // non-Windows target: Build returns the value directly
- const auto funType = FunctionType::get(valueType, {stateArg->getType()}, false);
- const auto funcPtr = CastInst::Create(Instruction::IntToPtr, build, PointerType::getUnqual(funType), "build", block);
- const auto dict = CallInst::Create(funType, funcPtr, {stateArg}, "dict", block);
- UnRefBoxed(state, ctx, block); // NOTE(review): presumably drops the reference held on the boxed state - confirm
- result->addIncoming(dict, block);
- } else { // Windows target: Build writes its result through an extra pointer argument
- const auto ptr = new AllocaInst(valueType, 0U, "ptr", block);
- const auto funType = FunctionType::get(Type::getVoidTy(context), {stateArg->getType(), ptr->getType()}, false);
- const auto funcPtr = CastInst::Create(Instruction::IntToPtr, build, PointerType::getUnqual(funType), "build", block);
- CallInst::Create(funType, funcPtr, {stateArg, ptr}, "", block);
- const auto dict = new LoadInst(valueType, ptr, "dict", block);
- UnRefBoxed(state, ctx, block); // NOTE(review): presumably drops the reference held on the boxed state - confirm
- result->addIncoming(dict, block);
- }
- new StoreInst(GetFinish(context), statePtr, block); // overwrite the state slot with Finish so later calls take the early exit in "main"
- BranchInst::Create(over, block);
- block = over;
- return result;
- }
- #endif
- private:
- // Stores in `state` a new TState created by the holder factory around a TMapAccumulator
- // configured from this node's key/payload description, comparators and items count hint.
- void MakeState(TComputationContext& ctx, NUdf::TUnboxedValue& state) const {
-     state = ctx.HolderFactory.Create<TState>(
-         TMapAccumulator(
-             KeyType, PayloadType, KeyTypes, IsTuple, Encoded,
-             Compare.Get(), Equate.Get(), Hash.Get(),
-             ctx, ItemsCountHint));
- }
- // When FlowDependsOn(Flow) yields a flow node, passes every wide item to Own with that flow
- // and registers the Key and Payload nodes as its dependencies; does nothing otherwise.
- void RegisterDependencies() const final {
-     const auto flow = this->FlowDependsOn(Flow);
-     if (!flow)
-         return;
-     for (const auto item : Items)
-         TSqueezeMapWideWrapper::Own(flow, item);
-     this->DependsOn(flow, Key);
-     this->DependsOn(flow, Payload);
- }
- TType* const KeyType;
- TType* PayloadType;
- IComputationWideFlowNode* const Flow;
- const TComputationExternalNodePtrVector Items;
- IComputationNode* const Key;
- IComputationNode* const Payload;
- const ui64 ItemsCountHint;
- TKeyTypes KeyTypes;
- bool IsTuple;
- bool Encoded;
- bool UseIHash;
- const std::optional<size_t> PasstroughKey;
- const std::optional<size_t> PasstroughPayload;
- mutable std::vector<NUdf::TUnboxedValue*> Fields;
- const ui32 WideFieldsIndex;
- NUdf::ICompare::TPtr Compare;
- NUdf::IEquate::TPtr Equate;
- NUdf::IHash::TPtr Hash;
- };
- // Creates the node that gathers keys into a set using TAccumulator.
- // The first input may be a wide flow, a list, a flow or a stream; any other kind throws.
- // Trailing inputs are addressed from the end: the key is fifth from last, the items count hint is last.
- template <typename TAccumulator>
- IComputationNode* WrapToSet(TCallable& callable, const TNodeLocator& nodeLocator, TComputationMutables& mutables) {
-     const auto keyIndex = callable.GetInputsCount() - 5U;
-     const auto keyType = callable.GetInput(keyIndex).GetStaticType();
-     const auto itemsCountHint = AS_VALUE(TDataLiteral, callable.GetInput(callable.GetInputsCount() - 1U))->AsValue().Get<ui64>();
-     const auto flow = LocateNode(nodeLocator, callable, 0U);
-     const auto keySelector = LocateNode(nodeLocator, callable, keyIndex);
-     const auto wide = dynamic_cast<IComputationWideFlowNode*>(flow);
-     if (wide) {
-         // Inputs 1..width are the item arguments of the wide flow.
-         const auto width = callable.GetInputsCount() - 6U;
-         TComputationExternalNodePtrVector args(width, nullptr);
-         auto position = 0U;
-         for (auto& arg : args) {
-             arg = LocateExternalNode(nodeLocator, callable, ++position);
-         }
-         return new TSqueezeSetWideWrapper<TAccumulator>(mutables, keyType, wide, std::move(args), keySelector, itemsCountHint);
-     }
-     // Narrow input: a single item argument at index 1.
-     const auto itemArg = LocateExternalNode(nodeLocator, callable, 1U);
-     const auto type = callable.GetInput(0U).GetStaticType();
-     if (type->IsList()) {
-         return new TSetWrapper<TAccumulator, false>(mutables, keyType, flow, itemArg, keySelector, itemsCountHint);
-     } else if (type->IsFlow()) {
-         return new TSqueezeSetFlowWrapper<TAccumulator>(mutables, keyType, flow, itemArg, keySelector, itemsCountHint);
-     } else if (type->IsStream()) {
-         return new TSetWrapper<TAccumulator, true>(mutables, keyType, flow, itemArg, keySelector, itemsCountHint);
-     }
-     THROW yexception() << "Expected list, flow or stream.";
- }
- // Creates the node that gathers key/payload pairs into a dictionary using TAccumulator.
- // The first input may be a wide flow, a list, a flow or a stream; any other kind throws.
- // Trailing inputs are addressed from the end: key (fifth from last), payload (fourth from last),
- // items count hint (last).
- template <typename TAccumulator>
- IComputationNode* WrapToMap(TCallable& callable, const TNodeLocator& nodeLocator, TComputationMutables& mutables) {
-     const auto keyIndex = callable.GetInputsCount() - 5U;
-     const auto payloadIndex = callable.GetInputsCount() - 4U;
-     const auto keyType = callable.GetInput(keyIndex).GetStaticType();
-     const auto payloadType = callable.GetInput(payloadIndex).GetStaticType();
-     const auto itemsCountHint = AS_VALUE(TDataLiteral, callable.GetInput(callable.GetInputsCount() - 1U))->AsValue().Get<ui64>();
-     const auto flow = LocateNode(nodeLocator, callable, 0U);
-     const auto keySelector = LocateNode(nodeLocator, callable, keyIndex);
-     const auto payloadSelector = LocateNode(nodeLocator, callable, payloadIndex);
-     const auto wide = dynamic_cast<IComputationWideFlowNode*>(flow);
-     if (wide) {
-         // Inputs 1..width are the item arguments of the wide flow.
-         const auto width = callable.GetInputsCount() - 6U;
-         TComputationExternalNodePtrVector args(width, nullptr);
-         auto position = 0U;
-         for (auto& arg : args) {
-             arg = LocateExternalNode(nodeLocator, callable, ++position);
-         }
-         return new TSqueezeMapWideWrapper<TAccumulator>(mutables, keyType, payloadType, wide, std::move(args), keySelector, payloadSelector, itemsCountHint);
-     }
-     // Narrow input: a single item argument at index 1.
-     const auto itemArg = LocateExternalNode(nodeLocator, callable, 1U);
-     const auto type = callable.GetInput(0U).GetStaticType();
-     if (type->IsList()) {
-         return new TMapWrapper<TAccumulator, false>(mutables, keyType, payloadType, flow, itemArg, keySelector, payloadSelector, itemsCountHint);
-     } else if (type->IsFlow()) {
-         return new TSqueezeMapFlowWrapper<TAccumulator>(mutables, keyType, payloadType, flow, itemArg, keySelector, payloadSelector, itemsCountHint);
-     } else if (type->IsStream()) {
-         return new TMapWrapper<TAccumulator, true>(mutables, keyType, payloadType, flow, itemArg, keySelector, payloadSelector, itemsCountHint);
-     }
-     THROW yexception() << "Expected list, flow or stream.";
- }
- // Builds a sorted set/dictionary node for the callable.
- // IsList selects the accepted input kind: a list when true, a flow or stream otherwise.
- // Trailing inputs are read from the end: key (-5), payload (-4), "multi" flag (-3), items count hint (-1).
- // A non-multi dictionary with a void payload becomes a sorted set; otherwise a sorted map is
- // built, as a multi-map when the flag is set.
- template <bool IsList>
- IComputationNode* WrapToSortedDictInternal(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
-     MKQL_ENSURE(callable.GetInputsCount() >= 6U, "Expected six or more args.");
-     const auto type = callable.GetInput(0U).GetStaticType();
-     if constexpr (IsList) {
-         MKQL_ENSURE(type->IsList(), "Expected list.");
-     } else {
-         MKQL_ENSURE(type->IsFlow() || type->IsStream(), "Expected flow or stream.");
-     }
-     const auto keyIndex = callable.GetInputsCount() - 5U;
-     const auto payloadIndex = callable.GetInputsCount() - 4U;
-     const auto keyType = callable.GetInput(keyIndex).GetStaticType();
-     const auto payloadType = callable.GetInput(payloadIndex).GetStaticType();
-     const bool isMulti = AS_VALUE(TDataLiteral, callable.GetInput(callable.GetInputsCount() - 3U))->AsValue().Get<bool>();
-     const auto itemsCountHint = AS_VALUE(TDataLiteral, callable.GetInput(callable.GetInputsCount() - 1U))->AsValue().Get<ui64>();
-     const auto flow = LocateNode(ctx.NodeLocator, callable, 0U);
-     const auto keySelector = LocateNode(ctx.NodeLocator, callable, keyIndex);
-     const auto payloadSelector = LocateNode(ctx.NodeLocator, callable, payloadIndex);
-     // A single-valued dictionary whose payload is void degenerates into a set.
-     const bool asSet = !isMulti && payloadType->IsVoid();
-     if (const auto wide = dynamic_cast<IComputationWideFlowNode*>(flow)) {
-         // Inputs 1..width are the item arguments of the wide flow.
-         const auto width = callable.GetInputsCount() - 6U;
-         TComputationExternalNodePtrVector args(width, nullptr);
-         auto position = 0U;
-         for (auto& arg : args) {
-             arg = LocateExternalNode(ctx.NodeLocator, callable, ++position);
-         }
-         if (asSet) {
-             return new TSqueezeSetWideWrapper<TSortedSetAccumulator>(ctx.Mutables, keyType, wide, std::move(args), keySelector, itemsCountHint);
-         }
-         if (isMulti) {
-             return new TSqueezeMapWideWrapper<TSortedMapAccumulator<true>>(ctx.Mutables, keyType, payloadType, wide, std::move(args), keySelector, payloadSelector, itemsCountHint);
-         }
-         return new TSqueezeMapWideWrapper<TSortedMapAccumulator<false>>(ctx.Mutables, keyType, payloadType, wide, std::move(args), keySelector, payloadSelector, itemsCountHint);
-     }
-     // Narrow input: a single item argument at index 1; dispatch on the container kind first.
-     const auto itemArg = LocateExternalNode(ctx.NodeLocator, callable, 1U);
-     if (type->IsList()) {
-         if (asSet) {
-             return new TSetWrapper<TSortedSetAccumulator, false>(ctx.Mutables, keyType, flow, itemArg, keySelector, itemsCountHint);
-         }
-         if (isMulti) {
-             return new TMapWrapper<TSortedMapAccumulator<true>, false>(ctx.Mutables, keyType, payloadType, flow, itemArg, keySelector, payloadSelector, itemsCountHint);
-         }
-         return new TMapWrapper<TSortedMapAccumulator<false>, false>(ctx.Mutables, keyType, payloadType, flow, itemArg, keySelector, payloadSelector, itemsCountHint);
-     }
-     if (type->IsFlow()) {
-         if (asSet) {
-             return new TSqueezeSetFlowWrapper<TSortedSetAccumulator>(ctx.Mutables, keyType, flow, itemArg, keySelector, itemsCountHint);
-         }
-         if (isMulti) {
-             return new TSqueezeMapFlowWrapper<TSortedMapAccumulator<true>>(ctx.Mutables, keyType, payloadType, flow, itemArg, keySelector, payloadSelector, itemsCountHint);
-         }
-         return new TSqueezeMapFlowWrapper<TSortedMapAccumulator<false>>(ctx.Mutables, keyType, payloadType, flow, itemArg, keySelector, payloadSelector, itemsCountHint);
-     }
-     if (type->IsStream()) {
-         if (asSet) {
-             return new TSetWrapper<TSortedSetAccumulator, true>(ctx.Mutables, keyType, flow, itemArg, keySelector, itemsCountHint);
-         }
-         if (isMulti) {
-             return new TMapWrapper<TSortedMapAccumulator<true>, true>(ctx.Mutables, keyType, payloadType, flow, itemArg, keySelector, payloadSelector, itemsCountHint);
-         }
-         return new TMapWrapper<TSortedMapAccumulator<false>, true>(ctx.Mutables, keyType, payloadType, flow, itemArg, keySelector, payloadSelector, itemsCountHint);
-     }
-     THROW yexception() << "Expected list, flow or stream.";
- }
- // Builds a hashed set/dictionary node for the callable by picking the accumulator type and
- // delegating to WrapToSet / WrapToMap.
- //
- // IsList selects the accepted input kind: a list when true, a flow or stream otherwise.
- // Requires at least six inputs (checked with MKQL_ENSURE). Trailing inputs are read from the
- // end: key (-5), payload (-4), "multi" flag (-3), "compact" flag (-2).
- //
- // Selection rules:
- //   * not multi and void payload -> a set, otherwise a map (a multi-map when "multi" is set);
- //   * "compact" chooses the compact accumulator family;
- //   * when the key (after unwrapping one Optional level) is a data type whose scheme type is
- //     listed in KNOWN_FIXED_VALUE_TYPES, the single-fixed accumulator for its layout type is
- //     used; any other key falls through to the generic accumulator.
- template <bool IsList>
- IComputationNode* WrapToHashedDictInternal(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
-     MKQL_ENSURE(callable.GetInputsCount() >= 6U, "Expected six or more args.");
-     const auto type = callable.GetInput(0U).GetStaticType();
-     if constexpr (IsList) {
-         MKQL_ENSURE(type->IsList(), "Expected list.");
-     } else {
-         MKQL_ENSURE(type->IsFlow() || type->IsStream(), "Expected flow or stream.");
-     }
-     const auto keyType = callable.GetInput(callable.GetInputsCount() - 5U).GetStaticType();
-     const auto payloadType = callable.GetInput(callable.GetInputsCount() - 4U).GetStaticType();
-     const bool multi = AS_VALUE(TDataLiteral, callable.GetInput(callable.GetInputsCount() - 3U))->AsValue().Get<bool>();
-     const bool isCompact = AS_VALUE(TDataLiteral, callable.GetInput(callable.GetInputsCount() - 2U))->AsValue().Get<bool>();
-     // The fixed-size specializations are chosen by the key's item type; optionality is passed
-     // to them as a template flag.
-     const bool isOptional = keyType->IsOptional();
-     const auto unwrappedKeyType = isOptional ? AS_TYPE(TOptionalType, keyType)->GetItemType() : keyType;
-     if (!multi && payloadType->IsVoid()) {
-         // Set flavours.
-         if (isCompact) {
-             if (unwrappedKeyType->IsData()) {
- // One switch case per known fixed-size data type: compact single-fixed set.
- #define USE_HASHED_SINGLE_FIXED_COMPACT_SET(xType, xLayoutType) \
- case NUdf::TDataType<xType>::Id: \
- if (isOptional) { \
- return WrapToSet< \
- THashedSingleFixedCompactSetAccumulator<xLayoutType, true>>(callable, ctx.NodeLocator, ctx.Mutables); \
- } else { \
- return WrapToSet< \
- THashedSingleFixedCompactSetAccumulator<xLayoutType, false>>(callable, ctx.NodeLocator, ctx.Mutables); \
- }
-                 switch (AS_TYPE(TDataType, unwrappedKeyType)->GetSchemeType()) {
-                     KNOWN_FIXED_VALUE_TYPES(USE_HASHED_SINGLE_FIXED_COMPACT_SET)
-                 }
- #undef USE_HASHED_SINGLE_FIXED_COMPACT_SET
-             }
-             // No fixed-size match: generic compact set.
-             return WrapToSet<THashedCompactSetAccumulator>(callable, ctx.NodeLocator, ctx.Mutables);
-         }
-         if (unwrappedKeyType->IsData()) {
- // One switch case per known fixed-size data type: single-fixed set.
- #define USE_HASHED_SINGLE_FIXED_SET(xType, xLayoutType) \
- case NUdf::TDataType<xType>::Id: \
- if (isOptional) { \
- return WrapToSet< \
- THashedSingleFixedSetAccumulator<xLayoutType, true>>(callable, ctx.NodeLocator, ctx.Mutables); \
- } else { \
- return WrapToSet< \
- THashedSingleFixedSetAccumulator<xLayoutType, false>>(callable, ctx.NodeLocator, ctx.Mutables); \
- }
-             switch (AS_TYPE(TDataType, unwrappedKeyType)->GetSchemeType()) {
-                 KNOWN_FIXED_VALUE_TYPES(USE_HASHED_SINGLE_FIXED_SET)
-             }
- #undef USE_HASHED_SINGLE_FIXED_SET
-         }
-         // No fixed-size match: generic hashed set.
-         return WrapToSet<THashedSetAccumulator>(callable, ctx.NodeLocator, ctx.Mutables);
-     }
-     // Map flavours.
-     if (isCompact) {
-         if (unwrappedKeyType->IsData()) {
- // One switch case per known fixed-size data type: compact single-fixed map or multi-map.
- #define USE_HASHED_SINGLE_FIXED_COMPACT_MAP(xType, xLayoutType) \
- case NUdf::TDataType<xType>::Id: \
- if (multi) { \
- if (isOptional) { \
- return WrapToMap< \
- THashedSingleFixedCompactMapAccumulator<xLayoutType, true, true>>(callable, ctx.NodeLocator, ctx.Mutables); \
- } else { \
- return WrapToMap< \
- THashedSingleFixedCompactMapAccumulator<xLayoutType, false, true>>(callable, ctx.NodeLocator, ctx.Mutables); \
- } \
- } else { \
- if (isOptional) { \
- return WrapToMap< \
- THashedSingleFixedCompactMapAccumulator<xLayoutType, true, false>>(callable, ctx.NodeLocator, ctx.Mutables); \
- } else { \
- return WrapToMap< \
- THashedSingleFixedCompactMapAccumulator<xLayoutType, false, false>>(callable, ctx.NodeLocator, ctx.Mutables); \
- } \
- }
-             switch (AS_TYPE(TDataType, unwrappedKeyType)->GetSchemeType()) {
-                 KNOWN_FIXED_VALUE_TYPES(USE_HASHED_SINGLE_FIXED_COMPACT_MAP)
-             }
- #undef USE_HASHED_SINGLE_FIXED_COMPACT_MAP
-         }
-         // No fixed-size match: generic compact map.
-         if (multi) {
-             return WrapToMap<THashedCompactMapAccumulator<true>>(callable, ctx.NodeLocator, ctx.Mutables);
-         } else {
-             return WrapToMap<THashedCompactMapAccumulator<false>>(callable, ctx.NodeLocator, ctx.Mutables);
-         }
-     }
-     if (unwrappedKeyType->IsData()) {
- // One switch case per known fixed-size data type: single-fixed map or multi-map.
- #define USE_HASHED_SINGLE_FIXED_MAP(xType, xLayoutType) \
- case NUdf::TDataType<xType>::Id: \
- if (multi) { \
- if (isOptional) { \
- return WrapToMap< \
- THashedSingleFixedMultiMapAccumulator<xLayoutType, true>>(callable, ctx.NodeLocator, ctx.Mutables); \
- } else { \
- return WrapToMap< \
- THashedSingleFixedMultiMapAccumulator<xLayoutType, false>>(callable, ctx.NodeLocator, ctx.Mutables); \
- } \
- } else { \
- if (isOptional) { \
- return WrapToMap< \
- THashedSingleFixedMapAccumulator<xLayoutType, true>>(callable, ctx.NodeLocator, ctx.Mutables); \
- } else { \
- return WrapToMap< \
- THashedSingleFixedMapAccumulator<xLayoutType, false>>(callable, ctx.NodeLocator, ctx.Mutables); \
- } \
- }
-         switch (AS_TYPE(TDataType, unwrappedKeyType)->GetSchemeType()) {
-             KNOWN_FIXED_VALUE_TYPES(USE_HASHED_SINGLE_FIXED_MAP)
-         }
- #undef USE_HASHED_SINGLE_FIXED_MAP
-     }
-     // No fixed-size match: generic hashed map.
-     if (multi) {
-         return WrapToMap<THashedMultiMapAccumulator>(callable, ctx.NodeLocator, ctx.Mutables);
-     } else {
-         return WrapToMap<THashedMapAccumulator>(callable, ctx.NodeLocator, ctx.Mutables);
-     }
- }
- }
- // Sorted dictionary over a list input (IsList = true).
- IComputationNode* WrapToSortedDict(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
-     constexpr bool isList = true;
-     return WrapToSortedDictInternal<isList>(callable, ctx);
- }
- // Hashed dictionary over a list input (IsList = true).
- IComputationNode* WrapToHashedDict(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
-     constexpr bool isList = true;
-     return WrapToHashedDictInternal<isList>(callable, ctx);
- }
- // Sorted dictionary over a flow or stream input (IsList = false).
- IComputationNode* WrapSqueezeToSortedDict(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
-     constexpr bool isList = false;
-     return WrapToSortedDictInternal<isList>(callable, ctx);
- }
- // Hashed dictionary over a flow or stream input (IsList = false).
- IComputationNode* WrapSqueezeToHashedDict(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
-     constexpr bool isList = false;
-     return WrapToHashedDictInternal<isList>(callable, ctx);
- }
- }
- }
|