perftest.cpp 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713
  1. #include "simple_proto.h"
  2. #include <library/cpp/messagebus/test/perftest/messages.pb.h>
  3. #include <library/cpp/messagebus/text_utils.h>
  4. #include <library/cpp/messagebus/thread_extra.h>
  5. #include <library/cpp/messagebus/ybus.h>
  6. #include <library/cpp/messagebus/oldmodule/module.h>
  7. #include <library/cpp/messagebus/protobuf/ybusbuf.h>
  8. #include <library/cpp/messagebus/www/www.h>
  9. #include <library/cpp/deprecated/threadable/threadable.h>
  10. #include <library/cpp/execprofile/profile.h>
  11. #include <library/cpp/getopt/opt.h>
  12. #include <library/cpp/lwtrace/start.h>
  13. #include <library/cpp/sighandler/async_signals_handler.h>
  14. #include <library/cpp/threading/future/legacy_future.h>
  15. #include <util/generic/ptr.h>
  16. #include <util/generic/string.h>
  17. #include <util/generic/vector.h>
  18. #include <util/generic/yexception.h>
  19. #include <util/random/random.h>
  20. #include <util/stream/file.h>
  21. #include <util/stream/output.h>
  22. #include <util/stream/str.h>
  23. #include <util/string/split.h>
  24. #include <util/system/event.h>
  25. #include <util/system/sysstat.h>
  26. #include <util/system/thread.h>
  27. #include <util/thread/lfqueue.h>
  28. #include <signal.h>
  29. #include <stdlib.h>
  30. using namespace NBus;
  31. ///////////////////////////////////////////////////////
  32. /// \brief Configuration parameters of the test
  33. const int DEFAULT_PORT = 55666;
  34. struct TPerftestConfig {
  35. TString Nodes; ///< node1:port1,node2:port2
  36. int ClientCount;
  37. int MessageSize; ///< size of message to send
  38. int Delay; ///< server delay (milliseconds)
  39. float Failure; ///< simulated failure rate
  40. int ServerPort;
  41. int Run;
  42. bool ServerUseModules;
  43. bool ExecuteOnMessageInWorkerPool;
  44. bool ExecuteOnReplyInWorkerPool;
  45. bool UseCompression;
  46. bool Profile;
  47. unsigned WwwPort;
  48. TPerftestConfig();
  49. void Print() {
  50. fprintf(stderr, "ClientCount=%d\n", ClientCount);
  51. fprintf(stderr, "ServerPort=%d\n", ServerPort);
  52. fprintf(stderr, "Delay=%d usecs\n", Delay);
  53. fprintf(stderr, "MessageSize=%d bytes\n", MessageSize);
  54. fprintf(stderr, "Failure=%.3f%%\n", Failure * 100.0);
  55. fprintf(stderr, "Runtime=%d seconds\n", Run);
  56. fprintf(stderr, "ServerUseModules=%s\n", ServerUseModules ? "true" : "false");
  57. fprintf(stderr, "ExecuteOnMessageInWorkerPool=%s\n", ExecuteOnMessageInWorkerPool ? "true" : "false");
  58. fprintf(stderr, "ExecuteOnReplyInWorkerPool=%s\n", ExecuteOnReplyInWorkerPool ? "true" : "false");
  59. fprintf(stderr, "UseCompression=%s\n", UseCompression ? "true" : "false");
  60. fprintf(stderr, "Profile=%s\n", Profile ? "true" : "false");
  61. fprintf(stderr, "WwwPort=%u\n", WwwPort);
  62. }
  63. };
  64. extern TPerftestConfig* TheConfig;
  65. extern bool TheExit;
  66. TVector<TNetAddr> ServerAddresses;
  67. struct TConfig {
  68. TBusQueueConfig ServerQueueConfig;
  69. TBusQueueConfig ClientQueueConfig;
  70. TBusServerSessionConfig ServerSessionConfig;
  71. TBusClientSessionConfig ClientSessionConfig;
  72. bool SimpleProtocol;
  73. private:
  74. void ConfigureDefaults(TBusQueueConfig& config) {
  75. config.NumWorkers = 4;
  76. }
  77. void ConfigureDefaults(TBusSessionConfig& config) {
  78. config.MaxInFlight = 10000;
  79. config.SendTimeout = TDuration::Seconds(20).MilliSeconds();
  80. config.TotalTimeout = TDuration::Seconds(60).MilliSeconds();
  81. }
  82. public:
  83. TConfig()
  84. : SimpleProtocol(false)
  85. {
  86. ConfigureDefaults(ServerQueueConfig);
  87. ConfigureDefaults(ClientQueueConfig);
  88. ConfigureDefaults(ServerSessionConfig);
  89. ConfigureDefaults(ClientSessionConfig);
  90. }
  91. void Print() {
  92. // TODO: do not print server if only client and vice verse
  93. Cerr << "server queue config:\n";
  94. Cerr << IndentText(ServerQueueConfig.PrintToString());
  95. Cerr << "server session config:" << Endl;
  96. Cerr << IndentText(ServerSessionConfig.PrintToString());
  97. Cerr << "client queue config:\n";
  98. Cerr << IndentText(ClientQueueConfig.PrintToString());
  99. Cerr << "client session config:" << Endl;
  100. Cerr << IndentText(ClientSessionConfig.PrintToString());
  101. Cerr << "simple protocol: " << SimpleProtocol << "\n";
  102. }
  103. };
  104. TConfig Config;
  105. ////////////////////////////////////////////////////////////////
  106. /// \brief Fast message
  107. using TPerftestRequest = TBusBufferMessage<TPerftestRequestRecord, 77>;
  108. using TPerftestResponse = TBusBufferMessage<TPerftestResponseRecord, 79>;
  109. static size_t RequestSize() {
  110. return RandomNumber<size_t>(TheConfig->MessageSize * 2 + 1);
  111. }
  112. TAutoPtr<TBusMessage> NewRequest() {
  113. if (Config.SimpleProtocol) {
  114. TAutoPtr<TSimpleMessage> r(new TSimpleMessage);
  115. r->SetCompressed(TheConfig->UseCompression);
  116. r->Payload = 10;
  117. return r.Release();
  118. } else {
  119. TAutoPtr<TPerftestRequest> r(new TPerftestRequest);
  120. r->SetCompressed(TheConfig->UseCompression);
  121. // TODO: use random content for better compression test
  122. r->Record.SetData(TString(RequestSize(), '?'));
  123. return r.Release();
  124. }
  125. }
  126. void CheckRequest(TPerftestRequest* request) {
  127. const TString& data = request->Record.GetData();
  128. for (size_t i = 0; i != data.size(); ++i) {
  129. Y_ABORT_UNLESS(data.at(i) == '?', "must be question mark");
  130. }
  131. }
  132. TAutoPtr<TPerftestResponse> NewResponse(TPerftestRequest* request) {
  133. TAutoPtr<TPerftestResponse> r(new TPerftestResponse);
  134. r->SetCompressed(TheConfig->UseCompression);
  135. r->Record.SetData(TString(request->Record.GetData().size(), '.'));
  136. return r;
  137. }
  138. void CheckResponse(TPerftestResponse* response) {
  139. const TString& data = response->Record.GetData();
  140. for (size_t i = 0; i != data.size(); ++i) {
  141. Y_ABORT_UNLESS(data.at(i) == '.', "must be dot");
  142. }
  143. }
  144. ////////////////////////////////////////////////////////////////////
  145. /// \brief Fast protocol that common between client and server
  146. class TPerftestProtocol: public TBusBufferProtocol {
  147. public:
  148. TPerftestProtocol()
  149. : TBusBufferProtocol("TPerftestProtocol", TheConfig->ServerPort)
  150. {
  151. RegisterType(new TPerftestRequest);
  152. RegisterType(new TPerftestResponse);
  153. }
  154. };
  155. class TPerftestServer;
  156. class TPerftestUsingModule;
  157. class TPerftestClient;
  158. struct TTestStats {
  159. TInstant Start;
  160. TAtomic Messages;
  161. TAtomic Errors;
  162. TAtomic Replies;
  163. void IncMessage() {
  164. AtomicIncrement(Messages);
  165. }
  166. void IncReplies() {
  167. AtomicDecrement(Messages);
  168. AtomicIncrement(Replies);
  169. }
  170. int NumMessage() {
  171. return AtomicGet(Messages);
  172. }
  173. void IncErrors() {
  174. AtomicDecrement(Messages);
  175. AtomicIncrement(Errors);
  176. }
  177. int NumErrors() {
  178. return AtomicGet(Errors);
  179. }
  180. int NumReplies() {
  181. return AtomicGet(Replies);
  182. }
  183. double GetThroughput() {
  184. return NumReplies() * 1000000.0 / (TInstant::Now() - Start).MicroSeconds();
  185. }
  186. public:
  187. TTestStats()
  188. : Start(TInstant::Now())
  189. , Messages(0)
  190. , Errors(0)
  191. , Replies(0)
  192. {
  193. }
  194. void PeriodicallyPrint();
  195. };
  196. TTestStats Stats;
  197. ////////////////////////////////////////////////////////////////////
  198. /// \brief Fast of the client session
  199. class TPerftestClient : IBusClientHandler {
  200. public:
  201. TBusClientSessionPtr Session;
  202. THolder<TBusProtocol> Proto;
  203. TBusMessageQueuePtr Bus;
  204. TVector<TBusClientConnectionPtr> Connections;
  205. public:
  206. /// constructor creates instances of protocol and session
  207. TPerftestClient() {
  208. /// create or get instance of message queue, need one per application
  209. Bus = CreateMessageQueue(Config.ClientQueueConfig, "client");
  210. if (Config.SimpleProtocol) {
  211. Proto.Reset(new TSimpleProtocol);
  212. } else {
  213. Proto.Reset(new TPerftestProtocol);
  214. }
  215. Session = TBusClientSession::Create(Proto.Get(), this, Config.ClientSessionConfig, Bus);
  216. for (unsigned i = 0; i < ServerAddresses.size(); ++i) {
  217. Connections.push_back(Session->GetConnection(ServerAddresses[i]));
  218. }
  219. }
  220. /// dispatch of requests is done here
  221. void Work() {
  222. SetCurrentThreadName("FastClient::Work");
  223. while (!TheExit) {
  224. TBusClientConnection* connection;
  225. if (Connections.size() == 1) {
  226. connection = Connections.front().Get();
  227. } else {
  228. connection = Connections.at(RandomNumber<size_t>()).Get();
  229. }
  230. TBusMessage* message = NewRequest().Release();
  231. int ret = connection->SendMessage(message, true);
  232. if (ret == MESSAGE_OK) {
  233. Stats.IncMessage();
  234. } else if (ret == MESSAGE_BUSY) {
  235. //delete message;
  236. //Sleep(TDuration::MilliSeconds(1));
  237. //continue;
  238. Y_ABORT("unreachable");
  239. } else if (ret == MESSAGE_SHUTDOWN) {
  240. delete message;
  241. } else {
  242. delete message;
  243. Stats.IncErrors();
  244. }
  245. }
  246. }
  247. void Stop() {
  248. Session->Shutdown();
  249. }
  250. /// actual work is being done here
  251. void OnReply(TAutoPtr<TBusMessage> mess, TAutoPtr<TBusMessage> reply) override {
  252. Y_UNUSED(mess);
  253. if (Config.SimpleProtocol) {
  254. VerifyDynamicCast<TSimpleMessage*>(reply.Get());
  255. } else {
  256. TPerftestResponse* typed = VerifyDynamicCast<TPerftestResponse*>(reply.Get());
  257. CheckResponse(typed);
  258. }
  259. Stats.IncReplies();
  260. }
  261. /// message that could not be delivered
  262. void OnError(TAutoPtr<TBusMessage> mess, EMessageStatus status) override {
  263. Y_UNUSED(mess);
  264. Y_UNUSED(status);
  265. if (TheExit) {
  266. return;
  267. }
  268. Stats.IncErrors();
  269. // Y_ASSERT(TheConfig->Failure > 0.0);
  270. }
  271. };
  272. class TPerftestServerCommon {
  273. public:
  274. THolder<TBusProtocol> Proto;
  275. TBusMessageQueuePtr Bus;
  276. TBusServerSessionPtr Session;
  277. protected:
  278. TPerftestServerCommon(const char* name)
  279. : Session()
  280. {
  281. if (Config.SimpleProtocol) {
  282. Proto.Reset(new TSimpleProtocol);
  283. } else {
  284. Proto.Reset(new TPerftestProtocol);
  285. }
  286. /// create or get instance of single message queue, need one for application
  287. Bus = CreateMessageQueue(Config.ServerQueueConfig, name);
  288. }
  289. public:
  290. void Stop() {
  291. Session->Shutdown();
  292. }
  293. };
  294. struct TAsyncRequest {
  295. TBusMessage* Request;
  296. TInstant ReceivedTime;
  297. };
  298. /////////////////////////////////////////////////////////////////////
  299. /// \brief Fast of the server session
  300. class TPerftestServer: public TPerftestServerCommon, public IBusServerHandler {
  301. public:
  302. TLockFreeQueue<TAsyncRequest> AsyncRequests;
  303. public:
  304. TPerftestServer()
  305. : TPerftestServerCommon("server")
  306. {
  307. /// register destination session
  308. Session = TBusServerSession::Create(Proto.Get(), this, Config.ServerSessionConfig, Bus);
  309. Y_ASSERT(Session && "probably somebody is listening on the same port");
  310. }
  311. /// when message comes, send reply
  312. void OnMessage(TOnMessageContext& mess) override {
  313. if (Config.SimpleProtocol) {
  314. TSimpleMessage* typed = VerifyDynamicCast<TSimpleMessage*>(mess.GetMessage());
  315. TAutoPtr<TSimpleMessage> response(new TSimpleMessage);
  316. response->Payload = typed->Payload;
  317. mess.SendReplyMove(response);
  318. return;
  319. }
  320. TPerftestRequest* typed = VerifyDynamicCast<TPerftestRequest*>(mess.GetMessage());
  321. CheckRequest(typed);
  322. /// forget replies for few messages, see what happends
  323. if (TheConfig->Failure > RandomNumber<double>()) {
  324. return;
  325. }
  326. /// sleep requested time
  327. if (TheConfig->Delay) {
  328. TAsyncRequest request;
  329. request.Request = mess.ReleaseMessage();
  330. request.ReceivedTime = TInstant::Now();
  331. AsyncRequests.Enqueue(request);
  332. return;
  333. }
  334. TAutoPtr<TPerftestResponse> reply(NewResponse(typed));
  335. /// sent empty reply for each message
  336. mess.SendReplyMove(reply);
  337. // TODO: count results
  338. }
  339. void Stop() {
  340. TPerftestServerCommon::Stop();
  341. }
  342. };
  343. class TPerftestUsingModule: public TPerftestServerCommon, public TBusModule {
  344. public:
  345. TPerftestUsingModule()
  346. : TPerftestServerCommon("server")
  347. , TBusModule("fast")
  348. {
  349. Y_ABORT_UNLESS(CreatePrivateSessions(Bus.Get()), "failed to initialize dupdetect module");
  350. Y_ABORT_UNLESS(StartInput(), "failed to start input");
  351. }
  352. ~TPerftestUsingModule() override {
  353. Shutdown();
  354. }
  355. private:
  356. TJobHandler Start(TBusJob* job, TBusMessage* mess) override {
  357. TPerftestRequest* typed = VerifyDynamicCast<TPerftestRequest*>(mess);
  358. CheckRequest(typed);
  359. /// sleep requested time
  360. if (TheConfig->Delay) {
  361. usleep(TheConfig->Delay);
  362. }
  363. /// forget replies for few messages, see what happends
  364. if (TheConfig->Failure > RandomNumber<double>()) {
  365. return nullptr;
  366. }
  367. job->SendReply(NewResponse(typed).Release());
  368. return nullptr;
  369. }
  370. TBusServerSessionPtr CreateExtSession(TBusMessageQueue& queue) override {
  371. return Session = CreateDefaultDestination(queue, Proto.Get(), Config.ServerSessionConfig);
  372. }
  373. };
  374. // ./perftest/perftest -s 11456 -c localhost:11456 -r 60 -n 4 -i 5000
  375. using namespace std;
  376. using namespace NBus;
  377. static TNetworkAddress ParseNetworkAddress(const char* string) {
  378. TString Name;
  379. int Port;
  380. const char* port = strchr(string, ':');
  381. if (port != nullptr) {
  382. Name.append(string, port - string);
  383. Port = atoi(port + 1);
  384. } else {
  385. Name.append(string);
  386. Port = TheConfig->ServerPort != 0 ? TheConfig->ServerPort : DEFAULT_PORT;
  387. }
  388. return TNetworkAddress(Name, Port);
  389. }
  390. TVector<TNetAddr> ParseNodes(const TString nodes) {
  391. TVector<TNetAddr> r;
  392. TVector<TString> hosts;
  393. size_t numh = Split(nodes.data(), ",", hosts);
  394. for (int i = 0; i < int(numh); i++) {
  395. const TNetworkAddress& networkAddress = ParseNetworkAddress(hosts[i].data());
  396. Y_ABORT_UNLESS(networkAddress.Begin() != networkAddress.End(), "no addresses");
  397. r.push_back(TNetAddr(networkAddress, &*networkAddress.Begin()));
  398. }
  399. return r;
  400. }
  401. TPerftestConfig::TPerftestConfig() {
  402. TBusSessionConfig defaultConfig;
  403. ServerPort = DEFAULT_PORT;
  404. Delay = 0; // artificial delay inside server OnMessage()
  405. MessageSize = 200;
  406. Failure = 0.00;
  407. Run = 60; // in seconds
  408. Nodes = "localhost";
  409. ServerUseModules = false;
  410. ExecuteOnMessageInWorkerPool = defaultConfig.ExecuteOnMessageInWorkerPool;
  411. ExecuteOnReplyInWorkerPool = defaultConfig.ExecuteOnReplyInWorkerPool;
  412. UseCompression = false;
  413. Profile = false;
  414. WwwPort = 0;
  415. }
  416. TPerftestConfig* TheConfig = new TPerftestConfig();
  417. bool TheExit = false;
  418. TSystemEvent StopEvent;
  419. TSimpleSharedPtr<TPerftestServer> Server;
  420. TSimpleSharedPtr<TPerftestUsingModule> ServerUsingModule;
  421. TVector<TSimpleSharedPtr<TPerftestClient>> Clients;
  422. TMutex ClientsLock;
  423. void stopsignal(int /*sig*/) {
  424. fprintf(stderr, "\n-------------------- exiting ------------------\n");
  425. TheExit = true;
  426. StopEvent.Signal();
  427. }
  428. // -s <num> - start server on port <num>
  429. // -c <node:port,node:port> - start client
  430. void TTestStats::PeriodicallyPrint() {
  431. SetCurrentThreadName("print-stats");
  432. for (;;) {
  433. StopEvent.WaitT(TDuration::Seconds(1));
  434. if (TheExit)
  435. break;
  436. TVector<TSimpleSharedPtr<TPerftestClient>> clients;
  437. {
  438. TGuard<TMutex> guard(ClientsLock);
  439. clients = Clients;
  440. }
  441. fprintf(stderr, "replies=%d errors=%d throughput=%.3f mess/sec\n",
  442. NumReplies(), NumErrors(), GetThroughput());
  443. if (!!Server) {
  444. fprintf(stderr, "server: q: %u %s\n",
  445. (unsigned)Server->Bus->GetExecutor()->GetWorkQueueSize(),
  446. Server->Session->GetStatusSingleLine().data());
  447. }
  448. if (!!ServerUsingModule) {
  449. fprintf(stderr, "server: q: %u %s\n",
  450. (unsigned)ServerUsingModule->Bus->GetExecutor()->GetWorkQueueSize(),
  451. ServerUsingModule->Session->GetStatusSingleLine().data());
  452. }
  453. for (const auto& client : clients) {
  454. fprintf(stderr, "client: q: %u %s\n",
  455. (unsigned)client->Bus->GetExecutor()->GetWorkQueueSize(),
  456. client->Session->GetStatusSingleLine().data());
  457. }
  458. TStringStream stats;
  459. bool first = true;
  460. if (!!Server) {
  461. if (!first) {
  462. stats << "\n";
  463. }
  464. first = false;
  465. stats << "server:\n";
  466. stats << IndentText(Server->Bus->GetStatus());
  467. }
  468. if (!!ServerUsingModule) {
  469. if (!first) {
  470. stats << "\n";
  471. }
  472. first = false;
  473. stats << "server using modules:\n";
  474. stats << IndentText(ServerUsingModule->Bus->GetStatus());
  475. }
  476. for (const auto& client : clients) {
  477. if (!first) {
  478. stats << "\n";
  479. }
  480. first = false;
  481. stats << "client:\n";
  482. stats << IndentText(client->Bus->GetStatus());
  483. }
  484. TUnbufferedFileOutput("stats").Write(stats.Str());
  485. }
  486. }
  487. int main(int argc, char* argv[]) {
  488. NLWTrace::StartLwtraceFromEnv();
  489. /* unix foo */
  490. setvbuf(stdout, nullptr, _IONBF, 0);
  491. setvbuf(stderr, nullptr, _IONBF, 0);
  492. Umask(0);
  493. SetAsyncSignalHandler(SIGINT, stopsignal);
  494. SetAsyncSignalHandler(SIGTERM, stopsignal);
  495. #ifndef _win_
  496. SetAsyncSignalHandler(SIGUSR1, stopsignal);
  497. #endif
  498. signal(SIGPIPE, SIG_IGN);
  499. NLastGetopt::TOpts opts = NLastGetopt::TOpts::Default();
  500. opts.AddLongOption('s', "server-port", "server port").RequiredArgument("port").StoreResult(&TheConfig->ServerPort);
  501. opts.AddCharOption('m', "average message size").RequiredArgument("size").StoreResult(&TheConfig->MessageSize);
  502. opts.AddLongOption('c', "server-host", "server hosts").RequiredArgument("host[,host]...").StoreResult(&TheConfig->Nodes);
  503. opts.AddCharOption('f', "failure rate (rational number between 0 and 1)").RequiredArgument("rate").StoreResult(&TheConfig->Failure);
  504. opts.AddCharOption('w', "delay before reply").RequiredArgument("microseconds").StoreResult(&TheConfig->Delay);
  505. opts.AddCharOption('r', "run duration").RequiredArgument("seconds").StoreResult(&TheConfig->Run);
  506. opts.AddLongOption("client-count", "amount of clients").RequiredArgument("count").StoreResult(&TheConfig->ClientCount).DefaultValue("1");
  507. opts.AddLongOption("server-use-modules").StoreResult(&TheConfig->ServerUseModules, true);
  508. opts.AddLongOption("on-message-in-pool", "execute OnMessage callback in worker pool")
  509. .RequiredArgument("BOOL")
  510. .StoreResult(&TheConfig->ExecuteOnMessageInWorkerPool);
  511. opts.AddLongOption("on-reply-in-pool", "execute OnReply callback in worker pool")
  512. .RequiredArgument("BOOL")
  513. .StoreResult(&TheConfig->ExecuteOnReplyInWorkerPool);
  514. opts.AddLongOption("compression", "use compression").RequiredArgument("BOOL").StoreResult(&TheConfig->UseCompression);
  515. opts.AddLongOption("simple-proto").SetFlag(&Config.SimpleProtocol);
  516. opts.AddLongOption("profile").SetFlag(&TheConfig->Profile);
  517. opts.AddLongOption("www-port").RequiredArgument("PORT").StoreResult(&TheConfig->WwwPort);
  518. opts.AddHelpOption();
  519. Config.ServerQueueConfig.ConfigureLastGetopt(opts, "server-");
  520. Config.ServerSessionConfig.ConfigureLastGetopt(opts, "server-");
  521. Config.ClientQueueConfig.ConfigureLastGetopt(opts, "client-");
  522. Config.ClientSessionConfig.ConfigureLastGetopt(opts, "client-");
  523. opts.SetFreeArgsMax(0);
  524. NLastGetopt::TOptsParseResult parseResult(&opts, argc, argv);
  525. TheConfig->Print();
  526. Config.Print();
  527. if (TheConfig->Profile) {
  528. BeginProfiling();
  529. }
  530. TIntrusivePtr<TBusWww> www(new TBusWww);
  531. ServerAddresses = ParseNodes(TheConfig->Nodes);
  532. if (TheConfig->ServerPort) {
  533. if (TheConfig->ServerUseModules) {
  534. ServerUsingModule = new TPerftestUsingModule();
  535. www->RegisterModule(ServerUsingModule.Get());
  536. } else {
  537. Server = new TPerftestServer();
  538. www->RegisterServerSession(Server->Session);
  539. }
  540. }
  541. TVector<TSimpleSharedPtr<NThreading::TLegacyFuture<void, false>>> futures;
  542. if (ServerAddresses.size() > 0 && TheConfig->ClientCount > 0) {
  543. for (int i = 0; i < TheConfig->ClientCount; ++i) {
  544. TGuard<TMutex> guard(ClientsLock);
  545. Clients.push_back(new TPerftestClient);
  546. futures.push_back(new NThreading::TLegacyFuture<void, false>(std::bind(&TPerftestClient::Work, Clients.back())));
  547. www->RegisterClientSession(Clients.back()->Session);
  548. }
  549. }
  550. futures.push_back(new NThreading::TLegacyFuture<void, false>(std::bind(&TTestStats::PeriodicallyPrint, std::ref(Stats))));
  551. THolder<TBusWwwHttpServer> wwwServer;
  552. if (TheConfig->WwwPort != 0) {
  553. wwwServer.Reset(new TBusWwwHttpServer(www, TheConfig->WwwPort));
  554. }
  555. /* sit here until signal terminate our process */
  556. StopEvent.WaitT(TDuration::Seconds(TheConfig->Run));
  557. TheExit = true;
  558. StopEvent.Signal();
  559. if (!!Server) {
  560. Cerr << "Stopping server\n";
  561. Server->Stop();
  562. }
  563. if (!!ServerUsingModule) {
  564. Cerr << "Stopping server (using modules)\n";
  565. ServerUsingModule->Stop();
  566. }
  567. TVector<TSimpleSharedPtr<TPerftestClient>> clients;
  568. {
  569. TGuard<TMutex> guard(ClientsLock);
  570. clients = Clients;
  571. }
  572. if (!clients.empty()) {
  573. Cerr << "Stopping clients\n";
  574. for (auto& client : clients) {
  575. client->Stop();
  576. }
  577. }
  578. wwwServer.Destroy();
  579. for (const auto& future : futures) {
  580. future->Get();
  581. }
  582. if (TheConfig->Profile) {
  583. EndProfiling();
  584. }
  585. Cerr << "***SUCCESS***\n";
  586. return 0;
  587. }