yql_opt_window.cpp 144 KB


  1. #include "yql_opt_window.h"
  2. #include "yql_opt_utils.h"
  3. #include "yql_expr_type_annotation.h"
  4. #include <yql/essentials/core/yql_expr_optimize.h>
  5. #include <yql/essentials/utils/log/log.h>
  6. namespace NYql {
  7. using namespace NNodes;
  8. namespace {
  9. const TStringBuf SessionStartMemberName = "_yql_window_session_start";
  10. const TStringBuf SessionParamsMemberName = "_yql_window_session_params";
  11. enum class EFrameBoundsType : ui8 {
  12. EMPTY,
  13. LAGGING,
  14. CURRENT,
  15. LEADING,
  16. FULL,
  17. GENERIC,
  18. };
  19. EFrameBoundsType FrameBoundsType(const TWindowFrameSettings& settings) {
  20. auto first = settings.GetFirstOffset();
  21. auto last = settings.GetLastOffset();
  22. if (first.Defined() && last.Defined() && first > last) {
  23. return EFrameBoundsType::EMPTY;
  24. }
  25. if (!first.Defined()) {
  26. if (!last.Defined()) {
  27. return EFrameBoundsType::FULL;
  28. }
  29. if (*last < 0) {
  30. return EFrameBoundsType::LAGGING;
  31. }
  32. return *last > 0 ? EFrameBoundsType::LEADING : EFrameBoundsType::CURRENT;
  33. }
  34. return EFrameBoundsType::GENERIC;
  35. }
  36. TExprNode::TPtr ReplaceLastLambdaArgWithUnsignedLiteral(const TExprNode& lambda, ui32 literal, TExprContext& ctx) {
  37. YQL_ENSURE(lambda.IsLambda());
  38. TExprNodeList args = lambda.ChildPtr(0)->ChildrenList();
  39. YQL_ENSURE(!args.empty());
  40. auto literalNode = ctx.Builder(lambda.Pos())
  41. .Callable("Uint32")
  42. .Atom(0, literal)
  43. .Seal()
  44. .Build();
  45. auto newBody = ctx.ReplaceNodes(lambda.ChildPtr(1), {{args.back().Get(), literalNode}});
  46. args.pop_back();
  47. return ctx.NewLambda(lambda.Pos(), ctx.NewArguments(lambda.Pos(), std::move(args)), std::move(newBody));
  48. }
  49. TExprNode::TPtr ReplaceFirstLambdaArgWithCastStruct(const TExprNode& lambda, const TTypeAnnotationNode& targetType, TExprContext& ctx) {
  50. YQL_ENSURE(lambda.IsLambda());
  51. YQL_ENSURE(targetType.GetKind() == ETypeAnnotationKind::Struct);
  52. TExprNodeList args = lambda.ChildPtr(0)->ChildrenList();
  53. YQL_ENSURE(!args.empty());
  54. auto newArg = ctx.NewArgument(lambda.Pos(), "row");
  55. auto cast = ctx.Builder(lambda.Pos())
  56. .Callable("MatchType")
  57. .Add(0, newArg)
  58. .Atom(1, "Optional", TNodeFlags::Default)
  59. .Lambda(2)
  60. .Param("row")
  61. .Callable("Map")
  62. .Arg(0, "row")
  63. .Lambda(1)
  64. .Param("unwrapped")
  65. .Callable("CastStruct")
  66. .Arg(0, "unwrapped")
  67. .Add(1, ExpandType(lambda.Pos(), targetType, ctx))
  68. .Seal()
  69. .Seal()
  70. .Seal()
  71. .Seal()
  72. .Lambda(3)
  73. .Param("row")
  74. .Callable("CastStruct")
  75. .Arg(0, "row")
  76. .Add(1, ExpandType(lambda.Pos(), targetType, ctx))
  77. .Seal()
  78. .Seal()
  79. .Seal()
  80. .Build();
  81. auto newBody = ctx.ReplaceNodes(lambda.ChildPtr(1), {{args.front().Get(), cast}});
  82. args[0] = newArg;
  83. return ctx.NewLambda(lambda.Pos(), ctx.NewArguments(lambda.Pos(), std::move(args)), std::move(newBody));
  84. }
  85. TExprNode::TPtr AddOptionalIfNotAlreadyOptionalOrNull(const TExprNode::TPtr& lambda, TExprContext& ctx) {
  86. YQL_ENSURE(lambda->IsLambda());
  87. YQL_ENSURE(lambda->ChildPtr(0)->ChildrenSize() == 1);
  88. auto identity = MakeIdentityLambda(lambda->Pos(), ctx);
  89. return ctx.Builder(lambda->Pos())
  90. .Lambda()
  91. .Param("arg")
  92. .Callable("MatchType")
  93. .Apply(0, lambda)
  94. .With(0, "arg")
  95. .Seal()
  96. .Atom(1, "Optional", TNodeFlags::Default)
  97. .Add(2, identity)
  98. .Atom(3, "Null", TNodeFlags::Default)
  99. .Add(4, identity)
  100. .Atom(5, "Pg", TNodeFlags::Default)
  101. .Add(6, identity)
  102. .Lambda(7)
  103. .Param("result")
  104. .Callable("Just")
  105. .Arg(0, "result")
  106. .Seal()
  107. .Seal()
  108. .Seal()
  109. .Seal()
  110. .Build();
  111. }
  112. struct TRawTrait {
  113. TPositionHandle Pos;
  114. // Init/Update/Default are set only for aggregations
  115. TExprNode::TPtr InitLambda;
  116. TExprNode::TPtr UpdateLambda;
  117. TExprNode::TPtr DefaultValue;
  118. TExprNode::TPtr CalculateLambda;
  119. TMaybe<i64> CalculateLambdaLead; // lead/lag for input to CalculateLambda;
  120. TVector<TExprNode::TPtr> Params; // NTile
  121. const TTypeAnnotationNode* OutputType = nullptr;
  122. TWindowFrameSettings FrameSettings;
  123. };
  124. struct TCalcOverWindowTraits {
  125. TMap<TStringBuf, TRawTrait> RawTraits;
  126. ui64 MaxDataOutpace = 0;
  127. ui64 MaxDataLag = 0;
  128. ui64 MaxUnboundedPrecedingLag = 0;
  129. const TTypeAnnotationNode* LagQueueItemType = nullptr;
  130. };
  131. TExprNode::TPtr ApplyDistinctForInitLambda(TExprNode::TPtr initLambda, const TStringBuf& distinctKey, const TTypeAnnotationNode& distinctKeyType, const TTypeAnnotationNode& distinctKeyOrigType, TExprContext& ctx) {
  132. bool hasParent = initLambda->Child(0)->ChildrenSize() == 2;
  133. bool distinctKeyIsStruct = distinctKeyOrigType.GetKind() == ETypeAnnotationKind::Struct;
  134. auto expandedDistinctKeyType = ExpandType(initLambda->Pos(), distinctKeyType, ctx);
  135. auto expandedDistinctKeyOrigType = ExpandType(initLambda->Pos(), distinctKeyOrigType, ctx);
  136. auto setCreateUdf = ctx.Builder(initLambda->Pos())
  137. .Callable("Udf")
  138. .Atom(0, "Set.Create")
  139. .Callable(1, "Void").Seal()
  140. .Callable(2, "TupleType")
  141. .Callable(0, "VoidType").Seal()
  142. .Callable(1, "VoidType").Seal()
  143. .Add(2, expandedDistinctKeyOrigType)
  144. .Seal()
  145. .Seal()
  146. .Build();
  147. auto setCreateLambda = ctx.Builder(initLambda->Pos())
  148. .Lambda()
  149. .Param("value")
  150. .Param("parent")
  151. .Callable("NamedApply")
  152. .Add(0, setCreateUdf)
  153. .List(1)
  154. .Arg(0, "value")
  155. .Callable(1, "Uint32")
  156. .Atom(0, 0)
  157. .Seal()
  158. .Seal()
  159. .Callable(2, "AsStruct").Seal()
  160. .Callable(3, "DependsOn")
  161. .Arg(0, "parent")
  162. .Seal()
  163. .Seal()
  164. .Seal()
  165. .Build();
  166. initLambda = ctx.Builder(initLambda->Pos())
  167. .Lambda()
  168. .Param("value")
  169. .Param("parent")
  170. .List()
  171. // aggregation state
  172. .Apply(0, initLambda)
  173. .Do([&](TExprNodeReplaceBuilder& builder) -> TExprNodeReplaceBuilder& {
  174. if (distinctKeyIsStruct) {
  175. return builder
  176. .With(0)
  177. .Callable("CastStruct")
  178. .Arg(0, "value")
  179. .Add(1, expandedDistinctKeyType)
  180. .Seal()
  181. .Done();
  182. } else {
  183. return builder.With(0, "value");
  184. }
  185. })
  186. .Do([&](TExprNodeReplaceBuilder& builder) -> TExprNodeReplaceBuilder& {
  187. return hasParent ? builder.With(1, "parent") : builder;
  188. })
  189. .Seal()
  190. // distinct set state
  191. .Apply(1, setCreateLambda)
  192. .With(0, "value")
  193. .With(1, "parent")
  194. .Seal()
  195. .Seal()
  196. .Seal()
  197. .Build();
  198. return ctx.Builder(initLambda->Pos())
  199. .Lambda()
  200. .Param("row")
  201. .Param("parent")
  202. .Apply(initLambda)
  203. .With(0)
  204. .Callable("Member")
  205. .Arg(0, "row")
  206. .Atom(1, distinctKey)
  207. .Seal()
  208. .Done()
  209. .With(1, "parent")
  210. .Seal()
  211. .Seal()
  212. .Build();
  213. }
  214. TExprNode::TPtr ApplyDistinctForUpdateLambda(TExprNode::TPtr updateLambda, const TStringBuf& distinctKey, const TTypeAnnotationNode& distinctKeyType, const TTypeAnnotationNode& distinctKeyOrigType, TExprContext& ctx) {
  215. bool hasParent = updateLambda->Child(0)->ChildrenSize() == 3;
  216. bool distinctKeyIsStruct = distinctKeyOrigType.GetKind() == ETypeAnnotationKind::Struct;
  217. auto expandedDistinctKeyType = ExpandType(updateLambda->Pos(), distinctKeyType, ctx);
  218. auto expandedDistinctKeyOrigType = ExpandType(updateLambda->Pos(), distinctKeyOrigType, ctx);
  219. auto setAddValueUdf = ctx.Builder(updateLambda->Pos())
  220. .Callable("Udf")
  221. .Atom(0, "Set.AddValue")
  222. .Callable(1, "Void").Seal()
  223. .Callable(2, "TupleType")
  224. .Callable(0, "VoidType").Seal()
  225. .Callable(1, "VoidType").Seal()
  226. .Add(2, expandedDistinctKeyOrigType)
  227. .Seal()
  228. .Seal()
  229. .Build();
  230. auto setWasChangedUdf = ctx.Builder(updateLambda->Pos())
  231. .Callable("Udf")
  232. .Atom(0, "Set.WasChanged")
  233. .Callable(1, "Void").Seal()
  234. .Callable(2, "TupleType")
  235. .Callable(0, "VoidType").Seal()
  236. .Callable(1, "VoidType").Seal()
  237. .Add(2, expandedDistinctKeyOrigType)
  238. .Seal()
  239. .Seal()
  240. .Build();
  241. auto setInsertLambda = ctx.Builder(updateLambda->Pos())
  242. .Lambda()
  243. .Param("set")
  244. .Param("value")
  245. .Param("parent")
  246. .Callable("NamedApply")
  247. .Add(0, setAddValueUdf)
  248. .List(1)
  249. .Arg(0, "set")
  250. .Arg(1, "value")
  251. .Seal()
  252. .Callable(2, "AsStruct").Seal()
  253. .Callable(3, "DependsOn")
  254. .Arg(0, "parent")
  255. .Seal()
  256. .Seal()
  257. .Seal()
  258. .Build();
  259. auto setWasChangedLambda = ctx.Builder(updateLambda->Pos())
  260. .Lambda()
  261. .Param("set")
  262. .Param("parent")
  263. .Callable("NamedApply")
  264. .Add(0, setWasChangedUdf)
  265. .List(1)
  266. .Arg(0, "set")
  267. .Seal()
  268. .Callable(2, "AsStruct").Seal()
  269. .Callable(3, "DependsOn")
  270. .Arg(0, "parent")
  271. .Seal()
  272. .Seal()
  273. .Seal()
  274. .Build();
  275. updateLambda = ctx.Builder(updateLambda->Pos())
  276. .Lambda()
  277. .Param("value")
  278. .Param("state")
  279. .Param("parent")
  280. .Callable("If")
  281. // condition
  282. .Apply(0, setWasChangedLambda)
  283. .With(0)
  284. .Apply(setInsertLambda)
  285. .With(0)
  286. .Callable("Nth")
  287. .Arg(0, "state")
  288. .Atom(1, 1)
  289. .Seal()
  290. .Done()
  291. .With(1, "value")
  292. .With(2, "parent")
  293. .Seal()
  294. .Done()
  295. .With(1, "parent")
  296. .Seal()
  297. // new state
  298. .List(1)
  299. // aggregation state
  300. .Apply(0, updateLambda)
  301. .Do([&](TExprNodeReplaceBuilder& builder) -> TExprNodeReplaceBuilder& {
  302. if (distinctKeyIsStruct) {
  303. return builder
  304. .With(0)
  305. .Callable("CastStruct")
  306. .Arg(0, "value")
  307. .Add(1, expandedDistinctKeyType)
  308. .Seal()
  309. .Done();
  310. } else {
  311. return builder.With(0, "value");
  312. }
  313. })
  314. .With(1)
  315. .Callable("Nth")
  316. .Arg(0, "state")
  317. .Atom(1, 0)
  318. .Seal()
  319. .Done()
  320. .Do([&](TExprNodeReplaceBuilder& builder) -> TExprNodeReplaceBuilder& {
  321. return hasParent ? builder.With(2, "parent") : builder;
  322. })
  323. .Seal()
  324. // distinct set state
  325. .Apply(1, setInsertLambda)
  326. .With(0)
  327. .Callable("Nth")
  328. .Arg(0, "state")
  329. .Atom(1, 1)
  330. .Seal()
  331. .Done()
  332. .With(1, "value")
  333. .With(2, "parent")
  334. .Seal()
  335. .Seal()
  336. // old state
  337. .Arg(2, "state")
  338. .Seal()
  339. .Seal()
  340. .Build();
  341. return ctx.Builder(updateLambda->Pos())
  342. .Lambda()
  343. .Param("row")
  344. .Param("state")
  345. .Param("parent")
  346. .Apply(updateLambda)
  347. .With(0)
  348. .Callable("Member")
  349. .Arg(0, "row")
  350. .Atom(1, distinctKey)
  351. .Seal()
  352. .Done()
  353. .With(1, "state")
  354. .With(2, "parent")
  355. .Seal()
  356. .Seal()
  357. .Build();
  358. }
  359. TExprNode::TPtr ApplyDistinctForCalculateLambda(TExprNode::TPtr calculateLambda, TExprContext& ctx) {
  360. return ctx.Builder(calculateLambda->Pos())
  361. .Lambda()
  362. .Param("state")
  363. .Apply(calculateLambda)
  364. .With(0)
  365. .Callable("Nth")
  366. .Arg(0, "state")
  367. .Atom(1, 0)
  368. .Seal()
  369. .Done()
  370. .Seal()
  371. .Seal()
  372. .Build();
  373. }
  374. TCalcOverWindowTraits ExtractCalcOverWindowTraits(const TExprNode::TPtr& frames, const TStructExprType& rowType, TExprContext& ctx) {
  375. TCalcOverWindowTraits result;
  376. auto& maxDataOutpace = result.MaxDataOutpace;
  377. auto& maxDataLag = result.MaxDataLag;
  378. auto& maxUnboundedPrecedingLag = result.MaxUnboundedPrecedingLag;
  379. TVector<const TItemExprType*> lagQueueStructItems;
  380. for (auto& winOn : frames->ChildrenList()) {
  381. TWindowFrameSettings frameSettings = TWindowFrameSettings::Parse(*winOn, ctx);
  382. ui64 frameOutpace = 0;
  383. ui64 frameLag = 0;
  384. const EFrameType ft = frameSettings.GetFrameType();
  385. if (ft == EFrameType::FrameByRows) {
  386. const EFrameBoundsType frameType = FrameBoundsType(frameSettings);
  387. const auto frameFirst = frameSettings.GetFirstOffset();
  388. const auto frameLast = frameSettings.GetLastOffset();
  389. if (frameType != EFrameBoundsType::EMPTY) {
  390. if (!frameLast.Defined() || *frameLast > 0) {
  391. frameOutpace = frameLast.Defined() ? ui64(*frameLast) : Max<ui64>();
  392. }
  393. if (frameFirst.Defined() && *frameFirst < 0) {
  394. frameLag = ui64(0 - *frameFirst);
  395. }
  396. }
  397. } else {
  398. // The only frame we currently support
  399. YQL_ENSURE(ft == EFrameType::FrameByRange);
  400. YQL_ENSURE(IsUnbounded(frameSettings.GetFirst()));
  401. YQL_ENSURE(IsCurrentRow(frameSettings.GetLast()));
  402. }
  403. const auto& winOnChildren = winOn->ChildrenList();
  404. YQL_ENSURE(winOnChildren.size() > 1);
  405. for (size_t i = 1; i < winOnChildren.size(); ++i) {
  406. auto item = winOnChildren[i];
  407. YQL_ENSURE(item->IsList());
  408. auto nameNode = item->Child(0);
  409. YQL_ENSURE(nameNode->IsAtom());
  410. TStringBuf name = nameNode->Content();
  411. YQL_ENSURE(!result.RawTraits.contains(name));
  412. auto traits = item->Child(1);
  413. auto& rawTraits = result.RawTraits[name];
  414. rawTraits.FrameSettings = frameSettings;
  415. rawTraits.Pos = traits->Pos();
  416. YQL_ENSURE(traits->IsCallable({"WindowTraits","CumeDist"}) || ft == EFrameType::FrameByRows, "Non-canonical frame for window functions");
  417. if (traits->IsCallable("WindowTraits")) {
  418. maxDataOutpace = Max(maxDataOutpace, frameOutpace);
  419. maxDataLag = Max(maxDataLag, frameLag);
  420. auto initLambda = traits->ChildPtr(1);
  421. auto updateLambda = traits->ChildPtr(2);
  422. auto calculateLambda = traits->ChildPtr(4);
  423. rawTraits.OutputType = calculateLambda->GetTypeAnn();
  424. YQL_ENSURE(rawTraits.OutputType);
  425. auto lambdaInputType = traits->Child(0)->GetTypeAnn()->Cast<TTypeExprType>()->GetType();
  426. if (item->ChildrenSize() == 3) {
  427. auto distinctKey = item->Child(2)->Content();
  428. auto distinctKeyOrigType = rowType.FindItemType(distinctKey);
  429. YQL_ENSURE(distinctKeyOrigType);
  430. initLambda = ApplyDistinctForInitLambda(initLambda, distinctKey, *lambdaInputType, *distinctKeyOrigType, ctx);
  431. updateLambda = ApplyDistinctForUpdateLambda(updateLambda, distinctKey, *lambdaInputType, *distinctKeyOrigType, ctx);
  432. calculateLambda = ApplyDistinctForCalculateLambda(calculateLambda, ctx);
  433. } else {
  434. initLambda = ReplaceFirstLambdaArgWithCastStruct(*initLambda, *lambdaInputType, ctx);
  435. updateLambda = ReplaceFirstLambdaArgWithCastStruct(*updateLambda, *lambdaInputType, ctx);
  436. }
  437. if (initLambda->Child(0)->ChildrenSize() == 2) {
  438. initLambda = ReplaceLastLambdaArgWithUnsignedLiteral(*initLambda, i, ctx);
  439. }
  440. if (updateLambda->Child(0)->ChildrenSize() == 3) {
  441. updateLambda = ReplaceLastLambdaArgWithUnsignedLiteral(*updateLambda, i, ctx);
  442. }
  443. rawTraits.InitLambda = initLambda;
  444. rawTraits.UpdateLambda = updateLambda;
  445. rawTraits.CalculateLambda = calculateLambda;
  446. rawTraits.DefaultValue = traits->ChildPtr(5);
  447. if (ft == EFrameType::FrameByRows) {
  448. const EFrameBoundsType frameType = FrameBoundsType(frameSettings);
  449. const auto frameLast = frameSettings.GetLastOffset();
  450. if (frameType == EFrameBoundsType::LAGGING) {
  451. maxUnboundedPrecedingLag = Max(maxUnboundedPrecedingLag, ui64(abs(*frameLast)));
  452. lagQueueStructItems.push_back(ctx.MakeType<TItemExprType>(name, rawTraits.OutputType));
  453. }
  454. }
  455. } else if (traits->IsCallable({"Lead", "Lag"})) {
  456. i64 lead = 1;
  457. if (traits->ChildrenSize() == 3) {
  458. YQL_ENSURE(traits->Child(2)->IsCallable("Int64"));
  459. lead = FromString<i64>(traits->Child(2)->Child(0)->Content());
  460. }
  461. if (traits->IsCallable("Lag")) {
  462. lead = -lead;
  463. }
  464. if (lead < 0) {
  465. maxDataLag = Max(maxDataLag, ui64(abs(lead)));
  466. } else {
  467. maxDataOutpace = Max<ui64>(maxDataOutpace, lead);
  468. }
  469. auto lambdaInputType =
  470. traits->Child(0)->GetTypeAnn()->Cast<TTypeExprType>()->GetType()->Cast<TListExprType>()->GetItemType();
  471. rawTraits.CalculateLambda = ReplaceFirstLambdaArgWithCastStruct(*traits->Child(1), *lambdaInputType, ctx);
  472. rawTraits.CalculateLambdaLead = lead;
  473. rawTraits.OutputType = traits->Child(1)->GetTypeAnn();
  474. YQL_ENSURE(rawTraits.OutputType);
  475. } else if (traits->IsCallable({"Rank", "DenseRank", "PercentRank"})) {
  476. rawTraits.OutputType = traits->Child(1)->GetTypeAnn();
  477. auto lambdaInputType =
  478. traits->Child(0)->GetTypeAnn()->Cast<TTypeExprType>()->GetType()->Cast<TListExprType>()->GetItemType();
  479. auto lambda = ReplaceFirstLambdaArgWithCastStruct(*traits->Child(1), *lambdaInputType, ctx);
  480. rawTraits.CalculateLambda = ctx.ChangeChild(*traits, 1, std::move(lambda));
  481. } else {
  482. YQL_ENSURE(traits->IsCallable({"RowNumber","CumeDist","NTile"}));
  483. rawTraits.CalculateLambda = traits;
  484. rawTraits.OutputType = traits->GetTypeAnn();
  485. for (ui32 i = 1; i < traits->ChildrenSize(); ++i) {
  486. rawTraits.Params.push_back(traits->ChildPtr(i));
  487. }
  488. }
  489. }
  490. }
  491. result.LagQueueItemType = ctx.MakeType<TStructExprType>(lagQueueStructItems);
  492. return result;
  493. }
  494. TExprNode::TPtr BuildUint64(TPositionHandle pos, ui64 value, TExprContext& ctx) {
  495. return ctx.Builder(pos)
  496. .Callable("Uint64")
  497. .Atom(0, ToString(value))
  498. .Seal()
  499. .Build();
  500. }
  501. TExprNode::TPtr BuildDouble(TPositionHandle pos, double value, TExprContext& ctx) {
  502. return ctx.Builder(pos)
  503. .Callable("Double")
  504. .Atom(0, ToString(value))
  505. .Seal()
  506. .Build();
  507. }
  508. TExprNode::TPtr BuildQueuePeek(TPositionHandle pos, const TExprNode::TPtr& queue, ui64 index, const TExprNode::TPtr& dependsOn,
  509. TExprContext& ctx)
  510. {
  511. return ctx.Builder(pos)
  512. .Callable("QueuePeek")
  513. .Add(0, queue)
  514. .Add(1, BuildUint64(pos, index, ctx))
  515. .Callable(2, "DependsOn")
  516. .Add(0, dependsOn)
  517. .Seal()
  518. .Seal()
  519. .Build();
  520. }
  521. TExprNode::TPtr BuildQueueRange(TPositionHandle pos, const TExprNode::TPtr& queue, ui64 begin, ui64 end,
  522. const TExprNode::TPtr& dependsOn, TExprContext& ctx)
  523. {
  524. return ctx.Builder(pos)
  525. .Callable("FlatMap")
  526. .Callable(0, "QueueRange")
  527. .Add(0, queue)
  528. .Add(1, BuildUint64(pos, begin, ctx))
  529. .Add(2, BuildUint64(pos, end, ctx))
  530. .Callable(3, "DependsOn")
  531. .Add(0, dependsOn)
  532. .Seal()
  533. .Seal()
  534. .Lambda(1)
  535. .Param("item")
  536. .Arg("item")
  537. .Seal()
  538. .Seal()
  539. .Build();
  540. }
  541. TExprNode::TPtr BuildQueue(TPositionHandle pos, const TTypeAnnotationNode& itemType, ui64 queueSize, ui64 initSize,
  542. const TExprNode::TPtr& dependsOn, TExprContext& ctx)
  543. {
  544. TExprNode::TPtr size;
  545. if (queueSize == Max<ui64>()) {
  546. size = ctx.NewCallable(pos, "Void", {});
  547. } else {
  548. size = BuildUint64(pos, queueSize, ctx);
  549. }
  550. return ctx.Builder(pos)
  551. .Callable("QueueCreate")
  552. .Add(0, ExpandType(pos, itemType, ctx))
  553. .Add(1, size)
  554. .Add(2, BuildUint64(pos, initSize, ctx))
  555. .Callable(3, "DependsOn")
  556. .Add(0, dependsOn)
  557. .Seal()
  558. .Seal()
  559. .Build();
  560. }
  561. TExprNode::TPtr CoalesceQueueOutput(TPositionHandle pos, const TExprNode::TPtr& output, bool rawOutputIsOptional,
  562. const TExprNode::TPtr& defaultValue, TExprContext& ctx)
  563. {
  564. // output is has type Optional<RawOutputType>
  565. if (!rawOutputIsOptional) {
  566. return ctx.Builder(pos)
  567. .Callable("Coalesce")
  568. .Add(0, output)
  569. .Add(1, defaultValue)
  570. .Seal()
  571. .Build();
  572. }
  573. return ctx.Builder(pos)
  574. .Callable("IfPresent")
  575. .Add(0, output)
  576. .Lambda(1)
  577. .Param("item")
  578. .Callable("Coalesce")
  579. .Arg(0, "item")
  580. .Add(1, defaultValue)
  581. .Seal()
  582. .Seal()
  583. .Add(2, defaultValue)
  584. .Seal()
  585. .Build();
  586. }
  587. TExprNode::TPtr WrapWithWinContext(const TExprNode::TPtr& input, TExprContext& ctx) {
  588. if (HasContextFuncs(*input)) {
  589. return ctx.Builder(input->Pos())
  590. .Callable("WithContext")
  591. .Add(0, input)
  592. .Atom(1, "WinAgg", TNodeFlags::Default)
  593. .Seal()
  594. .Build();
  595. }
  596. return input;
  597. }
  598. TExprNode::TPtr BuildInitLambdaForChain1Map(TPositionHandle pos, const TExprNode::TPtr& initStateLambda,
  599. const TExprNode::TPtr& calculateLambda, TExprContext& ctx)
  600. {
  601. return ctx.Builder(pos)
  602. .Lambda()
  603. .Param("row")
  604. .List()
  605. .Do([&](TExprNodeBuilder& parent)->TExprNodeBuilder& {
  606. if (calculateLambda->Head().ChildrenSize() == 1) {
  607. parent.Apply(0, calculateLambda)
  608. .With(0)
  609. .Apply(initStateLambda)
  610. .With(0, "row")
  611. .Seal()
  612. .Done()
  613. .Seal();
  614. } else {
  615. parent.Apply(0, calculateLambda)
  616. .With(0)
  617. .Apply(initStateLambda)
  618. .With(0, "row")
  619. .Seal()
  620. .Done()
  621. .With(1, "row")
  622. .Seal();
  623. }
  624. return parent;
  625. })
  626. .Apply(1, initStateLambda)
  627. .With(0, "row")
  628. .Seal()
  629. .Seal()
  630. .Seal()
  631. .Build();
  632. }
  633. TExprNode::TPtr BuildUpdateLambdaForChain1Map(TPositionHandle pos, const TExprNode::TPtr& updateStateLambda,
  634. const TExprNode::TPtr& calculateLambda, TExprContext& ctx)
  635. {
  636. return ctx.Builder(pos)
  637. .Lambda()
  638. .Param("row")
  639. .Param("state")
  640. .List()
  641. .Do([&](TExprNodeBuilder& parent)->TExprNodeBuilder& {
  642. if (calculateLambda->Head().ChildrenSize() == 1) {
  643. parent.Apply(0, calculateLambda)
  644. .With(0)
  645. .Apply(updateStateLambda)
  646. .With(0, "row")
  647. .With(1, "state")
  648. .Seal()
  649. .Done()
  650. .Seal();
  651. } else {
  652. parent.Apply(0, calculateLambda)
  653. .With(0)
  654. .Apply(updateStateLambda)
  655. .With(0, "row")
  656. .With(1, "state")
  657. .Seal()
  658. .Done()
  659. .With(1, "row")
  660. .Seal();
  661. }
  662. return parent;
  663. })
  664. .Apply(1, updateStateLambda)
  665. .With(0, "row")
  666. .With(1, "state")
  667. .Seal()
  668. .Seal()
  669. .Seal()
  670. .Build();
  671. }
  672. class TChain1MapTraits : public TThrRefBase, public TNonCopyable {
  673. public:
  674. using TPtr = TIntrusivePtr<TChain1MapTraits>;
  675. TChain1MapTraits(TStringBuf name, TPositionHandle pos)
  676. : Name(name)
  677. , Pos(pos)
  678. {
  679. }
  680. TStringBuf GetName() const {
  681. return Name;
  682. }
  683. TPositionHandle GetPos() const {
  684. return Pos;
  685. }
  686. // Lambda(row) -> AsTuple(output, state)
  687. virtual TExprNode::TPtr BuildInitLambda(const TExprNode::TPtr& dataQueue, TExprContext& ctx) const = 0;
  688. // Lambda(row, state) -> AsTuple(output, state)
  689. virtual TExprNode::TPtr BuildUpdateLambda(const TExprNode::TPtr& dataQueue, TExprContext& ctx) const = 0;
  690. virtual TExprNode::TPtr ExtractLaggingOutput(const TExprNode::TPtr& lagQueue,
  691. const TExprNode::TPtr& dependsOn, TExprContext& ctx) const
  692. {
  693. Y_UNUSED(lagQueue);
  694. Y_UNUSED(dependsOn);
  695. Y_UNUSED(ctx);
  696. return {};
  697. }
  698. virtual ~TChain1MapTraits() = default;
  699. private:
  700. const TStringBuf Name;
  701. const TPositionHandle Pos;
  702. };
  703. class TChain1MapTraitsLagLead : public TChain1MapTraits {
  704. public:
  705. TChain1MapTraitsLagLead(TStringBuf name, const TRawTrait& raw, TMaybe<ui64> queueOffset)
  706. : TChain1MapTraits(name, raw.Pos)
  707. , QueueOffset(queueOffset)
  708. , LeadLagLambda(raw.CalculateLambda)
  709. {
  710. }
  711. // Lambda(row) -> AsTuple(output, state)
  712. TExprNode::TPtr BuildInitLambda(const TExprNode::TPtr& dataQueue, TExprContext& ctx) const override {
  713. return ctx.Builder(GetPos())
  714. .Lambda()
  715. .Param("row")
  716. .List()
  717. .Apply(0, CalculateOutputLambda(dataQueue, ctx))
  718. .With(0, "row")
  719. .Seal()
  720. .Callable(1, "Void")
  721. .Seal()
  722. .Seal()
  723. .Seal()
  724. .Build();
  725. }
  726. // Lambda(row, state) -> AsTuple(output, state)
  727. TExprNode::TPtr BuildUpdateLambda(const TExprNode::TPtr& dataQueue, TExprContext& ctx) const override {
  728. return ctx.Builder(GetPos())
  729. .Lambda()
  730. .Param("row")
  731. .Param("state")
  732. .List()
  733. .Apply(0, CalculateOutputLambda(dataQueue, ctx))
  734. .With(0, "row")
  735. .Seal()
  736. .Arg(1, "state")
  737. .Seal()
  738. .Seal()
  739. .Build();
  740. }
  741. private:
  742. TExprNode::TPtr CalculateOutputLambda(const TExprNode::TPtr& dataQueue, TExprContext& ctx) const {
  743. if (!QueueOffset.Defined()) {
  744. return AddOptionalIfNotAlreadyOptionalOrNull(LeadLagLambda, ctx);
  745. }
  746. YQL_ENSURE(dataQueue);
  747. auto rowArg = ctx.NewArgument(GetPos(), "row");
  748. auto body = ctx.Builder(GetPos())
  749. .Callable("IfPresent")
  750. .Add(0, BuildQueuePeek(GetPos(), dataQueue, *QueueOffset, rowArg, ctx))
  751. .Add(1, AddOptionalIfNotAlreadyOptionalOrNull(LeadLagLambda, ctx))
  752. .Callable(2, "Null")
  753. .Seal()
  754. .Seal()
  755. .Build();
  756. return ctx.NewLambda(GetPos(), ctx.NewArguments(GetPos(), {rowArg}), std::move(body));
  757. }
  758. const TMaybe<ui64> QueueOffset;
  759. const TExprNode::TPtr LeadLagLambda;
  760. };
  761. class TChain1MapTraitsRowNumber : public TChain1MapTraits {
  762. public:
  763. TChain1MapTraitsRowNumber(TStringBuf name, const TRawTrait& raw)
  764. : TChain1MapTraits(name, raw.Pos)
  765. {
  766. }
  767. // Lambda(row) -> AsTuple(output, state)
  768. TExprNode::TPtr BuildInitLambda(const TExprNode::TPtr& dataQueue, TExprContext& ctx) const override {
  769. Y_UNUSED(dataQueue);
  770. return ctx.Builder(GetPos())
  771. .Lambda()
  772. .Param("row")
  773. .List()
  774. .Add(0, BuildUint64(GetPos(), 1, ctx))
  775. .Add(1, BuildUint64(GetPos(), 1, ctx))
  776. .Seal()
  777. .Seal()
  778. .Build();
  779. }
  780. // Lambda(row, state) -> AsTuple(output, state)
  781. TExprNode::TPtr BuildUpdateLambda(const TExprNode::TPtr& dataQueue, TExprContext& ctx) const override {
  782. Y_UNUSED(dataQueue);
  783. return ctx.Builder(GetPos())
  784. .Lambda()
  785. .Param("row")
  786. .Param("state")
  787. .List()
  788. .Callable(0, "Inc")
  789. .Arg(0, "state")
  790. .Seal()
  791. .Callable(1, "Inc")
  792. .Arg(0, "state")
  793. .Seal()
  794. .Seal()
  795. .Seal()
  796. .Build();
  797. }
  798. };
  799. class TChain1MapTraitsCumeDist : public TChain1MapTraits {
  800. public:
  801. TChain1MapTraitsCumeDist(TStringBuf name, const TRawTrait& raw, const TString& partitionRowsColumn)
  802. : TChain1MapTraits(name, raw.Pos)
  803. , PartitionRowsColumn(partitionRowsColumn)
  804. {
  805. }
  806. // Lambda(row) -> AsTuple(output, state)
  807. TExprNode::TPtr BuildInitLambda(const TExprNode::TPtr& dataQueue, TExprContext& ctx) const override {
  808. Y_UNUSED(dataQueue);
  809. return ctx.Builder(GetPos())
  810. .Lambda()
  811. .Param("row")
  812. .List()
  813. .Callable(0, "/")
  814. .Add(0, BuildDouble(GetPos(), 1.0, ctx))
  815. .Callable(1, "Member")
  816. .Arg(0, "row")
  817. .Atom(1, PartitionRowsColumn)
  818. .Seal()
  819. .Seal()
  820. .Add(1, BuildUint64(GetPos(), 1, ctx))
  821. .Seal()
  822. .Seal()
  823. .Build();
  824. }
  825. // Lambda(row, state) -> AsTuple(output, state)
  826. TExprNode::TPtr BuildUpdateLambda(const TExprNode::TPtr& dataQueue, TExprContext& ctx) const override {
  827. Y_UNUSED(dataQueue);
  828. return ctx.Builder(GetPos())
  829. .Lambda()
  830. .Param("row")
  831. .Param("state")
  832. .List()
  833. .Callable(0, "/")
  834. .Callable(0, "SafeCast")
  835. .Callable(0, "Inc")
  836. .Arg(0, "state")
  837. .Seal()
  838. .Atom(1, "Double")
  839. .Seal()
  840. .Callable(1, "Member")
  841. .Arg(0, "row")
  842. .Atom(1, PartitionRowsColumn)
  843. .Seal()
  844. .Seal()
  845. .Callable(1, "Inc")
  846. .Arg(0, "state")
  847. .Seal()
  848. .Seal()
  849. .Seal()
  850. .Build();
  851. }
  852. private:
  853. const TString PartitionRowsColumn;
  854. };
  855. class TChain1MapTraitsNTile : public TChain1MapTraits {
  856. public:
  857. TChain1MapTraitsNTile(TStringBuf name, const TRawTrait& raw, const TString& partitionRowsColumn)
  858. : TChain1MapTraits(name, raw.Pos)
  859. , PartitionRowsColumn(partitionRowsColumn)
  860. {
  861. YQL_ENSURE(raw.Params.size() == 1);
  862. Param = raw.Params[0];
  863. }
  864. // Lambda(row) -> AsTuple(output, state)
  865. TExprNode::TPtr BuildInitLambda(const TExprNode::TPtr& dataQueue, TExprContext& ctx) const override {
  866. Y_UNUSED(dataQueue);
  867. return ctx.Builder(GetPos())
  868. .Lambda()
  869. .Param("row")
  870. .List()
  871. .Add(0, BuildUint64(GetPos(), 1, ctx))
  872. .Add(1, BuildUint64(GetPos(), 1, ctx))
  873. .Seal()
  874. .Seal()
  875. .Build();
  876. }
  877. // Lambda(row, state) -> AsTuple(output, state)
  878. TExprNode::TPtr BuildUpdateLambda(const TExprNode::TPtr& dataQueue, TExprContext& ctx) const override {
  879. Y_UNUSED(dataQueue);
  880. return ctx.Builder(GetPos())
  881. .Lambda()
  882. .Param("row")
  883. .Param("state")
  884. .List()
  885. .Callable(0, "Inc")
  886. .Callable(0, "Unwrap")
  887. .Callable(0, "/")
  888. .Callable(0, "*")
  889. .Callable(0, "SafeCast")
  890. .Add(0, Param)
  891. .Atom(1, "Uint64")
  892. .Seal()
  893. .Arg(1, "state")
  894. .Seal()
  895. .Callable(1, "Member")
  896. .Arg(0, "row")
  897. .Atom(1, PartitionRowsColumn)
  898. .Seal()
  899. .Seal()
  900. .Seal()
  901. .Seal()
  902. .Callable(1, "Inc")
  903. .Arg(0, "state")
  904. .Seal()
  905. .Seal()
  906. .Seal()
  907. .Build();
  908. }
  909. private:
  910. const TString PartitionRowsColumn;
  911. TExprNode::TPtr Param;
  912. };
  913. class TChain1MapTraitsRankBase : public TChain1MapTraits {
  914. public:
  915. TChain1MapTraitsRankBase(TStringBuf name, const TRawTrait& raw)
  916. : TChain1MapTraits(name, raw.Pos)
  917. , ExtractForCompareLambda(raw.CalculateLambda->ChildPtr(1))
  918. , Ansi(HasSetting(*raw.CalculateLambda->Child(2), "ansi"))
  919. , KeyType(raw.OutputType)
  920. {
  921. }
  922. virtual TExprNode::TPtr BuildCalculateLambda(TExprContext& ctx) const {
  923. return ctx.Builder(GetPos())
  924. .Lambda()
  925. .Param("state")
  926. .Callable("Nth")
  927. .Arg(0, "state")
  928. .Atom(1, "0")
  929. .Seal()
  930. .Seal()
  931. .Build();
  932. }
  933. // Lambda(row) -> AsTuple(output, state)
  934. TExprNode::TPtr BuildInitLambda(const TExprNode::TPtr& dataQueue, TExprContext& ctx) const final {
  935. Y_UNUSED(dataQueue);
  936. auto initKeyLambda = BuildRawInitLambda(ctx);
  937. if (!Ansi && KeyType->GetKind() == ETypeAnnotationKind::Optional) {
  938. auto stateType = GetStateType(KeyType->Cast<TOptionalExprType>()->GetItemType(), ctx);
  939. initKeyLambda = BuildOptKeyInitLambda(initKeyLambda, stateType, ctx);
  940. }
  941. auto initRowLambda = ctx.Builder(GetPos())
  942. .Lambda()
  943. .Param("row")
  944. .Apply(initKeyLambda)
  945. .With(0)
  946. .Apply(ExtractForCompareLambda)
  947. .With(0, "row")
  948. .Seal()
  949. .Done()
  950. .Seal()
  951. .Seal()
  952. .Build();
  953. return BuildInitLambdaForChain1Map(GetPos(), initRowLambda, BuildCalculateLambda(ctx), ctx);
  954. }
  955. // Lambda(row, state) -> AsTuple(output, state)
  956. TExprNode::TPtr BuildUpdateLambda(const TExprNode::TPtr& dataQueue, TExprContext& ctx) const final {
  957. Y_UNUSED(dataQueue);
  958. bool useAggrEquals = Ansi;
  959. auto updateKeyLambda = BuildRawUpdateLambda(useAggrEquals, ctx);
  960. if (!Ansi && KeyType->GetKind() == ETypeAnnotationKind::Optional) {
  961. auto stateType = GetStateType(KeyType->Cast<TOptionalExprType>()->GetItemType(), ctx);
  962. updateKeyLambda = ctx.Builder(GetPos())
  963. .Lambda()
  964. .Param("key")
  965. .Param("state")
  966. .Callable("IfPresent")
  967. .Arg(0, "state")
  968. .Lambda(1)
  969. .Param("unwrappedState")
  970. .Callable("IfPresent")
  971. .Arg(0, "key")
  972. .Lambda(1)
  973. .Param("unwrappedKey")
  974. .Callable("Just")
  975. .Apply(0, updateKeyLambda)
  976. .With(0, "unwrappedKey")
  977. .With(1, "unwrappedState")
  978. .Seal()
  979. .Seal()
  980. .Seal()
  981. .Callable(2, "Just")
  982. .Arg(0, "unwrappedState")
  983. .Seal()
  984. .Seal()
  985. .Seal()
  986. .Apply(2, BuildOptKeyInitLambda(BuildRawInitLambda(ctx), stateType, ctx))
  987. .With(0, "key")
  988. .Seal()
  989. .Seal()
  990. .Seal()
  991. .Build();
  992. }
  993. auto updateRowLambda = ctx.Builder(GetPos())
  994. .Lambda()
  995. .Param("row")
  996. .Param("state")
  997. .Apply(updateKeyLambda)
  998. .With(0)
  999. .Apply(ExtractForCompareLambda)
  1000. .With(0, "row")
  1001. .Seal()
  1002. .Done()
  1003. .With(1, "state")
  1004. .Seal()
  1005. .Seal()
  1006. .Build();
  1007. return BuildUpdateLambdaForChain1Map(GetPos(), updateRowLambda, BuildCalculateLambda(ctx), ctx);
  1008. }
  1009. virtual TExprNode::TPtr BuildRawInitLambda(TExprContext& ctx) const = 0;
  1010. virtual TExprNode::TPtr BuildRawUpdateLambda(bool useAggrEquals, TExprContext& ctx) const = 0;
  1011. virtual const TTypeAnnotationNode* GetStateType(const TTypeAnnotationNode* keyType, TExprContext& ctx) const = 0;
  1012. private:
  1013. TExprNode::TPtr BuildOptKeyInitLambda(const TExprNode::TPtr& rawInitKeyLambda,
  1014. const TTypeAnnotationNode* stateType, TExprContext& ctx) const
  1015. {
  1016. auto optStateType = ctx.MakeType<TOptionalExprType>(stateType);
  1017. return ctx.Builder(GetPos())
  1018. .Lambda()
  1019. .Param("key")
  1020. .Callable("IfPresent")
  1021. .Arg(0, "key")
  1022. .Lambda(1)
  1023. .Param("unwrapped")
  1024. .Callable("Just")
  1025. .Apply(0, rawInitKeyLambda)
  1026. .With(0, "unwrapped")
  1027. .Seal()
  1028. .Seal()
  1029. .Seal()
  1030. .Callable(2, "Nothing")
  1031. .Add(0, ExpandType(GetPos(), *optStateType, ctx))
  1032. .Seal()
  1033. .Seal()
  1034. .Seal()
  1035. .Build();
  1036. }
  1037. const TExprNode::TPtr ExtractForCompareLambda;
  1038. const bool Ansi;
  1039. const TTypeAnnotationNode* const KeyType;
  1040. };
  1041. class TChain1MapTraitsRank : public TChain1MapTraitsRankBase {
  1042. public:
  1043. TChain1MapTraitsRank(TStringBuf name, const TRawTrait& raw)
  1044. : TChain1MapTraitsRankBase(name, raw)
  1045. {
  1046. }
  1047. TExprNode::TPtr BuildRawInitLambda(TExprContext& ctx) const final {
  1048. auto one = BuildUint64(GetPos(), 1, ctx);
  1049. return ctx.Builder(GetPos())
  1050. .Lambda()
  1051. .Param("key")
  1052. .List()
  1053. .Add(0, one)
  1054. .Add(1, one)
  1055. .Arg(2, "key")
  1056. .Seal()
  1057. .Seal()
  1058. .Build();
  1059. }
  1060. TExprNode::TPtr BuildRawUpdateLambda(bool useAggrEquals, TExprContext& ctx) const final {
  1061. return ctx.Builder(GetPos())
  1062. .Lambda()
  1063. .Param("key")
  1064. .Param("state")
  1065. .List()
  1066. .Callable(0, "If")
  1067. .Callable(0, useAggrEquals ? "AggrEquals" : "==")
  1068. .Arg(0, "key")
  1069. .Callable(1, "Nth")
  1070. .Arg(0, "state")
  1071. .Atom(1, "2")
  1072. .Seal()
  1073. .Seal()
  1074. .Callable(1, "Nth")
  1075. .Arg(0, "state")
  1076. .Atom(1, "0")
  1077. .Seal()
  1078. .Callable(2, "Inc")
  1079. .Callable(0, "Nth")
  1080. .Arg(0, "state")
  1081. .Atom(1, "1")
  1082. .Seal()
  1083. .Seal()
  1084. .Seal()
  1085. .Callable(1, "Inc")
  1086. .Callable(0, "Nth")
  1087. .Arg(0, "state")
  1088. .Atom(1, "1")
  1089. .Seal()
  1090. .Seal()
  1091. .Arg(2, "key")
  1092. .Seal()
  1093. .Seal()
  1094. .Build();
  1095. }
  1096. const TTypeAnnotationNode* GetStateType(const TTypeAnnotationNode* keyType, TExprContext& ctx) const final {
  1097. return ctx.MakeType<TTupleExprType>(TTypeAnnotationNode::TListType{
  1098. ctx.MakeType<TDataExprType>(EDataSlot::Uint64),
  1099. ctx.MakeType<TDataExprType>(EDataSlot::Uint64),
  1100. keyType
  1101. });
  1102. }
  1103. };
  1104. class TChain1MapTraitsPercentRank : public TChain1MapTraitsRank {
  1105. public:
  1106. TChain1MapTraitsPercentRank(TStringBuf name, const TRawTrait& raw, const TString& partitionRowsColumn)
  1107. : TChain1MapTraitsRank(name, raw)
  1108. , PartitionRowsColumn(partitionRowsColumn)
  1109. {
  1110. }
  1111. virtual TExprNode::TPtr BuildCalculateLambda(TExprContext& ctx) const {
  1112. return ctx.Builder(GetPos())
  1113. .Lambda()
  1114. .Param("state")
  1115. .Param("row")
  1116. .Callable("/")
  1117. .Callable(0, "SafeCast")
  1118. .Callable(0, "Dec")
  1119. .Callable(0, "Nth")
  1120. .Arg(0, "state")
  1121. .Atom(1, "0")
  1122. .Seal()
  1123. .Seal()
  1124. .Atom(1, "Double")
  1125. .Seal()
  1126. .Callable(1, "Dec")
  1127. .Callable(0, "Member")
  1128. .Arg(0, "row")
  1129. .Atom(1, PartitionRowsColumn)
  1130. .Seal()
  1131. .Seal()
  1132. .Seal()
  1133. .Seal()
  1134. .Build();
  1135. }
  1136. private:
  1137. const TString PartitionRowsColumn;
  1138. };
  1139. class TChain1MapTraitsDenseRank : public TChain1MapTraitsRankBase {
  1140. public:
  1141. TChain1MapTraitsDenseRank(TStringBuf name, const TRawTrait& raw)
  1142. : TChain1MapTraitsRankBase(name, raw)
  1143. {
  1144. }
  1145. TExprNode::TPtr BuildRawInitLambda(TExprContext& ctx) const final {
  1146. return ctx.Builder(GetPos())
  1147. .Lambda()
  1148. .Param("key")
  1149. .List()
  1150. .Add(0, BuildUint64(GetPos(), 1, ctx))
  1151. .Arg(1, "key")
  1152. .Seal()
  1153. .Seal()
  1154. .Build();
  1155. }
  1156. TExprNode::TPtr BuildRawUpdateLambda(bool useAggrEquals, TExprContext& ctx) const final {
  1157. return ctx.Builder(GetPos())
  1158. .Lambda()
  1159. .Param("key")
  1160. .Param("state")
  1161. .List()
  1162. .Callable(0, "If")
  1163. .Callable(0, useAggrEquals ? "AggrEquals" : "==")
  1164. .Arg(0, "key")
  1165. .Callable(1, "Nth")
  1166. .Arg(0, "state")
  1167. .Atom(1, "1")
  1168. .Seal()
  1169. .Seal()
  1170. .Callable(1, "Nth")
  1171. .Arg(0, "state")
  1172. .Atom(1, "0")
  1173. .Seal()
  1174. .Callable(2, "Inc")
  1175. .Callable(0, "Nth")
  1176. .Arg(0, "state")
  1177. .Atom(1, "0")
  1178. .Seal()
  1179. .Seal()
  1180. .Seal()
  1181. .Arg(1, "key")
  1182. .Seal()
  1183. .Seal()
  1184. .Build();
  1185. }
  1186. const TTypeAnnotationNode* GetStateType(const TTypeAnnotationNode* keyType, TExprContext& ctx) const final {
  1187. return ctx.MakeType<TTupleExprType>(TTypeAnnotationNode::TListType{
  1188. ctx.MakeType<TDataExprType>(EDataSlot::Uint64),
  1189. keyType
  1190. });
  1191. }
  1192. };
  1193. class TChain1MapTraitsStateBase : public TChain1MapTraits {
  1194. public:
  1195. TChain1MapTraitsStateBase(TStringBuf name, const TRawTrait& raw)
  1196. : TChain1MapTraits(name, raw.Pos)
  1197. , FrameNeverEmpty(raw.FrameSettings.IsNonEmpty())
  1198. , InitLambda(raw.InitLambda)
  1199. , UpdateLambda(raw.UpdateLambda)
  1200. , CalculateLambda(raw.CalculateLambda)
  1201. , DefaultValue(raw.DefaultValue)
  1202. {
  1203. }
  1204. protected:
  1205. TExprNode::TPtr GetInitLambda() const {
  1206. return InitLambda;
  1207. }
  1208. TExprNode::TPtr GetUpdateLambda() const {
  1209. return UpdateLambda;
  1210. }
  1211. TExprNode::TPtr GetCalculateLambda() const {
  1212. return CalculateLambda;
  1213. }
  1214. TExprNode::TPtr GetDefaultValue() const {
  1215. return DefaultValue;
  1216. }
  1217. const bool FrameNeverEmpty;
  1218. private:
  1219. const TExprNode::TPtr InitLambda;
  1220. const TExprNode::TPtr UpdateLambda;
  1221. const TExprNode::TPtr CalculateLambda;
  1222. const TExprNode::TPtr DefaultValue;
  1223. };
  1224. class TChain1MapTraitsCurrentOrLagging : public TChain1MapTraitsStateBase {
  1225. public:
  1226. TChain1MapTraitsCurrentOrLagging(TStringBuf name, const TRawTrait& raw, TMaybe<ui64> lagQueueIndex)
  1227. : TChain1MapTraitsStateBase(name, raw)
  1228. , LaggingQueueIndex(lagQueueIndex)
  1229. , OutputIsOptional(raw.OutputType->IsOptionalOrNull())
  1230. {
  1231. }
  1232. // Lambda(row) -> AsTuple(output, state)
  1233. TExprNode::TPtr BuildInitLambda(const TExprNode::TPtr& dataQueue, TExprContext& ctx) const override {
  1234. Y_UNUSED(dataQueue);
  1235. return BuildInitLambdaForChain1Map(GetPos(), GetInitLambda(), GetCalculateLambda(), ctx);
  1236. }
  1237. // Lambda(row, state) -> AsTuple(output, state)
  1238. TExprNode::TPtr BuildUpdateLambda(const TExprNode::TPtr& dataQueue, TExprContext& ctx) const override {
  1239. Y_UNUSED(dataQueue);
  1240. return BuildUpdateLambdaForChain1Map(GetPos(), GetUpdateLambda(), GetCalculateLambda(), ctx);
  1241. }
  1242. TExprNode::TPtr ExtractLaggingOutput(const TExprNode::TPtr& lagQueue,
  1243. const TExprNode::TPtr& dependsOn, TExprContext& ctx) const override
  1244. {
  1245. if (!LaggingQueueIndex.Defined()) {
  1246. return {};
  1247. }
  1248. YQL_ENSURE(!FrameNeverEmpty);
  1249. auto output = ctx.Builder(GetPos())
  1250. .Callable("Map")
  1251. .Add(0, BuildQueuePeek(GetPos(), lagQueue, *LaggingQueueIndex, dependsOn, ctx))
  1252. .Lambda(1)
  1253. .Param("struct")
  1254. .Callable("Member")
  1255. .Arg(0, "struct")
  1256. .Atom(1, GetName())
  1257. .Seal()
  1258. .Seal()
  1259. .Seal()
  1260. .Build();
  1261. return CoalesceQueueOutput(GetPos(), output, OutputIsOptional, GetDefaultValue(), ctx);
  1262. }
  1263. private:
  1264. const TMaybe<ui64> LaggingQueueIndex;
  1265. const bool OutputIsOptional;
  1266. };
  1267. class TChain1MapTraitsLeading : public TChain1MapTraitsStateBase {
  1268. public:
  1269. TChain1MapTraitsLeading(TStringBuf name, const TRawTrait& raw, ui64 currentRowIndex, ui64 lastRowIndex)
  1270. : TChain1MapTraitsStateBase(name, raw)
  1271. , QueueBegin(currentRowIndex + 1)
  1272. , QueueEnd(lastRowIndex + 1)
  1273. {
  1274. }
  1275. // Lambda(row) -> AsTuple(output, state)
  1276. TExprNode::TPtr BuildInitLambda(const TExprNode::TPtr& dataQueue, TExprContext& ctx) const override {
  1277. YQL_ENSURE(dataQueue);
  1278. auto originalInit = GetInitLambda();
  1279. auto originalUpdate = GetUpdateLambda();
  1280. auto calculate = GetCalculateLambda();
  1281. auto rowArg = ctx.NewArgument(GetPos(), "row");
  1282. auto state = ctx.Builder(GetPos())
  1283. .Callable("Fold")
  1284. .Add(0, BuildQueueRange(GetPos(), dataQueue, QueueBegin, QueueEnd, rowArg, ctx))
  1285. .Apply(1, originalInit)
  1286. .With(0, rowArg)
  1287. .Seal()
  1288. .Add(2, ctx.DeepCopyLambda(*originalUpdate))
  1289. .Seal()
  1290. .Build();
  1291. state = WrapWithWinContext(state, ctx);
  1292. auto initBody = ctx.Builder(GetPos())
  1293. .List()
  1294. .Apply(0, calculate)
  1295. .With(0, state)
  1296. .Seal()
  1297. .Apply(1, originalInit)
  1298. .With(0, rowArg)
  1299. .Seal()
  1300. .Seal()
  1301. .Build();
  1302. return ctx.NewLambda(GetPos(), ctx.NewArguments(GetPos(), {rowArg}), std::move(initBody));
  1303. }
  1304. // Lambda(row, state) -> AsTuple(output, state)
  1305. TExprNode::TPtr BuildUpdateLambda(const TExprNode::TPtr& dataQueue, TExprContext& ctx) const override {
  1306. YQL_ENSURE(dataQueue);
  1307. auto originalInit = GetInitLambda();
  1308. auto originalUpdate = GetUpdateLambda();
  1309. auto calculate = GetCalculateLambda();
  1310. auto rowArg = ctx.NewArgument(GetPos(), "row");
  1311. auto stateArg = ctx.NewArgument(GetPos(), "state");
  1312. auto state = ctx.Builder(GetPos())
  1313. .Callable("Fold")
  1314. .Add(0, BuildQueueRange(GetPos(), dataQueue, QueueBegin, QueueEnd, rowArg, ctx))
  1315. .Apply(1, originalUpdate)
  1316. .With(0, rowArg)
  1317. .With(1, stateArg)
  1318. .Seal()
  1319. .Add(2, ctx.DeepCopyLambda(*originalUpdate))
  1320. .Seal()
  1321. .Build();
  1322. state = WrapWithWinContext(state, ctx);
  1323. auto updateBody = ctx.Builder(GetPos())
  1324. .List()
  1325. .Apply(0, calculate)
  1326. .With(0, state)
  1327. .Seal()
  1328. .Apply(1, originalUpdate)
  1329. .With(0, rowArg)
  1330. .With(1, stateArg)
  1331. .Seal()
  1332. .Seal()
  1333. .Build();
  1334. return ctx.NewLambda(GetPos(), ctx.NewArguments(GetPos(), {rowArg, stateArg}), std::move(updateBody));
  1335. }
  1336. private:
  1337. const ui64 QueueBegin;
  1338. const ui64 QueueEnd;
  1339. };
  1340. class TChain1MapTraitsFull : public TChain1MapTraitsStateBase {
  1341. public:
  1342. TChain1MapTraitsFull(TStringBuf name, const TRawTrait& raw, ui64 currentRowIndex)
  1343. : TChain1MapTraitsStateBase(name, raw)
  1344. , QueueBegin(currentRowIndex + 1)
  1345. {
  1346. }
  1347. // Lambda(row) -> AsTuple(output, state)
  1348. // state == output
  1349. TExprNode::TPtr BuildInitLambda(const TExprNode::TPtr& dataQueue, TExprContext& ctx) const override {
  1350. auto originalInit = GetInitLambda();
  1351. auto originalUpdate = GetUpdateLambda();
  1352. auto calculate = GetCalculateLambda();
  1353. auto rowArg = ctx.NewArgument(GetPos(), "row");
  1354. auto state = ctx.Builder(GetPos())
  1355. .Callable("Fold")
  1356. .Add(0, BuildQueueRange(GetPos(), dataQueue, QueueBegin, Max<ui64>(), rowArg, ctx))
  1357. .Apply(1, originalInit)
  1358. .With(0, rowArg)
  1359. .Seal()
  1360. .Add(2, ctx.DeepCopyLambda(*originalUpdate))
  1361. .Seal()
  1362. .Build();
  1363. state = WrapWithWinContext(state, ctx);
  1364. auto initBody = ctx.Builder(GetPos())
  1365. .List()
  1366. .Apply(0, calculate)
  1367. .With(0, state)
  1368. .Seal()
  1369. .Apply(1, calculate)
  1370. .With(0, state)
  1371. .Seal()
  1372. .Seal()
  1373. .Build();
  1374. return ctx.NewLambda(GetPos(), ctx.NewArguments(GetPos(), {rowArg}), std::move(initBody));
  1375. }
  1376. // Lambda(row, state) -> AsTuple(output, state)
  1377. TExprNode::TPtr BuildUpdateLambda(const TExprNode::TPtr& dataQueue, TExprContext& ctx) const override {
  1378. Y_UNUSED(dataQueue);
  1379. return ctx.Builder(GetPos())
  1380. .Lambda()
  1381. .Param("row")
  1382. .Param("state")
  1383. .List()
  1384. .Arg(0, "state")
  1385. .Arg(1, "state")
  1386. .Seal()
  1387. .Seal()
  1388. .Build();
  1389. }
  1390. private:
  1391. const ui64 QueueBegin;
  1392. };
  1393. class TChain1MapTraitsGeneric : public TChain1MapTraitsStateBase {
  1394. public:
  1395. TChain1MapTraitsGeneric(TStringBuf name, const TRawTrait& raw, ui64 queueBegin, ui64 queueEnd)
  1396. : TChain1MapTraitsStateBase(name, raw)
  1397. , QueueBegin(queueBegin)
  1398. , QueueEnd(queueEnd)
  1399. , OutputIsOptional(raw.OutputType->IsOptionalOrNull())
  1400. {
  1401. }
  1402. // Lambda(row) -> AsTuple(output, state)
  1403. TExprNode::TPtr BuildInitLambda(const TExprNode::TPtr& dataQueue, TExprContext& ctx) const override {
  1404. auto rowArg = ctx.NewArgument(GetPos(), "row");
  1405. auto body = ctx.Builder(GetPos())
  1406. .List()
  1407. .Add(0, BuildFinalOutput(rowArg, dataQueue, ctx))
  1408. .Callable(1, "Void")
  1409. .Seal()
  1410. .Seal()
  1411. .Build();
  1412. return ctx.NewLambda(GetPos(), ctx.NewArguments(GetPos(), {rowArg}), std::move(body));
  1413. }
  1414. // Lambda(row, state) -> AsTuple(output, state)
  1415. TExprNode::TPtr BuildUpdateLambda(const TExprNode::TPtr& dataQueue, TExprContext& ctx) const override {
  1416. auto rowArg = ctx.NewArgument(GetPos(), "row");
  1417. auto stateArg = ctx.NewArgument(GetPos(), "state");
  1418. auto body = ctx.Builder(GetPos())
  1419. .List()
  1420. .Add(0, BuildFinalOutput(rowArg, dataQueue, ctx))
  1421. .Add(1, stateArg)
  1422. .Seal()
  1423. .Build();
  1424. return ctx.NewLambda(GetPos(), ctx.NewArguments(GetPos(), {rowArg, stateArg}), std::move(body));
  1425. }
  1426. private:
  1427. TExprNode::TPtr BuildFinalOutput(const TExprNode::TPtr& rowArg, const TExprNode::TPtr& dataQueue, TExprContext& ctx) const {
  1428. YQL_ENSURE(dataQueue);
  1429. auto originalInit = GetInitLambda();
  1430. auto originalUpdate = GetUpdateLambda();
  1431. auto calculate = GetCalculateLambda();
  1432. auto fold1 = ctx.Builder(GetPos())
  1433. .Callable("Fold1")
  1434. .Add(0, BuildQueueRange(GetPos(), dataQueue, QueueBegin, QueueEnd, rowArg, ctx))
  1435. .Add(1, ctx.DeepCopyLambda(*originalInit))
  1436. .Add(2, ctx.DeepCopyLambda(*originalUpdate))
  1437. .Seal()
  1438. .Build();
  1439. fold1 = WrapWithWinContext(fold1, ctx);
  1440. auto output = ctx.Builder(GetPos())
  1441. .Callable("Map")
  1442. .Add(0, fold1)
  1443. .Add(1, ctx.DeepCopyLambda(*calculate))
  1444. .Seal()
  1445. .Build();
  1446. if (FrameNeverEmpty) {
  1447. // output is always non-empty optional in this case
  1448. // we do IfPresent with some fake output value to remove optional
  1449. // this will have exactly the same result as Unwrap(output)
  1450. return ctx.Builder(GetPos())
  1451. .Callable("IfPresent")
  1452. .Add(0, output)
  1453. .Lambda(1)
  1454. .Param("unwrapped")
  1455. .Arg("unwrapped")
  1456. .Seal()
  1457. .Apply(2, calculate)
  1458. .With(0)
  1459. .Apply(originalInit)
  1460. .With(0, rowArg)
  1461. .Seal()
  1462. .Done()
  1463. .Seal()
  1464. .Seal()
  1465. .Build();
  1466. }
  1467. return CoalesceQueueOutput(GetPos(), output, OutputIsOptional, GetDefaultValue(), ctx);
  1468. }
  1469. const ui64 QueueBegin;
  1470. const ui64 QueueEnd;
  1471. const bool OutputIsOptional;
  1472. };
  1473. class TChain1MapTraitsEmpty : public TChain1MapTraitsStateBase {
  1474. public:
  1475. TChain1MapTraitsEmpty(TStringBuf name, const TRawTrait& raw)
  1476. : TChain1MapTraitsStateBase(name, raw)
  1477. , RawOutputType(raw.OutputType)
  1478. {
  1479. }
  1480. // Lambda(row) -> AsTuple(output, state)
  1481. TExprNode::TPtr BuildInitLambda(const TExprNode::TPtr& dataQueue, TExprContext& ctx) const override {
  1482. Y_UNUSED(dataQueue);
  1483. return ctx.Builder(GetPos())
  1484. .Lambda()
  1485. .Param("row")
  1486. .List()
  1487. .Add(0, BuildFinalOutput(ctx))
  1488. .Callable(1, "Void")
  1489. .Seal()
  1490. .Seal()
  1491. .Seal()
  1492. .Build();
  1493. }
  1494. // Lambda(row, state) -> AsTuple(output, state)
  1495. TExprNode::TPtr BuildUpdateLambda(const TExprNode::TPtr& dataQueue, TExprContext& ctx) const override {
  1496. Y_UNUSED(dataQueue);
  1497. return ctx.Builder(GetPos())
  1498. .Lambda()
  1499. .Param("row")
  1500. .Param("state")
  1501. .List()
  1502. .Add(0, BuildFinalOutput(ctx))
  1503. .Arg(1, "state")
  1504. .Seal()
  1505. .Seal()
  1506. .Build();
  1507. }
  1508. private:
  1509. TExprNode::TPtr BuildFinalOutput(TExprContext& ctx) const {
  1510. const auto defaultValue = GetDefaultValue();
  1511. YQL_ENSURE(!FrameNeverEmpty);
  1512. if (defaultValue->IsCallable("Null")) {
  1513. auto resultingType = RawOutputType;
  1514. if (!resultingType->IsOptionalOrNull()) {
  1515. resultingType = ctx.MakeType<TOptionalExprType>(resultingType);
  1516. }
  1517. return ctx.Builder(GetPos())
  1518. .Callable("Nothing")
  1519. .Add(0, ExpandType(GetPos(), *resultingType, ctx))
  1520. .Seal()
  1521. .Build();
  1522. }
  1523. return defaultValue;
  1524. }
  1525. const TTypeAnnotationNode* const RawOutputType;
  1526. };
  1527. struct TQueueParams {
  1528. ui64 DataOutpace = 0;
  1529. ui64 DataLag = 0;
  1530. bool DataQueueNeeded = false;
  1531. ui64 LagQueueSize = 0;
  1532. const TTypeAnnotationNode* LagQueueItemType = nullptr;
  1533. };
  1534. TVector<TChain1MapTraits::TPtr> BuildFoldMapTraits(TQueueParams& queueParams, const TExprNode::TPtr& frames,
  1535. const TMaybe<TString>& partitionRowsColumn, const TStructExprType& rowType, TExprContext& ctx) {
  1536. queueParams = {};
  1537. TVector<TChain1MapTraits::TPtr> result;
  1538. TCalcOverWindowTraits traits = ExtractCalcOverWindowTraits(frames, rowType, ctx);
  1539. if (traits.LagQueueItemType->Cast<TStructExprType>()->GetSize()) {
  1540. YQL_ENSURE(traits.MaxUnboundedPrecedingLag > 0);
  1541. queueParams.LagQueueSize = traits.MaxUnboundedPrecedingLag;
  1542. queueParams.LagQueueItemType = traits.LagQueueItemType;
  1543. }
  1544. ui64 currentRowIndex = 0;
  1545. if (traits.MaxDataOutpace || traits.MaxDataLag) {
  1546. queueParams.DataOutpace = traits.MaxDataOutpace;
  1547. queueParams.DataLag = traits.MaxDataLag;
  1548. currentRowIndex = queueParams.DataLag;
  1549. queueParams.DataQueueNeeded = true;
  1550. }
  1551. for (const auto& item : traits.RawTraits) {
  1552. TStringBuf name = item.first;
  1553. const TRawTrait& trait = item.second;
  1554. if (!trait.InitLambda) {
  1555. YQL_ENSURE(!trait.UpdateLambda);
  1556. YQL_ENSURE(!trait.DefaultValue);
  1557. if (trait.CalculateLambdaLead.Defined()) {
  1558. TMaybe<ui64> queueOffset;
  1559. if (*trait.CalculateLambdaLead) {
  1560. queueOffset = currentRowIndex + *trait.CalculateLambdaLead;
  1561. }
  1562. result.push_back(new TChain1MapTraitsLagLead(name, trait, queueOffset));
  1563. } else if (trait.CalculateLambda->IsCallable("RowNumber")) {
  1564. result.push_back(new TChain1MapTraitsRowNumber(name, trait));
  1565. } else if (trait.CalculateLambda->IsCallable("Rank")) {
  1566. result.push_back(new TChain1MapTraitsRank(name, trait));
  1567. } else if (trait.CalculateLambda->IsCallable("CumeDist")) {
  1568. result.push_back(new TChain1MapTraitsCumeDist(name, trait, *partitionRowsColumn));
  1569. } else if (trait.CalculateLambda->IsCallable("NTile")) {
  1570. result.push_back(new TChain1MapTraitsNTile(name, trait, *partitionRowsColumn));
  1571. } else if (trait.CalculateLambda->IsCallable("PercentRank")) {
  1572. result.push_back(new TChain1MapTraitsPercentRank(name, trait, *partitionRowsColumn));
  1573. } else {
  1574. YQL_ENSURE(trait.CalculateLambda->IsCallable("DenseRank"));
  1575. result.push_back(new TChain1MapTraitsDenseRank(name, trait));
  1576. }
  1577. continue;
  1578. }
  1579. if (trait.FrameSettings.GetFrameType() == EFrameType::FrameByRange) {
  1580. result.push_back(new TChain1MapTraitsCurrentOrLagging(name, trait, {}));
  1581. continue;
  1582. }
  1583. YQL_ENSURE(trait.FrameSettings.GetFrameType() == EFrameType::FrameByRows);
  1584. switch(FrameBoundsType(trait.FrameSettings)) {
  1585. case EFrameBoundsType::CURRENT:
  1586. case EFrameBoundsType::LAGGING: {
  1587. TMaybe<ui64> lagQueueIndex;
  1588. auto end = *trait.FrameSettings.GetLastOffset();
  1589. YQL_ENSURE(end <= 0);
  1590. if (end < 0) {
  1591. YQL_ENSURE(queueParams.LagQueueSize >= ui64(0 - end));
  1592. lagQueueIndex = queueParams.LagQueueSize + end;
  1593. }
  1594. result.push_back(new TChain1MapTraitsCurrentOrLagging(name, trait, lagQueueIndex));
  1595. break;
  1596. }
  1597. case EFrameBoundsType::LEADING: {
  1598. auto end = *trait.FrameSettings.GetLastOffset();
  1599. YQL_ENSURE(end > 0);
  1600. ui64 lastRowIndex = currentRowIndex + ui64(end);
  1601. result.push_back(new TChain1MapTraitsLeading(name, trait, currentRowIndex, lastRowIndex));
  1602. break;
  1603. }
  1604. case EFrameBoundsType::FULL: {
  1605. result.push_back(new TChain1MapTraitsFull(name, trait, currentRowIndex));
  1606. break;
  1607. }
  1608. case EFrameBoundsType::GENERIC: {
  1609. queueParams.DataQueueNeeded = true;
  1610. auto first = trait.FrameSettings.GetFirstOffset();
  1611. auto last = trait.FrameSettings.GetLastOffset();
  1612. YQL_ENSURE(first.Defined());
  1613. ui64 beginIndex = currentRowIndex + *first;
  1614. ui64 endIndex = last.Defined() ? (currentRowIndex + *last + 1) : Max<ui64>();
  1615. result.push_back(new TChain1MapTraitsGeneric(name, trait, beginIndex, endIndex));
  1616. break;
  1617. }
  1618. case EFrameBoundsType::EMPTY: {
  1619. result.push_back(new TChain1MapTraitsEmpty(name, trait));
  1620. break;
  1621. }
  1622. }
  1623. }
  1624. return result;
  1625. }
  1626. TExprNode::TPtr ConvertStructOfTuplesToTupleOfStructs(TPositionHandle pos, const TExprNode::TPtr& input, TExprContext& ctx) {
  1627. return ctx.Builder(pos)
  1628. .List()
  1629. .Callable(0, "StaticMap")
  1630. .Add(0, input)
  1631. .Lambda(1)
  1632. .Param("tuple")
  1633. .Callable("Nth")
  1634. .Arg(0, "tuple")
  1635. .Atom(1, "0")
  1636. .Seal()
  1637. .Seal()
  1638. .Seal()
  1639. .Callable(1, "StaticMap")
  1640. .Add(0, input)
  1641. .Lambda(1)
  1642. .Param("tuple")
  1643. .Callable("Nth")
  1644. .Arg(0, "tuple")
  1645. .Atom(1, "1")
  1646. .Seal()
  1647. .Seal()
  1648. .Seal()
  1649. .Seal()
  1650. .Build();
  1651. }
  1652. TExprNode::TPtr AddInputMembersToOutput(TPositionHandle pos, const TExprNode::TPtr& tupleOfOutputStructAndStateStruct,
  1653. const TExprNode::TPtr& rowArg, TExprContext& ctx)
  1654. {
  1655. return ctx.Builder(pos)
  1656. .List()
  1657. .Callable(0, "FlattenMembers")
  1658. .List(0)
  1659. .Atom(0, "")
  1660. .Callable(1, "Nth")
  1661. .Add(0, tupleOfOutputStructAndStateStruct)
  1662. .Atom(1, "0")
  1663. .Seal()
  1664. .Seal()
  1665. .List(1)
  1666. .Atom(0, "")
  1667. .Add(1, rowArg)
  1668. .Seal()
  1669. .Seal()
  1670. .Callable(1, "Nth")
  1671. .Add(0, tupleOfOutputStructAndStateStruct)
  1672. .Atom(1, "1")
  1673. .Seal()
  1674. .Seal()
  1675. .Build();
  1676. }
  1677. template<typename T>
  1678. TExprNode::TPtr SelectMembers(TPositionHandle pos, const T& members, const TExprNode::TPtr& structNode, TExprContext& ctx) {
  1679. TExprNodeList structItems;
  1680. for (auto& name : members) {
  1681. structItems.push_back(
  1682. ctx.Builder(pos)
  1683. .List()
  1684. .Atom(0, name)
  1685. .Callable(1, "Member")
  1686. .Add(0, structNode)
  1687. .Atom(1, name)
  1688. .Seal()
  1689. .Seal()
  1690. .Build()
  1691. );
  1692. }
  1693. return ctx.NewCallable(pos, "AsStruct", std::move(structItems));
  1694. }
  1695. TExprNode::TPtr HandleLaggingItems(TPositionHandle pos, const TExprNode::TPtr& rowArg,
  1696. const TExprNode::TPtr& tupleOfOutputAndState, const TVector<TChain1MapTraits::TPtr>& traits,
  1697. const TExprNode::TPtr& lagQueue, TExprContext& ctx)
  1698. {
  1699. TExprNodeList laggingStructItems;
  1700. TSet<TStringBuf> laggingNames;
  1701. TSet<TStringBuf> otherNames;
  1702. for (auto& trait : traits) {
  1703. auto name = trait->GetName();
  1704. auto laggingOutput = trait->ExtractLaggingOutput(lagQueue, rowArg, ctx);
  1705. if (laggingOutput) {
  1706. laggingNames.insert(name);
  1707. laggingStructItems.push_back(
  1708. ctx.Builder(pos)
  1709. .List()
  1710. .Atom(0, name)
  1711. .Add(1, laggingOutput)
  1712. .Seal()
  1713. .Build()
  1714. );
  1715. } else {
  1716. otherNames.insert(trait->GetName());
  1717. }
  1718. }
  1719. if (laggingStructItems.empty()) {
  1720. return tupleOfOutputAndState;
  1721. }
  1722. YQL_ENSURE(lagQueue);
  1723. auto output = ctx.NewCallable(pos, "Nth", { tupleOfOutputAndState, ctx.NewAtom(pos, "0")});
  1724. auto state = ctx.NewCallable(pos, "Nth", { tupleOfOutputAndState, ctx.NewAtom(pos, "1")});
  1725. auto leadingOutput = SelectMembers(pos, laggingNames, output, ctx);
  1726. auto otherOutput = SelectMembers(pos, otherNames, output, ctx);
  1727. auto laggingOutput = ctx.NewCallable(pos, "AsStruct", std::move(laggingStructItems));
  1728. output = ctx.Builder(pos)
  1729. .Callable("FlattenMembers")
  1730. .List(0)
  1731. .Atom(0, "")
  1732. .Add(1, laggingOutput)
  1733. .Seal()
  1734. .List(1)
  1735. .Atom(0, "")
  1736. .Add(1, otherOutput)
  1737. .Seal()
  1738. .Seal()
  1739. .Build();
  1740. return ctx.Builder(pos)
  1741. .List()
  1742. .Add(0, output)
  1743. .Callable(1, "Seq")
  1744. .Add(0, output)
  1745. .Add(1, state)
  1746. .Add(2, leadingOutput)
  1747. .List(3)
  1748. .Add(0, state)
  1749. .Callable(1, "QueuePush")
  1750. .Callable(0, "QueuePop")
  1751. .Add(0, lagQueue)
  1752. .Seal()
  1753. .Add(1, leadingOutput)
  1754. .Seal()
  1755. .Seal()
  1756. .Seal()
  1757. .Seal()
  1758. .Build();
  1759. }
  1760. TExprNode::TPtr BuildChain1MapInitLambda(TPositionHandle pos, const TVector<TChain1MapTraits::TPtr>& traits,
  1761. const TExprNode::TPtr& dataQueue, ui64 lagQueueSize, const TTypeAnnotationNode* lagQueueItemType, TExprContext& ctx)
  1762. {
  1763. auto rowArg = ctx.NewArgument(pos, "row");
  1764. TExprNode::TPtr lagQueue;
  1765. if (lagQueueSize) {
  1766. YQL_ENSURE(lagQueueItemType);
  1767. lagQueue = BuildQueue(pos, *lagQueueItemType, lagQueueSize, lagQueueSize, rowArg, ctx);
  1768. }
  1769. TExprNodeList structItems;
  1770. for (auto& trait : traits) {
  1771. structItems.push_back(
  1772. ctx.Builder(pos)
  1773. .List()
  1774. .Atom(0, trait->GetName())
  1775. .Apply(1, trait->BuildInitLambda(dataQueue, ctx))
  1776. .With(0, rowArg)
  1777. .Seal()
  1778. .Seal()
  1779. .Build()
  1780. );
  1781. }
  1782. auto asStruct = ctx.NewCallable(pos, "AsStruct", std::move(structItems));
  1783. auto tupleOfOutputAndState = ConvertStructOfTuplesToTupleOfStructs(pos, asStruct, ctx);
  1784. tupleOfOutputAndState = HandleLaggingItems(pos, rowArg, tupleOfOutputAndState, traits, lagQueue, ctx);
  1785. auto finalBody = AddInputMembersToOutput(pos, tupleOfOutputAndState, rowArg, ctx);
  1786. return ctx.NewLambda(pos, ctx.NewArguments(pos, {rowArg}), std::move(finalBody));
  1787. }
  1788. TExprNode::TPtr BuildChain1MapUpdateLambda(TPositionHandle pos, const TVector<TChain1MapTraits::TPtr>& traits,
  1789. const TExprNode::TPtr& dataQueue, bool haveLagQueue, TExprContext& ctx)
  1790. {
  1791. const auto rowArg = ctx.NewArgument(pos, "row");
  1792. const auto stateArg = ctx.NewArgument(pos, "state");
  1793. auto state = ctx.Builder(pos)
  1794. .Callable("Nth")
  1795. .Add(0, stateArg)
  1796. .Atom(1, "1", TNodeFlags::Default)
  1797. .Seal()
  1798. .Build();
  1799. TExprNode::TPtr lagQueue;
  1800. if (haveLagQueue) {
  1801. lagQueue = ctx.Builder(pos)
  1802. .Callable("Nth")
  1803. .Add(0, state)
  1804. .Atom(1, "1", TNodeFlags::Default)
  1805. .Seal()
  1806. .Build();
  1807. state = ctx.Builder(pos)
  1808. .Callable("Nth")
  1809. .Add(0, std::move(state))
  1810. .Atom(1, "0", TNodeFlags::Default)
  1811. .Seal()
  1812. .Build();
  1813. }
  1814. TExprNodeList structItems;
  1815. for (auto& trait : traits) {
  1816. structItems.push_back(
  1817. ctx.Builder(pos)
  1818. .List()
  1819. .Atom(0, trait->GetName())
  1820. .Apply(1, trait->BuildUpdateLambda(dataQueue, ctx))
  1821. .With(0, rowArg)
  1822. .With(1)
  1823. .Callable("Member")
  1824. .Add(0, state)
  1825. .Atom(1, trait->GetName())
  1826. .Seal()
  1827. .Done()
  1828. .Seal()
  1829. .Seal()
  1830. .Build()
  1831. );
  1832. }
  1833. auto asStruct = ctx.NewCallable(pos, "AsStruct", std::move(structItems));
  1834. auto tupleOfOutputAndState = ConvertStructOfTuplesToTupleOfStructs(pos, asStruct, ctx);
  1835. tupleOfOutputAndState = HandleLaggingItems(pos, rowArg, tupleOfOutputAndState, traits, lagQueue, ctx);
  1836. auto finalBody = AddInputMembersToOutput(pos, tupleOfOutputAndState, rowArg, ctx);
  1837. return ctx.NewLambda(pos, ctx.NewArguments(pos, {rowArg, stateArg}), std::move(finalBody));
  1838. }
  1839. bool IsNonCompactFullFrame(const TExprNode& winOnRows, TExprContext& ctx) {
  1840. TWindowFrameSettings frameSettings = TWindowFrameSettings::Parse(winOnRows, ctx);
  1841. return frameSettings.GetFrameType() == FrameByRows &&
  1842. !frameSettings.IsCompact() && !frameSettings.GetFirstOffset().Defined() && !frameSettings.GetLastOffset().Defined();
  1843. }
  1844. TExprNode::TPtr DeduceCompatibleSort(const TExprNode::TPtr& traitsOne, const TExprNode::TPtr& traitsTwo) {
  1845. YQL_ENSURE(traitsOne->IsCallable({"Void", "SortTraits"}));
  1846. YQL_ENSURE(traitsTwo->IsCallable({"Void", "SortTraits"}));
  1847. if (traitsOne->IsCallable("Void")) {
  1848. return traitsTwo;
  1849. }
  1850. if (traitsTwo->IsCallable("Void")) {
  1851. return traitsOne;
  1852. }
  1853. // TODO: need more advanced logic here
  1854. if (traitsOne == traitsTwo) {
  1855. return traitsOne;
  1856. }
  1857. return {};
  1858. }
  1859. TExprNode::TPtr BuildPartitionsByKeys(TPositionHandle pos, const TExprNode::TPtr& input, const TExprNode::TPtr& keySelector,
  1860. const TExprNode::TPtr& sortOrder, const TExprNode::TPtr& sortKey, const TExprNode::TPtr& streamProcessingLambda,
  1861. const TExprNode::TPtr& sessionKey, const TExprNode::TPtr& sessionInit, const TExprNode::TPtr& sessionUpdate,
  1862. const TExprNode::TPtr& sessionColumns, TExprContext& ctx)
  1863. {
  1864. TExprNode::TPtr preprocessLambda;
  1865. TExprNode::TPtr chopperKeySelector;
  1866. const TExprNode::TPtr addSessionColumnsArg = ctx.NewArgument(pos, "row");
  1867. TExprNode::TPtr addSessionColumnsBody = addSessionColumnsArg;
  1868. if (sessionUpdate) {
  1869. YQL_ENSURE(sessionKey);
  1870. YQL_ENSURE(sessionInit);
  1871. preprocessLambda =
  1872. AddSessionParamsMemberLambda(pos, SessionStartMemberName, SessionParamsMemberName, keySelector, sessionKey, sessionInit, sessionUpdate, ctx);
  1873. chopperKeySelector = ctx.Builder(pos)
  1874. .Lambda()
  1875. .Param("item")
  1876. .List()
  1877. .Apply(0, keySelector)
  1878. .With(0, "item")
  1879. .Seal()
  1880. .Callable(1, "Member")
  1881. .Arg(0, "item")
  1882. .Atom(1, SessionStartMemberName)
  1883. .Seal()
  1884. .Seal()
  1885. .Seal()
  1886. .Build();
  1887. } else {
  1888. YQL_ENSURE(!sessionKey);
  1889. preprocessLambda = MakeIdentityLambda(pos, ctx);
  1890. chopperKeySelector = keySelector;
  1891. }
  1892. for (auto& column : sessionColumns->ChildrenList()) {
  1893. addSessionColumnsBody = ctx.Builder(pos)
  1894. .Callable("AddMember")
  1895. .Add(0, addSessionColumnsBody)
  1896. .Add(1, column)
  1897. .Callable(2, "Member")
  1898. .Add(0, addSessionColumnsArg)
  1899. .Atom(1, SessionParamsMemberName)
  1900. .Seal()
  1901. .Seal()
  1902. .Build();
  1903. }
  1904. addSessionColumnsBody = ctx.Builder(pos)
  1905. .Callable("ForceRemoveMember")
  1906. .Callable(0, "ForceRemoveMember")
  1907. .Add(0, addSessionColumnsBody)
  1908. .Atom(1, SessionStartMemberName)
  1909. .Seal()
  1910. .Atom(1, SessionParamsMemberName)
  1911. .Seal()
  1912. .Build();
  1913. auto addSessionColumnsLambda = ctx.NewLambda(pos, ctx.NewArguments(pos, { addSessionColumnsArg }), std::move(addSessionColumnsBody));
  1914. auto groupSwitchLambda = ctx.Builder(pos)
  1915. .Lambda()
  1916. .Param("prevKey")
  1917. .Param("item")
  1918. .Callable("AggrNotEquals")
  1919. .Arg(0, "prevKey")
  1920. .Apply(1, chopperKeySelector)
  1921. .With(0, "item")
  1922. .Seal()
  1923. .Seal()
  1924. .Seal()
  1925. .Build();
  1926. return ctx.Builder(pos)
  1927. .Callable("PartitionsByKeys")
  1928. .Add(0, input)
  1929. .Add(1, keySelector)
  1930. .Add(2, sortOrder)
  1931. .Add(3, sortKey)
  1932. .Lambda(4)
  1933. .Param("partitionedStream")
  1934. .Callable("ForwardList")
  1935. .Callable(0, "Chopper")
  1936. .Callable(0, "ToStream")
  1937. .Apply(0, preprocessLambda)
  1938. .With(0, "partitionedStream")
  1939. .Seal()
  1940. .Seal()
  1941. .Add(1, chopperKeySelector)
  1942. .Add(2, groupSwitchLambda)
  1943. .Lambda(3)
  1944. .Param("key")
  1945. .Param("singlePartition")
  1946. .Callable("Map")
  1947. .Apply(0, streamProcessingLambda)
  1948. .With(0, "singlePartition")
  1949. .Seal()
  1950. .Add(1, addSessionColumnsLambda)
  1951. .Seal()
  1952. .Seal()
  1953. .Seal()
  1954. .Seal()
  1955. .Seal()
  1956. .Seal()
  1957. .Build();
  1958. }
  1959. enum EFold1LambdaKind {
  1960. INIT,
  1961. UPDATE,
  1962. CALCULATE,
  1963. };
  1964. TExprNode::TPtr BuildFold1Lambda(TPositionHandle pos, const TExprNode::TPtr& frames, EFold1LambdaKind kind,
  1965. const TExprNodeList& keyColumns, const TStructExprType& rowType, TExprContext& ctx)
  1966. {
  1967. TExprNode::TPtr arg1 = ctx.NewArgument(pos, "arg1");
  1968. TExprNodeList args = { arg1 };
  1969. TExprNode::TPtr arg2;
  1970. if (kind == EFold1LambdaKind::UPDATE) {
  1971. arg2 = ctx.NewArgument(pos, "arg2");
  1972. args.push_back(arg2);
  1973. }
  1974. TExprNodeList structItems;
  1975. for (auto& winOn : frames->ChildrenList()) {
  1976. YQL_ENSURE(IsNonCompactFullFrame(*winOn, ctx));
  1977. for (ui32 i = 1; i < winOn->ChildrenSize(); ++i) {
  1978. YQL_ENSURE(winOn->Child(i)->IsList());
  1979. YQL_ENSURE(winOn->Child(i)->Child(0)->IsAtom());
  1980. YQL_ENSURE(winOn->Child(i)->Child(1)->IsCallable("WindowTraits"));
  1981. YQL_ENSURE(2 <= winOn->Child(i)->ChildrenSize() && winOn->Child(i)->ChildrenSize() <= 3);
  1982. auto column = winOn->Child(i)->ChildPtr(0);
  1983. auto traits = winOn->Child(i)->ChildPtr(1);
  1984. auto traitsInputType = traits->Child(0)->GetTypeAnn()->Cast<TTypeExprType>()->GetType();
  1985. TStringBuf distinctKey;
  1986. const TTypeAnnotationNode* distinctKeyOrigType = nullptr;
  1987. if (winOn->Child(i)->ChildrenSize() == 3) {
  1988. auto distinctKeyNode = winOn->Child(i)->Child(2);
  1989. YQL_ENSURE(distinctKeyNode->IsAtom());
  1990. distinctKey = distinctKeyNode->Content();
  1991. distinctKeyOrigType = rowType.FindItemType(distinctKey);
  1992. YQL_ENSURE(distinctKeyOrigType);
  1993. }
  1994. TExprNode::TPtr applied;
  1995. switch (kind) {
  1996. case EFold1LambdaKind::INIT: {
  1997. auto lambda = traits->ChildPtr(1);
  1998. if (distinctKeyOrigType) {
  1999. lambda = ApplyDistinctForInitLambda(lambda, distinctKey, *traitsInputType, *distinctKeyOrigType, ctx);
  2000. } else {
  2001. lambda = ReplaceFirstLambdaArgWithCastStruct(*lambda, *traitsInputType, ctx);
  2002. }
  2003. if (lambda->Child(0)->ChildrenSize() == 2) {
  2004. lambda = ReplaceLastLambdaArgWithUnsignedLiteral(*lambda, i, ctx);
  2005. }
  2006. YQL_ENSURE(lambda->Child(0)->ChildrenSize() == 1);
  2007. applied = ctx.Builder(pos)
  2008. .Apply(lambda)
  2009. .With(0, arg1)
  2010. .Seal()
  2011. .Build();
  2012. break;
  2013. }
  2014. case EFold1LambdaKind::CALCULATE: {
  2015. auto lambda = traits->ChildPtr(4);
  2016. YQL_ENSURE(lambda->Child(0)->ChildrenSize() == 1);
  2017. if (distinctKeyOrigType) {
  2018. lambda = ApplyDistinctForCalculateLambda(lambda, ctx);
  2019. }
  2020. applied = ctx.Builder(pos)
  2021. .Apply(lambda)
  2022. .With(0)
  2023. .Callable("Member")
  2024. .Add(0, arg1)
  2025. .Add(1, column)
  2026. .Seal()
  2027. .Done()
  2028. .Seal()
  2029. .Build();
  2030. break;
  2031. }
  2032. case EFold1LambdaKind::UPDATE: {
  2033. auto lambda = traits->ChildPtr(2);
  2034. if (distinctKeyOrigType) {
  2035. lambda = ApplyDistinctForUpdateLambda(lambda, distinctKey, *traitsInputType, *distinctKeyOrigType, ctx);
  2036. } else {
  2037. lambda = ReplaceFirstLambdaArgWithCastStruct(*lambda, *traitsInputType, ctx);
  2038. }
  2039. if (lambda->Child(0)->ChildrenSize() == 3) {
  2040. lambda = ReplaceLastLambdaArgWithUnsignedLiteral(*lambda, i, ctx);
  2041. }
  2042. YQL_ENSURE(lambda->Child(0)->ChildrenSize() == 2);
  2043. applied = ctx.Builder(pos)
  2044. .Apply(lambda)
  2045. .With(0, arg1)
  2046. .With(1)
  2047. .Callable("Member")
  2048. .Add(0, arg2)
  2049. .Add(1, column)
  2050. .Seal()
  2051. .Done()
  2052. .Seal()
  2053. .Build();
  2054. break;
  2055. }
  2056. }
  2057. structItems.push_back(ctx.NewList(pos, {column, applied}));
  2058. }
  2059. }
  2060. // pass key columns as-is
  2061. for (auto& keyColumn : keyColumns) {
  2062. YQL_ENSURE(keyColumn->IsAtom());
  2063. structItems.push_back(
  2064. ctx.Builder(pos)
  2065. .List()
  2066. .Add(0, keyColumn)
  2067. .Callable(1, "Member")
  2068. .Add(0, arg1)
  2069. .Add(1, keyColumn)
  2070. .Seal()
  2071. .Seal()
  2072. .Build()
  2073. );
  2074. }
  2075. return ctx.NewLambda(pos, ctx.NewArguments(pos, std::move(args)), ctx.NewCallable(pos, "AsStruct", std::move(structItems)));
  2076. }
  2077. TExprNode::TPtr ExpandNonCompactFullFrames(TPositionHandle pos, const TExprNode::TPtr& inputList,
  2078. const TExprNode::TPtr& originalKeyColumns, const TExprNode::TPtr& sortTraits, const TExprNode::TPtr& frames,
  2079. const TExprNode::TPtr& sessionTraits, const TExprNode::TPtr& sessionColumns, TExprContext& ctx)
  2080. {
  2081. TExprNode::TPtr sessionKey;
  2082. TExprNode::TPtr sessionInit;
  2083. TExprNode::TPtr sessionUpdate;
  2084. TExprNode::TPtr sessionSortTraits;
  2085. const TTypeAnnotationNode* sessionKeyType = nullptr;
  2086. const TTypeAnnotationNode* sessionParamsType = nullptr;
  2087. ExtractSessionWindowParams(pos, sessionTraits, sessionKey, sessionKeyType, sessionParamsType, sessionSortTraits, sessionInit, sessionUpdate, ctx);
  2088. TExprNode::TPtr sortKey;
  2089. TExprNode::TPtr sortOrder;
  2090. TExprNode::TPtr input = inputList;
  2091. if (input->IsCallable("ForwardList")) {
  2092. // full frame strategy uses input 2 times (for grouping and join)
  2093. // TODO: better way to detect "single use input"
  2094. input = ctx.NewCallable(pos, "Collect", { input });
  2095. }
  2096. const auto rowType = inputList->GetTypeAnn()->Cast<TListExprType>()->GetItemType()->Cast<TStructExprType>();
  2097. TVector<const TItemExprType*> rowItems = rowType->GetItems();
  2098. TExprNodeList originalKeysWithSession = originalKeyColumns->ChildrenList();
  2099. TExprNodeList addedColumns;
  2100. const auto commonSortTraits = DeduceCompatibleSort(sortTraits, sessionSortTraits);
  2101. ExtractSortKeyAndOrder(pos, commonSortTraits ? commonSortTraits : sortTraits, sortKey, sortOrder, ctx);
  2102. if (!commonSortTraits) {
  2103. YQL_ENSURE(sessionKey);
  2104. YQL_ENSURE(sessionInit);
  2105. YQL_ENSURE(sessionUpdate);
  2106. TExprNode::TPtr sessionSortKey;
  2107. TExprNode::TPtr sessionSortOrder;
  2108. ExtractSortKeyAndOrder(pos, sessionSortTraits, sessionSortKey, sessionSortOrder, ctx);
  2109. const auto keySelector = BuildKeySelector(pos, *rowType, originalKeyColumns, ctx);
  2110. input = ctx.Builder(pos)
  2111. .Callable("PartitionsByKeys")
  2112. .Add(0, input)
  2113. .Add(1, keySelector)
  2114. .Add(2, sessionSortOrder)
  2115. .Add(3, sessionSortKey)
  2116. .Lambda(4)
  2117. .Param("partitionedStream")
  2118. .Apply(AddSessionParamsMemberLambda(pos, SessionStartMemberName, SessionParamsMemberName, keySelector, sessionKey, sessionInit, sessionUpdate, ctx))
  2119. .With(0, "partitionedStream")
  2120. .Seal()
  2121. .Seal()
  2122. .Seal()
  2123. .Build();
  2124. rowItems.push_back(ctx.MakeType<TItemExprType>(SessionParamsMemberName, sessionParamsType));
  2125. addedColumns.push_back(ctx.NewAtom(pos, SessionParamsMemberName));
  2126. originalKeysWithSession.push_back(ctx.NewAtom(pos, SessionStartMemberName));
  2127. addedColumns.push_back(originalKeysWithSession.back());
  2128. rowItems.push_back(ctx.MakeType<TItemExprType>(SessionStartMemberName, sessionKeyType));
  2129. sessionKey = sessionInit = sessionUpdate = {};
  2130. }
  2131. TExprNodeList keyColumns;
  2132. auto rowArg = ctx.NewArgument(pos, "row");
  2133. auto addMembersBody = rowArg;
  2134. static const TStringBuf keyColumnNamePrefix = "_yql_CalcOverWindowJoinKey";
  2135. const TStructExprType* rowTypeWithSession = ctx.MakeType<TStructExprType>(rowItems);
  2136. for (auto& keyColumn : originalKeysWithSession) {
  2137. YQL_ENSURE(keyColumn->IsAtom());
  2138. auto columnName = keyColumn->Content();
  2139. const TTypeAnnotationNode* columnType =
  2140. rowTypeWithSession->GetItems()[*rowTypeWithSession->FindItem(columnName)]->GetItemType();
  2141. if (columnType->HasOptionalOrNull()) {
  2142. addedColumns.push_back(ctx.NewAtom(pos, TStringBuilder() << keyColumnNamePrefix << addedColumns.size()));
  2143. keyColumns.push_back(addedColumns.back());
  2144. TStringBuf newName = addedColumns.back()->Content();
  2145. const TTypeAnnotationNode* newType = ctx.MakeType<TDataExprType>(EDataSlot::String);
  2146. rowItems.push_back(ctx.MakeType<TItemExprType>(newName, newType));
  2147. addMembersBody = ctx.Builder(pos)
  2148. .Callable("AddMember")
  2149. .Add(0, addMembersBody)
  2150. .Atom(1, newName)
  2151. .Callable(2, "StablePickle")
  2152. .Callable(0, "Member")
  2153. .Add(0, rowArg)
  2154. .Add(1, keyColumn)
  2155. .Seal()
  2156. .Seal()
  2157. .Seal()
  2158. .Build();
  2159. } else {
  2160. keyColumns.push_back(keyColumn);
  2161. }
  2162. }
  2163. input = ctx.Builder(pos)
  2164. .Callable("Map")
  2165. .Add(0, input)
  2166. .Add(1, ctx.NewLambda(pos, ctx.NewArguments(pos, { rowArg }), std::move(addMembersBody)))
  2167. .Seal()
  2168. .Build();
  2169. auto keySelector = BuildKeySelector(pos, *ctx.MakeType<TStructExprType>(rowItems),
  2170. ctx.NewList(pos, TExprNodeList{keyColumns}), ctx);
  2171. TExprNode::TPtr preprocessLambda;
  2172. TExprNode::TPtr groupKeySelector;
  2173. TExprNode::TPtr condenseSwitch;
  2174. if (sessionUpdate) {
  2175. YQL_ENSURE(sessionKey);
  2176. YQL_ENSURE(sessionInit);
  2177. YQL_ENSURE(sessionKeyType);
  2178. YQL_ENSURE(commonSortTraits);
  2179. preprocessLambda =
  2180. AddSessionParamsMemberLambda(pos, SessionStartMemberName, SessionParamsMemberName, keySelector, sessionKey, sessionInit, sessionUpdate, ctx);
  2181. rowItems.push_back(ctx.MakeType<TItemExprType>(SessionStartMemberName, sessionKeyType));
  2182. rowItems.push_back(ctx.MakeType<TItemExprType>(SessionParamsMemberName, sessionParamsType));
  2183. addedColumns.push_back(ctx.NewAtom(pos, SessionStartMemberName));
  2184. addedColumns.push_back(ctx.NewAtom(pos, SessionParamsMemberName));
  2185. if (sessionKeyType->HasOptionalOrNull()) {
  2186. addedColumns.push_back(ctx.NewAtom(pos, TStringBuilder() << keyColumnNamePrefix << addedColumns.size()));
  2187. preprocessLambda = ctx.Builder(pos)
  2188. .Lambda()
  2189. .Param("stream")
  2190. .Callable("OrderedMap")
  2191. .Apply(0, preprocessLambda)
  2192. .With(0, "stream")
  2193. .Seal()
  2194. .Lambda(1)
  2195. .Param("item")
  2196. .Callable("AddMember")
  2197. .Arg(0, "item")
  2198. .Add(1, addedColumns.back())
  2199. .Callable(2, "StablePickle")
  2200. .Callable(0, "Member")
  2201. .Arg(0, "item")
  2202. .Atom(1, SessionStartMemberName)
  2203. .Seal()
  2204. .Seal()
  2205. .Seal()
  2206. .Seal()
  2207. .Seal()
  2208. .Seal()
  2209. .Build();
  2210. TStringBuf newName = addedColumns.back()->Content();
  2211. const TTypeAnnotationNode* newType = ctx.MakeType<TDataExprType>(EDataSlot::String);
  2212. rowItems.push_back(ctx.MakeType<TItemExprType>(newName, newType));
  2213. }
  2214. keyColumns.push_back(addedColumns.back());
  2215. auto groupKeySelector = BuildKeySelector(pos, *ctx.MakeType<TStructExprType>(rowItems),
  2216. ctx.NewList(pos, TExprNodeList{keyColumns}), ctx);
  2217. condenseSwitch = ctx.Builder(pos)
  2218. .Lambda()
  2219. .Param("row")
  2220. .Param("state")
  2221. .Callable("AggrNotEquals")
  2222. .Apply(0, groupKeySelector)
  2223. .With(0, "row")
  2224. .Seal()
  2225. .Apply(1, groupKeySelector)
  2226. .With(0, "state")
  2227. .Seal()
  2228. .Seal()
  2229. .Seal()
  2230. .Build();
  2231. } else {
  2232. YQL_ENSURE(!sessionKey);
  2233. preprocessLambda = MakeIdentityLambda(pos, ctx);
  2234. auto groupKeySelector = keySelector;
  2235. condenseSwitch = ctx.Builder(pos)
  2236. .Lambda()
  2237. .Param("row")
  2238. .Param("state")
  2239. .Callable("IsKeySwitch")
  2240. .Arg(0, "row")
  2241. .Arg(1, "state")
  2242. .Add(2, groupKeySelector)
  2243. .Add(3, groupKeySelector)
  2244. .Seal()
  2245. .Seal()
  2246. .Build();
  2247. }
  2248. auto partitionByKeysLambda = ctx.Builder(pos)
  2249. .Lambda()
  2250. .Param("stream")
  2251. .Callable("Map")
  2252. .Callable(0, "Condense1")
  2253. .Apply(0, preprocessLambda)
  2254. .With(0, "stream")
  2255. .Seal()
  2256. .Add(1, BuildFold1Lambda(pos, frames, EFold1LambdaKind::INIT, keyColumns, *rowType, ctx))
  2257. .Add(2, condenseSwitch)
  2258. .Add(3, BuildFold1Lambda(pos, frames, EFold1LambdaKind::UPDATE, keyColumns, *rowType, ctx))
  2259. .Seal()
  2260. .Add(1, BuildFold1Lambda(pos, frames, EFold1LambdaKind::CALCULATE, keyColumns, *rowType, ctx))
  2261. .Seal()
  2262. .Seal()
  2263. .Build();
  2264. if (HasContextFuncs(*partitionByKeysLambda)) {
  2265. partitionByKeysLambda = ctx.Builder(pos)
  2266. .Lambda()
  2267. .Param("stream")
  2268. .Callable("WithContext")
  2269. .Apply(0, partitionByKeysLambda)
  2270. .With(0, "stream")
  2271. .Seal()
  2272. .Atom(1, "WinAgg", TNodeFlags::Default)
  2273. .Seal()
  2274. .Seal()
  2275. .Build();
  2276. }
  2277. auto aggregated = ctx.Builder(pos)
  2278. .Callable("PartitionsByKeys")
  2279. .Add(0, input)
  2280. .Add(1, keySelector)
  2281. .Add(2, sortOrder)
  2282. .Add(3, sortKey)
  2283. .Add(4, partitionByKeysLambda)
  2284. .Seal().Build();
  2285. if (sessionUpdate) {
  2286. // preprocess input without aggregation
  2287. input = ctx.Builder(pos)
  2288. .Callable("PartitionsByKeys")
  2289. .Add(0, input)
  2290. .Add(1, ctx.DeepCopyLambda(*keySelector))
  2291. .Add(2, sortOrder)
  2292. .Add(3, ctx.DeepCopyLambda(*sortKey))
  2293. .Lambda(4)
  2294. .Param("stream")
  2295. .Apply(preprocessLambda)
  2296. .With(0, "stream")
  2297. .Seal()
  2298. .Seal()
  2299. .Seal()
  2300. .Build();
  2301. }
  2302. TExprNode::TPtr joined;
  2303. if (!keyColumns.empty()) {
  2304. // SELECT * FROM input AS a JOIN aggregated AS b USING(keyColumns)
  2305. auto buildJoinKeysTuple = [&](TStringBuf side) {
  2306. TExprNodeList items;
  2307. for (const auto& keyColumn : keyColumns) {
  2308. items.push_back(ctx.NewAtom(pos, side));
  2309. items.push_back(keyColumn);
  2310. }
  2311. return ctx.NewList(pos, std::move(items));
  2312. };
  2313. joined = ctx.Builder(pos)
  2314. .Callable("EquiJoin")
  2315. .List(0)
  2316. .Add(0, input)
  2317. .Atom(1, "a", TNodeFlags::Default)
  2318. .Seal()
  2319. .List(1)
  2320. .Add(0, aggregated)
  2321. .Atom(1, "b", TNodeFlags::Default)
  2322. .Seal()
  2323. .List(2)
  2324. .Atom(0, "Inner", TNodeFlags::Default)
  2325. .Atom(1, "a", TNodeFlags::Default)
  2326. .Atom(2, "b", TNodeFlags::Default)
  2327. .Add(3, buildJoinKeysTuple("a"))
  2328. .Add(4, buildJoinKeysTuple("b"))
  2329. .List(5)
  2330. .List(0)
  2331. .Atom(0, "right", TNodeFlags::Default)
  2332. .Atom(1, "any", TNodeFlags::Default)
  2333. .Seal()
  2334. .Seal()
  2335. .Seal()
  2336. .List(3).Seal()
  2337. .Seal()
  2338. .Build();
  2339. // remove b.keys*
  2340. auto rowArg = ctx.NewArgument(pos, "row");
  2341. TExprNode::TPtr removed = rowArg;
  2342. auto removeSide = [&](const TString& side, const TExprNodeList& keys) {
  2343. for (const auto& keyColumn : keys) {
  2344. YQL_ENSURE(keyColumn->IsAtom());
  2345. TString toRemove = side + keyColumn->Content();
  2346. removed = ctx.Builder(pos)
  2347. .Callable("RemoveMember")
  2348. .Add(0, removed)
  2349. .Atom(1, toRemove)
  2350. .Seal()
  2351. .Build();
  2352. }
  2353. };
  2354. removeSide("b.", keyColumns);
  2355. // add session columns
  2356. for (auto column : sessionColumns->ChildrenList()) {
  2357. removed = ctx.Builder(pos)
  2358. .Callable("AddMember")
  2359. .Add(0, removed)
  2360. .Add(1, column)
  2361. .Callable(2, "Member")
  2362. .Add(0, rowArg)
  2363. .Atom(1, TString("a.") + SessionParamsMemberName)
  2364. .Seal()
  2365. .Seal()
  2366. .Build();
  2367. }
  2368. removeSide("a.", addedColumns);
  2369. joined = ctx.Builder(pos)
  2370. .Callable("Map")
  2371. .Add(0, joined)
  2372. .Add(1, ctx.NewLambda(pos, ctx.NewArguments(pos, {rowArg}), std::move(removed)))
  2373. .Seal()
  2374. .Build();
  2375. } else {
  2376. // SELECT * FROM input AS a CROSS JOIN aggregated AS b
  2377. joined = ctx.Builder(pos)
  2378. .Callable("EquiJoin")
  2379. .List(0)
  2380. .Add(0, input)
  2381. .Atom(1, "a", TNodeFlags::Default)
  2382. .Seal()
  2383. .List(1)
  2384. .Add(0, aggregated)
  2385. .Atom(1, "b", TNodeFlags::Default)
  2386. .Seal()
  2387. .List(2)
  2388. .Atom(0, "Cross", TNodeFlags::Default)
  2389. .Atom(1, "a", TNodeFlags::Default)
  2390. .Atom(2, "b", TNodeFlags::Default)
  2391. .List(3).Seal()
  2392. .List(4).Seal()
  2393. .List(5).Seal()
  2394. .Seal()
  2395. .List(3).Seal()
  2396. .Seal()
  2397. .Build();
  2398. }
  2399. return ctx.Builder(pos)
  2400. .Callable("Map")
  2401. .Add(0, joined)
  2402. .Lambda(1)
  2403. .Param("row")
  2404. .Callable("DivePrefixMembers")
  2405. .Arg(0, "row")
  2406. .List(1)
  2407. .Atom(0, "a.")
  2408. .Atom(1, "b.")
  2409. .Seal()
  2410. .Seal()
  2411. .Seal()
  2412. .Seal()
  2413. .Build();
  2414. }
  2415. TExprNode::TPtr TryExpandNonCompactFullFrames(TPositionHandle pos, const TExprNode::TPtr& inputList, const TExprNode::TPtr& keyColumns,
  2416. const TExprNode::TPtr& sortTraits, const TExprNode::TPtr& frames, const TExprNode::TPtr& sessionTraits,
  2417. const TExprNode::TPtr& sessionColumns, TExprContext& ctx)
  2418. {
  2419. TExprNodeList nonCompactAggregatingFullFrames;
  2420. TExprNodeList otherFrames;
  2421. for (auto& winOn : frames->ChildrenList()) {
  2422. if (!IsNonCompactFullFrame(*winOn, ctx)) {
  2423. otherFrames.push_back(winOn);
  2424. continue;
  2425. }
  2426. YQL_ENSURE(TCoWinOnBase::Match(winOn.Get()));
  2427. TExprNodeList nonAggregates = { winOn->ChildPtr(0) };
  2428. TExprNodeList aggregates = { winOn->ChildPtr(0) };
  2429. for (ui32 i = 1; i < winOn->ChildrenSize(); ++i) {
  2430. auto item = winOn->Child(i)->Child(1);
  2431. if (item->IsCallable("WindowTraits")) {
  2432. aggregates.push_back(winOn->ChildPtr(i));
  2433. } else {
  2434. nonAggregates.push_back(winOn->ChildPtr(i));
  2435. }
  2436. }
  2437. if (aggregates.size() == 1) {
  2438. otherFrames.push_back(winOn);
  2439. continue;
  2440. }
  2441. nonCompactAggregatingFullFrames.push_back(ctx.ChangeChildren(*winOn, std::move(aggregates)));
  2442. if (nonAggregates.size() > 1) {
  2443. otherFrames.push_back(ctx.ChangeChildren(*winOn, std::move(nonAggregates)));
  2444. }
  2445. }
  2446. if (nonCompactAggregatingFullFrames.empty()) {
  2447. return {};
  2448. }
  2449. auto fullFrames = ctx.NewList(pos, std::move(nonCompactAggregatingFullFrames));
  2450. auto nonFullFrames = ctx.NewList(pos, std::move(otherFrames));
  2451. auto expanded = ExpandNonCompactFullFrames(pos, inputList, keyColumns, sortTraits, fullFrames, sessionTraits, sessionColumns, ctx);
  2452. if (sessionTraits && !sessionTraits->IsCallable("Void")) {
  2453. return Build<TCoCalcOverSessionWindow>(ctx, pos)
  2454. .Input(expanded)
  2455. .Keys(keyColumns)
  2456. .SortSpec(sortTraits)
  2457. .Frames(nonFullFrames)
  2458. .SessionSpec(sessionTraits)
  2459. .SessionColumns(sessionColumns)
  2460. .Done().Ptr();
  2461. }
  2462. YQL_ENSURE(sessionColumns->ChildrenSize() == 0);
  2463. return Build<TCoCalcOverWindow>(ctx, pos)
  2464. .Input(expanded)
  2465. .Keys(keyColumns)
  2466. .SortSpec(sortTraits)
  2467. .Frames(nonFullFrames)
  2468. .Done().Ptr();
  2469. }
  2470. void SplitFramesByType(const TExprNode::TPtr& frames, TExprNode::TPtr& rowFrames, TExprNode::TPtr& rangeFrames, TExprNode::TPtr& groupFrames, TExprContext& ctx) {
  2471. TExprNodeList rows;
  2472. TExprNodeList range;
  2473. TExprNodeList groups;
  2474. for (auto& winOn : frames->ChildrenList()) {
  2475. if (TCoWinOnRows::Match(winOn.Get())) {
  2476. rows.push_back(std::move(winOn));
  2477. } else if (TCoWinOnRange::Match(winOn.Get())) {
  2478. range.push_back(std::move(winOn));
  2479. } else {
  2480. YQL_ENSURE(TCoWinOnGroups::Match(winOn.Get()));
  2481. groups.push_back(std::move(winOn));
  2482. }
  2483. }
  2484. rowFrames = ctx.NewList(frames->Pos(), std::move(rows));
  2485. rangeFrames = ctx.NewList(frames->Pos(), std::move(range));
  2486. groupFrames = ctx.NewList(frames->Pos(), std::move(groups));
  2487. }
  2488. const TStructExprType* ApplyFramesToType(const TStructExprType& inputType, const TStructExprType& finalOutputType, const TExprNode& frames, TExprContext& ctx) {
  2489. TVector<const TItemExprType*> resultItems = inputType.GetItems();
  2490. for (auto& frame : frames.ChildrenList()) {
  2491. YQL_ENSURE(TCoWinOnBase::Match(frame.Get()));
  2492. for (size_t i = 1; i < frame->ChildrenSize(); ++i) {
  2493. YQL_ENSURE(frame->Child(i)->IsList());
  2494. YQL_ENSURE(frame->Child(i)->Head().IsAtom());
  2495. TStringBuf column = frame->Child(i)->Head().Content();
  2496. const TTypeAnnotationNode* type = finalOutputType.FindItemType(column);
  2497. YQL_ENSURE(type);
  2498. resultItems.push_back(ctx.MakeType<TItemExprType>(column, type));
  2499. }
  2500. }
  2501. return ctx.MakeType<TStructExprType>(resultItems);
  2502. }
  2503. bool NeedPartitionRows(const TExprNode::TPtr& frames, const TStructExprType& rowType, TExprContext& ctx) {
  2504. if (frames->ChildrenSize() == 0) {
  2505. return false;
  2506. }
  2507. TCalcOverWindowTraits traits = ExtractCalcOverWindowTraits(frames, rowType, ctx);
  2508. for (const auto& item : traits.RawTraits) {
  2509. const TRawTrait& trait = item.second;
  2510. if (trait.CalculateLambda->IsCallable({"CumeDist","NTile","PercentRank"})) {
  2511. return true;
  2512. }
  2513. }
  2514. return false;
  2515. }
  2516. TString AllocatePartitionRowsColumn(const TStructExprType& rowType) {
  2517. ui64 index = 0;
  2518. for (;;) {
  2519. auto name = "_yql_partition_rows_" + ToString(index);
  2520. if (!rowType.FindItemType(name)) {
  2521. return name;
  2522. }
  2523. ++index;
  2524. }
  2525. }
  2526. TExprNode::TPtr AddPartitionRowsColumn(TPositionHandle pos, const TExprNode::TPtr& input, const TExprNode::TPtr& keyColumns,
  2527. const TString& columnName, TExprContext& ctx, TTypeAnnotationContext& types) {
  2528. auto exportsPtr = types.Modules->GetModule("/lib/yql/window.yql");
  2529. YQL_ENSURE(exportsPtr);
  2530. const auto& exports = exportsPtr->Symbols();
  2531. const auto ex = exports.find("count_traits_factory");
  2532. YQL_ENSURE(exports.cend() != ex);
  2533. TNodeOnNodeOwnedMap deepClones;
  2534. auto lambda = ctx.DeepCopy(*ex->second, exportsPtr->ExprCtx(), deepClones, true, false);
  2535. auto listTypeNode = ctx.NewCallable(pos, "TypeOf", {input});
  2536. auto extractor = ctx.Builder(pos)
  2537. .Lambda()
  2538. .Param("row")
  2539. .Callable("Void")
  2540. .Seal()
  2541. .Seal()
  2542. .Build();
  2543. auto traits = ctx.ReplaceNodes(lambda->TailPtr(), {
  2544. {lambda->Head().Child(0), listTypeNode},
  2545. {lambda->Head().Child(1), extractor}
  2546. });
  2547. ctx.Step.Repeat(TExprStep::ExpandApplyForLambdas);
  2548. auto status = ExpandApply(traits, traits, ctx);
  2549. YQL_ENSURE(status != IGraphTransformer::TStatus::Error);
  2550. return ctx.Builder(pos)
  2551. .Callable("CalcOverWindow")
  2552. .Add(0, input)
  2553. .Add(1, keyColumns)
  2554. .Callable(2, "Void")
  2555. .Seal()
  2556. .List(3)
  2557. .Callable(0, "WinOnRows")
  2558. .List(0)
  2559. .List(0)
  2560. .Atom(0, "begin")
  2561. .Callable(1, "Void")
  2562. .Seal()
  2563. .Seal()
  2564. .List(1)
  2565. .Atom(0, "end")
  2566. .Callable(1, "Void")
  2567. .Seal()
  2568. .Seal()
  2569. .Seal()
  2570. .List(1)
  2571. .Atom(0, columnName)
  2572. .Add(1, traits)
  2573. .Seal()
  2574. .Seal()
  2575. .Seal()
  2576. .Seal()
  2577. .Build();
  2578. }
  2579. TExprNode::TPtr RemovePartitionRowsColumn(TPositionHandle pos, const TExprNode::TPtr& input, const TString& columnName, TExprContext& ctx) {
  2580. return ctx.Builder(pos)
  2581. .Callable("Map")
  2582. .Add(0, input)
  2583. .Lambda(1)
  2584. .Param("row")
  2585. .Callable("RemoveMember")
  2586. .Arg(0, "row")
  2587. .Atom(1, columnName)
  2588. .Seal()
  2589. .Seal()
  2590. .Seal()
  2591. .Build();
  2592. }
  2593. TExprNode::TPtr ProcessRowsFrames(TPositionHandle pos, const TExprNode::TPtr& input, const TStructExprType& rowType, const TExprNode::TPtr& dependsOn,
  2594. const TExprNode::TPtr& frames, const TMaybe<TString>& partitionRowsColumn, TExprContext& ctx)
  2595. {
  2596. if (frames->ChildrenSize() == 0) {
  2597. return input;
  2598. }
  2599. TExprNode::TPtr processed = input;
  2600. TExprNode::TPtr dataQueue;
  2601. TQueueParams queueParams;
  2602. TVector<TChain1MapTraits::TPtr> traits = BuildFoldMapTraits(queueParams, frames, partitionRowsColumn, rowType, ctx);
  2603. if (queueParams.DataQueueNeeded) {
  2604. ui64 queueSize = (queueParams.DataOutpace == Max<ui64>()) ? Max<ui64>() : (queueParams.DataOutpace + queueParams.DataLag + 2);
  2605. dataQueue = BuildQueue(pos, rowType, queueSize, queueParams.DataLag, dependsOn, ctx);
  2606. processed = ctx.Builder(pos)
  2607. .Callable("PreserveStream")
  2608. .Add(0, processed)
  2609. .Add(1, dataQueue)
  2610. .Add(2, BuildUint64(pos, queueParams.DataOutpace, ctx))
  2611. .Seal()
  2612. .Build();
  2613. }
  2614. processed = ctx.Builder(pos)
  2615. .Callable("OrderedMap")
  2616. .Callable(0, "Chain1Map")
  2617. .Add(0, std::move(processed))
  2618. .Add(1, BuildChain1MapInitLambda(pos, traits, dataQueue, queueParams.LagQueueSize, queueParams.LagQueueItemType, ctx))
  2619. .Add(2, BuildChain1MapUpdateLambda(pos, traits, dataQueue, queueParams.LagQueueSize != 0, ctx))
  2620. .Seal()
  2621. .Lambda(1)
  2622. .Param("pair")
  2623. .Callable("Nth")
  2624. .Arg(0, "pair")
  2625. .Atom(1, "0", TNodeFlags::Default)
  2626. .Seal()
  2627. .Seal()
  2628. .Seal()
  2629. .Build();
  2630. return WrapWithWinContext(processed, ctx);
  2631. }
  2632. TExprNode::TPtr ProcessRangeFrames(TPositionHandle pos, const TExprNode::TPtr& input, const TStructExprType& rowType, const TExprNode::TPtr& sortKey, const TExprNode::TPtr& frames,
  2633. const TMaybe<TString>& partitionRowsColumn, TExprContext& ctx) {
  2634. if (frames->ChildrenSize() == 0) {
  2635. return input;
  2636. }
  2637. TExprNode::TPtr processed = input;
  2638. TQueueParams queueParams;
  2639. TVector<TChain1MapTraits::TPtr> traits = BuildFoldMapTraits(queueParams, frames, partitionRowsColumn, rowType, ctx);
  2640. YQL_ENSURE(!queueParams.DataQueueNeeded);
  2641. YQL_ENSURE(queueParams.LagQueueSize == 0);
  2642. YQL_ENSURE(queueParams.LagQueueItemType == nullptr);
  2643. // same processing as in WinOnRows
  2644. processed = ctx.Builder(pos)
  2645. .Callable("OrderedMap")
  2646. .Callable(0, "Chain1Map")
  2647. .Add(0, std::move(processed))
  2648. .Add(1, BuildChain1MapInitLambda(pos, traits, nullptr, 0, nullptr, ctx))
  2649. .Add(2, BuildChain1MapUpdateLambda(pos, traits, nullptr, false, ctx))
  2650. .Seal()
  2651. .Lambda(1)
  2652. .Param("pair")
  2653. .Callable("Nth")
  2654. .Arg(0, "pair")
  2655. .Atom(1, "0", TNodeFlags::Default)
  2656. .Seal()
  2657. .Seal()
  2658. .Seal()
  2659. .Build();
  2660. processed = WrapWithWinContext(processed, ctx);
  2661. TExprNode::TPtr sortKeyLambda = sortKey;
  2662. if (sortKey->IsCallable("Void")) {
  2663. sortKeyLambda = ctx.Builder(sortKey->Pos())
  2664. .Lambda()
  2665. .Param("row")
  2666. .Callable("Void")
  2667. .Seal()
  2668. .Seal()
  2669. .Build();
  2670. }
  2671. auto processedItemType = ctx.Builder(pos)
  2672. .Callable("StreamItemType")
  2673. .Callable(0, "TypeOf")
  2674. .Add(0, processed)
  2675. .Seal()
  2676. .Seal()
  2677. .Build();
  2678. auto variantType = ctx.Builder(pos)
  2679. .Callable("VariantType")
  2680. .Callable(0, "StructType")
  2681. .List(0)
  2682. .Atom(0, "singleRow", TNodeFlags::Default)
  2683. .Add(1, processedItemType)
  2684. .Seal()
  2685. .List(1)
  2686. .Atom(0, "group", TNodeFlags::Default)
  2687. .Callable(1, "ListType")
  2688. .Add(0, processedItemType)
  2689. .Seal()
  2690. .Seal()
  2691. .Seal()
  2692. .Seal()
  2693. .Build();
  2694. // split rows by groups with equal sortKey
  2695. processed = ctx.Builder(pos)
  2696. .Callable("Condense1")
  2697. .Add(0, processed)
  2698. .Lambda(1)
  2699. .Param("row")
  2700. .List()
  2701. .Apply(0, sortKeyLambda)
  2702. .With(0, "row")
  2703. .Seal()
  2704. .Callable(1, "Variant")
  2705. .Arg(0, "row")
  2706. .Atom(1, "singleRow", TNodeFlags::Default)
  2707. .Add(2, variantType)
  2708. .Seal()
  2709. .Seal()
  2710. .Seal()
  2711. .Lambda(2)
  2712. .Param("row")
  2713. .Param("state")
  2714. .Callable(0, "AggrNotEquals")
  2715. .Apply(0, sortKeyLambda)
  2716. .With(0, "row")
  2717. .Seal()
  2718. .Callable(1, "Nth")
  2719. .Arg(0, "state")
  2720. .Atom(1, "0", TNodeFlags::Default)
  2721. .Seal()
  2722. .Seal()
  2723. .Seal()
  2724. .Lambda(3)
  2725. .Param("row")
  2726. .Param("state")
  2727. .List()
  2728. .Callable(0, "Nth")
  2729. .Arg(0, "state")
  2730. .Atom(1, "0", TNodeFlags::Default)
  2731. .Seal()
  2732. .Callable(1, "Visit")
  2733. .Callable(0, "Nth")
  2734. .Arg(0, "state")
  2735. .Atom(1, "1", TNodeFlags::Default)
  2736. .Seal()
  2737. .Atom(1, "singleRow", TNodeFlags::Default)
  2738. .Lambda(2)
  2739. .Param("singleRow")
  2740. .Callable(0, "Variant")
  2741. .Callable(0, "AsList")
  2742. .Arg(0, "singleRow")
  2743. .Arg(1, "row")
  2744. .Seal()
  2745. .Atom(1, "group", TNodeFlags::Default)
  2746. .Add(2, variantType)
  2747. .Seal()
  2748. .Seal()
  2749. .Atom(3, "group", TNodeFlags::Default)
  2750. .Lambda(4)
  2751. .Param("group")
  2752. .Callable(0, "Variant")
  2753. .Callable(0, "Insert")
  2754. .Arg(0, "group")
  2755. .Arg(1, "row")
  2756. .Seal()
  2757. .Atom(1, "group", TNodeFlags::Default)
  2758. .Add(2, variantType)
  2759. .Seal()
  2760. .Seal()
  2761. .Seal()
  2762. .Seal()
  2763. .Seal()
  2764. .Seal()
  2765. .Build();
  2766. processed = ctx.Builder(pos)
  2767. .Callable("OrderedMap")
  2768. .Add(0, processed)
  2769. .Lambda(1)
  2770. .Param("item")
  2771. .Callable(0, "Nth")
  2772. .Arg(0, "item")
  2773. .Atom(1, "1", TNodeFlags::Default)
  2774. .Seal()
  2775. .Seal()
  2776. .Seal()
  2777. .Build();
  2778. auto lastRowArg = ctx.NewArgument(pos, "lastRow");
  2779. auto currentRowArg = ctx.NewArgument(pos, "currentRow");
  2780. auto currentRow = currentRowArg;
  2781. for (auto& trait : traits) {
  2782. TStringBuf name = trait->GetName();
  2783. currentRow = ctx.Builder(pos)
  2784. .Callable("AddMember")
  2785. .Callable(0, "RemoveMember")
  2786. .Add(0, currentRow)
  2787. .Atom(1, name)
  2788. .Seal()
  2789. .Atom(1, name)
  2790. .Callable(2, "Member")
  2791. .Add(0, lastRowArg)
  2792. .Atom(1, name)
  2793. .Seal()
  2794. .Seal()
  2795. .Build();
  2796. }
  2797. auto overwriteWithLastRowLambda = ctx.NewLambda(pos, ctx.NewArguments(pos, { currentRowArg, lastRowArg }), std::move(currentRow));
  2798. // processed is currently stream of groups (=Variant<row, List<row>>>) with equal sort keys
  2799. processed = ctx.Builder(pos)
  2800. .Callable("OrderedFlatMap")
  2801. .Add(0, processed)
  2802. .Lambda(1)
  2803. .Param("item")
  2804. .Callable("Visit")
  2805. .Arg(0, "item")
  2806. .Atom(1, "singleRow", TNodeFlags::Default)
  2807. .Lambda(2)
  2808. .Param("singleRow")
  2809. .Callable(0, "AsList")
  2810. .Arg(0, "singleRow")
  2811. .Seal()
  2812. .Seal()
  2813. .Atom(3, "group", TNodeFlags::Default)
  2814. .Lambda(4)
  2815. .Param("group")
  2816. .Callable("Coalesce")
  2817. .Callable(0, "Map")
  2818. .Callable(0, "Last")
  2819. .Arg(0, "group")
  2820. .Seal()
  2821. .Lambda(1)
  2822. .Param("lastRow")
  2823. .Callable("OrderedMap")
  2824. .Arg(0, "group")
  2825. .Lambda(1)
  2826. .Param("currentRow")
  2827. .Apply(overwriteWithLastRowLambda)
  2828. .With(0, "currentRow")
  2829. .With(1, "lastRow")
  2830. .Seal()
  2831. .Seal()
  2832. .Seal()
  2833. .Seal()
  2834. .Seal()
  2835. .Callable(1, "EmptyList")
  2836. .Seal()
  2837. .Seal()
  2838. .Seal()
  2839. .Seal()
  2840. .Seal()
  2841. .Seal()
  2842. .Build();
  2843. return processed;
  2844. }
  2845. TExprNode::TPtr ExpandSingleCalcOverWindow(TPositionHandle pos, const TExprNode::TPtr& inputList, const TExprNode::TPtr& keyColumns,
  2846. const TExprNode::TPtr& sortTraits, const TExprNode::TPtr& frames, const TExprNode::TPtr& sessionTraits,
  2847. const TExprNode::TPtr& sessionColumns, const TStructExprType& outputRowType, TExprContext& ctx, TTypeAnnotationContext& types)
  2848. {
  2849. if (auto expanded = TryExpandNonCompactFullFrames(pos, inputList, keyColumns, sortTraits, frames, sessionTraits, sessionColumns, ctx)) {
  2850. YQL_CLOG(INFO, Core) << "Expanded non-compact CalcOverWindow";
  2851. return expanded;
  2852. }
  2853. TExprNode::TPtr sessionKey;
  2854. TExprNode::TPtr sessionSortTraits;
  2855. const TTypeAnnotationNode* sessionKeyType = nullptr;
  2856. const TTypeAnnotationNode* sessionParamsType = nullptr;
  2857. TExprNode::TPtr sessionInit;
  2858. TExprNode::TPtr sessionUpdate;
  2859. ExtractSessionWindowParams(pos, sessionTraits, sessionKey, sessionKeyType, sessionParamsType, sessionSortTraits, sessionInit, sessionUpdate, ctx);
  2860. const auto originalRowType = inputList->GetTypeAnn()->Cast<TListExprType>()->GetItemType()->Cast<TStructExprType>();
  2861. TVector<const TItemExprType*> rowItems = originalRowType->GetItems();
  2862. if (sessionKeyType) {
  2863. YQL_ENSURE(sessionParamsType);
  2864. rowItems.push_back(ctx.MakeType<TItemExprType>(SessionStartMemberName, sessionKeyType));
  2865. rowItems.push_back(ctx.MakeType<TItemExprType>(SessionParamsMemberName, sessionParamsType));
  2866. }
  2867. auto rowType = ctx.MakeType<TStructExprType>(rowItems);
  2868. auto keySelector = BuildKeySelector(pos, *rowType->Cast<TStructExprType>(), keyColumns, ctx);
  2869. TExprNode::TPtr sortKey;
  2870. TExprNode::TPtr sortOrder;
  2871. ExtractSortKeyAndOrder(pos, sortTraits, sortKey, sortOrder, ctx);
  2872. const TExprNode::TPtr originalSortKey = sortKey;
  2873. TExprNode::TPtr input = inputList;
  2874. const auto commonSortTraits = DeduceCompatibleSort(sortTraits, sessionSortTraits);
  2875. ExtractSortKeyAndOrder(pos, commonSortTraits ? commonSortTraits : sortTraits, sortKey, sortOrder, ctx);
  2876. auto fullKeyColumns = keyColumns;
  2877. if (!commonSortTraits) {
  2878. YQL_ENSURE(sessionKey);
  2879. YQL_ENSURE(sessionInit);
  2880. YQL_ENSURE(sessionUpdate);
  2881. TExprNode::TPtr sessionSortKey;
  2882. TExprNode::TPtr sessionSortOrder;
  2883. ExtractSortKeyAndOrder(pos, sessionSortTraits, sessionSortKey, sessionSortOrder, ctx);
  2884. input = ctx.Builder(pos)
  2885. .Callable("PartitionsByKeys")
  2886. .Add(0, input)
  2887. .Add(1, keySelector)
  2888. .Add(2, sessionSortOrder)
  2889. .Add(3, sessionSortKey)
  2890. .Lambda(4)
  2891. .Param("partitionedStream")
  2892. .Apply(AddSessionParamsMemberLambda(pos, SessionStartMemberName, SessionParamsMemberName, keySelector, sessionKey, sessionInit, sessionUpdate, ctx))
  2893. .With(0, "partitionedStream")
  2894. .Seal()
  2895. .Seal()
  2896. .Seal()
  2897. .Build();
  2898. TExprNodeList keyColumnsList = keyColumns->ChildrenList();
  2899. keyColumnsList.push_back(ctx.NewAtom(pos, SessionStartMemberName));
  2900. auto keyColumnsWithSessionStart = ctx.NewList(pos, std::move(keyColumnsList));
  2901. fullKeyColumns = keyColumnsWithSessionStart;
  2902. keySelector = BuildKeySelector(pos, *rowType, keyColumnsWithSessionStart, ctx);
  2903. sessionKey = sessionInit = sessionUpdate = {};
  2904. }
  2905. TExprNode::TPtr rowsFrames;
  2906. TExprNode::TPtr rangeFrames;
  2907. TExprNode::TPtr groupsFrames;
  2908. SplitFramesByType(frames, rowsFrames, rangeFrames, groupsFrames, ctx);
  2909. YQL_ENSURE(groupsFrames->ChildrenSize() == 0);
  2910. auto topLevelStreamArg = ctx.NewArgument(pos, "stream");
  2911. TExprNode::TPtr processed = topLevelStreamArg;
  2912. TMaybe<TString> partitionRowsColumn;
  2913. if (NeedPartitionRows(frames, *rowType, ctx)) {
  2914. partitionRowsColumn = AllocatePartitionRowsColumn(outputRowType);
  2915. input = AddPartitionRowsColumn(pos, input, fullKeyColumns, *partitionRowsColumn, ctx, types);
  2916. }
  2917. // All RANGE frames (even simplest RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
  2918. // will require additional memory to store TableRow()'s - so we want to start with minimum size of row
  2919. // (i.e. process range frames first)
  2920. processed = ProcessRangeFrames(pos, processed, *rowType, originalSortKey, rangeFrames, partitionRowsColumn, ctx);
  2921. rowType = ApplyFramesToType(*rowType, outputRowType, *rangeFrames, ctx);
  2922. processed = ProcessRowsFrames(pos, processed, *rowType, topLevelStreamArg, rowsFrames, partitionRowsColumn, ctx);
  2923. auto topLevelStreamProcessingLambda = ctx.NewLambda(pos, ctx.NewArguments(pos, {topLevelStreamArg}), std::move(processed));
  2924. YQL_CLOG(INFO, Core) << "Expanded compact CalcOverWindow";
  2925. auto res = BuildPartitionsByKeys(pos, input, keySelector, sortOrder, sortKey, topLevelStreamProcessingLambda, sessionKey,
  2926. sessionInit, sessionUpdate, sessionColumns, ctx);
  2927. if (partitionRowsColumn) {
  2928. res = RemovePartitionRowsColumn(pos, res, *partitionRowsColumn, ctx);
  2929. }
  2930. return res;
  2931. }
  2932. } // namespace
  2933. TExprNode::TPtr ExpandCalcOverWindow(const TExprNode::TPtr& node, TExprContext& ctx, TTypeAnnotationContext& types) {
  2934. YQL_ENSURE(node->IsCallable({"CalcOverWindow", "CalcOverSessionWindow", "CalcOverWindowGroup"}));
  2935. auto input = node->ChildPtr(0);
  2936. auto calcs = ExtractCalcsOverWindow(node, ctx);
  2937. if (calcs.empty()) {
  2938. return input;
  2939. }
  2940. TCoCalcOverWindowTuple calc(calcs.front());
  2941. if (calc.Frames().Size() != 0 || calc.SessionColumns().Size() != 0) {
  2942. const TStructExprType& outputRowType = *node->GetTypeAnn()->Cast<TListExprType>()->GetItemType()->Cast<TStructExprType>();
  2943. input = ExpandSingleCalcOverWindow(node->Pos(), input, calc.Keys().Ptr(), calc.SortSpec().Ptr(), calc.Frames().Ptr(),
  2944. calc.SessionSpec().Ptr(), calc.SessionColumns().Ptr(), outputRowType, ctx, types);
  2945. }
  2946. calcs.erase(calcs.begin());
  2947. return RebuildCalcOverWindowGroup(node->Pos(), input, calcs, ctx);
  2948. }
  2949. TExprNodeList ExtractCalcsOverWindow(const TExprNodePtr& node, TExprContext& ctx) {
  2950. TExprNodeList result;
  2951. if (auto maybeBase = TMaybeNode<TCoCalcOverWindowBase>(node)) {
  2952. TCoCalcOverWindowBase self(maybeBase.Cast());
  2953. TExprNode::TPtr sessionSpec;
  2954. TExprNode::TPtr sessionColumns;
  2955. if (auto session = TMaybeNode<TCoCalcOverSessionWindow>(node)) {
  2956. sessionSpec = session.Cast().SessionSpec().Ptr();
  2957. sessionColumns = session.Cast().SessionColumns().Ptr();
  2958. } else {
  2959. sessionSpec = ctx.NewCallable(node->Pos(), "Void", {});
  2960. sessionColumns = ctx.NewList(node->Pos(), {});
  2961. }
  2962. result.emplace_back(
  2963. Build<TCoCalcOverWindowTuple>(ctx, node->Pos())
  2964. .Keys(self.Keys())
  2965. .SortSpec(self.SortSpec())
  2966. .Frames(self.Frames())
  2967. .SessionSpec(sessionSpec)
  2968. .SessionColumns(sessionColumns)
  2969. .Done().Ptr()
  2970. );
  2971. } else {
  2972. result = TMaybeNode<TCoCalcOverWindowGroup>(node).Cast().Calcs().Ref().ChildrenList();
  2973. }
  2974. return result;
  2975. }
  2976. TExprNode::TPtr RebuildCalcOverWindowGroup(TPositionHandle pos, const TExprNode::TPtr& input, const TExprNodeList& calcs, TExprContext& ctx) {
  2977. auto inputType = ctx.Builder(input->Pos())
  2978. .Callable("TypeOf")
  2979. .Add(0, input)
  2980. .Seal()
  2981. .Build();
  2982. auto inputItemType = ctx.Builder(input->Pos())
  2983. .Callable("ListItemType")
  2984. .Add(0, inputType)
  2985. .Seal()
  2986. .Build();
  2987. TExprNodeList fixedCalcs;
  2988. for (auto calcNode : calcs) {
  2989. TCoCalcOverWindowTuple calc(calcNode);
  2990. auto sortSpec = calc.SortSpec().Ptr();
  2991. if (sortSpec->IsCallable("SortTraits")) {
  2992. sortSpec = ctx.Builder(sortSpec->Pos())
  2993. .Callable("SortTraits")
  2994. .Add(0, inputType)
  2995. .Add(1, sortSpec->ChildPtr(1))
  2996. .Add(2, ctx.DeepCopyLambda(*sortSpec->Child(2)))
  2997. .Seal()
  2998. .Build();
  2999. } else {
  3000. YQL_ENSURE(sortSpec->IsCallable("Void"));
  3001. }
  3002. auto sessionSpec = calc.SessionSpec().Ptr();
  3003. if (sessionSpec->IsCallable("SessionWindowTraits")) {
  3004. TCoSessionWindowTraits traits(sessionSpec);
  3005. auto sessionSortSpec = traits.SortSpec().Ptr();
  3006. if (auto maybeSort = TMaybeNode<TCoSortTraits>(sessionSortSpec)) {
  3007. sessionSortSpec = Build<TCoSortTraits>(ctx, sessionSortSpec->Pos())
  3008. .ListType(inputType)
  3009. .SortDirections(maybeSort.Cast().SortDirections())
  3010. .SortKeySelectorLambda(ctx.DeepCopyLambda(maybeSort.Cast().SortKeySelectorLambda().Ref()))
  3011. .Done().Ptr();
  3012. } else {
  3013. YQL_ENSURE(sessionSortSpec->IsCallable("Void"));
  3014. }
  3015. sessionSpec = Build<TCoSessionWindowTraits>(ctx, traits.Pos())
  3016. .ListType(inputType)
  3017. .SortSpec(sessionSortSpec)
  3018. .InitState(ctx.DeepCopyLambda(traits.InitState().Ref()))
  3019. .UpdateState(ctx.DeepCopyLambda(traits.UpdateState().Ref()))
  3020. .Calculate(ctx.DeepCopyLambda(traits.Calculate().Ref()))
  3021. .Done().Ptr();
  3022. } else {
  3023. YQL_ENSURE(sessionSpec->IsCallable("Void"));
  3024. }
  3025. auto sessionColumns = calc.SessionColumns().Ptr();
  3026. TExprNodeList newFrames;
  3027. for (auto frameNode : calc.Frames().Ref().Children()) {
  3028. YQL_ENSURE(TCoWinOnBase::Match(frameNode.Get()));
  3029. TExprNodeList winOnArgs = { frameNode->ChildPtr(0) };
  3030. for (ui32 i = 1; i < frameNode->ChildrenSize(); ++i) {
  3031. auto kvTuple = frameNode->ChildPtr(i);
  3032. YQL_ENSURE(kvTuple->IsList());
  3033. YQL_ENSURE(2 <= kvTuple->ChildrenSize() && kvTuple->ChildrenSize() <= 3);
  3034. auto columnName = kvTuple->ChildPtr(0);
  3035. auto traits = kvTuple->ChildPtr(1);
  3036. YQL_ENSURE(traits->IsCallable({"Lag", "Lead", "RowNumber", "Rank", "DenseRank", "WindowTraits", "PercentRank", "CumeDist", "NTile"}));
  3037. if (traits->IsCallable("WindowTraits")) {
  3038. bool isDistinct = kvTuple->ChildrenSize() == 3;
  3039. if (!isDistinct) {
  3040. YQL_ENSURE(traits->Head().GetTypeAnn());
  3041. const TTypeAnnotationNode& oldItemType = *traits->Head().GetTypeAnn()->Cast<TTypeExprType>()->GetType();
  3042. traits = ctx.Builder(traits->Pos())
  3043. .Callable(traits->Content())
  3044. .Add(0, inputItemType)
  3045. .Add(1, ctx.DeepCopyLambda(*ReplaceFirstLambdaArgWithCastStruct(*traits->Child(1), oldItemType, ctx)))
  3046. .Add(2, ctx.DeepCopyLambda(*ReplaceFirstLambdaArgWithCastStruct(*traits->Child(2), oldItemType, ctx)))
  3047. .Add(3, ctx.DeepCopyLambda(*ReplaceFirstLambdaArgWithCastStruct(*traits->Child(3), oldItemType, ctx)))
  3048. .Add(4, ctx.DeepCopyLambda(*traits->Child(4)))
  3049. .Add(5, traits->Child(5)->IsLambda() ? ctx.DeepCopyLambda(*traits->Child(5)) : traits->ChildPtr(5))
  3050. .Seal()
  3051. .Build();
  3052. }
  3053. } else if (traits->IsCallable({"Lag", "Lead", "Rank", "DenseRank", "PercentRank"})) {
  3054. YQL_ENSURE(traits->Head().GetTypeAnn());
  3055. const TTypeAnnotationNode& oldItemType = *traits->Head().GetTypeAnn()->Cast<TTypeExprType>()->GetType()
  3056. ->Cast<TListExprType>()->GetItemType();
  3057. traits = ctx.ChangeChild(*traits, 1, ctx.DeepCopyLambda(*ReplaceFirstLambdaArgWithCastStruct(*traits->Child(1), oldItemType, ctx)));
  3058. }
  3059. winOnArgs.push_back(ctx.ChangeChild(*kvTuple, 1, std::move(traits)));
  3060. }
  3061. newFrames.push_back(ctx.ChangeChildren(*frameNode, std::move(winOnArgs)));
  3062. }
  3063. fixedCalcs.push_back(
  3064. Build<TCoCalcOverWindowTuple>(ctx, calc.Pos())
  3065. .Keys(calc.Keys())
  3066. .SortSpec(sortSpec)
  3067. .Frames(ctx.NewList(calc.Frames().Pos(), std::move(newFrames)))
  3068. .SessionSpec(sessionSpec)
  3069. .SessionColumns(sessionColumns)
  3070. .Done().Ptr()
  3071. );
  3072. }
  3073. return Build<TCoCalcOverWindowGroup>(ctx, pos)
  3074. .Input(input)
  3075. .Calcs(ctx.NewList(pos, std::move(fixedCalcs)))
  3076. .Done().Ptr();
  3077. }
  3078. bool IsUnbounded(const NNodes::TCoFrameBound& bound) {
  3079. if (bound.Ref().ChildrenSize() < 2) {
  3080. return false;
  3081. }
  3082. if (auto maybeAtom = bound.Bound().Maybe<TCoAtom>()) {
  3083. return maybeAtom.Cast().Value() == "unbounded";
  3084. }
  3085. return false;
  3086. }
  3087. bool IsCurrentRow(const NNodes::TCoFrameBound& bound) {
  3088. return bound.Setting().Value() == "currentRow";
  3089. }
  3090. TWindowFrameSettings TWindowFrameSettings::Parse(const TExprNode& node, TExprContext& ctx) {
  3091. auto maybeSettings = TryParse(node, ctx);
  3092. YQL_ENSURE(maybeSettings);
  3093. return *maybeSettings;
  3094. }
  3095. TMaybe<TWindowFrameSettings> TWindowFrameSettings::TryParse(const TExprNode& node, TExprContext& ctx) {
  3096. TWindowFrameSettings settings;
  3097. if (node.IsCallable("WinOnRows")) {
  3098. settings.Type = EFrameType::FrameByRows;
  3099. } else if (node.IsCallable("WinOnRange")) {
  3100. settings.Type = EFrameType::FrameByRange;
  3101. } else {
  3102. YQL_ENSURE(node.IsCallable("WinOnGroups"));
  3103. settings.Type = EFrameType::FrameByGroups;
  3104. }
  3105. auto frameSpec = node.Child(0);
  3106. if (frameSpec->Type() == TExprNode::List) {
  3107. bool hasBegin = false;
  3108. bool hasEnd = false;
  3109. for (const auto& setting : frameSpec->Children()) {
  3110. if (!EnsureTupleMinSize(*setting, 1, ctx)) {
  3111. return {};
  3112. }
  3113. if (!EnsureAtom(setting->Head(), ctx)) {
  3114. return {};
  3115. }
  3116. const auto settingName = setting->Head().Content();
  3117. if (settingName != "begin" && settingName != "end" && settingName != "compact") {
  3118. ctx.AddError(
  3119. TIssue(ctx.GetPosition(setting->Pos()), TStringBuilder() << "Invalid frame bound '" << settingName << "'"));
  3120. return {};
  3121. }
  3122. if (settingName == "compact") {
  3123. settings.Compact = true;
  3124. continue;
  3125. }
  3126. if (!EnsureTupleSize(*setting, 2, ctx)) {
  3127. return {};
  3128. }
  3129. bool& hasBound = (settingName == "begin") ? hasBegin : hasEnd;
  3130. if (hasBound) {
  3131. ctx.AddError(
  3132. TIssue(ctx.GetPosition(setting->Pos()), TStringBuilder() << "Duplicate " << settingName << " frame bound detected"));
  3133. return {};
  3134. }
  3135. hasBound = true;
  3136. TMaybe<i32>& boundOffset = (settingName == "begin") ? settings.FirstOffset : settings.LastOffset;
  3137. TExprNode::TPtr& frameBound = (settingName == "begin") ? settings.First : settings.Last;
  3138. if (setting->Tail().IsList()) {
  3139. TExprNode::TPtr fb = setting->TailPtr();
  3140. if (!EnsureTupleMinSize(*fb, 1, ctx)) {
  3141. return {};
  3142. }
  3143. if (!EnsureAtom(fb->Head(), ctx)) {
  3144. return {};
  3145. }
  3146. auto type = fb->Head().Content();
  3147. if (type == "currentRow") {
  3148. if (fb->ChildrenSize() == 1) {
  3149. if (!node.IsCallable("WinOnRange")) {
  3150. ctx.AddError(TIssue(ctx.GetPosition(fb->Pos()), TStringBuilder() << "currentRow should only be used for RANGE"));
  3151. return {};
  3152. }
  3153. frameBound = fb;
  3154. continue;
  3155. }
  3156. ctx.AddError(TIssue(ctx.GetPosition(fb->Pos()), TStringBuilder() << "Expecting no value for '" << type << "'"));
  3157. return {};
  3158. }
  3159. if (!(type == "preceding" || type == "following")) {
  3160. ctx.AddError(TIssue(ctx.GetPosition(fb->Pos()), TStringBuilder() << "Expecting preceding or following, but got '" << type << "'"));
  3161. return {};
  3162. }
  3163. if (!EnsureTupleSize(*fb, 2, ctx)) {
  3164. return {};
  3165. }
  3166. auto boundValue = fb->ChildPtr(1);
  3167. if (boundValue->IsAtom()) {
  3168. if (boundValue->Content() == "unbounded") {
  3169. frameBound = fb;
  3170. continue;
  3171. }
  3172. ctx.AddError(TIssue(ctx.GetPosition(fb->Pos()), TStringBuilder() << "Expecting unbounded, but got '" << boundValue->Content() << "'"));
  3173. return {};
  3174. }
  3175. if (node.IsCallable({"WinOnRows", "WinOnGroups"})) {
  3176. if (!EnsureDataType(*boundValue, ctx)) {
  3177. return {};
  3178. }
  3179. auto slot = boundValue->GetTypeAnn()->Cast<TDataExprType>()->GetSlot();
  3180. bool groups = node.IsCallable("WinOnGroups");
  3181. if (!IsDataTypeIntegral(slot)) {
  3182. ctx.AddError(TIssue(ctx.GetPosition(boundValue->Pos()),
  3183. TStringBuilder() << "Expecting integral values for " << (groups ? "GROUPS" : "ROWS") << " but got " << *boundValue->GetTypeAnn()));
  3184. return {};
  3185. }
  3186. if (!groups) {
  3187. auto maybeIntLiteral = TMaybeNode<TCoIntegralCtor>(boundValue);
  3188. if (!maybeIntLiteral) {
  3189. // TODO: this is not strictly necessary, and only needed for current implementation via Queue
  3190. ctx.AddError(TIssue(ctx.GetPosition(boundValue->Pos()),
  3191. TStringBuilder() << "Expecting literal values for ROWS"));
  3192. return {};
  3193. }
  3194. auto strLiteralValue = maybeIntLiteral.Cast().Literal().Value();
  3195. if (strLiteralValue.StartsWith("-")) {
  3196. ctx.AddError(TIssue(ctx.GetPosition(boundValue->Pos()),
  3197. TStringBuilder() << "Expecting positive literal values for ROWS, but got " << strLiteralValue));
  3198. return {};
  3199. }
  3200. ui64 literalValue = FromString<ui64>(strLiteralValue);
  3201. if (literalValue > std::numeric_limits<i32>::max()) {
  3202. ctx.AddError(TIssue(ctx.GetPosition(boundValue->Pos()),
  3203. TStringBuilder() << "ROWS offset too big: " << strLiteralValue << ", maximum is " << std::numeric_limits<i32>::max()));
  3204. return {};
  3205. }
  3206. i32 castedValue = (i32)literalValue;
  3207. if (type == "preceding") {
  3208. castedValue = -castedValue;
  3209. }
  3210. boundOffset = castedValue;
  3211. }
  3212. } else if (!EnsureComparableType(boundValue->Pos(), *boundValue->GetTypeAnn(), ctx)) {
  3213. return {};
  3214. }
  3215. frameBound = fb;
  3216. } else if (setting->Tail().IsCallable("Int32")) {
  3217. auto& valNode = setting->Tail().Head();
  3218. YQL_ENSURE(valNode.IsAtom());
  3219. i32 value;
  3220. YQL_ENSURE(TryFromString(valNode.Content(), value));
  3221. boundOffset = value;
  3222. } else if (!setting->Tail().IsCallable("Void")) {
  3223. const TTypeAnnotationNode* type = setting->Tail().GetTypeAnn();
  3224. TStringBuilder errMsg;
  3225. if (!type) {
  3226. errMsg << "lambda";
  3227. } else if (setting->Tail().IsCallable()) {
  3228. errMsg << setting->Tail().Content() << " with type " << *type;
  3229. } else {
  3230. errMsg << *type;
  3231. }
  3232. ctx.AddError(TIssue(ctx.GetPosition(setting->Tail().Pos()),
  3233. TStringBuilder() << "Invalid " << settingName << " frame bound - expecting Void or Int32 callable, but got: " << errMsg));
  3234. return {};
  3235. }
  3236. }
  3237. if (!hasBegin || !hasEnd) {
  3238. ctx.AddError(TIssue(ctx.GetPosition(frameSpec->Pos()),
  3239. TStringBuilder() << "Missing " << (!hasBegin ? "begin" : "end") << " bound in frame definition"));
  3240. return {};
  3241. }
  3242. } else if (frameSpec->IsCallable("Void")) {
  3243. settings.FirstOffset = {};
  3244. settings.LastOffset = 0;
  3245. } else {
  3246. const TTypeAnnotationNode* type = frameSpec->GetTypeAnn();
  3247. ctx.AddError(TIssue(ctx.GetPosition(frameSpec->Pos()),
  3248. TStringBuilder() << "Invalid window frame - expecting Tuple or Void, but got: " << (type ? FormatType(type) : "lambda")));
  3249. return {};
  3250. }
  3251. // frame will always contain rows if it includes current row
  3252. if (!settings.FirstOffset) {
  3253. settings.NeverEmpty = !settings.LastOffset.Defined() || *settings.LastOffset >= 0;
  3254. } else if (!settings.LastOffset.Defined()) {
  3255. settings.NeverEmpty = !settings.FirstOffset.Defined() || *settings.FirstOffset <= 0;
  3256. } else {
  3257. settings.NeverEmpty = *settings.FirstOffset <= *settings.LastOffset && *settings.FirstOffset <= 0 && *settings.LastOffset >= 0;
  3258. }
  3259. return settings;
  3260. }
  3261. TMaybe<i32> TWindowFrameSettings::GetFirstOffset() const {
  3262. YQL_ENSURE(Type == FrameByRows);
  3263. return FirstOffset;
  3264. }
  3265. TMaybe<i32> TWindowFrameSettings::GetLastOffset() const {
  3266. YQL_ENSURE(Type == FrameByRows);
  3267. return LastOffset;
  3268. }
  3269. TCoFrameBound TWindowFrameSettings::GetFirst() const {
  3270. YQL_ENSURE(First);
  3271. return TCoFrameBound(First);
  3272. }
  3273. TCoFrameBound TWindowFrameSettings::GetLast() const {
  3274. YQL_ENSURE(Last);
  3275. return TCoFrameBound(Last);
  3276. }
  3277. TExprNode::TPtr ZipWithSessionParamsLambda(TPositionHandle pos, const TExprNode::TPtr& partitionKeySelector,
  3278. const TExprNode::TPtr& sessionKeySelector, const TExprNode::TPtr& sessionInit,
  3279. const TExprNode::TPtr& sessionUpdate, TExprContext& ctx)
  3280. {
  3281. auto extractTupleItem = [&](ui32 idx) {
  3282. return ctx.Builder(pos)
  3283. .Lambda()
  3284. .Param("tuple")
  3285. .Callable("Nth")
  3286. .Arg(0, "tuple")
  3287. .Atom(1, ToString(idx), TNodeFlags::Default)
  3288. .Seal()
  3289. .Seal()
  3290. .Build();
  3291. };
  3292. auto initLambda = ctx.Builder(pos)
  3293. .Lambda()
  3294. .Param("row")
  3295. .List() // row, sessionKey, sessionState, partitionKey
  3296. .Arg(0, "row")
  3297. .Apply(1, sessionKeySelector)
  3298. .With(0, "row")
  3299. .With(1)
  3300. .Apply(sessionInit)
  3301. .With(0, "row")
  3302. .Seal()
  3303. .Done()
  3304. .Seal()
  3305. .Apply(2, sessionInit)
  3306. .With(0, "row")
  3307. .Seal()
  3308. .Apply(3, partitionKeySelector)
  3309. .With(0, "row")
  3310. .Seal()
  3311. .Seal()
  3312. .Seal()
  3313. .Build();
  3314. auto newPartitionLambda = ctx.Builder(pos)
  3315. .Lambda()
  3316. .Param("row")
  3317. .Param("prevBigState")
  3318. .Callable("AggrNotEquals")
  3319. .Apply(0, partitionKeySelector)
  3320. .With(0, "row")
  3321. .Seal()
  3322. .Apply(1, partitionKeySelector)
  3323. .With(0)
  3324. .Apply(extractTupleItem(0))
  3325. .With(0, "prevBigState")
  3326. .Seal()
  3327. .Done()
  3328. .Seal()
  3329. .Seal()
  3330. .Seal()
  3331. .Build();
  3332. auto newSessionOrUpdatedStateLambda = [&](bool newSession) {
  3333. return ctx.Builder(pos)
  3334. .Lambda()
  3335. .Param("row")
  3336. .Param("prevBigState")
  3337. .Apply(extractTupleItem(newSession ? 0 : 1))
  3338. .With(0)
  3339. .Apply(sessionUpdate)
  3340. .With(0, "row")
  3341. .With(1)
  3342. .Apply(extractTupleItem(2))
  3343. .With(0, "prevBigState")
  3344. .Seal()
  3345. .Done()
  3346. .Seal()
  3347. .Done()
  3348. .Seal()
  3349. .Seal()
  3350. .Build();
  3351. };
  3352. return ctx.Builder(pos)
  3353. .Lambda()
  3354. .Param("input")
  3355. .Callable("Chain1Map")
  3356. .Arg(0, "input")
  3357. .Add(1, initLambda)
  3358. .Lambda(2)
  3359. .Param("row")
  3360. .Param("prevBigState")
  3361. .Callable("If")
  3362. .Apply(0, newPartitionLambda)
  3363. .With(0, "row")
  3364. .With(1, "prevBigState")
  3365. .Seal()
  3366. .Apply(1, initLambda)
  3367. .With(0, "row")
  3368. .Seal()
  3369. .List(2)
  3370. .Arg(0, "row")
  3371. .Callable(1, "If")
  3372. .Apply(0, newSessionOrUpdatedStateLambda(/* newSession = */ true))
  3373. .With(0, "row")
  3374. .With(1, "prevBigState")
  3375. .Seal()
  3376. .Apply(1, sessionKeySelector)
  3377. .With(0, "row")
  3378. .With(1)
  3379. .Apply(newSessionOrUpdatedStateLambda(/* newSession = */ false))
  3380. .With(0, "row")
  3381. .With(1, "prevBigState")
  3382. .Seal()
  3383. .Done()
  3384. .Seal()
  3385. .Apply(2, extractTupleItem(1))
  3386. .With(0, "prevBigState")
  3387. .Seal()
  3388. .Seal()
  3389. .Apply(2, newSessionOrUpdatedStateLambda(/* newSession = */ false))
  3390. .With(0, "row")
  3391. .With(1, "prevBigState")
  3392. .Seal()
  3393. .Apply(3, partitionKeySelector)
  3394. .With(0, "row")
  3395. .Seal()
  3396. .Seal()
  3397. .Seal()
  3398. .Seal()
  3399. .Seal()
  3400. .Seal()
  3401. .Build();
  3402. }
  3403. TExprNode::TPtr AddSessionParamsMemberLambda(TPositionHandle pos,
  3404. TStringBuf sessionStartMemberName, const TExprNode::TPtr& partitionKeySelector,
  3405. const TSessionWindowParams& sessionWindowParams, TExprContext& ctx)
  3406. {
  3407. return AddSessionParamsMemberLambda(pos, sessionStartMemberName, "", partitionKeySelector,
  3408. sessionWindowParams.Key, sessionWindowParams.Init, sessionWindowParams.Update, ctx);
  3409. }
  3410. TExprNode::TPtr AddSessionParamsMemberLambda(TPositionHandle pos,
  3411. TStringBuf sessionStartMemberName, TStringBuf sessionParamsMemberName,
  3412. const TExprNode::TPtr& partitionKeySelector,
  3413. const TExprNode::TPtr& sessionKeySelector, const TExprNode::TPtr& sessionInit,
  3414. const TExprNode::TPtr& sessionUpdate, TExprContext& ctx)
  3415. {
  3416. YQL_ENSURE(sessionStartMemberName);
  3417. TExprNode::TPtr addLambda = ctx.Builder(pos)
  3418. .Lambda()
  3419. .Param("tupleOfItemAndSessionParams")
  3420. .Callable("AddMember")
  3421. .Callable(0, "Nth")
  3422. .Arg(0, "tupleOfItemAndSessionParams")
  3423. .Atom(1, "0", TNodeFlags::Default)
  3424. .Seal()
  3425. .Atom(1, sessionStartMemberName)
  3426. .Callable(2, "Nth")
  3427. .Arg(0, "tupleOfItemAndSessionParams")
  3428. .Atom(1, "1", TNodeFlags::Default)
  3429. .Seal()
  3430. .Seal()
  3431. .Seal()
  3432. .Build();
  3433. if (sessionParamsMemberName) {
  3434. addLambda = ctx.Builder(pos)
  3435. .Lambda()
  3436. .Param("tupleOfItemAndSessionParams")
  3437. .Callable("AddMember")
  3438. .Apply(0, addLambda)
  3439. .With(0, "tupleOfItemAndSessionParams")
  3440. .Seal()
  3441. .Atom(1, sessionParamsMemberName)
  3442. .Callable(2, "AsStruct")
  3443. .List(0)
  3444. .Atom(0, "start", TNodeFlags::Default)
  3445. .Callable(1, "Nth")
  3446. .Arg(0, "tupleOfItemAndSessionParams")
  3447. .Atom(1, "1", TNodeFlags::Default)
  3448. .Seal()
  3449. .Seal()
  3450. .List(1)
  3451. .Atom(0, "state", TNodeFlags::Default)
  3452. .Callable(1, "Nth")
  3453. .Arg(0, "tupleOfItemAndSessionParams")
  3454. .Atom(1, "2", TNodeFlags::Default)
  3455. .Seal()
  3456. .Seal()
  3457. .Seal()
  3458. .Seal()
  3459. .Seal()
  3460. .Build();
  3461. }
  3462. return ctx.Builder(pos)
  3463. .Lambda()
  3464. .Param("input")
  3465. .Callable("OrderedMap")
  3466. .Apply(0, ZipWithSessionParamsLambda(pos, partitionKeySelector, sessionKeySelector, sessionInit, sessionUpdate, ctx))
  3467. .With(0, "input")
  3468. .Seal()
  3469. .Add(1, addLambda)
  3470. .Seal()
  3471. .Seal()
  3472. .Build();
  3473. }
  3474. void TSessionWindowParams::Reset()
  3475. {
  3476. Traits = {};
  3477. Key = {};
  3478. KeyType = nullptr;
  3479. ParamsType = nullptr;
  3480. Init = {};
  3481. Update = {};
  3482. SortTraits = {};
  3483. }
  3484. }