// starter_ut.cpp
  1. #include <library/cpp/testing/unittest/registar.h>
  2. #include <library/cpp/messagebus/test/helper/example_module.h>
  3. #include <library/cpp/messagebus/test/helper/object_count_check.h>
  4. #include <library/cpp/messagebus/test/helper/wait_for.h>
  5. using namespace NBus;
  6. using namespace NBus::NTest;
  7. Y_UNIT_TEST_SUITE(TBusStarterTest) {
  8. struct TStartJobTestModule: public TExampleModule {
  9. using TBusModule::CreateDefaultStarter;
  10. TAtomic StartCount;
  11. TStartJobTestModule()
  12. : StartCount(0)
  13. {
  14. }
  15. TJobHandler Start(TBusJob* job, TBusMessage* mess) override {
  16. Y_UNUSED(mess);
  17. AtomicIncrement(StartCount);
  18. job->Sleep(10);
  19. return &TStartJobTestModule::End;
  20. }
  21. TJobHandler End(TBusJob* job, TBusMessage* mess) {
  22. Y_UNUSED(mess);
  23. AtomicIncrement(StartCount);
  24. job->Cancel(MESSAGE_UNKNOWN);
  25. return nullptr;
  26. }
  27. };
  28. Y_UNIT_TEST(Test) {
  29. TObjectCountCheck objectCountCheck;
  30. TBusMessageQueuePtr bus(CreateMessageQueue());
  31. TStartJobTestModule module;
  32. //module.StartModule();
  33. module.CreatePrivateSessions(bus.Get());
  34. module.StartInput();
  35. TBusSessionConfig config;
  36. config.SendTimeout = 10;
  37. module.CreateDefaultStarter(*bus, config);
  38. UNIT_WAIT_FOR(AtomicGet(module.StartCount) >= 3);
  39. module.Shutdown();
  40. bus->Stop();
  41. }
  42. Y_UNIT_TEST(TestModuleStartJob) {
  43. TObjectCountCheck objectCountCheck;
  44. TExampleProtocol proto;
  45. TStartJobTestModule module;
  46. TBusModuleConfig moduleConfig;
  47. moduleConfig.Secret.SchedulePeriod = TDuration::MilliSeconds(10);
  48. module.SetConfig(moduleConfig);
  49. module.StartModule();
  50. module.StartJob(new TExampleRequest(&proto.RequestCount));
  51. UNIT_WAIT_FOR(AtomicGet(module.StartCount) != 2);
  52. module.Shutdown();
  53. }
  54. struct TSleepModule: public TExampleServerModule {
  55. TSystemEvent MessageReceivedEvent;
  56. TJobHandler Start(TBusJob* job, TBusMessage* mess) override {
  57. Y_UNUSED(mess);
  58. MessageReceivedEvent.Signal();
  59. job->Sleep(1000000000);
  60. return TJobHandler(&TSleepModule::Never);
  61. }
  62. TJobHandler Never(TBusJob*, TBusMessage*) {
  63. Y_ABORT("happens");
  64. throw 1;
  65. }
  66. };
  67. Y_UNIT_TEST(StartJobDestroyDuringSleep) {
  68. TObjectCountCheck objectCountCheck;
  69. TExampleProtocol proto;
  70. TSleepModule module;
  71. module.StartModule();
  72. module.StartJob(new TExampleRequest(&proto.StartCount));
  73. module.MessageReceivedEvent.WaitI();
  74. module.Shutdown();
  75. }
  76. struct TSendReplyModule: public TExampleServerModule {
  77. TSystemEvent MessageReceivedEvent;
  78. TJobHandler Start(TBusJob* job, TBusMessage* mess) override {
  79. Y_UNUSED(mess);
  80. job->SendReply(new TExampleResponse(&Proto.ResponseCount));
  81. MessageReceivedEvent.Signal();
  82. return nullptr;
  83. }
  84. };
  85. Y_UNIT_TEST(AllowSendReplyInStarted) {
  86. TObjectCountCheck objectCountCheck;
  87. TExampleProtocol proto;
  88. TSendReplyModule module;
  89. module.StartModule();
  90. module.StartJob(new TExampleRequest(&proto.StartCount));
  91. module.MessageReceivedEvent.WaitI();
  92. module.Shutdown();
  93. }
  94. }