yql_server.cpp 25 KB


  1. #include "yql_server.h"
  2. #include <yql/essentials/core/cbo/simple/cbo_simple.h>
  3. #include <yql/essentials/providers/common/proto/gateways_config.pb.h>
  4. #include <yql/essentials/providers/common/provider/yql_provider_names.h>
  5. #include <yql/essentials/providers/common/comp_nodes/yql_factory.h>
  6. #include <yql/essentials/providers/pg/provider/yql_pg_provider.h>
  7. #include <yt/yql/providers/yt/common/yql_names.h>
  8. #include <yt/yql/providers/yt/gateway/file/yql_yt_file.h>
  9. #include <yt/yql/providers/yt/gateway/file/yql_yt_file_services.h>
  10. #include <yt/yql/providers/yt/provider/yql_yt_provider_impl.h>
  11. #include <yql/essentials/core/url_preprocessing/url_preprocessing.h>
  12. #include <yql/essentials/core/peephole_opt/yql_opt_peephole_physical.h>
  13. #include <yql/essentials/minikql/comp_nodes/mkql_factories.h>
  14. #include <yql/essentials/parser/pg_wrapper/interface/comp_factory.h>
  15. #include <yql/essentials/sql/v1/format/sql_format.h>
  16. #include <yql/essentials/sql/v1/lexer/antlr4/lexer.h>
  17. #include <yql/essentials/sql/v1/lexer/antlr4_ansi/lexer.h>
  18. #include <yql/essentials/sql/v1/proto_parser/antlr4/proto_parser.h>
  19. #include <yql/essentials/sql/v1/proto_parser/antlr4_ansi/proto_parser.h>
  20. #include <yql/essentials/utils/log/log.h>
  21. #include <yql/essentials/utils/log/tls_backend.h>
  22. #include <yql/essentials/core/services/yql_out_transformers.h>
  23. #include <yql/essentials/utils/utf8.h>
  24. #include <library/cpp/logger/stream.h>
  25. #include <library/cpp/yson/node/node_io.h>
  26. #include <library/cpp/yson/node/node_builder.h>
  27. #include <library/cpp/openssl/io/stream.h>
  28. #include <library/cpp/charset/ci_string.h>
  29. #include <library/cpp/yson/parser.h>
  30. #include <library/cpp/string_utils/quote/quote.h>
  31. #include <google/protobuf/arena.h>
  32. #include <util/folder/tempdir.h>
  33. #include <util/system/fstat.h>
  34. #include <util/system/tempfile.h>
  35. #include <util/string/escape.h>
  36. namespace NYql {
  37. namespace NHttp {
  38. namespace {
  39. #ifdef _unix_
  40. static volatile sig_atomic_t Terminated = 0;
  41. void OnTerminate(int)
  42. {
  43. Terminated = 1;
  44. }
  45. #endif
  46. class TTempLogRedirector: private NLog::TScopedBackend<TStreamLogBackend> {
  47. using TBase = NLog::TScopedBackend<TStreamLogBackend>;
  48. public:
  49. TTempLogRedirector(IOutputStream* redirectTo)
  50. : TBase(redirectTo)
  51. {
  52. }
  53. };
  54. class TLogLevelPromouter {
  55. public:
  56. TLogLevelPromouter(bool promote) {
  57. PrevLevelCore = NLog::YqlLogger().GetComponentLevel(NLog::EComponent::Core);
  58. PrevLevelEval = NLog::YqlLogger().GetComponentLevel(NLog::EComponent::CoreEval);
  59. PrevLevelPeepHole = NLog::YqlLogger().GetComponentLevel(NLog::EComponent::CorePeepHole);
  60. if (promote) {
  61. NLog::YqlLogger().SetComponentLevel(NLog::EComponent::Core, NLog::ELevel::TRACE);
  62. NLog::YqlLogger().SetComponentLevel(NLog::EComponent::CoreEval, NLog::ELevel::TRACE);
  63. NLog::YqlLogger().SetComponentLevel(NLog::EComponent::CorePeepHole, NLog::ELevel::TRACE);
  64. }
  65. }
  66. ~TLogLevelPromouter() {
  67. NLog::YqlLogger().SetComponentLevel(NLog::EComponent::Core, PrevLevelCore);
  68. NLog::YqlLogger().SetComponentLevel(NLog::EComponent::CoreEval, PrevLevelEval);
  69. NLog::YqlLogger().SetComponentLevel(NLog::EComponent::CorePeepHole, PrevLevelPeepHole);
  70. }
  71. private:
  72. NLog::ELevel PrevLevelCore;
  73. NLog::ELevel PrevLevelEval;
  74. NLog::ELevel PrevLevelPeepHole;
  75. };
  76. class TPeepHolePipelineConfigurator : public IPipelineConfigurator, TLogLevelPromouter {
  77. public:
  78. TPeepHolePipelineConfigurator(bool promote)
  79. : TLogLevelPromouter(promote)
  80. {
  81. }
  82. void AfterCreate(TTransformationPipeline* pipeline) const final {
  83. Y_UNUSED(pipeline);
  84. }
  85. void AfterTypeAnnotation(TTransformationPipeline* pipeline) const final {
  86. pipeline->Add(TExprLogTransformer::Sync("OptimizedExpr", NLog::EComponent::Core, NLog::ELevel::TRACE),
  87. "OptTrace", TIssuesIds::CORE, "OptTrace");
  88. }
  89. void AfterOptimize(TTransformationPipeline* pipeline) const final {
  90. pipeline->Add(CreateYtWideFlowTransformer(nullptr), "WideFlow");
  91. pipeline->Add(CreateYtBlockInputTransformer(nullptr), "BlockInput");
  92. pipeline->Add(MakePeepholeOptimization(pipeline->GetTypeAnnotationContext()), "PeepHole");
  93. pipeline->Add(CreateYtBlockOutputTransformer(nullptr), "BlockOutput");
  94. }
  95. };
  96. class TOptPipelineConfigurator : public IPipelineConfigurator, TLogLevelPromouter {
  97. public:
  98. TOptPipelineConfigurator(TProgramPtr prg, IOutputStream* stream)
  99. : TLogLevelPromouter(!!stream)
  100. , Program(std::move(prg))
  101. , Stream(stream)
  102. {
  103. }
  104. void AfterCreate(TTransformationPipeline* pipeline) const final {
  105. Y_UNUSED(pipeline);
  106. }
  107. void AfterTypeAnnotation(TTransformationPipeline* pipeline) const final {
  108. if (Stream) {
  109. pipeline->Add(TExprLogTransformer::Sync("OptimizedExpr", NLog::EComponent::Core, NLog::ELevel::TRACE),
  110. "OptTrace", TIssuesIds::CORE, "OptTrace");
  111. }
  112. }
  113. void AfterOptimize(TTransformationPipeline* pipeline) const final {
  114. pipeline->Add(TPlanOutputTransformer::Sync(Stream, Program->GetPlanBuilder(), Program->GetOutputFormat()), "PlanOutput");
  115. }
  116. private:
  117. TProgramPtr Program;
  118. IOutputStream* Stream;
  119. };
  120. NSQLTranslation::TTranslationSettings GetTranslationSettings(const THashSet<TString>& sqlFlags) {
  121. static const THashMap<TString, TString> clusters = {
  122. { "plato", TString(YtProviderName) },
  123. { "plato_rtmr", TString(RtmrProviderName) },
  124. { "pg_catalog", TString(PgProviderName) },
  125. { "information_schema", TString(PgProviderName) },
  126. };
  127. NSQLTranslation::TTranslationSettings settings;
  128. settings.ClusterMapping = clusters;
  129. settings.SyntaxVersion = 1;
  130. settings.InferSyntaxVersion = true;
  131. settings.V0Behavior = NSQLTranslation::EV0Behavior::Report;
  132. settings.Flags = sqlFlags;
  133. return settings;
  134. }
  135. void SetupProgram(TProgram& prg, const TString& program) {
  136. Y_UNUSED(program);
  137. prg.SetValidateOptions(NKikimr::NUdf::EValidateMode::Greedy);
  138. prg.EnableResultPosition();
  139. }
  140. struct TTableFileHolder {
  141. TTempFile Main;
  142. TTempFile Attr;
  143. TTableFileHolder(const TString& path)
  144. : Main(path)
  145. , Attr(path + ".attr")
  146. {}
  147. };
  148. TProgramPtr MakeFileProgram(const TString& program, TYqlServer& yqlServer,
  149. const THashMap<TString, TString>& tables, const THashMap<std::pair<TString, TString>,
  150. TVector<std::pair<TString, TString>>>& /* rtmrTableAttributes */, const TString& tmpDir) {
  151. TVector<TDataProviderInitializer> dataProvidersInit;
  152. auto ytNativeServices = NFile::TYtFileServices::Make(yqlServer.FunctionRegistry, tables, yqlServer.FileStorage, tmpDir);
  153. auto ytNativeGateway = CreateYtFileGateway(ytNativeServices);
  154. auto dqCompFactory = NKikimr::NMiniKQL::GetCompositeWithBuiltinFactory({
  155. NKikimr::NMiniKQL::GetYqlFactory(),
  156. GetPgFactory()
  157. });
  158. dataProvidersInit.push_back(GetYtNativeDataProviderInitializer(ytNativeGateway, MakeSimpleCBOOptimizerFactory(), {}));
  159. dataProvidersInit.push_back(GetPgDataProviderInitializer());
  160. TProgramFactory programFactory(
  161. true,
  162. yqlServer.FunctionRegistry,
  163. yqlServer.NextUniqueId,
  164. dataProvidersInit,
  165. "yqlrun");
  166. programFactory.AddUserDataTable(yqlServer.FilesMapping);
  167. programFactory.SetModules(yqlServer.Modules);
  168. programFactory.SetUdfResolver(yqlServer.UdfResolver);
  169. programFactory.SetUdfIndex(yqlServer.UdfIndex, new TUdfIndexPackageSet());
  170. programFactory.SetFileStorage(yqlServer.FileStorage);
  171. programFactory.EnableRangeComputeFor();
  172. programFactory.SetGatewaysConfig(yqlServer.GatewaysConfig.Get());
  173. if (yqlServer.GatewaysConfig && yqlServer.GatewaysConfig->HasFs()) {
  174. programFactory.SetUrlPreprocessing(new NYql::TUrlPreprocessing(*yqlServer.GatewaysConfig));
  175. }
  176. auto prg = programFactory.Create("-stdin-", program);
  177. SetupProgram(*prg, program);
  178. return prg;
  179. }
  180. TProgramPtr MakeFileProgram(const TString& program, const TString& input, const TString& attr,
  181. TAutoPtr<TTableFileHolder>& inputFile, TTempFile& outputFile, TYqlServer& yqlServer, const TString& tmpDir) {
  182. TString cluster = "plato";
  183. THashMap<TString, TString> tables;
  184. inputFile.Reset(new TTableFileHolder(MakeTempName()));
  185. TFile mainFile(inputFile->Main.Name(), CreateAlways | RdWr);
  186. TFile attrFile(inputFile->Attr.Name(), CreateAlways | RdWr);
  187. mainFile.Write(input.data(), input.size());
  188. attrFile.Write(attr.data(), attr.size());
  189. mainFile.Close();
  190. attrFile.Close();
  191. tables[TString(YtProviderName).append('.').append(cluster).append(TStringBuf(".Input"))] = inputFile->Main.Name();
  192. tables[TString(YtProviderName).append('.').append(cluster).append(TStringBuf(".Output"))] = outputFile.Name();
  193. NFs::Remove(outputFile.Name());
  194. THashMap<std::pair<TString, TString>, TVector<std::pair<TString, TString>>> rtmrTableAttributes;
  195. auto node = NYT::NodeFromYsonString(attr);
  196. if (node.IsMap() && node.HasKey(YqlRowSpecAttribute)) {
  197. rtmrTableAttributes[std::make_pair("plato_rtmr", "Input")] = {{"_yql_row_spec", NYT::NodeToYsonString(node[YqlRowSpecAttribute])}};
  198. }
  199. return MakeFileProgram(program, yqlServer, tables, rtmrTableAttributes, tmpDir);
  200. }
  201. YQL_ACTION(Paste)
  202. void Perform(const TString& program, const TString& input, const TString& attr, ui32 options, const TString& parameters) {
  203. Y_UNUSED(input);
  204. Y_UNUSED(attr);
  205. Y_UNUSED(options);
  206. Y_UNUSED(parameters);
  207. const static TString pasteHost(TStringBuf("paste.yandex-team.ru"));
  208. TSocket s(TNetworkAddress(pasteHost, 443));
  209. TSocketOutput so(s);
  210. TSocketInput si(s);
  211. TOpenSslClientIO ssl(&si, &so);
  212. {
  213. THttpOutput output(&ssl);
  214. TStringBuf data = "syntax=yql&text=";
  215. TString quotedProgram(program);
  216. Quote(quotedProgram);
  217. output << TStringBuf("POST / HTTP/1.1\r\n")
  218. << TStringBuf("Host: ") << pasteHost << TStringBuf("\r\n")
  219. << TStringBuf("Content-Type: application/x-www-form-urlencoded\r\n")
  220. << TStringBuf("Content-Length: ") << (data.size() + quotedProgram.size())
  221. << TStringBuf("\r\n\r\n")
  222. << data << quotedProgram
  223. << TStringBuf("\r\n");
  224. output.Finish();
  225. }
  226. {
  227. THttpInput input(&ssl);
  228. unsigned httpCode = ParseHttpRetCode(input.FirstLine());
  229. Cout << "return code: " << httpCode << Endl;
  230. for (auto i = input.Headers().Begin(), e = input.Headers().End(); i != e; ++i) {
  231. if (0 == TCiString::compare(i->Name(), TStringBuf("location"))) {
  232. Writer.Write(TStringBuf("location"), i->Value());
  233. return;
  234. }
  235. }
  236. }
  237. ythrow yexception() << "Unknown redirect location";
  238. }
  239. };
  240. YQL_ACTION(Format)
  241. void Perform(const TString& program, const TString& input, const TString& attr, ui32 options, const TString& parameters) {
  242. Y_UNUSED(input);
  243. Y_UNUSED(attr);
  244. Y_UNUSED(options);
  245. Y_UNUSED(parameters);
  246. google::protobuf::Arena arena;
  247. NSQLTranslation::TTranslationSettings settings;
  248. settings.Arena = &arena;
  249. NSQLTranslationV1::TLexers lexers;
  250. lexers.Antlr4 = NSQLTranslationV1::MakeAntlr4LexerFactory();
  251. lexers.Antlr4Ansi = NSQLTranslationV1::MakeAntlr4AnsiLexerFactory();
  252. NSQLTranslationV1::TParsers parsers;
  253. parsers.Antlr4 = NSQLTranslationV1::MakeAntlr4ParserFactory();
  254. parsers.Antlr4Ansi = NSQLTranslationV1::MakeAntlr4AnsiParserFactory();
  255. auto formatter = NSQLFormat::MakeSqlFormatter(lexers, parsers, settings);
  256. TString frm_query;
  257. TString error;
  258. NYql::TIssues issues;
  259. if (!formatter->Format(program, frm_query, issues)) {
  260. WriteStatus(false, issues);
  261. } else {
  262. Writer.Write(TStringBuf("sql"), frm_query);
  263. }
  264. }
  265. };
  266. ///////////////////////////////////////////////////////////////////////////////
  267. // parse action
  268. ///////////////////////////////////////////////////////////////////////////////
  269. YQL_ACTION(Parse)
  270. void Perform(const TString& program, const TString& input, const TString& attr, ui32 options, const TString& parameters) {
  271. Y_UNUSED(input);
  272. Y_UNUSED(attr);
  273. Y_UNUSED(parameters);
  274. TTempDir tmpDir;
  275. TProgramPtr prg = MakeFileProgram(program, YqlServer, {}, {}, tmpDir.Name());
  276. bool parsed = (options & TYqlAction::YqlProgram)
  277. ? prg->ParseYql()
  278. : prg->ParseSql(GetTranslationSettings(YqlServer.SqlFlags));
  279. if (parsed) {
  280. ui32 prettyFlg = TAstPrintFlags::PerLine | TAstPrintFlags::ShortQuote;
  281. Writer.Write(TStringBuf("expr"), prg->AstRoot()->ToString(prettyFlg));
  282. if (options & EOptions::PrintAst) {
  283. Writer.Write(TStringBuf("ast"));
  284. WriteAstTree(prg->AstRoot());
  285. }
  286. }
  287. WriteStatus(parsed, prg->Issues());
  288. }
  289. };
  290. ///////////////////////////////////////////////////////////////////////////////
  291. // compile action
  292. ///////////////////////////////////////////////////////////////////////////////
  293. YQL_ACTION(Compile)
  294. void Perform(const TString& program, const TString& input, const TString& attr, ui32 options, const TString& parameters) {
  295. Y_UNUSED(input);
  296. Y_UNUSED(attr);
  297. TTempDir tmpDir;
  298. TProgramPtr prg = MakeFileProgram(program, YqlServer, {}, {}, tmpDir.Name());
  299. prg->SetParametersYson(parameters);
  300. bool noError = (options & TYqlAction::YqlProgram) ? prg->ParseYql() : prg->ParseSql(GetTranslationSettings(YqlServer.SqlFlags));
  301. noError = noError && prg->Compile(GetUsername());
  302. if (options & (EOptions::PrintAst | EOptions::PrintExpr)) {
  303. if (prg->ExprRoot()) {
  304. auto ast = ConvertToAst(*prg->ExprRoot(), prg->ExprCtx(), TExprAnnotationFlags::None, true);
  305. if (options & EOptions::PrintAst) {
  306. Writer.Write(TStringBuf("ast"));
  307. WriteAstTree(ast.Root);
  308. }
  309. if (options & EOptions::PrintExpr) {
  310. ui32 prettyFlg = TAstPrintFlags::PerLine | TAstPrintFlags::ShortQuote;
  311. Writer.Write(TStringBuf("expr"), ast.Root->ToString(prettyFlg));
  312. }
  313. }
  314. }
  315. WriteStatus(noError, prg->Issues());
  316. }
  317. };
  318. ///////////////////////////////////////////////////////////////////////////////
  319. // optimize, validate and peephole actions
  320. ///////////////////////////////////////////////////////////////////////////////
  321. YQL_ACTION(OptimizeOrValidateFile)
  322. void Perform(const TString& program, const TString& input, const TString& attr, ui32 options, const TString& parameters) {
  323. TAutoPtr<TTableFileHolder> inputFile;
  324. TTempFile outputFile(MakeTempName());
  325. TTempFile outputFileAttr(outputFile.Name() + ".attr");
  326. TTempDir tmpDir;
  327. TProgramPtr prg = MakeFileProgram(program, input, attr, inputFile, outputFile, YqlServer, tmpDir.Name());
  328. bool noError = (options & TYqlAction::YqlProgram) ? prg->ParseYql() : prg->ParseSql(GetTranslationSettings(YqlServer.SqlFlags));
  329. prg->SetParametersYson(parameters);
  330. prg->SetDiagnosticFormat(NYson::EYsonFormat::Pretty);
  331. THolder<TStringStream> traceOut;
  332. THolder<TTempLogRedirector> logRedirector;
  333. if (options & EOptions::PrintTraceOpt) {
  334. traceOut.Reset(new TStringStream);
  335. logRedirector.Reset(new TTempLogRedirector(traceOut.Get()));
  336. }
  337. noError = noError && prg->Compile(GetUsername());
  338. if (noError) {
  339. TProgram::TStatus status = TProgram::TStatus::Error;
  340. auto name = TStringBuf(Req.RD.ScriptName());
  341. if (name.Contains(TStringBuf("/optimize"))) {
  342. auto config = TOptPipelineConfigurator(prg, traceOut.Get());
  343. status = prg->OptimizeWithConfig(GetUsername(), config);
  344. } else if (name.Contains(TStringBuf("/validate"))) {
  345. status = prg->Validate(GetUsername());
  346. } else if (name.Contains(TStringBuf("/peephole"))) {
  347. auto config = TPeepHolePipelineConfigurator(options & EOptions::PrintTraceOpt);
  348. status = prg->OptimizeWithConfig(GetUsername(), config);
  349. }
  350. noError = status == TProgram::TStatus::Ok;
  351. }
  352. if (options & (EOptions::PrintAst | EOptions::PrintExpr)) {
  353. if (prg->ExprRoot()) {
  354. auto ast = ConvertToAst(*prg->ExprRoot(), prg->ExprCtx(), TExprAnnotationFlags::None, true);
  355. if (options & EOptions::PrintAst) {
  356. Writer.Write(TStringBuf("ast"));
  357. WriteAstTree(ast.Root);
  358. }
  359. if (options & EOptions::PrintExpr) {
  360. ui32 prettyFlg = TAstPrintFlags::PerLine | TAstPrintFlags::ShortQuote;
  361. Writer.Write(TStringBuf("expr"), ast.Root->ToString(prettyFlg));
  362. }
  363. }
  364. }
  365. auto diagnostics = prg->GetDiagnostics();
  366. if (diagnostics) {
  367. Cerr << *diagnostics;
  368. }
  369. if (!!traceOut && !traceOut->Str().empty()) {
  370. if (diagnostics) {
  371. traceOut->Write(*diagnostics);
  372. }
  373. Writer.Write(TStringBuf("opttrace"), traceOut->Str());
  374. }
  375. if (options & TYqlAction::WithFinalIssues) {
  376. prg->FinalizeIssues();
  377. }
  378. WriteStatus(noError, prg->Issues());
  379. }
  380. };
  381. ///////////////////////////////////////////////////////////////////////////////
  382. // run actions
  383. ///////////////////////////////////////////////////////////////////////////////
  384. YQL_ACTION(FileRun)
  385. void Perform(const TString& program, const TString& input, const TString& attr, ui32 options, const TString& parameters) {
  386. auto name = TStringBuf(Req.RD.ScriptName());
  387. TAutoPtr<TTableFileHolder> inputFile;
  388. TTempFile outputFile(MakeTempName());
  389. TTempFile outputFileAttr(outputFile.Name() + ".attr");
  390. TTempDir tmpDir;
  391. TProgramPtr prg = MakeFileProgram(program, input, attr, inputFile, outputFile, YqlServer, tmpDir.Name());
  392. bool noError = (options & TYqlAction::YqlProgram) ? prg->ParseYql() : prg->ParseSql(GetTranslationSettings(YqlServer.SqlFlags));
  393. prg->SetDiagnosticFormat(NYson::EYsonFormat::Pretty);
  394. prg->SetParametersYson(parameters);
  395. THolder<TStringStream> traceOut;
  396. THolder<TTempLogRedirector> logRedirector;
  397. if (options & EOptions::PrintTraceOpt) {
  398. traceOut.Reset(new TStringStream);
  399. logRedirector.Reset(new TTempLogRedirector(traceOut.Get()));
  400. }
  401. noError = noError && prg->Compile(GetUsername());
  402. TProgram::TStatus status = TProgram::TStatus::Error;
  403. if (noError) {
  404. auto config = TOptPipelineConfigurator(prg, traceOut.Get());
  405. if (name.Contains(TStringBuf("/lineage"))) {
  406. status = prg->LineageWithConfig(GetUsername(), config);
  407. } else {
  408. status = prg->RunWithConfig(GetUsername(), config);
  409. }
  410. }
  411. if (options & (EOptions::PrintAst | EOptions::PrintExpr)) {
  412. if (prg->ExprRoot()) {
  413. auto ast = ConvertToAst(*prg->ExprRoot(), prg->ExprCtx(), TExprAnnotationFlags::None, true);
  414. if (options & EOptions::PrintAst) {
  415. Writer.Write(TStringBuf("ast"));
  416. WriteAstTree(ast.Root);
  417. }
  418. if (options & EOptions::PrintExpr) {
  419. ui32 prettyFlg = TAstPrintFlags::PerLine | TAstPrintFlags::ShortQuote;
  420. Writer.Write(TStringBuf("expr"), ast.Root->ToString(prettyFlg));
  421. }
  422. }
  423. }
  424. auto diagnostics = prg->GetDiagnostics();
  425. if (diagnostics) {
  426. Cerr << *diagnostics;
  427. }
  428. if (!!traceOut && !traceOut->Str().empty()) {
  429. if (diagnostics) {
  430. traceOut->Write(*diagnostics);
  431. }
  432. Writer.Write(TStringBuf("opttrace"), traceOut->Str());
  433. }
  434. if (options & TYqlAction::WithFinalIssues) {
  435. prg->FinalizeIssues();
  436. }
  437. WriteStatus(status != TProgram::TStatus::Error, prg->Issues());
  438. if (status != TProgram::TStatus::Error) {
  439. // write output
  440. Writer.Write(TStringBuf("output"));
  441. Writer.OpenMap();
  442. if (TFileStat(outputFile.Name()).IsFile()) {
  443. TFileInput fileInput(outputFile.Name());
  444. NYT::TNode list = NYT::TNode::CreateList();
  445. NYT::TNodeBuilder builder(&list);
  446. NYson::TYsonParser parser(&builder, &fileInput, ::NYson::EYsonType::ListFragment);
  447. parser.Parse();
  448. std::set<TString> headers;
  449. for (auto& row: list.AsList()) {
  450. for (auto& val: row.AsMap()) {
  451. headers.insert(val.first);
  452. }
  453. }
  454. { // headers
  455. Writer.Write(TStringBuf("headers"));
  456. Writer.OpenArray();
  457. for (const auto& header : headers) {
  458. Writer.Write(header);
  459. }
  460. Writer.CloseArray();
  461. }
  462. { // rows
  463. Writer.Write(TStringBuf("rows"));
  464. Writer.OpenArray();
  465. for (auto& row: list.AsList()) {
  466. Writer.OpenArray();
  467. for (const auto& header : headers) {
  468. if (auto p = row.AsMap().FindPtr(header)) {
  469. if (p->IsString()) {
  470. const auto& str = p->AsString();
  471. Writer.Write(IsUtf8(str) ? str : EscapeC(str));
  472. } else {
  473. Writer.Write(NYT::NodeToYsonString(*p, NYson::EYsonFormat::Text));
  474. }
  475. } else {
  476. Writer.Write(TString());
  477. }
  478. }
  479. Writer.CloseArray();
  480. }
  481. Writer.CloseArray();
  482. }
  483. }
  484. Writer.CloseMap();
  485. }
  486. if (name.Contains(TStringBuf("/lineage"))) {
  487. if (auto data = prg->GetLineage()) {
  488. TString str;
  489. TStringOutput out(str);
  490. TStringInput in(*data);
  491. NYson::ReformatYsonStream(&in, &out, NYson::EYsonFormat::Pretty);
  492. Writer.Write(TStringBuf("results"), str);
  493. }
  494. } else {
  495. Writer.Write(TStringBuf("results"), prg->ResultsAsString());
  496. }
  497. }
  498. };
  499. } // namespace
  500. void TYqlServer::Start()
  501. {
  502. #ifdef _unix_
  503. ShutdownOn(SIGINT);
  504. ShutdownOn(SIGTERM);
  505. #endif
  506. bool started = HttpServer.Start();
  507. if (!started) {
  508. ythrow yexception() << "YqlServer not started. Error: "
  509. << HttpServer.GetErrorCode()
  510. << ": " << HttpServer.GetError();
  511. }
  512. }
  513. void TYqlServer::ShutdownOn(int signal)
  514. {
  515. #ifdef _unix_
  516. struct sigaction sa = {};
  517. sa.sa_handler = OnTerminate;
  518. sigfillset(&sa.sa_mask); // block every signal during the handler
  519. if (sigaction(signal, &sa, nullptr) < 0) {
  520. ythrow yexception() << "Error: cannot handle signal " << signal;
  521. }
  522. #else
  523. Y_UNUSED(signal);
  524. #endif
  525. }
  526. void TYqlServer::Wait()
  527. {
  528. #ifdef _unix_
  529. while (!Terminated) {
  530. sleep(1);
  531. }
  532. #else
  533. HttpServer.Wait();
  534. #endif
  535. }
  536. TAutoPtr<TYqlServer> CreateYqlServer(
  537. TServerConfig config,
  538. const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
  539. TUdfIndex::TPtr udfIndex,
  540. ui64 nextUniqueId,
  541. TUserDataTable filesMapping,
  542. THolder<TGatewaysConfig>&& gatewaysConfig,
  543. const THashSet<TString>& sqlFlags,
  544. IModuleResolver::TPtr modules,
  545. IUdfResolver::TPtr udfResolver,
  546. TFileStoragePtr fileStorage)
  547. {
  548. TAutoPtr<TYqlServer> server = new TYqlServer(
  549. config, functionRegistry, udfIndex, nextUniqueId,
  550. std::move(filesMapping), std::move(gatewaysConfig), sqlFlags, modules, udfResolver, fileStorage);
  551. server->RegisterAction<TYqlActionPaste>("/api/yql/paste");
  552. server->RegisterAction<TYqlActionParse>("/api/yql/parse");
  553. server->RegisterAction<TYqlActionCompile>("/api/yql/compile");
  554. server->RegisterAction<TYqlActionFormat>("/api/yql/format");
  555. server->RegisterAction<TYqlActionOptimizeOrValidateFile>("/api/yql/validate");
  556. server->RegisterAction<TYqlActionOptimizeOrValidateFile>("/api/yql/optimize");
  557. server->RegisterAction<TYqlActionOptimizeOrValidateFile>("/api/yql/peephole");
  558. server->RegisterServlet("/js/yql-functions.js", new TYqlFunctoinsServlet());
  559. server->RegisterAction<TYqlActionFileRun>("/api/yql/lineage");
  560. server->RegisterAction<TYqlActionFileRun>("/api/yql/run");
  561. server->RegisterServlet("/",
  562. new TAssetsServlet("/", config.GetAssetsPath(), "file-index.html"));
  563. return server;
  564. }
  565. } // namspace NHttp
  566. } // namspace NYql