#include #include #include class TContext { public: TContext() : NextInputPromise_(NThreading::NewPromise()) {} ~TContext() { UpdateNextInput(false); } NThreading::TFuture NextInput() { return NextInputPromise_.GetFuture(); } void UpdateNextInput(bool hasInput = true) { auto prevNextInputPromise = NextInputPromise_; NextInputPromise_ = NThreading::NewPromise(); prevNextInputPromise.SetValue(hasInput); } private: NThreading::TPromise NextInputPromise_; }; static void TestPureFutureChainSubscribe(benchmark::State& state) { TContext context; size_t cnt = 0; std::function&)> processInput = [&context, &cnt, &processInput](const NThreading::TFuture& hasInput) { if (hasInput.GetValue()) { benchmark::DoNotOptimize(++cnt); context.NextInput().Subscribe(processInput); } }; processInput(NThreading::MakeFuture(true)); for (auto _ : state) { context.UpdateNextInput(); } context.UpdateNextInput(false); } static void TestPureFutureChainApply(benchmark::State& state) { TContext context; size_t cnt = 0; std::function&)> processInput = [&context, &cnt, &processInput](const NThreading::TFuture& hasInput) { if (hasInput.GetValue()) { benchmark::DoNotOptimize(++cnt); context.NextInput().Apply(processInput); } }; processInput(NThreading::MakeFuture(true)); for (auto _ : state) { context.UpdateNextInput(); } context.UpdateNextInput(false); } static void TestCoroFutureChain(benchmark::State& state) { TContext context; size_t cnt = 0; auto coroutine = [&context, &cnt]() -> NThreading::TFuture { while (co_await context.NextInput()) { benchmark::DoNotOptimize(++cnt); } }; auto coroutineFuture = coroutine(); for (auto _ : state) { context.UpdateNextInput(); } context.UpdateNextInput(false); coroutineFuture.GetValueSync(); } BENCHMARK(TestPureFutureChainSubscribe); BENCHMARK(TestPureFutureChainApply); BENCHMARK(TestCoroFutureChain);