messagebus_ut.cpp 37 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151
  1. #include <library/cpp/testing/unittest/registar.h>
  2. #include <library/cpp/messagebus/test/helper/example.h>
  3. #include <library/cpp/messagebus/test/helper/fixed_port.h>
  4. #include <library/cpp/messagebus/test/helper/hanging_server.h>
  5. #include <library/cpp/messagebus/test/helper/object_count_check.h>
  6. #include <library/cpp/messagebus/test/helper/wait_for.h>
  7. #include <library/cpp/messagebus/misc/test_sync.h>
  8. #include <util/network/sock.h>
  9. #include <utility>
  10. using namespace NBus;
  11. using namespace NBus::NTest;
  12. namespace {
  13. struct TExampleClientSlowOnMessageSent: public TExampleClient {
  14. TAtomic SentCompleted;
  15. TSystemEvent ReplyReceived;
  16. TExampleClientSlowOnMessageSent()
  17. : SentCompleted(0)
  18. {
  19. }
  20. ~TExampleClientSlowOnMessageSent() override {
  21. Session->Shutdown();
  22. }
  23. void OnReply(TAutoPtr<TBusMessage> mess, TAutoPtr<TBusMessage> reply) override {
  24. Y_ABORT_UNLESS(AtomicGet(SentCompleted), "must be completed");
  25. TExampleClient::OnReply(mess, reply);
  26. ReplyReceived.Signal();
  27. }
  28. void OnMessageSent(TBusMessage*) override {
  29. Sleep(TDuration::MilliSeconds(100));
  30. AtomicSet(SentCompleted, 1);
  31. }
  32. };
  33. }
  34. Y_UNIT_TEST_SUITE(TMessageBusTests) {
  35. void TestDestinationTemplate(bool useCompression, bool ackMessageBeforeReply,
  36. const TBusServerSessionConfig& sessionConfig) {
  37. TObjectCountCheck objectCountCheck;
  38. TExampleServer server;
  39. TExampleClient client(sessionConfig);
  40. client.CrashOnError = true;
  41. server.UseCompression = useCompression;
  42. client.UseCompression = useCompression;
  43. server.AckMessageBeforeSendReply = ackMessageBeforeReply;
  44. client.SendMessagesWaitReplies(100, server.GetActualListenAddr());
  45. UNIT_ASSERT_EQUAL(server.Session->GetInFlight(), 0);
  46. UNIT_ASSERT_EQUAL(client.Session->GetInFlight(), 0);
  47. }
  48. Y_UNIT_TEST(TestDestination) {
  49. TestDestinationTemplate(false, false, TBusServerSessionConfig());
  50. }
  51. Y_UNIT_TEST(TestDestinationUsingAck) {
  52. TestDestinationTemplate(false, true, TBusServerSessionConfig());
  53. }
  54. Y_UNIT_TEST(TestDestinationWithCompression) {
  55. TestDestinationTemplate(true, false, TBusServerSessionConfig());
  56. }
  57. Y_UNIT_TEST(TestCork) {
  58. TBusServerSessionConfig config;
  59. config.SendThreshold = 1000000000000;
  60. config.Cork = TDuration::MilliSeconds(10);
  61. TestDestinationTemplate(false, false, config);
  62. // TODO: test for cork hanging
  63. }
  64. Y_UNIT_TEST(TestReconnect) {
  65. if (!IsFixedPortTestAllowed()) {
  66. return;
  67. }
  68. TObjectCountCheck objectCountCheck;
  69. unsigned port = FixedPort;
  70. TNetAddr serverAddr("localhost", port);
  71. THolder<TExampleServer> server;
  72. TBusClientSessionConfig clientConfig;
  73. clientConfig.RetryInterval = 0;
  74. TExampleClient client(clientConfig);
  75. server.Reset(new TExampleServer(port, "TExampleServer 1"));
  76. client.SendMessagesWaitReplies(17, serverAddr);
  77. server.Destroy();
  78. // Making the client to detect disconnection.
  79. client.SendMessages(1, serverAddr);
  80. EMessageStatus error = client.WaitForError();
  81. if (error == MESSAGE_DELIVERY_FAILED) {
  82. client.SendMessages(1, serverAddr);
  83. error = client.WaitForError();
  84. }
  85. UNIT_ASSERT_VALUES_EQUAL(MESSAGE_CONNECT_FAILED, error);
  86. server.Reset(new TExampleServer(port, "TExampleServer 2"));
  87. client.SendMessagesWaitReplies(19, serverAddr);
  88. }
  89. struct TestNoServerImplClient: public TExampleClient {
  90. TTestSync TestSync;
  91. int failures = 0;
  92. template <typename... Args>
  93. TestNoServerImplClient(Args&&... args)
  94. : TExampleClient(std::forward<Args>(args)...)
  95. {
  96. }
  97. ~TestNoServerImplClient() override {
  98. Session->Shutdown();
  99. }
  100. void OnError(TAutoPtr<TBusMessage> message, EMessageStatus status) override {
  101. Y_UNUSED(message);
  102. Y_ABORT_UNLESS(status == MESSAGE_CONNECT_FAILED, "must be MESSAGE_CONNECT_FAILED, got %s", ToString(status).data());
  103. TestSync.CheckAndIncrement((failures++) * 2);
  104. }
  105. };
  106. void TestNoServerImpl(unsigned port, bool oneWay) {
  107. TNetAddr noServerAddr("localhost", port);
  108. TestNoServerImplClient client;
  109. int count = 0;
  110. for (; count < 200; ++count) {
  111. EMessageStatus status;
  112. if (oneWay) {
  113. status = client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount), &noServerAddr);
  114. } else {
  115. TAutoPtr<TBusMessage> message(new TExampleRequest(&client.Proto.RequestCount));
  116. status = client.Session->SendMessageAutoPtr(message, &noServerAddr);
  117. }
  118. Y_ABORT_UNLESS(status == MESSAGE_OK, "must be MESSAGE_OK, got %s", ToString(status).data());
  119. if (count == 0) {
  120. // lame way to wait until it is connected
  121. Sleep(TDuration::MilliSeconds(10));
  122. }
  123. client.TestSync.WaitForAndIncrement(count * 2 + 1);
  124. }
  125. client.TestSync.WaitForAndIncrement(count * 2);
  126. }
  127. void HangingServerImpl(unsigned port) {
  128. TNetAddr noServerAddr("localhost", port);
  129. TExampleClient client;
  130. int count = 0;
  131. for (;; ++count) {
  132. TAutoPtr<TBusMessage> message(new TExampleRequest(&client.Proto.RequestCount));
  133. EMessageStatus status = client.Session->SendMessageAutoPtr(message, &noServerAddr);
  134. if (status == MESSAGE_BUSY) {
  135. break;
  136. }
  137. UNIT_ASSERT_VALUES_EQUAL(int(MESSAGE_OK), int(status));
  138. if (count == 0) {
  139. // lame way to wait until it is connected
  140. Sleep(TDuration::MilliSeconds(10));
  141. }
  142. }
  143. UNIT_ASSERT_VALUES_EQUAL(client.Session->GetConfig()->MaxInFlight, count);
  144. }
  145. Y_UNIT_TEST(TestHangindServer) {
  146. TObjectCountCheck objectCountCheck;
  147. THangingServer server(0);
  148. HangingServerImpl(server.GetPort());
  149. }
  150. Y_UNIT_TEST(TestNoServer) {
  151. TObjectCountCheck objectCountCheck;
  152. TestNoServerImpl(17, false);
  153. }
  154. Y_UNIT_TEST(PauseInput) {
  155. TObjectCountCheck objectCountCheck;
  156. TExampleServer server;
  157. server.Session->PauseInput(true);
  158. TBusClientSessionConfig clientConfig;
  159. clientConfig.MaxInFlight = 1000;
  160. TExampleClient client(clientConfig);
  161. client.SendMessages(100, server.GetActualListenAddr());
  162. server.TestSync.Check(0);
  163. server.Session->PauseInput(false);
  164. server.TestSync.WaitFor(100);
  165. client.WaitReplies();
  166. server.Session->PauseInput(true);
  167. client.SendMessages(200, server.GetActualListenAddr());
  168. server.TestSync.Check(100);
  169. server.Session->PauseInput(false);
  170. server.TestSync.WaitFor(300);
  171. client.WaitReplies();
  172. }
  173. struct TSendTimeoutCheckerExampleClient: public TExampleClient {
  174. static TBusClientSessionConfig SessionConfig(bool periodLessThanConnectTimeout) {
  175. TBusClientSessionConfig sessionConfig;
  176. if (periodLessThanConnectTimeout) {
  177. sessionConfig.SendTimeout = 1;
  178. sessionConfig.Secret.TimeoutPeriod = TDuration::MilliSeconds(50);
  179. } else {
  180. sessionConfig.SendTimeout = 50;
  181. sessionConfig.Secret.TimeoutPeriod = TDuration::MilliSeconds(1);
  182. }
  183. return sessionConfig;
  184. }
  185. TSendTimeoutCheckerExampleClient(bool periodLessThanConnectTimeout)
  186. : TExampleClient(SessionConfig(periodLessThanConnectTimeout))
  187. {
  188. }
  189. ~TSendTimeoutCheckerExampleClient() override {
  190. Session->Shutdown();
  191. }
  192. TSystemEvent ErrorHappened;
  193. void OnError(TAutoPtr<TBusMessage>, EMessageStatus status) override {
  194. Y_ABORT_UNLESS(status == MESSAGE_CONNECT_FAILED || status == MESSAGE_TIMEOUT, "got status: %s", ToString(status).data());
  195. ErrorHappened.Signal();
  196. }
  197. };
  198. void NoServer_SendTimeout_Callback_Impl(bool periodLessThanConnectTimeout) {
  199. TObjectCountCheck objectCountCheck;
  200. TNetAddr serverAddr("localhost", 17);
  201. TSendTimeoutCheckerExampleClient client(periodLessThanConnectTimeout);
  202. client.SendMessages(1, serverAddr);
  203. client.ErrorHappened.WaitI();
  204. }
  205. Y_UNIT_TEST(NoServer_SendTimeout_Callback_PeriodLess) {
  206. NoServer_SendTimeout_Callback_Impl(true);
  207. }
  208. Y_UNIT_TEST(NoServer_SendTimeout_Callback_TimeoutLess) {
  209. NoServer_SendTimeout_Callback_Impl(false);
  210. }
  211. Y_UNIT_TEST(TestOnReplyCalledAfterOnMessageSent) {
  212. TObjectCountCheck objectCountCheck;
  213. TExampleServer server;
  214. TNetAddr serverAddr = server.GetActualListenAddr();
  215. TExampleClientSlowOnMessageSent client;
  216. TAutoPtr<TExampleRequest> message(new TExampleRequest(&client.Proto.RequestCount));
  217. EMessageStatus s = client.Session->SendMessageAutoPtr(message, &serverAddr);
  218. UNIT_ASSERT_EQUAL(s, MESSAGE_OK);
  219. UNIT_ASSERT(client.ReplyReceived.WaitT(TDuration::Seconds(5)));
  220. }
  221. struct TDelayReplyServer: public TBusServerHandlerError {
  222. TBusMessageQueuePtr Bus;
  223. TExampleProtocol Proto;
  224. TSystemEvent MessageReceivedEvent; // 1 wait for 1 message
  225. TBusServerSessionPtr Session;
  226. TMutex Lock_;
  227. TDeque<TAutoPtr<TOnMessageContext>> DelayedMessages;
  228. TDelayReplyServer()
  229. : MessageReceivedEvent(TEventResetType::rAuto)
  230. {
  231. Bus = CreateMessageQueue("TDelayReplyServer");
  232. TBusServerSessionConfig sessionConfig;
  233. sessionConfig.SendTimeout = 1000;
  234. sessionConfig.TotalTimeout = 2001;
  235. Session = TBusServerSession::Create(&Proto, this, sessionConfig, Bus);
  236. if (!Session) {
  237. ythrow yexception() << "Failed to create destination session";
  238. }
  239. }
  240. void OnMessage(TOnMessageContext& mess) override {
  241. Y_ABORT_UNLESS(mess.IsConnectionAlive(), "connection should be alive here");
  242. TAutoPtr<TOnMessageContext> delayedMsg(new TOnMessageContext);
  243. delayedMsg->Swap(mess);
  244. auto g(Guard(Lock_));
  245. DelayedMessages.push_back(delayedMsg);
  246. MessageReceivedEvent.Signal();
  247. }
  248. bool CheckClientIsAlive() {
  249. auto g(Guard(Lock_));
  250. for (auto& delayedMessage : DelayedMessages) {
  251. if (!delayedMessage->IsConnectionAlive()) {
  252. return false;
  253. }
  254. }
  255. return true;
  256. }
  257. bool CheckClientIsDead() const {
  258. auto g(Guard(Lock_));
  259. for (const auto& delayedMessage : DelayedMessages) {
  260. if (delayedMessage->IsConnectionAlive()) {
  261. return false;
  262. }
  263. }
  264. return true;
  265. }
  266. void ReplyToDelayedMessages() {
  267. while (true) {
  268. TOnMessageContext msg;
  269. {
  270. auto g(Guard(Lock_));
  271. if (DelayedMessages.empty()) {
  272. break;
  273. }
  274. DelayedMessages.front()->Swap(msg);
  275. DelayedMessages.pop_front();
  276. }
  277. TAutoPtr<TBusMessage> reply(new TExampleResponse(&Proto.ResponseCount));
  278. msg.SendReplyMove(reply);
  279. }
  280. }
  281. size_t GetDelayedMessageCount() const {
  282. auto g(Guard(Lock_));
  283. return DelayedMessages.size();
  284. }
  285. void OnError(TAutoPtr<TBusMessage> mess, EMessageStatus status) override {
  286. Y_UNUSED(mess);
  287. Y_ABORT_UNLESS(status == MESSAGE_SHUTDOWN, "only shutdown allowed, got %s", ToString(status).data());
  288. }
  289. };
  290. Y_UNIT_TEST(TestReplyCalledAfterClientDisconnected) {
  291. TObjectCountCheck objectCountCheck;
  292. TDelayReplyServer server;
  293. THolder<TExampleClient> client(new TExampleClient);
  294. client->SendMessages(1, TNetAddr("localhost", server.Session->GetActualListenPort()));
  295. UNIT_ASSERT(server.MessageReceivedEvent.WaitT(TDuration::Seconds(5)));
  296. UNIT_ASSERT_VALUES_EQUAL(1, server.Session->GetInFlight());
  297. client.Destroy();
  298. UNIT_WAIT_FOR(server.CheckClientIsDead());
  299. server.ReplyToDelayedMessages();
  300. // wait until all server message are delivered
  301. UNIT_WAIT_FOR(0 == server.Session->GetInFlight());
  302. }
  303. struct TPackUnpackServer: public TBusServerHandlerError {
  304. TBusMessageQueuePtr Bus;
  305. TExampleProtocol Proto;
  306. TSystemEvent MessageReceivedEvent;
  307. TSystemEvent ClientDiedEvent;
  308. TBusServerSessionPtr Session;
  309. TPackUnpackServer() {
  310. Bus = CreateMessageQueue("TPackUnpackServer");
  311. TBusServerSessionConfig sessionConfig;
  312. Session = TBusServerSession::Create(&Proto, this, sessionConfig, Bus);
  313. }
  314. void OnMessage(TOnMessageContext& mess) override {
  315. TBusIdentity ident;
  316. mess.AckMessage(ident);
  317. char packed[BUS_IDENTITY_PACKED_SIZE];
  318. ident.Pack(packed);
  319. TBusIdentity resurrected;
  320. resurrected.Unpack(packed);
  321. mess.GetSession()->SendReply(resurrected, new TExampleResponse(&Proto.ResponseCount));
  322. }
  323. void OnError(TAutoPtr<TBusMessage> mess, EMessageStatus status) override {
  324. Y_UNUSED(mess);
  325. Y_ABORT_UNLESS(status == MESSAGE_SHUTDOWN, "only shutdown allowed");
  326. }
  327. };
  328. Y_UNIT_TEST(PackUnpack) {
  329. TObjectCountCheck objectCountCheck;
  330. TPackUnpackServer server;
  331. THolder<TExampleClient> client(new TExampleClient);
  332. client->SendMessagesWaitReplies(1, TNetAddr("localhost", server.Session->GetActualListenPort()));
  333. }
  334. Y_UNIT_TEST(ClientRequestTooLarge) {
  335. TObjectCountCheck objectCountCheck;
  336. TExampleServer server;
  337. TBusClientSessionConfig clientConfig;
  338. clientConfig.MaxMessageSize = 100;
  339. TExampleClient client(clientConfig);
  340. client.DataSize = 10;
  341. client.SendMessagesWaitReplies(1, server.GetActualListenAddr());
  342. client.DataSize = 1000;
  343. client.SendMessages(1, server.GetActualListenAddr());
  344. client.WaitForError(MESSAGE_MESSAGE_TOO_LARGE);
  345. client.DataSize = 20;
  346. client.SendMessagesWaitReplies(10, server.GetActualListenAddr());
  347. client.DataSize = 10000;
  348. client.SendMessages(1, server.GetActualListenAddr());
  349. client.WaitForError(MESSAGE_MESSAGE_TOO_LARGE);
  350. }
  351. struct TServerForResponseTooLarge: public TExampleServer {
  352. TTestSync TestSync;
  353. static TBusServerSessionConfig Config() {
  354. TBusServerSessionConfig config;
  355. config.MaxMessageSize = 100;
  356. return config;
  357. }
  358. TServerForResponseTooLarge()
  359. : TExampleServer("TServerForResponseTooLarge", Config())
  360. {
  361. }
  362. ~TServerForResponseTooLarge() override {
  363. Session->Shutdown();
  364. }
  365. void OnMessage(TOnMessageContext& mess) override {
  366. TAutoPtr<TBusMessage> response;
  367. if (TestSync.Get() == 0) {
  368. TestSync.CheckAndIncrement(0);
  369. response.Reset(new TExampleResponse(&Proto.ResponseCount, 1000));
  370. } else {
  371. TestSync.WaitForAndIncrement(3);
  372. response.Reset(new TExampleResponse(&Proto.ResponseCount, 10));
  373. }
  374. mess.SendReplyMove(response);
  375. }
  376. void OnError(TAutoPtr<TBusMessage>, EMessageStatus status) override {
  377. TestSync.WaitForAndIncrement(1);
  378. Y_ABORT_UNLESS(status == MESSAGE_MESSAGE_TOO_LARGE, "status");
  379. }
  380. };
  381. Y_UNIT_TEST(ServerResponseTooLarge) {
  382. TObjectCountCheck objectCountCheck;
  383. TServerForResponseTooLarge server;
  384. TExampleClient client;
  385. client.DataSize = 10;
  386. client.SendMessages(1, server.GetActualListenAddr());
  387. server.TestSync.WaitForAndIncrement(2);
  388. client.ResetCounters();
  389. client.SendMessages(1, server.GetActualListenAddr());
  390. client.WorkDone.WaitI();
  391. server.TestSync.CheckAndIncrement(4);
  392. UNIT_ASSERT_VALUES_EQUAL(1, client.Session->GetInFlight());
  393. }
  394. struct TServerForRequestTooLarge: public TExampleServer {
  395. TTestSync TestSync;
  396. static TBusServerSessionConfig Config() {
  397. TBusServerSessionConfig config;
  398. config.MaxMessageSize = 100;
  399. return config;
  400. }
  401. TServerForRequestTooLarge()
  402. : TExampleServer("TServerForRequestTooLarge", Config())
  403. {
  404. }
  405. ~TServerForRequestTooLarge() override {
  406. Session->Shutdown();
  407. }
  408. void OnMessage(TOnMessageContext& req) override {
  409. unsigned n = TestSync.Get();
  410. if (n < 2) {
  411. TestSync.CheckAndIncrement(n);
  412. TAutoPtr<TExampleResponse> resp(new TExampleResponse(&Proto.ResponseCount, 10));
  413. req.SendReplyMove(resp);
  414. } else {
  415. Y_ABORT("wrong");
  416. }
  417. }
  418. };
  419. Y_UNIT_TEST(ServerRequestTooLarge) {
  420. TObjectCountCheck objectCountCheck;
  421. TServerForRequestTooLarge server;
  422. TExampleClient client;
  423. client.DataSize = 10;
  424. client.SendMessagesWaitReplies(2, server.GetActualListenAddr());
  425. server.TestSync.CheckAndIncrement(2);
  426. client.DataSize = 200;
  427. client.SendMessages(1, server.GetActualListenAddr());
  428. // server closes connection, so MESSAGE_DELIVERY_FAILED is returned to client
  429. client.WaitForError(MESSAGE_DELIVERY_FAILED);
  430. }
  431. Y_UNIT_TEST(ClientResponseTooLarge) {
  432. TObjectCountCheck objectCountCheck;
  433. TExampleServer server;
  434. server.DataSize = 10;
  435. TBusClientSessionConfig clientSessionConfig;
  436. clientSessionConfig.MaxMessageSize = 100;
  437. TExampleClient client(clientSessionConfig);
  438. client.DataSize = 10;
  439. client.SendMessagesWaitReplies(3, server.GetActualListenAddr());
  440. server.DataSize = 1000;
  441. client.SendMessages(1, server.GetActualListenAddr());
  442. client.WaitForError(MESSAGE_DELIVERY_FAILED);
  443. }
  444. Y_UNIT_TEST(ServerUnknownMessage) {
  445. TObjectCountCheck objectCountCheck;
  446. TExampleServer server;
  447. TNetAddr serverAddr = server.GetActualListenAddr();
  448. TExampleClient client;
  449. client.SendMessagesWaitReplies(2, serverAddr);
  450. TAutoPtr<TBusMessage> req(new TExampleRequest(&client.Proto.RequestCount));
  451. req->GetHeader()->Type = 11;
  452. client.Session->SendMessageAutoPtr(req, &serverAddr);
  453. client.MessageCount = 1;
  454. client.WaitForError(MESSAGE_DELIVERY_FAILED);
  455. }
  456. Y_UNIT_TEST(ServerMessageReservedIds) {
  457. TObjectCountCheck objectCountCheck;
  458. TExampleServer server;
  459. TNetAddr serverAddr = server.GetActualListenAddr();
  460. TExampleClient client;
  461. client.SendMessagesWaitReplies(2, serverAddr);
  462. // This test doens't check 0, 1, YBUS_KEYINVALID because there are asserts() on sending side
  463. TAutoPtr<TBusMessage> req(new TExampleRequest(&client.Proto.RequestCount));
  464. req->GetHeader()->Id = 2;
  465. client.Session->SendMessageAutoPtr(req, &serverAddr);
  466. client.MessageCount = 1;
  467. client.WaitForError(MESSAGE_DELIVERY_FAILED);
  468. req.Reset(new TExampleRequest(&client.Proto.RequestCount));
  469. req->GetHeader()->Id = YBUS_KEYLOCAL;
  470. client.Session->SendMessageAutoPtr(req, &serverAddr);
  471. client.MessageCount = 1;
  472. client.WaitForError(MESSAGE_DELIVERY_FAILED);
  473. }
  474. Y_UNIT_TEST(TestGetInFlightForDestination) {
  475. TObjectCountCheck objectCountCheck;
  476. TDelayReplyServer server;
  477. TExampleClient client;
  478. TNetAddr addr("localhost", server.Session->GetActualListenPort());
  479. UNIT_ASSERT_VALUES_EQUAL(size_t(0), client.Session->GetInFlight(addr));
  480. client.SendMessages(2, &addr);
  481. for (size_t i = 0; i < 5; ++i) {
  482. // One MessageReceivedEvent indicates one message, we need to wait for two
  483. UNIT_ASSERT(server.MessageReceivedEvent.WaitT(TDuration::Seconds(5)));
  484. if (server.GetDelayedMessageCount() == 2) {
  485. break;
  486. }
  487. }
  488. UNIT_ASSERT_VALUES_EQUAL(server.GetDelayedMessageCount(), 2);
  489. size_t inFlight = client.Session->GetInFlight(addr);
  490. // 4 is for messagebus1 that adds inFlight counter twice for some reason
  491. UNIT_ASSERT(inFlight == 2 || inFlight == 4);
  492. UNIT_ASSERT(server.CheckClientIsAlive());
  493. server.ReplyToDelayedMessages();
  494. client.WaitReplies();
  495. }
  496. struct TResetAfterSendOneWayErrorInCallbackClient: public TExampleClient {
  497. TTestSync TestSync;
  498. static TBusClientSessionConfig SessionConfig() {
  499. TBusClientSessionConfig config;
  500. // 1 ms is not enough when test is running under valgrind
  501. config.ConnectTimeout = 10;
  502. config.SendTimeout = 10;
  503. config.Secret.TimeoutPeriod = TDuration::MilliSeconds(1);
  504. return config;
  505. }
  506. TResetAfterSendOneWayErrorInCallbackClient()
  507. : TExampleClient(SessionConfig())
  508. {
  509. }
  510. ~TResetAfterSendOneWayErrorInCallbackClient() override {
  511. Session->Shutdown();
  512. }
  513. void OnError(TAutoPtr<TBusMessage> mess, EMessageStatus status) override {
  514. TestSync.WaitForAndIncrement(0);
  515. Y_ABORT_UNLESS(status == MESSAGE_CONNECT_FAILED || status == MESSAGE_TIMEOUT, "must be connection failed, got %s", ToString(status).data());
  516. mess.Destroy();
  517. TestSync.CheckAndIncrement(1);
  518. }
  519. };
  520. Y_UNIT_TEST(ResetAfterSendOneWayErrorInCallback) {
  521. TObjectCountCheck objectCountCheck;
  522. TNetAddr noServerAddr("localhost", 17);
  523. TResetAfterSendOneWayErrorInCallbackClient client;
  524. EMessageStatus ok = client.Session->SendMessageOneWayMove(new TExampleRequest(&client.Proto.RequestCount), &noServerAddr);
  525. UNIT_ASSERT_VALUES_EQUAL(MESSAGE_OK, ok);
  526. client.TestSync.WaitForAndIncrement(2);
  527. }
  528. struct TResetAfterSendMessageOneWayDuringShutdown: public TExampleClient {
  529. TTestSync TestSync;
  530. ~TResetAfterSendMessageOneWayDuringShutdown() override {
  531. Session->Shutdown();
  532. }
  533. void OnError(TAutoPtr<TBusMessage> message, EMessageStatus status) override {
  534. TestSync.CheckAndIncrement(0);
  535. Y_ABORT_UNLESS(status == MESSAGE_CONNECT_FAILED, "must be MESSAGE_CONNECT_FAILED, got %s", ToString(status).data());
  536. // check reset is possible here
  537. message->Reset();
  538. // intentionally don't destroy the message
  539. // we will try to resend it
  540. Y_UNUSED(message.Release());
  541. TestSync.CheckAndIncrement(1);
  542. }
  543. };
  544. Y_UNIT_TEST(ResetAfterSendMessageOneWayDuringShutdown) {
  545. TObjectCountCheck objectCountCheck;
  546. TNetAddr noServerAddr("localhost", 17);
  547. TResetAfterSendMessageOneWayDuringShutdown client;
  548. TExampleRequest* message = new TExampleRequest(&client.Proto.RequestCount);
  549. EMessageStatus ok = client.Session->SendMessageOneWay(message, &noServerAddr);
  550. UNIT_ASSERT_VALUES_EQUAL(MESSAGE_OK, ok);
  551. client.TestSync.WaitForAndIncrement(2);
  552. client.Session->Shutdown();
  553. ok = client.Session->SendMessageOneWay(message);
  554. Y_ABORT_UNLESS(ok == MESSAGE_SHUTDOWN, "must be shutdown when sending during shutdown, got %s", ToString(ok).data());
  555. // check reset is possible here
  556. message->Reset();
  557. client.TestSync.CheckAndIncrement(3);
  558. delete message;
  559. }
  560. Y_UNIT_TEST(ResetAfterSendOneWayErrorInReturn) {
  561. TObjectCountCheck objectCountCheck;
  562. TestNoServerImpl(17, true);
  563. }
  564. struct TResetAfterSendOneWaySuccessClient: public TExampleClient {
  565. TTestSync TestSync;
  566. ~TResetAfterSendOneWaySuccessClient() override {
  567. Session->Shutdown();
  568. }
  569. void OnMessageSentOneWay(TAutoPtr<TBusMessage> sent) override {
  570. TestSync.WaitForAndIncrement(0);
  571. sent->Reset();
  572. TestSync.CheckAndIncrement(1);
  573. }
  574. };
  575. Y_UNIT_TEST(ResetAfterSendOneWaySuccess) {
  576. TObjectCountCheck objectCountCheck;
  577. TExampleServer server;
  578. TNetAddr serverAddr = server.GetActualListenAddr();
  579. TResetAfterSendOneWaySuccessClient client;
  580. EMessageStatus ok = client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount), &serverAddr);
  581. UNIT_ASSERT_VALUES_EQUAL(MESSAGE_OK, ok);
  582. // otherwize message might go to OnError(MESSAGE_SHUTDOWN)
  583. server.WaitForOnMessageCount(1);
  584. client.TestSync.WaitForAndIncrement(2);
  585. }
  586. Y_UNIT_TEST(GetStatus) {
  587. TObjectCountCheck objectCountCheck;
  588. TExampleServer server;
  589. TExampleClient client;
  590. // make sure connected
  591. client.SendMessagesWaitReplies(3, server.GetActualListenAddr());
  592. server.Bus->GetStatus();
  593. server.Bus->GetStatus();
  594. server.Bus->GetStatus();
  595. client.Bus->GetStatus();
  596. client.Bus->GetStatus();
  597. client.Bus->GetStatus();
  598. }
  599. Y_UNIT_TEST(BindOnRandomPort) {
  600. TObjectCountCheck objectCountCheck;
  601. TBusServerSessionConfig serverConfig;
  602. TExampleServer server;
  603. TExampleClient client;
  604. TNetAddr addr(TNetAddr("127.0.0.1", server.Session->GetActualListenPort()));
  605. client.SendMessagesWaitReplies(3, &addr);
  606. }
  607. Y_UNIT_TEST(UnbindOnShutdown) {
  608. TBusMessageQueuePtr queue(CreateMessageQueue());
  609. TExampleProtocol proto;
  610. TBusServerHandlerError handler;
  611. TBusServerSessionPtr session = TBusServerSession::Create(
  612. &proto, &handler, TBusServerSessionConfig(), queue);
  613. unsigned port = session->GetActualListenPort();
  614. UNIT_ASSERT(port > 0);
  615. session->Shutdown();
  616. // fails is Shutdown() didn't unbind
  617. THangingServer hangingServer(port);
  618. }
  619. Y_UNIT_TEST(VersionNegotiation) {
  620. TObjectCountCheck objectCountCheck;
  621. TExampleServer server;
  622. TSockAddrInet addr(IpFromString("127.0.0.1"), server.Session->GetActualListenPort());
  623. TInetStreamSocket socket;
  624. int r1 = socket.Connect(&addr);
  625. UNIT_ASSERT(r1 >= 0);
  626. TStreamSocketOutput output(&socket);
  627. TBusHeader request;
  628. Zero(request);
  629. request.Size = sizeof(request);
  630. request.SetVersionInternal(0xF); // max
  631. output.Write(&request, sizeof(request));
  632. UNIT_ASSERT_VALUES_EQUAL(IsVersionNegotiation(request), true);
  633. TStreamSocketInput input(&socket);
  634. TBusHeader response;
  635. size_t pos = 0;
  636. while (pos < sizeof(response)) {
  637. size_t count = input.Read(((char*)&response) + pos, sizeof(response) - pos);
  638. pos += count;
  639. }
  640. UNIT_ASSERT_VALUES_EQUAL(sizeof(response), pos);
  641. UNIT_ASSERT_VALUES_EQUAL(YBUS_VERSION, response.GetVersionInternal());
  642. }
  643. struct TOnConnectionEventClient: public TExampleClient {
  644. TTestSync Sync;
  645. ~TOnConnectionEventClient() override {
  646. Session->Shutdown();
  647. }
  648. void OnClientConnectionEvent(const TClientConnectionEvent& event) override {
  649. if (Sync.Get() > 2) {
  650. // Test OnClientConnectionEvent_Disconnect is broken.
  651. // Sometimes reconnect happens during server shutdown
  652. // when acceptor connections is still alive, and
  653. // server connection is already closed
  654. return;
  655. }
  656. if (event.GetType() == TClientConnectionEvent::CONNECTED) {
  657. Sync.WaitForAndIncrement(0);
  658. } else if (event.GetType() == TClientConnectionEvent::DISCONNECTED) {
  659. Sync.WaitForAndIncrement(2);
  660. }
  661. }
  662. void OnError(TAutoPtr<TBusMessage>, EMessageStatus) override {
  663. // We do not check for message errors in this test.
  664. }
  665. void OnMessageSentOneWay(TAutoPtr<TBusMessage>) override {
  666. }
  667. };
  668. struct TOnConnectionEventServer: public TExampleServer {
  669. TOnConnectionEventServer()
  670. : TExampleServer("TOnConnectionEventServer")
  671. {
  672. }
  673. ~TOnConnectionEventServer() override {
  674. Session->Shutdown();
  675. }
  676. void OnError(TAutoPtr<TBusMessage>, EMessageStatus) override {
  677. // We do not check for server message errors in this test.
  678. }
  679. };
  680. Y_UNIT_TEST(OnClientConnectionEvent_Shutdown) {
  681. TObjectCountCheck objectCountCheck;
  682. TOnConnectionEventServer server;
  683. TOnConnectionEventClient client;
  684. TNetAddr addr("127.0.0.1", server.Session->GetActualListenPort());
  685. client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount), &addr);
  686. client.Sync.WaitForAndIncrement(1);
  687. client.Session->Shutdown();
  688. client.Sync.WaitForAndIncrement(3);
  689. }
  690. Y_UNIT_TEST(OnClientConnectionEvent_Disconnect) {
  691. TObjectCountCheck objectCountCheck;
  692. THolder<TOnConnectionEventServer> server(new TOnConnectionEventServer);
  693. TOnConnectionEventClient client;
  694. TNetAddr addr("127.0.0.1", server->Session->GetActualListenPort());
  695. client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount), &addr);
  696. client.Sync.WaitForAndIncrement(1);
  697. server.Destroy();
  698. client.Sync.WaitForAndIncrement(3);
  699. }
  700. struct TServerForQuotaWake: public TExampleServer {
  701. TSystemEvent GoOn;
  702. TMutex OneLock;
  703. TOnMessageContext OneMessage;
  704. static TBusServerSessionConfig Config() {
  705. TBusServerSessionConfig config;
  706. config.PerConnectionMaxInFlight = 1;
  707. config.PerConnectionMaxInFlightBySize = 1500;
  708. config.MaxMessageSize = 1024;
  709. return config;
  710. }
  711. TServerForQuotaWake()
  712. : TExampleServer("TServerForQuotaWake", Config())
  713. {
  714. }
  715. ~TServerForQuotaWake() override {
  716. Session->Shutdown();
  717. }
  718. void OnMessage(TOnMessageContext& req) override {
  719. if (!GoOn.Wait(0)) {
  720. TGuard<TMutex> guard(OneLock);
  721. UNIT_ASSERT(!OneMessage);
  722. OneMessage.Swap(req);
  723. } else
  724. TExampleServer::OnMessage(req);
  725. }
  726. void WakeOne() {
  727. TGuard<TMutex> guard(OneLock);
  728. UNIT_ASSERT(!!OneMessage);
  729. TExampleServer::OnMessage(OneMessage);
  730. TOnMessageContext().Swap(OneMessage);
  731. }
  732. };
  733. Y_UNIT_TEST(WakeReaderOnQuota) {
  734. const size_t test_msg_count = 64;
  735. TBusClientSessionConfig clientConfig;
  736. clientConfig.MaxInFlight = test_msg_count;
  737. TExampleClient client(clientConfig);
  738. TServerForQuotaWake server;
  739. TInstant start;
  740. client.MessageCount = test_msg_count;
  741. const NBus::TNetAddr addr = server.GetActualListenAddr();
  742. for (unsigned count = 0;;) {
  743. UNIT_ASSERT(count <= test_msg_count);
  744. TAutoPtr<TBusMessage> message(new TExampleRequest(&client.Proto.RequestCount));
  745. EMessageStatus status = client.Session->SendMessageAutoPtr(message, &addr);
  746. if (status == MESSAGE_OK) {
  747. count++;
  748. } else if (status == MESSAGE_BUSY) {
  749. if (count == test_msg_count) {
  750. TInstant now = TInstant::Now();
  751. if (start.GetValue() == 0) {
  752. start = now;
  753. // TODO: properly check that server is blocked
  754. } else if (start + TDuration::MilliSeconds(100) < now) {
  755. break;
  756. }
  757. }
  758. Sleep(TDuration::MilliSeconds(10));
  759. } else
  760. UNIT_ASSERT(false);
  761. }
  762. server.GoOn.Signal();
  763. server.WakeOne();
  764. client.WaitReplies();
  765. server.WaitForOnMessageCount(test_msg_count);
  766. }
  767. Y_UNIT_TEST(TestConnectionAttempts) {
  768. TObjectCountCheck objectCountCheck;
  769. TNetAddr noServerAddr("localhost", 17);
  770. TBusClientSessionConfig clientConfig;
  771. clientConfig.RetryInterval = 100;
  772. TestNoServerImplClient client(clientConfig);
  773. int count = 0;
  774. for (; count < 10; ++count) {
  775. EMessageStatus status = client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount),
  776. &noServerAddr);
  777. Y_ABORT_UNLESS(status == MESSAGE_OK, "must be MESSAGE_OK, got %s", ToString(status).data());
  778. client.TestSync.WaitForAndIncrement(count * 2 + 1);
  779. // First connection attempt is for connect call; second one is to get connect result.
  780. UNIT_ASSERT_EQUAL(client.Session->GetConnectSyscallsNumForTest(noServerAddr), 2);
  781. }
  782. Sleep(TDuration::MilliSeconds(clientConfig.RetryInterval));
  783. for (; count < 10; ++count) {
  784. EMessageStatus status = client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount),
  785. &noServerAddr);
  786. Y_ABORT_UNLESS(status == MESSAGE_OK, "must be MESSAGE_OK, got %s", ToString(status).data());
  787. client.TestSync.WaitForAndIncrement(count * 2 + 1);
  788. // First connection attempt is for connect call; second one is to get connect result.
  789. UNIT_ASSERT_EQUAL(client.Session->GetConnectSyscallsNumForTest(noServerAddr), 4);
  790. }
  791. }
  792. Y_UNIT_TEST(TestConnectionAttemptsOnNoMessagesAndNotReconnectWhenIdle) {
  793. TObjectCountCheck objectCountCheck;
  794. TNetAddr noServerAddr("localhost", 17);
  795. TBusClientSessionConfig clientConfig;
  796. clientConfig.RetryInterval = 100;
  797. clientConfig.ReconnectWhenIdle = false;
  798. TestNoServerImplClient client(clientConfig);
  799. int count = 0;
  800. for (; count < 10; ++count) {
  801. EMessageStatus status = client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount),
  802. &noServerAddr);
  803. Y_ABORT_UNLESS(status == MESSAGE_OK, "must be MESSAGE_OK, got %s", ToString(status).data());
  804. client.TestSync.WaitForAndIncrement(count * 2 + 1);
  805. // First connection attempt is for connect call; second one is to get connect result.
  806. UNIT_ASSERT_EQUAL(client.Session->GetConnectSyscallsNumForTest(noServerAddr), 2);
  807. }
  808. Sleep(TDuration::MilliSeconds(clientConfig.RetryInterval / 2));
  809. UNIT_ASSERT_EQUAL(client.Session->GetConnectSyscallsNumForTest(noServerAddr), 2);
  810. Sleep(TDuration::MilliSeconds(10 * clientConfig.RetryInterval));
  811. UNIT_ASSERT_EQUAL(client.Session->GetConnectSyscallsNumForTest(noServerAddr), 2);
  812. }
  813. Y_UNIT_TEST(TestConnectionAttemptsOnNoMessagesAndReconnectWhenIdle) {
  814. TObjectCountCheck objectCountCheck;
  815. TNetAddr noServerAddr("localhost", 17);
  816. TBusClientSessionConfig clientConfig;
  817. clientConfig.ReconnectWhenIdle = true;
  818. clientConfig.RetryInterval = 100;
  819. TestNoServerImplClient client(clientConfig);
  820. int count = 0;
  821. for (; count < 10; ++count) {
  822. EMessageStatus status = client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount),
  823. &noServerAddr);
  824. Y_ABORT_UNLESS(status == MESSAGE_OK, "must be MESSAGE_OK, got %s", ToString(status).data());
  825. client.TestSync.WaitForAndIncrement(count * 2 + 1);
  826. // First connection attempt is for connect call; second one is to get connect result.
  827. UNIT_ASSERT_VALUES_EQUAL(client.Session->GetConnectSyscallsNumForTest(noServerAddr), 2);
  828. }
  829. Sleep(TDuration::MilliSeconds(clientConfig.RetryInterval / 2));
  830. UNIT_ASSERT_EQUAL(client.Session->GetConnectSyscallsNumForTest(noServerAddr), 2);
  831. Sleep(TDuration::MilliSeconds(10 * clientConfig.RetryInterval));
  832. // it is undeterministic how many reconnects will be during that amount of time
  833. // but it should occur at least once
  834. UNIT_ASSERT(client.Session->GetConnectSyscallsNumForTest(noServerAddr) > 2);
  835. }
  836. }