///////////////////////////////////////////////////////////////////
/// \file
/// \brief Example of reply-less communication
///
/// This example demonstrates how the asynchronous message passing library
/// can be used to send messages without waiting for a reply.
/// The usage of reply-less communication should be restricted to
/// low-throughput clients and a high-throughput server to provide reasonable
/// utility. Removing replies from the communication removes any restriction
/// on how many messages can be sent to the server, and rogue clients may
/// overwhelm the server because there is no throughput control.
///
/// 1) To implement a reply-less client \n
///    Call NBus::TBusSession::AckMessage()
///    from within the NBus::IMessageHandler::OnSent() handler when the message
///    has gone onto the wire on the client end.
///    See example in NBus::NullClient::OnMessageSent().
///    Discard identity for reply message.
///
/// 2) To implement a reply-less server \n
///    Call NBus::TBusSession::AckMessage() from within the
///    NBus::IMessageHandler::OnMessage() handler when the message has been
///    received on the server end.
///    See example in NBus::NullServer::OnMessage().
///    Discard identity for reply message.
#include #include #include #include #include #include #include using namespace std; using namespace NBus; using namespace NBus::NPrivate; using namespace NBus::NTest; //////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////// /// \brief Reply-less client and handler struct NullClient : TBusClientHandlerError { TNetAddr ServerAddr; TBusMessageQueuePtr Queue; TBusClientSessionPtr Session; TExampleProtocol Proto; /// constructor creates instances of protocol and session NullClient(const TNetAddr& serverAddr, const TBusClientSessionConfig& sessionConfig = TBusClientSessionConfig()) : ServerAddr(serverAddr) { UNIT_ASSERT(serverAddr.GetPort() > 0); /// create or get instance of message queue, need one per application Queue = CreateMessageQueue(); /// register source/client session Session = TBusClientSession::Create(&Proto, this, sessionConfig, Queue); /// register service, announce to clients via LocatorService Session->RegisterService("localhost"); } ~NullClient() override { Session->Shutdown(); } /// dispatch of requests is done here void Work() { int batch = 10; for (int i = 0; i < batch; i++) { TExampleRequest* mess = new TExampleRequest(&Proto.RequestCount); mess->Data = "TADA"; Session->SendMessageOneWay(mess, &ServerAddr); } } void OnMessageSentOneWay(TAutoPtr) override { } }; ///////////////////////////////////////////////////////////////////// /// \brief Reply-less server and handler class NullServer: public TBusServerHandlerError { public: /// session object to maintian TBusMessageQueuePtr Queue; TBusServerSessionPtr Session; TExampleProtocol Proto; public: TAtomic NumMessages; NullServer() { NumMessages = 0; /// create or get instance of single message queue, need one for application Queue = CreateMessageQueue(); /// register destination session TBusServerSessionConfig sessionConfig; Session = TBusServerSession::Create(&Proto, this, sessionConfig, Queue); } ~NullServer() override { 
Session->Shutdown(); } /// when message comes do not send reply, just acknowledge void OnMessage(TOnMessageContext& mess) override { TExampleRequest* fmess = static_cast(mess.GetMessage()); Y_ASSERT(fmess->Data == "TADA"); /// tell session to forget this message and never expect any reply mess.ForgetRequest(); AtomicIncrement(NumMessages); } /// this handler should not be called because this server does not send replies void OnSent(TAutoPtr mess) override { Y_UNUSED(mess); Y_ABORT("This server does not sent replies"); } }; Y_UNIT_TEST_SUITE(TMessageBusTests_OneWay) { Y_UNIT_TEST(Simple) { TObjectCountCheck objectCountCheck; NullServer server; NullClient client(TNetAddr("localhost", server.Session->GetActualListenPort())); client.Work(); // wait until all client message are delivered UNIT_WAIT_FOR(AtomicGet(server.NumMessages) == 10); // assert correct number of messages UNIT_ASSERT_VALUES_EQUAL(AtomicGet(server.NumMessages), 10); UNIT_ASSERT_VALUES_EQUAL(server.Session->GetInFlight(), 0); UNIT_ASSERT_VALUES_EQUAL(client.Session->GetInFlight(), 0); } struct TMessageTooLargeClient: public NullClient { TSystemEvent GotTooLarge; TBusClientSessionConfig Config() { TBusClientSessionConfig r; r.MaxMessageSize = 1; return r; } TMessageTooLargeClient(unsigned port) : NullClient(TNetAddr("localhost", port), Config()) { } ~TMessageTooLargeClient() override { Session->Shutdown(); } void OnError(TAutoPtr mess, EMessageStatus status) override { Y_UNUSED(mess); Y_ABORT_UNLESS(status == MESSAGE_MESSAGE_TOO_LARGE, "wrong status: %s", ToCString(status)); GotTooLarge.Signal(); } }; Y_UNIT_TEST(MessageTooLargeOnClient) { TObjectCountCheck objectCountCheck; NullServer server; TMessageTooLargeClient client(server.Session->GetActualListenPort()); EMessageStatus ok = client.Session->SendMessageOneWayMove(new TExampleRequest(&client.Proto.RequestCount), &client.ServerAddr); UNIT_ASSERT_VALUES_EQUAL(MESSAGE_OK, ok); client.GotTooLarge.WaitI(); } struct TCheckTimeoutClient: public NullClient 
{ ~TCheckTimeoutClient() override { Session->Shutdown(); } static TBusClientSessionConfig SessionConfig() { TBusClientSessionConfig sessionConfig; sessionConfig.SendTimeout = 1; sessionConfig.ConnectTimeout = 1; sessionConfig.Secret.TimeoutPeriod = TDuration::MilliSeconds(10); return sessionConfig; } TCheckTimeoutClient(const TNetAddr& serverAddr) : NullClient(serverAddr, SessionConfig()) { } TSystemEvent GotError; /// message that could not be delivered void OnError(TAutoPtr mess, EMessageStatus status) override { Y_UNUSED(mess); Y_UNUSED(status); // TODO: check status GotError.Signal(); } }; Y_UNIT_TEST(SendTimeout_Callback_NoServer) { TObjectCountCheck objectCountCheck; TCheckTimeoutClient client(TNetAddr("localhost", 17)); EMessageStatus ok = client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount), &client.ServerAddr); UNIT_ASSERT_EQUAL(ok, MESSAGE_OK); client.GotError.WaitI(); } Y_UNIT_TEST(SendTimeout_Callback_HangingServer) { THangingServer server; TObjectCountCheck objectCountCheck; TCheckTimeoutClient client(TNetAddr("localhost", server.GetPort())); bool first = true; for (;;) { EMessageStatus ok = client.Session->SendMessageOneWayMove(new TExampleRequest(&client.Proto.RequestCount), &client.ServerAddr); if (ok == MESSAGE_BUSY) { UNIT_ASSERT(!first); break; } UNIT_ASSERT_VALUES_EQUAL(ok, MESSAGE_OK); first = false; } // BUGBUG: The test is buggy: the client might not get any error when sending one-way messages. // All the messages that the client has sent before he gets first MESSAGE_BUSY error might get // serailized and written to the socket buffer, so the write queue gets drained and there are // no messages to timeout when periodic timeout check happens. client.GotError.WaitI(); } }