#include #include #include #include #include using namespace NBus; using namespace NBus::NTest; Y_UNIT_TEST_SUITE(ModuleClientOneWay) { struct TTestServer: public TBusServerHandlerError { TExampleProtocol Proto; TTestSync* const TestSync; TBusMessageQueuePtr Queue; TBusServerSessionPtr ServerSession; TTestServer(TTestSync* testSync) : TestSync(testSync) { Queue = CreateMessageQueue(); ServerSession = TBusServerSession::Create(&Proto, this, TBusServerSessionConfig(), Queue); } void OnMessage(TOnMessageContext& context) override { TestSync->WaitForAndIncrement(1); context.ForgetRequest(); } }; struct TClientModule: public TBusModule { TExampleProtocol Proto; TTestSync* const TestSync; unsigned const Port; TBusClientSessionPtr ClientSession; TClientModule(TTestSync* testSync, unsigned port) : TBusModule("m") , TestSync(testSync) , Port(port) { } TJobHandler Start(TBusJob* job, TBusMessage*) override { TestSync->WaitForAndIncrement(0); job->SendOneWayTo(new TExampleRequest(&Proto.RequestCount), ClientSession.Get(), TNetAddr("localhost", Port)); return &TClientModule::Sent; } TJobHandler Sent(TBusJob* job, TBusMessage*) { TestSync->WaitForAndIncrement(2); job->Cancel(MESSAGE_DONT_ASK); return nullptr; } TBusServerSessionPtr CreateExtSession(TBusMessageQueue& queue) override { ClientSession = CreateDefaultSource(queue, &Proto, TBusServerSessionConfig()); return nullptr; } }; Y_UNIT_TEST(Simple) { TTestSync testSync; TTestServer server(&testSync); TBusMessageQueuePtr queue = CreateMessageQueue(); TClientModule clientModule(&testSync, server.ServerSession->GetActualListenPort()); clientModule.CreatePrivateSessions(queue.Get()); clientModule.StartInput(); clientModule.StartJob(new TExampleRequest(&clientModule.Proto.StartCount)); testSync.WaitForAndIncrement(3); clientModule.Shutdown(); } struct TSendErrorModule: public TBusModule { TExampleProtocol Proto; TTestSync* const TestSync; TBusClientSessionPtr ClientSession; TSendErrorModule(TTestSync* testSync) : TBusModule("m") , TestSync(testSync) { } TJobHandler Start(TBusJob* job, TBusMessage*) override { TestSync->WaitForAndIncrement(0); job->SendOneWayTo(new TExampleRequest(&Proto.RequestCount), ClientSession.Get(), TNetAddr("localhost", 1)); return &TSendErrorModule::Sent; } TJobHandler Sent(TBusJob* job, TBusMessage*) { TestSync->WaitForAndIncrement(1); job->Cancel(MESSAGE_DONT_ASK); return nullptr; } TBusServerSessionPtr CreateExtSession(TBusMessageQueue& queue) override { TBusServerSessionConfig sessionConfig; sessionConfig.ConnectTimeout = 1; sessionConfig.SendTimeout = 1; sessionConfig.TotalTimeout = 1; sessionConfig.Secret.TimeoutPeriod = TDuration::MilliSeconds(1); ClientSession = CreateDefaultSource(queue, &Proto, sessionConfig); return nullptr; } }; Y_UNIT_TEST(SendError) { TTestSync testSync; TBusQueueConfig queueConfig; queueConfig.NumWorkers = 5; TBusMessageQueuePtr queue = CreateMessageQueue(queueConfig); TSendErrorModule clientModule(&testSync); clientModule.CreatePrivateSessions(queue.Get()); clientModule.StartInput(); clientModule.StartJob(new TExampleRequest(&clientModule.Proto.StartCount)); testSync.WaitForAndIncrement(2); clientModule.Shutdown(); } }