12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
72778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967296829692970297129722973297429752976297729782979298029812982298329842985298629872988298929902991299229932994299529962997299829993000300130023003300430053006300730083009301030113012301330143015301630173018301930203021302230233024302530263027302830293030303130323033303430353036303730383039304030413042304330443045304630473048304930503051305230533054305530563057305830593060306130623063306430653066306730683069307030713072307330743075307630773078307930803081308230833084308530863087308830893090309130923093309430953096309730983099310031013102310331043105310631073108310931103111311231133114311531163117311831193120312131223123312431253126312731283129313031313132313331343135313631373138313931403141314231433144314531463147314831493150315131523153315431553156315731583159316031613162316331643165316631673168316931703171317231733174317531763177317831793180318131823183318431853186318731883189319031913192319331943195319631973198319932003201320232033204320532063207320832093210321132123213321432153216321732183219322032213222322332243225322632273228322932303231323232333234323532363237323832393240324132423243324432453246324732483249325032513252325332543255325632573258325932603261326232633264326532663267326832693270327132723273327432753276327
73278327932803281328232833284328532863287328832893290329132923293329432953296329732983299330033013302330333043305330633073308330933103311331233133314331533163317331833193320332133223323332433253326332733283329333033313332333333343335333633373338333933403341334233433344334533463347334833493350335133523353335433553356335733583359336033613362336333643365336633673368336933703371337233733374337533763377337833793380338133823383338433853386338733883389339033913392339333943395339633973398339934003401340234033404340534063407340834093410341134123413341434153416341734183419342034213422342334243425342634273428342934303431343234333434343534363437343834393440344134423443344434453446344734483449345034513452345334543455345634573458345934603461346234633464346534663467346834693470347134723473347434753476347734783479348034813482348334843485348634873488348934903491349234933494349534963497349834993500350135023503350435053506350735083509351035113512351335143515351635173518351935203521352235233524352535263527352835293530353135323533353435353536353735383539354035413542354335443545354635473548354935503551355235533554355535563557355835593560356135623563356435653566356735683569357035713572357335743575357635773578357935803581358235833584358535863587358835893590359135923593359435953596359735983599360036013602360336043605360636073608360936103611361236133614361536163617361836193620362136223623362436253626362736283629363036313632363336343635363636373638363936403641364236433644364536463647364836493650365136523653365436553656365736583659366036613662366336643665366636673668366936703671367236733674367536763677367836793680368136823683368436853686368736883689369036913692369336943695369636973698369937003701370237033704370537063707370837093710371137123713371437153716371737183719372037213722372337243725372637273728372937303731373237333734373537363737373837393740374137423743374437453746374737483749375037513752375337543755375637573758375937603761376237633764376537663767376837693770377137723773377437753776377
737783779378037813782378337843785378637873788378937903791379237933794379537963797379837993800380138023803380438053806380738083809381038113812381338143815381638173818381938203821382238233824382538263827382838293830383138323833383438353836383738383839384038413842384338443845384638473848384938503851385238533854385538563857385838593860386138623863386438653866386738683869387038713872387338743875387638773878387938803881388238833884388538863887388838893890389138923893 |
- #include "yql_opt_window.h"
- #include "yql_opt_utils.h"
- #include "yql_expr_type_annotation.h"
- #include <yql/essentials/core/yql_expr_optimize.h>
- #include <yql/essentials/utils/log/log.h>
- namespace NYql {
- using namespace NNodes;
- namespace {
- const TStringBuf SessionStartMemberName = "_yql_window_session_start";
- const TStringBuf SessionParamsMemberName = "_yql_window_session_params";
- enum class EFrameBoundsType : ui8 {
- EMPTY,
- LAGGING,
- CURRENT,
- LEADING,
- FULL,
- GENERIC,
- };
- EFrameBoundsType FrameBoundsType(const TWindowFrameSettings& settings) {
- auto first = settings.GetFirstOffset();
- auto last = settings.GetLastOffset();
- if (first.Defined() && last.Defined() && first > last) {
- return EFrameBoundsType::EMPTY;
- }
- if (!first.Defined()) {
- if (!last.Defined()) {
- return EFrameBoundsType::FULL;
- }
- if (*last < 0) {
- return EFrameBoundsType::LAGGING;
- }
- return *last > 0 ? EFrameBoundsType::LEADING : EFrameBoundsType::CURRENT;
- }
- return EFrameBoundsType::GENERIC;
- }
- TExprNode::TPtr ReplaceLastLambdaArgWithUnsignedLiteral(const TExprNode& lambda, ui32 literal, TExprContext& ctx) {
- YQL_ENSURE(lambda.IsLambda());
- TExprNodeList args = lambda.ChildPtr(0)->ChildrenList();
- YQL_ENSURE(!args.empty());
- auto literalNode = ctx.Builder(lambda.Pos())
- .Callable("Uint32")
- .Atom(0, literal)
- .Seal()
- .Build();
- auto newBody = ctx.ReplaceNodes(lambda.ChildPtr(1), {{args.back().Get(), literalNode}});
- args.pop_back();
- return ctx.NewLambda(lambda.Pos(), ctx.NewArguments(lambda.Pos(), std::move(args)), std::move(newBody));
- }
- TExprNode::TPtr ReplaceFirstLambdaArgWithCastStruct(const TExprNode& lambda, const TTypeAnnotationNode& targetType, TExprContext& ctx) {
- YQL_ENSURE(lambda.IsLambda());
- YQL_ENSURE(targetType.GetKind() == ETypeAnnotationKind::Struct);
- TExprNodeList args = lambda.ChildPtr(0)->ChildrenList();
- YQL_ENSURE(!args.empty());
- auto newArg = ctx.NewArgument(lambda.Pos(), "row");
- auto cast = ctx.Builder(lambda.Pos())
- .Callable("MatchType")
- .Add(0, newArg)
- .Atom(1, "Optional", TNodeFlags::Default)
- .Lambda(2)
- .Param("row")
- .Callable("Map")
- .Arg(0, "row")
- .Lambda(1)
- .Param("unwrapped")
- .Callable("CastStruct")
- .Arg(0, "unwrapped")
- .Add(1, ExpandType(lambda.Pos(), targetType, ctx))
- .Seal()
- .Seal()
- .Seal()
- .Seal()
- .Lambda(3)
- .Param("row")
- .Callable("CastStruct")
- .Arg(0, "row")
- .Add(1, ExpandType(lambda.Pos(), targetType, ctx))
- .Seal()
- .Seal()
- .Seal()
- .Build();
- auto newBody = ctx.ReplaceNodes(lambda.ChildPtr(1), {{args.front().Get(), cast}});
- args[0] = newArg;
- return ctx.NewLambda(lambda.Pos(), ctx.NewArguments(lambda.Pos(), std::move(args)), std::move(newBody));
- }
- TExprNode::TPtr AddOptionalIfNotAlreadyOptionalOrNull(const TExprNode::TPtr& lambda, TExprContext& ctx) {
- YQL_ENSURE(lambda->IsLambda());
- YQL_ENSURE(lambda->ChildPtr(0)->ChildrenSize() == 1);
- auto identity = MakeIdentityLambda(lambda->Pos(), ctx);
- return ctx.Builder(lambda->Pos())
- .Lambda()
- .Param("arg")
- .Callable("MatchType")
- .Apply(0, lambda)
- .With(0, "arg")
- .Seal()
- .Atom(1, "Optional", TNodeFlags::Default)
- .Add(2, identity)
- .Atom(3, "Null", TNodeFlags::Default)
- .Add(4, identity)
- .Atom(5, "Pg", TNodeFlags::Default)
- .Add(6, identity)
- .Lambda(7)
- .Param("result")
- .Callable("Just")
- .Arg(0, "result")
- .Seal()
- .Seal()
- .Seal()
- .Seal()
- .Build();
- }
- struct TRawTrait {
- TPositionHandle Pos;
- // Init/Update/Default are set only for aggregations
- TExprNode::TPtr InitLambda;
- TExprNode::TPtr UpdateLambda;
- TExprNode::TPtr DefaultValue;
- TExprNode::TPtr CalculateLambda;
- TMaybe<i64> CalculateLambdaLead; // lead/lag for input to CalculateLambda;
- TVector<TExprNode::TPtr> Params; // NTile
- const TTypeAnnotationNode* OutputType = nullptr;
- TWindowFrameSettings FrameSettings;
- };
- struct TCalcOverWindowTraits {
- TMap<TStringBuf, TRawTrait> RawTraits;
- ui64 MaxDataOutpace = 0;
- ui64 MaxDataLag = 0;
- ui64 MaxUnboundedPrecedingLag = 0;
- const TTypeAnnotationNode* LagQueueItemType = nullptr;
- };
- TExprNode::TPtr ApplyDistinctForInitLambda(TExprNode::TPtr initLambda, const TStringBuf& distinctKey, const TTypeAnnotationNode& distinctKeyType, const TTypeAnnotationNode& distinctKeyOrigType, TExprContext& ctx) {
- bool hasParent = initLambda->Child(0)->ChildrenSize() == 2;
- bool distinctKeyIsStruct = distinctKeyOrigType.GetKind() == ETypeAnnotationKind::Struct;
- auto expandedDistinctKeyType = ExpandType(initLambda->Pos(), distinctKeyType, ctx);
- auto expandedDistinctKeyOrigType = ExpandType(initLambda->Pos(), distinctKeyOrigType, ctx);
- auto setCreateUdf = ctx.Builder(initLambda->Pos())
- .Callable("Udf")
- .Atom(0, "Set.Create")
- .Callable(1, "Void").Seal()
- .Callable(2, "TupleType")
- .Callable(0, "VoidType").Seal()
- .Callable(1, "VoidType").Seal()
- .Add(2, expandedDistinctKeyOrigType)
- .Seal()
- .Seal()
- .Build();
- auto setCreateLambda = ctx.Builder(initLambda->Pos())
- .Lambda()
- .Param("value")
- .Param("parent")
- .Callable("NamedApply")
- .Add(0, setCreateUdf)
- .List(1)
- .Arg(0, "value")
- .Callable(1, "Uint32")
- .Atom(0, 0)
- .Seal()
- .Seal()
- .Callable(2, "AsStruct").Seal()
- .Callable(3, "DependsOn")
- .Arg(0, "parent")
- .Seal()
- .Seal()
- .Seal()
- .Build();
- initLambda = ctx.Builder(initLambda->Pos())
- .Lambda()
- .Param("value")
- .Param("parent")
- .List()
- // aggregation state
- .Apply(0, initLambda)
- .Do([&](TExprNodeReplaceBuilder& builder) -> TExprNodeReplaceBuilder& {
- if (distinctKeyIsStruct) {
- return builder
- .With(0)
- .Callable("CastStruct")
- .Arg(0, "value")
- .Add(1, expandedDistinctKeyType)
- .Seal()
- .Done();
- } else {
- return builder.With(0, "value");
- }
- })
- .Do([&](TExprNodeReplaceBuilder& builder) -> TExprNodeReplaceBuilder& {
- return hasParent ? builder.With(1, "parent") : builder;
- })
- .Seal()
- // distinct set state
- .Apply(1, setCreateLambda)
- .With(0, "value")
- .With(1, "parent")
- .Seal()
- .Seal()
- .Seal()
- .Build();
- return ctx.Builder(initLambda->Pos())
- .Lambda()
- .Param("row")
- .Param("parent")
- .Apply(initLambda)
- .With(0)
- .Callable("Member")
- .Arg(0, "row")
- .Atom(1, distinctKey)
- .Seal()
- .Done()
- .With(1, "parent")
- .Seal()
- .Seal()
- .Build();
- }
- TExprNode::TPtr ApplyDistinctForUpdateLambda(TExprNode::TPtr updateLambda, const TStringBuf& distinctKey, const TTypeAnnotationNode& distinctKeyType, const TTypeAnnotationNode& distinctKeyOrigType, TExprContext& ctx) {
- bool hasParent = updateLambda->Child(0)->ChildrenSize() == 3;
- bool distinctKeyIsStruct = distinctKeyOrigType.GetKind() == ETypeAnnotationKind::Struct;
- auto expandedDistinctKeyType = ExpandType(updateLambda->Pos(), distinctKeyType, ctx);
- auto expandedDistinctKeyOrigType = ExpandType(updateLambda->Pos(), distinctKeyOrigType, ctx);
-
- auto setAddValueUdf = ctx.Builder(updateLambda->Pos())
- .Callable("Udf")
- .Atom(0, "Set.AddValue")
- .Callable(1, "Void").Seal()
- .Callable(2, "TupleType")
- .Callable(0, "VoidType").Seal()
- .Callable(1, "VoidType").Seal()
- .Add(2, expandedDistinctKeyOrigType)
- .Seal()
- .Seal()
- .Build();
- auto setWasChangedUdf = ctx.Builder(updateLambda->Pos())
- .Callable("Udf")
- .Atom(0, "Set.WasChanged")
- .Callable(1, "Void").Seal()
- .Callable(2, "TupleType")
- .Callable(0, "VoidType").Seal()
- .Callable(1, "VoidType").Seal()
- .Add(2, expandedDistinctKeyOrigType)
- .Seal()
- .Seal()
- .Build();
- auto setInsertLambda = ctx.Builder(updateLambda->Pos())
- .Lambda()
- .Param("set")
- .Param("value")
- .Param("parent")
- .Callable("NamedApply")
- .Add(0, setAddValueUdf)
- .List(1)
- .Arg(0, "set")
- .Arg(1, "value")
- .Seal()
- .Callable(2, "AsStruct").Seal()
- .Callable(3, "DependsOn")
- .Arg(0, "parent")
- .Seal()
- .Seal()
- .Seal()
- .Build();
- auto setWasChangedLambda = ctx.Builder(updateLambda->Pos())
- .Lambda()
- .Param("set")
- .Param("parent")
- .Callable("NamedApply")
- .Add(0, setWasChangedUdf)
- .List(1)
- .Arg(0, "set")
- .Seal()
- .Callable(2, "AsStruct").Seal()
- .Callable(3, "DependsOn")
- .Arg(0, "parent")
- .Seal()
- .Seal()
- .Seal()
- .Build();
- updateLambda = ctx.Builder(updateLambda->Pos())
- .Lambda()
- .Param("value")
- .Param("state")
- .Param("parent")
- .Callable("If")
- // condition
- .Apply(0, setWasChangedLambda)
- .With(0)
- .Apply(setInsertLambda)
- .With(0)
- .Callable("Nth")
- .Arg(0, "state")
- .Atom(1, 1)
- .Seal()
- .Done()
- .With(1, "value")
- .With(2, "parent")
- .Seal()
- .Done()
- .With(1, "parent")
- .Seal()
- // new state
- .List(1)
- // aggregation state
- .Apply(0, updateLambda)
- .Do([&](TExprNodeReplaceBuilder& builder) -> TExprNodeReplaceBuilder& {
- if (distinctKeyIsStruct) {
- return builder
- .With(0)
- .Callable("CastStruct")
- .Arg(0, "value")
- .Add(1, expandedDistinctKeyType)
- .Seal()
- .Done();
- } else {
- return builder.With(0, "value");
- }
- })
- .With(1)
- .Callable("Nth")
- .Arg(0, "state")
- .Atom(1, 0)
- .Seal()
- .Done()
- .Do([&](TExprNodeReplaceBuilder& builder) -> TExprNodeReplaceBuilder& {
- return hasParent ? builder.With(2, "parent") : builder;
- })
- .Seal()
- // distinct set state
- .Apply(1, setInsertLambda)
- .With(0)
- .Callable("Nth")
- .Arg(0, "state")
- .Atom(1, 1)
- .Seal()
- .Done()
- .With(1, "value")
- .With(2, "parent")
- .Seal()
- .Seal()
- // old state
- .Arg(2, "state")
- .Seal()
- .Seal()
- .Build();
- return ctx.Builder(updateLambda->Pos())
- .Lambda()
- .Param("row")
- .Param("state")
- .Param("parent")
- .Apply(updateLambda)
- .With(0)
- .Callable("Member")
- .Arg(0, "row")
- .Atom(1, distinctKey)
- .Seal()
- .Done()
- .With(1, "state")
- .With(2, "parent")
- .Seal()
- .Seal()
- .Build();
- }
- TExprNode::TPtr ApplyDistinctForCalculateLambda(TExprNode::TPtr calculateLambda, TExprContext& ctx) {
- return ctx.Builder(calculateLambda->Pos())
- .Lambda()
- .Param("state")
- .Apply(calculateLambda)
- .With(0)
- .Callable("Nth")
- .Arg(0, "state")
- .Atom(1, 0)
- .Seal()
- .Done()
- .Seal()
- .Seal()
- .Build();
- }
- TCalcOverWindowTraits ExtractCalcOverWindowTraits(const TExprNode::TPtr& frames, const TStructExprType& rowType, TExprContext& ctx) {
- TCalcOverWindowTraits result;
- auto& maxDataOutpace = result.MaxDataOutpace;
- auto& maxDataLag = result.MaxDataLag;
- auto& maxUnboundedPrecedingLag = result.MaxUnboundedPrecedingLag;
- TVector<const TItemExprType*> lagQueueStructItems;
- for (auto& winOn : frames->ChildrenList()) {
- TWindowFrameSettings frameSettings = TWindowFrameSettings::Parse(*winOn, ctx);
- ui64 frameOutpace = 0;
- ui64 frameLag = 0;
- const EFrameType ft = frameSettings.GetFrameType();
- if (ft == EFrameType::FrameByRows) {
- const EFrameBoundsType frameType = FrameBoundsType(frameSettings);
- const auto frameFirst = frameSettings.GetFirstOffset();
- const auto frameLast = frameSettings.GetLastOffset();
- if (frameType != EFrameBoundsType::EMPTY) {
- if (!frameLast.Defined() || *frameLast > 0) {
- frameOutpace = frameLast.Defined() ? ui64(*frameLast) : Max<ui64>();
- }
- if (frameFirst.Defined() && *frameFirst < 0) {
- frameLag = ui64(0 - *frameFirst);
- }
- }
- } else {
- // The only frame we currently support
- YQL_ENSURE(ft == EFrameType::FrameByRange);
- YQL_ENSURE(IsUnbounded(frameSettings.GetFirst()));
- YQL_ENSURE(IsCurrentRow(frameSettings.GetLast()));
- }
- const auto& winOnChildren = winOn->ChildrenList();
- YQL_ENSURE(winOnChildren.size() > 1);
- for (size_t i = 1; i < winOnChildren.size(); ++i) {
- auto item = winOnChildren[i];
- YQL_ENSURE(item->IsList());
- auto nameNode = item->Child(0);
- YQL_ENSURE(nameNode->IsAtom());
- TStringBuf name = nameNode->Content();
- YQL_ENSURE(!result.RawTraits.contains(name));
- auto traits = item->Child(1);
- auto& rawTraits = result.RawTraits[name];
- rawTraits.FrameSettings = frameSettings;
- rawTraits.Pos = traits->Pos();
- YQL_ENSURE(traits->IsCallable({"WindowTraits","CumeDist"}) || ft == EFrameType::FrameByRows, "Non-canonical frame for window functions");
- if (traits->IsCallable("WindowTraits")) {
- maxDataOutpace = Max(maxDataOutpace, frameOutpace);
- maxDataLag = Max(maxDataLag, frameLag);
- auto initLambda = traits->ChildPtr(1);
- auto updateLambda = traits->ChildPtr(2);
- auto calculateLambda = traits->ChildPtr(4);
- rawTraits.OutputType = calculateLambda->GetTypeAnn();
- YQL_ENSURE(rawTraits.OutputType);
- auto lambdaInputType = traits->Child(0)->GetTypeAnn()->Cast<TTypeExprType>()->GetType();
-
- if (item->ChildrenSize() == 3) {
- auto distinctKey = item->Child(2)->Content();
- auto distinctKeyOrigType = rowType.FindItemType(distinctKey);
- YQL_ENSURE(distinctKeyOrigType);
- initLambda = ApplyDistinctForInitLambda(initLambda, distinctKey, *lambdaInputType, *distinctKeyOrigType, ctx);
- updateLambda = ApplyDistinctForUpdateLambda(updateLambda, distinctKey, *lambdaInputType, *distinctKeyOrigType, ctx);
- calculateLambda = ApplyDistinctForCalculateLambda(calculateLambda, ctx);
- } else {
- initLambda = ReplaceFirstLambdaArgWithCastStruct(*initLambda, *lambdaInputType, ctx);
- updateLambda = ReplaceFirstLambdaArgWithCastStruct(*updateLambda, *lambdaInputType, ctx);
- }
- if (initLambda->Child(0)->ChildrenSize() == 2) {
- initLambda = ReplaceLastLambdaArgWithUnsignedLiteral(*initLambda, i, ctx);
- }
- if (updateLambda->Child(0)->ChildrenSize() == 3) {
- updateLambda = ReplaceLastLambdaArgWithUnsignedLiteral(*updateLambda, i, ctx);
- }
- rawTraits.InitLambda = initLambda;
- rawTraits.UpdateLambda = updateLambda;
- rawTraits.CalculateLambda = calculateLambda;
- rawTraits.DefaultValue = traits->ChildPtr(5);
- if (ft == EFrameType::FrameByRows) {
- const EFrameBoundsType frameType = FrameBoundsType(frameSettings);
- const auto frameLast = frameSettings.GetLastOffset();
- if (frameType == EFrameBoundsType::LAGGING) {
- maxUnboundedPrecedingLag = Max(maxUnboundedPrecedingLag, ui64(abs(*frameLast)));
- lagQueueStructItems.push_back(ctx.MakeType<TItemExprType>(name, rawTraits.OutputType));
- }
- }
- } else if (traits->IsCallable({"Lead", "Lag"})) {
- i64 lead = 1;
- if (traits->ChildrenSize() == 3) {
- YQL_ENSURE(traits->Child(2)->IsCallable("Int64"));
- lead = FromString<i64>(traits->Child(2)->Child(0)->Content());
- }
- if (traits->IsCallable("Lag")) {
- lead = -lead;
- }
- if (lead < 0) {
- maxDataLag = Max(maxDataLag, ui64(abs(lead)));
- } else {
- maxDataOutpace = Max<ui64>(maxDataOutpace, lead);
- }
- auto lambdaInputType =
- traits->Child(0)->GetTypeAnn()->Cast<TTypeExprType>()->GetType()->Cast<TListExprType>()->GetItemType();
- rawTraits.CalculateLambda = ReplaceFirstLambdaArgWithCastStruct(*traits->Child(1), *lambdaInputType, ctx);
- rawTraits.CalculateLambdaLead = lead;
- rawTraits.OutputType = traits->Child(1)->GetTypeAnn();
- YQL_ENSURE(rawTraits.OutputType);
- } else if (traits->IsCallable({"Rank", "DenseRank", "PercentRank"})) {
- rawTraits.OutputType = traits->Child(1)->GetTypeAnn();
- auto lambdaInputType =
- traits->Child(0)->GetTypeAnn()->Cast<TTypeExprType>()->GetType()->Cast<TListExprType>()->GetItemType();
- auto lambda = ReplaceFirstLambdaArgWithCastStruct(*traits->Child(1), *lambdaInputType, ctx);
- rawTraits.CalculateLambda = ctx.ChangeChild(*traits, 1, std::move(lambda));
- } else {
- YQL_ENSURE(traits->IsCallable({"RowNumber","CumeDist","NTile"}));
- rawTraits.CalculateLambda = traits;
- rawTraits.OutputType = traits->GetTypeAnn();
- for (ui32 i = 1; i < traits->ChildrenSize(); ++i) {
- rawTraits.Params.push_back(traits->ChildPtr(i));
- }
- }
- }
- }
- result.LagQueueItemType = ctx.MakeType<TStructExprType>(lagQueueStructItems);
- return result;
- }
- TExprNode::TPtr BuildUint64(TPositionHandle pos, ui64 value, TExprContext& ctx) {
- return ctx.Builder(pos)
- .Callable("Uint64")
- .Atom(0, ToString(value))
- .Seal()
- .Build();
- }
- TExprNode::TPtr BuildDouble(TPositionHandle pos, double value, TExprContext& ctx) {
- return ctx.Builder(pos)
- .Callable("Double")
- .Atom(0, ToString(value))
- .Seal()
- .Build();
- }
- TExprNode::TPtr BuildQueuePeek(TPositionHandle pos, const TExprNode::TPtr& queue, ui64 index, const TExprNode::TPtr& dependsOn,
- TExprContext& ctx)
- {
- return ctx.Builder(pos)
- .Callable("QueuePeek")
- .Add(0, queue)
- .Add(1, BuildUint64(pos, index, ctx))
- .Callable(2, "DependsOn")
- .Add(0, dependsOn)
- .Seal()
- .Seal()
- .Build();
- }
- TExprNode::TPtr BuildQueueRange(TPositionHandle pos, const TExprNode::TPtr& queue, ui64 begin, ui64 end,
- const TExprNode::TPtr& dependsOn, TExprContext& ctx)
- {
- return ctx.Builder(pos)
- .Callable("FlatMap")
- .Callable(0, "QueueRange")
- .Add(0, queue)
- .Add(1, BuildUint64(pos, begin, ctx))
- .Add(2, BuildUint64(pos, end, ctx))
- .Callable(3, "DependsOn")
- .Add(0, dependsOn)
- .Seal()
- .Seal()
- .Lambda(1)
- .Param("item")
- .Arg("item")
- .Seal()
- .Seal()
- .Build();
- }
- TExprNode::TPtr BuildQueue(TPositionHandle pos, const TTypeAnnotationNode& itemType, ui64 queueSize, ui64 initSize,
- const TExprNode::TPtr& dependsOn, TExprContext& ctx)
- {
- TExprNode::TPtr size;
- if (queueSize == Max<ui64>()) {
- size = ctx.NewCallable(pos, "Void", {});
- } else {
- size = BuildUint64(pos, queueSize, ctx);
- }
- return ctx.Builder(pos)
- .Callable("QueueCreate")
- .Add(0, ExpandType(pos, itemType, ctx))
- .Add(1, size)
- .Add(2, BuildUint64(pos, initSize, ctx))
- .Callable(3, "DependsOn")
- .Add(0, dependsOn)
- .Seal()
- .Seal()
- .Build();
- }
- TExprNode::TPtr CoalesceQueueOutput(TPositionHandle pos, const TExprNode::TPtr& output, bool rawOutputIsOptional,
- const TExprNode::TPtr& defaultValue, TExprContext& ctx)
- {
- // output is has type Optional<RawOutputType>
- if (!rawOutputIsOptional) {
- return ctx.Builder(pos)
- .Callable("Coalesce")
- .Add(0, output)
- .Add(1, defaultValue)
- .Seal()
- .Build();
- }
- return ctx.Builder(pos)
- .Callable("IfPresent")
- .Add(0, output)
- .Lambda(1)
- .Param("item")
- .Callable("Coalesce")
- .Arg(0, "item")
- .Add(1, defaultValue)
- .Seal()
- .Seal()
- .Add(2, defaultValue)
- .Seal()
- .Build();
- }
- TExprNode::TPtr WrapWithWinContext(const TExprNode::TPtr& input, TExprContext& ctx) {
- if (HasContextFuncs(*input)) {
- return ctx.Builder(input->Pos())
- .Callable("WithContext")
- .Add(0, input)
- .Atom(1, "WinAgg", TNodeFlags::Default)
- .Seal()
- .Build();
- }
- return input;
- }
- TExprNode::TPtr BuildInitLambdaForChain1Map(TPositionHandle pos, const TExprNode::TPtr& initStateLambda,
- const TExprNode::TPtr& calculateLambda, TExprContext& ctx)
- {
- return ctx.Builder(pos)
- .Lambda()
- .Param("row")
- .List()
- .Do([&](TExprNodeBuilder& parent)->TExprNodeBuilder& {
- if (calculateLambda->Head().ChildrenSize() == 1) {
- parent.Apply(0, calculateLambda)
- .With(0)
- .Apply(initStateLambda)
- .With(0, "row")
- .Seal()
- .Done()
- .Seal();
- } else {
- parent.Apply(0, calculateLambda)
- .With(0)
- .Apply(initStateLambda)
- .With(0, "row")
- .Seal()
- .Done()
- .With(1, "row")
- .Seal();
- }
- return parent;
- })
- .Apply(1, initStateLambda)
- .With(0, "row")
- .Seal()
- .Seal()
- .Seal()
- .Build();
- }
- TExprNode::TPtr BuildUpdateLambdaForChain1Map(TPositionHandle pos, const TExprNode::TPtr& updateStateLambda,
- const TExprNode::TPtr& calculateLambda, TExprContext& ctx)
- {
- return ctx.Builder(pos)
- .Lambda()
- .Param("row")
- .Param("state")
- .List()
- .Do([&](TExprNodeBuilder& parent)->TExprNodeBuilder& {
- if (calculateLambda->Head().ChildrenSize() == 1) {
- parent.Apply(0, calculateLambda)
- .With(0)
- .Apply(updateStateLambda)
- .With(0, "row")
- .With(1, "state")
- .Seal()
- .Done()
- .Seal();
- } else {
- parent.Apply(0, calculateLambda)
- .With(0)
- .Apply(updateStateLambda)
- .With(0, "row")
- .With(1, "state")
- .Seal()
- .Done()
- .With(1, "row")
- .Seal();
- }
- return parent;
- })
- .Apply(1, updateStateLambda)
- .With(0, "row")
- .With(1, "state")
- .Seal()
- .Seal()
- .Seal()
- .Build();
- }
- class TChain1MapTraits : public TThrRefBase, public TNonCopyable {
- public:
- using TPtr = TIntrusivePtr<TChain1MapTraits>;
- TChain1MapTraits(TStringBuf name, TPositionHandle pos)
- : Name(name)
- , Pos(pos)
- {
- }
- TStringBuf GetName() const {
- return Name;
- }
- TPositionHandle GetPos() const {
- return Pos;
- }
- // Lambda(row) -> AsTuple(output, state)
- virtual TExprNode::TPtr BuildInitLambda(const TExprNode::TPtr& dataQueue, TExprContext& ctx) const = 0;
- // Lambda(row, state) -> AsTuple(output, state)
- virtual TExprNode::TPtr BuildUpdateLambda(const TExprNode::TPtr& dataQueue, TExprContext& ctx) const = 0;
- virtual TExprNode::TPtr ExtractLaggingOutput(const TExprNode::TPtr& lagQueue,
- const TExprNode::TPtr& dependsOn, TExprContext& ctx) const
- {
- Y_UNUSED(lagQueue);
- Y_UNUSED(dependsOn);
- Y_UNUSED(ctx);
- return {};
- }
- virtual ~TChain1MapTraits() = default;
- private:
- const TStringBuf Name;
- const TPositionHandle Pos;
- };
- class TChain1MapTraitsLagLead : public TChain1MapTraits {
- public:
- TChain1MapTraitsLagLead(TStringBuf name, const TRawTrait& raw, TMaybe<ui64> queueOffset)
- : TChain1MapTraits(name, raw.Pos)
- , QueueOffset(queueOffset)
- , LeadLagLambda(raw.CalculateLambda)
- {
- }
- // Lambda(row) -> AsTuple(output, state)
- TExprNode::TPtr BuildInitLambda(const TExprNode::TPtr& dataQueue, TExprContext& ctx) const override {
- return ctx.Builder(GetPos())
- .Lambda()
- .Param("row")
- .List()
- .Apply(0, CalculateOutputLambda(dataQueue, ctx))
- .With(0, "row")
- .Seal()
- .Callable(1, "Void")
- .Seal()
- .Seal()
- .Seal()
- .Build();
- }
- // Lambda(row, state) -> AsTuple(output, state)
- TExprNode::TPtr BuildUpdateLambda(const TExprNode::TPtr& dataQueue, TExprContext& ctx) const override {
- return ctx.Builder(GetPos())
- .Lambda()
- .Param("row")
- .Param("state")
- .List()
- .Apply(0, CalculateOutputLambda(dataQueue, ctx))
- .With(0, "row")
- .Seal()
- .Arg(1, "state")
- .Seal()
- .Seal()
- .Build();
- }
- private:
- TExprNode::TPtr CalculateOutputLambda(const TExprNode::TPtr& dataQueue, TExprContext& ctx) const {
- if (!QueueOffset.Defined()) {
- return AddOptionalIfNotAlreadyOptionalOrNull(LeadLagLambda, ctx);
- }
- YQL_ENSURE(dataQueue);
- auto rowArg = ctx.NewArgument(GetPos(), "row");
- auto body = ctx.Builder(GetPos())
- .Callable("IfPresent")
- .Add(0, BuildQueuePeek(GetPos(), dataQueue, *QueueOffset, rowArg, ctx))
- .Add(1, AddOptionalIfNotAlreadyOptionalOrNull(LeadLagLambda, ctx))
- .Callable(2, "Null")
- .Seal()
- .Seal()
- .Build();
- return ctx.NewLambda(GetPos(), ctx.NewArguments(GetPos(), {rowArg}), std::move(body));
- }
- const TMaybe<ui64> QueueOffset;
- const TExprNode::TPtr LeadLagLambda;
- };
- class TChain1MapTraitsRowNumber : public TChain1MapTraits {
- public:
- TChain1MapTraitsRowNumber(TStringBuf name, const TRawTrait& raw)
- : TChain1MapTraits(name, raw.Pos)
- {
- }
- // Lambda(row) -> AsTuple(output, state)
- TExprNode::TPtr BuildInitLambda(const TExprNode::TPtr& dataQueue, TExprContext& ctx) const override {
- Y_UNUSED(dataQueue);
- return ctx.Builder(GetPos())
- .Lambda()
- .Param("row")
- .List()
- .Add(0, BuildUint64(GetPos(), 1, ctx))
- .Add(1, BuildUint64(GetPos(), 1, ctx))
- .Seal()
- .Seal()
- .Build();
- }
- // Lambda(row, state) -> AsTuple(output, state)
- TExprNode::TPtr BuildUpdateLambda(const TExprNode::TPtr& dataQueue, TExprContext& ctx) const override {
- Y_UNUSED(dataQueue);
- return ctx.Builder(GetPos())
- .Lambda()
- .Param("row")
- .Param("state")
- .List()
- .Callable(0, "Inc")
- .Arg(0, "state")
- .Seal()
- .Callable(1, "Inc")
- .Arg(0, "state")
- .Seal()
- .Seal()
- .Seal()
- .Build();
- }
- };
- class TChain1MapTraitsCumeDist : public TChain1MapTraits {
- public:
- TChain1MapTraitsCumeDist(TStringBuf name, const TRawTrait& raw, const TString& partitionRowsColumn)
- : TChain1MapTraits(name, raw.Pos)
- , PartitionRowsColumn(partitionRowsColumn)
- {
- }
- // Lambda(row) -> AsTuple(output, state)
- TExprNode::TPtr BuildInitLambda(const TExprNode::TPtr& dataQueue, TExprContext& ctx) const override {
- Y_UNUSED(dataQueue);
- return ctx.Builder(GetPos())
- .Lambda()
- .Param("row")
- .List()
- .Callable(0, "/")
- .Add(0, BuildDouble(GetPos(), 1.0, ctx))
- .Callable(1, "Member")
- .Arg(0, "row")
- .Atom(1, PartitionRowsColumn)
- .Seal()
- .Seal()
- .Add(1, BuildUint64(GetPos(), 1, ctx))
- .Seal()
- .Seal()
- .Build();
- }
- // Lambda(row, state) -> AsTuple(output, state)
- TExprNode::TPtr BuildUpdateLambda(const TExprNode::TPtr& dataQueue, TExprContext& ctx) const override {
- Y_UNUSED(dataQueue);
- return ctx.Builder(GetPos())
- .Lambda()
- .Param("row")
- .Param("state")
- .List()
- .Callable(0, "/")
- .Callable(0, "SafeCast")
- .Callable(0, "Inc")
- .Arg(0, "state")
- .Seal()
- .Atom(1, "Double")
- .Seal()
- .Callable(1, "Member")
- .Arg(0, "row")
- .Atom(1, PartitionRowsColumn)
- .Seal()
- .Seal()
- .Callable(1, "Inc")
- .Arg(0, "state")
- .Seal()
- .Seal()
- .Seal()
- .Build();
- }
- private:
- const TString PartitionRowsColumn;
- };
- class TChain1MapTraitsNTile : public TChain1MapTraits {
- public:
- TChain1MapTraitsNTile(TStringBuf name, const TRawTrait& raw, const TString& partitionRowsColumn)
- : TChain1MapTraits(name, raw.Pos)
- , PartitionRowsColumn(partitionRowsColumn)
- {
- YQL_ENSURE(raw.Params.size() == 1);
- Param = raw.Params[0];
- }
- // Lambda(row) -> AsTuple(output, state)
- TExprNode::TPtr BuildInitLambda(const TExprNode::TPtr& dataQueue, TExprContext& ctx) const override {
- Y_UNUSED(dataQueue);
- return ctx.Builder(GetPos())
- .Lambda()
- .Param("row")
- .List()
- .Add(0, BuildUint64(GetPos(), 1, ctx))
- .Add(1, BuildUint64(GetPos(), 1, ctx))
- .Seal()
- .Seal()
- .Build();
- }
- // Lambda(row, state) -> AsTuple(output, state)
- TExprNode::TPtr BuildUpdateLambda(const TExprNode::TPtr& dataQueue, TExprContext& ctx) const override {
- Y_UNUSED(dataQueue);
- return ctx.Builder(GetPos())
- .Lambda()
- .Param("row")
- .Param("state")
- .List()
- .Callable(0, "Inc")
- .Callable(0, "Unwrap")
- .Callable(0, "/")
- .Callable(0, "*")
- .Callable(0, "SafeCast")
- .Add(0, Param)
- .Atom(1, "Uint64")
- .Seal()
- .Arg(1, "state")
- .Seal()
- .Callable(1, "Member")
- .Arg(0, "row")
- .Atom(1, PartitionRowsColumn)
- .Seal()
- .Seal()
- .Seal()
- .Seal()
- .Callable(1, "Inc")
- .Arg(0, "state")
- .Seal()
- .Seal()
- .Seal()
- .Build();
- }
- private:
- const TString PartitionRowsColumn;
- TExprNode::TPtr Param;
- };
- class TChain1MapTraitsRankBase : public TChain1MapTraits {
- public:
- TChain1MapTraitsRankBase(TStringBuf name, const TRawTrait& raw)
- : TChain1MapTraits(name, raw.Pos)
- , ExtractForCompareLambda(raw.CalculateLambda->ChildPtr(1))
- , Ansi(HasSetting(*raw.CalculateLambda->Child(2), "ansi"))
- , KeyType(raw.OutputType)
- {
- }
- virtual TExprNode::TPtr BuildCalculateLambda(TExprContext& ctx) const {
- return ctx.Builder(GetPos())
- .Lambda()
- .Param("state")
- .Callable("Nth")
- .Arg(0, "state")
- .Atom(1, "0")
- .Seal()
- .Seal()
- .Build();
- }
- // Lambda(row) -> AsTuple(output, state)
- TExprNode::TPtr BuildInitLambda(const TExprNode::TPtr& dataQueue, TExprContext& ctx) const final {
- Y_UNUSED(dataQueue);
- auto initKeyLambda = BuildRawInitLambda(ctx);
- if (!Ansi && KeyType->GetKind() == ETypeAnnotationKind::Optional) {
- auto stateType = GetStateType(KeyType->Cast<TOptionalExprType>()->GetItemType(), ctx);
- initKeyLambda = BuildOptKeyInitLambda(initKeyLambda, stateType, ctx);
- }
- auto initRowLambda = ctx.Builder(GetPos())
- .Lambda()
- .Param("row")
- .Apply(initKeyLambda)
- .With(0)
- .Apply(ExtractForCompareLambda)
- .With(0, "row")
- .Seal()
- .Done()
- .Seal()
- .Seal()
- .Build();
- return BuildInitLambdaForChain1Map(GetPos(), initRowLambda, BuildCalculateLambda(ctx), ctx);
- }
- // Lambda(row, state) -> AsTuple(output, state)
- TExprNode::TPtr BuildUpdateLambda(const TExprNode::TPtr& dataQueue, TExprContext& ctx) const final {
- Y_UNUSED(dataQueue);
- bool useAggrEquals = Ansi;
- auto updateKeyLambda = BuildRawUpdateLambda(useAggrEquals, ctx);
- if (!Ansi && KeyType->GetKind() == ETypeAnnotationKind::Optional) {
- auto stateType = GetStateType(KeyType->Cast<TOptionalExprType>()->GetItemType(), ctx);
- updateKeyLambda = ctx.Builder(GetPos())
- .Lambda()
- .Param("key")
- .Param("state")
- .Callable("IfPresent")
- .Arg(0, "state")
- .Lambda(1)
- .Param("unwrappedState")
- .Callable("IfPresent")
- .Arg(0, "key")
- .Lambda(1)
- .Param("unwrappedKey")
- .Callable("Just")
- .Apply(0, updateKeyLambda)
- .With(0, "unwrappedKey")
- .With(1, "unwrappedState")
- .Seal()
- .Seal()
- .Seal()
- .Callable(2, "Just")
- .Arg(0, "unwrappedState")
- .Seal()
- .Seal()
- .Seal()
- .Apply(2, BuildOptKeyInitLambda(BuildRawInitLambda(ctx), stateType, ctx))
- .With(0, "key")
- .Seal()
- .Seal()
- .Seal()
- .Build();
- }
- auto updateRowLambda = ctx.Builder(GetPos())
- .Lambda()
- .Param("row")
- .Param("state")
- .Apply(updateKeyLambda)
- .With(0)
- .Apply(ExtractForCompareLambda)
- .With(0, "row")
- .Seal()
- .Done()
- .With(1, "state")
- .Seal()
- .Seal()
- .Build();
- return BuildUpdateLambdaForChain1Map(GetPos(), updateRowLambda, BuildCalculateLambda(ctx), ctx);
- }
- virtual TExprNode::TPtr BuildRawInitLambda(TExprContext& ctx) const = 0;
- virtual TExprNode::TPtr BuildRawUpdateLambda(bool useAggrEquals, TExprContext& ctx) const = 0;
- virtual const TTypeAnnotationNode* GetStateType(const TTypeAnnotationNode* keyType, TExprContext& ctx) const = 0;
- private:
- TExprNode::TPtr BuildOptKeyInitLambda(const TExprNode::TPtr& rawInitKeyLambda,
- const TTypeAnnotationNode* stateType, TExprContext& ctx) const
- {
- auto optStateType = ctx.MakeType<TOptionalExprType>(stateType);
- return ctx.Builder(GetPos())
- .Lambda()
- .Param("key")
- .Callable("IfPresent")
- .Arg(0, "key")
- .Lambda(1)
- .Param("unwrapped")
- .Callable("Just")
- .Apply(0, rawInitKeyLambda)
- .With(0, "unwrapped")
- .Seal()
- .Seal()
- .Seal()
- .Callable(2, "Nothing")
- .Add(0, ExpandType(GetPos(), *optStateType, ctx))
- .Seal()
- .Seal()
- .Seal()
- .Build();
- }
- const TExprNode::TPtr ExtractForCompareLambda;
- const bool Ansi;
- const TTypeAnnotationNode* const KeyType;
- };
- class TChain1MapTraitsRank : public TChain1MapTraitsRankBase {
- public:
- TChain1MapTraitsRank(TStringBuf name, const TRawTrait& raw)
- : TChain1MapTraitsRankBase(name, raw)
- {
- }
- TExprNode::TPtr BuildRawInitLambda(TExprContext& ctx) const final {
- auto one = BuildUint64(GetPos(), 1, ctx);
- return ctx.Builder(GetPos())
- .Lambda()
- .Param("key")
- .List()
- .Add(0, one)
- .Add(1, one)
- .Arg(2, "key")
- .Seal()
- .Seal()
- .Build();
- }
- TExprNode::TPtr BuildRawUpdateLambda(bool useAggrEquals, TExprContext& ctx) const final {
- return ctx.Builder(GetPos())
- .Lambda()
- .Param("key")
- .Param("state")
- .List()
- .Callable(0, "If")
- .Callable(0, useAggrEquals ? "AggrEquals" : "==")
- .Arg(0, "key")
- .Callable(1, "Nth")
- .Arg(0, "state")
- .Atom(1, "2")
- .Seal()
- .Seal()
- .Callable(1, "Nth")
- .Arg(0, "state")
- .Atom(1, "0")
- .Seal()
- .Callable(2, "Inc")
- .Callable(0, "Nth")
- .Arg(0, "state")
- .Atom(1, "1")
- .Seal()
- .Seal()
- .Seal()
- .Callable(1, "Inc")
- .Callable(0, "Nth")
- .Arg(0, "state")
- .Atom(1, "1")
- .Seal()
- .Seal()
- .Arg(2, "key")
- .Seal()
- .Seal()
- .Build();
- }
- const TTypeAnnotationNode* GetStateType(const TTypeAnnotationNode* keyType, TExprContext& ctx) const final {
- return ctx.MakeType<TTupleExprType>(TTypeAnnotationNode::TListType{
- ctx.MakeType<TDataExprType>(EDataSlot::Uint64),
- ctx.MakeType<TDataExprType>(EDataSlot::Uint64),
- keyType
- });
- }
- };
- class TChain1MapTraitsPercentRank : public TChain1MapTraitsRank {
- public:
- TChain1MapTraitsPercentRank(TStringBuf name, const TRawTrait& raw, const TString& partitionRowsColumn)
- : TChain1MapTraitsRank(name, raw)
- , PartitionRowsColumn(partitionRowsColumn)
- {
- }
- virtual TExprNode::TPtr BuildCalculateLambda(TExprContext& ctx) const {
- return ctx.Builder(GetPos())
- .Lambda()
- .Param("state")
- .Param("row")
- .Callable("/")
- .Callable(0, "SafeCast")
- .Callable(0, "Dec")
- .Callable(0, "Nth")
- .Arg(0, "state")
- .Atom(1, "0")
- .Seal()
- .Seal()
- .Atom(1, "Double")
- .Seal()
- .Callable(1, "Dec")
- .Callable(0, "Member")
- .Arg(0, "row")
- .Atom(1, PartitionRowsColumn)
- .Seal()
- .Seal()
- .Seal()
- .Seal()
- .Build();
- }
- private:
- const TString PartitionRowsColumn;
- };
- class TChain1MapTraitsDenseRank : public TChain1MapTraitsRankBase {
- public:
- TChain1MapTraitsDenseRank(TStringBuf name, const TRawTrait& raw)
- : TChain1MapTraitsRankBase(name, raw)
- {
- }
- TExprNode::TPtr BuildRawInitLambda(TExprContext& ctx) const final {
- return ctx.Builder(GetPos())
- .Lambda()
- .Param("key")
- .List()
- .Add(0, BuildUint64(GetPos(), 1, ctx))
- .Arg(1, "key")
- .Seal()
- .Seal()
- .Build();
- }
- TExprNode::TPtr BuildRawUpdateLambda(bool useAggrEquals, TExprContext& ctx) const final {
- return ctx.Builder(GetPos())
- .Lambda()
- .Param("key")
- .Param("state")
- .List()
- .Callable(0, "If")
- .Callable(0, useAggrEquals ? "AggrEquals" : "==")
- .Arg(0, "key")
- .Callable(1, "Nth")
- .Arg(0, "state")
- .Atom(1, "1")
- .Seal()
- .Seal()
- .Callable(1, "Nth")
- .Arg(0, "state")
- .Atom(1, "0")
- .Seal()
- .Callable(2, "Inc")
- .Callable(0, "Nth")
- .Arg(0, "state")
- .Atom(1, "0")
- .Seal()
- .Seal()
- .Seal()
- .Arg(1, "key")
- .Seal()
- .Seal()
- .Build();
- }
- const TTypeAnnotationNode* GetStateType(const TTypeAnnotationNode* keyType, TExprContext& ctx) const final {
- return ctx.MakeType<TTupleExprType>(TTypeAnnotationNode::TListType{
- ctx.MakeType<TDataExprType>(EDataSlot::Uint64),
- keyType
- });
- }
- };
- class TChain1MapTraitsStateBase : public TChain1MapTraits {
- public:
- TChain1MapTraitsStateBase(TStringBuf name, const TRawTrait& raw)
- : TChain1MapTraits(name, raw.Pos)
- , FrameNeverEmpty(raw.FrameSettings.IsNonEmpty())
- , InitLambda(raw.InitLambda)
- , UpdateLambda(raw.UpdateLambda)
- , CalculateLambda(raw.CalculateLambda)
- , DefaultValue(raw.DefaultValue)
- {
- }
- protected:
- TExprNode::TPtr GetInitLambda() const {
- return InitLambda;
- }
- TExprNode::TPtr GetUpdateLambda() const {
- return UpdateLambda;
- }
- TExprNode::TPtr GetCalculateLambda() const {
- return CalculateLambda;
- }
- TExprNode::TPtr GetDefaultValue() const {
- return DefaultValue;
- }
- const bool FrameNeverEmpty;
- private:
- const TExprNode::TPtr InitLambda;
- const TExprNode::TPtr UpdateLambda;
- const TExprNode::TPtr CalculateLambda;
- const TExprNode::TPtr DefaultValue;
- };
- class TChain1MapTraitsCurrentOrLagging : public TChain1MapTraitsStateBase {
- public:
- TChain1MapTraitsCurrentOrLagging(TStringBuf name, const TRawTrait& raw, TMaybe<ui64> lagQueueIndex)
- : TChain1MapTraitsStateBase(name, raw)
- , LaggingQueueIndex(lagQueueIndex)
- , OutputIsOptional(raw.OutputType->IsOptionalOrNull())
- {
- }
- // Lambda(row) -> AsTuple(output, state)
- TExprNode::TPtr BuildInitLambda(const TExprNode::TPtr& dataQueue, TExprContext& ctx) const override {
- Y_UNUSED(dataQueue);
- return BuildInitLambdaForChain1Map(GetPos(), GetInitLambda(), GetCalculateLambda(), ctx);
- }
- // Lambda(row, state) -> AsTuple(output, state)
- TExprNode::TPtr BuildUpdateLambda(const TExprNode::TPtr& dataQueue, TExprContext& ctx) const override {
- Y_UNUSED(dataQueue);
- return BuildUpdateLambdaForChain1Map(GetPos(), GetUpdateLambda(), GetCalculateLambda(), ctx);
- }
- TExprNode::TPtr ExtractLaggingOutput(const TExprNode::TPtr& lagQueue,
- const TExprNode::TPtr& dependsOn, TExprContext& ctx) const override
- {
- if (!LaggingQueueIndex.Defined()) {
- return {};
- }
- YQL_ENSURE(!FrameNeverEmpty);
- auto output = ctx.Builder(GetPos())
- .Callable("Map")
- .Add(0, BuildQueuePeek(GetPos(), lagQueue, *LaggingQueueIndex, dependsOn, ctx))
- .Lambda(1)
- .Param("struct")
- .Callable("Member")
- .Arg(0, "struct")
- .Atom(1, GetName())
- .Seal()
- .Seal()
- .Seal()
- .Build();
- return CoalesceQueueOutput(GetPos(), output, OutputIsOptional, GetDefaultValue(), ctx);
- }
- private:
- const TMaybe<ui64> LaggingQueueIndex;
- const bool OutputIsOptional;
- };
- class TChain1MapTraitsLeading : public TChain1MapTraitsStateBase {
- public:
- TChain1MapTraitsLeading(TStringBuf name, const TRawTrait& raw, ui64 currentRowIndex, ui64 lastRowIndex)
- : TChain1MapTraitsStateBase(name, raw)
- , QueueBegin(currentRowIndex + 1)
- , QueueEnd(lastRowIndex + 1)
- {
- }
- // Lambda(row) -> AsTuple(output, state)
- TExprNode::TPtr BuildInitLambda(const TExprNode::TPtr& dataQueue, TExprContext& ctx) const override {
- YQL_ENSURE(dataQueue);
- auto originalInit = GetInitLambda();
- auto originalUpdate = GetUpdateLambda();
- auto calculate = GetCalculateLambda();
- auto rowArg = ctx.NewArgument(GetPos(), "row");
- auto state = ctx.Builder(GetPos())
- .Callable("Fold")
- .Add(0, BuildQueueRange(GetPos(), dataQueue, QueueBegin, QueueEnd, rowArg, ctx))
- .Apply(1, originalInit)
- .With(0, rowArg)
- .Seal()
- .Add(2, ctx.DeepCopyLambda(*originalUpdate))
- .Seal()
- .Build();
- state = WrapWithWinContext(state, ctx);
- auto initBody = ctx.Builder(GetPos())
- .List()
- .Apply(0, calculate)
- .With(0, state)
- .Seal()
- .Apply(1, originalInit)
- .With(0, rowArg)
- .Seal()
- .Seal()
- .Build();
- return ctx.NewLambda(GetPos(), ctx.NewArguments(GetPos(), {rowArg}), std::move(initBody));
- }
- // Lambda(row, state) -> AsTuple(output, state)
- TExprNode::TPtr BuildUpdateLambda(const TExprNode::TPtr& dataQueue, TExprContext& ctx) const override {
- YQL_ENSURE(dataQueue);
- auto originalInit = GetInitLambda();
- auto originalUpdate = GetUpdateLambda();
- auto calculate = GetCalculateLambda();
- auto rowArg = ctx.NewArgument(GetPos(), "row");
- auto stateArg = ctx.NewArgument(GetPos(), "state");
- auto state = ctx.Builder(GetPos())
- .Callable("Fold")
- .Add(0, BuildQueueRange(GetPos(), dataQueue, QueueBegin, QueueEnd, rowArg, ctx))
- .Apply(1, originalUpdate)
- .With(0, rowArg)
- .With(1, stateArg)
- .Seal()
- .Add(2, ctx.DeepCopyLambda(*originalUpdate))
- .Seal()
- .Build();
- state = WrapWithWinContext(state, ctx);
- auto updateBody = ctx.Builder(GetPos())
- .List()
- .Apply(0, calculate)
- .With(0, state)
- .Seal()
- .Apply(1, originalUpdate)
- .With(0, rowArg)
- .With(1, stateArg)
- .Seal()
- .Seal()
- .Build();
- return ctx.NewLambda(GetPos(), ctx.NewArguments(GetPos(), {rowArg, stateArg}), std::move(updateBody));
- }
- private:
- const ui64 QueueBegin;
- const ui64 QueueEnd;
- };
- class TChain1MapTraitsFull : public TChain1MapTraitsStateBase {
- public:
- TChain1MapTraitsFull(TStringBuf name, const TRawTrait& raw, ui64 currentRowIndex)
- : TChain1MapTraitsStateBase(name, raw)
- , QueueBegin(currentRowIndex + 1)
- {
- }
- // Lambda(row) -> AsTuple(output, state)
- // state == output
- TExprNode::TPtr BuildInitLambda(const TExprNode::TPtr& dataQueue, TExprContext& ctx) const override {
- auto originalInit = GetInitLambda();
- auto originalUpdate = GetUpdateLambda();
- auto calculate = GetCalculateLambda();
- auto rowArg = ctx.NewArgument(GetPos(), "row");
- auto state = ctx.Builder(GetPos())
- .Callable("Fold")
- .Add(0, BuildQueueRange(GetPos(), dataQueue, QueueBegin, Max<ui64>(), rowArg, ctx))
- .Apply(1, originalInit)
- .With(0, rowArg)
- .Seal()
- .Add(2, ctx.DeepCopyLambda(*originalUpdate))
- .Seal()
- .Build();
- state = WrapWithWinContext(state, ctx);
- auto initBody = ctx.Builder(GetPos())
- .List()
- .Apply(0, calculate)
- .With(0, state)
- .Seal()
- .Apply(1, calculate)
- .With(0, state)
- .Seal()
- .Seal()
- .Build();
- return ctx.NewLambda(GetPos(), ctx.NewArguments(GetPos(), {rowArg}), std::move(initBody));
- }
- // Lambda(row, state) -> AsTuple(output, state)
- TExprNode::TPtr BuildUpdateLambda(const TExprNode::TPtr& dataQueue, TExprContext& ctx) const override {
- Y_UNUSED(dataQueue);
- return ctx.Builder(GetPos())
- .Lambda()
- .Param("row")
- .Param("state")
- .List()
- .Arg(0, "state")
- .Arg(1, "state")
- .Seal()
- .Seal()
- .Build();
- }
- private:
- const ui64 QueueBegin;
- };
- class TChain1MapTraitsGeneric : public TChain1MapTraitsStateBase {
- public:
- TChain1MapTraitsGeneric(TStringBuf name, const TRawTrait& raw, ui64 queueBegin, ui64 queueEnd)
- : TChain1MapTraitsStateBase(name, raw)
- , QueueBegin(queueBegin)
- , QueueEnd(queueEnd)
- , OutputIsOptional(raw.OutputType->IsOptionalOrNull())
- {
- }
- // Lambda(row) -> AsTuple(output, state)
- TExprNode::TPtr BuildInitLambda(const TExprNode::TPtr& dataQueue, TExprContext& ctx) const override {
- auto rowArg = ctx.NewArgument(GetPos(), "row");
- auto body = ctx.Builder(GetPos())
- .List()
- .Add(0, BuildFinalOutput(rowArg, dataQueue, ctx))
- .Callable(1, "Void")
- .Seal()
- .Seal()
- .Build();
- return ctx.NewLambda(GetPos(), ctx.NewArguments(GetPos(), {rowArg}), std::move(body));
- }
- // Lambda(row, state) -> AsTuple(output, state)
- TExprNode::TPtr BuildUpdateLambda(const TExprNode::TPtr& dataQueue, TExprContext& ctx) const override {
- auto rowArg = ctx.NewArgument(GetPos(), "row");
- auto stateArg = ctx.NewArgument(GetPos(), "state");
- auto body = ctx.Builder(GetPos())
- .List()
- .Add(0, BuildFinalOutput(rowArg, dataQueue, ctx))
- .Add(1, stateArg)
- .Seal()
- .Build();
- return ctx.NewLambda(GetPos(), ctx.NewArguments(GetPos(), {rowArg, stateArg}), std::move(body));
- }
- private:
- TExprNode::TPtr BuildFinalOutput(const TExprNode::TPtr& rowArg, const TExprNode::TPtr& dataQueue, TExprContext& ctx) const {
- YQL_ENSURE(dataQueue);
- auto originalInit = GetInitLambda();
- auto originalUpdate = GetUpdateLambda();
- auto calculate = GetCalculateLambda();
- auto fold1 = ctx.Builder(GetPos())
- .Callable("Fold1")
- .Add(0, BuildQueueRange(GetPos(), dataQueue, QueueBegin, QueueEnd, rowArg, ctx))
- .Add(1, ctx.DeepCopyLambda(*originalInit))
- .Add(2, ctx.DeepCopyLambda(*originalUpdate))
- .Seal()
- .Build();
- fold1 = WrapWithWinContext(fold1, ctx);
- auto output = ctx.Builder(GetPos())
- .Callable("Map")
- .Add(0, fold1)
- .Add(1, ctx.DeepCopyLambda(*calculate))
- .Seal()
- .Build();
- if (FrameNeverEmpty) {
- // output is always non-empty optional in this case
- // we do IfPresent with some fake output value to remove optional
- // this will have exactly the same result as Unwrap(output)
- return ctx.Builder(GetPos())
- .Callable("IfPresent")
- .Add(0, output)
- .Lambda(1)
- .Param("unwrapped")
- .Arg("unwrapped")
- .Seal()
- .Apply(2, calculate)
- .With(0)
- .Apply(originalInit)
- .With(0, rowArg)
- .Seal()
- .Done()
- .Seal()
- .Seal()
- .Build();
- }
- return CoalesceQueueOutput(GetPos(), output, OutputIsOptional, GetDefaultValue(), ctx);
- }
- const ui64 QueueBegin;
- const ui64 QueueEnd;
- const bool OutputIsOptional;
- };
- class TChain1MapTraitsEmpty : public TChain1MapTraitsStateBase {
- public:
- TChain1MapTraitsEmpty(TStringBuf name, const TRawTrait& raw)
- : TChain1MapTraitsStateBase(name, raw)
- , RawOutputType(raw.OutputType)
- {
- }
- // Lambda(row) -> AsTuple(output, state)
- TExprNode::TPtr BuildInitLambda(const TExprNode::TPtr& dataQueue, TExprContext& ctx) const override {
- Y_UNUSED(dataQueue);
- return ctx.Builder(GetPos())
- .Lambda()
- .Param("row")
- .List()
- .Add(0, BuildFinalOutput(ctx))
- .Callable(1, "Void")
- .Seal()
- .Seal()
- .Seal()
- .Build();
- }
- // Lambda(row, state) -> AsTuple(output, state)
- TExprNode::TPtr BuildUpdateLambda(const TExprNode::TPtr& dataQueue, TExprContext& ctx) const override {
- Y_UNUSED(dataQueue);
- return ctx.Builder(GetPos())
- .Lambda()
- .Param("row")
- .Param("state")
- .List()
- .Add(0, BuildFinalOutput(ctx))
- .Arg(1, "state")
- .Seal()
- .Seal()
- .Build();
- }
- private:
- TExprNode::TPtr BuildFinalOutput(TExprContext& ctx) const {
- const auto defaultValue = GetDefaultValue();
- YQL_ENSURE(!FrameNeverEmpty);
- if (defaultValue->IsCallable("Null")) {
- auto resultingType = RawOutputType;
- if (!resultingType->IsOptionalOrNull()) {
- resultingType = ctx.MakeType<TOptionalExprType>(resultingType);
- }
- return ctx.Builder(GetPos())
- .Callable("Nothing")
- .Add(0, ExpandType(GetPos(), *resultingType, ctx))
- .Seal()
- .Build();
- }
- return defaultValue;
- }
- const TTypeAnnotationNode* const RawOutputType;
- };
- struct TQueueParams {
- ui64 DataOutpace = 0;
- ui64 DataLag = 0;
- bool DataQueueNeeded = false;
- ui64 LagQueueSize = 0;
- const TTypeAnnotationNode* LagQueueItemType = nullptr;
- };
- TVector<TChain1MapTraits::TPtr> BuildFoldMapTraits(TQueueParams& queueParams, const TExprNode::TPtr& frames,
- const TMaybe<TString>& partitionRowsColumn, const TStructExprType& rowType, TExprContext& ctx) {
- queueParams = {};
- TVector<TChain1MapTraits::TPtr> result;
- TCalcOverWindowTraits traits = ExtractCalcOverWindowTraits(frames, rowType, ctx);
- if (traits.LagQueueItemType->Cast<TStructExprType>()->GetSize()) {
- YQL_ENSURE(traits.MaxUnboundedPrecedingLag > 0);
- queueParams.LagQueueSize = traits.MaxUnboundedPrecedingLag;
- queueParams.LagQueueItemType = traits.LagQueueItemType;
- }
- ui64 currentRowIndex = 0;
- if (traits.MaxDataOutpace || traits.MaxDataLag) {
- queueParams.DataOutpace = traits.MaxDataOutpace;
- queueParams.DataLag = traits.MaxDataLag;
- currentRowIndex = queueParams.DataLag;
- queueParams.DataQueueNeeded = true;
- }
- for (const auto& item : traits.RawTraits) {
- TStringBuf name = item.first;
- const TRawTrait& trait = item.second;
- if (!trait.InitLambda) {
- YQL_ENSURE(!trait.UpdateLambda);
- YQL_ENSURE(!trait.DefaultValue);
- if (trait.CalculateLambdaLead.Defined()) {
- TMaybe<ui64> queueOffset;
- if (*trait.CalculateLambdaLead) {
- queueOffset = currentRowIndex + *trait.CalculateLambdaLead;
- }
- result.push_back(new TChain1MapTraitsLagLead(name, trait, queueOffset));
- } else if (trait.CalculateLambda->IsCallable("RowNumber")) {
- result.push_back(new TChain1MapTraitsRowNumber(name, trait));
- } else if (trait.CalculateLambda->IsCallable("Rank")) {
- result.push_back(new TChain1MapTraitsRank(name, trait));
- } else if (trait.CalculateLambda->IsCallable("CumeDist")) {
- result.push_back(new TChain1MapTraitsCumeDist(name, trait, *partitionRowsColumn));
- } else if (trait.CalculateLambda->IsCallable("NTile")) {
- result.push_back(new TChain1MapTraitsNTile(name, trait, *partitionRowsColumn));
- } else if (trait.CalculateLambda->IsCallable("PercentRank")) {
- result.push_back(new TChain1MapTraitsPercentRank(name, trait, *partitionRowsColumn));
- } else {
- YQL_ENSURE(trait.CalculateLambda->IsCallable("DenseRank"));
- result.push_back(new TChain1MapTraitsDenseRank(name, trait));
- }
- continue;
- }
- if (trait.FrameSettings.GetFrameType() == EFrameType::FrameByRange) {
- result.push_back(new TChain1MapTraitsCurrentOrLagging(name, trait, {}));
- continue;
- }
- YQL_ENSURE(trait.FrameSettings.GetFrameType() == EFrameType::FrameByRows);
- switch(FrameBoundsType(trait.FrameSettings)) {
- case EFrameBoundsType::CURRENT:
- case EFrameBoundsType::LAGGING: {
- TMaybe<ui64> lagQueueIndex;
- auto end = *trait.FrameSettings.GetLastOffset();
- YQL_ENSURE(end <= 0);
- if (end < 0) {
- YQL_ENSURE(queueParams.LagQueueSize >= ui64(0 - end));
- lagQueueIndex = queueParams.LagQueueSize + end;
- }
- result.push_back(new TChain1MapTraitsCurrentOrLagging(name, trait, lagQueueIndex));
- break;
- }
- case EFrameBoundsType::LEADING: {
- auto end = *trait.FrameSettings.GetLastOffset();
- YQL_ENSURE(end > 0);
- ui64 lastRowIndex = currentRowIndex + ui64(end);
- result.push_back(new TChain1MapTraitsLeading(name, trait, currentRowIndex, lastRowIndex));
- break;
- }
- case EFrameBoundsType::FULL: {
- result.push_back(new TChain1MapTraitsFull(name, trait, currentRowIndex));
- break;
- }
- case EFrameBoundsType::GENERIC: {
- queueParams.DataQueueNeeded = true;
- auto first = trait.FrameSettings.GetFirstOffset();
- auto last = trait.FrameSettings.GetLastOffset();
- YQL_ENSURE(first.Defined());
- ui64 beginIndex = currentRowIndex + *first;
- ui64 endIndex = last.Defined() ? (currentRowIndex + *last + 1) : Max<ui64>();
- result.push_back(new TChain1MapTraitsGeneric(name, trait, beginIndex, endIndex));
- break;
- }
- case EFrameBoundsType::EMPTY: {
- result.push_back(new TChain1MapTraitsEmpty(name, trait));
- break;
- }
- }
- }
- return result;
- }
- TExprNode::TPtr ConvertStructOfTuplesToTupleOfStructs(TPositionHandle pos, const TExprNode::TPtr& input, TExprContext& ctx) {
- return ctx.Builder(pos)
- .List()
- .Callable(0, "StaticMap")
- .Add(0, input)
- .Lambda(1)
- .Param("tuple")
- .Callable("Nth")
- .Arg(0, "tuple")
- .Atom(1, "0")
- .Seal()
- .Seal()
- .Seal()
- .Callable(1, "StaticMap")
- .Add(0, input)
- .Lambda(1)
- .Param("tuple")
- .Callable("Nth")
- .Arg(0, "tuple")
- .Atom(1, "1")
- .Seal()
- .Seal()
- .Seal()
- .Seal()
- .Build();
- }
- TExprNode::TPtr AddInputMembersToOutput(TPositionHandle pos, const TExprNode::TPtr& tupleOfOutputStructAndStateStruct,
- const TExprNode::TPtr& rowArg, TExprContext& ctx)
- {
- return ctx.Builder(pos)
- .List()
- .Callable(0, "FlattenMembers")
- .List(0)
- .Atom(0, "")
- .Callable(1, "Nth")
- .Add(0, tupleOfOutputStructAndStateStruct)
- .Atom(1, "0")
- .Seal()
- .Seal()
- .List(1)
- .Atom(0, "")
- .Add(1, rowArg)
- .Seal()
- .Seal()
- .Callable(1, "Nth")
- .Add(0, tupleOfOutputStructAndStateStruct)
- .Atom(1, "1")
- .Seal()
- .Seal()
- .Build();
- }
- template<typename T>
- TExprNode::TPtr SelectMembers(TPositionHandle pos, const T& members, const TExprNode::TPtr& structNode, TExprContext& ctx) {
- TExprNodeList structItems;
- for (auto& name : members) {
- structItems.push_back(
- ctx.Builder(pos)
- .List()
- .Atom(0, name)
- .Callable(1, "Member")
- .Add(0, structNode)
- .Atom(1, name)
- .Seal()
- .Seal()
- .Build()
- );
- }
- return ctx.NewCallable(pos, "AsStruct", std::move(structItems));
- }
// Replaces the outputs of "lagging" traits with values read through the lag queue.
//
// For each trait, ExtractLaggingOutput(lagQueue, rowArg, ctx) is asked for a lagging
// output node. Traits that return one are "lagging"; all other traits are passed through.
// If no trait is lagging, tupleOfOutputAndState is returned unchanged.
//
// Otherwise tupleOfOutputAndState is treated as (output, state), and the result is a
// new two-element list:
//   #0 - an output struct whose lagging members are the ExtractLaggingOutput nodes and
//        whose remaining members are copied from the original output;
//   #1 - Seq(output, state, leadingOutput, (state, lagQueue')), where lagQueue' is
//        lagQueue with one item popped and the current (not yet lagged) values of the
//        lagging members pushed.
// NOTE(review): Seq is presumably used so that the output and state are evaluated before
// the queue is modified, with its last argument as the value. Confirm Seq semantics.
TExprNode::TPtr HandleLaggingItems(TPositionHandle pos, const TExprNode::TPtr& rowArg,
    const TExprNode::TPtr& tupleOfOutputAndState, const TVector<TChain1MapTraits::TPtr>& traits,
    const TExprNode::TPtr& lagQueue, TExprContext& ctx)
{
    TExprNodeList laggingStructItems;
    TSet<TStringBuf> laggingNames;
    TSet<TStringBuf> otherNames;
    for (auto& trait : traits) {
        auto name = trait->GetName();
        // A non-null result marks the trait as lagging.
        auto laggingOutput = trait->ExtractLaggingOutput(lagQueue, rowArg, ctx);
        if (laggingOutput) {
            laggingNames.insert(name);
            laggingStructItems.push_back(
                ctx.Builder(pos)
                    .List()
                        .Atom(0, name)
                        .Add(1, laggingOutput)
                    .Seal()
                    .Build()
            );
        } else {
            otherNames.insert(trait->GetName());
        }
    }
    if (laggingStructItems.empty()) {
        return tupleOfOutputAndState;
    }
    // Any lagging trait requires a lag queue to have been built by the caller.
    YQL_ENSURE(lagQueue);
    auto output = ctx.NewCallable(pos, "Nth", { tupleOfOutputAndState, ctx.NewAtom(pos, "0")});
    auto state = ctx.NewCallable(pos, "Nth", { tupleOfOutputAndState, ctx.NewAtom(pos, "1")});
    // Current values of the lagging members; these are what gets pushed into the queue.
    auto leadingOutput = SelectMembers(pos, laggingNames, output, ctx);
    auto otherOutput = SelectMembers(pos, otherNames, output, ctx);
    auto laggingOutput = ctx.NewCallable(pos, "AsStruct", std::move(laggingStructItems));
    // New output: the lagging members from the queue plus all other members unchanged.
    output = ctx.Builder(pos)
        .Callable("FlattenMembers")
            .List(0)
                .Atom(0, "")
                .Add(1, laggingOutput)
            .Seal()
            .List(1)
                .Atom(0, "")
                .Add(1, otherOutput)
            .Seal()
        .Seal()
        .Build();
    return ctx.Builder(pos)
        .List()
            .Add(0, output)
            .Callable(1, "Seq")
                .Add(0, output)
                .Add(1, state)
                .Add(2, leadingOutput)
                // New state: (state, queue with the oldest item popped and leadingOutput pushed).
                .List(3)
                    .Add(0, state)
                    .Callable(1, "QueuePush")
                        .Callable(0, "QueuePop")
                            .Add(0, lagQueue)
                        .Seal()
                        .Add(1, leadingOutput)
                    .Seal()
                .Seal()
            .Seal()
        .Seal()
        .Build();
}
- TExprNode::TPtr BuildChain1MapInitLambda(TPositionHandle pos, const TVector<TChain1MapTraits::TPtr>& traits,
- const TExprNode::TPtr& dataQueue, ui64 lagQueueSize, const TTypeAnnotationNode* lagQueueItemType, TExprContext& ctx)
- {
- auto rowArg = ctx.NewArgument(pos, "row");
- TExprNode::TPtr lagQueue;
- if (lagQueueSize) {
- YQL_ENSURE(lagQueueItemType);
- lagQueue = BuildQueue(pos, *lagQueueItemType, lagQueueSize, lagQueueSize, rowArg, ctx);
- }
- TExprNodeList structItems;
- for (auto& trait : traits) {
- structItems.push_back(
- ctx.Builder(pos)
- .List()
- .Atom(0, trait->GetName())
- .Apply(1, trait->BuildInitLambda(dataQueue, ctx))
- .With(0, rowArg)
- .Seal()
- .Seal()
- .Build()
- );
- }
- auto asStruct = ctx.NewCallable(pos, "AsStruct", std::move(structItems));
- auto tupleOfOutputAndState = ConvertStructOfTuplesToTupleOfStructs(pos, asStruct, ctx);
- tupleOfOutputAndState = HandleLaggingItems(pos, rowArg, tupleOfOutputAndState, traits, lagQueue, ctx);
- auto finalBody = AddInputMembersToOutput(pos, tupleOfOutputAndState, rowArg, ctx);
- return ctx.NewLambda(pos, ctx.NewArguments(pos, {rowArg}), std::move(finalBody));
- }
- TExprNode::TPtr BuildChain1MapUpdateLambda(TPositionHandle pos, const TVector<TChain1MapTraits::TPtr>& traits,
- const TExprNode::TPtr& dataQueue, bool haveLagQueue, TExprContext& ctx)
- {
- const auto rowArg = ctx.NewArgument(pos, "row");
- const auto stateArg = ctx.NewArgument(pos, "state");
- auto state = ctx.Builder(pos)
- .Callable("Nth")
- .Add(0, stateArg)
- .Atom(1, "1", TNodeFlags::Default)
- .Seal()
- .Build();
- TExprNode::TPtr lagQueue;
- if (haveLagQueue) {
- lagQueue = ctx.Builder(pos)
- .Callable("Nth")
- .Add(0, state)
- .Atom(1, "1", TNodeFlags::Default)
- .Seal()
- .Build();
- state = ctx.Builder(pos)
- .Callable("Nth")
- .Add(0, std::move(state))
- .Atom(1, "0", TNodeFlags::Default)
- .Seal()
- .Build();
- }
- TExprNodeList structItems;
- for (auto& trait : traits) {
- structItems.push_back(
- ctx.Builder(pos)
- .List()
- .Atom(0, trait->GetName())
- .Apply(1, trait->BuildUpdateLambda(dataQueue, ctx))
- .With(0, rowArg)
- .With(1)
- .Callable("Member")
- .Add(0, state)
- .Atom(1, trait->GetName())
- .Seal()
- .Done()
- .Seal()
- .Seal()
- .Build()
- );
- }
- auto asStruct = ctx.NewCallable(pos, "AsStruct", std::move(structItems));
- auto tupleOfOutputAndState = ConvertStructOfTuplesToTupleOfStructs(pos, asStruct, ctx);
- tupleOfOutputAndState = HandleLaggingItems(pos, rowArg, tupleOfOutputAndState, traits, lagQueue, ctx);
- auto finalBody = AddInputMembersToOutput(pos, tupleOfOutputAndState, rowArg, ctx);
- return ctx.NewLambda(pos, ctx.NewArguments(pos, {rowArg, stateArg}), std::move(finalBody));
- }
- bool IsNonCompactFullFrame(const TExprNode& winOnRows, TExprContext& ctx) {
- TWindowFrameSettings frameSettings = TWindowFrameSettings::Parse(winOnRows, ctx);
- return frameSettings.GetFrameType() == FrameByRows &&
- !frameSettings.IsCompact() && !frameSettings.GetFirstOffset().Defined() && !frameSettings.GetLastOffset().Defined();
- }
- TExprNode::TPtr DeduceCompatibleSort(const TExprNode::TPtr& traitsOne, const TExprNode::TPtr& traitsTwo) {
- YQL_ENSURE(traitsOne->IsCallable({"Void", "SortTraits"}));
- YQL_ENSURE(traitsTwo->IsCallable({"Void", "SortTraits"}));
- if (traitsOne->IsCallable("Void")) {
- return traitsTwo;
- }
- if (traitsTwo->IsCallable("Void")) {
- return traitsOne;
- }
- // TODO: need more advanced logic here
- if (traitsOne == traitsTwo) {
- return traitsOne;
- }
- return {};
- }
// Builds PartitionsByKeys(input, keySelector, sortOrder, sortKey, handler). The handler
// splits each partition into groups with Chopper, runs streamProcessingLambda on every
// group, and post-processes every produced row.
//
// Without session windows (sessionUpdate is null), groups are delimited by keySelector.
// With session windows, AddSessionParamsMemberLambda first adds the SessionStartMemberName
// and SessionParamsMemberName members to every row, and groups are delimited by the pair
// (keySelector(row), row.SessionStartMemberName).
//
// Post-processing sets every column listed in sessionColumns to the row's
// SessionParamsMemberName value, then drops both session service members with
// ForceRemoveMember. NOTE(review): "Force" presumably makes this a no-op when the members
// were never added (the non-session case). Confirm.
TExprNode::TPtr BuildPartitionsByKeys(TPositionHandle pos, const TExprNode::TPtr& input, const TExprNode::TPtr& keySelector,
    const TExprNode::TPtr& sortOrder, const TExprNode::TPtr& sortKey, const TExprNode::TPtr& streamProcessingLambda,
    const TExprNode::TPtr& sessionKey, const TExprNode::TPtr& sessionInit, const TExprNode::TPtr& sessionUpdate,
    const TExprNode::TPtr& sessionColumns, TExprContext& ctx)
{
    TExprNode::TPtr preprocessLambda;
    TExprNode::TPtr chopperKeySelector;
    // Argument and body of the per-row post-processing lambda, built up incrementally below.
    const TExprNode::TPtr addSessionColumnsArg = ctx.NewArgument(pos, "row");
    TExprNode::TPtr addSessionColumnsBody = addSessionColumnsArg;
    if (sessionUpdate) {
        YQL_ENSURE(sessionKey);
        YQL_ENSURE(sessionInit);
        preprocessLambda =
            AddSessionParamsMemberLambda(pos, SessionStartMemberName, SessionParamsMemberName, keySelector, sessionKey, sessionInit, sessionUpdate, ctx);
        // Chopper key: (partition key, session start).
        chopperKeySelector = ctx.Builder(pos)
            .Lambda()
                .Param("item")
                .List()
                    .Apply(0, keySelector)
                        .With(0, "item")
                    .Seal()
                    .Callable(1, "Member")
                        .Arg(0, "item")
                        .Atom(1, SessionStartMemberName)
                    .Seal()
                .Seal()
            .Seal()
            .Build();
    } else {
        YQL_ENSURE(!sessionKey);
        preprocessLambda = MakeIdentityLambda(pos, ctx);
        chopperKeySelector = keySelector;
    }
    // Every session column receives the same SessionParamsMemberName value.
    for (auto& column : sessionColumns->ChildrenList()) {
        addSessionColumnsBody = ctx.Builder(pos)
            .Callable("AddMember")
                .Add(0, addSessionColumnsBody)
                .Add(1, column)
                .Callable(2, "Member")
                    .Add(0, addSessionColumnsArg)
                    .Atom(1, SessionParamsMemberName)
                .Seal()
            .Seal()
            .Build();
    }
    addSessionColumnsBody = ctx.Builder(pos)
        .Callable("ForceRemoveMember")
            .Callable(0, "ForceRemoveMember")
                .Add(0, addSessionColumnsBody)
                .Atom(1, SessionStartMemberName)
            .Seal()
            .Atom(1, SessionParamsMemberName)
        .Seal()
        .Build();
    auto addSessionColumnsLambda = ctx.NewLambda(pos, ctx.NewArguments(pos, { addSessionColumnsArg }), std::move(addSessionColumnsBody));
    // Start a new group whenever the chopper key differs from the previous group's key.
    auto groupSwitchLambda = ctx.Builder(pos)
        .Lambda()
            .Param("prevKey")
            .Param("item")
            .Callable("AggrNotEquals")
                .Arg(0, "prevKey")
                .Apply(1, chopperKeySelector)
                    .With(0, "item")
                .Seal()
            .Seal()
        .Seal()
        .Build();
    return ctx.Builder(pos)
        .Callable("PartitionsByKeys")
            .Add(0, input)
            .Add(1, keySelector)
            .Add(2, sortOrder)
            .Add(3, sortKey)
            .Lambda(4)
                .Param("partitionedStream")
                .Callable("ForwardList")
                    .Callable(0, "Chopper")
                        .Callable(0, "ToStream")
                            .Apply(0, preprocessLambda)
                                .With(0, "partitionedStream")
                            .Seal()
                        .Seal()
                        .Add(1, chopperKeySelector)
                        .Add(2, groupSwitchLambda)
                        .Lambda(3)
                            .Param("key")
                            .Param("singlePartition")
                            .Callable("Map")
                                .Apply(0, streamProcessingLambda)
                                    .With(0, "singlePartition")
                                .Seal()
                                .Add(1, addSessionColumnsLambda)
                            .Seal()
                        .Seal()
                    .Seal()
                .Seal()
            .Seal()
        .Seal()
        .Build();
}
// Selects which lambda BuildFold1Lambda produces for the Condense1/Map aggregation pipeline.
enum EFold1LambdaKind {
    INIT = 0,      // (row) -> state: applies each WindowTraits init lambda
    UPDATE = 1,    // (row, state) -> state: applies each WindowTraits update lambda
    CALCULATE = 2, // (state) -> result: applies each WindowTraits calculate lambda
};
- TExprNode::TPtr BuildFold1Lambda(TPositionHandle pos, const TExprNode::TPtr& frames, EFold1LambdaKind kind,
- const TExprNodeList& keyColumns, const TStructExprType& rowType, TExprContext& ctx)
- {
- TExprNode::TPtr arg1 = ctx.NewArgument(pos, "arg1");
- TExprNodeList args = { arg1 };
- TExprNode::TPtr arg2;
- if (kind == EFold1LambdaKind::UPDATE) {
- arg2 = ctx.NewArgument(pos, "arg2");
- args.push_back(arg2);
- }
- TExprNodeList structItems;
- for (auto& winOn : frames->ChildrenList()) {
- YQL_ENSURE(IsNonCompactFullFrame(*winOn, ctx));
- for (ui32 i = 1; i < winOn->ChildrenSize(); ++i) {
- YQL_ENSURE(winOn->Child(i)->IsList());
- YQL_ENSURE(winOn->Child(i)->Child(0)->IsAtom());
- YQL_ENSURE(winOn->Child(i)->Child(1)->IsCallable("WindowTraits"));
- YQL_ENSURE(2 <= winOn->Child(i)->ChildrenSize() && winOn->Child(i)->ChildrenSize() <= 3);
- auto column = winOn->Child(i)->ChildPtr(0);
- auto traits = winOn->Child(i)->ChildPtr(1);
- auto traitsInputType = traits->Child(0)->GetTypeAnn()->Cast<TTypeExprType>()->GetType();
- TStringBuf distinctKey;
- const TTypeAnnotationNode* distinctKeyOrigType = nullptr;
- if (winOn->Child(i)->ChildrenSize() == 3) {
- auto distinctKeyNode = winOn->Child(i)->Child(2);
- YQL_ENSURE(distinctKeyNode->IsAtom());
- distinctKey = distinctKeyNode->Content();
- distinctKeyOrigType = rowType.FindItemType(distinctKey);
- YQL_ENSURE(distinctKeyOrigType);
- }
- TExprNode::TPtr applied;
- switch (kind) {
- case EFold1LambdaKind::INIT: {
- auto lambda = traits->ChildPtr(1);
- if (distinctKeyOrigType) {
- lambda = ApplyDistinctForInitLambda(lambda, distinctKey, *traitsInputType, *distinctKeyOrigType, ctx);
- } else {
- lambda = ReplaceFirstLambdaArgWithCastStruct(*lambda, *traitsInputType, ctx);
- }
- if (lambda->Child(0)->ChildrenSize() == 2) {
- lambda = ReplaceLastLambdaArgWithUnsignedLiteral(*lambda, i, ctx);
- }
- YQL_ENSURE(lambda->Child(0)->ChildrenSize() == 1);
- applied = ctx.Builder(pos)
- .Apply(lambda)
- .With(0, arg1)
- .Seal()
- .Build();
- break;
- }
- case EFold1LambdaKind::CALCULATE: {
- auto lambda = traits->ChildPtr(4);
- YQL_ENSURE(lambda->Child(0)->ChildrenSize() == 1);
- if (distinctKeyOrigType) {
- lambda = ApplyDistinctForCalculateLambda(lambda, ctx);
- }
- applied = ctx.Builder(pos)
- .Apply(lambda)
- .With(0)
- .Callable("Member")
- .Add(0, arg1)
- .Add(1, column)
- .Seal()
- .Done()
- .Seal()
- .Build();
- break;
- }
- case EFold1LambdaKind::UPDATE: {
- auto lambda = traits->ChildPtr(2);
- if (distinctKeyOrigType) {
- lambda = ApplyDistinctForUpdateLambda(lambda, distinctKey, *traitsInputType, *distinctKeyOrigType, ctx);
- } else {
- lambda = ReplaceFirstLambdaArgWithCastStruct(*lambda, *traitsInputType, ctx);
- }
- if (lambda->Child(0)->ChildrenSize() == 3) {
- lambda = ReplaceLastLambdaArgWithUnsignedLiteral(*lambda, i, ctx);
- }
- YQL_ENSURE(lambda->Child(0)->ChildrenSize() == 2);
-
- applied = ctx.Builder(pos)
- .Apply(lambda)
- .With(0, arg1)
- .With(1)
- .Callable("Member")
- .Add(0, arg2)
- .Add(1, column)
- .Seal()
- .Done()
- .Seal()
- .Build();
- break;
- }
- }
- structItems.push_back(ctx.NewList(pos, {column, applied}));
- }
- }
- // pass key columns as-is
- for (auto& keyColumn : keyColumns) {
- YQL_ENSURE(keyColumn->IsAtom());
- structItems.push_back(
- ctx.Builder(pos)
- .List()
- .Add(0, keyColumn)
- .Callable(1, "Member")
- .Add(0, arg1)
- .Add(1, keyColumn)
- .Seal()
- .Seal()
- .Build()
- );
- }
- return ctx.NewLambda(pos, ctx.NewArguments(pos, std::move(args)), ctx.NewCallable(pos, "AsStruct", std::move(structItems)));
- }
- TExprNode::TPtr ExpandNonCompactFullFrames(TPositionHandle pos, const TExprNode::TPtr& inputList,
- const TExprNode::TPtr& originalKeyColumns, const TExprNode::TPtr& sortTraits, const TExprNode::TPtr& frames,
- const TExprNode::TPtr& sessionTraits, const TExprNode::TPtr& sessionColumns, TExprContext& ctx)
- {
- TExprNode::TPtr sessionKey;
- TExprNode::TPtr sessionInit;
- TExprNode::TPtr sessionUpdate;
- TExprNode::TPtr sessionSortTraits;
- const TTypeAnnotationNode* sessionKeyType = nullptr;
- const TTypeAnnotationNode* sessionParamsType = nullptr;
- ExtractSessionWindowParams(pos, sessionTraits, sessionKey, sessionKeyType, sessionParamsType, sessionSortTraits, sessionInit, sessionUpdate, ctx);
- TExprNode::TPtr sortKey;
- TExprNode::TPtr sortOrder;
- TExprNode::TPtr input = inputList;
- if (input->IsCallable("ForwardList")) {
- // full frame strategy uses input 2 times (for grouping and join)
- // TODO: better way to detect "single use input"
- input = ctx.NewCallable(pos, "Collect", { input });
- }
- const auto rowType = inputList->GetTypeAnn()->Cast<TListExprType>()->GetItemType()->Cast<TStructExprType>();
- TVector<const TItemExprType*> rowItems = rowType->GetItems();
- TExprNodeList originalKeysWithSession = originalKeyColumns->ChildrenList();
- TExprNodeList addedColumns;
- const auto commonSortTraits = DeduceCompatibleSort(sortTraits, sessionSortTraits);
- ExtractSortKeyAndOrder(pos, commonSortTraits ? commonSortTraits : sortTraits, sortKey, sortOrder, ctx);
- if (!commonSortTraits) {
- YQL_ENSURE(sessionKey);
- YQL_ENSURE(sessionInit);
- YQL_ENSURE(sessionUpdate);
- TExprNode::TPtr sessionSortKey;
- TExprNode::TPtr sessionSortOrder;
- ExtractSortKeyAndOrder(pos, sessionSortTraits, sessionSortKey, sessionSortOrder, ctx);
- const auto keySelector = BuildKeySelector(pos, *rowType, originalKeyColumns, ctx);
- input = ctx.Builder(pos)
- .Callable("PartitionsByKeys")
- .Add(0, input)
- .Add(1, keySelector)
- .Add(2, sessionSortOrder)
- .Add(3, sessionSortKey)
- .Lambda(4)
- .Param("partitionedStream")
- .Apply(AddSessionParamsMemberLambda(pos, SessionStartMemberName, SessionParamsMemberName, keySelector, sessionKey, sessionInit, sessionUpdate, ctx))
- .With(0, "partitionedStream")
- .Seal()
- .Seal()
- .Seal()
- .Build();
- rowItems.push_back(ctx.MakeType<TItemExprType>(SessionParamsMemberName, sessionParamsType));
- addedColumns.push_back(ctx.NewAtom(pos, SessionParamsMemberName));
- originalKeysWithSession.push_back(ctx.NewAtom(pos, SessionStartMemberName));
- addedColumns.push_back(originalKeysWithSession.back());
- rowItems.push_back(ctx.MakeType<TItemExprType>(SessionStartMemberName, sessionKeyType));
- sessionKey = sessionInit = sessionUpdate = {};
- }
- TExprNodeList keyColumns;
- auto rowArg = ctx.NewArgument(pos, "row");
- auto addMembersBody = rowArg;
- static const TStringBuf keyColumnNamePrefix = "_yql_CalcOverWindowJoinKey";
- const TStructExprType* rowTypeWithSession = ctx.MakeType<TStructExprType>(rowItems);
- for (auto& keyColumn : originalKeysWithSession) {
- YQL_ENSURE(keyColumn->IsAtom());
- auto columnName = keyColumn->Content();
- const TTypeAnnotationNode* columnType =
- rowTypeWithSession->GetItems()[*rowTypeWithSession->FindItem(columnName)]->GetItemType();
- if (columnType->HasOptionalOrNull()) {
- addedColumns.push_back(ctx.NewAtom(pos, TStringBuilder() << keyColumnNamePrefix << addedColumns.size()));
- keyColumns.push_back(addedColumns.back());
- TStringBuf newName = addedColumns.back()->Content();
- const TTypeAnnotationNode* newType = ctx.MakeType<TDataExprType>(EDataSlot::String);
- rowItems.push_back(ctx.MakeType<TItemExprType>(newName, newType));
- addMembersBody = ctx.Builder(pos)
- .Callable("AddMember")
- .Add(0, addMembersBody)
- .Atom(1, newName)
- .Callable(2, "StablePickle")
- .Callable(0, "Member")
- .Add(0, rowArg)
- .Add(1, keyColumn)
- .Seal()
- .Seal()
- .Seal()
- .Build();
- } else {
- keyColumns.push_back(keyColumn);
- }
- }
- input = ctx.Builder(pos)
- .Callable("Map")
- .Add(0, input)
- .Add(1, ctx.NewLambda(pos, ctx.NewArguments(pos, { rowArg }), std::move(addMembersBody)))
- .Seal()
- .Build();
- auto keySelector = BuildKeySelector(pos, *ctx.MakeType<TStructExprType>(rowItems),
- ctx.NewList(pos, TExprNodeList{keyColumns}), ctx);
- TExprNode::TPtr preprocessLambda;
- TExprNode::TPtr groupKeySelector;
- TExprNode::TPtr condenseSwitch;
- if (sessionUpdate) {
- YQL_ENSURE(sessionKey);
- YQL_ENSURE(sessionInit);
- YQL_ENSURE(sessionKeyType);
- YQL_ENSURE(commonSortTraits);
- preprocessLambda =
- AddSessionParamsMemberLambda(pos, SessionStartMemberName, SessionParamsMemberName, keySelector, sessionKey, sessionInit, sessionUpdate, ctx);
- rowItems.push_back(ctx.MakeType<TItemExprType>(SessionStartMemberName, sessionKeyType));
- rowItems.push_back(ctx.MakeType<TItemExprType>(SessionParamsMemberName, sessionParamsType));
- addedColumns.push_back(ctx.NewAtom(pos, SessionStartMemberName));
- addedColumns.push_back(ctx.NewAtom(pos, SessionParamsMemberName));
- if (sessionKeyType->HasOptionalOrNull()) {
- addedColumns.push_back(ctx.NewAtom(pos, TStringBuilder() << keyColumnNamePrefix << addedColumns.size()));
- preprocessLambda = ctx.Builder(pos)
- .Lambda()
- .Param("stream")
- .Callable("OrderedMap")
- .Apply(0, preprocessLambda)
- .With(0, "stream")
- .Seal()
- .Lambda(1)
- .Param("item")
- .Callable("AddMember")
- .Arg(0, "item")
- .Add(1, addedColumns.back())
- .Callable(2, "StablePickle")
- .Callable(0, "Member")
- .Arg(0, "item")
- .Atom(1, SessionStartMemberName)
- .Seal()
- .Seal()
- .Seal()
- .Seal()
- .Seal()
- .Seal()
- .Build();
- TStringBuf newName = addedColumns.back()->Content();
- const TTypeAnnotationNode* newType = ctx.MakeType<TDataExprType>(EDataSlot::String);
- rowItems.push_back(ctx.MakeType<TItemExprType>(newName, newType));
- }
- keyColumns.push_back(addedColumns.back());
- auto groupKeySelector = BuildKeySelector(pos, *ctx.MakeType<TStructExprType>(rowItems),
- ctx.NewList(pos, TExprNodeList{keyColumns}), ctx);
- condenseSwitch = ctx.Builder(pos)
- .Lambda()
- .Param("row")
- .Param("state")
- .Callable("AggrNotEquals")
- .Apply(0, groupKeySelector)
- .With(0, "row")
- .Seal()
- .Apply(1, groupKeySelector)
- .With(0, "state")
- .Seal()
- .Seal()
- .Seal()
- .Build();
- } else {
- YQL_ENSURE(!sessionKey);
- preprocessLambda = MakeIdentityLambda(pos, ctx);
- auto groupKeySelector = keySelector;
- condenseSwitch = ctx.Builder(pos)
- .Lambda()
- .Param("row")
- .Param("state")
- .Callable("IsKeySwitch")
- .Arg(0, "row")
- .Arg(1, "state")
- .Add(2, groupKeySelector)
- .Add(3, groupKeySelector)
- .Seal()
- .Seal()
- .Build();
- }
- auto partitionByKeysLambda = ctx.Builder(pos)
- .Lambda()
- .Param("stream")
- .Callable("Map")
- .Callable(0, "Condense1")
- .Apply(0, preprocessLambda)
- .With(0, "stream")
- .Seal()
- .Add(1, BuildFold1Lambda(pos, frames, EFold1LambdaKind::INIT, keyColumns, *rowType, ctx))
- .Add(2, condenseSwitch)
- .Add(3, BuildFold1Lambda(pos, frames, EFold1LambdaKind::UPDATE, keyColumns, *rowType, ctx))
- .Seal()
- .Add(1, BuildFold1Lambda(pos, frames, EFold1LambdaKind::CALCULATE, keyColumns, *rowType, ctx))
- .Seal()
- .Seal()
- .Build();
- if (HasContextFuncs(*partitionByKeysLambda)) {
- partitionByKeysLambda = ctx.Builder(pos)
- .Lambda()
- .Param("stream")
- .Callable("WithContext")
- .Apply(0, partitionByKeysLambda)
- .With(0, "stream")
- .Seal()
- .Atom(1, "WinAgg", TNodeFlags::Default)
- .Seal()
- .Seal()
- .Build();
- }
- auto aggregated = ctx.Builder(pos)
- .Callable("PartitionsByKeys")
- .Add(0, input)
- .Add(1, keySelector)
- .Add(2, sortOrder)
- .Add(3, sortKey)
- .Add(4, partitionByKeysLambda)
- .Seal().Build();
- if (sessionUpdate) {
- // preprocess input without aggregation
- input = ctx.Builder(pos)
- .Callable("PartitionsByKeys")
- .Add(0, input)
- .Add(1, ctx.DeepCopyLambda(*keySelector))
- .Add(2, sortOrder)
- .Add(3, ctx.DeepCopyLambda(*sortKey))
- .Lambda(4)
- .Param("stream")
- .Apply(preprocessLambda)
- .With(0, "stream")
- .Seal()
- .Seal()
- .Seal()
- .Build();
- }
- TExprNode::TPtr joined;
- if (!keyColumns.empty()) {
- // SELECT * FROM input AS a JOIN aggregated AS b USING(keyColumns)
- auto buildJoinKeysTuple = [&](TStringBuf side) {
- TExprNodeList items;
- for (const auto& keyColumn : keyColumns) {
- items.push_back(ctx.NewAtom(pos, side));
- items.push_back(keyColumn);
- }
- return ctx.NewList(pos, std::move(items));
- };
- joined = ctx.Builder(pos)
- .Callable("EquiJoin")
- .List(0)
- .Add(0, input)
- .Atom(1, "a", TNodeFlags::Default)
- .Seal()
- .List(1)
- .Add(0, aggregated)
- .Atom(1, "b", TNodeFlags::Default)
- .Seal()
- .List(2)
- .Atom(0, "Inner", TNodeFlags::Default)
- .Atom(1, "a", TNodeFlags::Default)
- .Atom(2, "b", TNodeFlags::Default)
- .Add(3, buildJoinKeysTuple("a"))
- .Add(4, buildJoinKeysTuple("b"))
- .List(5)
- .List(0)
- .Atom(0, "right", TNodeFlags::Default)
- .Atom(1, "any", TNodeFlags::Default)
- .Seal()
- .Seal()
- .Seal()
- .List(3).Seal()
- .Seal()
- .Build();
- // remove b.keys*
- auto rowArg = ctx.NewArgument(pos, "row");
- TExprNode::TPtr removed = rowArg;
- auto removeSide = [&](const TString& side, const TExprNodeList& keys) {
- for (const auto& keyColumn : keys) {
- YQL_ENSURE(keyColumn->IsAtom());
- TString toRemove = side + keyColumn->Content();
- removed = ctx.Builder(pos)
- .Callable("RemoveMember")
- .Add(0, removed)
- .Atom(1, toRemove)
- .Seal()
- .Build();
- }
- };
- removeSide("b.", keyColumns);
- // add session columns
- for (auto column : sessionColumns->ChildrenList()) {
- removed = ctx.Builder(pos)
- .Callable("AddMember")
- .Add(0, removed)
- .Add(1, column)
- .Callable(2, "Member")
- .Add(0, rowArg)
- .Atom(1, TString("a.") + SessionParamsMemberName)
- .Seal()
- .Seal()
- .Build();
- }
- removeSide("a.", addedColumns);
- joined = ctx.Builder(pos)
- .Callable("Map")
- .Add(0, joined)
- .Add(1, ctx.NewLambda(pos, ctx.NewArguments(pos, {rowArg}), std::move(removed)))
- .Seal()
- .Build();
- } else {
- // SELECT * FROM input AS a CROSS JOIN aggregated AS b
- joined = ctx.Builder(pos)
- .Callable("EquiJoin")
- .List(0)
- .Add(0, input)
- .Atom(1, "a", TNodeFlags::Default)
- .Seal()
- .List(1)
- .Add(0, aggregated)
- .Atom(1, "b", TNodeFlags::Default)
- .Seal()
- .List(2)
- .Atom(0, "Cross", TNodeFlags::Default)
- .Atom(1, "a", TNodeFlags::Default)
- .Atom(2, "b", TNodeFlags::Default)
- .List(3).Seal()
- .List(4).Seal()
- .List(5).Seal()
- .Seal()
- .List(3).Seal()
- .Seal()
- .Build();
- }
- return ctx.Builder(pos)
- .Callable("Map")
- .Add(0, joined)
- .Lambda(1)
- .Param("row")
- .Callable("DivePrefixMembers")
- .Arg(0, "row")
- .List(1)
- .Atom(0, "a.")
- .Atom(1, "b.")
- .Seal()
- .Seal()
- .Seal()
- .Seal()
- .Build();
- }
- TExprNode::TPtr TryExpandNonCompactFullFrames(TPositionHandle pos, const TExprNode::TPtr& inputList, const TExprNode::TPtr& keyColumns,
- const TExprNode::TPtr& sortTraits, const TExprNode::TPtr& frames, const TExprNode::TPtr& sessionTraits,
- const TExprNode::TPtr& sessionColumns, TExprContext& ctx)
- {
- TExprNodeList nonCompactAggregatingFullFrames;
- TExprNodeList otherFrames;
- for (auto& winOn : frames->ChildrenList()) {
- if (!IsNonCompactFullFrame(*winOn, ctx)) {
- otherFrames.push_back(winOn);
- continue;
- }
- YQL_ENSURE(TCoWinOnBase::Match(winOn.Get()));
- TExprNodeList nonAggregates = { winOn->ChildPtr(0) };
- TExprNodeList aggregates = { winOn->ChildPtr(0) };
- for (ui32 i = 1; i < winOn->ChildrenSize(); ++i) {
- auto item = winOn->Child(i)->Child(1);
- if (item->IsCallable("WindowTraits")) {
- aggregates.push_back(winOn->ChildPtr(i));
- } else {
- nonAggregates.push_back(winOn->ChildPtr(i));
- }
- }
- if (aggregates.size() == 1) {
- otherFrames.push_back(winOn);
- continue;
- }
- nonCompactAggregatingFullFrames.push_back(ctx.ChangeChildren(*winOn, std::move(aggregates)));
- if (nonAggregates.size() > 1) {
- otherFrames.push_back(ctx.ChangeChildren(*winOn, std::move(nonAggregates)));
- }
- }
- if (nonCompactAggregatingFullFrames.empty()) {
- return {};
- }
- auto fullFrames = ctx.NewList(pos, std::move(nonCompactAggregatingFullFrames));
- auto nonFullFrames = ctx.NewList(pos, std::move(otherFrames));
- auto expanded = ExpandNonCompactFullFrames(pos, inputList, keyColumns, sortTraits, fullFrames, sessionTraits, sessionColumns, ctx);
- if (sessionTraits && !sessionTraits->IsCallable("Void")) {
- return Build<TCoCalcOverSessionWindow>(ctx, pos)
- .Input(expanded)
- .Keys(keyColumns)
- .SortSpec(sortTraits)
- .Frames(nonFullFrames)
- .SessionSpec(sessionTraits)
- .SessionColumns(sessionColumns)
- .Done().Ptr();
- }
- YQL_ENSURE(sessionColumns->ChildrenSize() == 0);
- return Build<TCoCalcOverWindow>(ctx, pos)
- .Input(expanded)
- .Keys(keyColumns)
- .SortSpec(sortTraits)
- .Frames(nonFullFrames)
- .Done().Ptr();
- }
- void SplitFramesByType(const TExprNode::TPtr& frames, TExprNode::TPtr& rowFrames, TExprNode::TPtr& rangeFrames, TExprNode::TPtr& groupFrames, TExprContext& ctx) {
- TExprNodeList rows;
- TExprNodeList range;
- TExprNodeList groups;
- for (auto& winOn : frames->ChildrenList()) {
- if (TCoWinOnRows::Match(winOn.Get())) {
- rows.push_back(std::move(winOn));
- } else if (TCoWinOnRange::Match(winOn.Get())) {
- range.push_back(std::move(winOn));
- } else {
- YQL_ENSURE(TCoWinOnGroups::Match(winOn.Get()));
- groups.push_back(std::move(winOn));
- }
- }
- rowFrames = ctx.NewList(frames->Pos(), std::move(rows));
- rangeFrames = ctx.NewList(frames->Pos(), std::move(range));
- groupFrames = ctx.NewList(frames->Pos(), std::move(groups));
- }
- const TStructExprType* ApplyFramesToType(const TStructExprType& inputType, const TStructExprType& finalOutputType, const TExprNode& frames, TExprContext& ctx) {
- TVector<const TItemExprType*> resultItems = inputType.GetItems();
- for (auto& frame : frames.ChildrenList()) {
- YQL_ENSURE(TCoWinOnBase::Match(frame.Get()));
- for (size_t i = 1; i < frame->ChildrenSize(); ++i) {
- YQL_ENSURE(frame->Child(i)->IsList());
- YQL_ENSURE(frame->Child(i)->Head().IsAtom());
- TStringBuf column = frame->Child(i)->Head().Content();
- const TTypeAnnotationNode* type = finalOutputType.FindItemType(column);
- YQL_ENSURE(type);
- resultItems.push_back(ctx.MakeType<TItemExprType>(column, type));
- }
- }
- return ctx.MakeType<TStructExprType>(resultItems);
- }
- bool NeedPartitionRows(const TExprNode::TPtr& frames, const TStructExprType& rowType, TExprContext& ctx) {
- if (frames->ChildrenSize() == 0) {
- return false;
- }
- TCalcOverWindowTraits traits = ExtractCalcOverWindowTraits(frames, rowType, ctx);
- for (const auto& item : traits.RawTraits) {
- const TRawTrait& trait = item.second;
- if (trait.CalculateLambda->IsCallable({"CumeDist","NTile","PercentRank"})) {
- return true;
- }
- }
-
- return false;
- }
- TString AllocatePartitionRowsColumn(const TStructExprType& rowType) {
- ui64 index = 0;
- for (;;) {
- auto name = "_yql_partition_rows_" + ToString(index);
- if (!rowType.FindItemType(name)) {
- return name;
- }
- ++index;
- }
- }
// Wraps `input` into CalcOverWindow(input, keyColumns, Void sort spec, [WinOnRows]) that adds
// column `columnName`. The column is computed by the `count_traits_factory` window traits
// exported from /lib/yql/window.yql, over a WinOnRows frame whose begin and end are both Void.
// NOTE(review): this presumably yields the number of rows in each partition (Void bounds
// meaning an unbounded frame). Confirm against window.yql.
//
// Fails with YQL_ENSURE if the module or the export is missing, or if ExpandApply reports an error.
TExprNode::TPtr AddPartitionRowsColumn(TPositionHandle pos, const TExprNode::TPtr& input, const TExprNode::TPtr& keyColumns,
    const TString& columnName, TExprContext& ctx, TTypeAnnotationContext& types) {
    auto exportsPtr = types.Modules->GetModule("/lib/yql/window.yql");
    YQL_ENSURE(exportsPtr);
    const auto& exports = exportsPtr->Symbols();
    const auto ex = exports.find("count_traits_factory");
    YQL_ENSURE(exports.cend() != ex);
    // Copy the exported factory lambda from the module's expression context into ctx.
    TNodeOnNodeOwnedMap deepClones;
    auto lambda = ctx.DeepCopy(*ex->second, exportsPtr->ExprCtx(), deepClones, true, false);

    // Instantiate the factory: arg #0 <- TypeOf(input), arg #1 <- an extractor returning Void.
    auto listTypeNode = ctx.NewCallable(pos, "TypeOf", {input});
    auto extractor = ctx.Builder(pos)
        .Lambda()
            .Param("row")
            .Callable("Void")
            .Seal()
        .Seal()
        .Build();

    auto traits = ctx.ReplaceNodes(lambda->TailPtr(), {
        {lambda->Head().Child(0), listTypeNode},
        {lambda->Head().Child(1), extractor}
    });

    // Presumably re-enables the ExpandApplyForLambdas step so ExpandApply can inline the
    // Apply calls remaining in the instantiated traits. NOTE(review): confirm.
    ctx.Step.Repeat(TExprStep::ExpandApplyForLambdas);
    auto status = ExpandApply(traits, traits, ctx);
    YQL_ENSURE(status != IGraphTransformer::TStatus::Error);

    return ctx.Builder(pos)
        .Callable("CalcOverWindow")
            .Add(0, input)
            .Add(1, keyColumns)
            .Callable(2, "Void")
            .Seal()
            .List(3)
                .Callable(0, "WinOnRows")
                    .List(0)
                        .List(0)
                            .Atom(0, "begin")
                            .Callable(1, "Void")
                            .Seal()
                        .Seal()
                        .List(1)
                            .Atom(0, "end")
                            .Callable(1, "Void")
                            .Seal()
                        .Seal()
                    .Seal()
                    .List(1)
                        .Atom(0, columnName)
                        .Add(1, traits)
                    .Seal()
                .Seal()
            .Seal()
        .Seal()
        .Build();
}
- TExprNode::TPtr RemovePartitionRowsColumn(TPositionHandle pos, const TExprNode::TPtr& input, const TString& columnName, TExprContext& ctx) {
- return ctx.Builder(pos)
- .Callable("Map")
- .Add(0, input)
- .Lambda(1)
- .Param("row")
- .Callable("RemoveMember")
- .Arg(0, "row")
- .Atom(1, columnName)
- .Seal()
- .Seal()
- .Seal()
- .Build();
- }
- TExprNode::TPtr ProcessRowsFrames(TPositionHandle pos, const TExprNode::TPtr& input, const TStructExprType& rowType, const TExprNode::TPtr& dependsOn,
- const TExprNode::TPtr& frames, const TMaybe<TString>& partitionRowsColumn, TExprContext& ctx)
- {
- if (frames->ChildrenSize() == 0) {
- return input;
- }
- TExprNode::TPtr processed = input;
- TExprNode::TPtr dataQueue;
- TQueueParams queueParams;
- TVector<TChain1MapTraits::TPtr> traits = BuildFoldMapTraits(queueParams, frames, partitionRowsColumn, rowType, ctx);
- if (queueParams.DataQueueNeeded) {
- ui64 queueSize = (queueParams.DataOutpace == Max<ui64>()) ? Max<ui64>() : (queueParams.DataOutpace + queueParams.DataLag + 2);
- dataQueue = BuildQueue(pos, rowType, queueSize, queueParams.DataLag, dependsOn, ctx);
- processed = ctx.Builder(pos)
- .Callable("PreserveStream")
- .Add(0, processed)
- .Add(1, dataQueue)
- .Add(2, BuildUint64(pos, queueParams.DataOutpace, ctx))
- .Seal()
- .Build();
- }
- processed = ctx.Builder(pos)
- .Callable("OrderedMap")
- .Callable(0, "Chain1Map")
- .Add(0, std::move(processed))
- .Add(1, BuildChain1MapInitLambda(pos, traits, dataQueue, queueParams.LagQueueSize, queueParams.LagQueueItemType, ctx))
- .Add(2, BuildChain1MapUpdateLambda(pos, traits, dataQueue, queueParams.LagQueueSize != 0, ctx))
- .Seal()
- .Lambda(1)
- .Param("pair")
- .Callable("Nth")
- .Arg(0, "pair")
- .Atom(1, "0", TNodeFlags::Default)
- .Seal()
- .Seal()
- .Seal()
- .Build();
- return WrapWithWinContext(processed, ctx);
- }
// Expands the RANGE frames of a single CalcOverWindow into a stream transformation.
//
// Outline of what the code below builds:
//   1. Runs all RANGE aggregations row by row with Chain1Map (the same scheme that is used for
//      WinOnRows), keeps only item 0 of every Chain1Map output pair, and then applies WrapWithWinContext.
//   2. Uses Condense1 to cut the stream into runs of consecutive rows with an equal sort key
//      (peer groups). A run starts as Variant "singleRow" and turns into Variant "group"
//      (a list of rows, extended with Insert) once a second row with the same key arrives.
//   3. Flattens the runs back into rows. Inside a "group", every aggregated column (one per trait
//      name) of every row is overwritten with the value taken from the last row of that group,
//      so all peers observe the aggregate computed up to the end of their peer group.
//
// pos                 - position used for all generated nodes.
// input               - stream expression to process; returned unchanged when `frames` is empty.
// rowType             - row type handed to BuildFoldMapTraits.
// sortKey             - sort key selector lambda, or a Void callable when there is no sort key.
// frames              - WinOnRange frames to expand.
// partitionRowsColumn - optional helper column name, forwarded to BuildFoldMapTraits.
// Returns the expression of the processed stream.
- TExprNode::TPtr ProcessRangeFrames(TPositionHandle pos, const TExprNode::TPtr& input, const TStructExprType& rowType, const TExprNode::TPtr& sortKey, const TExprNode::TPtr& frames,
- const TMaybe<TString>& partitionRowsColumn, TExprContext& ctx) {
- if (frames->ChildrenSize() == 0) {
- return input;
- }
- TExprNode::TPtr processed = input;
- TQueueParams queueParams;
- TVector<TChain1MapTraits::TPtr> traits = BuildFoldMapTraits(queueParams, frames, partitionRowsColumn, rowType, ctx);
// RANGE traits are expected to need neither a data queue nor a lag queue.
- YQL_ENSURE(!queueParams.DataQueueNeeded);
- YQL_ENSURE(queueParams.LagQueueSize == 0);
- YQL_ENSURE(queueParams.LagQueueItemType == nullptr);
- // same processing as in WinOnRows
- processed = ctx.Builder(pos)
- .Callable("OrderedMap")
- .Callable(0, "Chain1Map")
- .Add(0, std::move(processed))
- .Add(1, BuildChain1MapInitLambda(pos, traits, nullptr, 0, nullptr, ctx))
- .Add(2, BuildChain1MapUpdateLambda(pos, traits, nullptr, false, ctx))
- .Seal()
// Keep only item 0 of every (value, state) pair produced by Chain1Map.
- .Lambda(1)
- .Param("pair")
- .Callable("Nth")
- .Arg(0, "pair")
- .Atom(1, "0", TNodeFlags::Default)
- .Seal()
- .Seal()
- .Seal()
- .Build();
- processed = WrapWithWinContext(processed, ctx);
// Without a sort key, substitute a lambda that returns Void so the grouping below always has
// a key to compare. NOTE(review): presumably every row then compares equal, so the whole
// partition becomes a single peer group; confirm the AggrNotEquals semantics for Void.
- TExprNode::TPtr sortKeyLambda = sortKey;
- if (sortKey->IsCallable("Void")) {
- sortKeyLambda = ctx.Builder(sortKey->Pos())
- .Lambda()
- .Param("row")
- .Callable("Void")
- .Seal()
- .Seal()
- .Build();
- }
// Item type of the processed stream; used to declare Variant<singleRow: row, group: List<row>>.
- auto processedItemType = ctx.Builder(pos)
- .Callable("StreamItemType")
- .Callable(0, "TypeOf")
- .Add(0, processed)
- .Seal()
- .Seal()
- .Build();
- auto variantType = ctx.Builder(pos)
- .Callable("VariantType")
- .Callable(0, "StructType")
- .List(0)
- .Atom(0, "singleRow", TNodeFlags::Default)
- .Add(1, processedItemType)
- .Seal()
- .List(1)
- .Atom(0, "group", TNodeFlags::Default)
- .Callable(1, "ListType")
- .Add(0, processedItemType)
- .Seal()
- .Seal()
- .Seal()
- .Seal()
- .Build();
- // split rows into peer groups with equal sortKey; the Condense1 state is (sortKey, Variant<singleRow, group>)
- processed = ctx.Builder(pos)
- .Callable("Condense1")
- .Add(0, processed)
// Init: the first row of a run starts as "singleRow".
- .Lambda(1)
- .Param("row")
- .List()
- .Apply(0, sortKeyLambda)
- .With(0, "row")
- .Seal()
- .Callable(1, "Variant")
- .Arg(0, "row")
- .Atom(1, "singleRow", TNodeFlags::Default)
- .Add(2, variantType)
- .Seal()
- .Seal()
- .Seal()
// Switch: start a new run when the row's sort key differs from the key stored in the state.
- .Lambda(2)
- .Param("row")
- .Param("state")
- .Callable(0, "AggrNotEquals")
- .Apply(0, sortKeyLambda)
- .With(0, "row")
- .Seal()
- .Callable(1, "Nth")
- .Arg(0, "state")
- .Atom(1, "0", TNodeFlags::Default)
- .Seal()
- .Seal()
- .Seal()
// Update: keep the key; singleRow -> group of two rows, group -> group with the row inserted.
- .Lambda(3)
- .Param("row")
- .Param("state")
- .List()
- .Callable(0, "Nth")
- .Arg(0, "state")
- .Atom(1, "0", TNodeFlags::Default)
- .Seal()
- .Callable(1, "Visit")
- .Callable(0, "Nth")
- .Arg(0, "state")
- .Atom(1, "1", TNodeFlags::Default)
- .Seal()
- .Atom(1, "singleRow", TNodeFlags::Default)
- .Lambda(2)
- .Param("singleRow")
- .Callable(0, "Variant")
- .Callable(0, "AsList")
- .Arg(0, "singleRow")
- .Arg(1, "row")
- .Seal()
- .Atom(1, "group", TNodeFlags::Default)
- .Add(2, variantType)
- .Seal()
- .Seal()
- .Atom(3, "group", TNodeFlags::Default)
- .Lambda(4)
- .Param("group")
- .Callable(0, "Variant")
- .Callable(0, "Insert")
- .Arg(0, "group")
- .Arg(1, "row")
- .Seal()
- .Atom(1, "group", TNodeFlags::Default)
- .Add(2, variantType)
- .Seal()
- .Seal()
- .Seal()
- .Seal()
- .Seal()
- .Seal()
- .Build();
// Drop the sort key from every condensed state and keep only the Variant.
- processed = ctx.Builder(pos)
- .Callable("OrderedMap")
- .Add(0, processed)
- .Lambda(1)
- .Param("item")
- .Callable(0, "Nth")
- .Arg(0, "item")
- .Atom(1, "1", TNodeFlags::Default)
- .Seal()
- .Seal()
- .Seal()
- .Build();
// Lambda (currentRow, lastRow) -> currentRow with every trait column replaced by lastRow's value.
- auto lastRowArg = ctx.NewArgument(pos, "lastRow");
- auto currentRowArg = ctx.NewArgument(pos, "currentRow");
- auto currentRow = currentRowArg;
- for (auto& trait : traits) {
- TStringBuf name = trait->GetName();
- currentRow = ctx.Builder(pos)
- .Callable("AddMember")
- .Callable(0, "RemoveMember")
- .Add(0, currentRow)
- .Atom(1, name)
- .Seal()
- .Atom(1, name)
- .Callable(2, "Member")
- .Add(0, lastRowArg)
- .Atom(1, name)
- .Seal()
- .Seal()
- .Build();
- }
- auto overwriteWithLastRowLambda = ctx.NewLambda(pos, ctx.NewArguments(pos, { currentRowArg, lastRowArg }), std::move(currentRow));
- // processed is currently a stream of peer groups (= Variant<row, List<row>>) with equal sort keys
- processed = ctx.Builder(pos)
- .Callable("OrderedFlatMap")
- .Add(0, processed)
- .Lambda(1)
- .Param("item")
- .Callable("Visit")
- .Arg(0, "item")
- .Atom(1, "singleRow", TNodeFlags::Default)
- .Lambda(2)
- .Param("singleRow")
- .Callable(0, "AsList")
- .Arg(0, "singleRow")
- .Seal()
- .Seal()
- .Atom(3, "group", TNodeFlags::Default)
- .Lambda(4)
- .Param("group")
// Last(group) is optional; EmptyList is the fallback when it yields nothing.
- .Callable("Coalesce")
- .Callable(0, "Map")
- .Callable(0, "Last")
- .Arg(0, "group")
- .Seal()
- .Lambda(1)
- .Param("lastRow")
- .Callable("OrderedMap")
- .Arg(0, "group")
- .Lambda(1)
- .Param("currentRow")
- .Apply(overwriteWithLastRowLambda)
- .With(0, "currentRow")
- .With(1, "lastRow")
- .Seal()
- .Seal()
- .Seal()
- .Seal()
- .Seal()
- .Callable(1, "EmptyList")
- .Seal()
- .Seal()
- .Seal()
- .Seal()
- .Seal()
- .Seal()
- .Build();
- return processed;
- }
// Expands one CalcOverWindow tuple (keys, sort spec, frames, session spec, session columns)
// applied to `inputList`.
//
// The non-compact expansion (TryExpandNonCompactFullFrames) is tried first. Otherwise the
// "compact" expansion is built: a PartitionsByKeys whose per-partition lambda applies RANGE
// frames first and ROWS frames second. GROUPS frames are not supported here (see the YQL_ENSURE).
//
// With a session window, rows get two extra members, SessionStartMemberName and
// SessionParamsMemberName. If the window sort and the session sort cannot be merged
// (DeduceCompatibleSort returns null), sessions are computed in a separate PartitionsByKeys pass,
// and the session start is then appended to the partition keys of the main pass.
//
// outputRowType - final row type; used to allocate a free name for the partition-rows helper
//                 column and to derive the row type after RANGE frames are applied.
// types         - forwarded to AddPartitionRowsColumn.
- TExprNode::TPtr ExpandSingleCalcOverWindow(TPositionHandle pos, const TExprNode::TPtr& inputList, const TExprNode::TPtr& keyColumns,
- const TExprNode::TPtr& sortTraits, const TExprNode::TPtr& frames, const TExprNode::TPtr& sessionTraits,
- const TExprNode::TPtr& sessionColumns, const TStructExprType& outputRowType, TExprContext& ctx, TTypeAnnotationContext& types)
- {
- if (auto expanded = TryExpandNonCompactFullFrames(pos, inputList, keyColumns, sortTraits, frames, sessionTraits, sessionColumns, ctx)) {
- YQL_CLOG(INFO, Core) << "Expanded non-compact CalcOverWindow";
- return expanded;
- }
- TExprNode::TPtr sessionKey;
- TExprNode::TPtr sessionSortTraits;
- const TTypeAnnotationNode* sessionKeyType = nullptr;
- const TTypeAnnotationNode* sessionParamsType = nullptr;
- TExprNode::TPtr sessionInit;
- TExprNode::TPtr sessionUpdate;
- ExtractSessionWindowParams(pos, sessionTraits, sessionKey, sessionKeyType, sessionParamsType, sessionSortTraits, sessionInit, sessionUpdate, ctx);
- const auto originalRowType = inputList->GetTypeAnn()->Cast<TListExprType>()->GetItemType()->Cast<TStructExprType>();
// Working row type: the input row plus the session members when a session window is present.
- TVector<const TItemExprType*> rowItems = originalRowType->GetItems();
- if (sessionKeyType) {
- YQL_ENSURE(sessionParamsType);
- rowItems.push_back(ctx.MakeType<TItemExprType>(SessionStartMemberName, sessionKeyType));
- rowItems.push_back(ctx.MakeType<TItemExprType>(SessionParamsMemberName, sessionParamsType));
- }
- auto rowType = ctx.MakeType<TStructExprType>(rowItems);
- auto keySelector = BuildKeySelector(pos, *rowType->Cast<TStructExprType>(), keyColumns, ctx);
- TExprNode::TPtr sortKey;
- TExprNode::TPtr sortOrder;
- ExtractSortKeyAndOrder(pos, sortTraits, sortKey, sortOrder, ctx);
// Keep the window's own sort key: ProcessRangeFrames below defines peers by it, while sortKey and
// sortOrder are re-extracted from the merged (window + session) sort when one exists.
- const TExprNode::TPtr originalSortKey = sortKey;
- TExprNode::TPtr input = inputList;
- const auto commonSortTraits = DeduceCompatibleSort(sortTraits, sessionSortTraits);
- ExtractSortKeyAndOrder(pos, commonSortTraits ? commonSortTraits : sortTraits, sortKey, sortOrder, ctx);
- auto fullKeyColumns = keyColumns;
// The sorts cannot be merged: first run a dedicated pass, sorted by the session sort, that adds
// the session members to every row. Then partition the main pass by (keys, session start).
- if (!commonSortTraits) {
- YQL_ENSURE(sessionKey);
- YQL_ENSURE(sessionInit);
- YQL_ENSURE(sessionUpdate);
- TExprNode::TPtr sessionSortKey;
- TExprNode::TPtr sessionSortOrder;
- ExtractSortKeyAndOrder(pos, sessionSortTraits, sessionSortKey, sessionSortOrder, ctx);
- input = ctx.Builder(pos)
- .Callable("PartitionsByKeys")
- .Add(0, input)
- .Add(1, keySelector)
- .Add(2, sessionSortOrder)
- .Add(3, sessionSortKey)
- .Lambda(4)
- .Param("partitionedStream")
- .Apply(AddSessionParamsMemberLambda(pos, SessionStartMemberName, SessionParamsMemberName, keySelector, sessionKey, sessionInit, sessionUpdate, ctx))
- .With(0, "partitionedStream")
- .Seal()
- .Seal()
- .Seal()
- .Build();
- TExprNodeList keyColumnsList = keyColumns->ChildrenList();
- keyColumnsList.push_back(ctx.NewAtom(pos, SessionStartMemberName));
- auto keyColumnsWithSessionStart = ctx.NewList(pos, std::move(keyColumnsList));
- fullKeyColumns = keyColumnsWithSessionStart;
- keySelector = BuildKeySelector(pos, *rowType, keyColumnsWithSessionStart, ctx);
// The session members are already materialized as columns by the pass above, so clear them
// before they are handed to BuildPartitionsByKeys at the end of this function.
- sessionKey = sessionInit = sessionUpdate = {};
- }
- TExprNode::TPtr rowsFrames;
- TExprNode::TPtr rangeFrames;
- TExprNode::TPtr groupsFrames;
- SplitFramesByType(frames, rowsFrames, rangeFrames, groupsFrames, ctx);
- YQL_ENSURE(groupsFrames->ChildrenSize() == 0);
- auto topLevelStreamArg = ctx.NewArgument(pos, "stream");
- TExprNode::TPtr processed = topLevelStreamArg;
// Some frames need the partition row count: add a helper column here and remove it at the end.
- TMaybe<TString> partitionRowsColumn;
- if (NeedPartitionRows(frames, *rowType, ctx)) {
- partitionRowsColumn = AllocatePartitionRowsColumn(outputRowType);
- input = AddPartitionRowsColumn(pos, input, fullKeyColumns, *partitionRowsColumn, ctx, types);
- }
- // All RANGE frames (even simplest RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
- // will require additional memory to store TableRow()'s - so we want to start with minimum size of row
- // (i.e. process range frames first)
- processed = ProcessRangeFrames(pos, processed, *rowType, originalSortKey, rangeFrames, partitionRowsColumn, ctx);
- rowType = ApplyFramesToType(*rowType, outputRowType, *rangeFrames, ctx);
- processed = ProcessRowsFrames(pos, processed, *rowType, topLevelStreamArg, rowsFrames, partitionRowsColumn, ctx);
- auto topLevelStreamProcessingLambda = ctx.NewLambda(pos, ctx.NewArguments(pos, {topLevelStreamArg}), std::move(processed));
- YQL_CLOG(INFO, Core) << "Expanded compact CalcOverWindow";
- auto res = BuildPartitionsByKeys(pos, input, keySelector, sortOrder, sortKey, topLevelStreamProcessingLambda, sessionKey,
- sessionInit, sessionUpdate, sessionColumns, ctx);
- if (partitionRowsColumn) {
- res = RemovePartitionRowsColumn(pos, res, *partitionRowsColumn, ctx);
- }
- return res;
- }
- } // namespace
- TExprNode::TPtr ExpandCalcOverWindow(const TExprNode::TPtr& node, TExprContext& ctx, TTypeAnnotationContext& types) {
- YQL_ENSURE(node->IsCallable({"CalcOverWindow", "CalcOverSessionWindow", "CalcOverWindowGroup"}));
- auto input = node->ChildPtr(0);
- auto calcs = ExtractCalcsOverWindow(node, ctx);
- if (calcs.empty()) {
- return input;
- }
- TCoCalcOverWindowTuple calc(calcs.front());
- if (calc.Frames().Size() != 0 || calc.SessionColumns().Size() != 0) {
- const TStructExprType& outputRowType = *node->GetTypeAnn()->Cast<TListExprType>()->GetItemType()->Cast<TStructExprType>();
- input = ExpandSingleCalcOverWindow(node->Pos(), input, calc.Keys().Ptr(), calc.SortSpec().Ptr(), calc.Frames().Ptr(),
- calc.SessionSpec().Ptr(), calc.SessionColumns().Ptr(), outputRowType, ctx, types);
- }
- calcs.erase(calcs.begin());
- return RebuildCalcOverWindowGroup(node->Pos(), input, calcs, ctx);
- }
- TExprNodeList ExtractCalcsOverWindow(const TExprNodePtr& node, TExprContext& ctx) {
- TExprNodeList result;
- if (auto maybeBase = TMaybeNode<TCoCalcOverWindowBase>(node)) {
- TCoCalcOverWindowBase self(maybeBase.Cast());
- TExprNode::TPtr sessionSpec;
- TExprNode::TPtr sessionColumns;
- if (auto session = TMaybeNode<TCoCalcOverSessionWindow>(node)) {
- sessionSpec = session.Cast().SessionSpec().Ptr();
- sessionColumns = session.Cast().SessionColumns().Ptr();
- } else {
- sessionSpec = ctx.NewCallable(node->Pos(), "Void", {});
- sessionColumns = ctx.NewList(node->Pos(), {});
- }
- result.emplace_back(
- Build<TCoCalcOverWindowTuple>(ctx, node->Pos())
- .Keys(self.Keys())
- .SortSpec(self.SortSpec())
- .Frames(self.Frames())
- .SessionSpec(sessionSpec)
- .SessionColumns(sessionColumns)
- .Done().Ptr()
- );
- } else {
- result = TMaybeNode<TCoCalcOverWindowGroup>(node).Cast().Calcs().Ref().ChildrenList();
- }
- return result;
- }
- TExprNode::TPtr RebuildCalcOverWindowGroup(TPositionHandle pos, const TExprNode::TPtr& input, const TExprNodeList& calcs, TExprContext& ctx) {
- auto inputType = ctx.Builder(input->Pos())
- .Callable("TypeOf")
- .Add(0, input)
- .Seal()
- .Build();
- auto inputItemType = ctx.Builder(input->Pos())
- .Callable("ListItemType")
- .Add(0, inputType)
- .Seal()
- .Build();
- TExprNodeList fixedCalcs;
- for (auto calcNode : calcs) {
- TCoCalcOverWindowTuple calc(calcNode);
- auto sortSpec = calc.SortSpec().Ptr();
- if (sortSpec->IsCallable("SortTraits")) {
- sortSpec = ctx.Builder(sortSpec->Pos())
- .Callable("SortTraits")
- .Add(0, inputType)
- .Add(1, sortSpec->ChildPtr(1))
- .Add(2, ctx.DeepCopyLambda(*sortSpec->Child(2)))
- .Seal()
- .Build();
- } else {
- YQL_ENSURE(sortSpec->IsCallable("Void"));
- }
- auto sessionSpec = calc.SessionSpec().Ptr();
- if (sessionSpec->IsCallable("SessionWindowTraits")) {
- TCoSessionWindowTraits traits(sessionSpec);
- auto sessionSortSpec = traits.SortSpec().Ptr();
- if (auto maybeSort = TMaybeNode<TCoSortTraits>(sessionSortSpec)) {
- sessionSortSpec = Build<TCoSortTraits>(ctx, sessionSortSpec->Pos())
- .ListType(inputType)
- .SortDirections(maybeSort.Cast().SortDirections())
- .SortKeySelectorLambda(ctx.DeepCopyLambda(maybeSort.Cast().SortKeySelectorLambda().Ref()))
- .Done().Ptr();
- } else {
- YQL_ENSURE(sessionSortSpec->IsCallable("Void"));
- }
- sessionSpec = Build<TCoSessionWindowTraits>(ctx, traits.Pos())
- .ListType(inputType)
- .SortSpec(sessionSortSpec)
- .InitState(ctx.DeepCopyLambda(traits.InitState().Ref()))
- .UpdateState(ctx.DeepCopyLambda(traits.UpdateState().Ref()))
- .Calculate(ctx.DeepCopyLambda(traits.Calculate().Ref()))
- .Done().Ptr();
- } else {
- YQL_ENSURE(sessionSpec->IsCallable("Void"));
- }
- auto sessionColumns = calc.SessionColumns().Ptr();
- TExprNodeList newFrames;
- for (auto frameNode : calc.Frames().Ref().Children()) {
- YQL_ENSURE(TCoWinOnBase::Match(frameNode.Get()));
- TExprNodeList winOnArgs = { frameNode->ChildPtr(0) };
- for (ui32 i = 1; i < frameNode->ChildrenSize(); ++i) {
- auto kvTuple = frameNode->ChildPtr(i);
- YQL_ENSURE(kvTuple->IsList());
- YQL_ENSURE(2 <= kvTuple->ChildrenSize() && kvTuple->ChildrenSize() <= 3);
- auto columnName = kvTuple->ChildPtr(0);
- auto traits = kvTuple->ChildPtr(1);
- YQL_ENSURE(traits->IsCallable({"Lag", "Lead", "RowNumber", "Rank", "DenseRank", "WindowTraits", "PercentRank", "CumeDist", "NTile"}));
- if (traits->IsCallable("WindowTraits")) {
- bool isDistinct = kvTuple->ChildrenSize() == 3;
- if (!isDistinct) {
- YQL_ENSURE(traits->Head().GetTypeAnn());
- const TTypeAnnotationNode& oldItemType = *traits->Head().GetTypeAnn()->Cast<TTypeExprType>()->GetType();
- traits = ctx.Builder(traits->Pos())
- .Callable(traits->Content())
- .Add(0, inputItemType)
- .Add(1, ctx.DeepCopyLambda(*ReplaceFirstLambdaArgWithCastStruct(*traits->Child(1), oldItemType, ctx)))
- .Add(2, ctx.DeepCopyLambda(*ReplaceFirstLambdaArgWithCastStruct(*traits->Child(2), oldItemType, ctx)))
- .Add(3, ctx.DeepCopyLambda(*ReplaceFirstLambdaArgWithCastStruct(*traits->Child(3), oldItemType, ctx)))
- .Add(4, ctx.DeepCopyLambda(*traits->Child(4)))
- .Add(5, traits->Child(5)->IsLambda() ? ctx.DeepCopyLambda(*traits->Child(5)) : traits->ChildPtr(5))
- .Seal()
- .Build();
- }
- } else if (traits->IsCallable({"Lag", "Lead", "Rank", "DenseRank", "PercentRank"})) {
- YQL_ENSURE(traits->Head().GetTypeAnn());
- const TTypeAnnotationNode& oldItemType = *traits->Head().GetTypeAnn()->Cast<TTypeExprType>()->GetType()
- ->Cast<TListExprType>()->GetItemType();
- traits = ctx.ChangeChild(*traits, 1, ctx.DeepCopyLambda(*ReplaceFirstLambdaArgWithCastStruct(*traits->Child(1), oldItemType, ctx)));
- }
- winOnArgs.push_back(ctx.ChangeChild(*kvTuple, 1, std::move(traits)));
- }
- newFrames.push_back(ctx.ChangeChildren(*frameNode, std::move(winOnArgs)));
- }
- fixedCalcs.push_back(
- Build<TCoCalcOverWindowTuple>(ctx, calc.Pos())
- .Keys(calc.Keys())
- .SortSpec(sortSpec)
- .Frames(ctx.NewList(calc.Frames().Pos(), std::move(newFrames)))
- .SessionSpec(sessionSpec)
- .SessionColumns(sessionColumns)
- .Done().Ptr()
- );
- }
- return Build<TCoCalcOverWindowGroup>(ctx, pos)
- .Input(input)
- .Calcs(ctx.NewList(pos, std::move(fixedCalcs)))
- .Done().Ptr();
- }
- bool IsUnbounded(const NNodes::TCoFrameBound& bound) {
- if (bound.Ref().ChildrenSize() < 2) {
- return false;
- }
- if (auto maybeAtom = bound.Bound().Maybe<TCoAtom>()) {
- return maybeAtom.Cast().Value() == "unbounded";
- }
- return false;
- }
- bool IsCurrentRow(const NNodes::TCoFrameBound& bound) {
- return bound.Setting().Value() == "currentRow";
- }
- TWindowFrameSettings TWindowFrameSettings::Parse(const TExprNode& node, TExprContext& ctx) {
- auto maybeSettings = TryParse(node, ctx);
- YQL_ENSURE(maybeSettings);
- return *maybeSettings;
- }
- TMaybe<TWindowFrameSettings> TWindowFrameSettings::TryParse(const TExprNode& node, TExprContext& ctx) {
- TWindowFrameSettings settings;
- if (node.IsCallable("WinOnRows")) {
- settings.Type = EFrameType::FrameByRows;
- } else if (node.IsCallable("WinOnRange")) {
- settings.Type = EFrameType::FrameByRange;
- } else {
- YQL_ENSURE(node.IsCallable("WinOnGroups"));
- settings.Type = EFrameType::FrameByGroups;
- }
- auto frameSpec = node.Child(0);
- if (frameSpec->Type() == TExprNode::List) {
- bool hasBegin = false;
- bool hasEnd = false;
- for (const auto& setting : frameSpec->Children()) {
- if (!EnsureTupleMinSize(*setting, 1, ctx)) {
- return {};
- }
- if (!EnsureAtom(setting->Head(), ctx)) {
- return {};
- }
- const auto settingName = setting->Head().Content();
- if (settingName != "begin" && settingName != "end" && settingName != "compact") {
- ctx.AddError(
- TIssue(ctx.GetPosition(setting->Pos()), TStringBuilder() << "Invalid frame bound '" << settingName << "'"));
- return {};
- }
- if (settingName == "compact") {
- settings.Compact = true;
- continue;
- }
- if (!EnsureTupleSize(*setting, 2, ctx)) {
- return {};
- }
- bool& hasBound = (settingName == "begin") ? hasBegin : hasEnd;
- if (hasBound) {
- ctx.AddError(
- TIssue(ctx.GetPosition(setting->Pos()), TStringBuilder() << "Duplicate " << settingName << " frame bound detected"));
- return {};
- }
- hasBound = true;
- TMaybe<i32>& boundOffset = (settingName == "begin") ? settings.FirstOffset : settings.LastOffset;
- TExprNode::TPtr& frameBound = (settingName == "begin") ? settings.First : settings.Last;
- if (setting->Tail().IsList()) {
- TExprNode::TPtr fb = setting->TailPtr();
- if (!EnsureTupleMinSize(*fb, 1, ctx)) {
- return {};
- }
- if (!EnsureAtom(fb->Head(), ctx)) {
- return {};
- }
- auto type = fb->Head().Content();
- if (type == "currentRow") {
- if (fb->ChildrenSize() == 1) {
- if (!node.IsCallable("WinOnRange")) {
- ctx.AddError(TIssue(ctx.GetPosition(fb->Pos()), TStringBuilder() << "currentRow should only be used for RANGE"));
- return {};
- }
- frameBound = fb;
- continue;
- }
- ctx.AddError(TIssue(ctx.GetPosition(fb->Pos()), TStringBuilder() << "Expecting no value for '" << type << "'"));
- return {};
- }
- if (!(type == "preceding" || type == "following")) {
- ctx.AddError(TIssue(ctx.GetPosition(fb->Pos()), TStringBuilder() << "Expecting preceding or following, but got '" << type << "'"));
- return {};
- }
- if (!EnsureTupleSize(*fb, 2, ctx)) {
- return {};
- }
- auto boundValue = fb->ChildPtr(1);
- if (boundValue->IsAtom()) {
- if (boundValue->Content() == "unbounded") {
- frameBound = fb;
- continue;
- }
- ctx.AddError(TIssue(ctx.GetPosition(fb->Pos()), TStringBuilder() << "Expecting unbounded, but got '" << boundValue->Content() << "'"));
- return {};
- }
- if (node.IsCallable({"WinOnRows", "WinOnGroups"})) {
- if (!EnsureDataType(*boundValue, ctx)) {
- return {};
- }
- auto slot = boundValue->GetTypeAnn()->Cast<TDataExprType>()->GetSlot();
- bool groups = node.IsCallable("WinOnGroups");
- if (!IsDataTypeIntegral(slot)) {
- ctx.AddError(TIssue(ctx.GetPosition(boundValue->Pos()),
- TStringBuilder() << "Expecting integral values for " << (groups ? "GROUPS" : "ROWS") << " but got " << *boundValue->GetTypeAnn()));
- return {};
- }
- if (!groups) {
- auto maybeIntLiteral = TMaybeNode<TCoIntegralCtor>(boundValue);
- if (!maybeIntLiteral) {
- // TODO: this is not strictly necessary, and only needed for current implementation via Queue
- ctx.AddError(TIssue(ctx.GetPosition(boundValue->Pos()),
- TStringBuilder() << "Expecting literal values for ROWS"));
- return {};
- }
- auto strLiteralValue = maybeIntLiteral.Cast().Literal().Value();
- if (strLiteralValue.StartsWith("-")) {
- ctx.AddError(TIssue(ctx.GetPosition(boundValue->Pos()),
- TStringBuilder() << "Expecting positive literal values for ROWS, but got " << strLiteralValue));
- return {};
- }
- ui64 literalValue = FromString<ui64>(strLiteralValue);
- if (literalValue > std::numeric_limits<i32>::max()) {
- ctx.AddError(TIssue(ctx.GetPosition(boundValue->Pos()),
- TStringBuilder() << "ROWS offset too big: " << strLiteralValue << ", maximum is " << std::numeric_limits<i32>::max()));
- return {};
- }
- i32 castedValue = (i32)literalValue;
- if (type == "preceding") {
- castedValue = -castedValue;
- }
- boundOffset = castedValue;
- }
- } else if (!EnsureComparableType(boundValue->Pos(), *boundValue->GetTypeAnn(), ctx)) {
- return {};
- }
- frameBound = fb;
- } else if (setting->Tail().IsCallable("Int32")) {
- auto& valNode = setting->Tail().Head();
- YQL_ENSURE(valNode.IsAtom());
- i32 value;
- YQL_ENSURE(TryFromString(valNode.Content(), value));
- boundOffset = value;
- } else if (!setting->Tail().IsCallable("Void")) {
- const TTypeAnnotationNode* type = setting->Tail().GetTypeAnn();
- TStringBuilder errMsg;
- if (!type) {
- errMsg << "lambda";
- } else if (setting->Tail().IsCallable()) {
- errMsg << setting->Tail().Content() << " with type " << *type;
- } else {
- errMsg << *type;
- }
- ctx.AddError(TIssue(ctx.GetPosition(setting->Tail().Pos()),
- TStringBuilder() << "Invalid " << settingName << " frame bound - expecting Void or Int32 callable, but got: " << errMsg));
- return {};
- }
- }
- if (!hasBegin || !hasEnd) {
- ctx.AddError(TIssue(ctx.GetPosition(frameSpec->Pos()),
- TStringBuilder() << "Missing " << (!hasBegin ? "begin" : "end") << " bound in frame definition"));
- return {};
- }
- } else if (frameSpec->IsCallable("Void")) {
- settings.FirstOffset = {};
- settings.LastOffset = 0;
- } else {
- const TTypeAnnotationNode* type = frameSpec->GetTypeAnn();
- ctx.AddError(TIssue(ctx.GetPosition(frameSpec->Pos()),
- TStringBuilder() << "Invalid window frame - expecting Tuple or Void, but got: " << (type ? FormatType(type) : "lambda")));
- return {};
- }
- // frame will always contain rows if it includes current row
- if (!settings.FirstOffset) {
- settings.NeverEmpty = !settings.LastOffset.Defined() || *settings.LastOffset >= 0;
- } else if (!settings.LastOffset.Defined()) {
- settings.NeverEmpty = !settings.FirstOffset.Defined() || *settings.FirstOffset <= 0;
- } else {
- settings.NeverEmpty = *settings.FirstOffset <= *settings.LastOffset && *settings.FirstOffset <= 0 && *settings.LastOffset >= 0;
- }
- return settings;
- }
- TMaybe<i32> TWindowFrameSettings::GetFirstOffset() const {
- YQL_ENSURE(Type == FrameByRows);
- return FirstOffset;
- }
- TMaybe<i32> TWindowFrameSettings::GetLastOffset() const {
- YQL_ENSURE(Type == FrameByRows);
- return LastOffset;
- }
- TCoFrameBound TWindowFrameSettings::GetFirst() const {
- YQL_ENSURE(First);
- return TCoFrameBound(First);
- }
- TCoFrameBound TWindowFrameSettings::GetLast() const {
- YQL_ENSURE(Last);
- return TCoFrameBound(Last);
- }
// Builds a lambda (input) -> Chain1Map(input, init, update) that pairs every row with its
// session window state. Every output item is the tuple
// (row, sessionKey, sessionState, partitionKey).
//
// partitionKeySelector - row -> partition key; when the key changes, the state is re-initialized.
// sessionKeySelector   - (row, sessionState) -> session key.
// sessionInit          - row -> initial session state.
// sessionUpdate        - (row, previous sessionState) -> tuple; item 0 is used as the
//                        "new session starts here" condition and item 1 as the updated state.
- TExprNode::TPtr ZipWithSessionParamsLambda(TPositionHandle pos, const TExprNode::TPtr& partitionKeySelector,
- const TExprNode::TPtr& sessionKeySelector, const TExprNode::TPtr& sessionInit,
- const TExprNode::TPtr& sessionUpdate, TExprContext& ctx)
- {
// Builds the lambda `tuple -> Nth(tuple, idx)`.
- auto extractTupleItem = [&](ui32 idx) {
- return ctx.Builder(pos)
- .Lambda()
- .Param("tuple")
- .Callable("Nth")
- .Arg(0, "tuple")
- .Atom(1, ToString(idx), TNodeFlags::Default)
- .Seal()
- .Seal()
- .Build();
- };
- auto initLambda = ctx.Builder(pos)
- .Lambda()
- .Param("row")
- .List() // row, sessionKey, sessionState, partitionKey
- .Arg(0, "row")
- .Apply(1, sessionKeySelector)
- .With(0, "row")
- .With(1)
- .Apply(sessionInit)
- .With(0, "row")
- .Seal()
- .Done()
- .Seal()
- .Apply(2, sessionInit)
- .With(0, "row")
- .Seal()
- .Apply(3, partitionKeySelector)
- .With(0, "row")
- .Seal()
- .Seal()
- .Seal()
- .Build();
// (row, prevBigState) -> true when the row's partition key differs from the previous row's.
// NOTE(review): the previous key is recomputed from the stored row (item 0), although item 3
// of prevBigState already holds it; item 3 is not read anywhere in this function.
- auto newPartitionLambda = ctx.Builder(pos)
- .Lambda()
- .Param("row")
- .Param("prevBigState")
- .Callable("AggrNotEquals")
- .Apply(0, partitionKeySelector)
- .With(0, "row")
- .Seal()
- .Apply(1, partitionKeySelector)
- .With(0)
- .Apply(extractTupleItem(0))
- .With(0, "prevBigState")
- .Seal()
- .Done()
- .Seal()
- .Seal()
- .Seal()
- .Build();
// (row, prevBigState) -> item 0 (newSession == true) or item 1 (newSession == false) of
// sessionUpdate(row, previous sessionState).
- auto newSessionOrUpdatedStateLambda = [&](bool newSession) {
- return ctx.Builder(pos)
- .Lambda()
- .Param("row")
- .Param("prevBigState")
- .Apply(extractTupleItem(newSession ? 0 : 1))
- .With(0)
- .Apply(sessionUpdate)
- .With(0, "row")
- .With(1)
- .Apply(extractTupleItem(2))
- .With(0, "prevBigState")
- .Seal()
- .Done()
- .Seal()
- .Done()
- .Seal()
- .Seal()
- .Build();
- };
// Update step: on a new partition, start from initLambda. Otherwise compute a fresh session key
// if a new session starts, or carry over the previous one, and store the updated state.
- return ctx.Builder(pos)
- .Lambda()
- .Param("input")
- .Callable("Chain1Map")
- .Arg(0, "input")
- .Add(1, initLambda)
- .Lambda(2)
- .Param("row")
- .Param("prevBigState")
- .Callable("If")
- .Apply(0, newPartitionLambda)
- .With(0, "row")
- .With(1, "prevBigState")
- .Seal()
- .Apply(1, initLambda)
- .With(0, "row")
- .Seal()
- .List(2)
- .Arg(0, "row")
- .Callable(1, "If")
- .Apply(0, newSessionOrUpdatedStateLambda(/* newSession = */ true))
- .With(0, "row")
- .With(1, "prevBigState")
- .Seal()
- .Apply(1, sessionKeySelector)
- .With(0, "row")
- .With(1)
- .Apply(newSessionOrUpdatedStateLambda(/* newSession = */ false))
- .With(0, "row")
- .With(1, "prevBigState")
- .Seal()
- .Done()
- .Seal()
- .Apply(2, extractTupleItem(1))
- .With(0, "prevBigState")
- .Seal()
- .Seal()
- .Apply(2, newSessionOrUpdatedStateLambda(/* newSession = */ false))
- .With(0, "row")
- .With(1, "prevBigState")
- .Seal()
- .Apply(3, partitionKeySelector)
- .With(0, "row")
- .Seal()
- .Seal()
- .Seal()
- .Seal()
- .Seal()
- .Seal()
- .Build();
- }
- TExprNode::TPtr AddSessionParamsMemberLambda(TPositionHandle pos,
- TStringBuf sessionStartMemberName, const TExprNode::TPtr& partitionKeySelector,
- const TSessionWindowParams& sessionWindowParams, TExprContext& ctx)
- {
- return AddSessionParamsMemberLambda(pos, sessionStartMemberName, "", partitionKeySelector,
- sessionWindowParams.Key, sessionWindowParams.Init, sessionWindowParams.Update, ctx);
- }
- TExprNode::TPtr AddSessionParamsMemberLambda(TPositionHandle pos,
- TStringBuf sessionStartMemberName, TStringBuf sessionParamsMemberName,
- const TExprNode::TPtr& partitionKeySelector,
- const TExprNode::TPtr& sessionKeySelector, const TExprNode::TPtr& sessionInit,
- const TExprNode::TPtr& sessionUpdate, TExprContext& ctx)
- {
- YQL_ENSURE(sessionStartMemberName);
- TExprNode::TPtr addLambda = ctx.Builder(pos)
- .Lambda()
- .Param("tupleOfItemAndSessionParams")
- .Callable("AddMember")
- .Callable(0, "Nth")
- .Arg(0, "tupleOfItemAndSessionParams")
- .Atom(1, "0", TNodeFlags::Default)
- .Seal()
- .Atom(1, sessionStartMemberName)
- .Callable(2, "Nth")
- .Arg(0, "tupleOfItemAndSessionParams")
- .Atom(1, "1", TNodeFlags::Default)
- .Seal()
- .Seal()
- .Seal()
- .Build();
- if (sessionParamsMemberName) {
- addLambda = ctx.Builder(pos)
- .Lambda()
- .Param("tupleOfItemAndSessionParams")
- .Callable("AddMember")
- .Apply(0, addLambda)
- .With(0, "tupleOfItemAndSessionParams")
- .Seal()
- .Atom(1, sessionParamsMemberName)
- .Callable(2, "AsStruct")
- .List(0)
- .Atom(0, "start", TNodeFlags::Default)
- .Callable(1, "Nth")
- .Arg(0, "tupleOfItemAndSessionParams")
- .Atom(1, "1", TNodeFlags::Default)
- .Seal()
- .Seal()
- .List(1)
- .Atom(0, "state", TNodeFlags::Default)
- .Callable(1, "Nth")
- .Arg(0, "tupleOfItemAndSessionParams")
- .Atom(1, "2", TNodeFlags::Default)
- .Seal()
- .Seal()
- .Seal()
- .Seal()
- .Seal()
- .Build();
- }
- return ctx.Builder(pos)
- .Lambda()
- .Param("input")
- .Callable("OrderedMap")
- .Apply(0, ZipWithSessionParamsLambda(pos, partitionKeySelector, sessionKeySelector, sessionInit, sessionUpdate, ctx))
- .With(0, "input")
- .Seal()
- .Add(1, addLambda)
- .Seal()
- .Seal()
- .Build();
- }
- void TSessionWindowParams::Reset()
- {
- Traits = {};
- Key = {};
- KeyType = nullptr;
- ParamsType = nullptr;
- Init = {};
- Update = {};
- SortTraits = {};
- }
- }
|