yql_opt_hopping.cpp 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592
  1. #include "yql_opt_hopping.h"
  2. #include <yql/essentials/core/yql_expr_type_annotation.h>
  3. #include <yql/essentials/core/yql_opt_utils.h>
  4. #include <util/generic/bitmap.h>
  5. #include <util/generic/maybe.h>
  6. #include <util/generic/string.h>
  7. #include <util/generic/vector.h>
  8. using namespace NYql;
  9. using namespace NYql::NNodes;
  10. namespace NYql::NHopping {
  11. TKeysDescription::TKeysDescription(const TStructExprType& rowType, const TCoAtomList& keys, const TString& hoppingColumn) {
  12. for (const auto& key : keys) {
  13. if (key.StringValue() == hoppingColumn) {
  14. FakeKeys.emplace_back(key.StringValue());
  15. continue;
  16. }
  17. const auto index = rowType.FindItem(key.StringValue());
  18. Y_ENSURE(index);
  19. auto itemType = rowType.GetItems()[*index]->GetItemType();
  20. if (RemoveOptionalType(itemType)->GetKind() == ETypeAnnotationKind::Data) {
  21. MemberKeys.emplace_back(key.StringValue());
  22. continue;
  23. }
  24. PickleKeys.emplace_back(key.StringValue());
  25. }
  26. }
  27. TExprNode::TPtr TKeysDescription::BuildPickleLambda(TExprContext& ctx, TPositionHandle pos) const {
  28. TCoArgument arg = Build<TCoArgument>(ctx, pos)
  29. .Name("item")
  30. .Done();
  31. TExprBase body = arg;
  32. for (const auto& key : PickleKeys) {
  33. const auto member = Build<TCoMember>(ctx, pos)
  34. .Name().Build(key)
  35. .Struct(arg)
  36. .Done()
  37. .Ptr();
  38. body = Build<TCoReplaceMember>(ctx, pos)
  39. .Struct(body)
  40. .Name().Build(key)
  41. .Item(ctx.NewCallable(pos, "StablePickle", { member }))
  42. .Done();
  43. }
  44. return Build<TCoLambda>(ctx, pos)
  45. .Args({arg})
  46. .Body(body)
  47. .Done()
  48. .Ptr();
  49. }
  50. TExprNode::TPtr TKeysDescription::BuildUnpickleLambda(TExprContext& ctx, TPositionHandle pos, const TStructExprType& rowType) {
  51. TCoArgument arg = Build<TCoArgument>(ctx, pos)
  52. .Name("item")
  53. .Done();
  54. TExprBase body = arg;
  55. for (const auto& key : PickleKeys) {
  56. const auto index = rowType.FindItem(key);
  57. Y_ENSURE(index);
  58. auto itemType = rowType.GetItems().at(*index)->GetItemType();
  59. const auto member = Build<TCoMember>(ctx, pos)
  60. .Name().Build(key)
  61. .Struct(arg)
  62. .Done()
  63. .Ptr();
  64. body = Build<TCoReplaceMember>(ctx, pos)
  65. .Struct(body)
  66. .Name().Build(key)
  67. .Item(ctx.NewCallable(pos, "Unpickle", { ExpandType(pos, *itemType, ctx), member }))
  68. .Done();
  69. }
  70. return Build<TCoLambda>(ctx, pos)
  71. .Args({arg})
  72. .Body(body)
  73. .Done()
  74. .Ptr();
  75. }
  76. TVector<TCoAtom> TKeysDescription::GetKeysList(TExprContext& ctx, TPositionHandle pos) const {
  77. TVector<TCoAtom> res;
  78. res.reserve(PickleKeys.size() + MemberKeys.size());
  79. for (const auto& pickleKey : PickleKeys) {
  80. res.emplace_back(Build<TCoAtom>(ctx, pos).Value(pickleKey).Done());
  81. }
  82. for (const auto& memberKey : MemberKeys) {
  83. res.emplace_back(Build<TCoAtom>(ctx, pos).Value(memberKey).Done());
  84. }
  85. return res;
  86. }
  87. TVector<TString> TKeysDescription::GetActualGroupKeys() const {
  88. TVector<TString> result;
  89. result.reserve(PickleKeys.size() + MemberKeys.size());
  90. result.insert(result.end(), PickleKeys.begin(), PickleKeys.end());
  91. result.insert(result.end(), MemberKeys.begin(), MemberKeys.end());
  92. return result;
  93. }
  94. bool TKeysDescription::NeedPickle() const {
  95. return !PickleKeys.empty();
  96. }
  97. TExprNode::TPtr TKeysDescription::GetKeySelector(TExprContext& ctx, TPositionHandle pos, const TStructExprType* rowType) {
  98. auto builder = Build<TCoAtomList>(ctx, pos);
  99. for (auto key : GetKeysList(ctx, pos)) {
  100. builder.Add(std::move(key));
  101. }
  102. return BuildKeySelector(pos, *rowType, builder.Build().Value().Ptr(), ctx);
  103. }
  104. TString BuildColumnName(const TExprBase& column) {
  105. if (const auto columnName = column.Maybe<TCoAtom>()) {
  106. return columnName.Cast().StringValue();
  107. }
  108. if (const auto columnNames = column.Maybe<TCoAtomList>()) {
  109. TStringBuilder columnNameBuilder;
  110. for (const auto columnName : columnNames.Cast()) {
  111. columnNameBuilder.append(columnName.StringValue());
  112. columnNameBuilder.append("_");
  113. }
  114. return columnNameBuilder;
  115. }
  116. YQL_ENSURE(false, "Invalid node. Expected Atom or AtomList, but received: "
  117. << column.Ptr()->Dump());
  118. }
  119. bool IsLegacyHopping(const TExprNode::TPtr& hoppingSetting) {
  120. return !hoppingSetting->Child(1)->IsList();
  121. }
  122. void EnsureNotDistinct(const TCoAggregate& aggregate) {
  123. const auto& aggregateHandlers = aggregate.Handlers();
  124. YQL_ENSURE(
  125. AllOf(aggregateHandlers, [](const auto& t){ return !t.DistinctName(); }),
  126. "Distinct is not supported for aggregation with hop");
  127. }
  128. TMaybe<THoppingTraits> ExtractHopTraits(const TCoAggregate& aggregate, TExprContext& ctx, bool analyticsMode) {
  129. const auto pos = aggregate.Pos();
  130. const auto addError = [&](TStringBuf message) {
  131. ctx.AddError(TIssue(ctx.GetPosition(pos), message));
  132. };
  133. const auto hopSetting = GetSetting(aggregate.Settings().Ref(), "hopping");
  134. if (!hopSetting) {
  135. addError("Aggregate over stream must have 'hopping' setting");
  136. return Nothing();
  137. }
  138. const auto hoppingColumn = IsLegacyHopping(hopSetting)
  139. ? "_yql_time"
  140. : TString(hopSetting->Child(1)->Child(0)->Content());
  141. const auto traitsNode = IsLegacyHopping(hopSetting)
  142. ? hopSetting->Child(1)
  143. : hopSetting->Child(1)->Child(1);
  144. const auto maybeTraits = TMaybeNode<TCoHoppingTraits>(traitsNode);
  145. if (!maybeTraits) {
  146. addError("Invalid 'hopping' setting in Aggregate");
  147. return Nothing();
  148. }
  149. const auto traits = maybeTraits.Cast();
  150. const auto checkIntervalParam = [&](TExprBase param) -> TMaybe<i64> {
  151. if (param.Maybe<TCoJust>()) {
  152. param = param.Cast<TCoJust>().Input();
  153. }
  154. if (!param.Maybe<TCoInterval>()) {
  155. addError("Not an interval data ctor");
  156. return Nothing();
  157. }
  158. return FromString<i64>(param.Cast<TCoInterval>().Literal().Value());
  159. };
  160. const auto hop = checkIntervalParam(traits.Hop());
  161. if (!hop) {
  162. return Nothing();
  163. }
  164. const auto hopTime = *hop;
  165. const auto interval = checkIntervalParam(traits.Interval());
  166. if (!interval) {
  167. return Nothing();
  168. }
  169. const auto intervalTime = *interval;
  170. const auto delay = checkIntervalParam(traits.Delay());
  171. if (!delay) {
  172. return Nothing();
  173. }
  174. const auto delayTime = *delay;
  175. if (hopTime <= 0) {
  176. addError("Hop time must be positive");
  177. return Nothing();
  178. }
  179. if (intervalTime <= 0) {
  180. addError("Interval time must be positive");
  181. return Nothing();
  182. }
  183. if (delayTime < 0) {
  184. addError("Delay time must be non-negative");
  185. return Nothing();
  186. }
  187. if (intervalTime % hopTime) {
  188. addError("Interval time must be divisible by hop time");
  189. return Nothing();
  190. }
  191. if (delayTime % hopTime) {
  192. addError("Delay time must be divisible by hop time");
  193. return Nothing();
  194. }
  195. if (intervalTime / hopTime > 100'000) {
  196. addError("Too many hops in interval");
  197. return Nothing();
  198. }
  199. if (delayTime / hopTime > 100'000) {
  200. addError("Too many hops in delay");
  201. return Nothing();
  202. }
  203. const auto newTraits = Build<TCoHoppingTraits>(ctx, aggregate.Pos())
  204. .InitFrom(traits)
  205. .DataWatermarks(analyticsMode
  206. ? ctx.NewAtom(aggregate.Pos(), "false")
  207. : traits.DataWatermarks().Ptr())
  208. .Done();
  209. return THoppingTraits {
  210. hoppingColumn,
  211. newTraits,
  212. static_cast<ui64>(hopTime),
  213. static_cast<ui64>(intervalTime),
  214. static_cast<ui64>(delayTime),
  215. };
  216. }
  217. TExprNode::TPtr BuildTimeExtractor(const TCoHoppingTraits& hoppingTraits, TExprContext& ctx) {
  218. const auto pos = hoppingTraits.Pos();
  219. if (hoppingTraits.ItemType().Ref().GetTypeAnn()->Cast<TTypeExprType>()->GetType()->Cast<TStructExprType>()->GetSize() == 0) {
  220. // The case when no fields are used in lambda. F.e. when it has only DependsOn.
  221. return ctx.DeepCopyLambda(hoppingTraits.TimeExtractor().Ref());
  222. }
  223. return Build<TCoLambda>(ctx, pos)
  224. .Args({"item"})
  225. .Body<TExprApplier>()
  226. .Apply(hoppingTraits.TimeExtractor())
  227. .With<TCoSafeCast>(0)
  228. .Type(hoppingTraits.ItemType())
  229. .Value("item")
  230. .Build()
  231. .Build()
  232. .Done()
  233. .Ptr();
  234. }
  235. TExprNode::TPtr BuildInitHopLambda(const TCoAggregate& aggregate, TExprContext& ctx) {
  236. const auto pos = aggregate.Pos();
  237. const auto& aggregateHandlers = aggregate.Handlers();
  238. const auto initItemArg = Build<TCoArgument>(ctx, pos).Name("item").Done();
  239. TVector<TExprBase> structItems;
  240. structItems.reserve(aggregateHandlers.Size());
  241. ui32 index = 0;
  242. for (const auto& handler : aggregateHandlers) {
  243. const auto tuple = handler.Cast<TCoAggregateTuple>();
  244. TMaybeNode<TExprBase> applier;
  245. if (tuple.Trait().Cast<TCoAggregationTraits>().InitHandler().Args().Size() == 1) {
  246. applier = Build<TExprApplier>(ctx, pos)
  247. .Apply(tuple.Trait().Cast<TCoAggregationTraits>().InitHandler())
  248. .With(0, initItemArg)
  249. .Done();
  250. } else {
  251. applier = Build<TExprApplier>(ctx, pos)
  252. .Apply(tuple.Trait().Cast<TCoAggregationTraits>().InitHandler())
  253. .With(0, initItemArg)
  254. .With<TCoUint32>(1)
  255. .Literal().Build(ToString(index))
  256. .Build()
  257. .Done();
  258. }
  259. structItems.push_back(Build<TCoNameValueTuple>(ctx, pos)
  260. .Name().Build(BuildColumnName(tuple.ColumnName()))
  261. .Value(applier)
  262. .Done());
  263. ++index;
  264. }
  265. return Build<TCoLambda>(ctx, pos)
  266. .Args({initItemArg})
  267. .Body<TCoAsStruct>()
  268. .Add(structItems)
  269. .Build()
  270. .Done()
  271. .Ptr();
  272. }
  273. TExprNode::TPtr BuildUpdateHopLambda(const TCoAggregate& aggregate, TExprContext& ctx) {
  274. const auto pos = aggregate.Pos();
  275. const auto aggregateHandlers = aggregate.Handlers();
  276. const auto updateItemArg = Build<TCoArgument>(ctx, pos).Name("item").Done();
  277. const auto updateStateArg = Build<TCoArgument>(ctx, pos).Name("state").Done();
  278. TVector<TExprBase> structItems;
  279. structItems.reserve(aggregateHandlers.Size());
  280. i32 index = 0;
  281. for (const auto& handler : aggregateHandlers) {
  282. const auto tuple = handler.Cast<TCoAggregateTuple>();
  283. const TString columnName = BuildColumnName(tuple.ColumnName());
  284. const auto member = Build<TCoMember>(ctx, pos)
  285. .Struct(updateStateArg)
  286. .Name().Build(columnName)
  287. .Done();
  288. TMaybeNode<TExprBase> applier;
  289. if (tuple.Trait().Cast<TCoAggregationTraits>().UpdateHandler().Args().Size() == 2) {
  290. applier = Build<TExprApplier>(ctx, pos)
  291. .Apply(tuple.Trait().Cast<TCoAggregationTraits>().UpdateHandler())
  292. .With(0, updateItemArg)
  293. .With(1, member)
  294. .Done();
  295. } else {
  296. applier = Build<TExprApplier>(ctx, pos)
  297. .Apply(tuple.Trait().Cast<TCoAggregationTraits>().UpdateHandler())
  298. .With(0, updateItemArg)
  299. .With(1, member)
  300. .With<TCoUint32>(2)
  301. .Literal().Build(ToString(index))
  302. .Build()
  303. .Done();
  304. }
  305. structItems.push_back(Build<TCoNameValueTuple>(ctx, pos)
  306. .Name().Build(columnName)
  307. .Value(applier)
  308. .Done());
  309. ++index;
  310. }
  311. return Build<TCoLambda>(ctx, pos)
  312. .Args({updateItemArg, updateStateArg})
  313. .Body<TCoAsStruct>()
  314. .Add(structItems)
  315. .Build()
  316. .Done()
  317. .Ptr();
  318. }
  319. TExprNode::TPtr BuildMergeHopLambda(const TCoAggregate& aggregate, TExprContext& ctx) {
  320. const auto pos = aggregate.Pos();
  321. const auto& aggregateHandlers = aggregate.Handlers();
  322. const auto mergeState1Arg = Build<TCoArgument>(ctx, pos).Name("state1").Done();
  323. const auto mergeState2Arg = Build<TCoArgument>(ctx, pos).Name("state2").Done();
  324. TVector<TExprBase> structItems;
  325. structItems.reserve(aggregateHandlers.Size());
  326. for (const auto& handler : aggregateHandlers) {
  327. const auto tuple = handler.Cast<TCoAggregateTuple>();
  328. const TString columnName = BuildColumnName(tuple.ColumnName());
  329. const auto member1 = Build<TCoMember>(ctx, pos)
  330. .Struct(mergeState1Arg)
  331. .Name().Build(columnName)
  332. .Done();
  333. const auto member2 = Build<TCoMember>(ctx, pos)
  334. .Struct(mergeState2Arg)
  335. .Name().Build(columnName)
  336. .Done();
  337. structItems.push_back(Build<TCoNameValueTuple>(ctx, pos)
  338. .Name().Build(columnName)
  339. .Value<TExprApplier>()
  340. .Apply(tuple.Trait().Cast<TCoAggregationTraits>().MergeHandler())
  341. .With(0, member1)
  342. .With(1, member2)
  343. .Build()
  344. .Done());
  345. }
  346. return Build<TCoLambda>(ctx, pos)
  347. .Args({mergeState1Arg, mergeState2Arg})
  348. .Body<TCoAsStruct>()
  349. .Add(structItems)
  350. .Build()
  351. .Done()
  352. .Ptr();
  353. }
  354. TExprNode::TPtr BuildFinishHopLambda(
  355. const TCoAggregate& aggregate,
  356. const TVector<TString>& actualGroupKeys,
  357. const TString& hoppingColumn,
  358. TExprContext& ctx)
  359. {
  360. const auto pos = aggregate.Pos();
  361. const auto aggregateHandlers = aggregate.Handlers();
  362. const auto finishKeyArg = Build<TCoArgument>(ctx, pos).Name("key").Done();
  363. const auto finishStateArg = Build<TCoArgument>(ctx, pos).Name("state").Done();
  364. const auto finishTimeArg = Build<TCoArgument>(ctx, pos).Name("time").Done();
  365. TVector<TExprBase> structItems;
  366. structItems.reserve(actualGroupKeys.size() + aggregateHandlers.Size() + 1);
  367. if (actualGroupKeys.size() == 1) {
  368. structItems.push_back(Build<TCoNameValueTuple>(ctx, pos)
  369. .Name().Build(actualGroupKeys[0])
  370. .Value(finishKeyArg)
  371. .Done());
  372. } else {
  373. for (size_t i = 0; i < actualGroupKeys.size(); ++i) {
  374. structItems.push_back(Build<TCoNameValueTuple>(ctx, pos)
  375. .Name().Build(actualGroupKeys[i])
  376. .Value<TCoNth>()
  377. .Tuple(finishKeyArg)
  378. .Index<TCoAtom>()
  379. .Value(ToString(i))
  380. .Build()
  381. .Build()
  382. .Done());
  383. }
  384. }
  385. for (const auto& handler : aggregateHandlers) {
  386. const auto tuple = handler.Cast<TCoAggregateTuple>();
  387. const TString compoundColumnName = BuildColumnName(tuple.ColumnName());
  388. const auto member = Build<TCoMember>(ctx, pos)
  389. .Struct(finishStateArg)
  390. .Name().Build(compoundColumnName)
  391. .Done();
  392. if (tuple.ColumnName().Maybe<TCoAtom>()) {
  393. structItems.push_back(Build<TCoNameValueTuple>(ctx, pos)
  394. .Name().Build(compoundColumnName)
  395. .Value<TExprApplier>()
  396. .Apply(tuple.Trait().Cast<TCoAggregationTraits>().FinishHandler())
  397. .With(0, member)
  398. .Build()
  399. .Done());
  400. continue;
  401. }
  402. if (const auto namesList = tuple.ColumnName().Maybe<TCoAtomList>()) {
  403. const auto expApplier = Build<TExprApplier>(ctx, pos)
  404. .Apply(tuple.Trait().Cast<TCoAggregationTraits>().FinishHandler())
  405. .With(0, member)
  406. .Done();
  407. int index = 0;
  408. for (const auto columnName : namesList.Cast()) {
  409. const auto extracter = Build<TCoNth>(ctx, pos)
  410. .Tuple(expApplier)
  411. .Index<TCoAtom>().Build(index++)
  412. .Done();
  413. structItems.push_back(Build<TCoNameValueTuple>(ctx, pos)
  414. .Name(columnName)
  415. .Value(extracter)
  416. .Done());
  417. }
  418. continue;
  419. }
  420. YQL_ENSURE(false, "Invalid node. Expected Atom or AtomList, but received: "
  421. << tuple.ColumnName().Ptr()->Dump());
  422. }
  423. structItems.push_back(Build<TCoNameValueTuple>(ctx, pos)
  424. .Name().Build(hoppingColumn)
  425. .Value(finishTimeArg)
  426. .Done());
  427. return Build<TCoLambda>(ctx, pos)
  428. .Args({finishKeyArg, finishStateArg, finishTimeArg})
  429. .Body<TCoAsStruct>()
  430. .Add(structItems)
  431. .Build()
  432. .Done()
  433. .Ptr();
  434. }
  435. TExprNode::TPtr BuildSaveHopLambda(const TCoAggregate& aggregate, TExprContext& ctx) {
  436. const auto pos = aggregate.Pos();
  437. const auto aggregateHandlers = aggregate.Handlers();
  438. const auto saveStateArg = Build<TCoArgument>(ctx, pos).Name("state").Done();
  439. TVector<TExprBase> structItems;
  440. structItems.reserve(aggregateHandlers.Size());
  441. for (const auto& handler : aggregateHandlers) {
  442. const auto tuple = handler.Cast<TCoAggregateTuple>();
  443. const TString columnName = BuildColumnName(tuple.ColumnName());
  444. const auto member = Build<TCoMember>(ctx, pos)
  445. .Struct(saveStateArg)
  446. .Name().Build(columnName)
  447. .Done();
  448. structItems.push_back(Build<TCoNameValueTuple>(ctx, pos)
  449. .Name().Build(columnName)
  450. .Value<TExprApplier>()
  451. .Apply(tuple.Trait().Cast<TCoAggregationTraits>().SaveHandler())
  452. .With(0, member)
  453. .Build()
  454. .Done());
  455. }
  456. return Build<TCoLambda>(ctx, pos)
  457. .Args({saveStateArg})
  458. .Body<TCoAsStruct>()
  459. .Add(structItems)
  460. .Build()
  461. .Done()
  462. .Ptr();
  463. }
  464. TExprNode::TPtr BuildLoadHopLambda(const TCoAggregate& aggregate, TExprContext& ctx) {
  465. const auto pos = aggregate.Pos();
  466. const auto aggregateHandlers = aggregate.Handlers();
  467. TCoArgument loadStateArg = Build<TCoArgument>(ctx, pos).Name("state").Done();
  468. TVector<TExprBase> structItems;
  469. structItems.reserve(aggregateHandlers.Size());
  470. for (const auto& handler : aggregateHandlers) {
  471. const auto tuple = handler.Cast<TCoAggregateTuple>();
  472. const TString columnName = BuildColumnName(tuple.ColumnName());
  473. const auto member = Build<TCoMember>(ctx, pos)
  474. .Struct(loadStateArg)
  475. .Name().Build(columnName)
  476. .Done();
  477. structItems.push_back(Build<TCoNameValueTuple>(ctx, pos)
  478. .Name().Build(columnName)
  479. .Value<TExprApplier>()
  480. .Apply(tuple.Trait().Cast<TCoAggregationTraits>().LoadHandler())
  481. .With(0, member)
  482. .Build()
  483. .Done());
  484. }
  485. return Build<TCoLambda>(ctx, pos)
  486. .Args({loadStateArg})
  487. .Body<TCoAsStruct>()
  488. .Add(structItems)
  489. .Build()
  490. .Done()
  491. .Ptr();
  492. }
  493. } // NYql::NHopping