one_way_ut.cpp 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255
  1. ///////////////////////////////////////////////////////////////////
  2. /// \file
  3. /// \brief Example of reply-less communication
  4. /// This example demostrates how asynchronous message passing library
  5. /// can be used to send message and do not wait for reply back.
  6. /// The usage of reply-less communication should be restricted to
  7. /// low-throughput clients and high-throughput server to provide reasonable
  8. /// utility. Removing replies from the communication removes any restriction
  9. /// on how many message can be send to server and rougue clients may overwelm
  10. /// server without thoughtput control.
  11. /// 1) To implement reply-less client \n
  12. /// Call NBus::TBusSession::AckMessage()
  13. /// from within NBus::IMessageHandler::OnSent() handler when message has
  14. /// gone into wire on client end. See example in NBus::NullClient::OnMessageSent().
  15. /// Discard identity for reply message.
  16. /// 2) To implement reply-less server \n
  17. /// Call NBus::TBusSession::AckMessage() from within NBus::IMessageHandler::OnMessage()
  18. /// handler when message has been received on server end.
  19. /// See example in NBus::NullServer::OnMessage().
  20. /// Discard identity for reply message.
  21. #include <library/cpp/messagebus/test/helper/alloc_counter.h>
  22. #include <library/cpp/messagebus/test/helper/example.h>
  23. #include <library/cpp/messagebus/test/helper/hanging_server.h>
  24. #include <library/cpp/messagebus/test/helper/message_handler_error.h>
  25. #include <library/cpp/messagebus/test/helper/object_count_check.h>
  26. #include <library/cpp/messagebus/test/helper/wait_for.h>
  27. #include <library/cpp/messagebus/ybus.h>
  28. using namespace std;
  29. using namespace NBus;
  30. using namespace NBus::NPrivate;
  31. using namespace NBus::NTest;
  32. ////////////////////////////////////////////////////////////////////
  33. ////////////////////////////////////////////////////////////////////
  34. /// \brief Reply-less client and handler
  35. struct NullClient : TBusClientHandlerError {
  36. TNetAddr ServerAddr;
  37. TBusMessageQueuePtr Queue;
  38. TBusClientSessionPtr Session;
  39. TExampleProtocol Proto;
  40. /// constructor creates instances of protocol and session
  41. NullClient(const TNetAddr& serverAddr, const TBusClientSessionConfig& sessionConfig = TBusClientSessionConfig())
  42. : ServerAddr(serverAddr)
  43. {
  44. UNIT_ASSERT(serverAddr.GetPort() > 0);
  45. /// create or get instance of message queue, need one per application
  46. Queue = CreateMessageQueue();
  47. /// register source/client session
  48. Session = TBusClientSession::Create(&Proto, this, sessionConfig, Queue);
  49. /// register service, announce to clients via LocatorService
  50. Session->RegisterService("localhost");
  51. }
  52. ~NullClient() override {
  53. Session->Shutdown();
  54. }
  55. /// dispatch of requests is done here
  56. void Work() {
  57. int batch = 10;
  58. for (int i = 0; i < batch; i++) {
  59. TExampleRequest* mess = new TExampleRequest(&Proto.RequestCount);
  60. mess->Data = "TADA";
  61. Session->SendMessageOneWay(mess, &ServerAddr);
  62. }
  63. }
  64. void OnMessageSentOneWay(TAutoPtr<TBusMessage>) override {
  65. }
  66. };
  67. /////////////////////////////////////////////////////////////////////
  68. /// \brief Reply-less server and handler
  69. class NullServer: public TBusServerHandlerError {
  70. public:
  71. /// session object to maintian
  72. TBusMessageQueuePtr Queue;
  73. TBusServerSessionPtr Session;
  74. TExampleProtocol Proto;
  75. public:
  76. TAtomic NumMessages;
  77. NullServer() {
  78. NumMessages = 0;
  79. /// create or get instance of single message queue, need one for application
  80. Queue = CreateMessageQueue();
  81. /// register destination session
  82. TBusServerSessionConfig sessionConfig;
  83. Session = TBusServerSession::Create(&Proto, this, sessionConfig, Queue);
  84. }
  85. ~NullServer() override {
  86. Session->Shutdown();
  87. }
  88. /// when message comes do not send reply, just acknowledge
  89. void OnMessage(TOnMessageContext& mess) override {
  90. TExampleRequest* fmess = static_cast<TExampleRequest*>(mess.GetMessage());
  91. Y_ASSERT(fmess->Data == "TADA");
  92. /// tell session to forget this message and never expect any reply
  93. mess.ForgetRequest();
  94. AtomicIncrement(NumMessages);
  95. }
  96. /// this handler should not be called because this server does not send replies
  97. void OnSent(TAutoPtr<TBusMessage> mess) override {
  98. Y_UNUSED(mess);
  99. Y_ABORT("This server does not sent replies");
  100. }
  101. };
  102. Y_UNIT_TEST_SUITE(TMessageBusTests_OneWay) {
  103. Y_UNIT_TEST(Simple) {
  104. TObjectCountCheck objectCountCheck;
  105. NullServer server;
  106. NullClient client(TNetAddr("localhost", server.Session->GetActualListenPort()));
  107. client.Work();
  108. // wait until all client message are delivered
  109. UNIT_WAIT_FOR(AtomicGet(server.NumMessages) == 10);
  110. // assert correct number of messages
  111. UNIT_ASSERT_VALUES_EQUAL(AtomicGet(server.NumMessages), 10);
  112. UNIT_ASSERT_VALUES_EQUAL(server.Session->GetInFlight(), 0);
  113. UNIT_ASSERT_VALUES_EQUAL(client.Session->GetInFlight(), 0);
  114. }
  115. struct TMessageTooLargeClient: public NullClient {
  116. TSystemEvent GotTooLarge;
  117. TBusClientSessionConfig Config() {
  118. TBusClientSessionConfig r;
  119. r.MaxMessageSize = 1;
  120. return r;
  121. }
  122. TMessageTooLargeClient(unsigned port)
  123. : NullClient(TNetAddr("localhost", port), Config())
  124. {
  125. }
  126. ~TMessageTooLargeClient() override {
  127. Session->Shutdown();
  128. }
  129. void OnError(TAutoPtr<TBusMessage> mess, EMessageStatus status) override {
  130. Y_UNUSED(mess);
  131. Y_ABORT_UNLESS(status == MESSAGE_MESSAGE_TOO_LARGE, "wrong status: %s", ToCString(status));
  132. GotTooLarge.Signal();
  133. }
  134. };
  135. Y_UNIT_TEST(MessageTooLargeOnClient) {
  136. TObjectCountCheck objectCountCheck;
  137. NullServer server;
  138. TMessageTooLargeClient client(server.Session->GetActualListenPort());
  139. EMessageStatus ok = client.Session->SendMessageOneWayMove(new TExampleRequest(&client.Proto.RequestCount), &client.ServerAddr);
  140. UNIT_ASSERT_VALUES_EQUAL(MESSAGE_OK, ok);
  141. client.GotTooLarge.WaitI();
  142. }
  143. struct TCheckTimeoutClient: public NullClient {
  144. ~TCheckTimeoutClient() override {
  145. Session->Shutdown();
  146. }
  147. static TBusClientSessionConfig SessionConfig() {
  148. TBusClientSessionConfig sessionConfig;
  149. sessionConfig.SendTimeout = 1;
  150. sessionConfig.ConnectTimeout = 1;
  151. sessionConfig.Secret.TimeoutPeriod = TDuration::MilliSeconds(10);
  152. return sessionConfig;
  153. }
  154. TCheckTimeoutClient(const TNetAddr& serverAddr)
  155. : NullClient(serverAddr, SessionConfig())
  156. {
  157. }
  158. TSystemEvent GotError;
  159. /// message that could not be delivered
  160. void OnError(TAutoPtr<TBusMessage> mess, EMessageStatus status) override {
  161. Y_UNUSED(mess);
  162. Y_UNUSED(status); // TODO: check status
  163. GotError.Signal();
  164. }
  165. };
  166. Y_UNIT_TEST(SendTimeout_Callback_NoServer) {
  167. TObjectCountCheck objectCountCheck;
  168. TCheckTimeoutClient client(TNetAddr("localhost", 17));
  169. EMessageStatus ok = client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount), &client.ServerAddr);
  170. UNIT_ASSERT_EQUAL(ok, MESSAGE_OK);
  171. client.GotError.WaitI();
  172. }
  173. Y_UNIT_TEST(SendTimeout_Callback_HangingServer) {
  174. THangingServer server;
  175. TObjectCountCheck objectCountCheck;
  176. TCheckTimeoutClient client(TNetAddr("localhost", server.GetPort()));
  177. bool first = true;
  178. for (;;) {
  179. EMessageStatus ok = client.Session->SendMessageOneWayMove(new TExampleRequest(&client.Proto.RequestCount), &client.ServerAddr);
  180. if (ok == MESSAGE_BUSY) {
  181. UNIT_ASSERT(!first);
  182. break;
  183. }
  184. UNIT_ASSERT_VALUES_EQUAL(ok, MESSAGE_OK);
  185. first = false;
  186. }
  187. // BUGBUG: The test is buggy: the client might not get any error when sending one-way messages.
  188. // All the messages that the client has sent before he gets first MESSAGE_BUSY error might get
  189. // serailized and written to the socket buffer, so the write queue gets drained and there are
  190. // no messages to timeout when periodic timeout check happens.
  191. client.GotError.WaitI();
  192. }
  193. }