yql_server.cpp 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659
  1. #include "yql_server.h"
  2. #include <yql/essentials/core/cbo/simple/cbo_simple.h>
  3. #include <yql/essentials/providers/common/proto/gateways_config.pb.h>
  4. #include <yql/essentials/providers/common/provider/yql_provider_names.h>
  5. #include <yql/essentials/providers/common/comp_nodes/yql_factory.h>
  6. #include <yql/essentials/providers/pg/provider/yql_pg_provider.h>
  7. #include <yt/yql/providers/yt/common/yql_names.h>
  8. #include <yt/yql/providers/yt/gateway/file/yql_yt_file.h>
  9. #include <yt/yql/providers/yt/gateway/file/yql_yt_file_services.h>
  10. #include <yt/yql/providers/yt/provider/yql_yt_provider_impl.h>
  11. #include <yql/essentials/core/url_preprocessing/url_preprocessing.h>
  12. #include <yql/essentials/core/peephole_opt/yql_opt_peephole_physical.h>
  13. #include <yql/essentials/minikql/comp_nodes/mkql_factories.h>
  14. #include <yql/essentials/parser/pg_wrapper/interface/comp_factory.h>
  15. #include <yql/essentials/sql/v1/format/sql_format.h>
  16. #include <yql/essentials/utils/log/log.h>
  17. #include <yql/essentials/utils/log/tls_backend.h>
  18. #include <yql/essentials/core/services/yql_out_transformers.h>
  19. #include <yql/essentials/utils/utf8.h>
  20. #include <library/cpp/logger/stream.h>
  21. #include <library/cpp/yson/node/node_io.h>
  22. #include <library/cpp/yson/node/node_builder.h>
  23. #include <library/cpp/openssl/io/stream.h>
  24. #include <library/cpp/charset/ci_string.h>
  25. #include <library/cpp/yson/parser.h>
  26. #include <library/cpp/string_utils/quote/quote.h>
  27. #include <google/protobuf/arena.h>
  28. #include <util/folder/tempdir.h>
  29. #include <util/system/fstat.h>
  30. #include <util/system/tempfile.h>
  31. #include <util/string/escape.h>
  32. namespace NYql {
  33. namespace NHttp {
  34. namespace {
  35. #ifdef _unix_
  36. static volatile sig_atomic_t Terminated = 0;
  37. void OnTerminate(int)
  38. {
  39. Terminated = 1;
  40. }
  41. #endif
  42. class TTempLogRedirector: private NLog::TScopedBackend<TStreamLogBackend> {
  43. using TBase = NLog::TScopedBackend<TStreamLogBackend>;
  44. public:
  45. TTempLogRedirector(IOutputStream* redirectTo)
  46. : TBase(redirectTo)
  47. {
  48. }
  49. };
  50. class TLogLevelPromouter {
  51. public:
  52. TLogLevelPromouter(bool promote) {
  53. PrevLevelCore = NLog::YqlLogger().GetComponentLevel(NLog::EComponent::Core);
  54. PrevLevelEval = NLog::YqlLogger().GetComponentLevel(NLog::EComponent::CoreEval);
  55. PrevLevelPeepHole = NLog::YqlLogger().GetComponentLevel(NLog::EComponent::CorePeepHole);
  56. if (promote) {
  57. NLog::YqlLogger().SetComponentLevel(NLog::EComponent::Core, NLog::ELevel::TRACE);
  58. NLog::YqlLogger().SetComponentLevel(NLog::EComponent::CoreEval, NLog::ELevel::TRACE);
  59. NLog::YqlLogger().SetComponentLevel(NLog::EComponent::CorePeepHole, NLog::ELevel::TRACE);
  60. }
  61. }
  62. ~TLogLevelPromouter() {
  63. NLog::YqlLogger().SetComponentLevel(NLog::EComponent::Core, PrevLevelCore);
  64. NLog::YqlLogger().SetComponentLevel(NLog::EComponent::CoreEval, PrevLevelEval);
  65. NLog::YqlLogger().SetComponentLevel(NLog::EComponent::CorePeepHole, PrevLevelPeepHole);
  66. }
  67. private:
  68. NLog::ELevel PrevLevelCore;
  69. NLog::ELevel PrevLevelEval;
  70. NLog::ELevel PrevLevelPeepHole;
  71. };
  72. class TPeepHolePipelineConfigurator : public IPipelineConfigurator, TLogLevelPromouter {
  73. public:
  74. TPeepHolePipelineConfigurator(bool promote)
  75. : TLogLevelPromouter(promote)
  76. {
  77. }
  78. void AfterCreate(TTransformationPipeline* pipeline) const final {
  79. Y_UNUSED(pipeline);
  80. }
  81. void AfterTypeAnnotation(TTransformationPipeline* pipeline) const final {
  82. pipeline->Add(TExprLogTransformer::Sync("OptimizedExpr", NLog::EComponent::Core, NLog::ELevel::TRACE),
  83. "OptTrace", TIssuesIds::CORE, "OptTrace");
  84. }
  85. void AfterOptimize(TTransformationPipeline* pipeline) const final {
  86. pipeline->Add(CreateYtWideFlowTransformer(nullptr), "WideFlow");
  87. pipeline->Add(CreateYtBlockInputTransformer(nullptr), "BlockInput");
  88. pipeline->Add(MakePeepholeOptimization(pipeline->GetTypeAnnotationContext()), "PeepHole");
  89. pipeline->Add(CreateYtBlockOutputTransformer(nullptr), "BlockOutput");
  90. }
  91. };
  92. class TOptPipelineConfigurator : public IPipelineConfigurator, TLogLevelPromouter {
  93. public:
  94. TOptPipelineConfigurator(TProgramPtr prg, IOutputStream* stream)
  95. : TLogLevelPromouter(!!stream)
  96. , Program(std::move(prg))
  97. , Stream(stream)
  98. {
  99. }
  100. void AfterCreate(TTransformationPipeline* pipeline) const final {
  101. Y_UNUSED(pipeline);
  102. }
  103. void AfterTypeAnnotation(TTransformationPipeline* pipeline) const final {
  104. if (Stream) {
  105. pipeline->Add(TExprLogTransformer::Sync("OptimizedExpr", NLog::EComponent::Core, NLog::ELevel::TRACE),
  106. "OptTrace", TIssuesIds::CORE, "OptTrace");
  107. }
  108. }
  109. void AfterOptimize(TTransformationPipeline* pipeline) const final {
  110. pipeline->Add(TPlanOutputTransformer::Sync(Stream, Program->GetPlanBuilder(), Program->GetOutputFormat()), "PlanOutput");
  111. }
  112. private:
  113. TProgramPtr Program;
  114. IOutputStream* Stream;
  115. };
  116. NSQLTranslation::TTranslationSettings GetTranslationSettings(const THashSet<TString>& sqlFlags) {
  117. static const THashMap<TString, TString> clusters = {
  118. { "plato", TString(YtProviderName) },
  119. { "plato_rtmr", TString(RtmrProviderName) },
  120. { "pg_catalog", TString(PgProviderName) },
  121. { "information_schema", TString(PgProviderName) },
  122. };
  123. NSQLTranslation::TTranslationSettings settings;
  124. settings.ClusterMapping = clusters;
  125. settings.SyntaxVersion = 1;
  126. settings.InferSyntaxVersion = true;
  127. settings.V0Behavior = NSQLTranslation::EV0Behavior::Report;
  128. settings.Flags = sqlFlags;
  129. return settings;
  130. }
  131. void SetupProgram(TProgram& prg, const TString& program) {
  132. Y_UNUSED(program);
  133. prg.SetValidateOptions(NKikimr::NUdf::EValidateMode::Greedy);
  134. prg.EnableResultPosition();
  135. }
  136. struct TTableFileHolder {
  137. TTempFile Main;
  138. TTempFile Attr;
  139. TTableFileHolder(const TString& path)
  140. : Main(path)
  141. , Attr(path + ".attr")
  142. {}
  143. };
  144. TProgramPtr MakeFileProgram(const TString& program, TYqlServer& yqlServer,
  145. const THashMap<TString, TString>& tables, const THashMap<std::pair<TString, TString>,
  146. TVector<std::pair<TString, TString>>>& /* rtmrTableAttributes */, const TString& tmpDir) {
  147. TVector<TDataProviderInitializer> dataProvidersInit;
  148. auto ytNativeServices = NFile::TYtFileServices::Make(yqlServer.FunctionRegistry, tables, yqlServer.FileStorage, tmpDir);
  149. auto ytNativeGateway = CreateYtFileGateway(ytNativeServices);
  150. auto dqCompFactory = NKikimr::NMiniKQL::GetCompositeWithBuiltinFactory({
  151. NKikimr::NMiniKQL::GetYqlFactory(),
  152. GetPgFactory()
  153. });
  154. dataProvidersInit.push_back(GetYtNativeDataProviderInitializer(ytNativeGateway, MakeSimpleCBOOptimizerFactory(), {}));
  155. dataProvidersInit.push_back(GetPgDataProviderInitializer());
  156. TProgramFactory programFactory(
  157. true,
  158. yqlServer.FunctionRegistry,
  159. yqlServer.NextUniqueId,
  160. dataProvidersInit,
  161. "yqlrun");
  162. programFactory.AddUserDataTable(yqlServer.FilesMapping);
  163. programFactory.SetModules(yqlServer.Modules);
  164. programFactory.SetUdfResolver(yqlServer.UdfResolver);
  165. programFactory.SetUdfIndex(yqlServer.UdfIndex, new TUdfIndexPackageSet());
  166. programFactory.SetFileStorage(yqlServer.FileStorage);
  167. programFactory.EnableRangeComputeFor();
  168. programFactory.SetGatewaysConfig(yqlServer.GatewaysConfig.Get());
  169. if (yqlServer.GatewaysConfig && yqlServer.GatewaysConfig->HasFs()) {
  170. programFactory.SetUrlPreprocessing(new NYql::TUrlPreprocessing(*yqlServer.GatewaysConfig));
  171. }
  172. auto prg = programFactory.Create("-stdin-", program);
  173. SetupProgram(*prg, program);
  174. return prg;
  175. }
  176. TProgramPtr MakeFileProgram(const TString& program, const TString& input, const TString& attr,
  177. TAutoPtr<TTableFileHolder>& inputFile, TTempFile& outputFile, TYqlServer& yqlServer, const TString& tmpDir) {
  178. TString cluster = "plato";
  179. THashMap<TString, TString> tables;
  180. inputFile.Reset(new TTableFileHolder(MakeTempName()));
  181. TFile mainFile(inputFile->Main.Name(), CreateAlways | RdWr);
  182. TFile attrFile(inputFile->Attr.Name(), CreateAlways | RdWr);
  183. mainFile.Write(input.data(), input.size());
  184. attrFile.Write(attr.data(), attr.size());
  185. mainFile.Close();
  186. attrFile.Close();
  187. tables[TString(YtProviderName).append('.').append(cluster).append(TStringBuf(".Input"))] = inputFile->Main.Name();
  188. tables[TString(YtProviderName).append('.').append(cluster).append(TStringBuf(".Output"))] = outputFile.Name();
  189. NFs::Remove(outputFile.Name());
  190. THashMap<std::pair<TString, TString>, TVector<std::pair<TString, TString>>> rtmrTableAttributes;
  191. auto node = NYT::NodeFromYsonString(attr);
  192. if (node.IsMap() && node.HasKey(YqlRowSpecAttribute)) {
  193. rtmrTableAttributes[std::make_pair("plato_rtmr", "Input")] = {{"_yql_row_spec", NYT::NodeToYsonString(node[YqlRowSpecAttribute])}};
  194. }
  195. return MakeFileProgram(program, yqlServer, tables, rtmrTableAttributes, tmpDir);
  196. }
  197. YQL_ACTION(Paste)
  198. void Perform(const TString& program, const TString& input, const TString& attr, ui32 options, const TString& parameters) {
  199. Y_UNUSED(input);
  200. Y_UNUSED(attr);
  201. Y_UNUSED(options);
  202. Y_UNUSED(parameters);
  203. const static TString pasteHost(TStringBuf("paste.yandex-team.ru"));
  204. TSocket s(TNetworkAddress(pasteHost, 443));
  205. TSocketOutput so(s);
  206. TSocketInput si(s);
  207. TOpenSslClientIO ssl(&si, &so);
  208. {
  209. THttpOutput output(&ssl);
  210. TStringBuf data = "syntax=yql&text=";
  211. TString quotedProgram(program);
  212. Quote(quotedProgram);
  213. output << TStringBuf("POST / HTTP/1.1\r\n")
  214. << TStringBuf("Host: ") << pasteHost << TStringBuf("\r\n")
  215. << TStringBuf("Content-Type: application/x-www-form-urlencoded\r\n")
  216. << TStringBuf("Content-Length: ") << (data.size() + quotedProgram.size())
  217. << TStringBuf("\r\n\r\n")
  218. << data << quotedProgram
  219. << TStringBuf("\r\n");
  220. output.Finish();
  221. }
  222. {
  223. THttpInput input(&ssl);
  224. unsigned httpCode = ParseHttpRetCode(input.FirstLine());
  225. Cout << "return code: " << httpCode << Endl;
  226. for (auto i = input.Headers().Begin(), e = input.Headers().End(); i != e; ++i) {
  227. if (0 == TCiString::compare(i->Name(), TStringBuf("location"))) {
  228. Writer.Write(TStringBuf("location"), i->Value());
  229. return;
  230. }
  231. }
  232. }
  233. ythrow yexception() << "Unknown redirect location";
  234. }
  235. };
  236. YQL_ACTION(Format)
  237. void Perform(const TString& program, const TString& input, const TString& attr, ui32 options, const TString& parameters) {
  238. Y_UNUSED(input);
  239. Y_UNUSED(attr);
  240. Y_UNUSED(options);
  241. Y_UNUSED(parameters);
  242. google::protobuf::Arena arena;
  243. NSQLTranslation::TTranslationSettings settings;
  244. settings.Arena = &arena;
  245. auto formatter = NSQLFormat::MakeSqlFormatter(settings);
  246. TString frm_query;
  247. TString error;
  248. NYql::TIssues issues;
  249. if (!formatter->Format(program, frm_query, issues)) {
  250. WriteStatus(false, issues);
  251. } else {
  252. Writer.Write(TStringBuf("sql"), frm_query);
  253. }
  254. }
  255. };
  256. ///////////////////////////////////////////////////////////////////////////////
  257. // parse action
  258. ///////////////////////////////////////////////////////////////////////////////
  259. YQL_ACTION(Parse)
  260. void Perform(const TString& program, const TString& input, const TString& attr, ui32 options, const TString& parameters) {
  261. Y_UNUSED(input);
  262. Y_UNUSED(attr);
  263. Y_UNUSED(parameters);
  264. TTempDir tmpDir;
  265. TProgramPtr prg = MakeFileProgram(program, YqlServer, {}, {}, tmpDir.Name());
  266. bool parsed = (options & TYqlAction::YqlProgram)
  267. ? prg->ParseYql()
  268. : prg->ParseSql(GetTranslationSettings(YqlServer.SqlFlags));
  269. if (parsed) {
  270. ui32 prettyFlg = TAstPrintFlags::PerLine | TAstPrintFlags::ShortQuote;
  271. Writer.Write(TStringBuf("expr"), prg->AstRoot()->ToString(prettyFlg));
  272. if (options & EOptions::PrintAst) {
  273. Writer.Write(TStringBuf("ast"));
  274. WriteAstTree(prg->AstRoot());
  275. }
  276. }
  277. WriteStatus(parsed, prg->Issues());
  278. }
  279. };
  280. ///////////////////////////////////////////////////////////////////////////////
  281. // compile action
  282. ///////////////////////////////////////////////////////////////////////////////
  283. YQL_ACTION(Compile)
  284. void Perform(const TString& program, const TString& input, const TString& attr, ui32 options, const TString& parameters) {
  285. Y_UNUSED(input);
  286. Y_UNUSED(attr);
  287. TTempDir tmpDir;
  288. TProgramPtr prg = MakeFileProgram(program, YqlServer, {}, {}, tmpDir.Name());
  289. prg->SetParametersYson(parameters);
  290. bool noError = (options & TYqlAction::YqlProgram) ? prg->ParseYql() : prg->ParseSql(GetTranslationSettings(YqlServer.SqlFlags));
  291. noError = noError && prg->Compile(GetUsername());
  292. if (options & (EOptions::PrintAst | EOptions::PrintExpr)) {
  293. if (prg->ExprRoot()) {
  294. auto ast = ConvertToAst(*prg->ExprRoot(), prg->ExprCtx(), TExprAnnotationFlags::None, true);
  295. if (options & EOptions::PrintAst) {
  296. Writer.Write(TStringBuf("ast"));
  297. WriteAstTree(ast.Root);
  298. }
  299. if (options & EOptions::PrintExpr) {
  300. ui32 prettyFlg = TAstPrintFlags::PerLine | TAstPrintFlags::ShortQuote;
  301. Writer.Write(TStringBuf("expr"), ast.Root->ToString(prettyFlg));
  302. }
  303. }
  304. }
  305. WriteStatus(noError, prg->Issues());
  306. }
  307. };
  308. ///////////////////////////////////////////////////////////////////////////////
  309. // optimize, validate and peephole actions
  310. ///////////////////////////////////////////////////////////////////////////////
  311. YQL_ACTION(OptimizeOrValidateFile)
  312. void Perform(const TString& program, const TString& input, const TString& attr, ui32 options, const TString& parameters) {
  313. TAutoPtr<TTableFileHolder> inputFile;
  314. TTempFile outputFile(MakeTempName());
  315. TTempFile outputFileAttr(outputFile.Name() + ".attr");
  316. TTempDir tmpDir;
  317. TProgramPtr prg = MakeFileProgram(program, input, attr, inputFile, outputFile, YqlServer, tmpDir.Name());
  318. bool noError = (options & TYqlAction::YqlProgram) ? prg->ParseYql() : prg->ParseSql(GetTranslationSettings(YqlServer.SqlFlags));
  319. prg->SetParametersYson(parameters);
  320. prg->SetDiagnosticFormat(NYson::EYsonFormat::Pretty);
  321. THolder<TStringStream> traceOut;
  322. THolder<TTempLogRedirector> logRedirector;
  323. if (options & EOptions::PrintTraceOpt) {
  324. traceOut.Reset(new TStringStream);
  325. logRedirector.Reset(new TTempLogRedirector(traceOut.Get()));
  326. }
  327. noError = noError && prg->Compile(GetUsername());
  328. if (noError) {
  329. TProgram::TStatus status = TProgram::TStatus::Error;
  330. auto name = TStringBuf(Req.RD.ScriptName());
  331. if (name.Contains(TStringBuf("/optimize"))) {
  332. auto config = TOptPipelineConfigurator(prg, traceOut.Get());
  333. status = prg->OptimizeWithConfig(GetUsername(), config);
  334. } else if (name.Contains(TStringBuf("/validate"))) {
  335. status = prg->Validate(GetUsername());
  336. } else if (name.Contains(TStringBuf("/peephole"))) {
  337. auto config = TPeepHolePipelineConfigurator(options & EOptions::PrintTraceOpt);
  338. status = prg->OptimizeWithConfig(GetUsername(), config);
  339. }
  340. noError = status == TProgram::TStatus::Ok;
  341. }
  342. if (options & (EOptions::PrintAst | EOptions::PrintExpr)) {
  343. if (prg->ExprRoot()) {
  344. auto ast = ConvertToAst(*prg->ExprRoot(), prg->ExprCtx(), TExprAnnotationFlags::None, true);
  345. if (options & EOptions::PrintAst) {
  346. Writer.Write(TStringBuf("ast"));
  347. WriteAstTree(ast.Root);
  348. }
  349. if (options & EOptions::PrintExpr) {
  350. ui32 prettyFlg = TAstPrintFlags::PerLine | TAstPrintFlags::ShortQuote;
  351. Writer.Write(TStringBuf("expr"), ast.Root->ToString(prettyFlg));
  352. }
  353. }
  354. }
  355. auto diagnostics = prg->GetDiagnostics();
  356. if (diagnostics) {
  357. Cerr << *diagnostics;
  358. }
  359. if (!!traceOut && !traceOut->Str().empty()) {
  360. if (diagnostics) {
  361. traceOut->Write(*diagnostics);
  362. }
  363. Writer.Write(TStringBuf("opttrace"), traceOut->Str());
  364. }
  365. if (options & TYqlAction::WithFinalIssues) {
  366. prg->FinalizeIssues();
  367. }
  368. WriteStatus(noError, prg->Issues());
  369. }
  370. };
  371. ///////////////////////////////////////////////////////////////////////////////
  372. // run actions
  373. ///////////////////////////////////////////////////////////////////////////////
  374. YQL_ACTION(FileRun)
  375. void Perform(const TString& program, const TString& input, const TString& attr, ui32 options, const TString& parameters) {
  376. auto name = TStringBuf(Req.RD.ScriptName());
  377. TAutoPtr<TTableFileHolder> inputFile;
  378. TTempFile outputFile(MakeTempName());
  379. TTempFile outputFileAttr(outputFile.Name() + ".attr");
  380. TTempDir tmpDir;
  381. TProgramPtr prg = MakeFileProgram(program, input, attr, inputFile, outputFile, YqlServer, tmpDir.Name());
  382. bool noError = (options & TYqlAction::YqlProgram) ? prg->ParseYql() : prg->ParseSql(GetTranslationSettings(YqlServer.SqlFlags));
  383. prg->SetDiagnosticFormat(NYson::EYsonFormat::Pretty);
  384. prg->SetParametersYson(parameters);
  385. THolder<TStringStream> traceOut;
  386. THolder<TTempLogRedirector> logRedirector;
  387. if (options & EOptions::PrintTraceOpt) {
  388. traceOut.Reset(new TStringStream);
  389. logRedirector.Reset(new TTempLogRedirector(traceOut.Get()));
  390. }
  391. noError = noError && prg->Compile(GetUsername());
  392. TProgram::TStatus status = TProgram::TStatus::Error;
  393. if (noError) {
  394. auto config = TOptPipelineConfigurator(prg, traceOut.Get());
  395. if (name.Contains(TStringBuf("/lineage"))) {
  396. status = prg->LineageWithConfig(GetUsername(), config);
  397. } else {
  398. status = prg->RunWithConfig(GetUsername(), config);
  399. }
  400. }
  401. if (options & (EOptions::PrintAst | EOptions::PrintExpr)) {
  402. if (prg->ExprRoot()) {
  403. auto ast = ConvertToAst(*prg->ExprRoot(), prg->ExprCtx(), TExprAnnotationFlags::None, true);
  404. if (options & EOptions::PrintAst) {
  405. Writer.Write(TStringBuf("ast"));
  406. WriteAstTree(ast.Root);
  407. }
  408. if (options & EOptions::PrintExpr) {
  409. ui32 prettyFlg = TAstPrintFlags::PerLine | TAstPrintFlags::ShortQuote;
  410. Writer.Write(TStringBuf("expr"), ast.Root->ToString(prettyFlg));
  411. }
  412. }
  413. }
  414. auto diagnostics = prg->GetDiagnostics();
  415. if (diagnostics) {
  416. Cerr << *diagnostics;
  417. }
  418. if (!!traceOut && !traceOut->Str().empty()) {
  419. if (diagnostics) {
  420. traceOut->Write(*diagnostics);
  421. }
  422. Writer.Write(TStringBuf("opttrace"), traceOut->Str());
  423. }
  424. if (options & TYqlAction::WithFinalIssues) {
  425. prg->FinalizeIssues();
  426. }
  427. WriteStatus(status != TProgram::TStatus::Error, prg->Issues());
  428. if (status != TProgram::TStatus::Error) {
  429. // write output
  430. Writer.Write(TStringBuf("output"));
  431. Writer.OpenMap();
  432. if (TFileStat(outputFile.Name()).IsFile()) {
  433. TFileInput fileInput(outputFile.Name());
  434. NYT::TNode list = NYT::TNode::CreateList();
  435. NYT::TNodeBuilder builder(&list);
  436. NYson::TYsonParser parser(&builder, &fileInput, ::NYson::EYsonType::ListFragment);
  437. parser.Parse();
  438. std::set<TString> headers;
  439. for (auto& row: list.AsList()) {
  440. for (auto& val: row.AsMap()) {
  441. headers.insert(val.first);
  442. }
  443. }
  444. { // headers
  445. Writer.Write(TStringBuf("headers"));
  446. Writer.OpenArray();
  447. for (const auto& header : headers) {
  448. Writer.Write(header);
  449. }
  450. Writer.CloseArray();
  451. }
  452. { // rows
  453. Writer.Write(TStringBuf("rows"));
  454. Writer.OpenArray();
  455. for (auto& row: list.AsList()) {
  456. Writer.OpenArray();
  457. for (const auto& header : headers) {
  458. if (auto p = row.AsMap().FindPtr(header)) {
  459. if (p->IsString()) {
  460. const auto& str = p->AsString();
  461. Writer.Write(IsUtf8(str) ? str : EscapeC(str));
  462. } else {
  463. Writer.Write(NYT::NodeToYsonString(*p, NYson::EYsonFormat::Text));
  464. }
  465. } else {
  466. Writer.Write(TString());
  467. }
  468. }
  469. Writer.CloseArray();
  470. }
  471. Writer.CloseArray();
  472. }
  473. }
  474. Writer.CloseMap();
  475. }
  476. if (name.Contains(TStringBuf("/lineage"))) {
  477. if (auto data = prg->GetLineage()) {
  478. TString str;
  479. TStringOutput out(str);
  480. TStringInput in(*data);
  481. NYson::ReformatYsonStream(&in, &out, NYson::EYsonFormat::Pretty);
  482. Writer.Write(TStringBuf("results"), str);
  483. }
  484. } else {
  485. Writer.Write(TStringBuf("results"), prg->ResultsAsString());
  486. }
  487. }
  488. };
  489. } // namespace
  490. void TYqlServer::Start()
  491. {
  492. #ifdef _unix_
  493. ShutdownOn(SIGINT);
  494. ShutdownOn(SIGTERM);
  495. #endif
  496. bool started = HttpServer.Start();
  497. if (!started) {
  498. ythrow yexception() << "YqlServer not started. Error: "
  499. << HttpServer.GetErrorCode()
  500. << ": " << HttpServer.GetError();
  501. }
  502. }
  503. void TYqlServer::ShutdownOn(int signal)
  504. {
  505. #ifdef _unix_
  506. struct sigaction sa = {};
  507. sa.sa_handler = OnTerminate;
  508. sigfillset(&sa.sa_mask); // block every signal during the handler
  509. if (sigaction(signal, &sa, nullptr) < 0) {
  510. ythrow yexception() << "Error: cannot handle signal " << signal;
  511. }
  512. #else
  513. Y_UNUSED(signal);
  514. #endif
  515. }
  516. void TYqlServer::Wait()
  517. {
  518. #ifdef _unix_
  519. while (!Terminated) {
  520. sleep(1);
  521. }
  522. #else
  523. HttpServer.Wait();
  524. #endif
  525. }
  526. TAutoPtr<TYqlServer> CreateYqlServer(
  527. TServerConfig config,
  528. const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
  529. TUdfIndex::TPtr udfIndex,
  530. ui64 nextUniqueId,
  531. TUserDataTable filesMapping,
  532. THolder<TGatewaysConfig>&& gatewaysConfig,
  533. const THashSet<TString>& sqlFlags,
  534. IModuleResolver::TPtr modules,
  535. IUdfResolver::TPtr udfResolver,
  536. TFileStoragePtr fileStorage)
  537. {
  538. TAutoPtr<TYqlServer> server = new TYqlServer(
  539. config, functionRegistry, udfIndex, nextUniqueId,
  540. std::move(filesMapping), std::move(gatewaysConfig), sqlFlags, modules, udfResolver, fileStorage);
  541. server->RegisterAction<TYqlActionPaste>("/api/yql/paste");
  542. server->RegisterAction<TYqlActionParse>("/api/yql/parse");
  543. server->RegisterAction<TYqlActionCompile>("/api/yql/compile");
  544. server->RegisterAction<TYqlActionFormat>("/api/yql/format");
  545. server->RegisterAction<TYqlActionOptimizeOrValidateFile>("/api/yql/validate");
  546. server->RegisterAction<TYqlActionOptimizeOrValidateFile>("/api/yql/optimize");
  547. server->RegisterAction<TYqlActionOptimizeOrValidateFile>("/api/yql/peephole");
  548. server->RegisterServlet("/js/yql-functions.js", new TYqlFunctoinsServlet());
  549. server->RegisterAction<TYqlActionFileRun>("/api/yql/lineage");
  550. server->RegisterAction<TYqlActionFileRun>("/api/yql/run");
  551. server->RegisterServlet("/",
  552. new TAssetsServlet("/", config.GetAssetsPath(), "file-index.html"));
  553. return server;
  554. }
  555. } // namespace NHttp
  556. } // namespace NYql