yql_opt_hopping.cpp 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569
  1. #include "yql_opt_hopping.h"
  2. #include <yql/essentials/core/yql_expr_type_annotation.h>
  3. #include <yql/essentials/core/yql_opt_utils.h>
  4. #include <util/generic/bitmap.h>
  5. #include <util/generic/maybe.h>
  6. #include <util/generic/string.h>
  7. #include <util/generic/vector.h>
  8. using namespace NYql;
  9. using namespace NYql::NNodes;
  10. namespace NYql::NHopping {
  11. TKeysDescription::TKeysDescription(const TStructExprType& rowType, const TCoAtomList& keys, const TString& hoppingColumn) {
  12. for (const auto& key : keys) {
  13. if (key.StringValue() == hoppingColumn) {
  14. FakeKeys.emplace_back(key.StringValue());
  15. continue;
  16. }
  17. const auto index = rowType.FindItem(key.StringValue());
  18. Y_ENSURE(index);
  19. auto itemType = rowType.GetItems()[*index]->GetItemType();
  20. if (RemoveOptionalType(itemType)->GetKind() == ETypeAnnotationKind::Data) {
  21. MemberKeys.emplace_back(key.StringValue());
  22. continue;
  23. }
  24. PickleKeys.emplace_back(key.StringValue());
  25. }
  26. }
  27. TExprNode::TPtr TKeysDescription::BuildPickleLambda(TExprContext& ctx, TPositionHandle pos) const {
  28. TCoArgument arg = Build<TCoArgument>(ctx, pos)
  29. .Name("item")
  30. .Done();
  31. TExprBase body = arg;
  32. for (const auto& key : PickleKeys) {
  33. const auto member = Build<TCoMember>(ctx, pos)
  34. .Name().Build(key)
  35. .Struct(arg)
  36. .Done()
  37. .Ptr();
  38. body = Build<TCoReplaceMember>(ctx, pos)
  39. .Struct(body)
  40. .Name().Build(key)
  41. .Item(ctx.NewCallable(pos, "StablePickle", { member }))
  42. .Done();
  43. }
  44. return Build<TCoLambda>(ctx, pos)
  45. .Args({arg})
  46. .Body(body)
  47. .Done()
  48. .Ptr();
  49. }
  50. TExprNode::TPtr TKeysDescription::BuildUnpickleLambda(TExprContext& ctx, TPositionHandle pos, const TStructExprType& rowType) {
  51. TCoArgument arg = Build<TCoArgument>(ctx, pos)
  52. .Name("item")
  53. .Done();
  54. TExprBase body = arg;
  55. for (const auto& key : PickleKeys) {
  56. const auto index = rowType.FindItem(key);
  57. Y_ENSURE(index);
  58. auto itemType = rowType.GetItems().at(*index)->GetItemType();
  59. const auto member = Build<TCoMember>(ctx, pos)
  60. .Name().Build(key)
  61. .Struct(arg)
  62. .Done()
  63. .Ptr();
  64. body = Build<TCoReplaceMember>(ctx, pos)
  65. .Struct(body)
  66. .Name().Build(key)
  67. .Item(ctx.NewCallable(pos, "Unpickle", { ExpandType(pos, *itemType, ctx), member }))
  68. .Done();
  69. }
  70. return Build<TCoLambda>(ctx, pos)
  71. .Args({arg})
  72. .Body(body)
  73. .Done()
  74. .Ptr();
  75. }
  76. TVector<TCoAtom> TKeysDescription::GetKeysList(TExprContext& ctx, TPositionHandle pos) const {
  77. TVector<TCoAtom> res;
  78. res.reserve(PickleKeys.size() + MemberKeys.size());
  79. for (const auto& pickleKey : PickleKeys) {
  80. res.emplace_back(Build<TCoAtom>(ctx, pos).Value(pickleKey).Done());
  81. }
  82. for (const auto& memberKey : MemberKeys) {
  83. res.emplace_back(Build<TCoAtom>(ctx, pos).Value(memberKey).Done());
  84. }
  85. return res;
  86. }
  87. TVector<TString> TKeysDescription::GetActualGroupKeys() const {
  88. TVector<TString> result;
  89. result.reserve(PickleKeys.size() + MemberKeys.size());
  90. result.insert(result.end(), PickleKeys.begin(), PickleKeys.end());
  91. result.insert(result.end(), MemberKeys.begin(), MemberKeys.end());
  92. return result;
  93. }
  94. bool TKeysDescription::NeedPickle() const {
  95. return !PickleKeys.empty();
  96. }
  97. TExprNode::TPtr TKeysDescription::GetKeySelector(TExprContext& ctx, TPositionHandle pos, const TStructExprType* rowType) {
  98. auto builder = Build<TCoAtomList>(ctx, pos);
  99. for (auto key : GetKeysList(ctx, pos)) {
  100. builder.Add(std::move(key));
  101. }
  102. return BuildKeySelector(pos, *rowType, builder.Build().Value().Ptr(), ctx);
  103. }
  104. TString BuildColumnName(const TExprBase& column) {
  105. if (const auto columnName = column.Maybe<TCoAtom>()) {
  106. return columnName.Cast().StringValue();
  107. }
  108. if (const auto columnNames = column.Maybe<TCoAtomList>()) {
  109. TStringBuilder columnNameBuilder;
  110. for (const auto columnName : columnNames.Cast()) {
  111. columnNameBuilder.append(columnName.StringValue());
  112. columnNameBuilder.append("_");
  113. }
  114. return columnNameBuilder;
  115. }
  116. YQL_ENSURE(false, "Invalid node. Expected Atom or AtomList, but received: "
  117. << column.Ptr()->Dump());
  118. }
  119. bool IsLegacyHopping(const TExprNode::TPtr& hoppingSetting) {
  120. return !hoppingSetting->Child(1)->IsList();
  121. }
  122. void EnsureNotDistinct(const TCoAggregate& aggregate) {
  123. const auto& aggregateHandlers = aggregate.Handlers();
  124. YQL_ENSURE(
  125. AllOf(aggregateHandlers, [](const auto& t){ return !t.DistinctName(); }),
  126. "Distinct is not supported for aggregation with hop");
  127. }
  128. TMaybe<THoppingTraits> ExtractHopTraits(const TCoAggregate& aggregate, TExprContext& ctx, bool analyticsMode) {
  129. const auto pos = aggregate.Pos();
  130. const auto hopSetting = GetSetting(aggregate.Settings().Ref(), "hopping");
  131. if (!hopSetting) {
  132. ctx.AddError(TIssue(ctx.GetPosition(pos), "Aggregate over stream must have 'hopping' setting"));
  133. return Nothing();
  134. }
  135. const auto hoppingColumn = IsLegacyHopping(hopSetting)
  136. ? "_yql_time"
  137. : TString(hopSetting->Child(1)->Child(0)->Content());
  138. const auto traitsNode = IsLegacyHopping(hopSetting)
  139. ? hopSetting->Child(1)
  140. : hopSetting->Child(1)->Child(1);
  141. const auto maybeTraits = TMaybeNode<TCoHoppingTraits>(traitsNode);
  142. if (!maybeTraits) {
  143. ctx.AddError(TIssue(ctx.GetPosition(pos), "Invalid 'hopping' setting in Aggregate"));
  144. return Nothing();
  145. }
  146. const auto traits = maybeTraits.Cast();
  147. const auto checkIntervalParam = [&] (TExprBase param) -> ui64 {
  148. if (param.Maybe<TCoJust>()) {
  149. param = param.Cast<TCoJust>().Input();
  150. }
  151. if (!param.Maybe<TCoInterval>()) {
  152. ctx.AddError(TIssue(ctx.GetPosition(pos), "Not an interval data ctor"));
  153. return 0;
  154. }
  155. auto value = FromString<i64>(param.Cast<TCoInterval>().Literal().Value());
  156. if (value <= 0) {
  157. ctx.AddError(TIssue(ctx.GetPosition(pos), "Interval value must be positive"));
  158. return 0;
  159. }
  160. return (ui64)value;
  161. };
  162. const auto hop = checkIntervalParam(traits.Hop());
  163. if (!hop) {
  164. return Nothing();
  165. }
  166. const auto interval = checkIntervalParam(traits.Interval());
  167. if (!interval) {
  168. return Nothing();
  169. }
  170. const auto delay = checkIntervalParam(traits.Delay());
  171. if (!delay) {
  172. return Nothing();
  173. }
  174. if (interval < hop) {
  175. ctx.AddError(TIssue(ctx.GetPosition(pos), "Interval must be greater or equal then hop"));
  176. return Nothing();
  177. }
  178. if (delay < hop) {
  179. ctx.AddError(TIssue(ctx.GetPosition(pos), "Delay must be greater or equal then hop"));
  180. return Nothing();
  181. }
  182. const auto newTraits = Build<TCoHoppingTraits>(ctx, aggregate.Pos())
  183. .InitFrom(traits)
  184. .DataWatermarks(analyticsMode
  185. ? ctx.NewAtom(aggregate.Pos(), "false")
  186. : traits.DataWatermarks().Ptr())
  187. .Done();
  188. return THoppingTraits {
  189. hoppingColumn,
  190. newTraits,
  191. hop,
  192. interval,
  193. delay
  194. };
  195. }
  196. TExprNode::TPtr BuildTimeExtractor(const TCoHoppingTraits& hoppingTraits, TExprContext& ctx) {
  197. const auto pos = hoppingTraits.Pos();
  198. if (hoppingTraits.ItemType().Ref().GetTypeAnn()->Cast<TTypeExprType>()->GetType()->Cast<TStructExprType>()->GetSize() == 0) {
  199. // The case when no fields are used in lambda. F.e. when it has only DependsOn.
  200. return ctx.DeepCopyLambda(hoppingTraits.TimeExtractor().Ref());
  201. }
  202. return Build<TCoLambda>(ctx, pos)
  203. .Args({"item"})
  204. .Body<TExprApplier>()
  205. .Apply(hoppingTraits.TimeExtractor())
  206. .With<TCoSafeCast>(0)
  207. .Type(hoppingTraits.ItemType())
  208. .Value("item")
  209. .Build()
  210. .Build()
  211. .Done()
  212. .Ptr();
  213. }
  214. TExprNode::TPtr BuildInitHopLambda(const TCoAggregate& aggregate, TExprContext& ctx) {
  215. const auto pos = aggregate.Pos();
  216. const auto& aggregateHandlers = aggregate.Handlers();
  217. const auto initItemArg = Build<TCoArgument>(ctx, pos).Name("item").Done();
  218. TVector<TExprBase> structItems;
  219. structItems.reserve(aggregateHandlers.Size());
  220. ui32 index = 0;
  221. for (const auto& handler : aggregateHandlers) {
  222. const auto tuple = handler.Cast<TCoAggregateTuple>();
  223. TMaybeNode<TExprBase> applier;
  224. if (tuple.Trait().Cast<TCoAggregationTraits>().InitHandler().Args().Size() == 1) {
  225. applier = Build<TExprApplier>(ctx, pos)
  226. .Apply(tuple.Trait().Cast<TCoAggregationTraits>().InitHandler())
  227. .With(0, initItemArg)
  228. .Done();
  229. } else {
  230. applier = Build<TExprApplier>(ctx, pos)
  231. .Apply(tuple.Trait().Cast<TCoAggregationTraits>().InitHandler())
  232. .With(0, initItemArg)
  233. .With<TCoUint32>(1)
  234. .Literal().Build(ToString(index))
  235. .Build()
  236. .Done();
  237. }
  238. structItems.push_back(Build<TCoNameValueTuple>(ctx, pos)
  239. .Name().Build(BuildColumnName(tuple.ColumnName()))
  240. .Value(applier)
  241. .Done());
  242. ++index;
  243. }
  244. return Build<TCoLambda>(ctx, pos)
  245. .Args({initItemArg})
  246. .Body<TCoAsStruct>()
  247. .Add(structItems)
  248. .Build()
  249. .Done()
  250. .Ptr();
  251. }
  252. TExprNode::TPtr BuildUpdateHopLambda(const TCoAggregate& aggregate, TExprContext& ctx) {
  253. const auto pos = aggregate.Pos();
  254. const auto aggregateHandlers = aggregate.Handlers();
  255. const auto updateItemArg = Build<TCoArgument>(ctx, pos).Name("item").Done();
  256. const auto updateStateArg = Build<TCoArgument>(ctx, pos).Name("state").Done();
  257. TVector<TExprBase> structItems;
  258. structItems.reserve(aggregateHandlers.Size());
  259. i32 index = 0;
  260. for (const auto& handler : aggregateHandlers) {
  261. const auto tuple = handler.Cast<TCoAggregateTuple>();
  262. const TString columnName = BuildColumnName(tuple.ColumnName());
  263. const auto member = Build<TCoMember>(ctx, pos)
  264. .Struct(updateStateArg)
  265. .Name().Build(columnName)
  266. .Done();
  267. TMaybeNode<TExprBase> applier;
  268. if (tuple.Trait().Cast<TCoAggregationTraits>().UpdateHandler().Args().Size() == 2) {
  269. applier = Build<TExprApplier>(ctx, pos)
  270. .Apply(tuple.Trait().Cast<TCoAggregationTraits>().UpdateHandler())
  271. .With(0, updateItemArg)
  272. .With(1, member)
  273. .Done();
  274. } else {
  275. applier = Build<TExprApplier>(ctx, pos)
  276. .Apply(tuple.Trait().Cast<TCoAggregationTraits>().UpdateHandler())
  277. .With(0, updateItemArg)
  278. .With(1, member)
  279. .With<TCoUint32>(2)
  280. .Literal().Build(ToString(index))
  281. .Build()
  282. .Done();
  283. }
  284. structItems.push_back(Build<TCoNameValueTuple>(ctx, pos)
  285. .Name().Build(columnName)
  286. .Value(applier)
  287. .Done());
  288. ++index;
  289. }
  290. return Build<TCoLambda>(ctx, pos)
  291. .Args({updateItemArg, updateStateArg})
  292. .Body<TCoAsStruct>()
  293. .Add(structItems)
  294. .Build()
  295. .Done()
  296. .Ptr();
  297. }
  298. TExprNode::TPtr BuildMergeHopLambda(const TCoAggregate& aggregate, TExprContext& ctx) {
  299. const auto pos = aggregate.Pos();
  300. const auto& aggregateHandlers = aggregate.Handlers();
  301. const auto mergeState1Arg = Build<TCoArgument>(ctx, pos).Name("state1").Done();
  302. const auto mergeState2Arg = Build<TCoArgument>(ctx, pos).Name("state2").Done();
  303. TVector<TExprBase> structItems;
  304. structItems.reserve(aggregateHandlers.Size());
  305. for (const auto& handler : aggregateHandlers) {
  306. const auto tuple = handler.Cast<TCoAggregateTuple>();
  307. const TString columnName = BuildColumnName(tuple.ColumnName());
  308. const auto member1 = Build<TCoMember>(ctx, pos)
  309. .Struct(mergeState1Arg)
  310. .Name().Build(columnName)
  311. .Done();
  312. const auto member2 = Build<TCoMember>(ctx, pos)
  313. .Struct(mergeState2Arg)
  314. .Name().Build(columnName)
  315. .Done();
  316. structItems.push_back(Build<TCoNameValueTuple>(ctx, pos)
  317. .Name().Build(columnName)
  318. .Value<TExprApplier>()
  319. .Apply(tuple.Trait().Cast<TCoAggregationTraits>().MergeHandler())
  320. .With(0, member1)
  321. .With(1, member2)
  322. .Build()
  323. .Done());
  324. }
  325. return Build<TCoLambda>(ctx, pos)
  326. .Args({mergeState1Arg, mergeState2Arg})
  327. .Body<TCoAsStruct>()
  328. .Add(structItems)
  329. .Build()
  330. .Done()
  331. .Ptr();
  332. }
  333. TExprNode::TPtr BuildFinishHopLambda(
  334. const TCoAggregate& aggregate,
  335. const TVector<TString>& actualGroupKeys,
  336. const TString& hoppingColumn,
  337. TExprContext& ctx)
  338. {
  339. const auto pos = aggregate.Pos();
  340. const auto aggregateHandlers = aggregate.Handlers();
  341. const auto finishKeyArg = Build<TCoArgument>(ctx, pos).Name("key").Done();
  342. const auto finishStateArg = Build<TCoArgument>(ctx, pos).Name("state").Done();
  343. const auto finishTimeArg = Build<TCoArgument>(ctx, pos).Name("time").Done();
  344. TVector<TExprBase> structItems;
  345. structItems.reserve(actualGroupKeys.size() + aggregateHandlers.Size() + 1);
  346. if (actualGroupKeys.size() == 1) {
  347. structItems.push_back(Build<TCoNameValueTuple>(ctx, pos)
  348. .Name().Build(actualGroupKeys[0])
  349. .Value(finishKeyArg)
  350. .Done());
  351. } else {
  352. for (size_t i = 0; i < actualGroupKeys.size(); ++i) {
  353. structItems.push_back(Build<TCoNameValueTuple>(ctx, pos)
  354. .Name().Build(actualGroupKeys[i])
  355. .Value<TCoNth>()
  356. .Tuple(finishKeyArg)
  357. .Index<TCoAtom>()
  358. .Value(ToString(i))
  359. .Build()
  360. .Build()
  361. .Done());
  362. }
  363. }
  364. for (const auto& handler : aggregateHandlers) {
  365. const auto tuple = handler.Cast<TCoAggregateTuple>();
  366. const TString compoundColumnName = BuildColumnName(tuple.ColumnName());
  367. const auto member = Build<TCoMember>(ctx, pos)
  368. .Struct(finishStateArg)
  369. .Name().Build(compoundColumnName)
  370. .Done();
  371. if (tuple.ColumnName().Maybe<TCoAtom>()) {
  372. structItems.push_back(Build<TCoNameValueTuple>(ctx, pos)
  373. .Name().Build(compoundColumnName)
  374. .Value<TExprApplier>()
  375. .Apply(tuple.Trait().Cast<TCoAggregationTraits>().FinishHandler())
  376. .With(0, member)
  377. .Build()
  378. .Done());
  379. continue;
  380. }
  381. if (const auto namesList = tuple.ColumnName().Maybe<TCoAtomList>()) {
  382. const auto expApplier = Build<TExprApplier>(ctx, pos)
  383. .Apply(tuple.Trait().Cast<TCoAggregationTraits>().FinishHandler())
  384. .With(0, member)
  385. .Done();
  386. int index = 0;
  387. for (const auto columnName : namesList.Cast()) {
  388. const auto extracter = Build<TCoNth>(ctx, pos)
  389. .Tuple(expApplier)
  390. .Index<TCoAtom>().Build(index++)
  391. .Done();
  392. structItems.push_back(Build<TCoNameValueTuple>(ctx, pos)
  393. .Name(columnName)
  394. .Value(extracter)
  395. .Done());
  396. }
  397. continue;
  398. }
  399. YQL_ENSURE(false, "Invalid node. Expected Atom or AtomList, but received: "
  400. << tuple.ColumnName().Ptr()->Dump());
  401. }
  402. structItems.push_back(Build<TCoNameValueTuple>(ctx, pos)
  403. .Name().Build(hoppingColumn)
  404. .Value(finishTimeArg)
  405. .Done());
  406. return Build<TCoLambda>(ctx, pos)
  407. .Args({finishKeyArg, finishStateArg, finishTimeArg})
  408. .Body<TCoAsStruct>()
  409. .Add(structItems)
  410. .Build()
  411. .Done()
  412. .Ptr();
  413. }
  414. TExprNode::TPtr BuildSaveHopLambda(const TCoAggregate& aggregate, TExprContext& ctx) {
  415. const auto pos = aggregate.Pos();
  416. const auto aggregateHandlers = aggregate.Handlers();
  417. const auto saveStateArg = Build<TCoArgument>(ctx, pos).Name("state").Done();
  418. TVector<TExprBase> structItems;
  419. structItems.reserve(aggregateHandlers.Size());
  420. for (const auto& handler : aggregateHandlers) {
  421. const auto tuple = handler.Cast<TCoAggregateTuple>();
  422. const TString columnName = BuildColumnName(tuple.ColumnName());
  423. const auto member = Build<TCoMember>(ctx, pos)
  424. .Struct(saveStateArg)
  425. .Name().Build(columnName)
  426. .Done();
  427. structItems.push_back(Build<TCoNameValueTuple>(ctx, pos)
  428. .Name().Build(columnName)
  429. .Value<TExprApplier>()
  430. .Apply(tuple.Trait().Cast<TCoAggregationTraits>().SaveHandler())
  431. .With(0, member)
  432. .Build()
  433. .Done());
  434. }
  435. return Build<TCoLambda>(ctx, pos)
  436. .Args({saveStateArg})
  437. .Body<TCoAsStruct>()
  438. .Add(structItems)
  439. .Build()
  440. .Done()
  441. .Ptr();
  442. }
  443. TExprNode::TPtr BuildLoadHopLambda(const TCoAggregate& aggregate, TExprContext& ctx) {
  444. const auto pos = aggregate.Pos();
  445. const auto aggregateHandlers = aggregate.Handlers();
  446. TCoArgument loadStateArg = Build<TCoArgument>(ctx, pos).Name("state").Done();
  447. TVector<TExprBase> structItems;
  448. structItems.reserve(aggregateHandlers.Size());
  449. for (const auto& handler : aggregateHandlers) {
  450. const auto tuple = handler.Cast<TCoAggregateTuple>();
  451. const TString columnName = BuildColumnName(tuple.ColumnName());
  452. const auto member = Build<TCoMember>(ctx, pos)
  453. .Struct(loadStateArg)
  454. .Name().Build(columnName)
  455. .Done();
  456. structItems.push_back(Build<TCoNameValueTuple>(ctx, pos)
  457. .Name().Build(columnName)
  458. .Value<TExprApplier>()
  459. .Apply(tuple.Trait().Cast<TCoAggregationTraits>().LoadHandler())
  460. .With(0, member)
  461. .Build()
  462. .Done());
  463. }
  464. return Build<TCoLambda>(ctx, pos)
  465. .Args({loadStateArg})
  466. .Body<TCoAsStruct>()
  467. .Add(structItems)
  468. .Build()
  469. .Done()
  470. .Ptr();
  471. }
  472. } // NYql::NHopping