// yql_lineage.cpp
  1. #include "yql_lineage.h"
  2. #include <yql/essentials/core/yql_type_annotation.h>
  3. #include <yql/essentials/core/yql_expr_optimize.h>
  4. #include <yql/essentials/core/yql_opt_utils.h>
  5. #include <yql/essentials/core/yql_join.h>
  6. #include <util/system/env.h>
  7. namespace NYql {
  8. namespace {
  9. class TLineageScanner {
  10. public:
  11. TLineageScanner(const TExprNode& root, const TTypeAnnotationContext& ctx, TExprContext& exprCtx)
  12. : Root_(root)
  13. , Ctx_(ctx)
  14. , ExprCtx_(exprCtx)
  15. {}
  16. TString Process() {
  17. VisitExpr(Root_, [&](const TExprNode& node) {
  18. for (auto& p : Ctx_.DataSources) {
  19. if (p->IsRead(node)) {
  20. Reads_[&node] = p.Get();
  21. HasReads_.emplace(&node);
  22. }
  23. }
  24. for (auto& p : Ctx_.DataSinks) {
  25. if (p->IsWrite(node)) {
  26. Writes_[&node] = p.Get();
  27. }
  28. }
  29. return true;
  30. }, [&](const TExprNode& node) {
  31. for (const auto& child : node.Children()) {
  32. if (HasReads_.contains(child.Get())) {
  33. HasReads_.emplace(&node);
  34. break;
  35. }
  36. }
  37. return true;
  38. });
  39. TStringStream s;
  40. NYson::TYsonWriter writer(&s, NYson::EYsonFormat::Binary);
  41. writer.OnBeginMap();
  42. writer.OnKeyedItem("Reads");
  43. writer.OnBeginList();
  44. for (const auto& r : Reads_) {
  45. TVector<TPinInfo> inputs;
  46. auto& formatter = r.second->GetPlanFormatter();
  47. formatter.GetInputs(*r.first, inputs, /* withLimits */ false);
  48. for (const auto& i : inputs) {
  49. auto id = ++NextReadId_;
  50. ReadIds_[r.first].push_back(id);
  51. writer.OnListItem();
  52. writer.OnBeginMap();
  53. writer.OnKeyedItem("Id");
  54. writer.OnInt64Scalar(id);
  55. writer.OnKeyedItem("Name");
  56. writer.OnStringScalar(i.DisplayName);
  57. writer.OnKeyedItem("Schema");
  58. const auto& itemType = *r.first->GetTypeAnn()->Cast<TTupleExprType>()->GetItems()[1]->Cast<TListExprType>()->GetItemType()->Cast<TStructExprType>();
  59. WriteSchema(writer, itemType, nullptr);
  60. if (formatter.WriteSchemaHeader(writer)) {
  61. WriteSchema(writer, itemType, &formatter);
  62. }
  63. writer.OnEndMap();
  64. }
  65. }
  66. writer.OnEndList();
  67. writer.OnKeyedItem("Writes");
  68. writer.OnBeginList();
  69. for (const auto& w : Writes_) {
  70. auto data = w.first->Child(3);
  71. TVector<TPinInfo> outputs;
  72. auto& formatter = w.second->GetPlanFormatter();
  73. formatter.GetOutputs(*w.first, outputs, /* withLimits */ false);
  74. YQL_ENSURE(outputs.size() == 1);
  75. auto id = ++NextWriteId_;
  76. WriteIds_[w.first] = id;
  77. writer.OnListItem();
  78. writer.OnBeginMap();
  79. writer.OnKeyedItem("Id");
  80. writer.OnInt64Scalar(id);
  81. writer.OnKeyedItem("Name");
  82. writer.OnStringScalar(outputs.front().DisplayName);
  83. writer.OnKeyedItem("Schema");
  84. const auto& itemType = *data->GetTypeAnn()->Cast<TListExprType>()->GetItemType()->Cast<TStructExprType>();
  85. WriteSchema(writer, itemType, nullptr);
  86. if (formatter.WriteSchemaHeader(writer)) {
  87. WriteSchema(writer, itemType, &formatter);
  88. }
  89. writer.OnKeyedItem("Lineage");
  90. auto lineage = CollectLineage(*data);
  91. WriteLineage(writer, *lineage);
  92. writer.OnEndMap();
  93. }
  94. writer.OnEndList();
  95. writer.OnEndMap();
  96. return s.Str();
  97. }
  98. private:
  99. void WriteSchema(NYson::TYsonWriter& writer, const TStructExprType& structType, IPlanFormatter* formatter) {
  100. writer.OnBeginMap();
  101. for (const auto& i : structType.GetItems()) {
  102. if (i->GetName().StartsWith("_yql_sys_")) {
  103. continue;
  104. }
  105. writer.OnKeyedItem(i->GetName());
  106. if (formatter) {
  107. formatter->WriteTypeDetails(writer, *i->GetItemType());
  108. } else {
  109. writer.OnStringScalar(FormatType(i->GetItemType()));
  110. }
  111. }
  112. writer.OnEndMap();
  113. }
  114. struct TFieldLineage {
  115. ui32 InputIndex;
  116. TString Field;
  117. TString Transforms;
  118. struct THash {
  119. std::size_t operator()(const TFieldLineage& x) const noexcept {
  120. return CombineHashes(
  121. CombineHashes(std::hash<ui32>()(x.InputIndex), std::hash<TString>()(x.Field)),
  122. std::hash<TString>()(x.Transforms));
  123. }
  124. };
  125. bool operator==(const TFieldLineage& rhs) const {
  126. return std::tie(InputIndex, Field, Transforms) == std::tie(rhs.InputIndex, rhs.Field, rhs.Transforms);
  127. }
  128. bool operator<(const TFieldLineage& rhs) const {
  129. return std::tie(InputIndex, Field, Transforms) < std::tie(rhs.InputIndex, rhs.Field, rhs.Transforms);
  130. }
  131. };
  132. static TFieldLineage ReplaceTransforms(const TFieldLineage& src, const TString& newTransforms) {
  133. return { src.InputIndex, src.Field, (src.Transforms == "Copy" && newTransforms == "Copy") ? newTransforms : "" };
  134. }
  135. using TFieldLineageSet = THashSet<TFieldLineage, TFieldLineage::THash>;
  136. struct TFieldsLineage {
  137. TFieldLineageSet Items;
  138. TMaybe<THashMap<TString, TFieldLineageSet>> StructItems;
  139. void MergeFrom(const TFieldsLineage& from) {
  140. Items.insert(from.Items.begin(), from.Items.end());
  141. if (StructItems && from.StructItems) {
  142. for (const auto& i : *from.StructItems) {
  143. (*StructItems)[i.first].insert(i.second.begin(), i.second.end());
  144. }
  145. }
  146. }
  147. };
  148. static TFieldLineageSet ReplaceTransforms(const TFieldLineageSet& src, const TString& newTransforms) {
  149. TFieldLineageSet ret;
  150. for (const auto& i : src) {
  151. ret.insert(ReplaceTransforms(i, newTransforms));
  152. }
  153. return ret;
  154. }
  155. static TFieldsLineage ReplaceTransforms(const TFieldsLineage& src, const TString& newTransforms) {
  156. TFieldsLineage ret;
  157. ret.Items = ReplaceTransforms(src.Items, newTransforms);
  158. if (src.StructItems) {
  159. ret.StructItems.ConstructInPlace();
  160. for (const auto& i : *src.StructItems) {
  161. (*ret.StructItems)[i.first] = ReplaceTransforms(i.second, newTransforms);
  162. }
  163. }
  164. return ret;
  165. }
  166. struct TLineage {
  167. // null - can't calculcate
  168. TMaybe<THashMap<TString, TFieldsLineage>> Fields;
  169. };
  170. const TLineage* CollectLineage(const TExprNode& node) {
  171. if (auto it = Lineages_.find(&node); it != Lineages_.end()) {
  172. return &it->second;
  173. }
  174. auto& lineage = Lineages_[&node];
  175. if (auto readIt = ReadIds_.find(&node); readIt != ReadIds_.end()) {
  176. lineage.Fields.ConstructInPlace();
  177. auto type = node.GetTypeAnn()->Cast<TTupleExprType>()->GetItems()[1]->Cast<TListExprType>()->GetItemType()->Cast<TStructExprType>();
  178. for (const auto& i : type->GetItems()) {
  179. if (i->GetName().StartsWith("_yql_sys_")) {
  180. continue;
  181. }
  182. TString fieldName(i->GetName());
  183. auto& v = (*lineage.Fields)[fieldName];
  184. for (const auto& r : readIt->second) {
  185. v.Items.insert({ r, fieldName, "Copy" });
  186. }
  187. }
  188. return &lineage;
  189. }
  190. if (!HasReads_.contains(&node)) {
  191. auto type = node.GetTypeAnn();
  192. if (type->GetKind() == ETypeAnnotationKind::List) {
  193. auto itemType = type->Cast<TListExprType>()->GetItemType();
  194. if (itemType->GetKind() == ETypeAnnotationKind::Struct) {
  195. auto structType = itemType->Cast<TStructExprType>();
  196. lineage.Fields.ConstructInPlace();
  197. for (const auto& i : structType->GetItems()) {
  198. if (i->GetName().StartsWith("_yql_sys_")) {
  199. continue;
  200. }
  201. TString fieldName(i->GetName());
  202. (*lineage.Fields).emplace(fieldName, TFieldsLineage());
  203. }
  204. return &lineage;
  205. }
  206. }
  207. }
  208. if (node.IsCallable({
  209. "Unordered",
  210. "UnorderedSubquery",
  211. "Right!",
  212. "YtTableContent",
  213. "Skip",
  214. "Take",
  215. "Sort",
  216. "TopSort",
  217. "AssumeSorted",
  218. "SkipNullMembers"})) {
  219. lineage = *CollectLineage(node.Head());
  220. return &lineage;
  221. } else if (node.IsCallable("ExtractMembers")) {
  222. HandleExtractMembers(lineage, node);
  223. } else if (node.IsCallable({"FlatMap", "OrderedFlatMap"})) {
  224. HandleFlatMap(lineage, node);
  225. } else if (node.IsCallable("Aggregate")) {
  226. HandleAggregate(lineage, node);
  227. } else if (node.IsCallable({"Extend","OrderedExtend","Merge"})) {
  228. HandleExtend(lineage, node);
  229. } else if (node.IsCallable({"CalcOverWindow","CalcOverSessionWindow","CalcOverWindowGroup"})) {
  230. HandleWindow(lineage, node);
  231. } else if (node.IsCallable("EquiJoin")) {
  232. HandleEquiJoin(lineage, node);
  233. } else if (node.IsCallable("LMap")) {
  234. HandleLMap(lineage, node);
  235. } else if (node.IsCallable({"PartitionsByKeys", "PartitionByKey"})) {
  236. HandlePartitionByKeys(lineage, node);
  237. } else if (node.IsCallable({"AsList","List","ListIf"})) {
  238. HandleListLiteral(lineage, node);
  239. } else {
  240. Warning(node);
  241. }
  242. return &lineage;
  243. }
  244. void Warning(const TExprNode& node) {
  245. auto message = TStringBuilder() << node.Type() << " : " << node.Content() << " is not supported";
  246. auto issue = TIssue(ExprCtx_.GetPosition(node.Pos()), message);
  247. SetIssueCode(EYqlIssueCode::TIssuesIds_EIssueCode_CORE_LINEAGE_INTERNAL_ERROR, issue);
  248. ExprCtx_.AddWarning(issue);
  249. }
  250. void HandleExtractMembers(TLineage& lineage, const TExprNode& node) {
  251. auto innerLineage = *CollectLineage(node.Head());
  252. if (innerLineage.Fields.Defined()) {
  253. lineage.Fields.ConstructInPlace();
  254. for (const auto& atom : node.Child(1)->Children()) {
  255. TString fieldName(atom->Content());
  256. (*lineage.Fields)[fieldName] = (*innerLineage.Fields)[fieldName];
  257. }
  258. }
  259. }
  260. TMaybe<TFieldsLineage> ScanExprLineage(const TExprNode& node, const TExprNode* arg, const TLineage* src,
  261. TNodeMap<TMaybe<TFieldsLineage>>& visited,
  262. const THashMap<const TExprNode*, TMaybe<TFieldsLineage>>& flattenColumns) {
  263. if (&node == arg) {
  264. return Nothing();
  265. }
  266. auto [it, inserted] = visited.emplace(&node, Nothing());
  267. if (!inserted) {
  268. return it->second;
  269. }
  270. if (auto itFlatten = flattenColumns.find(&node); itFlatten != flattenColumns.end()) {
  271. return it->second = itFlatten->second;
  272. }
  273. if (node.IsCallable("Member")) {
  274. if (&node.Head() == arg && src) {
  275. return it->second = (*src->Fields).at(node.Tail().Content());
  276. }
  277. if (node.Head().IsCallable("Head")) {
  278. auto lineage = CollectLineage(node.Head().Head());
  279. if (lineage && lineage->Fields) {
  280. TFieldsLineage result;
  281. for (const auto& f : *lineage->Fields) {
  282. result.MergeFrom(f.second);
  283. }
  284. return it->second = result;
  285. }
  286. }
  287. auto inner = ScanExprLineage(node.Head(), arg, src, visited, {});
  288. if (!inner) {
  289. return Nothing();
  290. }
  291. if (inner->StructItems) {
  292. TFieldsLineage result;
  293. result.Items = (*inner->StructItems).at(node.Tail().Content());
  294. return it->second = result;
  295. }
  296. }
  297. if (node.IsCallable("SqlIn")) {
  298. auto lineage = CollectLineage(*node.Child(0));
  299. if (lineage && lineage->Fields) {
  300. TFieldsLineage result;
  301. for (const auto& f : *lineage->Fields) {
  302. result.MergeFrom(f.second);
  303. }
  304. return it->second = result;
  305. }
  306. }
  307. std::vector<TFieldsLineage> results;
  308. TMaybe<bool> hasStructItems;
  309. for (ui32 index = 0; index < node.ChildrenSize(); ++index) {
  310. if (index != 0 && node.IsCallable("SqlIn")) {
  311. continue;
  312. }
  313. auto child = node.Child(index);
  314. if (node.IsCallable("AsStruct")) {
  315. child = &child->Tail();
  316. }
  317. if (!child->GetTypeAnn()->IsComputable()) {
  318. continue;
  319. }
  320. auto inner = ScanExprLineage(*child, arg, src, visited, {});
  321. if (!inner) {
  322. return Nothing();
  323. }
  324. if (!hasStructItems) {
  325. hasStructItems = inner->StructItems.Defined();
  326. } else {
  327. hasStructItems = *hasStructItems && inner->StructItems.Defined();
  328. }
  329. results.emplace_back(std::move(*inner));
  330. }
  331. TFieldsLineage result;
  332. if (hasStructItems && *hasStructItems) {
  333. result.StructItems.ConstructInPlace();
  334. }
  335. for (const auto& r : results) {
  336. result.MergeFrom(r);
  337. }
  338. return it->second = result;
  339. }
  340. void MergeLineageFromUsedFields(const TExprNode& expr, const TExprNode& arg, const TLineage& src,
  341. TFieldLineageSet& dst, const THashMap<const TExprNode*, TMaybe<TFieldsLineage>>& flattenColumns,
  342. const TString& newTransforms = "") {
  343. TNodeMap<TMaybe<TFieldsLineage>> visited;
  344. auto res = ScanExprLineage(expr, &arg, &src, visited, flattenColumns);
  345. if (!res) {
  346. for (const auto& f : *src.Fields) {
  347. for (const auto& i: f.second.Items) {
  348. dst.insert(ReplaceTransforms(i, newTransforms));
  349. }
  350. }
  351. } else {
  352. for (const auto& i: res->Items) {
  353. dst.insert(ReplaceTransforms(i, newTransforms));
  354. }
  355. }
  356. }
  357. void MergeLineageFromUsedFields(const TExprNode& expr, const TExprNode& arg, const TLineage& src,
  358. TFieldsLineage& dst, bool produceStruct, const THashMap<const TExprNode*, TMaybe<TFieldsLineage>>& flattenColumns,
  359. const TString& newTransforms = "") {
  360. if (produceStruct) {
  361. auto root = &expr;
  362. while (root->IsCallable("Just")) {
  363. root = &root->Head();
  364. }
  365. if (root == &arg) {
  366. dst.StructItems.ConstructInPlace();
  367. for (const auto& f : *src.Fields) {
  368. (*dst.StructItems)[f.first] = f.second.Items;
  369. }
  370. } else if (root->IsCallable("AsStruct")) {
  371. dst.StructItems.ConstructInPlace();
  372. for (const auto& x : root->Children()) {
  373. auto fieldName = x->Head().Content();
  374. auto& s = (*dst.StructItems)[fieldName];
  375. MergeLineageFromUsedFields(x->Tail(), arg, src, s, flattenColumns, newTransforms);
  376. }
  377. } else if (root->IsCallable("Member") && &root->Head() == &arg) {
  378. auto fieldName = root->Tail().Content();
  379. const auto& in = (*src.Fields).at(fieldName);
  380. dst.StructItems = in.StructItems;
  381. }
  382. }
  383. MergeLineageFromUsedFields(expr, arg, src, dst.Items, flattenColumns, newTransforms);
  384. }
  385. void FillStructLineage(TLineage& lineage, const TExprNode* value, const TExprNode& arg, const TLineage& innerLineage,
  386. const TTypeAnnotationNode* extType, const THashMap<const TExprNode*, TMaybe<TFieldsLineage>>& flattenColumns) {
  387. TMaybe<TString> oneField;
  388. if (value && value->IsCallable("Member") && &value->Head() == &arg) {
  389. TString field(value->Tail().Content());
  390. auto& f = innerLineage.Fields->at(field);
  391. if (f.StructItems) {
  392. for (const auto& x : *f.StructItems) {
  393. auto& res = (*lineage.Fields)[x.first];
  394. res.Items = x.second;
  395. }
  396. return;
  397. }
  398. // fallback
  399. oneField = field;
  400. }
  401. if (value && value->IsCallable("If")) {
  402. TLineage left, right;
  403. left.Fields.ConstructInPlace();
  404. right.Fields.ConstructInPlace();
  405. FillStructLineage(left, value->Child(1), arg, innerLineage, extType, {});
  406. FillStructLineage(right, value->Child(2), arg, innerLineage, extType, {});
  407. for (const auto& f : *left.Fields) {
  408. auto& res = (*lineage.Fields)[f.first];
  409. res.Items.insert(f.second.Items.begin(), f.second.Items.end());
  410. }
  411. for (const auto& f : *right.Fields) {
  412. auto& res = (*lineage.Fields)[f.first];
  413. res.Items.insert(f.second.Items.begin(), f.second.Items.end());
  414. }
  415. return;
  416. }
  417. if (value && value->IsCallable("AsStruct")) {
  418. for (const auto& child : value->Children()) {
  419. TString field(child->Head().Content());
  420. auto& res = (*lineage.Fields)[field];
  421. const auto& expr = child->Tail();
  422. TString newTransforms;
  423. const TExprNode* root = &expr;
  424. while (root->IsCallable("Just")) {
  425. root = &root->Head();
  426. }
  427. if (root->IsCallable("Member") && &root->Head() == &arg) {
  428. newTransforms = "Copy";
  429. }
  430. MergeLineageFromUsedFields(expr, arg, innerLineage, res, true, flattenColumns, newTransforms);
  431. }
  432. return;
  433. }
  434. if (extType && extType->GetKind() == ETypeAnnotationKind::Struct) {
  435. auto structType = extType->Cast<TStructExprType>();
  436. TFieldLineageSet allLineage;
  437. for (const auto& f : *innerLineage.Fields) {
  438. if (oneField && oneField != f.first) {
  439. continue;
  440. }
  441. allLineage.insert(f.second.Items.begin(), f.second.Items.end());
  442. }
  443. for (const auto& i : structType->GetItems()) {
  444. TString field(i->GetName());
  445. auto& res = (*lineage.Fields)[field];
  446. res.Items = allLineage;
  447. }
  448. }
  449. }
  450. void HandleFlatMap(TLineage& lineage, const TExprNode& node) {
  451. auto innerLineage = *CollectLineage(node.Head());
  452. if (!innerLineage.Fields.Defined()) {
  453. return;
  454. }
  455. const auto& lambda = node.Tail();
  456. const auto& arg = lambda.Head().Head();
  457. const auto& body = lambda.Tail();
  458. THashMap<const TExprNode*, TMaybe<TFieldsLineage>> flattenColumns;
  459. const TExprNode* value = &body.Tail();
  460. if (body.IsCallable({"OptionalIf", "FlatListIf"})) {
  461. value = &body.Tail();
  462. } else if (body.IsCallable("Just")) {
  463. value = &body.Head();
  464. } else if (body.IsCallable({"FlatMap", "OrderedFlatMap"})) {
  465. if (lambda.GetTypeAnn()->GetKind() == ETypeAnnotationKind::List) {
  466. value = &body;
  467. while(value->IsCallable({"FlatMap", "OrderedFlatMap"})) {
  468. TNodeMap<TMaybe<TFieldsLineage>> visited;
  469. if (auto res = ScanExprLineage(value->Head(), &arg, &innerLineage, visited, {})) {
  470. flattenColumns.emplace(value->Tail().Head().HeadPtr().Get(), res);
  471. }
  472. value = &value->Tail().Tail();
  473. }
  474. if (value->IsCallable("Just")) {
  475. value = &value->Head();
  476. } else if (value->IsCallable({"OptionalIf", "FlatListIf"})) {
  477. value = &value->Tail();
  478. }
  479. } else {
  480. value = &body.Head();
  481. }
  482. } else {
  483. Warning(body);
  484. return;
  485. }
  486. if (value == &arg) {
  487. lineage.Fields = *innerLineage.Fields;
  488. return;
  489. }
  490. lineage.Fields.ConstructInPlace();
  491. FillStructLineage(lineage, value, arg, innerLineage, GetSeqItemType(body.GetTypeAnn()), flattenColumns);
  492. }
  493. void HandleAggregate(TLineage& lineage, const TExprNode& node) {
  494. auto innerLineage = *CollectLineage(node.Head());
  495. if (!innerLineage.Fields.Defined()) {
  496. return;
  497. }
  498. lineage.Fields.ConstructInPlace();
  499. for (const auto& key : node.Child(1)->Children()) {
  500. TString field(key->Content());
  501. (*lineage.Fields)[field] = (*innerLineage.Fields)[field];
  502. }
  503. for (const auto& payload: node.Child(2)->Children()) {
  504. TVector<TString> fields;
  505. if (payload->Child(0)->IsList()) {
  506. for (const auto& child : payload->Child(0)->Children()) {
  507. fields.push_back(TString(child->Content()));
  508. }
  509. } else {
  510. fields.push_back(TString(payload->Child(0)->Content()));
  511. }
  512. TFieldsLineage source;
  513. if (payload->ChildrenSize() == 3) {
  514. // distinct
  515. source = ReplaceTransforms((*innerLineage.Fields)[payload->Child(2)->Content()], "");
  516. } else {
  517. if (payload->Child(1)->IsCallable("AggregationTraits")) {
  518. // merge all used fields from init/update handlers
  519. auto initHandler = payload->Child(1)->Child(1);
  520. auto updateHandler = payload->Child(1)->Child(2);
  521. MergeLineageFromUsedFields(initHandler->Tail(), initHandler->Head().Head(), innerLineage, source, false, {});
  522. MergeLineageFromUsedFields(updateHandler->Tail(), updateHandler->Head().Head(), innerLineage, source, false, {});
  523. } else if (payload->Child(1)->IsCallable("AggApply")) {
  524. auto extractHandler = payload->Child(1)->Child(2);
  525. bool produceStruct = payload->Child(1)->Head().Content() == "some";
  526. MergeLineageFromUsedFields(extractHandler->Tail(), extractHandler->Head().Head(), innerLineage, source, produceStruct, {});
  527. } else {
  528. Warning(*payload->Child(1));
  529. lineage.Fields.Clear();
  530. return;
  531. }
  532. }
  533. for (const auto& field : fields) {
  534. (*lineage.Fields)[field] = source;
  535. }
  536. }
  537. }
  538. void HandleLMap(TLineage& lineage, const TExprNode& node) {
  539. auto innerLineage = *CollectLineage(node.Head());
  540. if (!innerLineage.Fields.Defined()) {
  541. return;
  542. }
  543. const auto& lambda = node.Tail();
  544. const auto& arg = lambda.Head().Head();
  545. const auto& body = lambda.Tail();
  546. if (&body == &arg) {
  547. lineage.Fields = *innerLineage.Fields;
  548. return;
  549. }
  550. lineage.Fields.ConstructInPlace();
  551. FillStructLineage(lineage, nullptr, arg, innerLineage, GetSeqItemType(body.GetTypeAnn()), {});
  552. }
  553. void HandlePartitionByKeys(TLineage& lineage, const TExprNode& node) {
  554. auto innerLineage = *CollectLineage(node.Head());
  555. if (!innerLineage.Fields.Defined()) {
  556. return;
  557. }
  558. const auto& lambda = node.Tail();
  559. const auto& arg = lambda.Head().Head();
  560. const auto& body = lambda.Tail();
  561. if (&body == &arg) {
  562. lineage.Fields = *innerLineage.Fields;
  563. return;
  564. }
  565. lineage.Fields.ConstructInPlace();
  566. FillStructLineage(lineage, nullptr, arg, innerLineage, GetSeqItemType(body.GetTypeAnn()), {});
  567. }
  568. void HandleExtend(TLineage& lineage, const TExprNode& node) {
  569. TVector<TLineage> inners;
  570. for (const auto& child : node.Children()) {
  571. inners.push_back(*CollectLineage(*child));
  572. if (!inners.back().Fields.Defined()) {
  573. return;
  574. }
  575. }
  576. if (inners.empty()) {
  577. return;
  578. }
  579. lineage.Fields.ConstructInPlace();
  580. for (const auto& x : *inners.front().Fields) {
  581. auto& res = (*lineage.Fields)[x.first];
  582. TMaybe<bool> hasStructItems;
  583. for (const auto& i : inners) {
  584. if (auto f = (*i.Fields).FindPtr(x.first)) {
  585. for (const auto& x : f->Items) {
  586. res.Items.insert(x);
  587. }
  588. if (f->StructItems || f->Items.empty()) {
  589. if (!hasStructItems) {
  590. hasStructItems = true;
  591. }
  592. } else {
  593. hasStructItems = false;
  594. }
  595. }
  596. }
  597. if (hasStructItems && *hasStructItems) {
  598. res.StructItems.ConstructInPlace();
  599. for (const auto& i : inners) {
  600. if (auto f = (*i.Fields).FindPtr(x.first)) {
  601. if (f->StructItems) {
  602. for (const auto& si : *f->StructItems) {
  603. for (const auto& x : si.second) {
  604. (*res.StructItems)[si.first].insert(x);
  605. }
  606. }
  607. }
  608. }
  609. }
  610. }
  611. }
  612. }
  613. void HandleWindow(TLineage& lineage, const TExprNode& node) {
  614. auto innerLineage = *CollectLineage(node.Head());
  615. if (!innerLineage.Fields.Defined()) {
  616. return;
  617. }
  618. TExprNode::TListType frameGroups;
  619. if (node.IsCallable("CalcOverWindowGroup")) {
  620. for (const auto& g : node.Child(1)->Children()) {
  621. frameGroups.push_back(g->Child(2));
  622. }
  623. } else {
  624. frameGroups.push_back(node.Child(3));
  625. }
  626. lineage.Fields = *innerLineage.Fields;
  627. if (node.IsCallable("CalcOverSessionWindow")) {
  628. if (node.Child(5)->ChildrenSize() && !node.Child(4)->IsCallable("SessionWindowTraits")) {
  629. lineage.Fields.Clear();
  630. return;
  631. }
  632. for (const auto& sessionColumn : node.Child(5)->Children()) {
  633. auto& res = (*lineage.Fields)[sessionColumn->Content()];
  634. const auto& initHandler = node.Child(4)->Child(2);
  635. const auto& updateHandler = node.Child(4)->Child(2);
  636. MergeLineageFromUsedFields(initHandler->Tail(), initHandler->Head().Head(), innerLineage, res, false, {});
  637. MergeLineageFromUsedFields(updateHandler->Tail(), updateHandler->Head().Head(), innerLineage, res, false, {});
  638. }
  639. }
  640. for (const auto& g : frameGroups) {
  641. for (const auto& f : g->Children()) {
  642. if (!f->IsCallable("WinOnRows")) {
  643. lineage.Fields.Clear();
  644. return;
  645. }
  646. for (ui32 i = 1; i < f->ChildrenSize(); ++i) {
  647. const auto& list = f->Child(i);
  648. auto field = list->Head().Content();
  649. auto& res = (*lineage.Fields)[field];
  650. if (list->Tail().IsCallable({"RowNumber","CumeDist","NTile"})) {
  651. continue;
  652. } else if (list->Tail().IsCallable({"Lag","Lead","Rank","DenseRank","PercentRank"})) {
  653. const auto& lambda = list->Tail().Child(1);
  654. bool produceStruct = list->Tail().IsCallable({"Lag","Lead"});
  655. MergeLineageFromUsedFields(lambda->Tail(), lambda->Head().Head(), innerLineage, res, produceStruct, {});
  656. } else if (list->Tail().IsCallable("WindowTraits")) {
  657. const auto& initHandler = list->Tail().Child(1);
  658. const auto& updateHandler = list->Tail().Child(2);
  659. MergeLineageFromUsedFields(initHandler->Tail(), initHandler->Head().Head(), innerLineage, res, false, {});
  660. MergeLineageFromUsedFields(updateHandler->Tail(), updateHandler->Head().Head(), innerLineage, res, false, {});
  661. } else {
  662. lineage.Fields.Clear();
  663. return;
  664. }
  665. }
  666. }
  667. }
  668. }
  669. void HandleEquiJoin(TLineage& lineage, const TExprNode& node) {
  670. TVector<TLineage> inners;
  671. THashMap<TStringBuf, ui32> inputLabels;
  672. for (ui32 i = 0; i < node.ChildrenSize() - 2; ++i) {
  673. inners.push_back(*CollectLineage(node.Child(i)->Head()));
  674. if (!inners.back().Fields.Defined()) {
  675. return;
  676. }
  677. if (node.Child(i)->Tail().IsAtom()) {
  678. inputLabels[node.Child(i)->Tail().Content()] = i;
  679. } else {
  680. for (const auto& label : node.Child(i)->Tail().Children()) {
  681. inputLabels[label->Content()] = i;
  682. }
  683. }
  684. }
  685. THashMap<TStringBuf, TStringBuf> backRename;
  686. for (auto setting : node.Tail().Children()) {
  687. if (setting->Head().Content() != "rename") {
  688. continue;
  689. }
  690. if (setting->Child(2)->Content().empty()) {
  691. continue;
  692. }
  693. backRename[setting->Child(2)->Content()] = setting->Child(1)->Content();
  694. }
  695. lineage.Fields.ConstructInPlace();
  696. auto structType = node.GetTypeAnn()->Cast<TListExprType>()->GetItemType()->Cast<TStructExprType>();
  697. THashMap<TString, TMaybe<bool>> hasStructItems;
  698. for (const auto& field : structType->GetItems()) {
  699. TStringBuf originalName = field->GetName();
  700. if (auto it = backRename.find(originalName); it != backRename.end()) {
  701. originalName = it->second;
  702. }
  703. TStringBuf table, column;
  704. SplitTableName(originalName, table, column);
  705. ui32 index = inputLabels.at(table);
  706. auto& res = (*lineage.Fields)[field->GetName()];
  707. auto& f = (*inners[index].Fields).at(column);
  708. for (const auto& i: f.Items) {
  709. res.Items.insert(i);
  710. }
  711. auto& h = hasStructItems[field->GetName()];
  712. if (f.StructItems || f.Items.empty()) {
  713. if (!h) {
  714. h = true;
  715. }
  716. } else {
  717. h = false;
  718. }
  719. }
  720. for (const auto& field : structType->GetItems()) {
  721. TStringBuf originalName = field->GetName();
  722. if (auto it = backRename.find(originalName); it != backRename.end()) {
  723. originalName = it->second;
  724. }
  725. TStringBuf table, column;
  726. SplitTableName(originalName, table, column);
  727. ui32 index = inputLabels.at(table);
  728. auto& res = (*lineage.Fields)[field->GetName()];
  729. auto& f = (*inners[index].Fields).at(column);
  730. auto& h = hasStructItems[field->GetName()];
  731. if (h && *h) {
  732. if (!res.StructItems) {
  733. res.StructItems.ConstructInPlace();
  734. }
  735. if (f.StructItems) {
  736. for (const auto& i: *f.StructItems) {
  737. for (const auto& x : i.second) {
  738. (*res.StructItems)[i.first].insert(x);
  739. }
  740. }
  741. }
  742. }
  743. }
  744. }
  745. void HandleListLiteral(TLineage& lineage, const TExprNode& node) {
  746. auto itemType = node.GetTypeAnn()->Cast<TListExprType>()->GetItemType();
  747. if (itemType->GetKind() != ETypeAnnotationKind::Struct) {
  748. return;
  749. }
  750. auto structType = itemType->Cast<TStructExprType>();
  751. lineage.Fields.ConstructInPlace();
  752. ui32 startIndex = 0;
  753. if (node.IsCallable({"List", "ListIf"})) {
  754. startIndex = 1;
  755. }
  756. for (ui32 i = startIndex; i < node.ChildrenSize(); ++i) {
  757. auto child = node.Child(i);
  758. if (child->IsCallable("AsStruct")) {
  759. for (const auto& f : child->Children()) {
  760. TNodeMap<TMaybe<TFieldsLineage>> visited;
  761. auto res = ScanExprLineage(f->Tail(), nullptr, nullptr, visited, {});
  762. if (res) {
  763. auto name = f->Head().Content();
  764. (*lineage.Fields)[name].MergeFrom(*res);
  765. }
  766. }
  767. } else {
  768. TNodeMap<TMaybe<TFieldsLineage>> visited;
  769. auto res = ScanExprLineage(*child, nullptr, nullptr, visited, {});
  770. if (res) {
  771. for (const auto& i : structType->GetItems()) {
  772. if (i->GetName().StartsWith("_yql_sys_")) {
  773. continue;
  774. }
  775. (*lineage.Fields)[i->GetName()].MergeFrom(*res);
  776. }
  777. }
  778. }
  779. }
  780. }
  781. void WriteLineage(NYson::TYsonWriter& writer, const TLineage& lineage) {
  782. if (!lineage.Fields.Defined()) {
  783. YQL_ENSURE(!GetEnv("YQL_DETERMINISTIC_MODE"), "Can't calculate lineage");
  784. writer.OnEntity();
  785. return;
  786. }
  787. writer.OnBeginMap();
  788. TVector<TString> fields;
  789. for (const auto& f : *lineage.Fields) {
  790. fields.push_back(f.first);
  791. }
  792. Sort(fields);
  793. for (const auto& f : fields) {
  794. writer.OnKeyedItem(f);
  795. writer.OnBeginList();
  796. TVector<TFieldLineage> items;
  797. for (const auto& i : lineage.Fields->at(f).Items) {
  798. items.push_back(i);
  799. }
  800. Sort(items);
  801. for (const auto& i: items) {
  802. writer.OnListItem();
  803. writer.OnBeginMap();
  804. writer.OnKeyedItem("Input");
  805. writer.OnInt64Scalar(i.InputIndex);
  806. writer.OnKeyedItem("Field");
  807. writer.OnStringScalar(i.Field);
  808. writer.OnKeyedItem("Transforms");
  809. const auto& transforms = i.Transforms;
  810. if (transforms.empty()) {
  811. writer.OnEntity();
  812. } else {
  813. writer.OnStringScalar(transforms);
  814. }
  815. writer.OnEndMap();
  816. }
  817. writer.OnEndList();
  818. }
  819. writer.OnEndMap();
  820. }
  821. private:
  822. const TExprNode& Root_;
  823. const TTypeAnnotationContext& Ctx_;
  824. TExprContext& ExprCtx_;
  825. TNodeMap<IDataProvider*> Reads_, Writes_;
  826. ui32 NextReadId_ = 0;
  827. ui32 NextWriteId_ = 0;
  828. TNodeMap<TVector<ui32>> ReadIds_;
  829. TNodeMap<ui32> WriteIds_;
  830. TNodeMap<TLineage> Lineages_;
  831. TNodeSet HasReads_;
  832. };
  833. }
  834. TString CalculateLineage(const TExprNode& root, const TTypeAnnotationContext& ctx, TExprContext& exprCtx) {
  835. TLineageScanner scanner(root, ctx, exprCtx);
  836. return scanner.Process();
  837. }
  838. }