yql_provider.cpp 69 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926
  1. #include "yql_provider.h"
  2. #include <yql/essentials/core/expr_nodes/yql_expr_nodes.h>
  3. #include <yql/essentials/core/type_ann/type_ann_core.h>
  4. #include <yql/essentials/core/type_ann/type_ann_expr.h>
  5. #include <yql/essentials/core/yql_expr_type_annotation.h>
  6. #include <yql/essentials/core/yql_expr_optimize.h>
  7. #include <yql/essentials/core/yql_execution.h>
  8. #include <yql/essentials/core/yql_opt_utils.h>
  9. #include <yql/essentials/parser/pg_catalog/catalog.h>
  10. #include <yql/essentials/minikql/mkql_function_registry.h>
  11. #include <yql/essentials/minikql/mkql_program_builder.h>
  12. #include <util/folder/path.h>
  13. #include <util/generic/is_in.h>
  14. #include <util/generic/utility.h>
  15. #include <util/string/join.h>
  16. namespace NYql {
  17. namespace NCommon {
  18. using namespace NNodes;
  19. namespace {
  20. constexpr std::array<std::string_view, 8> FormatsForInput = {
  21. "csv_with_names"sv,
  22. "tsv_with_names"sv,
  23. "json_list"sv,
  24. "json"sv,
  25. "raw"sv,
  26. "json_as_string"sv,
  27. "json_each_row"sv,
  28. "parquet"sv
  29. };
  30. constexpr std::array<std::string_view, 7> FormatsForOutput = {
  31. "csv_with_names"sv,
  32. "tsv_with_names"sv,
  33. "json_list"sv,
  34. "json"sv,
  35. "raw"sv,
  36. "json_each_row"sv,
  37. "parquet"sv
  38. };
  39. constexpr std::array<std::string_view, 6> Compressions = {
  40. "gzip"sv,
  41. "zstd"sv,
  42. "lz4"sv,
  43. "brotli"sv,
  44. "bzip2"sv,
  45. "xz"sv
  46. };
  47. constexpr std::array<std::string_view, 7> IntervalUnits = {
  48. "MICROSECONDS"sv,
  49. "MILLISECONDS"sv,
  50. "SECONDS"sv,
  51. "MINUTES"sv,
  52. "HOURS"sv,
  53. "DAYS"sv,
  54. "WEEKS"sv
  55. };
  56. constexpr std::array<std::string_view, 2> DateTimeFormatNames = {
  57. "POSIX"sv,
  58. "ISO"sv
  59. };
  60. constexpr std::array<std::string_view, 5> TimestampFormatNames = {
  61. "POSIX"sv,
  62. "ISO"sv,
  63. "UNIX_TIME_MILLISECONDS"sv,
  64. "UNIX_TIME_SECONDS"sv,
  65. "UNIX_TIME_MICROSECONDS"sv
  66. };
  67. TCoAtom InferIndexName(TCoAtomList key, TExprContext& ctx) {
  68. static const TString end = "_idx";
  69. static const TString delimiter = "_";
  70. size_t sz = end.size();
  71. for (const auto& n: key)
  72. sz += n.Value().size() + delimiter.size();
  73. TString name(Reserve(sz));
  74. for (const auto& n: key) {
  75. name += n.Value() + delimiter;
  76. }
  77. name += end;
  78. return Build<TCoAtom>(ctx, key.Pos())
  79. .Value(name)
  80. .Done();
  81. }
  82. } // namespace
  83. bool TCommitSettings::EnsureModeEmpty(TExprContext& ctx) {
  84. if (Mode) {
  85. ctx.AddError(TIssue(ctx.GetPosition(Pos), TStringBuilder()
  86. << "Unsupported mode:" << Mode.Cast().Value()));
  87. return false;
  88. }
  89. return true;
  90. }
  91. bool TCommitSettings::EnsureEpochEmpty(TExprContext& ctx) {
  92. if (Epoch) {
  93. ctx.AddError(TIssue(ctx.GetPosition(Pos), TStringBuilder()
  94. << "Epochs are unsupported."));
  95. return false;
  96. }
  97. return true;
  98. }
  99. bool TCommitSettings::EnsureOtherEmpty(TExprContext& ctx) {
  100. if (!Other.Empty()) {
  101. ctx.AddError(TIssue(ctx.GetPosition(Pos), TStringBuilder()
  102. << "Unsupported setting:" << Other.Item(0).Name().Value()));
  103. return false;
  104. }
  105. return true;
  106. }
  107. TCoNameValueTupleList TCommitSettings::BuildNode(TExprContext& ctx) const {
  108. TVector<TExprBase> settings;
  109. auto addSettings = [this, &settings, &ctx] (const TString& name, TMaybeNode<TExprBase> value) {
  110. if (value) {
  111. auto node = Build<TCoNameValueTuple>(ctx, Pos)
  112. .Name().Build(name)
  113. .Value(value.Cast())
  114. .Done();
  115. settings.push_back(node);
  116. }
  117. };
  118. addSettings("mode", Mode);
  119. addSettings("epoch", Epoch);
  120. for (auto setting : Other) {
  121. settings.push_back(setting);
  122. }
  123. auto ret = Build<TCoNameValueTupleList>(ctx, Pos)
  124. .Add(settings)
  125. .Done();
  126. return ret;
  127. }
  128. const TStructExprType* BuildCommonTableListType(TExprContext& ctx) {
  129. TVector<const TItemExprType*> items;
  130. auto stringType = ctx.MakeType<TDataExprType>(EDataSlot::String);
  131. auto listOfString = ctx.MakeType<TListExprType>(stringType);
  132. items.push_back(ctx.MakeType<TItemExprType>("Prefix", stringType));
  133. items.push_back(ctx.MakeType<TItemExprType>("Folders", listOfString));
  134. items.push_back(ctx.MakeType<TItemExprType>("Tables", listOfString));
  135. return ctx.MakeType<TStructExprType>(items);
  136. }
  137. TExprNode::TPtr BuildTypeExpr(TPositionHandle pos, const TTypeAnnotationNode& ann, TExprContext& ctx) {
  138. return ExpandType(pos, ann, ctx);
  139. }
  140. bool HasResOrPullOption(const TExprNode& node, const TStringBuf& option) {
  141. if (node.Content() == "Result" || node.Content() == "Pull") {
  142. auto options = node.Child(4);
  143. for (auto setting : options->Children()) {
  144. if (setting->Head().Content() == option) {
  145. return true;
  146. }
  147. }
  148. }
  149. return false;
  150. }
  151. TVector<TString> GetResOrPullColumnHints(const TExprNode& node) {
  152. TVector<TString> columns;
  153. auto setting = GetSetting(*node.Child(4), "columns");
  154. if (setting) {
  155. auto type = node.Head().GetTypeAnn();
  156. if (type->GetKind() != ETypeAnnotationKind::EmptyList) {
  157. auto structType = type->Cast<TListExprType>()->GetItemType()->Cast<TStructExprType>();
  158. for (ui32 i = 0; i < structType->GetSize(); ++i) {
  159. auto field = setting->Child(1)->Child(i);
  160. columns.push_back(TString(field->Content()));
  161. }
  162. }
  163. }
  164. return columns;
  165. }
  166. TString FullTableName(const TStringBuf& cluster, const TStringBuf& table) {
  167. return TStringBuilder() << cluster << ".[" << table << "]";
  168. }
  169. IDataProvider::TFillSettings GetFillSettings(const TExprNode& node) {
  170. IDataProvider::TFillSettings fillSettings;
  171. fillSettings.AllResultsBytesLimit = node.Child(1)->Content().empty()
  172. ? Nothing()
  173. : TMaybe<ui64>(FromString<ui64>(node.Child(1)->Content()));
  174. fillSettings.RowsLimitPerWrite = node.Child(2)->Content().empty()
  175. ? Nothing()
  176. : TMaybe<ui64>(FromString<ui64>(node.Child(2)->Content()));
  177. fillSettings.Format = (IDataProvider::EResultFormat)FromString<ui32>(node.Child(5)->Content());
  178. fillSettings.FormatDetails = node.Child(3)->Content();
  179. fillSettings.Discard = FromString<bool>(node.Child(7)->Content());
  180. return fillSettings;
  181. }
  182. NYson::EYsonFormat GetYsonFormat(const IDataProvider::TFillSettings& fillSettings) {
  183. YQL_ENSURE(fillSettings.Format == IDataProvider::EResultFormat::Yson);
  184. return (NYson::EYsonFormat)FromString<ui32>(fillSettings.FormatDetails);
  185. }
  186. TWriteTableSettings ParseWriteTableSettings(TExprList node, TExprContext& ctx) {
  187. TMaybeNode<TCoAtom> mode;
  188. TMaybeNode<TCoAtom> temporary;
  189. TMaybeNode<TCoAtom> isBatch;
  190. TMaybeNode<TExprList> columns;
  191. TMaybeNode<TExprList> returningList;
  192. TMaybeNode<TCoAtomList> primaryKey;
  193. TMaybeNode<TCoAtomList> partitionBy;
  194. TMaybeNode<TCoNameValueTupleList> orderBy;
  195. TMaybeNode<TCoLambda> filter;
  196. TMaybeNode<TCoLambda> update;
  197. TVector<TCoNameValueTuple> other;
  198. TVector<TCoIndex> indexes;
  199. TVector<TCoChangefeed> changefeeds;
  200. TMaybeNode<TExprList> columnFamilies;
  201. TVector<TCoNameValueTuple> tableSettings;
  202. TVector<TCoNameValueTuple> alterActions;
  203. TMaybeNode<TCoAtom> tableType;
  204. TMaybeNode<TCallable> pgFilter;
  205. for (auto child : node) {
  206. if (auto maybeTuple = child.Maybe<TCoNameValueTuple>()) {
  207. auto tuple = maybeTuple.Cast();
  208. auto name = tuple.Name().Value();
  209. if (name == "mode") {
  210. YQL_ENSURE(tuple.Value().Maybe<TCoAtom>());
  211. mode = tuple.Value().Cast<TCoAtom>();
  212. } else if (name == "columns") {
  213. YQL_ENSURE(tuple.Value().Maybe<TExprList>());
  214. columns = tuple.Value().Cast<TExprList>();
  215. } else if (name == "primarykey") {
  216. YQL_ENSURE(tuple.Value().Maybe<TCoAtomList>());
  217. primaryKey = tuple.Value().Cast<TCoAtomList>();
  218. } else if (name == "partitionby") {
  219. YQL_ENSURE(tuple.Value().Maybe<TCoAtomList>());
  220. partitionBy = tuple.Value().Cast<TCoAtomList>();
  221. } else if (name == "orderby") {
  222. YQL_ENSURE(tuple.Value().Maybe<TCoNameValueTupleList>());
  223. orderBy = tuple.Value().Cast<TCoNameValueTupleList>();
  224. } else if (name == "filter") {
  225. YQL_ENSURE(tuple.Value().Maybe<TCoLambda>());
  226. filter = tuple.Value().Cast<TCoLambda>();
  227. } else if (name == "update") {
  228. YQL_ENSURE(tuple.Value().Maybe<TCoLambda>());
  229. update = tuple.Value().Cast<TCoLambda>();
  230. } else if (name == "index") {
  231. YQL_ENSURE(tuple.Value().Maybe<TCoNameValueTupleList>());
  232. auto index = Build<TCoIndex>(ctx, node.Pos());
  233. bool inferName = false;
  234. TCoNameValueTupleList tableSettings = Build<TCoNameValueTupleList>(ctx, node.Pos()).Done();
  235. TCoNameValueTupleList indexSettings = Build<TCoNameValueTupleList>(ctx, node.Pos()).Done();
  236. TMaybe<TCoAtomList> columnList;
  237. for (const auto& item : tuple.Value().Cast<TCoNameValueTupleList>()) {
  238. const auto& indexItemName = item.Name().Value();
  239. if (indexItemName == "indexName") {
  240. if (auto atom = item.Value().Maybe<TCoAtom>()) {
  241. index.Name(atom.Cast());
  242. } else {
  243. // No index name given - infer name from column set
  244. inferName = true;
  245. }
  246. } else if (indexItemName == "indexType") {
  247. index.Type(item.Value().Cast<TCoAtom>());
  248. } else if (indexItemName == "indexColumns") {
  249. columnList = item.Value().Cast<TCoAtomList>();
  250. index.Columns(item.Value().Cast<TCoAtomList>());
  251. } else if (indexItemName == "dataColumns") {
  252. index.DataColumns(item.Value().Cast<TCoAtomList>());
  253. } else if (indexItemName == "tableSettings") {
  254. tableSettings = item.Value().Cast<TCoNameValueTupleList>();
  255. } else if (indexItemName == "indexSettings") {
  256. indexSettings = item.Value().Cast<TCoNameValueTupleList>();
  257. } else {
  258. YQL_ENSURE(false, "unknown index item");
  259. }
  260. }
  261. index.TableSettings(tableSettings);
  262. index.IndexSettings(indexSettings);
  263. if (inferName) {
  264. YQL_ENSURE(columnList);
  265. index.Name(InferIndexName(*columnList, ctx));
  266. }
  267. indexes.push_back(index.Done());
  268. } else if (name == "changefeed") {
  269. YQL_ENSURE(tuple.Value().Maybe<TCoNameValueTupleList>());
  270. auto cf = Build<TCoChangefeed>(ctx, node.Pos());
  271. for (const auto& item : tuple.Value().Cast<TCoNameValueTupleList>()) {
  272. const auto& itemName = item.Name().Value();
  273. if (itemName == "name") {
  274. cf.Name(item.Value().Cast<TCoAtom>());
  275. } else if (itemName == "settings") {
  276. YQL_ENSURE(item.Value().Maybe<TCoNameValueTupleList>());
  277. cf.Settings(item.Value().Cast<TCoNameValueTupleList>());
  278. } else if (itemName == "state") {
  279. cf.State(item.Value().Cast<TCoAtom>());
  280. } else {
  281. YQL_ENSURE(false, "unknown changefeed item");
  282. }
  283. }
  284. changefeeds.push_back(cf.Done());
  285. } else if (name == "columnFamilies") {
  286. YQL_ENSURE(tuple.Value().Maybe<TExprList>());
  287. columnFamilies = tuple.Value().Cast<TExprList>();
  288. } else if (name == "tableSettings") {
  289. YQL_ENSURE(tuple.Value().Maybe<TCoNameValueTupleList>());
  290. for (const auto& item : tuple.Value().Cast<TCoNameValueTupleList>()) {
  291. tableSettings.push_back(item);
  292. }
  293. } else if (name == "actions") {
  294. YQL_ENSURE(tuple.Value().Maybe<TCoNameValueTupleList>());
  295. for (const auto& item : tuple.Value().Cast<TCoNameValueTupleList>()) {
  296. alterActions.push_back(item);
  297. }
  298. } else if (name == "tableType") {
  299. YQL_ENSURE(tuple.Value().Maybe<TCoAtom>());
  300. tableType = tuple.Value().Cast<TCoAtom>();
  301. } else if (name == "pg_delete" || name == "pg_update") {
  302. YQL_ENSURE(tuple.Value().Maybe<TCallable>());
  303. pgFilter = tuple.Value().Cast<TCallable>();
  304. } else if (name == "temporary") {
  305. temporary = Build<TCoAtom>(ctx, node.Pos()).Value("true").Done();
  306. } else if (name == "returning") {
  307. YQL_ENSURE(tuple.Value().Maybe<TExprList>());
  308. returningList = tuple.Value().Cast<TExprList>();
  309. } else if (name == "is_batch") {
  310. isBatch = Build<TCoAtom>(ctx, node.Pos()).Value("true").Done();
  311. } else {
  312. other.push_back(tuple);
  313. }
  314. }
  315. }
  316. const auto& otherSettings = Build<TCoNameValueTupleList>(ctx, node.Pos())
  317. .Add(other)
  318. .Done();
  319. const auto& idx = Build<TCoIndexList>(ctx, node.Pos())
  320. .Add(indexes)
  321. .Done();
  322. const auto& cfs = Build<TCoChangefeedList>(ctx, node.Pos())
  323. .Add(changefeeds)
  324. .Done();
  325. const auto& tableProfileSettings = Build<TCoNameValueTupleList>(ctx, node.Pos())
  326. .Add(tableSettings)
  327. .Done();
  328. const auto& alterTableActions = Build<TCoNameValueTupleList>(ctx, node.Pos())
  329. .Add(alterActions)
  330. .Done();
  331. if (!columnFamilies.IsValid()) {
  332. columnFamilies = Build<TExprList>(ctx, node.Pos()).Done();
  333. }
  334. TWriteTableSettings ret(otherSettings);
  335. ret.Mode = mode;
  336. ret.Temporary = temporary;
  337. ret.IsBatch = isBatch;
  338. ret.Columns = columns;
  339. ret.ReturningList = returningList;
  340. ret.PrimaryKey = primaryKey;
  341. ret.PartitionBy = partitionBy;
  342. ret.OrderBy = orderBy;
  343. ret.Filter = filter;
  344. ret.Update = update;
  345. ret.Indexes = idx;
  346. ret.Changefeeds = cfs;
  347. ret.ColumnFamilies = columnFamilies;
  348. ret.TableSettings = tableProfileSettings;
  349. ret.AlterActions = alterTableActions;
  350. ret.TableType = tableType;
  351. ret.PgFilter = pgFilter;
  352. return ret;
  353. }
  354. TWriteSequenceSettings ParseSequenceSettings(NNodes::TExprList node, TExprContext& ctx) {
  355. TMaybeNode<TCoAtom> mode;
  356. TMaybeNode<TCoAtom> valueType;
  357. TMaybeNode<TCoAtom> temporary;
  358. TMaybeNode<TCoAtom> ownedBy;
  359. TVector<TCoNameValueTuple> sequenceSettings;
  360. TVector<TCoNameValueTuple> other;
  361. const static std::unordered_set<TString> sequenceSettingNames =
  362. {"start", "increment", "cache", "minvalue", "maxvalue", "cycle", "restart"};
  363. for (auto child : node) {
  364. if (auto maybeTuple = child.Maybe<TCoNameValueTuple>()) {
  365. auto tuple = maybeTuple.Cast();
  366. auto name = tuple.Name().Value();
  367. if (name == "mode") {
  368. YQL_ENSURE(tuple.Value().Maybe<TCoAtom>());
  369. mode = tuple.Value().Cast<TCoAtom>();
  370. } else if (name == "as") {
  371. YQL_ENSURE(tuple.Value().Maybe<TCoAtom>());
  372. valueType = tuple.Value().Cast<TCoAtom>();
  373. } else if (name == "temporary") {
  374. temporary = Build<TCoAtom>(ctx, node.Pos()).Value("true").Done();
  375. } else if (sequenceSettingNames.contains(TString(name))) {
  376. sequenceSettings.push_back(tuple);
  377. } else {
  378. other.push_back(tuple);
  379. }
  380. }
  381. }
  382. const auto& sequenceSettingsList = Build<TCoNameValueTupleList>(ctx, node.Pos())
  383. .Add(sequenceSettings)
  384. .Done();
  385. const auto& otherSettings = Build<TCoNameValueTupleList>(ctx, node.Pos())
  386. .Add(other)
  387. .Done();
  388. TWriteSequenceSettings ret(otherSettings);
  389. ret.Mode = mode;
  390. ret.ValueType = valueType;
  391. ret.Temporary = temporary;
  392. ret.SequenceSettings = sequenceSettingsList;
  393. return ret;
  394. }
  395. TWriteTopicSettings ParseWriteTopicSettings(TExprList node, TExprContext& ctx) {
  396. Y_UNUSED(ctx);
  397. TMaybeNode<TCoAtom> mode;
  398. TVector<TCoTopicConsumer> consumers;
  399. TVector<TCoTopicConsumer> addConsumers;
  400. TVector<TCoTopicConsumer> alterConsumers;
  401. TVector<TCoAtom> dropConsumers;
  402. TVector<TCoNameValueTuple> topicSettings;
  403. auto parseNewConsumer = [&](const auto& node, const auto& tuple, auto& consumersList) {
  404. YQL_ENSURE(tuple.Value().template Maybe<TCoNameValueTupleList>());
  405. auto consumer = Build<TCoTopicConsumer>(ctx, node.Pos());
  406. for (const auto& item : tuple.Value().template Cast<TCoNameValueTupleList>()) {
  407. const auto& itemName = item.Name().Value();
  408. if (itemName == "name") {
  409. consumer.Name(item.Value().template Cast<TCoAtom>());
  410. } else if (itemName == "settings") {
  411. YQL_ENSURE(item.Value().template Maybe<TCoNameValueTupleList>());
  412. consumer.Settings(item.Value().template Cast<TCoNameValueTupleList>());
  413. } else {
  414. YQL_ENSURE(false, "unknown consumer item");
  415. }
  416. }
  417. consumersList.push_back(consumer.Done());
  418. };
  419. for (auto child : node) {
  420. if (auto maybeTuple = child.Maybe<TCoNameValueTuple>()) {
  421. auto tuple = maybeTuple.Cast();
  422. auto name = tuple.Name().Value();
  423. if (name == "mode") {
  424. YQL_ENSURE(tuple.Value().Maybe<TCoAtom>());
  425. mode = tuple.Value().Cast<TCoAtom>();
  426. } else if (name == "consumer") {
  427. parseNewConsumer(node, tuple, consumers);
  428. } else if (name == "addConsumer") {
  429. parseNewConsumer(node, tuple, addConsumers);
  430. } else if (name == "alterConsumer") {
  431. parseNewConsumer(node, tuple, alterConsumers);
  432. } else if (name == "dropConsumer") {
  433. auto name = tuple.Value().Cast<TCoAtom>();
  434. dropConsumers.push_back(name);
  435. } else if (name == "topicSettings") {
  436. YQL_ENSURE(tuple.Value().Maybe<TCoNameValueTupleList>());
  437. for (const auto& item : tuple.Value().Cast<TCoNameValueTupleList>()) {
  438. topicSettings.push_back(item);
  439. }
  440. }
  441. }
  442. }
  443. const auto& builtCons = Build<TCoTopicConsumerList>(ctx, node.Pos())
  444. .Add(consumers)
  445. .Done();
  446. const auto& builtAddCons = Build<TCoTopicConsumerList>(ctx, node.Pos())
  447. .Add(addConsumers)
  448. .Done();
  449. const auto& builtAlterCons = Build<TCoTopicConsumerList>(ctx, node.Pos())
  450. .Add(alterConsumers)
  451. .Done();
  452. const auto& builtDropCons = Build<TCoAtomList>(ctx, node.Pos())
  453. .Add(dropConsumers)
  454. .Done();
  455. const auto& builtSettings = Build<TCoNameValueTupleList>(ctx, node.Pos())
  456. .Add(topicSettings)
  457. .Done();
  458. TVector<TCoNameValueTuple> other;
  459. const auto& otherSettings = Build<TCoNameValueTupleList>(ctx, node.Pos())
  460. .Add(other)
  461. .Done();
  462. TWriteTopicSettings ret(otherSettings);
  463. ret.Mode = mode;
  464. ret.Consumers = builtCons;
  465. ret.AddConsumers = builtAddCons;
  466. ret.TopicSettings = builtSettings;
  467. ret.AlterConsumers = builtAlterCons;
  468. ret.DropConsumers = builtDropCons;
  469. return ret;
  470. }
  471. TWriteReplicationSettings ParseWriteReplicationSettings(TExprList node, TExprContext& ctx) {
  472. TMaybeNode<TCoAtom> mode;
  473. TVector<TCoReplicationTarget> targets;
  474. TVector<TCoNameValueTuple> settings;
  475. TVector<TCoNameValueTuple> other;
  476. for (auto child : node) {
  477. if (auto maybeTuple = child.Maybe<TCoNameValueTuple>()) {
  478. auto tuple = maybeTuple.Cast();
  479. auto name = tuple.Name().Value();
  480. if (name == "mode") {
  481. YQL_ENSURE(tuple.Value().Maybe<TCoAtom>());
  482. mode = tuple.Value().Cast<TCoAtom>();
  483. } else if (name == "targets") {
  484. YQL_ENSURE(tuple.Value().Maybe<TExprList>());
  485. for (const auto& target : tuple.Value().Cast<TExprList>()) {
  486. auto builtTarget = Build<TCoReplicationTarget>(ctx, node.Pos());
  487. YQL_ENSURE(target.Maybe<TCoNameValueTupleList>());
  488. for (const auto& item : target.Cast<TCoNameValueTupleList>()) {
  489. auto itemName = item.Name().Value();
  490. if (itemName == "remote") {
  491. builtTarget.RemotePath(item.Value().Cast<TCoAtom>());
  492. } else if (itemName == "local") {
  493. builtTarget.LocalPath(item.Value().Cast<TCoAtom>());
  494. } else {
  495. YQL_ENSURE(false, "unknown target item");
  496. }
  497. }
  498. targets.push_back(builtTarget.Done());
  499. }
  500. } else if (name == "settings") {
  501. YQL_ENSURE(tuple.Value().Maybe<TCoNameValueTupleList>());
  502. for (const auto& item : tuple.Value().Cast<TCoNameValueTupleList>()) {
  503. settings.push_back(item);
  504. }
  505. } else {
  506. other.push_back(tuple);
  507. }
  508. }
  509. }
  510. const auto& builtTargets = Build<TCoReplicationTargetList>(ctx, node.Pos())
  511. .Add(targets)
  512. .Done();
  513. const auto& builtSettings = Build<TCoNameValueTupleList>(ctx, node.Pos())
  514. .Add(settings)
  515. .Done();
  516. const auto& builtOther = Build<TCoNameValueTupleList>(ctx, node.Pos())
  517. .Add(other)
  518. .Done();
  519. TWriteReplicationSettings ret(builtOther);
  520. ret.Mode = mode;
  521. ret.Targets = builtTargets;
  522. ret.ReplicationSettings = builtSettings;
  523. return ret;
  524. }
  525. TWriteTransferSettings ParseWriteTransferSettings(TExprList node, TExprContext& ctx) {
  526. TMaybeNode<TCoAtom> mode;
  527. TMaybeNode<TCoAtom> source;
  528. TMaybeNode<TCoAtom> target;
  529. TMaybeNode<TCoAtom> transformLambda;
  530. TVector<TCoNameValueTuple> settings;
  531. TVector<TCoNameValueTuple> other;
  532. for (auto child : node) {
  533. if (auto maybeTuple = child.Maybe<TCoNameValueTuple>()) {
  534. auto tuple = maybeTuple.Cast();
  535. auto name = tuple.Name().Value();
  536. if (name == "mode") {
  537. YQL_ENSURE(tuple.Value().Maybe<TCoAtom>());
  538. mode = tuple.Value().Cast<TCoAtom>();
  539. } else if (name == "source") {
  540. YQL_ENSURE(tuple.Value().Maybe<TCoAtom>());
  541. source = tuple.Value().Cast<TCoAtom>();
  542. } else if (name == "target") {
  543. YQL_ENSURE(tuple.Value().Maybe<TCoAtom>());
  544. target = tuple.Value().Cast<TCoAtom>();
  545. } else if (name == "transformLambda") {
  546. YQL_ENSURE(tuple.Value().Maybe<TCoAtom>());
  547. transformLambda = tuple.Value().Cast<TCoAtom>();
  548. } else if (name == "settings") {
  549. YQL_ENSURE(tuple.Value().Maybe<TCoNameValueTupleList>());
  550. for (const auto& item : tuple.Value().Cast<TCoNameValueTupleList>()) {
  551. settings.push_back(item);
  552. }
  553. } else {
  554. other.push_back(tuple);
  555. }
  556. }
  557. }
  558. const auto& builtSettings = Build<TCoNameValueTupleList>(ctx, node.Pos())
  559. .Add(settings)
  560. .Done();
  561. const auto& builtOther = Build<TCoNameValueTupleList>(ctx, node.Pos())
  562. .Add(other)
  563. .Done();
  564. TWriteTransferSettings ret(builtOther);
  565. ret.Mode = mode;
  566. ret.Source = source;
  567. ret.Target = target;
  568. ret.TransformLambda = transformLambda;
  569. ret.TransferSettings = builtSettings;
  570. return ret;
  571. }
  572. TWriteRoleSettings ParseWriteRoleSettings(TExprList node, TExprContext& ctx) {
  573. TMaybeNode<TCoAtom> mode;
  574. TVector<TCoAtom> roles;
  575. TMaybeNode<TCoAtom> newName;
  576. TVector<TCoNameValueTuple> other;
  577. for (auto child : node) {
  578. if (auto maybeTuple = child.Maybe<TCoNameValueTuple>()) {
  579. auto tuple = maybeTuple.Cast();
  580. auto name = tuple.Name().Value();
  581. if (name == "mode") {
  582. YQL_ENSURE(tuple.Value().Maybe<TCoAtom>());
  583. mode = tuple.Value().Cast<TCoAtom>();
  584. } else if (name == "roles") {
  585. YQL_ENSURE(tuple.Value().Maybe<TCoAtomList>());
  586. for (const auto& item : tuple.Value().Cast<TCoAtomList>()) {
  587. roles.push_back(item);
  588. }
  589. } else if (name == "newName") {
  590. YQL_ENSURE(tuple.Value().Maybe<TCoAtom>());
  591. newName = tuple.Value().Cast<TCoAtom>();
  592. } else {
  593. other.push_back(tuple);
  594. }
  595. }
  596. }
  597. const auto& builtRoles = Build<TCoAtomList>(ctx, node.Pos())
  598. .Add(roles)
  599. .Done();
  600. const auto& otherSettings = Build<TCoNameValueTupleList>(ctx, node.Pos())
  601. .Add(other)
  602. .Done();
  603. TWriteRoleSettings ret(otherSettings);
  604. ret.Roles = builtRoles;;
  605. ret.NewName = newName;
  606. ret.Mode = mode;
  607. return ret;
  608. }
  609. TWritePermissionSettings ParseWritePermissionsSettings(TExprList node, TExprContext&) {
  610. TMaybeNode<TCoAtomList> permissions;
  611. TMaybeNode<TCoAtomList> paths;
  612. TMaybeNode<TCoAtomList> roleNames;
  613. for (auto child : node) {
  614. if (auto maybeTuple = child.Maybe<TCoNameValueTuple>()) {
  615. auto tuple = maybeTuple.Cast();
  616. auto name = tuple.Name().Value();
  617. if (name == "permissions") {
  618. YQL_ENSURE(tuple.Value().Maybe<TCoAtomList>());
  619. permissions = tuple.Value().Cast<TCoAtomList>();;
  620. } else if (name == "roles") {
  621. YQL_ENSURE(tuple.Value().Maybe<TCoAtomList>());
  622. roleNames = tuple.Value().Cast<TCoAtomList>();
  623. } else if (name == "paths") {
  624. YQL_ENSURE(tuple.Value().Maybe<TCoAtomList>());
  625. paths = tuple.Value().Cast<TCoAtomList>();
  626. }
  627. }
  628. }
  629. TWritePermissionSettings ret(std::move(permissions), std::move(paths), std::move(roleNames));
  630. return ret;
  631. }
  632. TWriteObjectSettings ParseWriteObjectSettings(TExprList node, TExprContext& ctx) {
  633. TMaybeNode<TCoAtom> mode;
  634. TMaybe<TCoNameValueTupleList> kvFeatures;
  635. TMaybe<TCoAtomList> resetFeatures;
  636. for (auto child : node) {
  637. if (auto maybeTuple = child.Maybe<TCoNameValueTuple>()) {
  638. auto tuple = maybeTuple.Cast();
  639. auto name = tuple.Name().Value();
  640. if (name == "mode") {
  641. YQL_ENSURE(tuple.Value().Maybe<TCoAtom>());
  642. mode = tuple.Value().Cast<TCoAtom>();
  643. } else if (name == "features") {
  644. auto maybeFeatures = tuple.Value().Maybe<TCoNameValueTupleList>();
  645. Y_ABORT_UNLESS(maybeFeatures);
  646. kvFeatures = maybeFeatures.Cast();
  647. } else if (name == "resetFeatures") {
  648. auto maybeFeatures = tuple.Value().Maybe<TCoAtomList>();
  649. Y_ABORT_UNLESS(maybeFeatures);
  650. resetFeatures = maybeFeatures.Cast();
  651. }
  652. }
  653. }
  654. if (!kvFeatures) {
  655. kvFeatures = Build<TCoNameValueTupleList>(ctx, node.Pos()).Done();
  656. }
  657. if (!resetFeatures) {
  658. resetFeatures = Build<TCoAtomList>(ctx, node.Pos()).Done();
  659. }
  660. TWriteObjectSettings ret(std::move(mode), std::move(*kvFeatures), std::move(*resetFeatures));
  661. return ret;
  662. }
  663. TCommitSettings ParseCommitSettings(TCoCommit node, TExprContext& ctx) {
  664. if (!node.Settings()) {
  665. return TCommitSettings(Build<TCoNameValueTupleList>(ctx, node.Pos()).Done());
  666. }
  667. TMaybeNode<TCoAtom> mode;
  668. TMaybeNode<TCoAtom> epoch;
  669. TVector<TExprBase> other;
  670. if (node.Settings()) {
  671. auto settings = node.Settings().Cast();
  672. for (auto setting : settings) {
  673. if (setting.Name() == "mode") {
  674. YQL_ENSURE(setting.Value().Maybe<TCoAtom>());
  675. mode = setting.Value().Cast<TCoAtom>();
  676. } else if (setting.Name() == "epoch") {
  677. YQL_ENSURE(setting.Value().Maybe<TCoAtom>());
  678. epoch = setting.Value().Cast<TCoAtom>();
  679. } else {
  680. other.push_back(setting);
  681. }
  682. }
  683. }
  684. auto otherSettings = Build<TCoNameValueTupleList>(ctx, node.Pos())
  685. .Add(other)
  686. .Done();
  687. TCommitSettings ret(otherSettings);
  688. ret.Pos = node.Pos();
  689. ret.Mode = mode;
  690. ret.Epoch = epoch;
  691. return ret;
  692. }
  693. TPgObjectSettings ParsePgObjectSettings(TExprList node, TExprContext&) {
  694. TMaybeNode<TCoAtom> mode;
  695. TMaybeNode<TCoAtom> ifExists;
  696. for (auto child : node) {
  697. if (auto maybeTuple = child.Maybe<TCoNameValueTuple>()) {
  698. auto tuple = maybeTuple.Cast();
  699. auto name = tuple.Name().Value();
  700. if (name == "mode") {
  701. YQL_ENSURE(tuple.Value().Maybe<TCoAtom>());
  702. mode = tuple.Value().Cast<TCoAtom>();
  703. } else if (name == "ifExists") {
  704. YQL_ENSURE(tuple.Value().Maybe<TCoAtom>());
  705. ifExists = tuple.Value().Cast<TCoAtom>();
  706. }
  707. }
  708. }
  709. TPgObjectSettings ret(std::move(mode), std::move(ifExists));
  710. return ret;
  711. }
  712. TVector<TString> GetStructFields(const TTypeAnnotationNode* type) {
  713. TVector<TString> fields;
  714. if (type->GetKind() == ETypeAnnotationKind::List) {
  715. type = type->Cast<TListExprType>()->GetItemType();
  716. }
  717. if (type->GetKind() == ETypeAnnotationKind::Struct) {
  718. auto structType = type->Cast<TStructExprType>();
  719. for (auto& member : structType->GetItems()) {
  720. fields.push_back(TString(member->GetName()));
  721. }
  722. }
  723. return fields;
  724. }
  725. void TransformerStatsToYson(const TString& name, const IGraphTransformer::TStatistics& stats,
  726. NYson::TYsonWriter& writer)
  727. {
  728. writer.OnBeginMap();
  729. if (!name.empty()) {
  730. writer.OnKeyedItem("Name");
  731. writer.OnStringScalar(name);
  732. }
  733. if (stats.TransformDuration.MicroSeconds() > 0) {
  734. writer.OnKeyedItem("TransformDurationUs");
  735. writer.OnUint64Scalar(stats.TransformDuration.MicroSeconds());
  736. }
  737. if (stats.WaitDuration.MicroSeconds() > 0) {
  738. writer.OnKeyedItem("WaitDurationUs");
  739. writer.OnUint64Scalar(stats.WaitDuration.MicroSeconds());
  740. }
  741. if (stats.NewExprNodes > 0) {
  742. writer.OnKeyedItem("NewExprNodes");
  743. writer.OnInt64Scalar(stats.NewExprNodes);
  744. }
  745. if (stats.NewTypeNodes > 0) {
  746. writer.OnKeyedItem("NewTypeNodes");
  747. writer.OnInt64Scalar(stats.NewTypeNodes);
  748. }
  749. if (stats.NewConstraintNodes > 0) {
  750. writer.OnKeyedItem("NewConstraintNodes");
  751. writer.OnInt64Scalar(stats.NewConstraintNodes);
  752. }
  753. if (stats.Repeats > 0) {
  754. writer.OnKeyedItem("Repeats");
  755. writer.OnUint64Scalar(stats.Repeats);
  756. }
  757. if (stats.Restarts > 0) {
  758. writer.OnKeyedItem("Restarts");
  759. writer.OnUint64Scalar(stats.Restarts);
  760. }
  761. if (!stats.Stages.empty()) {
  762. writer.OnKeyedItem("Stages");
  763. writer.OnBeginList();
  764. for (auto& stage : stats.Stages) {
  765. writer.OnListItem();
  766. TransformerStatsToYson(stage.first, stage.second, writer);
  767. }
  768. writer.OnEndList();
  769. }
  770. writer.OnEndMap();
  771. }
  772. TString TransformerStatsToYson(const IGraphTransformer::TStatistics& stats, NYson::EYsonFormat format) {
  773. TStringStream out;
  774. NYson::TYsonWriter writer(&out, format);
  775. TransformerStatsToYson("", stats, writer);
  776. return out.Str();
  777. }
  778. bool FillUsedFilesImpl(
  779. const TExprNode& node,
  780. TUserDataTable& files,
  781. const TTypeAnnotationContext& types,
  782. TExprContext& ctx,
  783. const TUserDataTable& crutches,
  784. TNodeSet& visited,
  785. ui64& usedPgExtensions,
  786. bool needFullPgCatalog)
  787. {
  788. if (!visited.insert(&node).second) {
  789. return true;
  790. }
  791. if (node.GetTypeAnn()) {
  792. usedPgExtensions |= node.GetTypeAnn()->GetUsedPgExtensions();
  793. }
  794. if (node.IsCallable("PgResolvedCall")) {
  795. auto procId = FromString<ui32>(node.Child(1)->Content());
  796. const auto& proc = NPg::LookupProc(procId);
  797. usedPgExtensions |= MakePgExtensionMask(proc.ExtensionIndex);
  798. }
  799. if (node.IsCallable("PgResolvedOp")) {
  800. auto operId = FromString<ui32>(node.Child(1)->Content());
  801. const auto& oper = NPg::LookupOper(operId);
  802. const auto& proc = NPg::LookupProc(oper.ProcId);
  803. usedPgExtensions |= MakePgExtensionMask(proc.ExtensionIndex);
  804. }
  805. if (node.IsCallable({"PgAnyResolvedOp", "PgAllResolvedOp"})) {
  806. auto operId = FromString<ui32>(node.Child(1)->Content());
  807. const auto& oper = NPg::LookupOper(operId);
  808. const auto& proc = NPg::LookupProc(oper.ProcId);
  809. usedPgExtensions |= MakePgExtensionMask(proc.ExtensionIndex);
  810. }
  811. if (node.IsCallable("PgTableContent")) {
  812. needFullPgCatalog = true;
  813. }
  814. if (node.IsCallable("FilePath") || node.IsCallable("FileContent")) {
  815. const auto& name = node.Head().Content();
  816. const auto block = types.UserDataStorage->FindUserDataBlock(name);
  817. if (!block) {
  818. ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), TStringBuilder() << "File not found: " << name));
  819. return false;
  820. }
  821. else {
  822. auto iter = files.insert({ TUserDataStorage::ComposeUserDataKey(name), *block }).first;
  823. iter->second.Usage.Set(node.IsCallable("FilePath") ? EUserDataBlockUsage::Path : EUserDataBlockUsage::Content);
  824. }
  825. }
  826. if (node.IsCallable("FolderPath")) {
  827. const auto& name = node.Head().Content();
  828. auto blocks = types.UserDataStorage->FindUserDataFolder(name);
  829. if (!blocks) {
  830. ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), TStringBuilder() << "Folder not found: " << name));
  831. return false;
  832. } else {
  833. for (const auto& x : *blocks) {
  834. auto iter = files.insert({ x.first, *x.second }).first;
  835. iter->second.Usage.Set(EUserDataBlockUsage::Path);
  836. }
  837. }
  838. }
  839. if (node.IsCallable("Udf") || node.IsCallable("ScriptUdf")) {
  840. TStringBuf moduleName = node.Head().Content();
  841. if (node.IsCallable("Udf")) {
  842. TStringBuf funcName;
  843. YQL_ENSURE(SplitUdfName(node.Head().Content(), moduleName, funcName));
  844. }
  845. auto scriptType = NKikimr::NMiniKQL::ScriptTypeFromStr(moduleName);
  846. if (node.IsCallable("ScriptUdf") && !NKikimr::NMiniKQL::IsCustomPython(scriptType)) {
  847. moduleName = NKikimr::NMiniKQL::ScriptTypeAsStr(NKikimr::NMiniKQL::CanonizeScriptType(scriptType));
  848. }
  849. bool addSysModule = true;
  850. TString fileAlias;
  851. if (node.IsCallable("Udf")) {
  852. fileAlias = node.Child(6)->Content();
  853. } else {
  854. auto iterator = types.UdfModules.find(moduleName);
  855. // we have external UdfModule (not in preinstalled udfs)
  856. if (iterator != types.UdfModules.end()) {
  857. fileAlias = iterator->second.FileAlias;
  858. }
  859. }
  860. if (!fileAlias.empty()) {
  861. addSysModule = false;
  862. const auto block = types.UserDataStorage->FindUserDataBlock(fileAlias);
  863. if (!block) {
  864. ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), TStringBuilder() << "File not found: " << fileAlias));
  865. return false;
  866. } else {
  867. files.emplace(TUserDataStorage::ComposeUserDataKey(fileAlias), *block).first->second.Usage.Set(EUserDataBlockUsage::Udf);
  868. }
  869. }
  870. if (moduleName == TStringBuf("Geo")) {
  871. const auto geobase = TUserDataKey::File(TStringBuf("/home/geodata6.bin"));
  872. if (const auto block = types.UserDataStorage->FindUserDataBlock(geobase)) {
  873. files.emplace(geobase, *block).first->second.Usage.Set(EUserDataBlockUsage::Path);
  874. } else {
  875. const auto it = crutches.find(geobase);
  876. if (crutches.cend() != it) {
  877. auto pragma = it->second;
  878. types.UserDataStorage->AddUserDataBlock(geobase, pragma);
  879. files.emplace(geobase, pragma).first->second.Usage.Set(EUserDataBlockUsage::Path);
  880. }
  881. }
  882. }
  883. if (addSysModule) {
  884. auto pathWithMd5 = types.UdfResolver->GetSystemModulePath(moduleName);
  885. YQL_ENSURE(pathWithMd5);
  886. TUserDataBlock sysBlock;
  887. sysBlock.Type = EUserDataType::PATH;
  888. sysBlock.Data = pathWithMd5->Path;
  889. sysBlock.Usage.Set(EUserDataBlockUsage::Udf);
  890. auto alias = TFsPath(sysBlock.Data).GetName();
  891. auto key = TUserDataKey::Udf(alias);
  892. if (const auto block = types.UserDataStorage->FindUserDataBlock(key)) {
  893. files[key] = *block;
  894. if (!types.QContext.CanRead()) {
  895. YQL_ENSURE(block->FrozenFile);
  896. }
  897. } else {
  898. // Check alias clash with user files
  899. if (files.contains(TUserDataStorage::ComposeUserDataKey(alias))) {
  900. ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), TStringBuilder() << "System module name " << alias << " clashes with one of the user's files"));
  901. return false;
  902. }
  903. if (!alias.StartsWith(NKikimr::NMiniKQL::StaticModulePrefix) && !files.contains(key)) {
  904. // CreateFakeFileLink calculates md5 for file, let's do it once
  905. if (!types.QContext.CanRead()) {
  906. sysBlock.FrozenFile = CreateFakeFileLink(sysBlock.Data, pathWithMd5->Md5);
  907. }
  908. files[key] = sysBlock;
  909. types.UserDataStorage->AddUserDataBlock(key, sysBlock);
  910. }
  911. }
  912. }
  913. }
  914. bool childrenOk = true;
  915. for (auto& child : node.Children()) {
  916. childrenOk = FillUsedFilesImpl(*child, files, types, ctx, crutches, visited,
  917. usedPgExtensions, needFullPgCatalog) && childrenOk;
  918. }
  919. return childrenOk;
  920. }
  921. static void GetToken(const TString& string, TString& out, const TTypeAnnotationContext& type) {
  922. auto separator = string.find(":");
  923. const auto p0 = string.substr(0, separator);
  924. if (p0 == "api") {
  925. const auto p1 = string.substr(separator + 1);
  926. if (p1 == "oauth") {
  927. out = type.Credentials->GetUserCredentials().OauthToken;
  928. } else if (p1 == "cookie") {
  929. out = type.Credentials->GetUserCredentials().BlackboxSessionIdCookie;
  930. } else {
  931. YQL_ENSURE(false, "unexpected token id");
  932. }
  933. } else if (p0 == "token" || p0 == "cluster") {
  934. const auto p1 = string.substr(separator + 1);
  935. auto cred = type.Credentials->FindCredential(p1);
  936. if (cred == nullptr) {
  937. if (p0 == "cluster") {
  938. TStringBuf clusterName = p1;
  939. if (clusterName.SkipPrefix("default_")) {
  940. for (auto& x : type.DataSources) {
  941. auto tokens = x->GetClusterTokens();
  942. if (tokens) {
  943. auto token = tokens->FindPtr(clusterName);
  944. if (token) {
  945. out = *token;
  946. return;
  947. }
  948. }
  949. }
  950. for (auto& x : type.DataSinks) {
  951. auto tokens = x->GetClusterTokens();
  952. if (tokens) {
  953. auto token = tokens->FindPtr(clusterName);
  954. if (token) {
  955. out = *token;
  956. return;
  957. }
  958. }
  959. }
  960. }
  961. }
  962. YQL_ENSURE(false, "unexpected token id");
  963. }
  964. out = cred->Content;
  965. } else {
  966. YQL_ENSURE(false, "unexpected token prefix");
  967. }
  968. }
  969. void FillSecureParams(
  970. const TExprNode::TPtr& root,
  971. const TTypeAnnotationContext& types,
  972. THashMap<TString, TString>& secureParams) {
  973. NYql::VisitExpr(root, [&secureParams](const TExprNode::TPtr& node) {
  974. if (auto maybeSecureParam = TMaybeNode<TCoSecureParam>(node)) {
  975. const auto& secureParamName = TString(maybeSecureParam.Cast().Name().Value());
  976. secureParams.insert({secureParamName, TString()});
  977. }
  978. return true;
  979. });
  980. for (auto& it : secureParams) {
  981. GetToken(it.first, it.second, types);
  982. }
  983. }
  984. bool AddPgFile(bool isPath, const TString& pathOrContent, const TString& md5, const TString& alias, TUserDataTable& files,
  985. const TTypeAnnotationContext& types, TPositionHandle pos, TExprContext& ctx) {
  986. TUserDataBlock block;
  987. block.Data = pathOrContent;
  988. if (isPath) {
  989. block.Type = EUserDataType::PATH;
  990. block.Usage.Set(EUserDataBlockUsage::Path);
  991. block.Usage.Set(EUserDataBlockUsage::PgExt);
  992. } else {
  993. block.Type = EUserDataType::RAW_INLINE_DATA;
  994. block.Usage.Set(EUserDataBlockUsage::Content);
  995. }
  996. auto key = TUserDataKey::File(alias);
  997. if (const auto foundBlock = types.UserDataStorage->FindUserDataBlock(key)) {
  998. files[key] = *foundBlock;
  999. YQL_ENSURE(!isPath || foundBlock->FrozenFile);
  1000. } else {
  1001. // Check alias clash with user files
  1002. if (files.contains(TUserDataStorage::ComposeUserDataKey(alias))) {
  1003. ctx.AddError(TIssue(ctx.GetPosition(pos), TStringBuilder() << "File " << alias << " clashes with one of the user's files"));
  1004. return false;
  1005. }
  1006. // CreateFakeFileLink calculates md5 for file, let's do it once if needed
  1007. if (isPath) {
  1008. block.FrozenFile = CreateFakeFileLink(block.Data, md5);
  1009. }
  1010. files[key] = block;
  1011. types.UserDataStorage->AddUserDataBlock(key, block);
  1012. }
  1013. return true;
  1014. }
  1015. bool FillUsedFiles(
  1016. const TExprNode& node,
  1017. TUserDataTable& files,
  1018. const TTypeAnnotationContext& types,
  1019. TExprContext& ctx,
  1020. const TUserDataTable& crutches) {
  1021. TNodeSet visited;
  1022. ui64 usedPgExtensions = 0;
  1023. bool needFullPgCatalog = false;
  1024. auto ret = FillUsedFilesImpl(node, files, types, ctx, crutches, visited, usedPgExtensions, needFullPgCatalog);
  1025. if (!ret) {
  1026. return false;
  1027. }
  1028. auto remainingPgExtensions = usedPgExtensions;
  1029. TSet<ui32> filter;
  1030. for (ui32 extensionIndex = 1; remainingPgExtensions && (extensionIndex <= 64); ++extensionIndex) {
  1031. auto mask = MakePgExtensionMask(extensionIndex);
  1032. if (!(mask & usedPgExtensions)) {
  1033. continue;
  1034. }
  1035. filter.insert(extensionIndex);
  1036. remainingPgExtensions &= ~mask;
  1037. const auto& e = NPg::LookupExtension(extensionIndex);
  1038. needFullPgCatalog = true;
  1039. auto alias = TFsPath(e.LibraryPath).GetName();
  1040. if (!AddPgFile(true, e.LibraryPath, e.LibraryMD5, alias, files, types, node.Pos(), ctx)) {
  1041. return false;
  1042. }
  1043. }
  1044. Y_ENSURE(remainingPgExtensions == 0);
  1045. if (!needFullPgCatalog) {
  1046. return true;
  1047. }
  1048. TString content = NPg::ExportExtensions(filter);
  1049. if (!AddPgFile(false, content, "", TString(PgCatalogFileName), files, types, node.Pos(), ctx)) {
  1050. return false;
  1051. }
  1052. return true;
  1053. }
// Collects the files used by `node` (FillUsedFiles) and freezes them.
// Returns:
//   - SyncError() if FillUsedFiles failed (the error is already in `ctx`);
//   - SyncOk() without freezing when types.QContext.CanRead() is true
//     (presumably replay of a recorded query - TODO confirm);
//   - SyncOk() after replacing `files` with the frozen table when freezing is already
//     complete;
//   - otherwise an Async status plus a callback that runs once freezing completes.
//     NOTE(review): on this async path the frozen table is NOT assigned to `files`.
//     The callback only requests a Repeat, presumably relying on the frozen blocks being
//     visible through types.UserDataStorage on the next pass - verify.
std::pair<IGraphTransformer::TStatus, TAsyncTransformCallbackFuture> FreezeUsedFiles(const TExprNode& node, TUserDataTable& files, const TTypeAnnotationContext& types, TExprContext& ctx, const std::function<bool(const TString&)>& urlDownloadFilter, const TUserDataTable& crutches) {
    if (!FillUsedFiles(node, files, types, ctx, crutches)) {
        return SyncError();
    }
    if (types.QContext.CanRead()) {
        return SyncOk();
    }
    auto future = FreezeUserDataTableIfNeeded(types.UserDataStorage, files, urlDownloadFilter);
    // Fast path: the future is already ready, so finish synchronously.
    if (future.Wait(TDuration::Zero())) {
        files = future.GetValue()();
        return SyncOk();
    } else {
        return std::make_pair(IGraphTransformer::TStatus::Async, future.Apply(
            [](const NThreading::TFuture<std::function<TUserDataTable()>>& completedFuture) {
                return TAsyncTransformCallback([completedFuture](const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) {
                    output = input;
                    try {
                        // Called for its failure reporting: GetValue() or the returned
                        // function may throw. The resulting table itself is discarded here.
                        completedFuture.GetValue()();
                    }
                    catch (const std::exception& e) {
                        // Report the failure under an UNEXPECTED issue scope and mark `input` failed.
                        auto inputPos = ctx.GetPosition(input->Pos());
                        TIssueScopeGuard issueScope(ctx.IssueManager, [&]() {
                            return MakeIntrusive<TIssue>(YqlIssue(inputPos, TIssuesIds::UNEXPECTED));
                        });
                        ctx.AddError(ExceptionToIssue(e, inputPos));
                        input->SetState(TExprNode::EState::Error);
                        return IGraphTransformer::TStatus(IGraphTransformer::TStatus::Error);
                    }
                    catch (...) {
                        // Same as above for non-std exceptions, using the current exception message.
                        auto inputPos = ctx.GetPosition(input->Pos());
                        TIssueScopeGuard issueScope(ctx.IssueManager, [&]() {
                            return MakeIntrusive<TIssue>(YqlIssue(inputPos, TIssuesIds::UNEXPECTED));
                        });
                        ctx.AddError(YqlIssue(inputPos, TIssuesIds::UNEXPECTED, CurrentExceptionMessage()));
                        input->SetState(TExprNode::EState::Error);
                        return IGraphTransformer::TStatus(IGraphTransformer::TStatus::Error);
                    }
                    // Success: mark the node as requiring execution and ask for another pass.
                    input->SetState(TExprNode::EState::ExecutionRequired);
                    return IGraphTransformer::TStatus(IGraphTransformer::TStatus::Repeat);
                });
            }));
    }
}
  1097. bool FreezeUsedFilesSync(const TExprNode& node, TUserDataTable& files, const TTypeAnnotationContext& types, TExprContext& ctx, const std::function<bool(const TString&)>& urlDownloadFilter) {
  1098. if (!FillUsedFiles(node, files, types, ctx)) {
  1099. return false;
  1100. }
  1101. if (!types.QContext.CanRead()) {
  1102. auto future = FreezeUserDataTableIfNeeded(types.UserDataStorage, files, urlDownloadFilter);
  1103. files = future.GetValueSync()();
  1104. }
  1105. return true;
  1106. }
  1107. void WriteColumns(NYson::TYsonWriter& writer, const TExprBase& columns) {
  1108. if (auto maybeList = columns.Maybe<TExprList>()) {
  1109. writer.OnBeginList();
  1110. for (const auto& column : maybeList.Cast()) {
  1111. writer.OnListItem();
  1112. if (column.Maybe<TCoAtom>()) {
  1113. writer.OnStringScalar(column.Cast<TCoAtom>().Value());
  1114. } else {
  1115. writer.OnStringScalar(column.Cast<TCoAtomList>().Item(0).Value());
  1116. }
  1117. }
  1118. writer.OnEndList();
  1119. } else if (columns.Maybe<TCoVoid>()) {
  1120. writer.OnStringScalar("*");
  1121. } else {
  1122. writer.OnStringScalar("?");
  1123. }
  1124. }
  1125. TString SerializeExpr(TExprContext& ctx, const TExprNode& expr, bool withTypes) {
  1126. ui32 typeFlags = TExprAnnotationFlags::None;
  1127. if (withTypes) {
  1128. typeFlags |= TExprAnnotationFlags::Types;
  1129. }
  1130. auto ast = ConvertToAst(expr, ctx, typeFlags, true);
  1131. YQL_ENSURE(ast.Root);
  1132. return ast.Root->ToString();
  1133. }
  1134. TString ExprToPrettyString(TExprContext& ctx, const TExprNode& expr) {
  1135. auto ast = ConvertToAst(expr, ctx, TExprAnnotationFlags::None, true);
  1136. TStringStream exprStream;
  1137. YQL_ENSURE(ast.Root);
  1138. ast.Root->PrettyPrintTo(exprStream, NYql::TAstPrintFlags::PerLine | NYql::TAstPrintFlags::ShortQuote);
  1139. TString exprText = exprStream.Str();
  1140. return exprText;
  1141. }
  1142. bool IsFlowOrStream(const TExprNode* node) {
  1143. auto kind = node->GetTypeAnn()->GetKind();
  1144. return kind == ETypeAnnotationKind::Stream || kind == ETypeAnnotationKind::Flow;
  1145. }
// Writes, as YSON list items, the chain of callables that turns `source` into `node`.
// Items are emitted upstream-first: the recursion into the input happens before this
// node's own item is written. Each item is a map holding the callable "Name". Callables
// with their own inner pipelines (stream FlatMap, Chopper/WideChopper, Switch, Extend,
// multi-input Apply) also get a "Children" list of nested item lists built by this same
// function.
// The walk stops without writing anything at `source`, at non-callable nodes (e.g.
// lambda arguments), and at nodes without a type annotation. The caller must have
// opened a YSON list to receive the items.
void WriteStream(NYson::TYsonWriter& writer, const TExprNode* node, const TExprNode* source) {
    if (node == source) {
        return;
    }
    if (!node->IsCallable()) {
        return;
    }
    if (!node->GetTypeAnn()) {
        return;
    }
    // Stream-like inputs of an Apply (arguments after the callee at index 0).
    TVector<const TExprNode*> applyStreamChildren;
    if (TCoApply::Match(node)) {
        // Only an Apply producing a stream, flow or list takes part in the chain.
        switch (node->GetTypeAnn()->GetKind()) {
        case ETypeAnnotationKind::Stream:
        case ETypeAnnotationKind::Flow:
        case ETypeAnnotationKind::List:
            break;
        default:
            return;
        }
        for (size_t i = 1; i < node->ChildrenSize(); ++i) {
            if (IsFlowOrStream(*node->Child(i))) {
                applyStreamChildren.push_back(node->Child(i));
            } else if (node->Child(i)->GetTypeAnn()->GetKind() == ETypeAnnotationKind::List) {
                // A list argument is followed into its input when it is a ForwardList,
                // or a Collect of a flow/stream.
                if (node->Child(i)->IsCallable("ForwardList")) {
                    applyStreamChildren.push_back(node->Child(i)->Child(0));
                } else if (node->Child(i)->IsCallable("Collect") && IsFlowOrStream(node->Child(i)->Head())) {
                    applyStreamChildren.push_back(node->Child(i)->Child(0));
                }
            }
        }
        // One stream input continues the linear chain; several are written as "Children" below.
        if (applyStreamChildren.size() == 1) {
            WriteStream(writer, applyStreamChildren.front(), source);
        }
    }
    else if (!TCoExtendBase::Match(node) && node->ChildrenSize() > 0) {
        // Generic callable: the chain continues through the first child.
        WriteStream(writer, node->Child(0), source);
    }
    writer.OnListItem();
    writer.OnBeginMap();
    writer.OnKeyedItem("Name");
    writer.OnStringScalar(node->Content());
    // FlatMap whose lambda yields a stream/flow: describe the lambda body, relative to its first argument.
    if (TCoFlatMapBase::Match(node) && IsFlowOrStream(*node->Child(1))) {
        writer.OnKeyedItem("Children");
        writer.OnBeginList();
        writer.OnListItem();
        writer.OnBeginList();
        WriteStream(writer, node->Child(1)->Child(1), node->Child(1)->Head().Child(0));
        writer.OnEndList();
        writer.OnEndList();
    }
    // Chopper: describe the handler lambda (last child) body, relative to its first argument.
    if (TCoChopper::Match(node) || node->IsCallable("WideChopper")) {
        writer.OnKeyedItem("Children");
        writer.OnBeginList();
        writer.OnListItem();
        writer.OnBeginList();
        WriteStream(writer, &node->Tail().Tail(), &node->Tail().Head().Head());
        writer.OnEndList();
        writer.OnEndList();
    }
    // Switch: children 3, 5, 7, ... are handler lambdas; one nested list per handler.
    if (TCoSwitch::Match(node)) {
        writer.OnKeyedItem("Children");
        writer.OnBeginList();
        for (size_t i = 3; i < node->ChildrenSize(); i += 2) {
            writer.OnListItem();
            writer.OnBeginList();
            WriteStream(writer, node->Child(i)->Child(1), node->Child(i)->Head().Child(0));
            writer.OnEndList();
        }
        writer.OnEndList();
    }
    // Extend: one nested list per input, each traced back to the same `source`.
    if (TCoExtendBase::Match(node) && node->ChildrenSize() > 0) {
        writer.OnKeyedItem("Children");
        writer.OnBeginList();
        for (size_t i = 0; i < node->ChildrenSize(); ++i) {
            writer.OnListItem();
            writer.OnBeginList();
            WriteStream(writer, node->Child(i), source);
            writer.OnEndList();
        }
        writer.OnEndList();
    }
    // Apply with several stream inputs: one nested list per input.
    if (TCoApply::Match(node) && applyStreamChildren.size() > 1) {
        writer.OnKeyedItem("Children");
        writer.OnBeginList();
        for (auto child: applyStreamChildren) {
            writer.OnListItem();
            writer.OnBeginList();
            WriteStream(writer, child, source);
            writer.OnEndList();
        }
        writer.OnEndList();
    }
    writer.OnEndMap();
}
  1241. void WriteStreams(NYson::TYsonWriter& writer, TStringBuf name, const TCoLambda& lambda) {
  1242. writer.OnKeyedItem(name);
  1243. writer.OnBeginList();
  1244. WriteStream(writer, lambda.Body().Raw(), lambda.Args().Size() > 0 ? lambda.Args().Arg(0).Raw() : nullptr);
  1245. writer.OnEndList();
  1246. }
// Heuristically estimates how many times the data volume may grow along the chain of
// callables from `stream` to `node`, starting from `factor` (the estimate so far).
// Reaching `stream` or a non-callable node returns `factor` unchanged.
// The multipliers used below are fixed heuristic constants:
//   x2 for list-producing steps;
//   x5 for joins and one-to-many dictionary lookups.
double GetDataReplicationFactor(double factor, const TExprNode* node, const TExprNode* stream, TExprContext& ctx) {
    if (node == stream) {
        return factor;
    }
    if (!node->IsCallable()) {
        return factor;
    }
    // An Apply ends the walk. If it yields a stream/flow/list, the result is
    // 2 * max(1, sum of the estimates of its stream-like inputs); each input is estimated
    // starting from the current `factor`. Any other Apply leaves `factor` unchanged.
    if (TCoApply::Match(node)) {
        switch (node->GetTypeAnn()->GetKind()) {
        case ETypeAnnotationKind::Stream:
        case ETypeAnnotationKind::Flow:
        case ETypeAnnotationKind::List: {
            double applyFactor = 0.0;
            for (size_t i = 1; i < node->ChildrenSize(); ++i) {
                if (IsFlowOrStream(*node->Child(i))) {
                    applyFactor += GetDataReplicationFactor(factor, node->Child(i), stream, ctx);
                } else if (node->Child(i)->GetTypeAnn()->GetKind() == ETypeAnnotationKind::List) {
                    if (node->Child(i)->IsCallable("ForwardList")) {
                        applyFactor += GetDataReplicationFactor(factor, node->Child(i)->Child(0), stream, ctx);
                    } else if (node->Child(i)->IsCallable("Collect") && IsFlowOrStream(node->Child(i)->Head())) {
                        applyFactor += GetDataReplicationFactor(factor, node->Child(i)->Child(0), stream, ctx);
                    }
                }
            }
            factor = 2.0 * Max(1.0, applyFactor);
            break;
        }
        default:
            break;
        }
        return factor;
    }
    // Upstream first: except for Extend (handled below), follow the first child.
    if (!TCoExtendBase::Match(node) && node->ChildrenSize() > 0) {
        factor = GetDataReplicationFactor(factor, node->Child(0), stream, ctx);
    }
    if (TCoFlatMapBase::Match(node)) {
        // TODO: check MapJoinCore input unique using constraints
        // FlatMap over SqueezeToDict whose lambda body is a MapJoinCore looking up in the
        // lambda argument (the squeezed dict): x5 unless the dict settings say it is not
        // a multi-dict (an unknown setting counts as multi).
        if (const auto& lambda = node->Tail(); node->Head().IsCallable("SqueezeToDict") && lambda.Tail().IsCallable("MapJoinCore") && lambda.Tail().Child(1U) == &lambda.Head().Head()) {
            TMaybe<bool> isMany;
            TMaybe<EDictType> type;
            bool isCompact = false;
            TMaybe<ui64> itemsCount;
            ParseToDictSettings(node->Head(), ctx, type, isMany, itemsCount, isCompact);
            if (isMany.GetOrElse(true)) {
                factor *= 5.0;
            }
        } else {
            // Stream/flow lambda: follow its body back to its first argument; list lambda: x2.
            switch (lambda.GetTypeAnn()->GetKind()) {
            case ETypeAnnotationKind::Stream:
            case ETypeAnnotationKind::Flow:
                factor = GetDataReplicationFactor(factor, &lambda.Tail(), &lambda.Head().Head(), ctx);
                break;
            case ETypeAnnotationKind::List:
                factor *= 2.0;
                break;
            default:
                break;
            }
        }
    }
    else if (node->IsCallable("CommonJoinCore")) {
        factor *= 5.0;
    }
    else if (node->IsCallable("MapJoinCore")) {
        // TODO: check MapJoinCore input unique using constraints
        // Lookup in a ToDict: x5 unless the dict is known not to be a multi-dict.
        if (node->Child(1)->IsCallable("ToDict")) {
            TMaybe<bool> isMany;
            TMaybe<EDictType> type;
            bool isCompact = false;
            TMaybe<ui64> itemsCount;
            ParseToDictSettings(*node->Child(1), ctx, type, isMany, itemsCount, isCompact);
            if (isMany.GetOrElse(true)) {
                factor *= 5.0;
            }
        }
    }
    else if (TCoSwitch::Match(node)) {
        // Sum over handler lambdas (children 3, 5, ...), each traced to its first argument; at least 1.
        double switchFactor = 0.0;
        for (size_t i = 3; i < node->ChildrenSize(); i += 2) {
            switchFactor += GetDataReplicationFactor(factor, node->Child(i)->Child(1), node->Child(i)->Head().Child(0), ctx);
        }
        factor = Max(1.0, switchFactor);
    }
    else if (TCoExtendBase::Match(node) && node->ChildrenSize() > 0) {
        // Sum over all inputs, each traced back to `stream`; at least 1.
        double extendFactor = 0.0;
        for (size_t i = 0; i < node->ChildrenSize(); ++i) {
            extendFactor += GetDataReplicationFactor(factor, node->Child(i), stream, ctx);
        }
        factor = Max(1.0, extendFactor);
    }
    else if (TCoChopper::Match(node) || node->IsCallable("WideChopper")) {
        // Handler lambda body, traced back to the handler's last argument.
        factor = GetDataReplicationFactor(factor, &node->Child(TCoChopper::idx_Handler)->Tail(), &node->Child(TCoChopper::idx_Handler)->Head().Tail(), ctx);
    }
    return factor;
}
  1342. double GetDataReplicationFactor(const TExprNode& lambda, TExprContext& ctx) {
  1343. return GetDataReplicationFactor(1.0, lambda.Child(1), lambda.Head().ChildrenSize() > 0 ? lambda.Head().Child(0) : nullptr, ctx);
  1344. }
  1345. void WriteStatistics(NYson::TYsonWriter& writer, const TOperationStatistics& statistics)
  1346. {
  1347. writer.OnBeginMap();
  1348. for (auto& el : statistics.Entries) {
  1349. writer.OnKeyedItem(el.Name);
  1350. if (el.Value) {
  1351. writer.OnStringScalar(*el.Value);
  1352. continue;
  1353. }
  1354. writer.OnBeginMap();
  1355. if (auto val = el.Sum) {
  1356. writer.OnKeyedItem("sum");
  1357. writer.OnInt64Scalar(*val);
  1358. }
  1359. if (auto val = el.Count) {
  1360. writer.OnKeyedItem("count");
  1361. writer.OnInt64Scalar(*val);
  1362. }
  1363. if (auto val = el.Avg) {
  1364. writer.OnKeyedItem("avg");
  1365. writer.OnInt64Scalar(*val);
  1366. }
  1367. if (auto val = el.Max) {
  1368. writer.OnKeyedItem("max");
  1369. writer.OnInt64Scalar(*val);
  1370. }
  1371. if (auto val = el.Min) {
  1372. writer.OnKeyedItem("min");
  1373. writer.OnInt64Scalar(*val);
  1374. }
  1375. writer.OnEndMap();
  1376. }
  1377. writer.OnEndMap();
  1378. }
  1379. void WriteStatistics(NYson::TYsonWriter& writer, bool totalOnly, const THashMap<ui32, TOperationStatistics>& statistics, bool addTotalKey, bool addExternalMap) {
  1380. if (statistics.empty()) {
  1381. return;
  1382. }
  1383. THashMap<TString, std::tuple<i64, i64, i64, TMaybe<i64>>> total; // sum, count, max, min
  1384. if (addExternalMap) {
  1385. writer.OnBeginMap();
  1386. }
  1387. for (const auto& opStatistics : statistics) {
  1388. for (auto& el : opStatistics.second.Entries) {
  1389. if (el.Value) {
  1390. continue;
  1391. }
  1392. auto& totalEntry = total[el.Name];
  1393. if (auto val = el.Sum) {
  1394. std::get<0>(totalEntry) += *val;
  1395. }
  1396. if (auto val = el.Count) {
  1397. std::get<1>(totalEntry) += *val;
  1398. }
  1399. if (auto val = el.Max) {
  1400. std::get<2>(totalEntry) = Max<i64>(*val, std::get<2>(totalEntry));
  1401. }
  1402. if (auto val = el.Min) {
  1403. std::get<3>(totalEntry) = Min<i64>(*val, std::get<3>(totalEntry).GetOrElse(Max<i64>()));
  1404. }
  1405. }
  1406. }
  1407. if (totalOnly == false) {
  1408. for (const auto& [key, value] : statistics) {
  1409. writer.OnKeyedItem(ToString(key));
  1410. WriteStatistics(writer, value);
  1411. }
  1412. }
  1413. TVector<TString> statKeys;
  1414. std::transform(total.cbegin(), total.cend(), std::back_inserter(statKeys), [](const decltype(total)::value_type& v) { return v.first; });
  1415. std::sort(statKeys.begin(), statKeys.end());
  1416. if (addTotalKey) {
  1417. writer.OnKeyedItem("total");
  1418. writer.OnBeginMap();
  1419. }
  1420. for (auto& key: statKeys) {
  1421. auto& totalEntry = total[key];
  1422. writer.OnKeyedItem(key);
  1423. writer.OnBeginMap();
  1424. writer.OnKeyedItem("sum");
  1425. writer.OnInt64Scalar(std::get<0>(totalEntry));
  1426. writer.OnKeyedItem("count");
  1427. writer.OnInt64Scalar(std::get<1>(totalEntry));
  1428. writer.OnKeyedItem("avg");
  1429. writer.OnInt64Scalar(std::get<1>(totalEntry) ? (std::get<0>(totalEntry) / std::get<1>(totalEntry)) : 0l);
  1430. writer.OnKeyedItem("max");
  1431. writer.OnInt64Scalar(std::get<2>(totalEntry));
  1432. writer.OnKeyedItem("min");
  1433. writer.OnInt64Scalar(std::get<3>(totalEntry).GetOrElse(0));
  1434. writer.OnEndMap();
  1435. }
  1436. if (addTotalKey) {
  1437. writer.OnEndMap(); // total
  1438. }
  1439. if (addExternalMap) {
  1440. writer.OnEndMap();
  1441. }
  1442. }
  1443. bool ValidateCompressionForInput(std::string_view format, std::string_view compression, TExprContext& ctx) {
  1444. if (compression.empty()) {
  1445. return true;
  1446. }
  1447. if (format == "parquet"sv) {
  1448. ctx.AddError(TIssue(TStringBuilder() << "External compression for parquet is not supported"));
  1449. return false;
  1450. }
  1451. if (IsIn(Compressions, compression)) {
  1452. return true;
  1453. }
  1454. ctx.AddError(TIssue(TStringBuilder() << "Unknown compression: " << compression
  1455. << ". Use one of: " << JoinSeq(", ", Compressions)));
  1456. return false;
  1457. }
  1458. bool ValidateCompressionForOutput(std::string_view format, std::string_view compression, TExprContext& ctx) {
  1459. if (compression.empty()) {
  1460. return true;
  1461. }
  1462. if (format == "parquet"sv) {
  1463. ctx.AddError(TIssue(TStringBuilder() << "External compression for parquet is not supported"));
  1464. return false;
  1465. }
  1466. if (IsIn(Compressions, compression)) {
  1467. return true;
  1468. }
  1469. ctx.AddError(TIssue(TStringBuilder() << "Unknown compression: " << compression
  1470. << ". Use one of: " << JoinSeq(", ", Compressions)));
  1471. return false;
  1472. }
  1473. bool ValidateFormatForInput(
  1474. std::string_view format,
  1475. const TStructExprType* schemaStructRowType,
  1476. const std::function<bool(TStringBuf)>& excludeFields,
  1477. TExprContext& ctx) {
  1478. if (format.empty()) {
  1479. return true;
  1480. }
  1481. if (!IsIn(FormatsForInput, format)) {
  1482. ctx.AddError(TIssue(TStringBuilder() << "Unknown format: " << format
  1483. << ". Use one of: " << JoinSeq(", ", FormatsForInput)));
  1484. return false;
  1485. }
  1486. if (schemaStructRowType && format == TStringBuf("raw")) {
  1487. ui64 realSchemaColumnsCount = 0;
  1488. for (const TItemExprType* item : schemaStructRowType->GetItems()) {
  1489. if (excludeFields && excludeFields(item->GetName())) {
  1490. continue;
  1491. }
  1492. const TTypeAnnotationNode* rowType = item->GetItemType();
  1493. if (rowType->GetKind() == ETypeAnnotationKind::Optional) {
  1494. rowType = rowType->Cast<TOptionalExprType>()->GetItemType();
  1495. }
  1496. if (rowType->GetKind() != ETypeAnnotationKind::Data
  1497. || !IsDataTypeString(rowType->Cast<TDataExprType>()->GetSlot())) {
  1498. ctx.AddError(TIssue(TStringBuilder() << "Only string type column in schema supported in raw format (you have '"
  1499. << item->GetName() << " " << FormatType(rowType) << "' field)"));
  1500. return false;
  1501. }
  1502. ++realSchemaColumnsCount;
  1503. }
  1504. if (realSchemaColumnsCount != 1) {
  1505. ctx.AddError(TIssue(TStringBuilder() << "Only one column in schema supported in raw format (you have "
  1506. << realSchemaColumnsCount << " fields)"));
  1507. return false;
  1508. }
  1509. }
  1510. else if (schemaStructRowType && format == TStringBuf("json_list")) {
  1511. bool failedSchemaColumns = false;
  1512. for (const TItemExprType* item : schemaStructRowType->GetItems()) {
  1513. if (excludeFields && excludeFields(item->GetName())) {
  1514. continue;
  1515. }
  1516. const TTypeAnnotationNode* rowType = item->GetItemType();
  1517. if (rowType->GetKind() == ETypeAnnotationKind::Optional) {
  1518. rowType = rowType->Cast<TOptionalExprType>()->GetItemType();
  1519. }
  1520. if (rowType->GetKind() == ETypeAnnotationKind::Data
  1521. && IsDataTypeDateOrTzDateOrInterval(rowType->Cast<TDataExprType>()->GetSlot())) {
  1522. ctx.AddError(TIssue(TStringBuilder() << "Date, Timestamp and Interval types are not allowed in json_list format (you have '"
  1523. << item->GetName() << " " << FormatType(rowType) << "' field)"));
  1524. failedSchemaColumns = true;
  1525. }
  1526. }
  1527. if (failedSchemaColumns) {
  1528. return false;
  1529. }
  1530. }
  1531. return true;
  1532. }
  1533. bool ValidateFormatForOutput(std::string_view format, TExprContext& ctx) {
  1534. if (format.empty() || IsIn(FormatsForOutput, format)) {
  1535. return true;
  1536. }
  1537. ctx.AddError(TIssue(TStringBuilder() << "Unknown format: " << format
  1538. << ". Use one of: " << JoinSeq(", ", FormatsForOutput)));
  1539. return false;
  1540. }
  1541. template<typename T>
  1542. bool ValidateValueInDictionary(std::string_view value, TExprContext& ctx, const T& dictionary) {
  1543. if (value.empty() || IsIn(dictionary, value)) {
  1544. return true;
  1545. }
  1546. ctx.AddError(TIssue(TStringBuilder() << "Unknown format: " << value
  1547. << ". Use one of: " << JoinSeq(", ", dictionary)));
  1548. return false;
  1549. }
  1550. bool ValidateIntervalUnit(std::string_view unit, TExprContext& ctx) {
  1551. return ValidateValueInDictionary(unit, ctx, IntervalUnits);
  1552. }
  1553. bool ValidateDateTimeFormatName(std::string_view formatName, TExprContext& ctx) {
  1554. return ValidateValueInDictionary(formatName, ctx, DateTimeFormatNames);
  1555. }
  1556. bool ValidateTimestampFormatName(std::string_view formatName, TExprContext& ctx) {
  1557. return ValidateValueInDictionary(formatName, ctx, TimestampFormatNames);
  1558. }
  1559. namespace {
  1560. bool MatchesSetItemOption(const TExprBase& setItemOption, TStringBuf name) {
  1561. if (setItemOption.Ref().IsList() && setItemOption.Ref().ChildrenSize() > 0) {
  1562. if (setItemOption.Ref().ChildPtr(0)->Content() == name) {
  1563. return true;
  1564. }
  1565. }
  1566. return false;
  1567. }
  1568. } //namespace
  1569. bool TransformPgSetItemOption(
  1570. const TCoPgSelect& pgSelect,
  1571. TStringBuf optionName,
  1572. std::function<void(const TExprBase&)> lambda
  1573. ) {
  1574. bool applied = false;
  1575. for (const auto& option : pgSelect.SelectOptions()) {
  1576. if (option.Name() == "set_items") {
  1577. auto pgSetItems = option.Value().Cast<TExprList>();
  1578. for (const auto& setItem : pgSetItems) {
  1579. auto setItemNode = setItem.Cast<TCoPgSetItem>();
  1580. for (const auto& setItemOption : setItemNode.SetItemOptions()) {
  1581. if (MatchesSetItemOption(setItemOption, optionName)) {
  1582. applied = true;
  1583. lambda(setItemOption);
  1584. }
  1585. }
  1586. }
  1587. }
  1588. }
  1589. return applied;
  1590. }
  1591. TExprNode::TPtr GetSetItemOption(const TCoPgSelect& pgSelect, TStringBuf optionName) {
  1592. TExprNode::TPtr nodePtr = nullptr;
  1593. TransformPgSetItemOption(pgSelect, optionName, [&nodePtr](const TExprBase& option) {
  1594. nodePtr = option.Ptr();
  1595. });
  1596. return nodePtr;
  1597. }
  1598. TExprNode::TPtr GetSetItemOptionValue(const TExprBase& setItemOption) {
  1599. if (setItemOption.Ref().IsList() && setItemOption.Ref().ChildrenSize() > 1) {
  1600. return setItemOption.Ref().ChildPtr(1);
  1601. }
  1602. return nullptr;
  1603. }
  1604. bool NeedToRenamePgSelectColumns(const TCoPgSelect& pgSelect) {
  1605. auto fill = NCommon::GetSetItemOption(pgSelect, "fill_target_columns");
  1606. return fill && !NCommon::GetSetItemOptionValue(TExprBase(fill));
  1607. }
  1608. bool RenamePgSelectColumns(
  1609. const TCoPgSelect& node,
  1610. TExprNode::TPtr& output,
  1611. const TMaybe<TColumnOrder>& tableColumnOrder,
  1612. TExprContext& ctx,
  1613. TTypeAnnotationContext& types) {
  1614. bool hasValues = (bool)GetSetItemOption(node, "values");
  1615. bool hasProjectionOrder = (bool)GetSetItemOption(node, "projection_order");
  1616. Y_ENSURE(hasValues ^ hasProjectionOrder, "Only one of values and projection_order should be present");
  1617. TString optionName = (hasValues) ? "values" : "projection_order";
  1618. auto selectorColumnOrder = types.LookupColumnOrder(node.Ref());
  1619. TColumnOrder insertColumnOrder;
  1620. if (auto targetColumnsOption = GetSetItemOption(node, "target_columns")) {
  1621. auto targetColumns = GetSetItemOptionValue(TExprBase(targetColumnsOption));
  1622. for (const auto& child : targetColumns->ChildrenList()) {
  1623. insertColumnOrder.AddColumn(TString(child->Content()));
  1624. }
  1625. } else {
  1626. YQL_ENSURE(tableColumnOrder);
  1627. insertColumnOrder = *tableColumnOrder;
  1628. }
  1629. YQL_ENSURE(selectorColumnOrder);
  1630. if (selectorColumnOrder->Size() > insertColumnOrder.Size()) {
  1631. ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), TStringBuilder() << Sprintf(
  1632. "%s have %zu columns, INSERT INTO expects: %zu",
  1633. optionName.data(),
  1634. selectorColumnOrder->Size(),
  1635. insertColumnOrder.Size()
  1636. )));
  1637. return false;
  1638. }
  1639. if (*selectorColumnOrder == insertColumnOrder) {
  1640. output = node.Ptr();
  1641. return true;
  1642. }
  1643. TVector<const TItemExprType*> rowTypeItems;
  1644. rowTypeItems.reserve(selectorColumnOrder->Size());
  1645. const TTypeAnnotationNode* inputType;
  1646. switch (node.Ref().GetTypeAnn()->GetKind()) {
  1647. case ETypeAnnotationKind::List:
  1648. inputType = node.Ref().GetTypeAnn()->Cast<TListExprType>()->GetItemType();
  1649. break;
  1650. default:
  1651. inputType = node.Ref().GetTypeAnn();
  1652. break;
  1653. }
  1654. YQL_ENSURE(inputType->GetKind() == ETypeAnnotationKind::Struct);
  1655. const auto rowArg = Build<TCoArgument>(ctx, node.Pos())
  1656. .Name("row")
  1657. .Done();
  1658. auto structBuilder = Build<TCoAsStruct>(ctx, node.Pos());
  1659. for (size_t i = 0; i < selectorColumnOrder->Size(); i++) {
  1660. const auto& columnName = selectorColumnOrder->at(i);
  1661. structBuilder.Add<TCoNameValueTuple>()
  1662. .Name().Build(insertColumnOrder.at(i).PhysicalName)
  1663. .Value<TCoMember>()
  1664. .Struct(rowArg)
  1665. .Name().Build(columnName.PhysicalName)
  1666. .Build()
  1667. .Build();
  1668. }
  1669. auto fill = GetSetItemOption(node, "fill_target_columns");
  1670. output = Build<TCoMap>(ctx, node.Pos())
  1671. .Input(node)
  1672. .Lambda<TCoLambda>()
  1673. .Args({rowArg})
  1674. .Body(structBuilder.Done().Ptr())
  1675. .Build()
  1676. .Done().Ptr();
  1677. fill->ChangeChildrenInplace({
  1678. fill->Child(0),
  1679. Build<TCoAtom>(ctx, node.Pos())
  1680. .Value("done")
  1681. .Done().Ptr()
  1682. });
  1683. fill->ChildPtr(1)->SetTypeAnn(ctx.MakeType<TUnitExprType>());
  1684. return true;
  1685. }
  1686. } // namespace NCommon
  1687. } // namespace NYql