yql_provider.cpp 70 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953
  1. #include "yql_provider.h"
  2. #include <yql/essentials/core/expr_nodes/yql_expr_nodes.h>
  3. #include <yql/essentials/core/type_ann/type_ann_core.h>
  4. #include <yql/essentials/core/type_ann/type_ann_expr.h>
  5. #include <yql/essentials/core/yql_expr_type_annotation.h>
  6. #include <yql/essentials/core/yql_expr_optimize.h>
  7. #include <yql/essentials/core/yql_execution.h>
  8. #include <yql/essentials/core/yql_opt_utils.h>
  9. #include <yql/essentials/parser/pg_catalog/catalog.h>
  10. #include <yql/essentials/minikql/mkql_function_registry.h>
  11. #include <yql/essentials/minikql/mkql_program_builder.h>
  12. #include <util/folder/path.h>
  13. #include <util/generic/is_in.h>
  14. #include <util/generic/utility.h>
  15. #include <util/string/join.h>
  16. namespace NYql {
  17. namespace NCommon {
  18. using namespace NNodes;
  19. namespace {
  20. constexpr std::array<std::string_view, 8> FormatsForInput = {
  21. "csv_with_names"sv,
  22. "tsv_with_names"sv,
  23. "json_list"sv,
  24. "json"sv,
  25. "raw"sv,
  26. "json_as_string"sv,
  27. "json_each_row"sv,
  28. "parquet"sv
  29. };
  30. constexpr std::array<std::string_view, 7> FormatsForOutput = {
  31. "csv_with_names"sv,
  32. "tsv_with_names"sv,
  33. "json_list"sv,
  34. "json"sv,
  35. "raw"sv,
  36. "json_each_row"sv,
  37. "parquet"sv
  38. };
  39. constexpr std::array<std::string_view, 6> Compressions = {
  40. "gzip"sv,
  41. "zstd"sv,
  42. "lz4"sv,
  43. "brotli"sv,
  44. "bzip2"sv,
  45. "xz"sv
  46. };
  47. constexpr std::array<std::string_view, 7> IntervalUnits = {
  48. "MICROSECONDS"sv,
  49. "MILLISECONDS"sv,
  50. "SECONDS"sv,
  51. "MINUTES"sv,
  52. "HOURS"sv,
  53. "DAYS"sv,
  54. "WEEKS"sv
  55. };
  56. constexpr std::array<std::string_view, 2> DateTimeFormatNames = {
  57. "POSIX"sv,
  58. "ISO"sv
  59. };
  60. constexpr std::array<std::string_view, 5> TimestampFormatNames = {
  61. "POSIX"sv,
  62. "ISO"sv,
  63. "UNIX_TIME_MILLISECONDS"sv,
  64. "UNIX_TIME_SECONDS"sv,
  65. "UNIX_TIME_MICROSECONDS"sv
  66. };
  67. TCoAtom InferIndexName(TCoAtomList key, TExprContext& ctx) {
  68. static const TString end = "_idx";
  69. static const TString delimiter = "_";
  70. size_t sz = end.size();
  71. for (const auto& n: key)
  72. sz += n.Value().size() + delimiter.size();
  73. TString name(Reserve(sz));
  74. for (const auto& n: key) {
  75. name += n.Value() + delimiter;
  76. }
  77. name += end;
  78. return Build<TCoAtom>(ctx, key.Pos())
  79. .Value(name)
  80. .Done();
  81. }
  82. } // namespace
  83. bool TCommitSettings::EnsureModeEmpty(TExprContext& ctx) {
  84. if (Mode) {
  85. ctx.AddError(TIssue(ctx.GetPosition(Pos), TStringBuilder()
  86. << "Unsupported mode:" << Mode.Cast().Value()));
  87. return false;
  88. }
  89. return true;
  90. }
  91. bool TCommitSettings::EnsureEpochEmpty(TExprContext& ctx) {
  92. if (Epoch) {
  93. ctx.AddError(TIssue(ctx.GetPosition(Pos), TStringBuilder()
  94. << "Epochs are unsupported."));
  95. return false;
  96. }
  97. return true;
  98. }
  99. bool TCommitSettings::EnsureOtherEmpty(TExprContext& ctx) {
  100. if (!Other.Empty()) {
  101. ctx.AddError(TIssue(ctx.GetPosition(Pos), TStringBuilder()
  102. << "Unsupported setting:" << Other.Item(0).Name().Value()));
  103. return false;
  104. }
  105. return true;
  106. }
  107. TCoNameValueTupleList TCommitSettings::BuildNode(TExprContext& ctx) const {
  108. TVector<TExprBase> settings;
  109. auto addSettings = [this, &settings, &ctx] (const TString& name, TMaybeNode<TExprBase> value) {
  110. if (value) {
  111. auto node = Build<TCoNameValueTuple>(ctx, Pos)
  112. .Name().Build(name)
  113. .Value(value.Cast())
  114. .Done();
  115. settings.push_back(node);
  116. }
  117. };
  118. addSettings("mode", Mode);
  119. addSettings("epoch", Epoch);
  120. for (auto setting : Other) {
  121. settings.push_back(setting);
  122. }
  123. auto ret = Build<TCoNameValueTupleList>(ctx, Pos)
  124. .Add(settings)
  125. .Done();
  126. return ret;
  127. }
  128. const TStructExprType* BuildCommonTableListType(TExprContext& ctx) {
  129. TVector<const TItemExprType*> items;
  130. auto stringType = ctx.MakeType<TDataExprType>(EDataSlot::String);
  131. auto listOfString = ctx.MakeType<TListExprType>(stringType);
  132. items.push_back(ctx.MakeType<TItemExprType>("Prefix", stringType));
  133. items.push_back(ctx.MakeType<TItemExprType>("Folders", listOfString));
  134. items.push_back(ctx.MakeType<TItemExprType>("Tables", listOfString));
  135. return ctx.MakeType<TStructExprType>(items);
  136. }
  137. TExprNode::TPtr BuildTypeExpr(TPositionHandle pos, const TTypeAnnotationNode& ann, TExprContext& ctx) {
  138. return ExpandType(pos, ann, ctx);
  139. }
  140. bool HasResOrPullOption(const TExprNode& node, const TStringBuf& option) {
  141. if (node.Content() == "Result" || node.Content() == "Pull") {
  142. auto options = node.Child(4);
  143. for (auto setting : options->Children()) {
  144. if (setting->Head().Content() == option) {
  145. return true;
  146. }
  147. }
  148. }
  149. return false;
  150. }
  151. TVector<TString> GetResOrPullColumnHints(const TExprNode& node) {
  152. TVector<TString> columns;
  153. auto setting = GetSetting(*node.Child(4), "columns");
  154. if (setting) {
  155. auto type = node.Head().GetTypeAnn();
  156. if (type->GetKind() != ETypeAnnotationKind::EmptyList) {
  157. auto structType = type->Cast<TListExprType>()->GetItemType()->Cast<TStructExprType>();
  158. for (ui32 i = 0; i < structType->GetSize(); ++i) {
  159. auto field = setting->Child(1)->Child(i);
  160. columns.push_back(TString(field->Content()));
  161. }
  162. }
  163. }
  164. return columns;
  165. }
  166. TString FullTableName(const TStringBuf& cluster, const TStringBuf& table) {
  167. return TStringBuilder() << cluster << ".[" << table << "]";
  168. }
  169. IDataProvider::TFillSettings GetFillSettings(const TExprNode& node) {
  170. IDataProvider::TFillSettings fillSettings;
  171. fillSettings.AllResultsBytesLimit = node.Child(1)->Content().empty()
  172. ? Nothing()
  173. : TMaybe<ui64>(FromString<ui64>(node.Child(1)->Content()));
  174. fillSettings.RowsLimitPerWrite = node.Child(2)->Content().empty()
  175. ? Nothing()
  176. : TMaybe<ui64>(FromString<ui64>(node.Child(2)->Content()));
  177. fillSettings.Format = (IDataProvider::EResultFormat)FromString<ui32>(node.Child(5)->Content());
  178. fillSettings.FormatDetails = node.Child(3)->Content();
  179. fillSettings.Discard = FromString<bool>(node.Child(7)->Content());
  180. return fillSettings;
  181. }
  182. NYson::EYsonFormat GetYsonFormat(const IDataProvider::TFillSettings& fillSettings) {
  183. YQL_ENSURE(fillSettings.Format == IDataProvider::EResultFormat::Yson);
  184. return (NYson::EYsonFormat)FromString<ui32>(fillSettings.FormatDetails);
  185. }
  186. TWriteTableSettings ParseWriteTableSettings(TExprList node, TExprContext& ctx) {
  187. TMaybeNode<TCoAtom> mode;
  188. TMaybeNode<TCoAtom> temporary;
  189. TMaybeNode<TCoAtom> isBatch;
  190. TMaybeNode<TExprList> columns;
  191. TMaybeNode<TExprList> returningList;
  192. TMaybeNode<TCoAtomList> primaryKey;
  193. TMaybeNode<TCoAtomList> partitionBy;
  194. TMaybeNode<TCoNameValueTupleList> orderBy;
  195. TMaybeNode<TCoLambda> filter;
  196. TMaybeNode<TCoLambda> update;
  197. TVector<TCoNameValueTuple> other;
  198. TVector<TCoIndex> indexes;
  199. TVector<TCoChangefeed> changefeeds;
  200. TMaybeNode<TExprList> columnFamilies;
  201. TVector<TCoNameValueTuple> tableSettings;
  202. TVector<TCoNameValueTuple> alterActions;
  203. TMaybeNode<TCoAtom> tableType;
  204. TMaybeNode<TCallable> pgFilter;
  205. for (auto child : node) {
  206. if (auto maybeTuple = child.Maybe<TCoNameValueTuple>()) {
  207. auto tuple = maybeTuple.Cast();
  208. auto name = tuple.Name().Value();
  209. if (name == "mode") {
  210. YQL_ENSURE(tuple.Value().Maybe<TCoAtom>());
  211. mode = tuple.Value().Cast<TCoAtom>();
  212. } else if (name == "columns") {
  213. YQL_ENSURE(tuple.Value().Maybe<TExprList>());
  214. columns = tuple.Value().Cast<TExprList>();
  215. } else if (name == "primarykey") {
  216. YQL_ENSURE(tuple.Value().Maybe<TCoAtomList>());
  217. primaryKey = tuple.Value().Cast<TCoAtomList>();
  218. } else if (name == "partitionby") {
  219. YQL_ENSURE(tuple.Value().Maybe<TCoAtomList>());
  220. partitionBy = tuple.Value().Cast<TCoAtomList>();
  221. } else if (name == "orderby") {
  222. YQL_ENSURE(tuple.Value().Maybe<TCoNameValueTupleList>());
  223. orderBy = tuple.Value().Cast<TCoNameValueTupleList>();
  224. } else if (name == "filter") {
  225. YQL_ENSURE(tuple.Value().Maybe<TCoLambda>());
  226. filter = tuple.Value().Cast<TCoLambda>();
  227. } else if (name == "update") {
  228. YQL_ENSURE(tuple.Value().Maybe<TCoLambda>());
  229. update = tuple.Value().Cast<TCoLambda>();
  230. } else if (name == "index") {
  231. YQL_ENSURE(tuple.Value().Maybe<TCoNameValueTupleList>());
  232. auto index = Build<TCoIndex>(ctx, node.Pos());
  233. bool inferName = false;
  234. TCoNameValueTupleList tableSettings = Build<TCoNameValueTupleList>(ctx, node.Pos()).Done();
  235. TCoNameValueTupleList indexSettings = Build<TCoNameValueTupleList>(ctx, node.Pos()).Done();
  236. TMaybe<TCoAtomList> columnList;
  237. for (const auto& item : tuple.Value().Cast<TCoNameValueTupleList>()) {
  238. const auto& indexItemName = item.Name().Value();
  239. if (indexItemName == "indexName") {
  240. if (auto atom = item.Value().Maybe<TCoAtom>()) {
  241. index.Name(atom.Cast());
  242. } else {
  243. // No index name given - infer name from column set
  244. inferName = true;
  245. }
  246. } else if (indexItemName == "indexType") {
  247. index.Type(item.Value().Cast<TCoAtom>());
  248. } else if (indexItemName == "indexColumns") {
  249. columnList = item.Value().Cast<TCoAtomList>();
  250. index.Columns(item.Value().Cast<TCoAtomList>());
  251. } else if (indexItemName == "dataColumns") {
  252. index.DataColumns(item.Value().Cast<TCoAtomList>());
  253. } else if (indexItemName == "tableSettings") {
  254. tableSettings = item.Value().Cast<TCoNameValueTupleList>();
  255. } else if (indexItemName == "indexSettings") {
  256. indexSettings = item.Value().Cast<TCoNameValueTupleList>();
  257. } else {
  258. YQL_ENSURE(false, "unknown index item");
  259. }
  260. }
  261. index.TableSettings(tableSettings);
  262. index.IndexSettings(indexSettings);
  263. if (inferName) {
  264. YQL_ENSURE(columnList);
  265. index.Name(InferIndexName(*columnList, ctx));
  266. }
  267. indexes.push_back(index.Done());
  268. } else if (name == "changefeed") {
  269. YQL_ENSURE(tuple.Value().Maybe<TCoNameValueTupleList>());
  270. auto cf = Build<TCoChangefeed>(ctx, node.Pos());
  271. for (const auto& item : tuple.Value().Cast<TCoNameValueTupleList>()) {
  272. const auto& itemName = item.Name().Value();
  273. if (itemName == "name") {
  274. cf.Name(item.Value().Cast<TCoAtom>());
  275. } else if (itemName == "settings") {
  276. YQL_ENSURE(item.Value().Maybe<TCoNameValueTupleList>());
  277. cf.Settings(item.Value().Cast<TCoNameValueTupleList>());
  278. } else if (itemName == "state") {
  279. cf.State(item.Value().Cast<TCoAtom>());
  280. } else {
  281. YQL_ENSURE(false, "unknown changefeed item");
  282. }
  283. }
  284. changefeeds.push_back(cf.Done());
  285. } else if (name == "columnFamilies") {
  286. YQL_ENSURE(tuple.Value().Maybe<TExprList>());
  287. columnFamilies = tuple.Value().Cast<TExprList>();
  288. } else if (name == "tableSettings") {
  289. YQL_ENSURE(tuple.Value().Maybe<TCoNameValueTupleList>());
  290. for (const auto& item : tuple.Value().Cast<TCoNameValueTupleList>()) {
  291. tableSettings.push_back(item);
  292. }
  293. } else if (name == "actions") {
  294. YQL_ENSURE(tuple.Value().Maybe<TCoNameValueTupleList>());
  295. for (const auto& item : tuple.Value().Cast<TCoNameValueTupleList>()) {
  296. alterActions.push_back(item);
  297. }
  298. } else if (name == "tableType") {
  299. YQL_ENSURE(tuple.Value().Maybe<TCoAtom>());
  300. tableType = tuple.Value().Cast<TCoAtom>();
  301. } else if (name == "pg_delete" || name == "pg_update") {
  302. YQL_ENSURE(tuple.Value().Maybe<TCallable>());
  303. pgFilter = tuple.Value().Cast<TCallable>();
  304. } else if (name == "temporary") {
  305. temporary = Build<TCoAtom>(ctx, node.Pos()).Value("true").Done();
  306. } else if (name == "returning") {
  307. YQL_ENSURE(tuple.Value().Maybe<TExprList>());
  308. returningList = tuple.Value().Cast<TExprList>();
  309. } else if (name == "is_batch") {
  310. isBatch = Build<TCoAtom>(ctx, node.Pos()).Value("true").Done();
  311. } else {
  312. other.push_back(tuple);
  313. }
  314. }
  315. }
  316. const auto& otherSettings = Build<TCoNameValueTupleList>(ctx, node.Pos())
  317. .Add(other)
  318. .Done();
  319. const auto& idx = Build<TCoIndexList>(ctx, node.Pos())
  320. .Add(indexes)
  321. .Done();
  322. const auto& cfs = Build<TCoChangefeedList>(ctx, node.Pos())
  323. .Add(changefeeds)
  324. .Done();
  325. const auto& tableProfileSettings = Build<TCoNameValueTupleList>(ctx, node.Pos())
  326. .Add(tableSettings)
  327. .Done();
  328. const auto& alterTableActions = Build<TCoNameValueTupleList>(ctx, node.Pos())
  329. .Add(alterActions)
  330. .Done();
  331. if (!columnFamilies.IsValid()) {
  332. columnFamilies = Build<TExprList>(ctx, node.Pos()).Done();
  333. }
  334. TWriteTableSettings ret(otherSettings);
  335. ret.Mode = mode;
  336. ret.Temporary = temporary;
  337. ret.IsBatch = isBatch;
  338. ret.Columns = columns;
  339. ret.ReturningList = returningList;
  340. ret.PrimaryKey = primaryKey;
  341. ret.PartitionBy = partitionBy;
  342. ret.OrderBy = orderBy;
  343. ret.Filter = filter;
  344. ret.Update = update;
  345. ret.Indexes = idx;
  346. ret.Changefeeds = cfs;
  347. ret.ColumnFamilies = columnFamilies;
  348. ret.TableSettings = tableProfileSettings;
  349. ret.AlterActions = alterTableActions;
  350. ret.TableType = tableType;
  351. ret.PgFilter = pgFilter;
  352. return ret;
  353. }
  354. TWriteSequenceSettings ParseSequenceSettings(NNodes::TExprList node, TExprContext& ctx) {
  355. TMaybeNode<TCoAtom> mode;
  356. TMaybeNode<TCoAtom> valueType;
  357. TMaybeNode<TCoAtom> temporary;
  358. TMaybeNode<TCoAtom> ownedBy;
  359. TVector<TCoNameValueTuple> sequenceSettings;
  360. TVector<TCoNameValueTuple> other;
  361. const static std::unordered_set<TString> sequenceSettingNames =
  362. {"start", "increment", "cache", "minvalue", "maxvalue", "cycle", "restart"};
  363. for (auto child : node) {
  364. if (auto maybeTuple = child.Maybe<TCoNameValueTuple>()) {
  365. auto tuple = maybeTuple.Cast();
  366. auto name = tuple.Name().Value();
  367. if (name == "mode") {
  368. YQL_ENSURE(tuple.Value().Maybe<TCoAtom>());
  369. mode = tuple.Value().Cast<TCoAtom>();
  370. } else if (name == "as") {
  371. YQL_ENSURE(tuple.Value().Maybe<TCoAtom>());
  372. valueType = tuple.Value().Cast<TCoAtom>();
  373. } else if (name == "temporary") {
  374. temporary = Build<TCoAtom>(ctx, node.Pos()).Value("true").Done();
  375. } else if (sequenceSettingNames.contains(TString(name))) {
  376. sequenceSettings.push_back(tuple);
  377. } else {
  378. other.push_back(tuple);
  379. }
  380. }
  381. }
  382. const auto& sequenceSettingsList = Build<TCoNameValueTupleList>(ctx, node.Pos())
  383. .Add(sequenceSettings)
  384. .Done();
  385. const auto& otherSettings = Build<TCoNameValueTupleList>(ctx, node.Pos())
  386. .Add(other)
  387. .Done();
  388. TWriteSequenceSettings ret(otherSettings);
  389. ret.Mode = mode;
  390. ret.ValueType = valueType;
  391. ret.Temporary = temporary;
  392. ret.SequenceSettings = sequenceSettingsList;
  393. return ret;
  394. }
  395. TWriteTopicSettings ParseWriteTopicSettings(TExprList node, TExprContext& ctx) {
  396. Y_UNUSED(ctx);
  397. TMaybeNode<TCoAtom> mode;
  398. TVector<TCoTopicConsumer> consumers;
  399. TVector<TCoTopicConsumer> addConsumers;
  400. TVector<TCoTopicConsumer> alterConsumers;
  401. TVector<TCoAtom> dropConsumers;
  402. TVector<TCoNameValueTuple> topicSettings;
  403. auto parseNewConsumer = [&](const auto& node, const auto& tuple, auto& consumersList) {
  404. YQL_ENSURE(tuple.Value().template Maybe<TCoNameValueTupleList>());
  405. auto consumer = Build<TCoTopicConsumer>(ctx, node.Pos());
  406. for (const auto& item : tuple.Value().template Cast<TCoNameValueTupleList>()) {
  407. const auto& itemName = item.Name().Value();
  408. if (itemName == "name") {
  409. consumer.Name(item.Value().template Cast<TCoAtom>());
  410. } else if (itemName == "settings") {
  411. YQL_ENSURE(item.Value().template Maybe<TCoNameValueTupleList>());
  412. consumer.Settings(item.Value().template Cast<TCoNameValueTupleList>());
  413. } else {
  414. YQL_ENSURE(false, "unknown consumer item");
  415. }
  416. }
  417. consumersList.push_back(consumer.Done());
  418. };
  419. for (auto child : node) {
  420. if (auto maybeTuple = child.Maybe<TCoNameValueTuple>()) {
  421. auto tuple = maybeTuple.Cast();
  422. auto name = tuple.Name().Value();
  423. if (name == "mode") {
  424. YQL_ENSURE(tuple.Value().Maybe<TCoAtom>());
  425. mode = tuple.Value().Cast<TCoAtom>();
  426. } else if (name == "consumer") {
  427. parseNewConsumer(node, tuple, consumers);
  428. } else if (name == "addConsumer") {
  429. parseNewConsumer(node, tuple, addConsumers);
  430. } else if (name == "alterConsumer") {
  431. parseNewConsumer(node, tuple, alterConsumers);
  432. } else if (name == "dropConsumer") {
  433. auto name = tuple.Value().Cast<TCoAtom>();
  434. dropConsumers.push_back(name);
  435. } else if (name == "topicSettings") {
  436. YQL_ENSURE(tuple.Value().Maybe<TCoNameValueTupleList>());
  437. for (const auto& item : tuple.Value().Cast<TCoNameValueTupleList>()) {
  438. topicSettings.push_back(item);
  439. }
  440. }
  441. }
  442. }
  443. const auto& builtCons = Build<TCoTopicConsumerList>(ctx, node.Pos())
  444. .Add(consumers)
  445. .Done();
  446. const auto& builtAddCons = Build<TCoTopicConsumerList>(ctx, node.Pos())
  447. .Add(addConsumers)
  448. .Done();
  449. const auto& builtAlterCons = Build<TCoTopicConsumerList>(ctx, node.Pos())
  450. .Add(alterConsumers)
  451. .Done();
  452. const auto& builtDropCons = Build<TCoAtomList>(ctx, node.Pos())
  453. .Add(dropConsumers)
  454. .Done();
  455. const auto& builtSettings = Build<TCoNameValueTupleList>(ctx, node.Pos())
  456. .Add(topicSettings)
  457. .Done();
  458. TVector<TCoNameValueTuple> other;
  459. const auto& otherSettings = Build<TCoNameValueTupleList>(ctx, node.Pos())
  460. .Add(other)
  461. .Done();
  462. TWriteTopicSettings ret(otherSettings);
  463. ret.Mode = mode;
  464. ret.Consumers = builtCons;
  465. ret.AddConsumers = builtAddCons;
  466. ret.TopicSettings = builtSettings;
  467. ret.AlterConsumers = builtAlterCons;
  468. ret.DropConsumers = builtDropCons;
  469. return ret;
  470. }
  471. TWriteReplicationSettings ParseWriteReplicationSettings(TExprList node, TExprContext& ctx) {
  472. TMaybeNode<TCoAtom> mode;
  473. TVector<TCoReplicationTarget> targets;
  474. TVector<TCoNameValueTuple> settings;
  475. TVector<TCoNameValueTuple> other;
  476. for (auto child : node) {
  477. if (auto maybeTuple = child.Maybe<TCoNameValueTuple>()) {
  478. auto tuple = maybeTuple.Cast();
  479. auto name = tuple.Name().Value();
  480. if (name == "mode") {
  481. YQL_ENSURE(tuple.Value().Maybe<TCoAtom>());
  482. mode = tuple.Value().Cast<TCoAtom>();
  483. } else if (name == "targets") {
  484. YQL_ENSURE(tuple.Value().Maybe<TExprList>());
  485. for (const auto& target : tuple.Value().Cast<TExprList>()) {
  486. auto builtTarget = Build<TCoReplicationTarget>(ctx, node.Pos());
  487. YQL_ENSURE(target.Maybe<TCoNameValueTupleList>());
  488. for (const auto& item : target.Cast<TCoNameValueTupleList>()) {
  489. auto itemName = item.Name().Value();
  490. if (itemName == "remote") {
  491. builtTarget.RemotePath(item.Value().Cast<TCoAtom>());
  492. } else if (itemName == "local") {
  493. builtTarget.LocalPath(item.Value().Cast<TCoAtom>());
  494. } else {
  495. YQL_ENSURE(false, "unknown target item");
  496. }
  497. }
  498. targets.push_back(builtTarget.Done());
  499. }
  500. } else if (name == "settings") {
  501. YQL_ENSURE(tuple.Value().Maybe<TCoNameValueTupleList>());
  502. for (const auto& item : tuple.Value().Cast<TCoNameValueTupleList>()) {
  503. settings.push_back(item);
  504. }
  505. } else {
  506. other.push_back(tuple);
  507. }
  508. }
  509. }
  510. const auto& builtTargets = Build<TCoReplicationTargetList>(ctx, node.Pos())
  511. .Add(targets)
  512. .Done();
  513. const auto& builtSettings = Build<TCoNameValueTupleList>(ctx, node.Pos())
  514. .Add(settings)
  515. .Done();
  516. const auto& builtOther = Build<TCoNameValueTupleList>(ctx, node.Pos())
  517. .Add(other)
  518. .Done();
  519. TWriteReplicationSettings ret(builtOther);
  520. ret.Mode = mode;
  521. ret.Targets = builtTargets;
  522. ret.ReplicationSettings = builtSettings;
  523. return ret;
  524. }
  525. TDatabaseSettings ParseDatabaseSettings(TExprList node, TExprContext& ctx) {
  526. TMaybeNode<TCoAtom> mode;
  527. TVector<TCoNameValueTuple> other;
  528. for (auto child : node) {
  529. if (auto maybeTuple = child.Maybe<TCoNameValueTuple>()) {
  530. auto tuple = maybeTuple.Cast();
  531. auto name = tuple.Name().Value();
  532. if (name == "mode") {
  533. YQL_ENSURE(tuple.Value().Maybe<TCoAtom>());
  534. mode = tuple.Value().Cast<TCoAtom>();
  535. } else {
  536. other.push_back(tuple);
  537. }
  538. }
  539. }
  540. const auto& otherSettings = Build<TCoNameValueTupleList>(ctx, node.Pos())
  541. .Add(other)
  542. .Done();
  543. TDatabaseSettings ret(otherSettings);
  544. ret.Mode = mode;
  545. return ret;
  546. }
  547. TWriteTransferSettings ParseWriteTransferSettings(TExprList node, TExprContext& ctx) {
  548. TMaybeNode<TCoAtom> mode;
  549. TMaybeNode<TCoAtom> source;
  550. TMaybeNode<TCoAtom> target;
  551. TMaybeNode<TCoAtom> transformLambda;
  552. TVector<TCoNameValueTuple> settings;
  553. TVector<TCoNameValueTuple> other;
  554. for (auto child : node) {
  555. if (auto maybeTuple = child.Maybe<TCoNameValueTuple>()) {
  556. auto tuple = maybeTuple.Cast();
  557. auto name = tuple.Name().Value();
  558. if (name == "mode") {
  559. YQL_ENSURE(tuple.Value().Maybe<TCoAtom>());
  560. mode = tuple.Value().Cast<TCoAtom>();
  561. } else if (name == "source") {
  562. YQL_ENSURE(tuple.Value().Maybe<TCoAtom>());
  563. source = tuple.Value().Cast<TCoAtom>();
  564. } else if (name == "target") {
  565. YQL_ENSURE(tuple.Value().Maybe<TCoAtom>());
  566. target = tuple.Value().Cast<TCoAtom>();
  567. } else if (name == "transformLambda") {
  568. YQL_ENSURE(tuple.Value().Maybe<TCoAtom>());
  569. transformLambda = tuple.Value().Cast<TCoAtom>();
  570. } else if (name == "settings") {
  571. YQL_ENSURE(tuple.Value().Maybe<TCoNameValueTupleList>());
  572. for (const auto& item : tuple.Value().Cast<TCoNameValueTupleList>()) {
  573. settings.push_back(item);
  574. }
  575. } else {
  576. other.push_back(tuple);
  577. }
  578. }
  579. }
  580. const auto& builtSettings = Build<TCoNameValueTupleList>(ctx, node.Pos())
  581. .Add(settings)
  582. .Done();
  583. const auto& builtOther = Build<TCoNameValueTupleList>(ctx, node.Pos())
  584. .Add(other)
  585. .Done();
  586. TWriteTransferSettings ret(builtOther);
  587. ret.Mode = mode;
  588. ret.Source = source;
  589. ret.Target = target;
  590. ret.TransformLambda = transformLambda;
  591. ret.TransferSettings = builtSettings;
  592. return ret;
  593. }
  594. TWriteRoleSettings ParseWriteRoleSettings(TExprList node, TExprContext& ctx) {
  595. TMaybeNode<TCoAtom> mode;
  596. TVector<TCoAtom> roles;
  597. TMaybeNode<TCoAtom> newName;
  598. TVector<TCoNameValueTuple> other;
  599. for (auto child : node) {
  600. if (auto maybeTuple = child.Maybe<TCoNameValueTuple>()) {
  601. auto tuple = maybeTuple.Cast();
  602. auto name = tuple.Name().Value();
  603. if (name == "mode") {
  604. YQL_ENSURE(tuple.Value().Maybe<TCoAtom>());
  605. mode = tuple.Value().Cast<TCoAtom>();
  606. } else if (name == "roles") {
  607. YQL_ENSURE(tuple.Value().Maybe<TCoAtomList>());
  608. for (const auto& item : tuple.Value().Cast<TCoAtomList>()) {
  609. roles.push_back(item);
  610. }
  611. } else if (name == "newName") {
  612. YQL_ENSURE(tuple.Value().Maybe<TCoAtom>());
  613. newName = tuple.Value().Cast<TCoAtom>();
  614. } else {
  615. other.push_back(tuple);
  616. }
  617. }
  618. }
  619. const auto& builtRoles = Build<TCoAtomList>(ctx, node.Pos())
  620. .Add(roles)
  621. .Done();
  622. const auto& otherSettings = Build<TCoNameValueTupleList>(ctx, node.Pos())
  623. .Add(other)
  624. .Done();
  625. TWriteRoleSettings ret(otherSettings);
  626. ret.Roles = builtRoles;;
  627. ret.NewName = newName;
  628. ret.Mode = mode;
  629. return ret;
  630. }
  631. TWritePermissionSettings ParseWritePermissionsSettings(TExprList node, TExprContext&) {
  632. TMaybeNode<TCoAtomList> permissions;
  633. TMaybeNode<TCoAtomList> paths;
  634. TMaybeNode<TCoAtomList> roleNames;
  635. for (auto child : node) {
  636. if (auto maybeTuple = child.Maybe<TCoNameValueTuple>()) {
  637. auto tuple = maybeTuple.Cast();
  638. auto name = tuple.Name().Value();
  639. if (name == "permissions") {
  640. YQL_ENSURE(tuple.Value().Maybe<TCoAtomList>());
  641. permissions = tuple.Value().Cast<TCoAtomList>();;
  642. } else if (name == "roles") {
  643. YQL_ENSURE(tuple.Value().Maybe<TCoAtomList>());
  644. roleNames = tuple.Value().Cast<TCoAtomList>();
  645. } else if (name == "paths") {
  646. YQL_ENSURE(tuple.Value().Maybe<TCoAtomList>());
  647. paths = tuple.Value().Cast<TCoAtomList>();
  648. }
  649. }
  650. }
  651. TWritePermissionSettings ret(std::move(permissions), std::move(paths), std::move(roleNames));
  652. return ret;
  653. }
  654. TWriteObjectSettings ParseWriteObjectSettings(TExprList node, TExprContext& ctx) {
  655. TMaybeNode<TCoAtom> mode;
  656. TMaybe<TCoNameValueTupleList> kvFeatures;
  657. TMaybe<TCoAtomList> resetFeatures;
  658. for (auto child : node) {
  659. if (auto maybeTuple = child.Maybe<TCoNameValueTuple>()) {
  660. auto tuple = maybeTuple.Cast();
  661. auto name = tuple.Name().Value();
  662. if (name == "mode") {
  663. YQL_ENSURE(tuple.Value().Maybe<TCoAtom>());
  664. mode = tuple.Value().Cast<TCoAtom>();
  665. } else if (name == "features") {
  666. auto maybeFeatures = tuple.Value().Maybe<TCoNameValueTupleList>();
  667. Y_ABORT_UNLESS(maybeFeatures);
  668. kvFeatures = maybeFeatures.Cast();
  669. } else if (name == "resetFeatures") {
  670. auto maybeFeatures = tuple.Value().Maybe<TCoAtomList>();
  671. Y_ABORT_UNLESS(maybeFeatures);
  672. resetFeatures = maybeFeatures.Cast();
  673. }
  674. }
  675. }
  676. if (!kvFeatures) {
  677. kvFeatures = Build<TCoNameValueTupleList>(ctx, node.Pos()).Done();
  678. }
  679. if (!resetFeatures) {
  680. resetFeatures = Build<TCoAtomList>(ctx, node.Pos()).Done();
  681. }
  682. TWriteObjectSettings ret(std::move(mode), std::move(*kvFeatures), std::move(*resetFeatures));
  683. return ret;
  684. }
  685. TCommitSettings ParseCommitSettings(TCoCommit node, TExprContext& ctx) {
  686. if (!node.Settings()) {
  687. return TCommitSettings(Build<TCoNameValueTupleList>(ctx, node.Pos()).Done());
  688. }
  689. TMaybeNode<TCoAtom> mode;
  690. TMaybeNode<TCoAtom> epoch;
  691. TVector<TExprBase> other;
  692. if (node.Settings()) {
  693. auto settings = node.Settings().Cast();
  694. for (auto setting : settings) {
  695. if (setting.Name() == "mode") {
  696. YQL_ENSURE(setting.Value().Maybe<TCoAtom>());
  697. mode = setting.Value().Cast<TCoAtom>();
  698. } else if (setting.Name() == "epoch") {
  699. YQL_ENSURE(setting.Value().Maybe<TCoAtom>());
  700. epoch = setting.Value().Cast<TCoAtom>();
  701. } else {
  702. other.push_back(setting);
  703. }
  704. }
  705. }
  706. auto otherSettings = Build<TCoNameValueTupleList>(ctx, node.Pos())
  707. .Add(other)
  708. .Done();
  709. TCommitSettings ret(otherSettings);
  710. ret.Pos = node.Pos();
  711. ret.Mode = mode;
  712. ret.Epoch = epoch;
  713. return ret;
  714. }
  715. TPgObjectSettings ParsePgObjectSettings(TExprList node, TExprContext&) {
  716. TMaybeNode<TCoAtom> mode;
  717. TMaybeNode<TCoAtom> ifExists;
  718. for (auto child : node) {
  719. if (auto maybeTuple = child.Maybe<TCoNameValueTuple>()) {
  720. auto tuple = maybeTuple.Cast();
  721. auto name = tuple.Name().Value();
  722. if (name == "mode") {
  723. YQL_ENSURE(tuple.Value().Maybe<TCoAtom>());
  724. mode = tuple.Value().Cast<TCoAtom>();
  725. } else if (name == "ifExists") {
  726. YQL_ENSURE(tuple.Value().Maybe<TCoAtom>());
  727. ifExists = tuple.Value().Cast<TCoAtom>();
  728. }
  729. }
  730. }
  731. TPgObjectSettings ret(std::move(mode), std::move(ifExists));
  732. return ret;
  733. }
  734. TVector<TString> GetStructFields(const TTypeAnnotationNode* type) {
  735. TVector<TString> fields;
  736. if (type->GetKind() == ETypeAnnotationKind::List) {
  737. type = type->Cast<TListExprType>()->GetItemType();
  738. }
  739. if (type->GetKind() == ETypeAnnotationKind::Struct) {
  740. auto structType = type->Cast<TStructExprType>();
  741. for (auto& member : structType->GetItems()) {
  742. fields.push_back(TString(member->GetName()));
  743. }
  744. }
  745. return fields;
  746. }
  747. void TransformerStatsToYson(const TString& name, const IGraphTransformer::TStatistics& stats,
  748. NYson::TYsonWriter& writer)
  749. {
  750. writer.OnBeginMap();
  751. if (!name.empty()) {
  752. writer.OnKeyedItem("Name");
  753. writer.OnStringScalar(name);
  754. }
  755. if (stats.TransformDuration.MicroSeconds() > 0) {
  756. writer.OnKeyedItem("TransformDurationUs");
  757. writer.OnUint64Scalar(stats.TransformDuration.MicroSeconds());
  758. }
  759. if (stats.WaitDuration.MicroSeconds() > 0) {
  760. writer.OnKeyedItem("WaitDurationUs");
  761. writer.OnUint64Scalar(stats.WaitDuration.MicroSeconds());
  762. }
  763. if (stats.NewExprNodes > 0) {
  764. writer.OnKeyedItem("NewExprNodes");
  765. writer.OnInt64Scalar(stats.NewExprNodes);
  766. }
  767. if (stats.NewTypeNodes > 0) {
  768. writer.OnKeyedItem("NewTypeNodes");
  769. writer.OnInt64Scalar(stats.NewTypeNodes);
  770. }
  771. if (stats.NewConstraintNodes > 0) {
  772. writer.OnKeyedItem("NewConstraintNodes");
  773. writer.OnInt64Scalar(stats.NewConstraintNodes);
  774. }
  775. if (stats.Repeats > 0) {
  776. writer.OnKeyedItem("Repeats");
  777. writer.OnUint64Scalar(stats.Repeats);
  778. }
  779. if (stats.Restarts > 0) {
  780. writer.OnKeyedItem("Restarts");
  781. writer.OnUint64Scalar(stats.Restarts);
  782. }
  783. if (!stats.Stages.empty()) {
  784. writer.OnKeyedItem("Stages");
  785. writer.OnBeginList();
  786. for (auto& stage : stats.Stages) {
  787. writer.OnListItem();
  788. TransformerStatsToYson(stage.first, stage.second, writer);
  789. }
  790. writer.OnEndList();
  791. }
  792. writer.OnEndMap();
  793. }
  794. TString TransformerStatsToYson(const IGraphTransformer::TStatistics& stats, NYson::EYsonFormat format) {
  795. TStringStream out;
  796. NYson::TYsonWriter writer(&out, format);
  797. TransformerStatsToYson("", stats, writer);
  798. return out.Str();
  799. }
  800. bool FillUsedFilesImpl(
  801. const TExprNode& node,
  802. TUserDataTable& files,
  803. const TTypeAnnotationContext& types,
  804. TExprContext& ctx,
  805. const TUserDataTable& crutches,
  806. TNodeSet& visited,
  807. ui64& usedPgExtensions,
  808. bool needFullPgCatalog)
  809. {
  810. if (!visited.insert(&node).second) {
  811. return true;
  812. }
  813. if (node.GetTypeAnn()) {
  814. usedPgExtensions |= node.GetTypeAnn()->GetUsedPgExtensions();
  815. }
  816. if (node.IsCallable("PgResolvedCall")) {
  817. auto procId = FromString<ui32>(node.Child(1)->Content());
  818. const auto& proc = NPg::LookupProc(procId);
  819. usedPgExtensions |= MakePgExtensionMask(proc.ExtensionIndex);
  820. }
  821. if (node.IsCallable("PgResolvedOp")) {
  822. auto operId = FromString<ui32>(node.Child(1)->Content());
  823. const auto& oper = NPg::LookupOper(operId);
  824. const auto& proc = NPg::LookupProc(oper.ProcId);
  825. usedPgExtensions |= MakePgExtensionMask(proc.ExtensionIndex);
  826. }
  827. if (node.IsCallable({"PgAnyResolvedOp", "PgAllResolvedOp"})) {
  828. auto operId = FromString<ui32>(node.Child(1)->Content());
  829. const auto& oper = NPg::LookupOper(operId);
  830. const auto& proc = NPg::LookupProc(oper.ProcId);
  831. usedPgExtensions |= MakePgExtensionMask(proc.ExtensionIndex);
  832. }
  833. if (node.IsCallable("PgTableContent")) {
  834. needFullPgCatalog = true;
  835. }
  836. if (node.IsCallable("FilePath") || node.IsCallable("FileContent")) {
  837. const auto& name = node.Head().Content();
  838. const auto block = types.UserDataStorage->FindUserDataBlock(name);
  839. if (!block) {
  840. ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), TStringBuilder() << "File not found: " << name));
  841. return false;
  842. }
  843. else {
  844. auto iter = files.insert({ TUserDataStorage::ComposeUserDataKey(name), *block }).first;
  845. iter->second.Usage.Set(node.IsCallable("FilePath") ? EUserDataBlockUsage::Path : EUserDataBlockUsage::Content);
  846. }
  847. }
  848. if (node.IsCallable("FolderPath")) {
  849. const auto& name = node.Head().Content();
  850. auto blocks = types.UserDataStorage->FindUserDataFolder(name);
  851. if (!blocks) {
  852. ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), TStringBuilder() << "Folder not found: " << name));
  853. return false;
  854. } else {
  855. for (const auto& x : *blocks) {
  856. auto iter = files.insert({ x.first, *x.second }).first;
  857. iter->second.Usage.Set(EUserDataBlockUsage::Path);
  858. }
  859. }
  860. }
  861. if (node.IsCallable("Udf") || node.IsCallable("ScriptUdf")) {
  862. TStringBuf moduleName = node.Head().Content();
  863. if (node.IsCallable("Udf")) {
  864. TStringBuf funcName;
  865. YQL_ENSURE(SplitUdfName(node.Head().Content(), moduleName, funcName));
  866. }
  867. auto scriptType = NKikimr::NMiniKQL::ScriptTypeFromStr(moduleName);
  868. if (node.IsCallable("ScriptUdf") && !NKikimr::NMiniKQL::IsCustomPython(scriptType)) {
  869. moduleName = NKikimr::NMiniKQL::ScriptTypeAsStr(NKikimr::NMiniKQL::CanonizeScriptType(scriptType));
  870. }
  871. bool addSysModule = true;
  872. TString fileAlias;
  873. if (node.IsCallable("Udf")) {
  874. fileAlias = node.Child(6)->Content();
  875. } else {
  876. auto iterator = types.UdfModules.find(moduleName);
  877. // we have external UdfModule (not in preinstalled udfs)
  878. if (iterator != types.UdfModules.end()) {
  879. fileAlias = iterator->second.FileAlias;
  880. }
  881. }
  882. if (!fileAlias.empty()) {
  883. addSysModule = false;
  884. const auto block = types.UserDataStorage->FindUserDataBlock(fileAlias);
  885. if (!block) {
  886. ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), TStringBuilder() << "File not found: " << fileAlias));
  887. return false;
  888. } else {
  889. files.emplace(TUserDataStorage::ComposeUserDataKey(fileAlias), *block).first->second.Usage.Set(EUserDataBlockUsage::Udf);
  890. }
  891. }
  892. if (moduleName == TStringBuf("Geo")) {
  893. const auto geobase = TUserDataKey::File(TStringBuf("/home/geodata6.bin"));
  894. if (const auto block = types.UserDataStorage->FindUserDataBlock(geobase)) {
  895. files.emplace(geobase, *block).first->second.Usage.Set(EUserDataBlockUsage::Path);
  896. } else {
  897. const auto it = crutches.find(geobase);
  898. if (crutches.cend() != it) {
  899. auto pragma = it->second;
  900. types.UserDataStorage->AddUserDataBlock(geobase, pragma);
  901. files.emplace(geobase, pragma).first->second.Usage.Set(EUserDataBlockUsage::Path);
  902. }
  903. }
  904. }
  905. if (addSysModule) {
  906. auto pathWithMd5 = types.UdfResolver->GetSystemModulePath(moduleName);
  907. YQL_ENSURE(pathWithMd5);
  908. TUserDataBlock sysBlock;
  909. sysBlock.Type = EUserDataType::PATH;
  910. sysBlock.Data = pathWithMd5->Path;
  911. sysBlock.Usage.Set(EUserDataBlockUsage::Udf);
  912. auto alias = TFsPath(sysBlock.Data).GetName();
  913. auto key = TUserDataKey::Udf(alias);
  914. if (const auto block = types.UserDataStorage->FindUserDataBlock(key)) {
  915. files[key] = *block;
  916. if (!types.QContext.CanRead()) {
  917. YQL_ENSURE(block->FrozenFile);
  918. }
  919. } else {
  920. // Check alias clash with user files
  921. if (files.contains(TUserDataStorage::ComposeUserDataKey(alias))) {
  922. ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), TStringBuilder() << "System module name " << alias << " clashes with one of the user's files"));
  923. return false;
  924. }
  925. if (!alias.StartsWith(NKikimr::NMiniKQL::StaticModulePrefix) && !files.contains(key)) {
  926. // CreateFakeFileLink calculates md5 for file, let's do it once
  927. if (!types.QContext.CanRead()) {
  928. sysBlock.FrozenFile = CreateFakeFileLink(sysBlock.Data, pathWithMd5->Md5);
  929. }
  930. files[key] = sysBlock;
  931. types.UserDataStorage->AddUserDataBlock(key, sysBlock);
  932. }
  933. }
  934. }
  935. }
  936. bool childrenOk = true;
  937. for (auto& child : node.Children()) {
  938. childrenOk = FillUsedFilesImpl(*child, files, types, ctx, crutches, visited,
  939. usedPgExtensions, needFullPgCatalog) && childrenOk;
  940. }
  941. return childrenOk;
  942. }
  943. static void GetToken(const TString& string, TString& out, const TTypeAnnotationContext& type) {
  944. auto separator = string.find(":");
  945. const auto p0 = string.substr(0, separator);
  946. if (p0 == "api") {
  947. const auto p1 = string.substr(separator + 1);
  948. if (p1 == "oauth") {
  949. out = type.Credentials->GetUserCredentials().OauthToken;
  950. } else if (p1 == "cookie") {
  951. out = type.Credentials->GetUserCredentials().BlackboxSessionIdCookie;
  952. } else {
  953. YQL_ENSURE(false, "unexpected token id");
  954. }
  955. } else if (p0 == "token" || p0 == "cluster") {
  956. const auto p1 = string.substr(separator + 1);
  957. auto cred = type.Credentials->FindCredential(p1);
  958. if (cred == nullptr) {
  959. if (p0 == "cluster") {
  960. TStringBuf clusterName = p1;
  961. if (clusterName.SkipPrefix("default_")) {
  962. for (auto& x : type.DataSources) {
  963. auto tokens = x->GetClusterTokens();
  964. if (tokens) {
  965. auto token = tokens->FindPtr(clusterName);
  966. if (token) {
  967. out = *token;
  968. return;
  969. }
  970. }
  971. }
  972. for (auto& x : type.DataSinks) {
  973. auto tokens = x->GetClusterTokens();
  974. if (tokens) {
  975. auto token = tokens->FindPtr(clusterName);
  976. if (token) {
  977. out = *token;
  978. return;
  979. }
  980. }
  981. }
  982. }
  983. }
  984. YQL_ENSURE(false, "unexpected token id");
  985. }
  986. out = cred->Content;
  987. } else {
  988. YQL_ENSURE(false, "unexpected token prefix");
  989. }
  990. }
  991. void FillSecureParams(
  992. const TExprNode::TPtr& root,
  993. const TTypeAnnotationContext& types,
  994. THashMap<TString, TString>& secureParams) {
  995. NYql::VisitExpr(root, [&secureParams](const TExprNode::TPtr& node) {
  996. if (auto maybeSecureParam = TMaybeNode<TCoSecureParam>(node)) {
  997. const auto& secureParamName = TString(maybeSecureParam.Cast().Name().Value());
  998. secureParams.insert({secureParamName, TString()});
  999. }
  1000. return true;
  1001. });
  1002. for (auto& it : secureParams) {
  1003. GetToken(it.first, it.second, types);
  1004. }
  1005. }
  1006. bool AddPgFile(bool isPath, const TString& pathOrContent, const TString& md5, const TString& alias, TUserDataTable& files,
  1007. const TTypeAnnotationContext& types, TPositionHandle pos, TExprContext& ctx) {
  1008. TUserDataBlock block;
  1009. block.Data = pathOrContent;
  1010. if (isPath) {
  1011. block.Type = EUserDataType::PATH;
  1012. block.Usage.Set(EUserDataBlockUsage::Path);
  1013. block.Usage.Set(EUserDataBlockUsage::PgExt);
  1014. } else {
  1015. block.Type = EUserDataType::RAW_INLINE_DATA;
  1016. block.Usage.Set(EUserDataBlockUsage::Content);
  1017. }
  1018. auto key = TUserDataKey::File(alias);
  1019. if (const auto foundBlock = types.UserDataStorage->FindUserDataBlock(key)) {
  1020. files[key] = *foundBlock;
  1021. YQL_ENSURE(!isPath || foundBlock->FrozenFile);
  1022. } else {
  1023. // Check alias clash with user files
  1024. if (files.contains(TUserDataStorage::ComposeUserDataKey(alias))) {
  1025. ctx.AddError(TIssue(ctx.GetPosition(pos), TStringBuilder() << "File " << alias << " clashes with one of the user's files"));
  1026. return false;
  1027. }
  1028. // CreateFakeFileLink calculates md5 for file, let's do it once if needed
  1029. if (isPath) {
  1030. block.FrozenFile = CreateFakeFileLink(block.Data, md5);
  1031. }
  1032. files[key] = block;
  1033. types.UserDataStorage->AddUserDataBlock(key, block);
  1034. }
  1035. return true;
  1036. }
  1037. bool FillUsedFiles(
  1038. const TExprNode& node,
  1039. TUserDataTable& files,
  1040. const TTypeAnnotationContext& types,
  1041. TExprContext& ctx,
  1042. const TUserDataTable& crutches) {
  1043. TNodeSet visited;
  1044. ui64 usedPgExtensions = 0;
  1045. bool needFullPgCatalog = false;
  1046. auto ret = FillUsedFilesImpl(node, files, types, ctx, crutches, visited, usedPgExtensions, needFullPgCatalog);
  1047. if (!ret) {
  1048. return false;
  1049. }
  1050. auto remainingPgExtensions = usedPgExtensions;
  1051. TSet<ui32> filter;
  1052. for (ui32 extensionIndex = 1; remainingPgExtensions && (extensionIndex <= 64); ++extensionIndex) {
  1053. auto mask = MakePgExtensionMask(extensionIndex);
  1054. if (!(mask & usedPgExtensions)) {
  1055. continue;
  1056. }
  1057. filter.insert(extensionIndex);
  1058. remainingPgExtensions &= ~mask;
  1059. const auto& e = NPg::LookupExtension(extensionIndex);
  1060. needFullPgCatalog = true;
  1061. auto alias = TFsPath(e.LibraryPath).GetName();
  1062. if (!AddPgFile(true, e.LibraryPath, e.LibraryMD5, alias, files, types, node.Pos(), ctx)) {
  1063. return false;
  1064. }
  1065. }
  1066. Y_ENSURE(remainingPgExtensions == 0);
  1067. if (!needFullPgCatalog) {
  1068. return true;
  1069. }
  1070. TString content = NPg::ExportExtensions(filter);
  1071. if (!AddPgFile(false, content, "", TString(PgCatalogFileName), files, types, node.Pos(), ctx)) {
  1072. return false;
  1073. }
  1074. return true;
  1075. }
  1076. std::pair<IGraphTransformer::TStatus, TAsyncTransformCallbackFuture> FreezeUsedFiles(const TExprNode& node, TUserDataTable& files, const TTypeAnnotationContext& types, TExprContext& ctx, const std::function<bool(const TString&)>& urlDownloadFilter, const TUserDataTable& crutches) {
  1077. if (!FillUsedFiles(node, files, types, ctx, crutches)) {
  1078. return SyncError();
  1079. }
  1080. if (types.QContext.CanRead()) {
  1081. return SyncOk();
  1082. }
  1083. auto future = FreezeUserDataTableIfNeeded(types.UserDataStorage, files, urlDownloadFilter);
  1084. if (future.Wait(TDuration::Zero())) {
  1085. files = future.GetValue()();
  1086. return SyncOk();
  1087. } else {
  1088. return std::make_pair(IGraphTransformer::TStatus::Async, future.Apply(
  1089. [](const NThreading::TFuture<std::function<TUserDataTable()>>& completedFuture) {
  1090. return TAsyncTransformCallback([completedFuture](const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) {
  1091. output = input;
  1092. try {
  1093. completedFuture.GetValue()();
  1094. }
  1095. catch (const std::exception& e) {
  1096. auto inputPos = ctx.GetPosition(input->Pos());
  1097. TIssueScopeGuard issueScope(ctx.IssueManager, [&]() {
  1098. return MakeIntrusive<TIssue>(YqlIssue(inputPos, TIssuesIds::UNEXPECTED));
  1099. });
  1100. ctx.AddError(ExceptionToIssue(e, inputPos));
  1101. input->SetState(TExprNode::EState::Error);
  1102. return IGraphTransformer::TStatus(IGraphTransformer::TStatus::Error);
  1103. }
  1104. catch (...) {
  1105. auto inputPos = ctx.GetPosition(input->Pos());
  1106. TIssueScopeGuard issueScope(ctx.IssueManager, [&]() {
  1107. return MakeIntrusive<TIssue>(YqlIssue(inputPos, TIssuesIds::UNEXPECTED));
  1108. });
  1109. ctx.AddError(YqlIssue(inputPos, TIssuesIds::UNEXPECTED, CurrentExceptionMessage()));
  1110. input->SetState(TExprNode::EState::Error);
  1111. return IGraphTransformer::TStatus(IGraphTransformer::TStatus::Error);
  1112. }
  1113. input->SetState(TExprNode::EState::ExecutionRequired);
  1114. return IGraphTransformer::TStatus(IGraphTransformer::TStatus::Repeat);
  1115. });
  1116. }));
  1117. }
  1118. }
  1119. bool FreezeUsedFilesSync(const TExprNode& node, TUserDataTable& files, const TTypeAnnotationContext& types, TExprContext& ctx, const std::function<bool(const TString&)>& urlDownloadFilter) {
  1120. if (!FillUsedFiles(node, files, types, ctx)) {
  1121. return false;
  1122. }
  1123. if (!types.QContext.CanRead()) {
  1124. auto future = FreezeUserDataTableIfNeeded(types.UserDataStorage, files, urlDownloadFilter);
  1125. files = future.GetValueSync()();
  1126. }
  1127. return true;
  1128. }
  1129. void WriteColumns(NYson::TYsonWriter& writer, const TExprBase& columns) {
  1130. if (auto maybeList = columns.Maybe<TExprList>()) {
  1131. writer.OnBeginList();
  1132. for (const auto& column : maybeList.Cast()) {
  1133. writer.OnListItem();
  1134. if (column.Maybe<TCoAtom>()) {
  1135. writer.OnStringScalar(column.Cast<TCoAtom>().Value());
  1136. } else {
  1137. writer.OnStringScalar(column.Cast<TCoAtomList>().Item(0).Value());
  1138. }
  1139. }
  1140. writer.OnEndList();
  1141. } else if (columns.Maybe<TCoVoid>()) {
  1142. writer.OnStringScalar("*");
  1143. } else {
  1144. writer.OnStringScalar("?");
  1145. }
  1146. }
  1147. TString SerializeExpr(TExprContext& ctx, const TExprNode& expr, bool withTypes) {
  1148. ui32 typeFlags = TExprAnnotationFlags::None;
  1149. if (withTypes) {
  1150. typeFlags |= TExprAnnotationFlags::Types;
  1151. }
  1152. auto ast = ConvertToAst(expr, ctx, typeFlags, true);
  1153. YQL_ENSURE(ast.Root);
  1154. return ast.Root->ToString();
  1155. }
  1156. TString ExprToPrettyString(TExprContext& ctx, const TExprNode& expr) {
  1157. auto ast = ConvertToAst(expr, ctx, TExprAnnotationFlags::None, true);
  1158. TStringStream exprStream;
  1159. YQL_ENSURE(ast.Root);
  1160. ast.Root->PrettyPrintTo(exprStream, NYql::TAstPrintFlags::PerLine | NYql::TAstPrintFlags::ShortQuote);
  1161. TString exprText = exprStream.Str();
  1162. return exprText;
  1163. }
  1164. bool IsFlowOrStream(const TExprNode* node) {
  1165. auto kind = node->GetTypeAnn()->GetKind();
  1166. return kind == ETypeAnnotationKind::Stream || kind == ETypeAnnotationKind::Flow;
  1167. }
  1168. void WriteStream(NYson::TYsonWriter& writer, const TExprNode* node, const TExprNode* source) {
  1169. if (node == source) {
  1170. return;
  1171. }
  1172. if (!node->IsCallable()) {
  1173. return;
  1174. }
  1175. if (!node->GetTypeAnn()) {
  1176. return;
  1177. }
  1178. TVector<const TExprNode*> applyStreamChildren;
  1179. if (TCoApply::Match(node)) {
  1180. switch (node->GetTypeAnn()->GetKind()) {
  1181. case ETypeAnnotationKind::Stream:
  1182. case ETypeAnnotationKind::Flow:
  1183. case ETypeAnnotationKind::List:
  1184. break;
  1185. default:
  1186. return;
  1187. }
  1188. for (size_t i = 1; i < node->ChildrenSize(); ++i) {
  1189. if (IsFlowOrStream(*node->Child(i))) {
  1190. applyStreamChildren.push_back(node->Child(i));
  1191. } else if (node->Child(i)->GetTypeAnn()->GetKind() == ETypeAnnotationKind::List) {
  1192. if (node->Child(i)->IsCallable("ForwardList")) {
  1193. applyStreamChildren.push_back(node->Child(i)->Child(0));
  1194. } else if (node->Child(i)->IsCallable("Collect") && IsFlowOrStream(node->Child(i)->Head())) {
  1195. applyStreamChildren.push_back(node->Child(i)->Child(0));
  1196. }
  1197. }
  1198. }
  1199. if (applyStreamChildren.size() == 1) {
  1200. WriteStream(writer, applyStreamChildren.front(), source);
  1201. }
  1202. }
  1203. else if (!TCoExtendBase::Match(node) && node->ChildrenSize() > 0) {
  1204. WriteStream(writer, node->Child(0), source);
  1205. }
  1206. writer.OnListItem();
  1207. writer.OnBeginMap();
  1208. writer.OnKeyedItem("Name");
  1209. writer.OnStringScalar(node->Content());
  1210. if (TCoFlatMapBase::Match(node) && IsFlowOrStream(*node->Child(1))) {
  1211. writer.OnKeyedItem("Children");
  1212. writer.OnBeginList();
  1213. writer.OnListItem();
  1214. writer.OnBeginList();
  1215. WriteStream(writer, node->Child(1)->Child(1), node->Child(1)->Head().Child(0));
  1216. writer.OnEndList();
  1217. writer.OnEndList();
  1218. }
  1219. if (TCoChopper::Match(node) || node->IsCallable("WideChopper")) {
  1220. writer.OnKeyedItem("Children");
  1221. writer.OnBeginList();
  1222. writer.OnListItem();
  1223. writer.OnBeginList();
  1224. WriteStream(writer, &node->Tail().Tail(), &node->Tail().Head().Head());
  1225. writer.OnEndList();
  1226. writer.OnEndList();
  1227. }
  1228. if (TCoSwitch::Match(node)) {
  1229. writer.OnKeyedItem("Children");
  1230. writer.OnBeginList();
  1231. for (size_t i = 3; i < node->ChildrenSize(); i += 2) {
  1232. writer.OnListItem();
  1233. writer.OnBeginList();
  1234. WriteStream(writer, node->Child(i)->Child(1), node->Child(i)->Head().Child(0));
  1235. writer.OnEndList();
  1236. }
  1237. writer.OnEndList();
  1238. }
  1239. if (TCoExtendBase::Match(node) && node->ChildrenSize() > 0) {
  1240. writer.OnKeyedItem("Children");
  1241. writer.OnBeginList();
  1242. for (size_t i = 0; i < node->ChildrenSize(); ++i) {
  1243. writer.OnListItem();
  1244. writer.OnBeginList();
  1245. WriteStream(writer, node->Child(i), source);
  1246. writer.OnEndList();
  1247. }
  1248. writer.OnEndList();
  1249. }
  1250. if (TCoApply::Match(node) && applyStreamChildren.size() > 1) {
  1251. writer.OnKeyedItem("Children");
  1252. writer.OnBeginList();
  1253. for (auto child: applyStreamChildren) {
  1254. writer.OnListItem();
  1255. writer.OnBeginList();
  1256. WriteStream(writer, child, source);
  1257. writer.OnEndList();
  1258. }
  1259. writer.OnEndList();
  1260. }
  1261. writer.OnEndMap();
  1262. }
  1263. void WriteStreams(NYson::TYsonWriter& writer, TStringBuf name, const TCoLambda& lambda) {
  1264. writer.OnKeyedItem(name);
  1265. writer.OnBeginList();
  1266. WriteStream(writer, lambda.Body().Raw(), lambda.Args().Size() > 0 ? lambda.Args().Arg(0).Raw() : nullptr);
  1267. writer.OnEndList();
  1268. }
  1269. double GetDataReplicationFactor(double factor, const TExprNode* node, const TExprNode* stream, TExprContext& ctx) {
  1270. if (node == stream) {
  1271. return factor;
  1272. }
  1273. if (!node->IsCallable()) {
  1274. return factor;
  1275. }
  1276. if (TCoApply::Match(node)) {
  1277. switch (node->GetTypeAnn()->GetKind()) {
  1278. case ETypeAnnotationKind::Stream:
  1279. case ETypeAnnotationKind::Flow:
  1280. case ETypeAnnotationKind::List: {
  1281. double applyFactor = 0.0;
  1282. for (size_t i = 1; i < node->ChildrenSize(); ++i) {
  1283. if (IsFlowOrStream(*node->Child(i))) {
  1284. applyFactor += GetDataReplicationFactor(factor, node->Child(i), stream, ctx);
  1285. } else if (node->Child(i)->GetTypeAnn()->GetKind() == ETypeAnnotationKind::List) {
  1286. if (node->Child(i)->IsCallable("ForwardList")) {
  1287. applyFactor += GetDataReplicationFactor(factor, node->Child(i)->Child(0), stream, ctx);
  1288. } else if (node->Child(i)->IsCallable("Collect") && IsFlowOrStream(node->Child(i)->Head())) {
  1289. applyFactor += GetDataReplicationFactor(factor, node->Child(i)->Child(0), stream, ctx);
  1290. }
  1291. }
  1292. }
  1293. factor = 2.0 * Max(1.0, applyFactor);
  1294. break;
  1295. }
  1296. default:
  1297. break;
  1298. }
  1299. return factor;
  1300. }
  1301. if (!TCoExtendBase::Match(node) && node->ChildrenSize() > 0) {
  1302. factor = GetDataReplicationFactor(factor, node->Child(0), stream, ctx);
  1303. }
  1304. if (TCoFlatMapBase::Match(node)) {
  1305. // TODO: check MapJoinCore input unique using constraints
  1306. if (const auto& lambda = node->Tail(); node->Head().IsCallable("SqueezeToDict") && lambda.Tail().IsCallable("MapJoinCore") && lambda.Tail().Child(1U) == &lambda.Head().Head()) {
  1307. TMaybe<bool> isMany;
  1308. TMaybe<EDictType> type;
  1309. bool isCompact = false;
  1310. TMaybe<ui64> itemsCount;
  1311. ParseToDictSettings(node->Head(), ctx, type, isMany, itemsCount, isCompact);
  1312. if (isMany.GetOrElse(true)) {
  1313. factor *= 5.0;
  1314. }
  1315. } else {
  1316. switch (lambda.GetTypeAnn()->GetKind()) {
  1317. case ETypeAnnotationKind::Stream:
  1318. case ETypeAnnotationKind::Flow:
  1319. factor = GetDataReplicationFactor(factor, &lambda.Tail(), &lambda.Head().Head(), ctx);
  1320. break;
  1321. case ETypeAnnotationKind::List:
  1322. factor *= 2.0;
  1323. break;
  1324. default:
  1325. break;
  1326. }
  1327. }
  1328. }
  1329. else if (node->IsCallable("CommonJoinCore")) {
  1330. factor *= 5.0;
  1331. }
  1332. else if (node->IsCallable("MapJoinCore")) {
  1333. // TODO: check MapJoinCore input unique using constraints
  1334. if (node->Child(1)->IsCallable("ToDict")) {
  1335. TMaybe<bool> isMany;
  1336. TMaybe<EDictType> type;
  1337. bool isCompact = false;
  1338. TMaybe<ui64> itemsCount;
  1339. ParseToDictSettings(*node->Child(1), ctx, type, isMany, itemsCount, isCompact);
  1340. if (isMany.GetOrElse(true)) {
  1341. factor *= 5.0;
  1342. }
  1343. }
  1344. }
  1345. else if (TCoSwitch::Match(node)) {
  1346. double switchFactor = 0.0;
  1347. for (size_t i = 3; i < node->ChildrenSize(); i += 2) {
  1348. switchFactor += GetDataReplicationFactor(factor, node->Child(i)->Child(1), node->Child(i)->Head().Child(0), ctx);
  1349. }
  1350. factor = Max(1.0, switchFactor);
  1351. }
  1352. else if (TCoExtendBase::Match(node) && node->ChildrenSize() > 0) {
  1353. double extendFactor = 0.0;
  1354. for (size_t i = 0; i < node->ChildrenSize(); ++i) {
  1355. extendFactor += GetDataReplicationFactor(factor, node->Child(i), stream, ctx);
  1356. }
  1357. factor = Max(1.0, extendFactor);
  1358. }
  1359. else if (TCoChopper::Match(node) || node->IsCallable("WideChopper")) {
  1360. factor = GetDataReplicationFactor(factor, &node->Child(TCoChopper::idx_Handler)->Tail(), &node->Child(TCoChopper::idx_Handler)->Head().Tail(), ctx);
  1361. }
  1362. return factor;
  1363. }
  1364. double GetDataReplicationFactor(const TExprNode& lambda, TExprContext& ctx) {
  1365. return GetDataReplicationFactor(1.0, lambda.Child(1), lambda.Head().ChildrenSize() > 0 ? lambda.Head().Child(0) : nullptr, ctx);
  1366. }
  1367. void WriteStatistics(NYson::TYsonWriter& writer, const TOperationStatistics& statistics)
  1368. {
  1369. writer.OnBeginMap();
  1370. for (auto& el : statistics.Entries) {
  1371. writer.OnKeyedItem(el.Name);
  1372. if (el.Value) {
  1373. writer.OnStringScalar(*el.Value);
  1374. continue;
  1375. }
  1376. writer.OnBeginMap();
  1377. if (auto val = el.Sum) {
  1378. writer.OnKeyedItem("sum");
  1379. writer.OnInt64Scalar(*val);
  1380. }
  1381. if (auto val = el.Count) {
  1382. writer.OnKeyedItem("count");
  1383. writer.OnInt64Scalar(*val);
  1384. }
  1385. if (auto val = el.Avg) {
  1386. writer.OnKeyedItem("avg");
  1387. writer.OnInt64Scalar(*val);
  1388. }
  1389. if (auto val = el.Max) {
  1390. writer.OnKeyedItem("max");
  1391. writer.OnInt64Scalar(*val);
  1392. }
  1393. if (auto val = el.Min) {
  1394. writer.OnKeyedItem("min");
  1395. writer.OnInt64Scalar(*val);
  1396. }
  1397. writer.OnEndMap();
  1398. }
  1399. writer.OnEndMap();
  1400. }
  1401. void WriteStatistics(NYson::TYsonWriter& writer, bool totalOnly, const THashMap<ui32, TOperationStatistics>& statistics, bool addTotalKey, bool addExternalMap) {
  1402. if (statistics.empty()) {
  1403. return;
  1404. }
  1405. THashMap<TString, std::tuple<i64, i64, i64, TMaybe<i64>>> total; // sum, count, max, min
  1406. if (addExternalMap) {
  1407. writer.OnBeginMap();
  1408. }
  1409. for (const auto& opStatistics : statistics) {
  1410. for (auto& el : opStatistics.second.Entries) {
  1411. if (el.Value) {
  1412. continue;
  1413. }
  1414. auto& totalEntry = total[el.Name];
  1415. if (auto val = el.Sum) {
  1416. std::get<0>(totalEntry) += *val;
  1417. }
  1418. if (auto val = el.Count) {
  1419. std::get<1>(totalEntry) += *val;
  1420. }
  1421. if (auto val = el.Max) {
  1422. std::get<2>(totalEntry) = Max<i64>(*val, std::get<2>(totalEntry));
  1423. }
  1424. if (auto val = el.Min) {
  1425. std::get<3>(totalEntry) = Min<i64>(*val, std::get<3>(totalEntry).GetOrElse(Max<i64>()));
  1426. }
  1427. }
  1428. }
  1429. if (totalOnly == false) {
  1430. for (const auto& [key, value] : statistics) {
  1431. writer.OnKeyedItem(ToString(key));
  1432. WriteStatistics(writer, value);
  1433. }
  1434. }
  1435. TVector<TString> statKeys;
  1436. std::transform(total.cbegin(), total.cend(), std::back_inserter(statKeys), [](const decltype(total)::value_type& v) { return v.first; });
  1437. std::sort(statKeys.begin(), statKeys.end());
  1438. if (addTotalKey) {
  1439. writer.OnKeyedItem("total");
  1440. writer.OnBeginMap();
  1441. }
  1442. for (auto& key: statKeys) {
  1443. auto& totalEntry = total[key];
  1444. writer.OnKeyedItem(key);
  1445. writer.OnBeginMap();
  1446. writer.OnKeyedItem("sum");
  1447. writer.OnInt64Scalar(std::get<0>(totalEntry));
  1448. writer.OnKeyedItem("count");
  1449. writer.OnInt64Scalar(std::get<1>(totalEntry));
  1450. writer.OnKeyedItem("avg");
  1451. writer.OnInt64Scalar(std::get<1>(totalEntry) ? (std::get<0>(totalEntry) / std::get<1>(totalEntry)) : 0l);
  1452. writer.OnKeyedItem("max");
  1453. writer.OnInt64Scalar(std::get<2>(totalEntry));
  1454. writer.OnKeyedItem("min");
  1455. writer.OnInt64Scalar(std::get<3>(totalEntry).GetOrElse(0));
  1456. writer.OnEndMap();
  1457. }
  1458. if (addTotalKey) {
  1459. writer.OnEndMap(); // total
  1460. }
  1461. if (addExternalMap) {
  1462. writer.OnEndMap();
  1463. }
  1464. }
  1465. bool ValidateCompressionForInput(std::string_view format, std::string_view compression, TExprContext& ctx) {
  1466. if (compression.empty()) {
  1467. return true;
  1468. }
  1469. if (format == "parquet"sv) {
  1470. ctx.AddError(TIssue(TStringBuilder() << "External compression for parquet is not supported"));
  1471. return false;
  1472. }
  1473. if (IsIn(Compressions, compression)) {
  1474. return true;
  1475. }
  1476. ctx.AddError(TIssue(TStringBuilder() << "Unknown compression: " << compression
  1477. << ". Use one of: " << JoinSeq(", ", Compressions)));
  1478. return false;
  1479. }
  1480. bool ValidateCompressionForOutput(std::string_view format, std::string_view compression, TExprContext& ctx) {
  1481. if (compression.empty()) {
  1482. return true;
  1483. }
  1484. if (format == "parquet"sv) {
  1485. ctx.AddError(TIssue(TStringBuilder() << "External compression for parquet is not supported"));
  1486. return false;
  1487. }
  1488. if (IsIn(Compressions, compression)) {
  1489. return true;
  1490. }
  1491. ctx.AddError(TIssue(TStringBuilder() << "Unknown compression: " << compression
  1492. << ". Use one of: " << JoinSeq(", ", Compressions)));
  1493. return false;
  1494. }
  1495. bool ValidateFormatForInput(
  1496. std::string_view format,
  1497. const TStructExprType* schemaStructRowType,
  1498. const std::function<bool(TStringBuf)>& excludeFields,
  1499. TExprContext& ctx) {
  1500. if (format.empty()) {
  1501. return true;
  1502. }
  1503. if (!IsIn(FormatsForInput, format)) {
  1504. ctx.AddError(TIssue(TStringBuilder() << "Unknown format: " << format
  1505. << ". Use one of: " << JoinSeq(", ", FormatsForInput)));
  1506. return false;
  1507. }
  1508. if (schemaStructRowType && format == TStringBuf("raw")) {
  1509. ui64 realSchemaColumnsCount = 0;
  1510. for (const TItemExprType* item : schemaStructRowType->GetItems()) {
  1511. if (excludeFields && excludeFields(item->GetName())) {
  1512. continue;
  1513. }
  1514. const TTypeAnnotationNode* rowType = item->GetItemType();
  1515. if (rowType->GetKind() == ETypeAnnotationKind::Optional) {
  1516. rowType = rowType->Cast<TOptionalExprType>()->GetItemType();
  1517. }
  1518. if (rowType->GetKind() != ETypeAnnotationKind::Data
  1519. || !IsDataTypeString(rowType->Cast<TDataExprType>()->GetSlot())) {
  1520. ctx.AddError(TIssue(TStringBuilder() << "Only string type column in schema supported in raw format (you have '"
  1521. << item->GetName() << " " << FormatType(rowType) << "' field)"));
  1522. return false;
  1523. }
  1524. ++realSchemaColumnsCount;
  1525. }
  1526. if (realSchemaColumnsCount != 1) {
  1527. ctx.AddError(TIssue(TStringBuilder() << "Only one column in schema supported in raw format (you have "
  1528. << realSchemaColumnsCount << " fields)"));
  1529. return false;
  1530. }
  1531. }
  1532. else if (schemaStructRowType && format == TStringBuf("json_list")) {
  1533. bool failedSchemaColumns = false;
  1534. for (const TItemExprType* item : schemaStructRowType->GetItems()) {
  1535. if (excludeFields && excludeFields(item->GetName())) {
  1536. continue;
  1537. }
  1538. const TTypeAnnotationNode* rowType = item->GetItemType();
  1539. if (rowType->GetKind() == ETypeAnnotationKind::Optional) {
  1540. rowType = rowType->Cast<TOptionalExprType>()->GetItemType();
  1541. }
  1542. if (rowType->GetKind() == ETypeAnnotationKind::Data
  1543. && IsDataTypeDateOrTzDateOrInterval(rowType->Cast<TDataExprType>()->GetSlot())) {
  1544. ctx.AddError(TIssue(TStringBuilder() << "Date, Timestamp and Interval types are not allowed in json_list format (you have '"
  1545. << item->GetName() << " " << FormatType(rowType) << "' field)"));
  1546. failedSchemaColumns = true;
  1547. }
  1548. }
  1549. if (failedSchemaColumns) {
  1550. return false;
  1551. }
  1552. }
  1553. return true;
  1554. }
  1555. bool ValidateFormatForOutput(std::string_view format, TExprContext& ctx) {
  1556. if (format.empty() || IsIn(FormatsForOutput, format)) {
  1557. return true;
  1558. }
  1559. ctx.AddError(TIssue(TStringBuilder() << "Unknown format: " << format
  1560. << ". Use one of: " << JoinSeq(", ", FormatsForOutput)));
  1561. return false;
  1562. }
  1563. template<typename T>
  1564. bool ValidateValueInDictionary(std::string_view value, TExprContext& ctx, const T& dictionary) {
  1565. if (value.empty() || IsIn(dictionary, value)) {
  1566. return true;
  1567. }
  1568. ctx.AddError(TIssue(TStringBuilder() << "Unknown format: " << value
  1569. << ". Use one of: " << JoinSeq(", ", dictionary)));
  1570. return false;
  1571. }
  1572. bool ValidateIntervalUnit(std::string_view unit, TExprContext& ctx) {
  1573. return ValidateValueInDictionary(unit, ctx, IntervalUnits);
  1574. }
  1575. bool ValidateDateTimeFormatName(std::string_view formatName, TExprContext& ctx) {
  1576. return ValidateValueInDictionary(formatName, ctx, DateTimeFormatNames);
  1577. }
  1578. bool ValidateTimestampFormatName(std::string_view formatName, TExprContext& ctx) {
  1579. return ValidateValueInDictionary(formatName, ctx, TimestampFormatNames);
  1580. }
  1581. namespace {
  1582. bool MatchesSetItemOption(const TExprBase& setItemOption, TStringBuf name) {
  1583. if (setItemOption.Ref().IsList() && setItemOption.Ref().ChildrenSize() > 0) {
  1584. if (setItemOption.Ref().ChildPtr(0)->Content() == name) {
  1585. return true;
  1586. }
  1587. }
  1588. return false;
  1589. }
  1590. } //namespace
  1591. bool TransformPgSetItemOption(
  1592. const TCoPgSelect& pgSelect,
  1593. TStringBuf optionName,
  1594. std::function<void(const TExprBase&)> lambda
  1595. ) {
  1596. bool applied = false;
  1597. for (const auto& option : pgSelect.SelectOptions()) {
  1598. if (option.Name() == "set_items") {
  1599. auto pgSetItems = option.Value().Cast<TExprList>();
  1600. for (const auto& setItem : pgSetItems) {
  1601. auto setItemNode = setItem.Cast<TCoPgSetItem>();
  1602. for (const auto& setItemOption : setItemNode.SetItemOptions()) {
  1603. if (MatchesSetItemOption(setItemOption, optionName)) {
  1604. applied = true;
  1605. lambda(setItemOption);
  1606. }
  1607. }
  1608. }
  1609. }
  1610. }
  1611. return applied;
  1612. }
  1613. TExprNode::TPtr GetSetItemOption(const TCoPgSelect& pgSelect, TStringBuf optionName) {
  1614. TExprNode::TPtr nodePtr = nullptr;
  1615. TransformPgSetItemOption(pgSelect, optionName, [&nodePtr](const TExprBase& option) {
  1616. nodePtr = option.Ptr();
  1617. });
  1618. return nodePtr;
  1619. }
  1620. TExprNode::TPtr GetSetItemOptionValue(const TExprBase& setItemOption) {
  1621. if (setItemOption.Ref().IsList() && setItemOption.Ref().ChildrenSize() > 1) {
  1622. return setItemOption.Ref().ChildPtr(1);
  1623. }
  1624. return nullptr;
  1625. }
  1626. bool NeedToRenamePgSelectColumns(const TCoPgSelect& pgSelect) {
  1627. auto fill = NCommon::GetSetItemOption(pgSelect, "fill_target_columns");
  1628. return fill && !NCommon::GetSetItemOptionValue(TExprBase(fill));
  1629. }
  1630. bool RenamePgSelectColumns(
  1631. const TCoPgSelect& node,
  1632. TExprNode::TPtr& output,
  1633. const TMaybe<TColumnOrder>& tableColumnOrder,
  1634. TExprContext& ctx,
  1635. TTypeAnnotationContext& types) {
  1636. bool hasValues = (bool)GetSetItemOption(node, "values");
  1637. bool hasProjectionOrder = (bool)GetSetItemOption(node, "projection_order");
  1638. Y_ENSURE(hasValues ^ hasProjectionOrder, "Only one of values and projection_order should be present");
  1639. TString optionName = (hasValues) ? "values" : "projection_order";
  1640. auto selectorColumnOrder = types.LookupColumnOrder(node.Ref());
  1641. TColumnOrder insertColumnOrder;
  1642. if (auto targetColumnsOption = GetSetItemOption(node, "target_columns")) {
  1643. auto targetColumns = GetSetItemOptionValue(TExprBase(targetColumnsOption));
  1644. for (const auto& child : targetColumns->ChildrenList()) {
  1645. insertColumnOrder.AddColumn(TString(child->Content()));
  1646. }
  1647. } else {
  1648. YQL_ENSURE(tableColumnOrder);
  1649. insertColumnOrder = *tableColumnOrder;
  1650. }
  1651. YQL_ENSURE(selectorColumnOrder);
  1652. if (selectorColumnOrder->Size() > insertColumnOrder.Size()) {
  1653. ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), TStringBuilder() << Sprintf(
  1654. "%s have %zu columns, INSERT INTO expects: %zu",
  1655. optionName.data(),
  1656. selectorColumnOrder->Size(),
  1657. insertColumnOrder.Size()
  1658. )));
  1659. return false;
  1660. }
  1661. if (*selectorColumnOrder == insertColumnOrder) {
  1662. output = node.Ptr();
  1663. return true;
  1664. }
  1665. TVector<const TItemExprType*> rowTypeItems;
  1666. rowTypeItems.reserve(selectorColumnOrder->Size());
  1667. const TTypeAnnotationNode* inputType;
  1668. switch (node.Ref().GetTypeAnn()->GetKind()) {
  1669. case ETypeAnnotationKind::List:
  1670. inputType = node.Ref().GetTypeAnn()->Cast<TListExprType>()->GetItemType();
  1671. break;
  1672. default:
  1673. inputType = node.Ref().GetTypeAnn();
  1674. break;
  1675. }
  1676. YQL_ENSURE(inputType->GetKind() == ETypeAnnotationKind::Struct);
  1677. const auto rowArg = Build<TCoArgument>(ctx, node.Pos())
  1678. .Name("row")
  1679. .Done();
  1680. auto structBuilder = Build<TCoAsStruct>(ctx, node.Pos());
  1681. for (size_t i = 0; i < selectorColumnOrder->Size(); i++) {
  1682. const auto& columnName = selectorColumnOrder->at(i);
  1683. structBuilder.Add<TCoNameValueTuple>()
  1684. .Name().Build(insertColumnOrder.at(i).PhysicalName)
  1685. .Value<TCoMember>()
  1686. .Struct(rowArg)
  1687. .Name().Build(columnName.PhysicalName)
  1688. .Build()
  1689. .Build();
  1690. }
  1691. auto fill = GetSetItemOption(node, "fill_target_columns");
  1692. output = Build<TCoMap>(ctx, node.Pos())
  1693. .Input(node)
  1694. .Lambda<TCoLambda>()
  1695. .Args({rowArg})
  1696. .Body(structBuilder.Done().Ptr())
  1697. .Build()
  1698. .Done().Ptr();
  1699. fill->ChangeChildrenInplace({
  1700. fill->Child(0),
  1701. Build<TCoAtom>(ctx, node.Pos())
  1702. .Value("done")
  1703. .Done().Ptr()
  1704. });
  1705. fill->ChildPtr(1)->SetTypeAnn(ctx.MakeType<TUnitExprType>());
  1706. return true;
  1707. }
  1708. } // namespace NCommon
  1709. } // namespace NYql