purebench.cpp 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273
  1. #include <library/cpp/svnversion/svnversion.h>
  2. #include <library/cpp/getopt/last_getopt.h>
  3. #include <contrib/ydb/library/yql/public/purecalc/purecalc.h>
  4. #include <contrib/ydb/library/yql/public/purecalc/io_specs/mkql/spec.h>
  5. #include <contrib/ydb/library/yql/public/purecalc/io_specs/arrow/spec.h>
  6. #include <contrib/ydb/library/yql/public/purecalc/helpers/stream/stream_from_vector.h>
  7. #include <yql/essentials/utils/log/log.h>
  8. #include <yql/essentials/utils/backtrace/backtrace.h>
  9. #include <yql/essentials/public/udf/arrow/util.h>
  10. #include <yql/essentials/public/udf/udf_registrator.h>
  11. #include <yql/essentials/public/udf/udf_version.h>
  12. #include <library/cpp/skiff/skiff.h>
  13. #include <library/cpp/yson/writer.h>
  14. #include <util/datetime/cputimer.h>
  15. #include <util/stream/file.h>
  16. #include <util/stream/format.h>
  17. #include <util/stream/null.h>
  18. #include <algorithm>
  19. #include <cmath>
  20. using namespace NYql;
  21. using namespace NYql::NPureCalc;
  22. TStringStream MakeGenInput(ui64 count) {
  23. TStringStream stream;
  24. NSkiff::TUncheckedSkiffWriter writer{&stream};
  25. for (ui64 i = 0; i < count; ++i) {
  26. writer.WriteVariant16Tag(0);
  27. writer.WriteInt64(i);
  28. }
  29. writer.Finish();
  30. return stream;
  31. }
  32. template <typename TInputSpec, typename TOutputSpec>
  33. using TRunCallable = std::function<void (const THolder<TPullListProgram<TInputSpec, TOutputSpec>>&)>;
  34. template <typename TOutputSpec>
  35. NYT::TNode RunGenSql(
  36. const IProgramFactoryPtr factory,
  37. const TVector<NYT::TNode>& inputSchema,
  38. const TString& sql,
  39. ETranslationMode isPg,
  40. TRunCallable<TSkiffInputSpec, TOutputSpec> runCallable
  41. ) {
  42. auto inputSpec = TSkiffInputSpec(inputSchema);
  43. auto outputSpec = TOutputSpec({NYT::TNode::CreateEntity()});
  44. auto program = factory->MakePullListProgram(inputSpec, outputSpec, sql, isPg);
  45. runCallable(program);
  46. return program->MakeOutputSchema();
  47. }
  48. template <typename TInputSpec, typename TStream>
  49. void ShowResults(
  50. const IProgramFactoryPtr factory,
  51. const TVector<NYT::TNode>& inputSchema,
  52. const TString& sql,
  53. ETranslationMode isPg,
  54. TStream* input
  55. ) {
  56. auto inputSpec = TInputSpec(inputSchema);
  57. auto outputSpec = TYsonOutputSpec({NYT::TNode::CreateEntity()});
  58. auto program = factory->MakePullListProgram(inputSpec, outputSpec, sql, isPg);
  59. auto handle = program->Apply(input);
  60. TStringStream output;
  61. handle->Run(&output);
  62. TStringInput in(output.Str());
  63. NYson::ReformatYsonStream(&in, &Cerr, NYson::EYsonFormat::Pretty, NYson::EYsonType::ListFragment);
  64. }
  65. template <typename TInputSpec, typename TOutputSpec>
  66. double RunBenchmarks(
  67. const IProgramFactoryPtr factory,
  68. const TVector<NYT::TNode>& inputSchema,
  69. const TString& sql,
  70. ETranslationMode isPg,
  71. ui32 repeats,
  72. TRunCallable<TInputSpec, TOutputSpec> runCallable
  73. ) {
  74. auto inputSpec = TInputSpec(inputSchema);
  75. auto outputSpec = TOutputSpec({NYT::TNode::CreateEntity()});
  76. auto program = factory->MakePullListProgram(inputSpec, outputSpec, sql, isPg);
  77. Cerr << "Dry run of test sql...\n";
  78. runCallable(program);
  79. Cerr << "Run benchmark...\n";
  80. TVector<TDuration> times;
  81. TSimpleTimer allTimer;
  82. for (ui32 i = 0; i < repeats; ++i) {
  83. TSimpleTimer timer;
  84. runCallable(program);
  85. times.push_back(timer.Get());
  86. }
  87. Cout << "Elapsed: " << allTimer.Get() << "\n";
  88. Sort(times);
  89. times.erase(times.end() - times.size() / 3, times.end());
  90. double sum = std::transform_reduce(times.cbegin(), times.cend(),
  91. .0, std::plus{}, [](auto t) { return std::log(t.MicroSeconds()); });
  92. return std::exp(sum / times.size());
  93. }
  94. int Main(int argc, const char *argv[])
  95. {
  96. Y_UNUSED(NUdf::GetStaticSymbols());
  97. using namespace NLastGetopt;
  98. TOpts opts = TOpts::Default();
  99. ui64 count;
  100. ui32 repeats;
  101. TString genSql, testSql;
  102. bool showResults;
  103. TString udfsDir;
  104. TString LLVMSettings;
  105. TString blockEngineSettings;
  106. TString exprFile;
  107. opts.AddHelpOption();
  108. opts.AddLongOption("ndebug", "should be at first argument, do not show debug info in error output").NoArgument();
  109. opts.AddLongOption('b', "blocks-engine", "Block engine settings").StoreResult(&blockEngineSettings).DefaultValue("disable");
  110. opts.AddLongOption('c', "count", "count of input rows").StoreResult(&count).DefaultValue(1000000);
  111. opts.AddLongOption('g', "gen-sql", "SQL query to generate data").StoreResult(&genSql).DefaultValue("select index from Input");
  112. opts.AddLongOption('t', "test-sql", "SQL query to test").StoreResult(&testSql).DefaultValue("select count(*) as count from Input");
  113. opts.AddLongOption('r', "repeats", "number of iterations").StoreResult(&repeats).DefaultValue(10);
  114. opts.AddLongOption('w', "show-results", "show results of test SQL").StoreResult(&showResults).DefaultValue(true);
  115. opts.AddLongOption("pg", "use PG syntax for generate query").NoArgument();
  116. opts.AddLongOption("pt", "use PG syntax for test query").NoArgument();
  117. opts.AddLongOption("udfs-dir", "directory with UDFs").StoreResult(&udfsDir).DefaultValue("");
  118. opts.AddLongOption("llvm-settings", "LLVM settings").StoreResult(&LLVMSettings).DefaultValue("");
  119. opts.AddLongOption("print-expr", "print rebuild AST before execution").NoArgument();
  120. opts.AddLongOption("expr-file", "print AST to that file instead of stdout").StoreResult(&exprFile);
  121. opts.SetFreeArgsMax(0);
  122. TOptsParseResult res(&opts, argc, argv);
  123. auto factoryOptions = TProgramFactoryOptions();
  124. factoryOptions.SetUDFsDir(udfsDir);
  125. factoryOptions.SetLLVMSettings(LLVMSettings);
  126. factoryOptions.SetBlockEngineSettings(blockEngineSettings);
  127. IOutputStream* exprOut = nullptr;
  128. THolder<TFixedBufferFileOutput> exprFileHolder;
  129. if (res.Has("print-expr")) {
  130. exprOut = &Cout;
  131. } else if (!exprFile.empty()) {
  132. exprFileHolder.Reset(new TFixedBufferFileOutput(exprFile));
  133. exprOut = exprFileHolder.Get();
  134. }
  135. factoryOptions.SetExprOutputStream(exprOut);
  136. auto factory = MakeProgramFactory(factoryOptions);
  137. NYT::TNode members{NYT::TNode::CreateList()};
  138. auto typeNode = NYT::TNode::CreateList()
  139. .Add("DataType")
  140. .Add("Int64");
  141. members.Add(NYT::TNode::CreateList()
  142. .Add("index")
  143. .Add(typeNode));
  144. NYT::TNode schema = NYT::TNode::CreateList()
  145. .Add("StructType")
  146. .Add(members);
  147. auto inputGenSchema = TVector<NYT::TNode>{schema};
  148. auto inputGenStream = MakeGenInput(count);
  149. Cerr << "Input data size: " << inputGenStream.Size() << "\n";
  150. ETranslationMode isPgGen = res.Has("pg") ? ETranslationMode::PG : ETranslationMode::SQL;
  151. ETranslationMode isPgTest = res.Has("pt") ? ETranslationMode::PG : ETranslationMode::SQL;
  152. double normalizedTime;
  153. size_t inputBenchSize;
  154. if (blockEngineSettings == "disable") {
  155. TStringStream outputGenStream;
  156. auto outputGenSchema = RunGenSql<TSkiffOutputSpec>(
  157. factory, inputGenSchema, genSql, isPgGen,
  158. [&](const auto& program) {
  159. auto handle = program->Apply(&inputGenStream);
  160. handle->Run(&outputGenStream);
  161. Cerr << "Generated data size: " << outputGenStream.Size() << "\n";
  162. });
  163. if (showResults) {
  164. auto inputResStream = TStringStream(outputGenStream);
  165. ShowResults<TSkiffInputSpec>(
  166. factory, {outputGenSchema}, testSql, isPgTest, &inputResStream);
  167. }
  168. inputBenchSize = outputGenStream.Size();
  169. normalizedTime = RunBenchmarks<TSkiffInputSpec, TSkiffOutputSpec>(
  170. factory, {outputGenSchema}, testSql, isPgTest, repeats,
  171. [&](const auto& program) {
  172. auto inputBorrowed = TStringStream(outputGenStream);
  173. auto handle = program->Apply(&inputBorrowed);
  174. TNullOutput output;
  175. handle->Run(&output);
  176. });
  177. } else {
  178. auto inputGenSpec = TSkiffInputSpec(inputGenSchema);
  179. auto outputGenSpec = TArrowOutputSpec({NYT::TNode::CreateEntity()});
  180. // XXX: <RunGenSql> cannot be used for this case, since all buffers
  181. // from the Datums in the obtained batches are owned by the worker's
  182. // allocator. Hence, the program (i.e. worker) object should be created
  183. // at the very beginning of the block, or at least prior to all the
  184. // temporary batch storages (mind outputGenStream below).
  185. auto program = factory->MakePullListProgram(
  186. inputGenSpec, outputGenSpec, genSql, isPgGen);
  187. auto handle = program->Apply(&inputGenStream);
  188. auto outputGenSchema = program->MakeOutputSchema();
  189. TVector<arrow::compute::ExecBatch> outputGenStream;
  190. while (arrow::compute::ExecBatch* batch = handle->Fetch()) {
  191. outputGenStream.push_back(*batch);
  192. }
  193. ui64 outputGenSize = std::transform_reduce(
  194. outputGenStream.cbegin(), outputGenStream.cend(),
  195. 0l, std::plus{}, [](const auto& b) {
  196. return NYql::NUdf::GetSizeOfArrowExecBatchInBytes(b);
  197. });
  198. Cerr << "Generated data size: " << outputGenSize << "\n";
  199. if (showResults) {
  200. auto inputResStreamHolder = StreamFromVector(outputGenStream);
  201. auto inputResStream = inputResStreamHolder.Get();
  202. ShowResults<TArrowInputSpec>(
  203. factory, {outputGenSchema}, testSql, isPgTest, inputResStream);
  204. }
  205. inputBenchSize = outputGenSize;
  206. normalizedTime = RunBenchmarks<TArrowInputSpec, TArrowOutputSpec>(
  207. factory, {outputGenSchema}, testSql, isPgTest, repeats,
  208. [&](const auto& program) {
  209. auto handle = program->Apply(StreamFromVector(outputGenStream));
  210. while (arrow::compute::ExecBatch* batch = handle->Fetch()) {}
  211. });
  212. }
  213. Cout << "Bench score: " << Prec(inputBenchSize / normalizedTime, 4) << "\n";
  214. NLog::CleanupLogger();
  215. return 0;
  216. }
  217. int main(int argc, const char *argv[]) {
  218. if (argc > 1 && TString(argv[1]) != TStringBuf("--ndebug")) {
  219. Cerr << "purebench ABI version: " << NKikimr::NUdf::CurrentAbiVersionStr() << Endl;
  220. }
  221. NYql::NBacktrace::RegisterKikimrFatalActions();
  222. NYql::NBacktrace::EnableKikimrSymbolize();
  223. try {
  224. return Main(argc, argv);
  225. } catch (const TCompileError& e) {
  226. Cerr << e.what() << "\n" << e.GetIssues();
  227. } catch (...) {
  228. Cerr << CurrentExceptionMessage() << Endl;
  229. return 1;
  230. }
  231. }