123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713 |
- #include "simple_proto.h"
- #include <library/cpp/messagebus/test/perftest/messages.pb.h>
- #include <library/cpp/messagebus/text_utils.h>
- #include <library/cpp/messagebus/thread_extra.h>
- #include <library/cpp/messagebus/ybus.h>
- #include <library/cpp/messagebus/oldmodule/module.h>
- #include <library/cpp/messagebus/protobuf/ybusbuf.h>
- #include <library/cpp/messagebus/www/www.h>
- #include <library/cpp/deprecated/threadable/threadable.h>
- #include <library/cpp/execprofile/profile.h>
- #include <library/cpp/getopt/opt.h>
- #include <library/cpp/lwtrace/start.h>
- #include <library/cpp/sighandler/async_signals_handler.h>
- #include <library/cpp/threading/future/legacy_future.h>
- #include <util/generic/ptr.h>
- #include <util/generic/string.h>
- #include <util/generic/vector.h>
- #include <util/generic/yexception.h>
- #include <util/random/random.h>
- #include <util/stream/file.h>
- #include <util/stream/output.h>
- #include <util/stream/str.h>
- #include <util/string/split.h>
- #include <util/system/event.h>
- #include <util/system/sysstat.h>
- #include <util/system/thread.h>
- #include <util/thread/lfqueue.h>
- #include <signal.h>
- #include <stdlib.h>
- using namespace NBus;
- ///////////////////////////////////////////////////////
- /// \brief Configuration parameters of the test
- const int DEFAULT_PORT = 55666;
- struct TPerftestConfig {
- TString Nodes; ///< node1:port1,node2:port2
- int ClientCount;
- int MessageSize; ///< size of message to send
- int Delay; ///< server delay (milliseconds)
- float Failure; ///< simulated failure rate
- int ServerPort;
- int Run;
- bool ServerUseModules;
- bool ExecuteOnMessageInWorkerPool;
- bool ExecuteOnReplyInWorkerPool;
- bool UseCompression;
- bool Profile;
- unsigned WwwPort;
- TPerftestConfig();
- void Print() {
- fprintf(stderr, "ClientCount=%d\n", ClientCount);
- fprintf(stderr, "ServerPort=%d\n", ServerPort);
- fprintf(stderr, "Delay=%d usecs\n", Delay);
- fprintf(stderr, "MessageSize=%d bytes\n", MessageSize);
- fprintf(stderr, "Failure=%.3f%%\n", Failure * 100.0);
- fprintf(stderr, "Runtime=%d seconds\n", Run);
- fprintf(stderr, "ServerUseModules=%s\n", ServerUseModules ? "true" : "false");
- fprintf(stderr, "ExecuteOnMessageInWorkerPool=%s\n", ExecuteOnMessageInWorkerPool ? "true" : "false");
- fprintf(stderr, "ExecuteOnReplyInWorkerPool=%s\n", ExecuteOnReplyInWorkerPool ? "true" : "false");
- fprintf(stderr, "UseCompression=%s\n", UseCompression ? "true" : "false");
- fprintf(stderr, "Profile=%s\n", Profile ? "true" : "false");
- fprintf(stderr, "WwwPort=%u\n", WwwPort);
- }
- };
- extern TPerftestConfig* TheConfig;
- extern bool TheExit;
- TVector<TNetAddr> ServerAddresses;
- struct TConfig {
- TBusQueueConfig ServerQueueConfig;
- TBusQueueConfig ClientQueueConfig;
- TBusServerSessionConfig ServerSessionConfig;
- TBusClientSessionConfig ClientSessionConfig;
- bool SimpleProtocol;
- private:
- void ConfigureDefaults(TBusQueueConfig& config) {
- config.NumWorkers = 4;
- }
- void ConfigureDefaults(TBusSessionConfig& config) {
- config.MaxInFlight = 10000;
- config.SendTimeout = TDuration::Seconds(20).MilliSeconds();
- config.TotalTimeout = TDuration::Seconds(60).MilliSeconds();
- }
- public:
- TConfig()
- : SimpleProtocol(false)
- {
- ConfigureDefaults(ServerQueueConfig);
- ConfigureDefaults(ClientQueueConfig);
- ConfigureDefaults(ServerSessionConfig);
- ConfigureDefaults(ClientSessionConfig);
- }
- void Print() {
- // TODO: do not print server if only client and vice verse
- Cerr << "server queue config:\n";
- Cerr << IndentText(ServerQueueConfig.PrintToString());
- Cerr << "server session config:" << Endl;
- Cerr << IndentText(ServerSessionConfig.PrintToString());
- Cerr << "client queue config:\n";
- Cerr << IndentText(ClientQueueConfig.PrintToString());
- Cerr << "client session config:" << Endl;
- Cerr << IndentText(ClientSessionConfig.PrintToString());
- Cerr << "simple protocol: " << SimpleProtocol << "\n";
- }
- };
- TConfig Config;
- ////////////////////////////////////////////////////////////////
- /// \brief Fast message
- using TPerftestRequest = TBusBufferMessage<TPerftestRequestRecord, 77>;
- using TPerftestResponse = TBusBufferMessage<TPerftestResponseRecord, 79>;
- static size_t RequestSize() {
- return RandomNumber<size_t>(TheConfig->MessageSize * 2 + 1);
- }
- TAutoPtr<TBusMessage> NewRequest() {
- if (Config.SimpleProtocol) {
- TAutoPtr<TSimpleMessage> r(new TSimpleMessage);
- r->SetCompressed(TheConfig->UseCompression);
- r->Payload = 10;
- return r.Release();
- } else {
- TAutoPtr<TPerftestRequest> r(new TPerftestRequest);
- r->SetCompressed(TheConfig->UseCompression);
- // TODO: use random content for better compression test
- r->Record.SetData(TString(RequestSize(), '?'));
- return r.Release();
- }
- }
- void CheckRequest(TPerftestRequest* request) {
- const TString& data = request->Record.GetData();
- for (size_t i = 0; i != data.size(); ++i) {
- Y_ABORT_UNLESS(data.at(i) == '?', "must be question mark");
- }
- }
- TAutoPtr<TPerftestResponse> NewResponse(TPerftestRequest* request) {
- TAutoPtr<TPerftestResponse> r(new TPerftestResponse);
- r->SetCompressed(TheConfig->UseCompression);
- r->Record.SetData(TString(request->Record.GetData().size(), '.'));
- return r;
- }
- void CheckResponse(TPerftestResponse* response) {
- const TString& data = response->Record.GetData();
- for (size_t i = 0; i != data.size(); ++i) {
- Y_ABORT_UNLESS(data.at(i) == '.', "must be dot");
- }
- }
- ////////////////////////////////////////////////////////////////////
- /// \brief Fast protocol that common between client and server
- class TPerftestProtocol: public TBusBufferProtocol {
- public:
- TPerftestProtocol()
- : TBusBufferProtocol("TPerftestProtocol", TheConfig->ServerPort)
- {
- RegisterType(new TPerftestRequest);
- RegisterType(new TPerftestResponse);
- }
- };
- class TPerftestServer;
- class TPerftestUsingModule;
- class TPerftestClient;
- struct TTestStats {
- TInstant Start;
- TAtomic Messages;
- TAtomic Errors;
- TAtomic Replies;
- void IncMessage() {
- AtomicIncrement(Messages);
- }
- void IncReplies() {
- AtomicDecrement(Messages);
- AtomicIncrement(Replies);
- }
- int NumMessage() {
- return AtomicGet(Messages);
- }
- void IncErrors() {
- AtomicDecrement(Messages);
- AtomicIncrement(Errors);
- }
- int NumErrors() {
- return AtomicGet(Errors);
- }
- int NumReplies() {
- return AtomicGet(Replies);
- }
- double GetThroughput() {
- return NumReplies() * 1000000.0 / (TInstant::Now() - Start).MicroSeconds();
- }
- public:
- TTestStats()
- : Start(TInstant::Now())
- , Messages(0)
- , Errors(0)
- , Replies(0)
- {
- }
- void PeriodicallyPrint();
- };
- TTestStats Stats;
- ////////////////////////////////////////////////////////////////////
- /// \brief Fast of the client session
- class TPerftestClient : IBusClientHandler {
- public:
- TBusClientSessionPtr Session;
- THolder<TBusProtocol> Proto;
- TBusMessageQueuePtr Bus;
- TVector<TBusClientConnectionPtr> Connections;
- public:
- /// constructor creates instances of protocol and session
- TPerftestClient() {
- /// create or get instance of message queue, need one per application
- Bus = CreateMessageQueue(Config.ClientQueueConfig, "client");
- if (Config.SimpleProtocol) {
- Proto.Reset(new TSimpleProtocol);
- } else {
- Proto.Reset(new TPerftestProtocol);
- }
- Session = TBusClientSession::Create(Proto.Get(), this, Config.ClientSessionConfig, Bus);
- for (unsigned i = 0; i < ServerAddresses.size(); ++i) {
- Connections.push_back(Session->GetConnection(ServerAddresses[i]));
- }
- }
- /// dispatch of requests is done here
- void Work() {
- SetCurrentThreadName("FastClient::Work");
- while (!TheExit) {
- TBusClientConnection* connection;
- if (Connections.size() == 1) {
- connection = Connections.front().Get();
- } else {
- connection = Connections.at(RandomNumber<size_t>()).Get();
- }
- TBusMessage* message = NewRequest().Release();
- int ret = connection->SendMessage(message, true);
- if (ret == MESSAGE_OK) {
- Stats.IncMessage();
- } else if (ret == MESSAGE_BUSY) {
- //delete message;
- //Sleep(TDuration::MilliSeconds(1));
- //continue;
- Y_ABORT("unreachable");
- } else if (ret == MESSAGE_SHUTDOWN) {
- delete message;
- } else {
- delete message;
- Stats.IncErrors();
- }
- }
- }
- void Stop() {
- Session->Shutdown();
- }
- /// actual work is being done here
- void OnReply(TAutoPtr<TBusMessage> mess, TAutoPtr<TBusMessage> reply) override {
- Y_UNUSED(mess);
- if (Config.SimpleProtocol) {
- VerifyDynamicCast<TSimpleMessage*>(reply.Get());
- } else {
- TPerftestResponse* typed = VerifyDynamicCast<TPerftestResponse*>(reply.Get());
- CheckResponse(typed);
- }
- Stats.IncReplies();
- }
- /// message that could not be delivered
- void OnError(TAutoPtr<TBusMessage> mess, EMessageStatus status) override {
- Y_UNUSED(mess);
- Y_UNUSED(status);
- if (TheExit) {
- return;
- }
- Stats.IncErrors();
- // Y_ASSERT(TheConfig->Failure > 0.0);
- }
- };
- class TPerftestServerCommon {
- public:
- THolder<TBusProtocol> Proto;
- TBusMessageQueuePtr Bus;
- TBusServerSessionPtr Session;
- protected:
- TPerftestServerCommon(const char* name)
- : Session()
- {
- if (Config.SimpleProtocol) {
- Proto.Reset(new TSimpleProtocol);
- } else {
- Proto.Reset(new TPerftestProtocol);
- }
- /// create or get instance of single message queue, need one for application
- Bus = CreateMessageQueue(Config.ServerQueueConfig, name);
- }
- public:
- void Stop() {
- Session->Shutdown();
- }
- };
- struct TAsyncRequest {
- TBusMessage* Request;
- TInstant ReceivedTime;
- };
- /////////////////////////////////////////////////////////////////////
- /// \brief Fast of the server session
- class TPerftestServer: public TPerftestServerCommon, public IBusServerHandler {
- public:
- TLockFreeQueue<TAsyncRequest> AsyncRequests;
- public:
- TPerftestServer()
- : TPerftestServerCommon("server")
- {
- /// register destination session
- Session = TBusServerSession::Create(Proto.Get(), this, Config.ServerSessionConfig, Bus);
- Y_ASSERT(Session && "probably somebody is listening on the same port");
- }
- /// when message comes, send reply
- void OnMessage(TOnMessageContext& mess) override {
- if (Config.SimpleProtocol) {
- TSimpleMessage* typed = VerifyDynamicCast<TSimpleMessage*>(mess.GetMessage());
- TAutoPtr<TSimpleMessage> response(new TSimpleMessage);
- response->Payload = typed->Payload;
- mess.SendReplyMove(response);
- return;
- }
- TPerftestRequest* typed = VerifyDynamicCast<TPerftestRequest*>(mess.GetMessage());
- CheckRequest(typed);
- /// forget replies for few messages, see what happends
- if (TheConfig->Failure > RandomNumber<double>()) {
- return;
- }
- /// sleep requested time
- if (TheConfig->Delay) {
- TAsyncRequest request;
- request.Request = mess.ReleaseMessage();
- request.ReceivedTime = TInstant::Now();
- AsyncRequests.Enqueue(request);
- return;
- }
- TAutoPtr<TPerftestResponse> reply(NewResponse(typed));
- /// sent empty reply for each message
- mess.SendReplyMove(reply);
- // TODO: count results
- }
- void Stop() {
- TPerftestServerCommon::Stop();
- }
- };
- class TPerftestUsingModule: public TPerftestServerCommon, public TBusModule {
- public:
- TPerftestUsingModule()
- : TPerftestServerCommon("server")
- , TBusModule("fast")
- {
- Y_ABORT_UNLESS(CreatePrivateSessions(Bus.Get()), "failed to initialize dupdetect module");
- Y_ABORT_UNLESS(StartInput(), "failed to start input");
- }
- ~TPerftestUsingModule() override {
- Shutdown();
- }
- private:
- TJobHandler Start(TBusJob* job, TBusMessage* mess) override {
- TPerftestRequest* typed = VerifyDynamicCast<TPerftestRequest*>(mess);
- CheckRequest(typed);
- /// sleep requested time
- if (TheConfig->Delay) {
- usleep(TheConfig->Delay);
- }
- /// forget replies for few messages, see what happends
- if (TheConfig->Failure > RandomNumber<double>()) {
- return nullptr;
- }
- job->SendReply(NewResponse(typed).Release());
- return nullptr;
- }
- TBusServerSessionPtr CreateExtSession(TBusMessageQueue& queue) override {
- return Session = CreateDefaultDestination(queue, Proto.Get(), Config.ServerSessionConfig);
- }
- };
- // ./perftest/perftest -s 11456 -c localhost:11456 -r 60 -n 4 -i 5000
- using namespace std;
- using namespace NBus;
- static TNetworkAddress ParseNetworkAddress(const char* string) {
- TString Name;
- int Port;
- const char* port = strchr(string, ':');
- if (port != nullptr) {
- Name.append(string, port - string);
- Port = atoi(port + 1);
- } else {
- Name.append(string);
- Port = TheConfig->ServerPort != 0 ? TheConfig->ServerPort : DEFAULT_PORT;
- }
- return TNetworkAddress(Name, Port);
- }
- TVector<TNetAddr> ParseNodes(const TString nodes) {
- TVector<TNetAddr> r;
- TVector<TString> hosts;
- size_t numh = Split(nodes.data(), ",", hosts);
- for (int i = 0; i < int(numh); i++) {
- const TNetworkAddress& networkAddress = ParseNetworkAddress(hosts[i].data());
- Y_ABORT_UNLESS(networkAddress.Begin() != networkAddress.End(), "no addresses");
- r.push_back(TNetAddr(networkAddress, &*networkAddress.Begin()));
- }
- return r;
- }
- TPerftestConfig::TPerftestConfig() {
- TBusSessionConfig defaultConfig;
- ServerPort = DEFAULT_PORT;
- Delay = 0; // artificial delay inside server OnMessage()
- MessageSize = 200;
- Failure = 0.00;
- Run = 60; // in seconds
- Nodes = "localhost";
- ServerUseModules = false;
- ExecuteOnMessageInWorkerPool = defaultConfig.ExecuteOnMessageInWorkerPool;
- ExecuteOnReplyInWorkerPool = defaultConfig.ExecuteOnReplyInWorkerPool;
- UseCompression = false;
- Profile = false;
- WwwPort = 0;
- }
- TPerftestConfig* TheConfig = new TPerftestConfig();
- bool TheExit = false;
- TSystemEvent StopEvent;
- TSimpleSharedPtr<TPerftestServer> Server;
- TSimpleSharedPtr<TPerftestUsingModule> ServerUsingModule;
- TVector<TSimpleSharedPtr<TPerftestClient>> Clients;
- TMutex ClientsLock;
- void stopsignal(int /*sig*/) {
- fprintf(stderr, "\n-------------------- exiting ------------------\n");
- TheExit = true;
- StopEvent.Signal();
- }
- // -s <num> - start server on port <num>
- // -c <node:port,node:port> - start client
- void TTestStats::PeriodicallyPrint() {
- SetCurrentThreadName("print-stats");
- for (;;) {
- StopEvent.WaitT(TDuration::Seconds(1));
- if (TheExit)
- break;
- TVector<TSimpleSharedPtr<TPerftestClient>> clients;
- {
- TGuard<TMutex> guard(ClientsLock);
- clients = Clients;
- }
- fprintf(stderr, "replies=%d errors=%d throughput=%.3f mess/sec\n",
- NumReplies(), NumErrors(), GetThroughput());
- if (!!Server) {
- fprintf(stderr, "server: q: %u %s\n",
- (unsigned)Server->Bus->GetExecutor()->GetWorkQueueSize(),
- Server->Session->GetStatusSingleLine().data());
- }
- if (!!ServerUsingModule) {
- fprintf(stderr, "server: q: %u %s\n",
- (unsigned)ServerUsingModule->Bus->GetExecutor()->GetWorkQueueSize(),
- ServerUsingModule->Session->GetStatusSingleLine().data());
- }
- for (const auto& client : clients) {
- fprintf(stderr, "client: q: %u %s\n",
- (unsigned)client->Bus->GetExecutor()->GetWorkQueueSize(),
- client->Session->GetStatusSingleLine().data());
- }
- TStringStream stats;
- bool first = true;
- if (!!Server) {
- if (!first) {
- stats << "\n";
- }
- first = false;
- stats << "server:\n";
- stats << IndentText(Server->Bus->GetStatus());
- }
- if (!!ServerUsingModule) {
- if (!first) {
- stats << "\n";
- }
- first = false;
- stats << "server using modules:\n";
- stats << IndentText(ServerUsingModule->Bus->GetStatus());
- }
- for (const auto& client : clients) {
- if (!first) {
- stats << "\n";
- }
- first = false;
- stats << "client:\n";
- stats << IndentText(client->Bus->GetStatus());
- }
- TUnbufferedFileOutput("stats").Write(stats.Str());
- }
- }
- int main(int argc, char* argv[]) {
- NLWTrace::StartLwtraceFromEnv();
- /* unix foo */
- setvbuf(stdout, nullptr, _IONBF, 0);
- setvbuf(stderr, nullptr, _IONBF, 0);
- Umask(0);
- SetAsyncSignalHandler(SIGINT, stopsignal);
- SetAsyncSignalHandler(SIGTERM, stopsignal);
- #ifndef _win_
- SetAsyncSignalHandler(SIGUSR1, stopsignal);
- #endif
- signal(SIGPIPE, SIG_IGN);
- NLastGetopt::TOpts opts = NLastGetopt::TOpts::Default();
- opts.AddLongOption('s', "server-port", "server port").RequiredArgument("port").StoreResult(&TheConfig->ServerPort);
- opts.AddCharOption('m', "average message size").RequiredArgument("size").StoreResult(&TheConfig->MessageSize);
- opts.AddLongOption('c', "server-host", "server hosts").RequiredArgument("host[,host]...").StoreResult(&TheConfig->Nodes);
- opts.AddCharOption('f', "failure rate (rational number between 0 and 1)").RequiredArgument("rate").StoreResult(&TheConfig->Failure);
- opts.AddCharOption('w', "delay before reply").RequiredArgument("microseconds").StoreResult(&TheConfig->Delay);
- opts.AddCharOption('r', "run duration").RequiredArgument("seconds").StoreResult(&TheConfig->Run);
- opts.AddLongOption("client-count", "amount of clients").RequiredArgument("count").StoreResult(&TheConfig->ClientCount).DefaultValue("1");
- opts.AddLongOption("server-use-modules").StoreResult(&TheConfig->ServerUseModules, true);
- opts.AddLongOption("on-message-in-pool", "execute OnMessage callback in worker pool")
- .RequiredArgument("BOOL")
- .StoreResult(&TheConfig->ExecuteOnMessageInWorkerPool);
- opts.AddLongOption("on-reply-in-pool", "execute OnReply callback in worker pool")
- .RequiredArgument("BOOL")
- .StoreResult(&TheConfig->ExecuteOnReplyInWorkerPool);
- opts.AddLongOption("compression", "use compression").RequiredArgument("BOOL").StoreResult(&TheConfig->UseCompression);
- opts.AddLongOption("simple-proto").SetFlag(&Config.SimpleProtocol);
- opts.AddLongOption("profile").SetFlag(&TheConfig->Profile);
- opts.AddLongOption("www-port").RequiredArgument("PORT").StoreResult(&TheConfig->WwwPort);
- opts.AddHelpOption();
- Config.ServerQueueConfig.ConfigureLastGetopt(opts, "server-");
- Config.ServerSessionConfig.ConfigureLastGetopt(opts, "server-");
- Config.ClientQueueConfig.ConfigureLastGetopt(opts, "client-");
- Config.ClientSessionConfig.ConfigureLastGetopt(opts, "client-");
- opts.SetFreeArgsMax(0);
- NLastGetopt::TOptsParseResult parseResult(&opts, argc, argv);
- TheConfig->Print();
- Config.Print();
- if (TheConfig->Profile) {
- BeginProfiling();
- }
- TIntrusivePtr<TBusWww> www(new TBusWww);
- ServerAddresses = ParseNodes(TheConfig->Nodes);
- if (TheConfig->ServerPort) {
- if (TheConfig->ServerUseModules) {
- ServerUsingModule = new TPerftestUsingModule();
- www->RegisterModule(ServerUsingModule.Get());
- } else {
- Server = new TPerftestServer();
- www->RegisterServerSession(Server->Session);
- }
- }
- TVector<TSimpleSharedPtr<NThreading::TLegacyFuture<void, false>>> futures;
- if (ServerAddresses.size() > 0 && TheConfig->ClientCount > 0) {
- for (int i = 0; i < TheConfig->ClientCount; ++i) {
- TGuard<TMutex> guard(ClientsLock);
- Clients.push_back(new TPerftestClient);
- futures.push_back(new NThreading::TLegacyFuture<void, false>(std::bind(&TPerftestClient::Work, Clients.back())));
- www->RegisterClientSession(Clients.back()->Session);
- }
- }
- futures.push_back(new NThreading::TLegacyFuture<void, false>(std::bind(&TTestStats::PeriodicallyPrint, std::ref(Stats))));
- THolder<TBusWwwHttpServer> wwwServer;
- if (TheConfig->WwwPort != 0) {
- wwwServer.Reset(new TBusWwwHttpServer(www, TheConfig->WwwPort));
- }
- /* sit here until signal terminate our process */
- StopEvent.WaitT(TDuration::Seconds(TheConfig->Run));
- TheExit = true;
- StopEvent.Signal();
- if (!!Server) {
- Cerr << "Stopping server\n";
- Server->Stop();
- }
- if (!!ServerUsingModule) {
- Cerr << "Stopping server (using modules)\n";
- ServerUsingModule->Stop();
- }
- TVector<TSimpleSharedPtr<TPerftestClient>> clients;
- {
- TGuard<TMutex> guard(ClientsLock);
- clients = Clients;
- }
- if (!clients.empty()) {
- Cerr << "Stopping clients\n";
- for (auto& client : clients) {
- client->Stop();
- }
- }
- wwwServer.Destroy();
- for (const auto& future : futures) {
- future->Get();
- }
- if (TheConfig->Profile) {
- EndProfiling();
- }
- Cerr << "***SUCCESS***\n";
- return 0;
- }
|