pgrun.cpp 39 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349
  1. #include <yql/essentials/utils/backtrace/backtrace.h>
  2. #include <yql/essentials/minikql/invoke_builtins/mkql_builtins.h>
  3. #include <yql/essentials/minikql/mkql_function_registry.h>
  4. #include <yql/essentials/core/cbo/simple/cbo_simple.h>
  5. #include <yql/essentials/core/facade/yql_facade.h>
  6. #include <yql/essentials/core/yql_opt_utils.h>
  7. #include <yql/essentials/core/yql_expr_optimize.h>
  8. #include <yt/yql/providers/yt/gateway/file/yql_yt_file.h>
  9. #include <yt/yql/providers/yt/gateway/file/yql_yt_file_services.h>
  10. #include <yql/essentials/providers/common/provider/yql_provider_names.h>
  11. #include <yql/essentials/providers/common/udf_resolve/yql_simple_udf_resolver.h>
  12. #include <yql/essentials/providers/common/proto/gateways_config.pb.h>
  13. #include "yt/yql/providers/yt/common/yql_names.h"
  14. #include <yt/yql/providers/yt/provider/yql_yt_provider.h>
  15. #include <yql/essentials/providers/pg/provider/yql_pg_provider.h>
  16. #include <yql/essentials/public/issue/yql_issue.h>
  17. #include <yql/essentials/parser/pg_wrapper/interface/utils.h>
  18. #include <yt/yql/providers/yt/lib/schema/schema.h>
  19. #include <yql/essentials/core/services/mounts/yql_mounts.h>
  20. #include <library/cpp/getopt/last_getopt.h>
  21. #include <library/cpp/yson/public.h>
  22. #include "library/cpp/yson/node/node_io.h"
  23. #include <library/cpp/yt/yson_string/string.h>
  24. #include <fmt/format.h>
  25. #include <util/system/user.h>
  26. #include <util/stream/file.h>
  27. #include <util/system/fs.h>
  28. #include <util/folder/path.h>
  29. #include <util/folder/tempdir.h>
  30. #include <util/string/split.h>
  31. #include <util/generic/yexception.h>
  32. #include <util/generic/iterator.h>
  33. #include <util/generic/string.h>
  34. #include <util/generic/strbuf.h>
  35. #include <library/cpp/yson/parser.h>
  36. #include <library/cpp/yson/node/node.h>
  37. #include <library/cpp/yson/node/node_builder.h>
  38. #include <library/cpp/string_utils/base64/base64.h>
  39. using namespace NYql;
  40. using namespace NKikimr::NMiniKQL;
  41. using namespace NNodes;
  42. using NUdf::EDataSlot;
  43. namespace NMiniKQL = NKikimr::NMiniKQL;
  44. const ui32 PRETTY_FLAGS = NYql::TAstPrintFlags::PerLine | NYql::TAstPrintFlags::ShortQuote |
  45. NYql::TAstPrintFlags::AdaptArbitraryContent;
  // Selects how bytea cells are rendered: "\x"-prefixed hex digits, or
  // printable text with backslash escapes.
  enum class EByteaOutput {
      hex,
      escape,
  };
  50. TString nullRepr("");
  51. EByteaOutput byteaOutput = EByteaOutput::hex;
  52. bool IsEscapedChar(const TString& s, size_t pos) {
  53. bool escaped = false;
  54. while (s[--pos] == '\\') {
  55. escaped = !escaped;
  56. }
  57. return escaped;
  58. }
  59. class TStatementIterator final
  60. : public TInputRangeAdaptor<TStatementIterator>
  61. {
  62. enum class State {
  63. InOperator,
  64. EndOfOperator,
  65. LineComment,
  66. BlockComment,
  67. QuotedIdentifier,
  68. StringLiteral,
  69. EscapedStringLiteral,
  70. DollarStringLiteral,
  71. InMetaCommand,
  72. InCopyFromStdin,
  73. InVar,
  74. };
  75. public:
  76. TStatementIterator(const TString&& program)
  77. : Program_(std::move(program))
  78. , Cur_()
  79. , Pos_(0)
  80. , State_(State::InOperator)
  81. , AtStmtStart_(true)
  82. , Mode_(State::InOperator)
  83. , Depth_(0)
  84. , Tag_()
  85. , StandardConformingStrings_(true)
  86. {
  87. }
  88. static bool IsInWsSignificantState(State state) {
  89. switch (state) {
  90. case State::QuotedIdentifier:
  91. case State::StringLiteral:
  92. case State::EscapedStringLiteral:
  93. case State::DollarStringLiteral:
  94. return true;
  95. default:
  96. return false;
  97. }
  98. }
  99. TString RemoveEmptyLines(const TString& s, bool inStatement) {
  100. if (s.empty()) {
  101. return {};
  102. }
  103. TStringBuilder sb;
  104. auto isFirstLine = true;
  105. if (inStatement && s[0] == '\n') {
  106. sb << '\n';
  107. }
  108. for (TStringBuf line : StringSplitter(s).SplitBySet("\r\n").SkipEmpty()) {
  109. if (isFirstLine) {
  110. isFirstLine = false;
  111. } else {
  112. sb << '\n';
  113. }
  114. sb << line;
  115. }
  116. return sb;
  117. }
  118. const TString* Next()
  119. {
  120. if (TStringBuf::npos == Pos_)
  121. return nullptr;
  122. size_t startPos = Pos_;
  123. size_t curPos = Pos_;
  124. size_t endPos;
  125. auto prevState = State_;
  126. TStringBuilder stmt;
  127. TStringBuilder rawStmt;
  128. auto inStatement = false;
  129. while (!CallParser(startPos)) {
  130. endPos = (TStringBuf::npos != Pos_) ? Pos_ : Program_.length();
  131. TStringBuf part{&Program_[curPos], endPos - curPos};
  132. if (IsInWsSignificantState(prevState)) {
  133. if (!rawStmt.empty()) {
  134. stmt << RemoveEmptyLines(rawStmt, inStatement);
  135. rawStmt.clear();
  136. }
  137. stmt << part;
  138. inStatement = true;
  139. } else {
  140. rawStmt << part;
  141. }
  142. curPos = endPos;
  143. prevState = State_;
  144. }
  145. endPos = (TStringBuf::npos != Pos_) ? Pos_ : Program_.length();
  146. TStringBuf part{&Program_[curPos], endPos - curPos};
  147. if (IsInWsSignificantState(prevState)) {
  148. if (!rawStmt.empty()) {
  149. stmt << RemoveEmptyLines(rawStmt, inStatement);
  150. rawStmt.clear();
  151. }
  152. stmt << part;
  153. inStatement = true;
  154. } else {
  155. rawStmt << part;
  156. }
  157. #if 0
  158. if (0 < Pos_ && !(Pos_ == TStringBuf::npos || Program_[Pos_-1] == '\n')) {
  159. Cerr << "Last char: '" << Program_[Pos_-1] << "'\n";
  160. }
  161. #endif
  162. stmt << RemoveEmptyLines(rawStmt, inStatement);
  163. // inv: Pos_ is at the start of next token
  164. if (startPos == endPos)
  165. return nullptr;
  166. stmt << '\n';
  167. Cur_ = stmt;
  168. ApplyStateFromStatement(Cur_);
  169. return &Cur_;
  170. }
  171. private:
  172. // States:
  173. // - in-operator
  174. // - line comment
  175. // - block comment
  176. // - quoted identifier (U& quoted identifier is no difference)
  177. // - string literal (U& string literal is the same for our purpose)
  178. // - E string literal
  179. // - $ string literal
  180. // - end-of-operator
  181. // Rules:
  182. // - in-operator
  183. // -- -> next: line comment
  184. // /* -> depth := 1, next: block comment
  185. // " -> next: quoted identifier
  186. // ' -> next: string literal
  187. // E' -> next: E string literal
  188. // $tag$, not preceded by alnum char (a bit of simplification here but sufficient) -> tag := tag, next: $ string literal
  189. // ; -> current_mode := end-of-operator, next: end-of-operator
  190. // - line comment
  191. // EOL -> next: current_mode
  192. // - block comment
  193. // /* -> ++depth
  194. // */ -> --depth, if (depth == 0) -> next: current_mode
  195. // - quoted identifier
  196. // " -> next: in-operator
  197. // - string literal
  198. // ' -> next: in-operator
  199. // - E string literal
  200. // ' -> if not preceeded by \ next: in-operator
  201. // - $ string literal
  202. // $tag$ -> next: in-operator
  203. // - end-of-operator
  204. // -- -> next: line comment, just once
  205. // /* -> depth := 1, next: block comment
  206. // non-space char -> unget, emit, current_mode := in-operator, next: in-operator
  207. // In every state:
  208. // EOS -> emit if consumed part of the input is not empty
  209. bool SaveDollarTag() {
  210. if (Pos_ + 1 == Program_.length())
  211. return false;
  212. auto p = Program_.cbegin() + (Pos_ + 1);
  213. if (std::isdigit(*p))
  214. return false;
  215. for (;p != Program_.cend(); ++p) {
  216. if (*p == '$') {
  217. auto bp = &Program_[Pos_];
  218. auto l = p - bp;
  219. Tag_ = TStringBuf(bp, l + 1);
  220. Pos_ += l;
  221. return true;
  222. }
  223. if (!(std::isalpha(*p) || std::isdigit(*p) || *p == '_'))
  224. return false;
  225. }
  226. return false;
  227. }
  228. bool IsCopyFromStdin(size_t startPos, size_t endPos) {
  229. TString stmt(Program_, startPos, endPos - startPos + 1);
  230. stmt.to_upper();
  231. // FROM STDOUT is used in insert.sql testcase, probably a bug
  232. return stmt.Contains(" FROM STDIN") || stmt.Contains(" FROM STDOUT");
  233. }
  234. bool InOperatorParser(size_t startPos) {
  235. // need \ to detect psql meta-commands
  236. static const TString midNextTokens{"'\";-/$\\"};
  237. // need : for basic psql-vars support
  238. static const TString initNextTokens{"'\";-/$\\:"};
  239. const auto& nextTokens = (AtStmtStart_) ? initNextTokens : midNextTokens;
  240. if (AtStmtStart_) {
  241. Pos_ = Program_.find_first_not_of(" \t\n\r\v", Pos_);
  242. if (TString::npos == Pos_) {
  243. return true;
  244. }
  245. }
  246. Pos_ = Program_.find_first_of(nextTokens, Pos_);
  247. if (TString::npos == Pos_) {
  248. return true;
  249. }
  250. switch (Program_[Pos_]) {
  251. case '\'':
  252. State_ = (!StandardConformingStrings_ || 0 < Pos_ && std::toupper(Program_[Pos_ - 1]) == 'E')
  253. ? State::EscapedStringLiteral
  254. : State::StringLiteral;
  255. break;
  256. case '"':
  257. State_ = State::QuotedIdentifier;
  258. break;
  259. case ';':
  260. State_ = Mode_ = IsCopyFromStdin(startPos, Pos_)
  261. ? State::InCopyFromStdin
  262. : State::EndOfOperator;
  263. break;
  264. case '-':
  265. if (Pos_ < Program_.length() && Program_[Pos_ + 1] == '-') {
  266. State_ = State::LineComment;
  267. ++Pos_;
  268. }
  269. break;
  270. case '/':
  271. if (Pos_ < Program_.length() && Program_[Pos_ + 1] == '*') {
  272. State_ = State::BlockComment;
  273. ++Depth_;
  274. ++Pos_;
  275. }
  276. break;
  277. case '$':
  278. if (Pos_ == 0 || std::isspace(Program_[Pos_ - 1])) {
  279. if (SaveDollarTag())
  280. State_ = State::DollarStringLiteral;
  281. }
  282. break;
  283. case '\\':
  284. if (AtStmtStart_) {
  285. State_ = State::InMetaCommand;
  286. } else if (Program_.Contains("\\gexec", Pos_)) {
  287. Pos_ += 6;
  288. return Emit(Program_[Pos_] == '\n');
  289. }
  290. break;
  291. case ':':
  292. if (Pos_ == 0 || Program_[Pos_-1] == '\n') {
  293. State_ = State::InVar;
  294. }
  295. break;
  296. }
  297. ++Pos_;
  298. if (Program_.length() == Pos_) {
  299. Pos_ = TString::npos;
  300. return true;
  301. }
  302. return false;
  303. }
  304. bool Emit(bool atEol) {
  305. State_ = Mode_ = State::InOperator;
  306. AtStmtStart_ = true;
  307. if (atEol) {
  308. ++Pos_;
  309. if (Program_.length() == Pos_) {
  310. Pos_ = TString::npos;
  311. return true;
  312. }
  313. }
  314. // else do not consume as we're expected to be on the first char of the next statement
  315. return true;
  316. }
  317. bool EndOfOperatorParser() {
  318. const auto p = std::find_if_not(Program_.cbegin() + Pos_, Program_.cend(), [](const auto& c) {
  319. return c == ' ' || c == '\t' || c == '\r';
  320. });
  321. if (p == Program_.cend()) {
  322. Pos_ = TStringBuf::npos;
  323. return true;
  324. }
  325. Pos_ = p - Program_.cbegin();
  326. switch (*p) {
  327. case '-':
  328. if (Pos_ < Program_.length() && Program_[Pos_ + 1] == '-') {
  329. State_ = State::LineComment;
  330. ++Pos_;
  331. }
  332. break;
  333. case '/':
  334. if (Pos_ < Program_.length() && Program_[Pos_ + 1] == '*') {
  335. State_ = State::BlockComment;
  336. ++Depth_;
  337. ++Pos_;
  338. }
  339. break;
  340. default:
  341. return Emit(*p == '\n');
  342. }
  343. ++Pos_;
  344. if (Program_.length() == Pos_) {
  345. Pos_ = TString::npos;
  346. return true;
  347. }
  348. return false;
  349. }
  350. bool LineCommentParser() {
  351. Pos_ = Program_.find('\n', Pos_);
  352. if (TString::npos == Pos_)
  353. return true;
  354. ++Pos_;
  355. if (Program_.length() == Pos_) {
  356. Pos_ = TString::npos;
  357. return true;
  358. }
  359. if (Mode_ == State::EndOfOperator) {
  360. return Emit(false);
  361. }
  362. State_ = Mode_;
  363. return false;
  364. }
  365. bool BlockCommentParser() {
  366. Pos_ = Program_.find_first_of("*/", Pos_);
  367. if (TString::npos == Pos_)
  368. return true;
  369. switch(Program_[Pos_]) {
  370. case '/':
  371. if (Pos_ < Program_.length() && Program_[Pos_ + 1] == '*') {
  372. ++Depth_;
  373. ++Pos_;
  374. }
  375. break;
  376. case '*':
  377. if (Pos_ < Program_.length() && Program_[Pos_ + 1] == '/') {
  378. --Depth_;
  379. ++Pos_;
  380. if (0 == Depth_) {
  381. State_ = Mode_;
  382. }
  383. }
  384. break;
  385. }
  386. ++Pos_;
  387. if (Program_.length() == Pos_) {
  388. Pos_ = TString::npos;
  389. return true;
  390. }
  391. return false;
  392. }
  393. bool QuotedIdentifierParser() {
  394. Pos_ = Program_.find('"', Pos_);
  395. if (TString::npos == Pos_)
  396. return true;
  397. ++Pos_;
  398. if (Program_.length() == Pos_) {
  399. Pos_ = TString::npos;
  400. return true;
  401. }
  402. State_ = State::InOperator;
  403. AtStmtStart_ = false;
  404. return false;
  405. }
  406. bool StringLiteralParser() {
  407. Pos_ = Program_.find('\'', Pos_);
  408. if (TString::npos == Pos_)
  409. return true;
  410. ++Pos_;
  411. if (Program_.length() == Pos_) {
  412. Pos_ = TString::npos;
  413. return true;
  414. }
  415. State_ = State::InOperator;
  416. AtStmtStart_ = false;
  417. return false;
  418. }
  419. bool EscapedStringLiteralParser() {
  420. Pos_ = Program_.find('\'', Pos_);
  421. if (TString::npos == Pos_)
  422. return true;
  423. if (IsEscapedChar(Program_, Pos_)) {
  424. ++Pos_;
  425. return false;
  426. }
  427. ++Pos_;
  428. if (Program_.length() == Pos_) {
  429. Pos_ = TString::npos;
  430. return true;
  431. }
  432. State_ = State::InOperator;
  433. AtStmtStart_ = false;
  434. return false;
  435. }
  436. bool DollarStringLiteralParser() {
  437. Y_ENSURE(Tag_ != nullptr && 2 <= Tag_.length());
  438. Pos_ = Program_.find(Tag_, Pos_);
  439. if (TString::npos == Pos_)
  440. return true;
  441. Pos_ += Tag_.length();
  442. if (Program_.length() == Pos_) {
  443. Pos_ = TString::npos;
  444. return true;
  445. }
  446. Tag_.Clear();
  447. State_ = State::InOperator;
  448. AtStmtStart_ = false;
  449. return false;
  450. }
  451. bool MetaCommandParser() {
  452. Pos_ = Program_.find('\n', Pos_);
  453. if (TString::npos == Pos_)
  454. return true;
  455. ++Pos_;
  456. if (Program_.length() == Pos_) {
  457. Pos_ = TString::npos;
  458. return true;
  459. }
  460. return Emit(false);
  461. }
  462. bool InCopyFromStdinParser() {
  463. Pos_ = Program_.find("\n\\.\n", Pos_);
  464. if (TString::npos == Pos_)
  465. return true;
  466. Pos_ += 4;
  467. return Emit(false);
  468. }
  469. // For now we support vars occupying a whole line only
  470. bool VarParser() {
  471. // TODO: validate var name
  472. Pos_ = Program_.find('\n', Pos_);
  473. if (TString::npos == Pos_)
  474. return true;
  475. ++Pos_;
  476. if (Program_.length() == Pos_) {
  477. Pos_ = TString::npos;
  478. return true;
  479. }
  480. return Emit(false);
  481. }
  482. bool CallParser(size_t startPos) {
  483. switch (State_) {
  484. case State::InOperator:
  485. return InOperatorParser(startPos);
  486. case State::EndOfOperator:
  487. return EndOfOperatorParser();
  488. case State::LineComment:
  489. return LineCommentParser();
  490. case State::BlockComment:
  491. return BlockCommentParser();
  492. case State::QuotedIdentifier:
  493. return QuotedIdentifierParser();
  494. case State::StringLiteral:
  495. return StringLiteralParser();
  496. case State::EscapedStringLiteral:
  497. return EscapedStringLiteralParser();
  498. case State::DollarStringLiteral:
  499. return DollarStringLiteralParser();
  500. case State::InMetaCommand:
  501. return MetaCommandParser();
  502. case State::InCopyFromStdin:
  503. return InCopyFromStdinParser();
  504. case State::InVar:
  505. return VarParser();
  506. default:
  507. Y_UNREACHABLE();
  508. }
  509. }
  510. void ApplyStateFromStatement(const TStringBuf& stmt) {
  511. if (stmt.contains("set standard_conforming_strings = on;") ||
  512. stmt.contains("reset standard_conforming_strings;"))
  513. {
  514. StandardConformingStrings_ = true;
  515. } else if (stmt.contains("set standard_conforming_strings = off;")) {
  516. StandardConformingStrings_ = false;
  517. }
  518. }
  519. TString Program_;
  520. TString Cur_;
  521. size_t Pos_;
  522. State State_;
  523. bool AtStmtStart_;
  524. State Mode_;
  525. ui16 Depth_;
  526. TStringBuf Tag_;
  527. bool StandardConformingStrings_;
  528. };
  529. TString GetFormattedStmt(const TStringBuf& stmt) {
  530. TString result;
  531. result.reserve(stmt.length());
  532. size_t pos = 0, next_pos = TStringBuf::npos;
  533. while (TStringBuf::npos != (next_pos = stmt.find('\n', pos))) {
  534. if (0 < next_pos - pos) {
  535. if (!(stmt[pos] == '\r' && 1 == next_pos - pos)) {
  536. result += stmt.substr(pos, next_pos - pos + 1);
  537. }
  538. }
  539. pos = next_pos + 1;
  540. }
  541. if (pos < stmt.length())
  542. result += stmt.substr(pos);
  543. if (0 < result.length() && '\n' == result.back())
  544. result.pop_back();
  545. if (0 < result.length() && '\r' == result.back())
  546. result.pop_back();
  547. return result;
  548. }
  549. void PrintExprTo(const TProgramPtr& program, IOutputStream& out) {
  550. TStringStream baseSS;
  551. auto baseAst = ConvertToAst(*program->ExprRoot(), program->ExprCtx(),
  552. NYql::TExprAnnotationFlags::None, true);
  553. baseAst.Root->PrettyPrintTo(baseSS, PRETTY_FLAGS);
  554. out << baseSS.Data();
  555. }
  556. NYT::TNode ParseYson(const TString& yson)
  557. {
  558. NYT::TNode root;
  559. NYT::TNodeBuilder builder(&root);
  560. NYson::TStatelessYsonParser resultParser(&builder);
  561. resultParser.Parse(yson);
  562. return root;
  563. }
  564. TString GetPgErrorMessage(const TIssue& issue) {
  565. const TString anchor("reason: ");
  566. const auto& msg = issue.GetMessage();
  567. auto pos = msg.find(anchor);
  568. if (TString::npos == pos)
  569. return TString(msg);
  570. return msg.substr(pos + anchor.length());
  571. }
  572. void WriteErrorToStream(const TProgramPtr program)
  573. {
  574. program->PrintErrorsTo(Cerr);
  575. for (const auto& topIssue: program->Issues()) {
  576. WalkThroughIssues(topIssue, true, [&](const TIssue& issue, ui16 /*level*/) {
  577. const auto msg = GetPgErrorMessage(issue);
  578. Cout << msg;
  579. if (msg.back() != '\n') {
  580. Cout << '\n';
  581. }
  582. });
  583. }
  584. }
  585. using CellFormatter = std::function<const TString(const TString&)>;
  586. using TColumnType = TString;
  587. inline const TString FormatBool(const TString& value)
  588. {
  589. static const TString T = "t";
  590. static const TString F = "f";
  591. return (value == "true") ? T
  592. : (value == "false") ? F
  593. : (value == nullRepr) ? nullRepr
  594. : ythrow yexception() << "Unexpected bool literal: " << value;
  595. }
  596. inline const TString FormatNumeric(const TString& value)
  597. {
  598. static const TString Zero = "0.0";
  599. return (value == "0") ? Zero : value;
  600. }
  601. const TString FormatFloat(const TString& value, std::function<TString(const TString&)> formatter) {
  602. static const TString nan = "NaN";
  603. static const TString inf = "Infinity";
  604. static const TString minf = "-Infinity";
  605. try {
  606. return (value == "") ? ""
  607. : (value == "nan") ? nan
  608. : (value == "inf") ? inf
  609. : (value == "-inf") ? minf
  610. : formatter(value);
  611. } catch (const std::exception& e) {
  612. Cerr << "Unexpected float value '" << value << "'\n";
  613. return "";
  614. }
  615. }
  616. inline const TString FormatFloat4(const TString& value)
  617. {
  618. return FormatFloat(value,
  619. [] (const TString& val) { return TString(fmt::format("{:.8g}", std::stof(val))); });
  620. }
  621. inline const TString FormatFloat8(const TString& value)
  622. {
  623. return FormatFloat(value,
  624. [] (const TString& val) { return TString(fmt::format("{:.15g}", std::stod(val))); });
  625. }
  626. inline const TString FormatTransparent(const TString& value)
  627. {
  628. return value;
  629. }
  630. static const THashMap<TColumnType, CellFormatter> ColumnFormatters {
  631. { "bool", FormatBool },
  632. { "numeric", FormatNumeric },
  633. { "float4", FormatFloat4 },
  634. { "float8", FormatFloat8 },
  635. };
  636. static const THashSet<TColumnType> RightAlignedTypes {
  637. "int2",
  638. "int4",
  639. "int8",
  640. "float4",
  641. "float8",
  642. "numeric",
  643. "oid",
  644. };
  645. struct TColumn {
  646. TString Name;
  647. TString Type;
  648. size_t Width;
  649. CellFormatter Formatter;
  650. bool RightAligned;
  651. };
  652. std::string FormatCell(const TString& data, const TColumn& column, size_t index, size_t numberOfColumns) {
  653. const auto delim = (index == 0) ? " " : " | ";
  654. if (column.RightAligned)
  655. return fmt::format("{0}{1:>{2}}", delim, data, column.Width);
  656. if (index == numberOfColumns - 1)
  657. return fmt::format("{0}{1}", delim, data);
  658. return fmt::format("{0}{1:<{2}}", delim, data, column.Width);
  659. }
  660. TString GetCellData(const NYT::TNode& cell, const TColumn& column) {
  661. if (column.Type == "bytea") {
  662. const auto rawValue = (cell.IsList())
  663. ? Base64Decode(cell.AsList()[0].AsString())
  664. : cell.AsString();
  665. switch (byteaOutput) {
  666. case EByteaOutput::hex: {
  667. TString result;
  668. const auto expectedSize = rawValue.size() * 2 + 2;
  669. result.resize(expectedSize);
  670. result[0] = '\\';
  671. result[1] = 'x';
  672. const auto cnt = HexEncode(rawValue.data(), rawValue.size(), result.begin() + 2);
  673. Y_ASSERT(cnt + 2 == expectedSize);
  674. return result;
  675. }
  676. case EByteaOutput::escape: {
  677. TString result;
  678. ui64 expectedSize = std::accumulate(rawValue.cbegin(), rawValue.cend(), 0U,
  679. [] (ui64 acc, char c) {
  680. return acc + ((c == '\\')
  681. ? 2
  682. : ((ui8)c < 0x20 || 0x7e < (ui8)c)
  683. ? 4
  684. : 1);
  685. });
  686. result.resize(expectedSize);
  687. auto p = result.begin();
  688. for (const auto c : rawValue) {
  689. if (c == '\\') {
  690. *p++ = '\\';
  691. *p++ = '\\';
  692. } else if ((ui8)c < 0x20 || 0x7e < (ui8)c) {
  693. auto val = (ui8)c;
  694. *p++ = '\\';
  695. *p++ = ((val >> 6) & 03) + '0';
  696. *p++ = ((val >> 3) & 07) + '0';
  697. *p++ = (val & 07) + '0';
  698. } else {
  699. *p++ = c;
  700. }
  701. }
  702. return result;
  703. }
  704. default:
  705. throw yexception() << "Unhandled EByteaOutput value";
  706. }
  707. }
  708. return cell.AsString();
  709. }
  710. void WriteTableToStream(IOutputStream& stream, const NYT::TNode::TListType& cols, const NYT::TNode::TListType& rows)
  711. {
  712. TVector<TColumn> columns;
  713. TList<TVector<TString>> formattedData;
  714. for (const auto& col: cols) {
  715. const auto& colName = col[0].AsString();
  716. const auto& colType = col[1][1].AsString();
  717. auto& c = columns.emplace_back();
  718. c.Name = colName;
  719. c.Type = colType;
  720. c.Width = colName.length();
  721. c.Formatter = ColumnFormatters.Value(colType, FormatTransparent);
  722. c.RightAligned = RightAlignedTypes.contains(colType);
  723. }
  724. for (const auto& row : rows) {
  725. auto& rowData = formattedData.emplace_back();
  726. { int i = 0;
  727. for (const auto& cell : row.AsList()) {
  728. auto& c = columns[i];
  729. const auto cellData = cell.HasValue() ? GetCellData(cell, c) : nullRepr;
  730. rowData.emplace_back(c.Formatter(cellData));
  731. c.Width = std::max(c.Width, rowData.back().length());
  732. ++i;
  733. }}
  734. }
  735. if (columns.empty()) {
  736. stream << "--";
  737. } else {
  738. const auto totalTableWidth =
  739. std::accumulate(columns.cbegin(), columns.cend(), std::size_t{0},
  740. [] (const auto& sum, const auto& elem) { return sum + elem.Width; }) + columns.size() * 3 - 1;
  741. TString filler(totalTableWidth, '-');
  742. stream << fmt::format(" {0:^{1}} ", columns[0].Name, columns[0].Width);
  743. for (size_t i = 1, pos = columns[0].Width + 2; i < columns.size(); ++i) {
  744. const auto& c = columns[i];
  745. stream << fmt::format("| {0:^{1}} ", c.Name, c.Width);
  746. filler[pos] = '+';
  747. pos += c.Width + 3;
  748. }
  749. stream << '\n' << filler;
  750. }
  751. for (const auto& row : formattedData) {
  752. stream << '\n';
  753. for (size_t i = 0; i < row.size(); ++i) {
  754. stream << FormatCell(row[i], columns[i], i, columns.size());
  755. }
  756. }
  757. stream << fmt::format("\n({} {})\n", formattedData.size(), (formattedData.size() == 1) ? "row" : "rows");
  758. }
  759. std::pair<TString, TString> GetYtTableDataPaths(const TFsPath& dataDir, const TString tableName) {
  760. const auto dataFileName = dataDir / tableName;
  761. const auto attrFileName = dataDir / (tableName + ".attr");
  762. return {dataFileName, attrFileName};
  763. }
  764. void CreateYtFileTable(const TFsPath& dataDir, const TString tableName, const TExprNode::TPtr columnsNode,
  765. THashMap<TString, TString>& tablesMapping, TExprContext& ctx, const TPosition& pos) {
  766. const auto [dataFilePath, attrFilePath] =
  767. GetYtTableDataPaths(dataDir, tableName);
  768. TFile dataFile{dataFilePath, CreateNew};
  769. TFile attrFile{attrFilePath, CreateNew};
  770. auto rowSpec = MakeIntrusive<TYqlRowSpecInfo>();
  771. TColumnOrder columnOrder;
  772. columnOrder.Reserve(columnsNode->ChildrenSize());
  773. TStringBuilder ysonType;
  774. ysonType << "[\"StructType\";[";
  775. for (const auto &columnNode : columnsNode->Children()) {
  776. const auto &colName = columnNode->Child(0)->Content();
  777. const auto &colTypeNode = columnNode->Child(1);
  778. columnOrder.AddColumn(TString(colName));
  779. ysonType << fmt::format("[\"{0}\";[\"{1}\";\"{2}\";];];",
  780. colName, colTypeNode->Content(),
  781. colTypeNode->Child(0)->Content());
  782. }
  783. ysonType << "];]";
  784. const auto *typeNode = NCommon::ParseTypeFromYson(TStringBuf(ysonType), ctx, pos);
  785. rowSpec->SetType(typeNode->Cast<TStructExprType>());
  786. rowSpec->SetColumnOrder(std::move(columnOrder));
  787. NYT::TNode attrs = NYT::TNode::CreateMap();
  788. rowSpec->FillAttrNode(attrs[YqlRowSpecAttribute], 0, false);
  789. NYT::TNode spec;
  790. rowSpec->FillCodecNode(spec[YqlRowSpecAttribute]);
  791. attrs["schema"] = RowSpecToYTSchema(spec[YqlRowSpecAttribute], 0).ToNode();
  792. TOFStream of(attrFile.GetName());
  793. of.Write(NYT::NodeToYsonString(attrs, NYson::EYsonFormat::Pretty));
  794. tablesMapping[TString("yt.plato.") + tableName] = dataFile.GetName();
  795. }
  796. bool RemoveFile(const TString& fileName) {
  797. if (NFs::Remove(fileName)) {
  798. return true;
  799. }
  800. switch (errno) {
  801. case ENOENT:
  802. return false;
  803. default:
  804. throw yexception() << "Cannot remove existing table file \"" << fileName << "\"\n";
  805. }
  806. }
  807. void DeleteYtFileTable(const TFsPath& dataDir, const TString tableName, THashMap<TString, TString>& tablesMapping) {
  808. const auto [dataFilePath, attrFilePath] = GetYtTableDataPaths(dataDir, tableName);
  809. if (!RemoveFile(dataFilePath)) {
  810. Cout << "table \"" << tableName << "\" does not exist\n";
  811. }
  812. RemoveFile(attrFilePath);
  813. tablesMapping.erase(TString("yt.plato.") + tableName);
  814. }
  815. int SplitStatements(int argc, char* argv[]) {
  816. Y_UNUSED(argc);
  817. Y_UNUSED(argv);
  818. const TString delimiter{"===a738dc70-2d81-45b4-88f2-738d09b186b7===\n"};
  819. for (const auto& stmt : TStatementIterator{Cin.ReadAll()}) {
  820. Cout << delimiter << stmt;
  821. }
  822. return 0;
  823. }
  824. void WriteToYtTableScheme(
  825. const NYql::TExprNode& writeNode,
  826. const TTempDir& tempDir,
  827. const TIntrusivePtr<class NYql::NFile::TYtFileServices> yqlNativeServices,
  828. TExprContext& ctx) {
  829. const auto* keyNode = writeNode.Child(2);
  830. const auto* tableNameNode = keyNode->Child(0)->Child(1);
  831. Y_ENSURE(tableNameNode->IsCallable("String"));
  832. const auto& tableName = tableNameNode->Child(0)->Content();
  833. Y_ENSURE(!tableName.empty());
  834. const auto* optionsNode = writeNode.Child(4);
  835. Y_ENSURE(optionsNode);
  836. const auto modeNode = GetSetting(*optionsNode, "mode");
  837. Y_ENSURE(modeNode);
  838. const auto mode = modeNode->Child(1)->Content();
  839. if (mode == "create") {
  840. const auto columnsNode = GetSetting(*optionsNode, "columns");
  841. Y_ENSURE(columnsNode);
  842. CreateYtFileTable(tempDir.Path(), TString(tableName), columnsNode->ChildPtr(1),
  843. yqlNativeServices->GetTablesMapping(), ctx, writeNode.Pos(ctx));
  844. }
  845. else if (mode == "drop") {
  846. DeleteYtFileTable(tempDir.Path(), TString(tableName), yqlNativeServices->GetTablesMapping());
  847. }
  848. }
  849. void ProcessMetaCmd(const TStringBuf& cmd) {
  850. const TStringBuf pset_null("\\pset null ");
  851. if (cmd.starts_with(pset_null)) {
  852. const auto secondArgPos = cmd.find_first_not_of(" ", pset_null.length());
  853. if (secondArgPos != std::string_view::npos) {
  854. TStringBuf newNullRepr(cmd, secondArgPos);
  855. if (newNullRepr.front() == '\'') {
  856. newNullRepr.remove_prefix(1);
  857. if (newNullRepr.back() == '\'') {
  858. newNullRepr.remove_suffix(1);
  859. }
  860. }
  861. nullRepr = newNullRepr;
  862. return;
  863. }
  864. }
  865. Cerr << "Metacommand " << cmd << " is not supported\n";
  866. }
  867. void ShowFinalAst(TProgramPtr& program, IOutputStream& stream) {
  868. Cerr << "Final AST:\n";
  869. PrintExprTo(program, stream);
  870. }
  871. void FillTablesMapping(const TFsPath& dataDir, THashMap<TString, TString>& tablesMapping) {
  872. TVector<TFsPath> children;
  873. dataDir.List(children);
  874. bool regMsgLogged = false;
  875. for (const auto& f: children) {
  876. if (f.GetExtension() != "attr") {
  877. continue;
  878. }
  879. auto tableName = f.Basename();
  880. tableName.resize(tableName.length() - 5);
  881. if (tableName.EndsWith(".tmp")) {
  882. continue;
  883. }
  884. if (!regMsgLogged) {
  885. regMsgLogged = true;
  886. Cerr << "Registering pre-existing tables\n";
  887. }
  888. const auto fullTableName = f.Parent() / tableName;
  889. Cerr << '\t' << tableName << '\n';
  890. tablesMapping[TString("yt.plato.") + tableName] = f.Parent() / tableName;
  891. }
  892. }
  893. int Main(int argc, char* argv[])
  894. {
  895. using namespace NLastGetopt;
  896. TOpts opts = TOpts::Default();
  897. const TString runnerName{"pgrun"};
  898. TVector<TString> udfsPaths;
  899. TString rawDataDir;
  900. THashMap<TString, TString> clusterMapping;
  901. static const TString DefaultCluster{"plato"};
  902. clusterMapping[DefaultCluster] = YtProviderName;
  903. clusterMapping["pg_catalog"] = PgProviderName;
  904. clusterMapping["information_schema"] = PgProviderName;
  905. opts.AddHelpOption();
  906. opts.AddLongOption("print-ast", "print initial & final ASTs to stderr").NoArgument();
  907. opts.AddLongOption("print-result", "print program execution result to stderr").NoArgument();
  908. opts.AddLongOption("datadir", "directory for tables").StoreResult<TString>(&rawDataDir);
  909. opts.AddLongOption('u', "udf", "Load shared library with UDF by given path").AppendTo(&udfsPaths);
  910. opts.SetFreeArgsMax(0);
  911. TOptsParseResult res(&opts, argc, argv);
  912. const auto needPrintAst = res.Has("print-ast");
  913. const auto needPrintResult = res.Has("print-result");
  914. const bool tempDirExists = !rawDataDir.empty() && NFs::Exists(rawDataDir);
  915. TTempDir tempDir{rawDataDir.empty() ? TTempDir{} : TTempDir{rawDataDir}};
  916. if (tempDirExists) {
  917. tempDir.DoNotRemove();
  918. }
  919. auto fsConfig = MakeHolder<TFileStorageConfig>();
  920. THolder<TGatewaysConfig> gatewaysConfig;
  921. auto fileStorage = CreateFileStorage(*fsConfig);
  922. fileStorage = WithAsync(fileStorage);
  923. auto funcRegistry = CreateFunctionRegistry(&NYql::NBacktrace::KikimrBackTrace, CreateBuiltinRegistry(), false, udfsPaths);
  924. IUdfResolver::TPtr udfResolver = NCommon::CreateSimpleUdfResolver(funcRegistry.Get(), fileStorage, true);;
  925. bool keepTempFiles = true;
  926. bool emulateOutputForMultirun = false;
  927. auto yqlNativeServices = NFile::TYtFileServices::Make(funcRegistry.Get(), {}, fileStorage, tempDir.Path(), keepTempFiles);
  928. auto ytNativeGateway = CreateYtFileGateway(yqlNativeServices, &emulateOutputForMultirun);
  929. if (tempDirExists) {
  930. FillTablesMapping(tempDir.Path(), yqlNativeServices->GetTablesMapping());
  931. }
  932. TVector<TDataProviderInitializer> dataProvidersInit;
  933. dataProvidersInit.push_back(GetYtNativeDataProviderInitializer(ytNativeGateway, MakeSimpleCBOOptimizerFactory(), {}));
  934. dataProvidersInit.push_back(GetPgDataProviderInitializer());
  935. TExprContext ctx;
  936. IModuleResolver::TPtr moduleResolver;
  937. if (!GetYqlDefaultModuleResolver(ctx, moduleResolver, clusterMapping, true)) {
  938. Cerr << "Errors loading default YQL libraries:" << Endl;
  939. ctx.IssueManager.GetIssues().PrintTo(Cerr);
  940. return -1;
  941. }
  942. TExprContext::TFreezeGuard freezeGuard(ctx);
  943. TProgramFactory factory(true, funcRegistry.Get(), ctx.NextUniqueId, dataProvidersInit, runnerName);
  944. factory.SetModules(moduleResolver);
  945. factory.SetUdfResolver(udfResolver);
  946. factory.SetGatewaysConfig(gatewaysConfig.Get());
  947. const TString username = GetUsername();
  948. THashSet<TString> sqlFlags;
  949. NSQLTranslation::TTranslationSettings settings;
  950. settings.ClusterMapping = clusterMapping;
  951. settings.DefaultCluster = DefaultCluster;
  952. settings.Flags = sqlFlags;
  953. settings.SyntaxVersion = 1;
  954. settings.AnsiLexer = false;
  955. settings.V0Behavior = NSQLTranslation::EV0Behavior::Report;
  956. settings.AssumeYdbOnClusterWithSlash = false;
  957. settings.PgParser = true;
  958. for (const auto& raw_stmt : TStatementIterator{Cin.ReadAll()}) {
  959. const auto stmt = GetFormattedStmt(raw_stmt);
  960. Cout << stmt << '\n';
  961. Cerr << "<sql-statement>\n" << stmt << "\n</sql-statement>\n";
  962. if (stmt[0] == '\\') {
  963. ProcessMetaCmd(stmt);
  964. continue;
  965. }
  966. {
  967. const auto metaCmdStart = stmt.find("\n\\");
  968. if (TString::npos != metaCmdStart) {
  969. const auto metaCmdEnd = stmt.find_first_of("\r\n", metaCmdStart + 2);
  970. ProcessMetaCmd(stmt.substr(metaCmdStart + 1, metaCmdEnd));
  971. continue;
  972. }
  973. }
  974. if (TString::npos != stmt.find("SET bytea_output TO hex")) {
  975. byteaOutput = EByteaOutput::hex;
  976. continue;
  977. }
  978. if (TString::npos != stmt.find("SET bytea_output TO escape")) {
  979. byteaOutput = EByteaOutput::escape;
  980. continue;
  981. }
  982. google::protobuf::Arena arena;
  983. settings.Arena = &arena;
  984. auto program = factory.Create("-stdin-", stmt);
  985. if (!program->ParseSql(settings)) {
  986. WriteErrorToStream(program);
  987. continue;
  988. }
  989. if (!program->Compile(username)) {
  990. WriteErrorToStream(program);
  991. continue;
  992. }
  993. #if 0
  994. auto validate_status = program->Validate(username, &Cout, true);
  995. program->PrintErrorsTo(Cerr);
  996. if (validate_status == TProgram::TStatus::Error) {
  997. return 1;
  998. }
  999. auto optimize_status = program->Optimize(username, nullptr, &Cerr, &Cout, true);
  1000. program->PrintErrorsTo(Cerr);
  1001. if (optimize_status == TProgram::TStatus::Error) {
  1002. return 1;
  1003. }
  1004. #endif
  1005. if (needPrintAst) {
  1006. Cerr << "Initial AST:\n";
  1007. PrintExprTo(program, Cerr);
  1008. }
  1009. static const THashSet<TString> ignoredNodes{"CommitAll!", "Commit!" };
  1010. const auto opNode = NYql::FindNode(program->ExprRoot(),
  1011. [] (const TExprNode::TPtr& node) { return !ignoredNodes.contains(node->Content()); });
  1012. if (opNode->IsCallable("Write!")) {
  1013. Y_ENSURE(opNode->ChildrenSize() == 5);
  1014. const auto* keyNode = opNode->Child(2);
  1015. const bool isWriteToTableSchemeNode = keyNode->IsCallable("Key") && 0 < keyNode->ChildrenSize() &&
  1016. keyNode->Child(0)->Child(0)->IsAtom("tablescheme");
  1017. if (isWriteToTableSchemeNode) {
  1018. try {
  1019. WriteToYtTableScheme(*opNode, tempDir, yqlNativeServices, program->ExprCtx());
  1020. } catch (const yexception& e) {
  1021. program->Issues().AddIssue(e.what());
  1022. WriteErrorToStream(program);
  1023. continue;
  1024. }
  1025. if (needPrintAst) {
  1026. program->Optimize(username);
  1027. ShowFinalAst(program, Cerr);
  1028. }
  1029. continue;
  1030. }
  1031. }
  1032. auto status = program->Run(username, nullptr, nullptr, nullptr, true);
  1033. program->ConfigureYsonResultFormat(NYson::EYsonFormat::Text);
  1034. if (status == TProgram::TStatus::Error) {
  1035. WriteErrorToStream(program);
  1036. continue;
  1037. }
  1038. if (needPrintAst) {
  1039. ShowFinalAst(program, Cerr);
  1040. }
  1041. if (program->HasResults()) {
  1042. if (needPrintResult) {
  1043. Cerr << program->ResultsAsString() << Endl;
  1044. }
  1045. const auto root = ParseYson(program->ResultsAsString());
  1046. const auto& cols = root[0]["Write"][0]["Type"][1][1].AsList();
  1047. const auto& data = root[0]["Write"][0]["Data"].AsList();
  1048. WriteTableToStream(Cout, cols, data);
  1049. Cout << Endl;
  1050. }
  1051. }
  1052. return 0;
  1053. }
  1054. int main(int argc, char* argv[])
  1055. {
  1056. NYql::NBacktrace::RegisterKikimrFatalActions();
  1057. NYql::NBacktrace::EnableKikimrSymbolize();
  1058. try {
  1059. if (1 < argc) {
  1060. if (TString(argv[1]) == "split-statements") {
  1061. return SplitStatements(argc, argv);
  1062. }
  1063. }
  1064. return Main(argc, argv);
  1065. }
  1066. catch (...) {
  1067. Cerr << CurrentExceptionMessage() << Endl;
  1068. return 1;
  1069. }
  1070. }