module_server_ut.cpp 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119
  1. #include <library/cpp/testing/unittest/registar.h>
  2. #include "count_down_latch.h"
  3. #include "moduletest.h"
  4. #include <library/cpp/messagebus/test/helper/example.h>
  5. #include <library/cpp/messagebus/test/helper/example_module.h>
  6. #include <library/cpp/messagebus/test/helper/object_count_check.h>
  7. #include <library/cpp/messagebus/test/helper/wait_for.h>
  8. #include <library/cpp/messagebus/oldmodule/module.h>
  9. #include <util/generic/cast.h>
  10. using namespace NBus;
  11. using namespace NBus::NTest;
  12. Y_UNIT_TEST_SUITE(ModuleServerTests) {
  13. Y_UNIT_TEST(TestModule) {
  14. TObjectCountCheck objectCountCheck;
  15. /// create or get instance of message queue, need one per application
  16. TBusMessageQueuePtr bus(CreateMessageQueue());
  17. THostInfoHandler hostHandler(bus.Get());
  18. TDupDetectModule module(hostHandler.GetActualListenAddr());
  19. bool success;
  20. success = module.Init(bus.Get());
  21. UNIT_ASSERT_C(success, "failed to initialize dupdetect module");
  22. success = module.StartInput();
  23. UNIT_ASSERT_C(success, "failed to start dupdetect module");
  24. TDupDetectHandler dupHandler(module.ListenAddr, bus.Get());
  25. dupHandler.Work();
  26. UNIT_WAIT_FOR(dupHandler.NumMessages == dupHandler.NumReplies);
  27. module.Shutdown();
  28. dupHandler.DupDetect->Shutdown();
  29. }
  30. struct TParallelOnMessageModule: public TExampleServerModule {
  31. TCountDownLatch WaitTwoRequestsLatch;
  32. TParallelOnMessageModule()
  33. : WaitTwoRequestsLatch(2)
  34. {
  35. }
  36. TJobHandler Start(TBusJob* job, TBusMessage* mess) override {
  37. WaitTwoRequestsLatch.CountDown();
  38. Y_ABORT_UNLESS(WaitTwoRequestsLatch.Await(TDuration::Seconds(5)), "oops");
  39. VerifyDynamicCast<TExampleRequest*>(mess);
  40. job->SendReply(new TExampleResponse(&Proto.ResponseCount));
  41. return nullptr;
  42. }
  43. };
  44. Y_UNIT_TEST(TestOnMessageHandlerCalledInParallel) {
  45. TObjectCountCheck objectCountCheck;
  46. TBusQueueConfig config;
  47. config.NumWorkers = 5;
  48. TParallelOnMessageModule module;
  49. module.StartModule();
  50. TExampleClient client;
  51. client.SendMessagesWaitReplies(2, module.ServerAddr);
  52. module.Shutdown();
  53. }
  54. struct TDelayReplyServer: public TExampleServerModule {
  55. TSystemEvent MessageReceivedEvent;
  56. TSystemEvent ClientDiedEvent;
  57. TJobHandler Start(TBusJob* job, TBusMessage* mess) override {
  58. Y_UNUSED(mess);
  59. MessageReceivedEvent.Signal();
  60. Y_ABORT_UNLESS(ClientDiedEvent.WaitT(TDuration::Seconds(5)), "oops");
  61. job->SendReply(new TExampleResponse(&Proto.ResponseCount));
  62. return nullptr;
  63. }
  64. };
  65. Y_UNIT_TEST(TestReplyCalledAfterClientDisconnected) {
  66. TObjectCountCheck objectCountCheck;
  67. TBusQueueConfig config;
  68. config.NumWorkers = 5;
  69. TDelayReplyServer server;
  70. server.StartModule();
  71. THolder<TExampleClient> client(new TExampleClient);
  72. client->SendMessages(1, server.ServerAddr);
  73. UNIT_ASSERT(server.MessageReceivedEvent.WaitT(TDuration::Seconds(5)));
  74. UNIT_ASSERT_VALUES_EQUAL(1, server.GetModuleSessionInFlight());
  75. client.Destroy();
  76. server.ClientDiedEvent.Signal();
  77. // wait until all server message are delivered
  78. UNIT_WAIT_FOR(0 == server.GetModuleSessionInFlight());
  79. server.Shutdown();
  80. }
  81. }