module_client_one_way_ut.cpp 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143
  1. #include <library/cpp/testing/unittest/registar.h>
  2. #include <library/cpp/messagebus/test/helper/example.h>
  3. #include <library/cpp/messagebus/test/helper/message_handler_error.h>
  4. #include <library/cpp/messagebus/misc/test_sync.h>
  5. #include <library/cpp/messagebus/oldmodule/module.h>
  6. using namespace NBus;
  7. using namespace NBus::NTest;
  8. Y_UNIT_TEST_SUITE(ModuleClientOneWay) {
  9. struct TTestServer: public TBusServerHandlerError {
  10. TExampleProtocol Proto;
  11. TTestSync* const TestSync;
  12. TBusMessageQueuePtr Queue;
  13. TBusServerSessionPtr ServerSession;
  14. TTestServer(TTestSync* testSync)
  15. : TestSync(testSync)
  16. {
  17. Queue = CreateMessageQueue();
  18. ServerSession = TBusServerSession::Create(&Proto, this, TBusServerSessionConfig(), Queue);
  19. }
  20. void OnMessage(TOnMessageContext& context) override {
  21. TestSync->WaitForAndIncrement(1);
  22. context.ForgetRequest();
  23. }
  24. };
  25. struct TClientModule: public TBusModule {
  26. TExampleProtocol Proto;
  27. TTestSync* const TestSync;
  28. unsigned const Port;
  29. TBusClientSessionPtr ClientSession;
  30. TClientModule(TTestSync* testSync, unsigned port)
  31. : TBusModule("m")
  32. , TestSync(testSync)
  33. , Port(port)
  34. {
  35. }
  36. TJobHandler Start(TBusJob* job, TBusMessage*) override {
  37. TestSync->WaitForAndIncrement(0);
  38. job->SendOneWayTo(new TExampleRequest(&Proto.RequestCount), ClientSession.Get(), TNetAddr("localhost", Port));
  39. return &TClientModule::Sent;
  40. }
  41. TJobHandler Sent(TBusJob* job, TBusMessage*) {
  42. TestSync->WaitForAndIncrement(2);
  43. job->Cancel(MESSAGE_DONT_ASK);
  44. return nullptr;
  45. }
  46. TBusServerSessionPtr CreateExtSession(TBusMessageQueue& queue) override {
  47. ClientSession = CreateDefaultSource(queue, &Proto, TBusServerSessionConfig());
  48. return nullptr;
  49. }
  50. };
  51. Y_UNIT_TEST(Simple) {
  52. TTestSync testSync;
  53. TTestServer server(&testSync);
  54. TBusMessageQueuePtr queue = CreateMessageQueue();
  55. TClientModule clientModule(&testSync, server.ServerSession->GetActualListenPort());
  56. clientModule.CreatePrivateSessions(queue.Get());
  57. clientModule.StartInput();
  58. clientModule.StartJob(new TExampleRequest(&clientModule.Proto.StartCount));
  59. testSync.WaitForAndIncrement(3);
  60. clientModule.Shutdown();
  61. }
  62. struct TSendErrorModule: public TBusModule {
  63. TExampleProtocol Proto;
  64. TTestSync* const TestSync;
  65. TBusClientSessionPtr ClientSession;
  66. TSendErrorModule(TTestSync* testSync)
  67. : TBusModule("m")
  68. , TestSync(testSync)
  69. {
  70. }
  71. TJobHandler Start(TBusJob* job, TBusMessage*) override {
  72. TestSync->WaitForAndIncrement(0);
  73. job->SendOneWayTo(new TExampleRequest(&Proto.RequestCount), ClientSession.Get(), TNetAddr("localhost", 1));
  74. return &TSendErrorModule::Sent;
  75. }
  76. TJobHandler Sent(TBusJob* job, TBusMessage*) {
  77. TestSync->WaitForAndIncrement(1);
  78. job->Cancel(MESSAGE_DONT_ASK);
  79. return nullptr;
  80. }
  81. TBusServerSessionPtr CreateExtSession(TBusMessageQueue& queue) override {
  82. TBusServerSessionConfig sessionConfig;
  83. sessionConfig.ConnectTimeout = 1;
  84. sessionConfig.SendTimeout = 1;
  85. sessionConfig.TotalTimeout = 1;
  86. sessionConfig.Secret.TimeoutPeriod = TDuration::MilliSeconds(1);
  87. ClientSession = CreateDefaultSource(queue, &Proto, sessionConfig);
  88. return nullptr;
  89. }
  90. };
  91. Y_UNIT_TEST(SendError) {
  92. TTestSync testSync;
  93. TBusQueueConfig queueConfig;
  94. queueConfig.NumWorkers = 5;
  95. TBusMessageQueuePtr queue = CreateMessageQueue(queueConfig);
  96. TSendErrorModule clientModule(&testSync);
  97. clientModule.CreatePrivateSessions(queue.Get());
  98. clientModule.StartInput();
  99. clientModule.StartJob(new TExampleRequest(&clientModule.Proto.StartCount));
  100. testSync.WaitForAndIncrement(2);
  101. clientModule.Shutdown();
  102. }
  103. }