123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255 |
- ///////////////////////////////////////////////////////////////////
- /// \file
- /// \brief Example of reply-less communication
- /// This example demostrates how asynchronous message passing library
- /// can be used to send message and do not wait for reply back.
- /// The usage of reply-less communication should be restricted to
- /// low-throughput clients and high-throughput server to provide reasonable
- /// utility. Removing replies from the communication removes any restriction
- /// on how many message can be send to server and rougue clients may overwelm
- /// server without thoughtput control.
- /// 1) To implement reply-less client \n
- /// Call NBus::TBusSession::AckMessage()
- /// from within NBus::IMessageHandler::OnSent() handler when message has
- /// gone into wire on client end. See example in NBus::NullClient::OnMessageSent().
- /// Discard identity for reply message.
- /// 2) To implement reply-less server \n
- /// Call NBus::TBusSession::AckMessage() from within NBus::IMessageHandler::OnMessage()
- /// handler when message has been received on server end.
- /// See example in NBus::NullServer::OnMessage().
- /// Discard identity for reply message.
- #include <library/cpp/messagebus/test/helper/alloc_counter.h>
- #include <library/cpp/messagebus/test/helper/example.h>
- #include <library/cpp/messagebus/test/helper/hanging_server.h>
- #include <library/cpp/messagebus/test/helper/message_handler_error.h>
- #include <library/cpp/messagebus/test/helper/object_count_check.h>
- #include <library/cpp/messagebus/test/helper/wait_for.h>
- #include <library/cpp/messagebus/ybus.h>
- using namespace std;
- using namespace NBus;
- using namespace NBus::NPrivate;
- using namespace NBus::NTest;
- ////////////////////////////////////////////////////////////////////
- ////////////////////////////////////////////////////////////////////
- /// \brief Reply-less client and handler
- struct NullClient : TBusClientHandlerError {
- TNetAddr ServerAddr;
- TBusMessageQueuePtr Queue;
- TBusClientSessionPtr Session;
- TExampleProtocol Proto;
- /// constructor creates instances of protocol and session
- NullClient(const TNetAddr& serverAddr, const TBusClientSessionConfig& sessionConfig = TBusClientSessionConfig())
- : ServerAddr(serverAddr)
- {
- UNIT_ASSERT(serverAddr.GetPort() > 0);
- /// create or get instance of message queue, need one per application
- Queue = CreateMessageQueue();
- /// register source/client session
- Session = TBusClientSession::Create(&Proto, this, sessionConfig, Queue);
- /// register service, announce to clients via LocatorService
- Session->RegisterService("localhost");
- }
- ~NullClient() override {
- Session->Shutdown();
- }
- /// dispatch of requests is done here
- void Work() {
- int batch = 10;
- for (int i = 0; i < batch; i++) {
- TExampleRequest* mess = new TExampleRequest(&Proto.RequestCount);
- mess->Data = "TADA";
- Session->SendMessageOneWay(mess, &ServerAddr);
- }
- }
- void OnMessageSentOneWay(TAutoPtr<TBusMessage>) override {
- }
- };
- /////////////////////////////////////////////////////////////////////
- /// \brief Reply-less server and handler
- class NullServer: public TBusServerHandlerError {
- public:
- /// session object to maintian
- TBusMessageQueuePtr Queue;
- TBusServerSessionPtr Session;
- TExampleProtocol Proto;
- public:
- TAtomic NumMessages;
- NullServer() {
- NumMessages = 0;
- /// create or get instance of single message queue, need one for application
- Queue = CreateMessageQueue();
- /// register destination session
- TBusServerSessionConfig sessionConfig;
- Session = TBusServerSession::Create(&Proto, this, sessionConfig, Queue);
- }
- ~NullServer() override {
- Session->Shutdown();
- }
- /// when message comes do not send reply, just acknowledge
- void OnMessage(TOnMessageContext& mess) override {
- TExampleRequest* fmess = static_cast<TExampleRequest*>(mess.GetMessage());
- Y_ASSERT(fmess->Data == "TADA");
- /// tell session to forget this message and never expect any reply
- mess.ForgetRequest();
- AtomicIncrement(NumMessages);
- }
- /// this handler should not be called because this server does not send replies
- void OnSent(TAutoPtr<TBusMessage> mess) override {
- Y_UNUSED(mess);
- Y_ABORT("This server does not sent replies");
- }
- };
- Y_UNIT_TEST_SUITE(TMessageBusTests_OneWay) {
- Y_UNIT_TEST(Simple) {
- TObjectCountCheck objectCountCheck;
- NullServer server;
- NullClient client(TNetAddr("localhost", server.Session->GetActualListenPort()));
- client.Work();
- // wait until all client message are delivered
- UNIT_WAIT_FOR(AtomicGet(server.NumMessages) == 10);
- // assert correct number of messages
- UNIT_ASSERT_VALUES_EQUAL(AtomicGet(server.NumMessages), 10);
- UNIT_ASSERT_VALUES_EQUAL(server.Session->GetInFlight(), 0);
- UNIT_ASSERT_VALUES_EQUAL(client.Session->GetInFlight(), 0);
- }
- struct TMessageTooLargeClient: public NullClient {
- TSystemEvent GotTooLarge;
- TBusClientSessionConfig Config() {
- TBusClientSessionConfig r;
- r.MaxMessageSize = 1;
- return r;
- }
- TMessageTooLargeClient(unsigned port)
- : NullClient(TNetAddr("localhost", port), Config())
- {
- }
- ~TMessageTooLargeClient() override {
- Session->Shutdown();
- }
- void OnError(TAutoPtr<TBusMessage> mess, EMessageStatus status) override {
- Y_UNUSED(mess);
- Y_ABORT_UNLESS(status == MESSAGE_MESSAGE_TOO_LARGE, "wrong status: %s", ToCString(status));
- GotTooLarge.Signal();
- }
- };
- Y_UNIT_TEST(MessageTooLargeOnClient) {
- TObjectCountCheck objectCountCheck;
- NullServer server;
- TMessageTooLargeClient client(server.Session->GetActualListenPort());
- EMessageStatus ok = client.Session->SendMessageOneWayMove(new TExampleRequest(&client.Proto.RequestCount), &client.ServerAddr);
- UNIT_ASSERT_VALUES_EQUAL(MESSAGE_OK, ok);
- client.GotTooLarge.WaitI();
- }
- struct TCheckTimeoutClient: public NullClient {
- ~TCheckTimeoutClient() override {
- Session->Shutdown();
- }
- static TBusClientSessionConfig SessionConfig() {
- TBusClientSessionConfig sessionConfig;
- sessionConfig.SendTimeout = 1;
- sessionConfig.ConnectTimeout = 1;
- sessionConfig.Secret.TimeoutPeriod = TDuration::MilliSeconds(10);
- return sessionConfig;
- }
- TCheckTimeoutClient(const TNetAddr& serverAddr)
- : NullClient(serverAddr, SessionConfig())
- {
- }
- TSystemEvent GotError;
- /// message that could not be delivered
- void OnError(TAutoPtr<TBusMessage> mess, EMessageStatus status) override {
- Y_UNUSED(mess);
- Y_UNUSED(status); // TODO: check status
- GotError.Signal();
- }
- };
- Y_UNIT_TEST(SendTimeout_Callback_NoServer) {
- TObjectCountCheck objectCountCheck;
- TCheckTimeoutClient client(TNetAddr("localhost", 17));
- EMessageStatus ok = client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount), &client.ServerAddr);
- UNIT_ASSERT_EQUAL(ok, MESSAGE_OK);
- client.GotError.WaitI();
- }
- Y_UNIT_TEST(SendTimeout_Callback_HangingServer) {
- THangingServer server;
- TObjectCountCheck objectCountCheck;
- TCheckTimeoutClient client(TNetAddr("localhost", server.GetPort()));
- bool first = true;
- for (;;) {
- EMessageStatus ok = client.Session->SendMessageOneWayMove(new TExampleRequest(&client.Proto.RequestCount), &client.ServerAddr);
- if (ok == MESSAGE_BUSY) {
- UNIT_ASSERT(!first);
- break;
- }
- UNIT_ASSERT_VALUES_EQUAL(ok, MESSAGE_OK);
- first = false;
- }
- // BUGBUG: The test is buggy: the client might not get any error when sending one-way messages.
- // All the messages that the client has sent before he gets first MESSAGE_BUSY error might get
- // serailized and written to the socket buffer, so the write queue gets drained and there are
- // no messages to timeout when periodic timeout check happens.
- client.GotError.WaitI();
- }
- }
|