123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143 |
- #include <library/cpp/testing/unittest/registar.h>
- #include <library/cpp/messagebus/test/helper/example.h>
- #include <library/cpp/messagebus/test/helper/message_handler_error.h>
- #include <library/cpp/messagebus/misc/test_sync.h>
- #include <library/cpp/messagebus/oldmodule/module.h>
- using namespace NBus;
- using namespace NBus::NTest;
- Y_UNIT_TEST_SUITE(ModuleClientOneWay) {
- struct TTestServer: public TBusServerHandlerError {
- TExampleProtocol Proto;
- TTestSync* const TestSync;
- TBusMessageQueuePtr Queue;
- TBusServerSessionPtr ServerSession;
- TTestServer(TTestSync* testSync)
- : TestSync(testSync)
- {
- Queue = CreateMessageQueue();
- ServerSession = TBusServerSession::Create(&Proto, this, TBusServerSessionConfig(), Queue);
- }
- void OnMessage(TOnMessageContext& context) override {
- TestSync->WaitForAndIncrement(1);
- context.ForgetRequest();
- }
- };
- struct TClientModule: public TBusModule {
- TExampleProtocol Proto;
- TTestSync* const TestSync;
- unsigned const Port;
- TBusClientSessionPtr ClientSession;
- TClientModule(TTestSync* testSync, unsigned port)
- : TBusModule("m")
- , TestSync(testSync)
- , Port(port)
- {
- }
- TJobHandler Start(TBusJob* job, TBusMessage*) override {
- TestSync->WaitForAndIncrement(0);
- job->SendOneWayTo(new TExampleRequest(&Proto.RequestCount), ClientSession.Get(), TNetAddr("localhost", Port));
- return &TClientModule::Sent;
- }
- TJobHandler Sent(TBusJob* job, TBusMessage*) {
- TestSync->WaitForAndIncrement(2);
- job->Cancel(MESSAGE_DONT_ASK);
- return nullptr;
- }
- TBusServerSessionPtr CreateExtSession(TBusMessageQueue& queue) override {
- ClientSession = CreateDefaultSource(queue, &Proto, TBusServerSessionConfig());
- return nullptr;
- }
- };
- Y_UNIT_TEST(Simple) {
- TTestSync testSync;
- TTestServer server(&testSync);
- TBusMessageQueuePtr queue = CreateMessageQueue();
- TClientModule clientModule(&testSync, server.ServerSession->GetActualListenPort());
- clientModule.CreatePrivateSessions(queue.Get());
- clientModule.StartInput();
- clientModule.StartJob(new TExampleRequest(&clientModule.Proto.StartCount));
- testSync.WaitForAndIncrement(3);
- clientModule.Shutdown();
- }
- struct TSendErrorModule: public TBusModule {
- TExampleProtocol Proto;
- TTestSync* const TestSync;
- TBusClientSessionPtr ClientSession;
- TSendErrorModule(TTestSync* testSync)
- : TBusModule("m")
- , TestSync(testSync)
- {
- }
- TJobHandler Start(TBusJob* job, TBusMessage*) override {
- TestSync->WaitForAndIncrement(0);
- job->SendOneWayTo(new TExampleRequest(&Proto.RequestCount), ClientSession.Get(), TNetAddr("localhost", 1));
- return &TSendErrorModule::Sent;
- }
- TJobHandler Sent(TBusJob* job, TBusMessage*) {
- TestSync->WaitForAndIncrement(1);
- job->Cancel(MESSAGE_DONT_ASK);
- return nullptr;
- }
- TBusServerSessionPtr CreateExtSession(TBusMessageQueue& queue) override {
- TBusServerSessionConfig sessionConfig;
- sessionConfig.ConnectTimeout = 1;
- sessionConfig.SendTimeout = 1;
- sessionConfig.TotalTimeout = 1;
- sessionConfig.Secret.TimeoutPeriod = TDuration::MilliSeconds(1);
- ClientSession = CreateDefaultSource(queue, &Proto, sessionConfig);
- return nullptr;
- }
- };
- Y_UNIT_TEST(SendError) {
- TTestSync testSync;
- TBusQueueConfig queueConfig;
- queueConfig.NumWorkers = 5;
- TBusMessageQueuePtr queue = CreateMessageQueue(queueConfig);
- TSendErrorModule clientModule(&testSync);
- clientModule.CreatePrivateSessions(queue.Get());
- clientModule.StartInput();
- clientModule.StartJob(new TExampleRequest(&clientModule.Proto.StartCount));
- testSync.WaitForAndIncrement(2);
- clientModule.Shutdown();
- }
- }
|