purebench.cpp 18 KB


  1. #include <library/cpp/svnversion/svnversion.h>
  2. #include <library/cpp/getopt/last_getopt.h>
  3. #include <yql/essentials/public/purecalc/purecalc.h>
  4. #include <yql/essentials/public/purecalc/io_specs/arrow/spec.h>
  5. #include <yql/essentials/public/purecalc/helpers/stream/stream_from_vector.h>
  6. #include <yql/essentials/utils/yql_panic.h>
  7. #include <yql/essentials/utils/log/log.h>
  8. #include <yql/essentials/utils/backtrace/backtrace.h>
  9. #include <yql/essentials/public/udf/arrow/util.h>
  10. #include <yql/essentials/public/udf/udf_registrator.h>
  11. #include <yql/essentials/public/udf/udf_version.h>
  12. #include <yql/essentials/minikql/mkql_alloc.h>
  13. #include <yql/essentials/minikql/computation/mkql_computation_node_holders.h>
  14. #include <yql/essentials/minikql/computation/mkql_custom_list.h>
  15. #include <yql/essentials/minikql/computation/mkql_computation_node_pack.h>
  16. #include <yql/essentials/providers/common/codec/yql_codec.h>
  17. #include <yql/essentials/providers/common/schema/mkql/yql_mkql_schema.h>
  18. #include <library/cpp/yson/writer.h>
  19. #include <util/datetime/cputimer.h>
  20. #include <util/stream/file.h>
  21. #include <util/stream/format.h>
  22. #include <util/stream/null.h>
  23. #include <algorithm>
  24. #include <cmath>
  25. using namespace NYql;
  26. using namespace NYql::NPureCalc;
  27. using namespace NKikimr::NMiniKQL;
  28. using namespace NYql::NUdf;
  29. struct TPickleInputSpec : public TInputSpecBase {
  30. TPickleInputSpec(const TVector<NYT::TNode>& schemas)
  31. : Schemas(schemas)
  32. {}
  33. const TVector<NYT::TNode>& GetSchemas() const final {
  34. return Schemas;
  35. }
  36. const TVector<NYT::TNode> Schemas;
  37. };
  38. class TPickleListValue final: public TCustomListValue {
  39. public:
  40. TPickleListValue(
  41. TMemoryUsageInfo* memInfo,
  42. const TPickleInputSpec& /* inputSpec */,
  43. ui32 index,
  44. IInputStream* underlying,
  45. IWorker* worker
  46. )
  47. : TCustomListValue(memInfo)
  48. , Underlying_(underlying)
  49. , Worker_(worker)
  50. , ScopedAlloc_(Worker_->GetScopedAlloc())
  51. , Packer_(false, Worker_->GetInputType(index))
  52. {
  53. }
  54. TUnboxedValue GetListIterator() const override {
  55. YQL_ENSURE(!HasIterator_, "Only one pass over input is supported");
  56. HasIterator_ = true;
  57. return TUnboxedValuePod(const_cast<TPickleListValue*>(this));
  58. }
  59. bool Next(TUnboxedValue& result) override {
  60. ui32 len;
  61. auto read = Underlying_->Load(&len, sizeof(len));
  62. if (!read) {
  63. return false;
  64. }
  65. YQL_ENSURE(read == sizeof(len));
  66. if (len > RecordBuffer_.size()) {
  67. RecordBuffer_.resize(Max<size_t>(2*RecordBuffer_.size(), len));
  68. }
  69. Underlying_->LoadOrFail(RecordBuffer_.data(), len);
  70. result = Packer_.Unpack(TStringBuf(RecordBuffer_.data(), len), Worker_->GetGraph().GetHolderFactory());
  71. return true;
  72. }
  73. private:
  74. mutable bool HasIterator_ = false;
  75. IInputStream* Underlying_;
  76. IWorker* Worker_;
  77. TScopedAlloc& ScopedAlloc_;
  78. TValuePackerGeneric<true> Packer_;
  79. TVector<char> RecordBuffer_;
  80. };
  81. template <>
  82. struct TInputSpecTraits<TPickleInputSpec> {
  83. static const constexpr bool IsPartial = false;
  84. static const constexpr bool SupportPullStreamMode = false;
  85. static const constexpr bool SupportPullListMode = true;
  86. static const constexpr bool SupportPushStreamMode = false;
  87. static void PreparePullListWorker(const TPickleInputSpec& spec, IPullListWorker* worker, IInputStream* stream) {
  88. PreparePullListWorker(spec, worker, TVector<IInputStream*>({stream}));
  89. }
  90. static void PreparePullListWorker(const TPickleInputSpec& spec, IPullListWorker* worker, const TVector<IInputStream*>& streams) {
  91. YQL_ENSURE(worker->GetInputsCount() == streams.size(),
  92. "number of input streams should match number of inputs provided by spec");
  93. with_lock(worker->GetScopedAlloc()) {
  94. auto& holderFactory = worker->GetGraph().GetHolderFactory();
  95. for (ui32 i = 0; i < streams.size(); i++) {
  96. auto input = holderFactory.template Create<TPickleListValue>(
  97. spec, i, std::move(streams[i]), worker);
  98. worker->SetInput(std::move(input), i);
  99. }
  100. }
  101. }
  102. };
  103. struct TPickleOutputSpec : public TOutputSpecBase {
  104. TPickleOutputSpec(const NYT::TNode& schema)
  105. : Schema(schema)
  106. {}
  107. const NYT::TNode& GetSchema() const final {
  108. return Schema;
  109. }
  110. const NYT::TNode Schema;
  111. };
  112. class TStreamOutputHandle: private TMoveOnly {
  113. public:
  114. virtual NKikimr::NMiniKQL::TType* GetOutputType() const = 0;
  115. virtual void Run(IOutputStream*) = 0;
  116. virtual ~TStreamOutputHandle() = default;
  117. };
  118. class TPickleOutputHandle final: public TStreamOutputHandle {
  119. public:
  120. TPickleOutputHandle(TWorkerHolder<IPullListWorker> worker)
  121. : Worker_(std::move(worker))
  122. , Packer_(false, Worker_->GetOutputType())
  123. {}
  124. NKikimr::NMiniKQL::TType* GetOutputType() const final {
  125. return const_cast<NKikimr::NMiniKQL::TType*>(Worker_->GetOutputType());
  126. }
  127. void Run(IOutputStream* stream) final {
  128. Y_ENSURE(
  129. Worker_->GetOutputType()->IsStruct(),
  130. "Run(IOutputStream*) cannot be used with multi-output programs");
  131. TBindTerminator bind(Worker_->GetGraph().GetTerminator());
  132. with_lock(Worker_->GetScopedAlloc()) {
  133. const auto outputIterator = Worker_->GetOutputIterator();
  134. TUnboxedValue value;
  135. while (outputIterator.Next(value)) {
  136. auto buf = Packer_.Pack(value);
  137. ui32 len = buf.Size();
  138. stream->Write(&len, sizeof(len));
  139. stream->Write(buf.Data(), len);
  140. }
  141. }
  142. }
  143. private:
  144. TWorkerHolder<IPullListWorker> Worker_;
  145. TValuePackerGeneric<true> Packer_;
  146. };
  147. template <>
  148. struct TOutputSpecTraits<TPickleOutputSpec> {
  149. static const constexpr bool IsPartial = false;
  150. static const constexpr bool SupportPullStreamMode = false;
  151. static const constexpr bool SupportPullListMode = true;
  152. static const constexpr bool SupportPushStreamMode = false;
  153. using TPullListReturnType = THolder<TPickleOutputHandle>;
  154. static TPullListReturnType ConvertPullListWorkerToOutputType(const TPickleOutputSpec&, TWorkerHolder<IPullListWorker> worker) {
  155. return MakeHolder<TPickleOutputHandle>(std::move(worker));
  156. }
  157. };
  158. struct TPrintOutputSpec : public TOutputSpecBase {
  159. TPrintOutputSpec(const NYT::TNode& schema)
  160. : Schema(schema)
  161. {}
  162. const NYT::TNode& GetSchema() const final {
  163. return Schema;
  164. }
  165. const NYT::TNode Schema;
  166. };
  167. class TPrintOutputHandle final: public TStreamOutputHandle {
  168. public:
  169. TPrintOutputHandle(TWorkerHolder<IPullListWorker> worker)
  170. : Worker_(std::move(worker))
  171. {}
  172. NKikimr::NMiniKQL::TType* GetOutputType() const final {
  173. return const_cast<NKikimr::NMiniKQL::TType*>(Worker_->GetOutputType());
  174. }
  175. void Run(IOutputStream* stream) final {
  176. Y_ENSURE(
  177. Worker_->GetOutputType()->IsStruct(),
  178. "Run(IOutputStream*) cannot be used with multi-output programs");
  179. TBindTerminator bind(Worker_->GetGraph().GetTerminator());
  180. with_lock(Worker_->GetScopedAlloc()) {
  181. const auto outputIterator = Worker_->GetOutputIterator();
  182. TUnboxedValue value;
  183. while (outputIterator.Next(value)) {
  184. auto str = NCommon::WriteYsonValue(value, GetOutputType());
  185. stream->Write(str.data(), str.size());
  186. stream->Write(';');
  187. }
  188. }
  189. }
  190. private:
  191. TWorkerHolder<IPullListWorker> Worker_;
  192. };
  193. template <>
  194. struct TOutputSpecTraits<TPrintOutputSpec> {
  195. static const constexpr bool IsPartial = false;
  196. static const constexpr bool SupportPullStreamMode = false;
  197. static const constexpr bool SupportPullListMode = true;
  198. static const constexpr bool SupportPushStreamMode = false;
  199. using TPullListReturnType = THolder<TPrintOutputHandle>;
  200. static TPullListReturnType ConvertPullListWorkerToOutputType(const TPrintOutputSpec&, TWorkerHolder<IPullListWorker> worker) {
  201. return MakeHolder<TPrintOutputHandle>(std::move(worker));
  202. }
  203. };
  204. TStringStream MakeGenInput(ui64 count) {
  205. TStringStream stream;
  206. TScopedAlloc alloc(__LOCATION__);
  207. TTypeEnvironment env(alloc);
  208. TMemoryUsageInfo memInfo("MakeGenInput");
  209. THolderFactory holderFactory(alloc.Ref(), memInfo);
  210. auto ui64Type = env.GetUi64Lazy();
  211. std::pair<TString, NKikimr::NMiniKQL::TType*> member("index", ui64Type);
  212. auto ui64StructType = TStructType::Create(&member, 1, env);
  213. TValuePackerGeneric<true> packer(false, ui64StructType);
  214. TPlainContainerCache cache;
  215. for (ui64 i = 0; i < count; ++i) {
  216. TUnboxedValue* items;
  217. auto array = cache.NewArray(holderFactory, 1, items);
  218. items[0] = TUnboxedValuePod(i);
  219. auto buf = packer.Pack(array);
  220. ui32 len = buf.Size();
  221. stream.Write(&len, sizeof(len));
  222. stream.Write(buf.Data(), len);
  223. }
  224. return stream;
  225. }
  226. template <typename TInputSpec, typename TOutputSpec>
  227. using TRunCallable = std::function<void (const THolder<TPullListProgram<TInputSpec, TOutputSpec>>&)>;
  228. template <typename TOutputSpec>
  229. NYT::TNode RunGenSql(
  230. const IProgramFactoryPtr factory,
  231. const TVector<NYT::TNode>& inputSchema,
  232. const TString& sql,
  233. ETranslationMode isPg,
  234. TRunCallable<TPickleInputSpec, TOutputSpec> runCallable
  235. ) {
  236. auto inputSpec = TPickleInputSpec(inputSchema);
  237. auto outputSpec = TOutputSpec({NYT::TNode::CreateEntity()});
  238. auto program = factory->MakePullListProgram(inputSpec, outputSpec, sql, isPg);
  239. runCallable(program);
  240. return program->MakeOutputSchema();
  241. }
  242. template <typename TInputSpec, typename TStream>
  243. void ShowResults(
  244. const IProgramFactoryPtr factory,
  245. const TVector<NYT::TNode>& inputSchema,
  246. const TString& sql,
  247. ETranslationMode isPg,
  248. TStream* input
  249. ) {
  250. auto inputSpec = TInputSpec(inputSchema);
  251. auto outputSpec = TPrintOutputSpec({NYT::TNode::CreateEntity()});
  252. auto program = factory->MakePullListProgram(inputSpec, outputSpec, sql, isPg);
  253. auto handle = program->Apply(input);
  254. TStringStream output;
  255. output << "{Type=";
  256. output << NCommon::WriteTypeToYson(handle->GetOutputType());
  257. output << ";Data=[";
  258. handle->Run(&output);
  259. output << "]}";
  260. TStringInput in(output.Str());
  261. NYson::ReformatYsonStream(&in, &Cerr, NYson::EYsonFormat::Pretty, NYson::EYsonType::Node);
  262. Cerr << "\n";
  263. }
  264. template <typename TInputSpec, typename TOutputSpec>
  265. double RunBenchmarks(
  266. const IProgramFactoryPtr factory,
  267. const TVector<NYT::TNode>& inputSchema,
  268. const TString& sql,
  269. ETranslationMode isPg,
  270. ui32 repeats,
  271. TRunCallable<TInputSpec, TOutputSpec> runCallable
  272. ) {
  273. auto inputSpec = TInputSpec(inputSchema);
  274. auto outputSpec = TOutputSpec({NYT::TNode::CreateEntity()});
  275. auto program = factory->MakePullListProgram(inputSpec, outputSpec, sql, isPg);
  276. Cerr << "Dry run of test sql...\n";
  277. runCallable(program);
  278. Cerr << "Run benchmark...\n";
  279. TVector<TDuration> times;
  280. TSimpleTimer allTimer;
  281. for (ui32 i = 0; i < repeats; ++i) {
  282. TSimpleTimer timer;
  283. runCallable(program);
  284. times.push_back(timer.Get());
  285. }
  286. Cout << "Elapsed: " << allTimer.Get() << "\n";
  287. Sort(times);
  288. times.erase(times.end() - times.size() / 3, times.end());
  289. double sum = std::transform_reduce(times.cbegin(), times.cend(),
  290. .0, std::plus{}, [](auto t) { return std::log(t.MicroSeconds()); });
  291. return std::exp(sum / times.size());
  292. }
  293. int Main(int argc, const char *argv[])
  294. {
  295. Y_UNUSED(NUdf::GetStaticSymbols());
  296. using namespace NLastGetopt;
  297. TOpts opts = TOpts::Default();
  298. ui64 count;
  299. ui32 repeats;
  300. TString genSql, testSql;
  301. bool showResults;
  302. TString udfsDir;
  303. TString LLVMSettings;
  304. TString blockEngineSettings;
  305. TString exprFile;
  306. opts.AddHelpOption();
  307. opts.AddLongOption("ndebug", "should be at first argument, do not show debug info in error output").NoArgument();
  308. opts.AddLongOption('b', "blocks-engine", "Block engine settings").StoreResult(&blockEngineSettings).DefaultValue("disable");
  309. opts.AddLongOption('c', "count", "count of input rows").StoreResult(&count).DefaultValue(1000000);
  310. opts.AddLongOption('g', "gen-sql", "SQL query to generate data").StoreResult(&genSql).DefaultValue("select index from Input");
  311. opts.AddLongOption('t', "test-sql", "SQL query to test").StoreResult(&testSql).DefaultValue("select count(*) as count from Input");
  312. opts.AddLongOption('r', "repeats", "number of iterations").StoreResult(&repeats).DefaultValue(10);
  313. opts.AddLongOption('w', "show-results", "show results of test SQL").StoreResult(&showResults).DefaultValue(true);
  314. opts.AddLongOption("pg", "use PG syntax for generate query").NoArgument();
  315. opts.AddLongOption("pt", "use PG syntax for test query").NoArgument();
  316. opts.AddLongOption("udfs-dir", "directory with UDFs").StoreResult(&udfsDir).DefaultValue("");
  317. opts.AddLongOption("llvm-settings", "LLVM settings").StoreResult(&LLVMSettings).DefaultValue("");
  318. opts.AddLongOption("print-expr", "print rebuild AST before execution").NoArgument();
  319. opts.AddLongOption("expr-file", "print AST to that file instead of stdout").StoreResult(&exprFile);
  320. opts.SetFreeArgsMax(0);
  321. TOptsParseResult res(&opts, argc, argv);
  322. auto factoryOptions = TProgramFactoryOptions();
  323. factoryOptions.SetUDFsDir(udfsDir);
  324. factoryOptions.SetLLVMSettings(LLVMSettings);
  325. factoryOptions.SetBlockEngineSettings(blockEngineSettings);
  326. IOutputStream* exprOut = nullptr;
  327. THolder<TFixedBufferFileOutput> exprFileHolder;
  328. if (res.Has("print-expr")) {
  329. exprOut = &Cout;
  330. } else if (!exprFile.empty()) {
  331. exprFileHolder.Reset(new TFixedBufferFileOutput(exprFile));
  332. exprOut = exprFileHolder.Get();
  333. }
  334. factoryOptions.SetExprOutputStream(exprOut);
  335. auto factory = MakeProgramFactory(factoryOptions);
  336. NYT::TNode members{NYT::TNode::CreateList()};
  337. auto typeNode = NYT::TNode::CreateList()
  338. .Add("DataType")
  339. .Add("Int64");
  340. members.Add(NYT::TNode::CreateList()
  341. .Add("index")
  342. .Add(typeNode));
  343. NYT::TNode schema = NYT::TNode::CreateList()
  344. .Add("StructType")
  345. .Add(members);
  346. auto inputGenSchema = TVector<NYT::TNode>{schema};
  347. auto inputGenStream = MakeGenInput(count);
  348. Cerr << "Input data size: " << inputGenStream.Size() << "\n";
  349. ETranslationMode isPgGen = res.Has("pg") ? ETranslationMode::PG : ETranslationMode::SQL;
  350. ETranslationMode isPgTest = res.Has("pt") ? ETranslationMode::PG : ETranslationMode::SQL;
  351. double normalizedTime;
  352. size_t inputBenchSize;
  353. if (blockEngineSettings == "disable") {
  354. TStringStream outputGenStream;
  355. auto outputGenSchema = RunGenSql<TPickleOutputSpec>(
  356. factory, inputGenSchema, genSql, isPgGen,
  357. [&](const auto& program) {
  358. auto handle = program->Apply(&inputGenStream);
  359. handle->Run(&outputGenStream);
  360. Cerr << "Generated data size: " << outputGenStream.Size() << "\n";
  361. });
  362. if (showResults) {
  363. auto inputResStream = TStringStream(outputGenStream);
  364. ShowResults<TPickleInputSpec>(
  365. factory, {outputGenSchema}, testSql, isPgTest, &inputResStream);
  366. }
  367. inputBenchSize = outputGenStream.Size();
  368. normalizedTime = RunBenchmarks<TPickleInputSpec, TPickleOutputSpec>(
  369. factory, {outputGenSchema}, testSql, isPgTest, repeats,
  370. [&](const auto& program) {
  371. auto inputBorrowed = TStringStream(outputGenStream);
  372. auto handle = program->Apply(&inputBorrowed);
  373. TNullOutput output;
  374. handle->Run(&output);
  375. });
  376. } else {
  377. auto inputGenSpec = TPickleInputSpec(inputGenSchema);
  378. auto outputGenSpec = TArrowOutputSpec({NYT::TNode::CreateEntity()});
  379. // XXX: <RunGenSql> cannot be used for this case, since all buffers
  380. // from the Datums in the obtained batches are owned by the worker's
  381. // allocator. Hence, the program (i.e. worker) object should be created
  382. // at the very beginning of the block, or at least prior to all the
  383. // temporary batch storages (mind outputGenStream below).
  384. auto program = factory->MakePullListProgram(
  385. inputGenSpec, outputGenSpec, genSql, isPgGen);
  386. auto handle = program->Apply(&inputGenStream);
  387. auto outputGenSchema = program->MakeOutputSchema();
  388. TVector<arrow::compute::ExecBatch> outputGenStream;
  389. while (arrow::compute::ExecBatch* batch = handle->Fetch()) {
  390. outputGenStream.push_back(*batch);
  391. }
  392. ui64 outputGenSize = std::transform_reduce(
  393. outputGenStream.cbegin(), outputGenStream.cend(),
  394. 0l, std::plus{}, [](const auto& b) {
  395. return NYql::NUdf::GetSizeOfArrowExecBatchInBytes(b);
  396. });
  397. Cerr << "Generated data size: " << outputGenSize << "\n";
  398. if (showResults) {
  399. auto inputResStreamHolder = StreamFromVector(outputGenStream);
  400. auto inputResStream = inputResStreamHolder.Get();
  401. ShowResults<TArrowInputSpec>(
  402. factory, {outputGenSchema}, testSql, isPgTest, inputResStream);
  403. }
  404. inputBenchSize = outputGenSize;
  405. normalizedTime = RunBenchmarks<TArrowInputSpec, TArrowOutputSpec>(
  406. factory, {outputGenSchema}, testSql, isPgTest, repeats,
  407. [&](const auto& program) {
  408. auto handle = program->Apply(StreamFromVector(outputGenStream));
  409. while (arrow::compute::ExecBatch* batch = handle->Fetch()) {}
  410. });
  411. }
  412. Cout << "Bench score: " << Prec(inputBenchSize / normalizedTime, 4) << "\n";
  413. NLog::CleanupLogger();
  414. return 0;
  415. }
  416. int main(int argc, const char *argv[]) {
  417. if (argc > 1 && TString(argv[1]) != TStringBuf("--ndebug")) {
  418. Cerr << "purebench ABI version: " << NKikimr::NUdf::CurrentAbiVersionStr() << Endl;
  419. }
  420. NYql::NBacktrace::RegisterKikimrFatalActions();
  421. NYql::NBacktrace::EnableKikimrSymbolize();
  422. try {
  423. return Main(argc, argv);
  424. } catch (const TCompileError& e) {
  425. Cerr << e.what() << "\n" << e.GetIssues();
  426. } catch (...) {
  427. Cerr << CurrentExceptionMessage() << Endl;
  428. return 1;
  429. }
  430. }