#include "simple_proto.h" #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include using namespace NBus; /////////////////////////////////////////////////////// /// \brief Configuration parameters of the test const int DEFAULT_PORT = 55666; struct TPerftestConfig { TString Nodes; ///< node1:port1,node2:port2 int ClientCount; int MessageSize; ///< size of message to send int Delay; ///< server delay (milliseconds) float Failure; ///< simulated failure rate int ServerPort; int Run; bool ServerUseModules; bool ExecuteOnMessageInWorkerPool; bool ExecuteOnReplyInWorkerPool; bool UseCompression; bool Profile; unsigned WwwPort; TPerftestConfig(); void Print() { fprintf(stderr, "ClientCount=%d\n", ClientCount); fprintf(stderr, "ServerPort=%d\n", ServerPort); fprintf(stderr, "Delay=%d usecs\n", Delay); fprintf(stderr, "MessageSize=%d bytes\n", MessageSize); fprintf(stderr, "Failure=%.3f%%\n", Failure * 100.0); fprintf(stderr, "Runtime=%d seconds\n", Run); fprintf(stderr, "ServerUseModules=%s\n", ServerUseModules ? "true" : "false"); fprintf(stderr, "ExecuteOnMessageInWorkerPool=%s\n", ExecuteOnMessageInWorkerPool ? "true" : "false"); fprintf(stderr, "ExecuteOnReplyInWorkerPool=%s\n", ExecuteOnReplyInWorkerPool ? "true" : "false"); fprintf(stderr, "UseCompression=%s\n", UseCompression ? "true" : "false"); fprintf(stderr, "Profile=%s\n", Profile ? "true" : "false"); fprintf(stderr, "WwwPort=%u\n", WwwPort); } }; extern TPerftestConfig* TheConfig; extern bool TheExit; TVector ServerAddresses; struct TConfig { TBusQueueConfig ServerQueueConfig; TBusQueueConfig ClientQueueConfig; TBusServerSessionConfig ServerSessionConfig; TBusClientSessionConfig ClientSessionConfig; bool SimpleProtocol; private: void ConfigureDefaults(TBusQueueConfig& config) { config.NumWorkers = 4; } void ConfigureDefaults(TBusSessionConfig& config) { config.MaxInFlight = 10000; config.SendTimeout = TDuration::Seconds(20).MilliSeconds(); config.TotalTimeout = TDuration::Seconds(60).MilliSeconds(); } public: TConfig() : SimpleProtocol(false) { ConfigureDefaults(ServerQueueConfig); ConfigureDefaults(ClientQueueConfig); ConfigureDefaults(ServerSessionConfig); ConfigureDefaults(ClientSessionConfig); } void Print() { // TODO: do not print server if only client and vice verse Cerr << "server queue config:\n"; Cerr << IndentText(ServerQueueConfig.PrintToString()); Cerr << "server session config:" << Endl; Cerr << IndentText(ServerSessionConfig.PrintToString()); Cerr << "client queue config:\n"; Cerr << IndentText(ClientQueueConfig.PrintToString()); Cerr << "client session config:" << Endl; Cerr << IndentText(ClientSessionConfig.PrintToString()); Cerr << "simple protocol: " << SimpleProtocol << "\n"; } }; TConfig Config; //////////////////////////////////////////////////////////////// /// \brief Fast message using TPerftestRequest = TBusBufferMessage; using TPerftestResponse = TBusBufferMessage; static size_t RequestSize() { return RandomNumber(TheConfig->MessageSize * 2 + 1); } TAutoPtr NewRequest() { if (Config.SimpleProtocol) { TAutoPtr r(new TSimpleMessage); r->SetCompressed(TheConfig->UseCompression); r->Payload = 10; return r.Release(); } else { TAutoPtr r(new TPerftestRequest); r->SetCompressed(TheConfig->UseCompression); // TODO: use random content for better compression test r->Record.SetData(TString(RequestSize(), '?')); return r.Release(); } } void CheckRequest(TPerftestRequest* request) { const TString& data = request->Record.GetData(); for (size_t i = 0; i != data.size(); ++i) { Y_ABORT_UNLESS(data.at(i) == '?', "must be question mark"); } } TAutoPtr NewResponse(TPerftestRequest* request) { TAutoPtr r(new TPerftestResponse); r->SetCompressed(TheConfig->UseCompression); r->Record.SetData(TString(request->Record.GetData().size(), '.')); return r; } void CheckResponse(TPerftestResponse* response) { const TString& data = response->Record.GetData(); for (size_t i = 0; i != data.size(); ++i) { Y_ABORT_UNLESS(data.at(i) == '.', "must be dot"); } } //////////////////////////////////////////////////////////////////// /// \brief Fast protocol that common between client and server class TPerftestProtocol: public TBusBufferProtocol { public: TPerftestProtocol() : TBusBufferProtocol("TPerftestProtocol", TheConfig->ServerPort) { RegisterType(new TPerftestRequest); RegisterType(new TPerftestResponse); } }; class TPerftestServer; class TPerftestUsingModule; class TPerftestClient; struct TTestStats { TInstant Start; TAtomic Messages; TAtomic Errors; TAtomic Replies; void IncMessage() { AtomicIncrement(Messages); } void IncReplies() { AtomicDecrement(Messages); AtomicIncrement(Replies); } int NumMessage() { return AtomicGet(Messages); } void IncErrors() { AtomicDecrement(Messages); AtomicIncrement(Errors); } int NumErrors() { return AtomicGet(Errors); } int NumReplies() { return AtomicGet(Replies); } double GetThroughput() { return NumReplies() * 1000000.0 / (TInstant::Now() - Start).MicroSeconds(); } public: TTestStats() : Start(TInstant::Now()) , Messages(0) , Errors(0) , Replies(0) { } void PeriodicallyPrint(); }; TTestStats Stats; //////////////////////////////////////////////////////////////////// /// \brief Fast of the client session class TPerftestClient : IBusClientHandler { public: TBusClientSessionPtr Session; THolder Proto; TBusMessageQueuePtr Bus; TVector Connections; public: /// constructor creates instances of protocol and session TPerftestClient() { /// create or get instance of message queue, need one per application Bus = CreateMessageQueue(Config.ClientQueueConfig, "client"); if (Config.SimpleProtocol) { Proto.Reset(new TSimpleProtocol); } else { Proto.Reset(new TPerftestProtocol); } Session = TBusClientSession::Create(Proto.Get(), this, Config.ClientSessionConfig, Bus); for (unsigned i = 0; i < ServerAddresses.size(); ++i) { Connections.push_back(Session->GetConnection(ServerAddresses[i])); } } /// dispatch of requests is done here void Work() { SetCurrentThreadName("FastClient::Work"); while (!TheExit) { TBusClientConnection* connection; if (Connections.size() == 1) { connection = Connections.front().Get(); } else { connection = Connections.at(RandomNumber()).Get(); } TBusMessage* message = NewRequest().Release(); int ret = connection->SendMessage(message, true); if (ret == MESSAGE_OK) { Stats.IncMessage(); } else if (ret == MESSAGE_BUSY) { //delete message; //Sleep(TDuration::MilliSeconds(1)); //continue; Y_ABORT("unreachable"); } else if (ret == MESSAGE_SHUTDOWN) { delete message; } else { delete message; Stats.IncErrors(); } } } void Stop() { Session->Shutdown(); } /// actual work is being done here void OnReply(TAutoPtr mess, TAutoPtr reply) override { Y_UNUSED(mess); if (Config.SimpleProtocol) { VerifyDynamicCast(reply.Get()); } else { TPerftestResponse* typed = VerifyDynamicCast(reply.Get()); CheckResponse(typed); } Stats.IncReplies(); } /// message that could not be delivered void OnError(TAutoPtr mess, EMessageStatus status) override { Y_UNUSED(mess); Y_UNUSED(status); if (TheExit) { return; } Stats.IncErrors(); // Y_ASSERT(TheConfig->Failure > 0.0); } }; class TPerftestServerCommon { public: THolder Proto; TBusMessageQueuePtr Bus; TBusServerSessionPtr Session; protected: TPerftestServerCommon(const char* name) : Session() { if (Config.SimpleProtocol) { Proto.Reset(new TSimpleProtocol); } else { Proto.Reset(new TPerftestProtocol); } /// create or get instance of single message queue, need one for application Bus = CreateMessageQueue(Config.ServerQueueConfig, name); } public: void Stop() { Session->Shutdown(); } }; struct TAsyncRequest { TBusMessage* Request; TInstant ReceivedTime; }; ///////////////////////////////////////////////////////////////////// /// \brief Fast of the server session class TPerftestServer: public TPerftestServerCommon, public IBusServerHandler { public: TLockFreeQueue AsyncRequests; public: TPerftestServer() : TPerftestServerCommon("server") { /// register destination session Session = TBusServerSession::Create(Proto.Get(), this, Config.ServerSessionConfig, Bus); Y_ASSERT(Session && "probably somebody is listening on the same port"); } /// when message comes, send reply void OnMessage(TOnMessageContext& mess) override { if (Config.SimpleProtocol) { TSimpleMessage* typed = VerifyDynamicCast(mess.GetMessage()); TAutoPtr response(new TSimpleMessage); response->Payload = typed->Payload; mess.SendReplyMove(response); return; } TPerftestRequest* typed = VerifyDynamicCast(mess.GetMessage()); CheckRequest(typed); /// forget replies for few messages, see what happends if (TheConfig->Failure > RandomNumber()) { return; } /// sleep requested time if (TheConfig->Delay) { TAsyncRequest request; request.Request = mess.ReleaseMessage(); request.ReceivedTime = TInstant::Now(); AsyncRequests.Enqueue(request); return; } TAutoPtr reply(NewResponse(typed)); /// sent empty reply for each message mess.SendReplyMove(reply); // TODO: count results } void Stop() { TPerftestServerCommon::Stop(); } }; class TPerftestUsingModule: public TPerftestServerCommon, public TBusModule { public: TPerftestUsingModule() : TPerftestServerCommon("server") , TBusModule("fast") { Y_ABORT_UNLESS(CreatePrivateSessions(Bus.Get()), "failed to initialize dupdetect module"); Y_ABORT_UNLESS(StartInput(), "failed to start input"); } ~TPerftestUsingModule() override { Shutdown(); } private: TJobHandler Start(TBusJob* job, TBusMessage* mess) override { TPerftestRequest* typed = VerifyDynamicCast(mess); CheckRequest(typed); /// sleep requested time if (TheConfig->Delay) { usleep(TheConfig->Delay); } /// forget replies for few messages, see what happends if (TheConfig->Failure > RandomNumber()) { return nullptr; } job->SendReply(NewResponse(typed).Release()); return nullptr; } TBusServerSessionPtr CreateExtSession(TBusMessageQueue& queue) override { return Session = CreateDefaultDestination(queue, Proto.Get(), Config.ServerSessionConfig); } }; // ./perftest/perftest -s 11456 -c localhost:11456 -r 60 -n 4 -i 5000 using namespace std; using namespace NBus; static TNetworkAddress ParseNetworkAddress(const char* string) { TString Name; int Port; const char* port = strchr(string, ':'); if (port != nullptr) { Name.append(string, port - string); Port = atoi(port + 1); } else { Name.append(string); Port = TheConfig->ServerPort != 0 ? TheConfig->ServerPort : DEFAULT_PORT; } return TNetworkAddress(Name, Port); } TVector ParseNodes(const TString nodes) { TVector r; TVector hosts; size_t numh = Split(nodes.data(), ",", hosts); for (int i = 0; i < int(numh); i++) { const TNetworkAddress& networkAddress = ParseNetworkAddress(hosts[i].data()); Y_ABORT_UNLESS(networkAddress.Begin() != networkAddress.End(), "no addresses"); r.push_back(TNetAddr(networkAddress, &*networkAddress.Begin())); } return r; } TPerftestConfig::TPerftestConfig() { TBusSessionConfig defaultConfig; ServerPort = DEFAULT_PORT; Delay = 0; // artificial delay inside server OnMessage() MessageSize = 200; Failure = 0.00; Run = 60; // in seconds Nodes = "localhost"; ServerUseModules = false; ExecuteOnMessageInWorkerPool = defaultConfig.ExecuteOnMessageInWorkerPool; ExecuteOnReplyInWorkerPool = defaultConfig.ExecuteOnReplyInWorkerPool; UseCompression = false; Profile = false; WwwPort = 0; } TPerftestConfig* TheConfig = new TPerftestConfig(); bool TheExit = false; TSystemEvent StopEvent; TSimpleSharedPtr Server; TSimpleSharedPtr ServerUsingModule; TVector> Clients; TMutex ClientsLock; void stopsignal(int /*sig*/) { fprintf(stderr, "\n-------------------- exiting ------------------\n"); TheExit = true; StopEvent.Signal(); } // -s - start server on port // -c - start client void TTestStats::PeriodicallyPrint() { SetCurrentThreadName("print-stats"); for (;;) { StopEvent.WaitT(TDuration::Seconds(1)); if (TheExit) break; TVector> clients; { TGuard guard(ClientsLock); clients = Clients; } fprintf(stderr, "replies=%d errors=%d throughput=%.3f mess/sec\n", NumReplies(), NumErrors(), GetThroughput()); if (!!Server) { fprintf(stderr, "server: q: %u %s\n", (unsigned)Server->Bus->GetExecutor()->GetWorkQueueSize(), Server->Session->GetStatusSingleLine().data()); } if (!!ServerUsingModule) { fprintf(stderr, "server: q: %u %s\n", (unsigned)ServerUsingModule->Bus->GetExecutor()->GetWorkQueueSize(), ServerUsingModule->Session->GetStatusSingleLine().data()); } for (const auto& client : clients) { fprintf(stderr, "client: q: %u %s\n", (unsigned)client->Bus->GetExecutor()->GetWorkQueueSize(), client->Session->GetStatusSingleLine().data()); } TStringStream stats; bool first = true; if (!!Server) { if (!first) { stats << "\n"; } first = false; stats << "server:\n"; stats << IndentText(Server->Bus->GetStatus()); } if (!!ServerUsingModule) { if (!first) { stats << "\n"; } first = false; stats << "server using modules:\n"; stats << IndentText(ServerUsingModule->Bus->GetStatus()); } for (const auto& client : clients) { if (!first) { stats << "\n"; } first = false; stats << "client:\n"; stats << IndentText(client->Bus->GetStatus()); } TUnbufferedFileOutput("stats").Write(stats.Str()); } } int main(int argc, char* argv[]) { NLWTrace::StartLwtraceFromEnv(); /* unix foo */ setvbuf(stdout, nullptr, _IONBF, 0); setvbuf(stderr, nullptr, _IONBF, 0); Umask(0); SetAsyncSignalHandler(SIGINT, stopsignal); SetAsyncSignalHandler(SIGTERM, stopsignal); #ifndef _win_ SetAsyncSignalHandler(SIGUSR1, stopsignal); #endif signal(SIGPIPE, SIG_IGN); NLastGetopt::TOpts opts = NLastGetopt::TOpts::Default(); opts.AddLongOption('s', "server-port", "server port").RequiredArgument("port").StoreResult(&TheConfig->ServerPort); opts.AddCharOption('m', "average message size").RequiredArgument("size").StoreResult(&TheConfig->MessageSize); opts.AddLongOption('c', "server-host", "server hosts").RequiredArgument("host[,host]...").StoreResult(&TheConfig->Nodes); opts.AddCharOption('f', "failure rate (rational number between 0 and 1)").RequiredArgument("rate").StoreResult(&TheConfig->Failure); opts.AddCharOption('w', "delay before reply").RequiredArgument("microseconds").StoreResult(&TheConfig->Delay); opts.AddCharOption('r', "run duration").RequiredArgument("seconds").StoreResult(&TheConfig->Run); opts.AddLongOption("client-count", "amount of clients").RequiredArgument("count").StoreResult(&TheConfig->ClientCount).DefaultValue("1"); opts.AddLongOption("server-use-modules").StoreResult(&TheConfig->ServerUseModules, true); opts.AddLongOption("on-message-in-pool", "execute OnMessage callback in worker pool") .RequiredArgument("BOOL") .StoreResult(&TheConfig->ExecuteOnMessageInWorkerPool); opts.AddLongOption("on-reply-in-pool", "execute OnReply callback in worker pool") .RequiredArgument("BOOL") .StoreResult(&TheConfig->ExecuteOnReplyInWorkerPool); opts.AddLongOption("compression", "use compression").RequiredArgument("BOOL").StoreResult(&TheConfig->UseCompression); opts.AddLongOption("simple-proto").SetFlag(&Config.SimpleProtocol); opts.AddLongOption("profile").SetFlag(&TheConfig->Profile); opts.AddLongOption("www-port").RequiredArgument("PORT").StoreResult(&TheConfig->WwwPort); opts.AddHelpOption(); Config.ServerQueueConfig.ConfigureLastGetopt(opts, "server-"); Config.ServerSessionConfig.ConfigureLastGetopt(opts, "server-"); Config.ClientQueueConfig.ConfigureLastGetopt(opts, "client-"); Config.ClientSessionConfig.ConfigureLastGetopt(opts, "client-"); opts.SetFreeArgsMax(0); NLastGetopt::TOptsParseResult parseResult(&opts, argc, argv); TheConfig->Print(); Config.Print(); if (TheConfig->Profile) { BeginProfiling(); } TIntrusivePtr www(new TBusWww); ServerAddresses = ParseNodes(TheConfig->Nodes); if (TheConfig->ServerPort) { if (TheConfig->ServerUseModules) { ServerUsingModule = new TPerftestUsingModule(); www->RegisterModule(ServerUsingModule.Get()); } else { Server = new TPerftestServer(); www->RegisterServerSession(Server->Session); } } TVector>> futures; if (ServerAddresses.size() > 0 && TheConfig->ClientCount > 0) { for (int i = 0; i < TheConfig->ClientCount; ++i) { TGuard guard(ClientsLock); Clients.push_back(new TPerftestClient); futures.push_back(new NThreading::TLegacyFuture(std::bind(&TPerftestClient::Work, Clients.back()))); www->RegisterClientSession(Clients.back()->Session); } } futures.push_back(new NThreading::TLegacyFuture(std::bind(&TTestStats::PeriodicallyPrint, std::ref(Stats)))); THolder wwwServer; if (TheConfig->WwwPort != 0) { wwwServer.Reset(new TBusWwwHttpServer(www, TheConfig->WwwPort)); } /* sit here until signal terminate our process */ StopEvent.WaitT(TDuration::Seconds(TheConfig->Run)); TheExit = true; StopEvent.Signal(); if (!!Server) { Cerr << "Stopping server\n"; Server->Stop(); } if (!!ServerUsingModule) { Cerr << "Stopping server (using modules)\n"; ServerUsingModule->Stop(); } TVector> clients; { TGuard guard(ClientsLock); clients = Clients; } if (!clients.empty()) { Cerr << "Stopping clients\n"; for (auto& client : clients) { client->Stop(); } } wwwServer.Destroy(); for (const auto& future : futures) { future->Get(); } if (TheConfig->Profile) { EndProfiling(); } Cerr << "***SUCCESS***\n"; return 0; }