module_client_ut.cpp 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368
  1. #include <library/cpp/testing/unittest/registar.h>
  2. #include "count_down_latch.h"
  3. #include "moduletest.h"
  4. #include <library/cpp/messagebus/test/helper/example.h>
  5. #include <library/cpp/messagebus/test/helper/example_module.h>
  6. #include <library/cpp/messagebus/test/helper/object_count_check.h>
  7. #include <library/cpp/messagebus/test/helper/wait_for.h>
  8. #include <library/cpp/messagebus/misc/test_sync.h>
  9. #include <library/cpp/messagebus/oldmodule/module.h>
  10. #include <util/generic/cast.h>
  11. #include <util/system/event.h>
  12. using namespace NBus;
  13. using namespace NBus::NTest;
  14. // helper class that cleans TBusJob instance, so job's destructor can
  15. // be completed without assertion fail.
  16. struct TJobGuard {
  17. public:
  18. TJobGuard(NBus::TBusJob* job)
  19. : Job(job)
  20. {
  21. }
  22. ~TJobGuard() {
  23. Job->ClearAllMessageStates();
  24. }
  25. private:
  26. NBus::TBusJob* Job;
  27. };
  28. class TMessageOk: public NBus::TBusMessage {
  29. public:
  30. TMessageOk()
  31. : NBus::TBusMessage(1)
  32. {
  33. }
  34. };
  35. class TMessageError: public NBus::TBusMessage {
  36. public:
  37. TMessageError()
  38. : NBus::TBusMessage(2)
  39. {
  40. }
  41. };
  42. Y_UNIT_TEST_SUITE(BusJobTest) {
  43. #if 0
  44. Y_UNIT_TEST(TestPending) {
  45. TObjectCountCheck objectCountCheck;
  46. TDupDetectModule module;
  47. TBusJob job(&module, new TBusMessage(0));
  48. // Guard will clear the job if unit-assertion fails.
  49. TJobGuard g(&job);
  50. NBus::TBusMessage* msg = new NBus::TBusMessage(1);
  51. job.Send(msg, NULL);
  52. NBus::TJobStateVec pending;
  53. job.GetPending(&pending);
  54. UNIT_ASSERT_VALUES_EQUAL(pending.size(), 1u);
  55. UNIT_ASSERT_EQUAL(msg, pending[0].Message);
  56. }
  57. Y_UNIT_TEST(TestCallReplyHandler) {
  58. TObjectCountCheck objectCountCheck;
  59. TDupDetectModule module;
  60. NBus::TBusJob job(&module, new NBus::TBusMessage(0));
  61. // Guard will clear the job if unit-assertion fails.
  62. TJobGuard g(&job);
  63. NBus::TBusMessage* msgOk = new TMessageOk;
  64. NBus::TBusMessage* msgError = new TMessageError;
  65. job.Send(msgOk, NULL);
  66. job.Send(msgError, NULL);
  67. UNIT_ASSERT_EQUAL(job.GetState<TMessageOk>(), NULL);
  68. UNIT_ASSERT_EQUAL(job.GetState<TMessageError>(), NULL);
  69. NBus::TBusMessage* reply = new NBus::TBusMessage(0);
  70. job.CallReplyHandler(NBus::MESSAGE_OK, msgOk, reply);
  71. job.CallReplyHandler(NBus::MESSAGE_TIMEOUT, msgError, NULL);
  72. UNIT_ASSERT_UNEQUAL(job.GetState<TMessageOk>(), NULL);
  73. UNIT_ASSERT_UNEQUAL(job.GetState<TMessageError>(), NULL);
  74. UNIT_ASSERT_VALUES_EQUAL(job.GetStatus<TMessageError>(), NBus::MESSAGE_TIMEOUT);
  75. UNIT_ASSERT_EQUAL(job.GetState<TMessageError>()->Status, NBus::MESSAGE_TIMEOUT);
  76. UNIT_ASSERT_VALUES_EQUAL(job.GetStatus<TMessageOk>(), NBus::MESSAGE_OK);
  77. UNIT_ASSERT_EQUAL(job.GetState<TMessageOk>()->Reply, reply);
  78. }
  79. #endif
  80. struct TParallelOnReplyModule : TExampleClientModule {
  81. TNetAddr ServerAddr;
  82. TCountDownLatch RepliesLatch;
  83. TParallelOnReplyModule(const TNetAddr& serverAddr)
  84. : ServerAddr(serverAddr)
  85. , RepliesLatch(2)
  86. {
  87. }
  88. TJobHandler Start(TBusJob* job, TBusMessage* mess) override {
  89. Y_UNUSED(mess);
  90. job->Send(new TExampleRequest(&Proto.RequestCount), Source, TReplyHandler(&TParallelOnReplyModule::ReplyHandler), 0, ServerAddr);
  91. return &TParallelOnReplyModule::HandleReplies;
  92. }
  93. void ReplyHandler(TBusJob*, EMessageStatus status, TBusMessage* mess, TBusMessage* reply) {
  94. Y_UNUSED(mess);
  95. Y_UNUSED(reply);
  96. Y_ABORT_UNLESS(status == MESSAGE_OK, "failed to get reply: %s", ToCString(status));
  97. }
  98. TJobHandler HandleReplies(TBusJob* job, TBusMessage* mess) {
  99. Y_UNUSED(mess);
  100. RepliesLatch.CountDown();
  101. Y_ABORT_UNLESS(RepliesLatch.Await(TDuration::Seconds(10)), "failed to get answers");
  102. job->Cancel(MESSAGE_UNKNOWN);
  103. return nullptr;
  104. }
  105. };
  106. Y_UNIT_TEST(TestReplyHandlerCalledInParallel) {
  107. TObjectCountCheck objectCountCheck;
  108. TExampleServer server;
  109. TExampleProtocol proto;
  110. TBusQueueConfig config;
  111. config.NumWorkers = 5;
  112. TParallelOnReplyModule module(server.GetActualListenAddr());
  113. module.StartModule();
  114. module.StartJob(new TExampleRequest(&proto.StartCount));
  115. module.StartJob(new TExampleRequest(&proto.StartCount));
  116. UNIT_ASSERT(module.RepliesLatch.Await(TDuration::Seconds(10)));
  117. module.Shutdown();
  118. }
  119. struct TErrorHandlerCheckerModule : TExampleModule {
  120. TNetAddr ServerAddr;
  121. TBusClientSessionPtr Source;
  122. TCountDownLatch GotReplyLatch;
  123. TBusMessage* SentMessage;
  124. TErrorHandlerCheckerModule()
  125. : ServerAddr("localhost", 17)
  126. , GotReplyLatch(2)
  127. , SentMessage()
  128. {
  129. }
  130. TJobHandler Start(TBusJob* job, TBusMessage* mess) override {
  131. Y_UNUSED(mess);
  132. TExampleRequest* message = new TExampleRequest(&Proto.RequestCount);
  133. job->Send(message, Source, TReplyHandler(&TErrorHandlerCheckerModule::ReplyHandler), 0, ServerAddr);
  134. SentMessage = message;
  135. return &TErrorHandlerCheckerModule::HandleReplies;
  136. }
  137. void ReplyHandler(TBusJob*, EMessageStatus status, TBusMessage* req, TBusMessage* resp) {
  138. Y_ABORT_UNLESS(status == MESSAGE_CONNECT_FAILED || status == MESSAGE_TIMEOUT, "got wrong status: %s", ToString(status).data());
  139. Y_ABORT_UNLESS(req == SentMessage, "checking request");
  140. Y_ABORT_UNLESS(resp == nullptr, "checking response");
  141. GotReplyLatch.CountDown();
  142. }
  143. TJobHandler HandleReplies(TBusJob* job, TBusMessage* mess) {
  144. Y_UNUSED(mess);
  145. job->Cancel(MESSAGE_UNKNOWN);
  146. GotReplyLatch.CountDown();
  147. return nullptr;
  148. }
  149. TBusServerSessionPtr CreateExtSession(TBusMessageQueue& queue) override {
  150. TBusClientSessionConfig sessionConfig;
  151. sessionConfig.SendTimeout = 1; // TODO: allow 0
  152. sessionConfig.Secret.TimeoutPeriod = TDuration::MilliSeconds(10);
  153. Source = CreateDefaultSource(queue, &Proto, sessionConfig);
  154. Source->RegisterService("localhost");
  155. return nullptr;
  156. }
  157. };
  158. Y_UNIT_TEST(ErrorHandler) {
  159. TExampleProtocol proto;
  160. TBusQueueConfig config;
  161. config.NumWorkers = 5;
  162. TErrorHandlerCheckerModule module;
  163. TBusModuleConfig moduleConfig;
  164. moduleConfig.Secret.SchedulePeriod = TDuration::MilliSeconds(10);
  165. module.SetConfig(moduleConfig);
  166. module.StartModule();
  167. module.StartJob(new TExampleRequest(&proto.StartCount));
  168. module.GotReplyLatch.Await();
  169. module.Shutdown();
  170. }
  171. struct TSlowReplyServer: public TBusServerHandlerError {
  172. TTestSync* const TestSync;
  173. TBusMessageQueuePtr Bus;
  174. TBusServerSessionPtr ServerSession;
  175. TExampleProtocol Proto;
  176. TAtomic OnMessageCount;
  177. TSlowReplyServer(TTestSync* testSync)
  178. : TestSync(testSync)
  179. , OnMessageCount(0)
  180. {
  181. Bus = CreateMessageQueue("TSlowReplyServer");
  182. TBusServerSessionConfig sessionConfig;
  183. ServerSession = TBusServerSession::Create(&Proto, this, sessionConfig, Bus);
  184. }
  185. void OnMessage(TOnMessageContext& req) override {
  186. if (AtomicIncrement(OnMessageCount) == 1) {
  187. TestSync->WaitForAndIncrement(0);
  188. }
  189. TAutoPtr<TBusMessage> response(new TExampleResponse(&Proto.ResponseCount));
  190. req.SendReplyMove(response);
  191. }
  192. };
  193. struct TModuleThatSendsReplyEarly: public TExampleClientModule {
  194. TTestSync* const TestSync;
  195. const unsigned ServerPort;
  196. TBusServerSessionPtr ServerSession;
  197. TAtomic ReplyCount;
  198. TModuleThatSendsReplyEarly(TTestSync* testSync, unsigned serverPort)
  199. : TestSync(testSync)
  200. , ServerPort(serverPort)
  201. , ServerSession(nullptr)
  202. , ReplyCount(0)
  203. {
  204. }
  205. TJobHandler Start(TBusJob* job, TBusMessage* mess) override {
  206. Y_UNUSED(mess);
  207. for (unsigned i = 0; i < 2; ++i) {
  208. job->Send(
  209. new TExampleRequest(&Proto.RequestCount),
  210. Source,
  211. TReplyHandler(&TModuleThatSendsReplyEarly::ReplyHandler),
  212. 0,
  213. TNetAddr("127.0.0.1", ServerPort));
  214. }
  215. return &TModuleThatSendsReplyEarly::HandleReplies;
  216. }
  217. void ReplyHandler(TBusJob* job, EMessageStatus status, TBusMessage* mess, TBusMessage* reply) {
  218. Y_UNUSED(mess);
  219. Y_UNUSED(reply);
  220. Y_ABORT_UNLESS(status == MESSAGE_OK, "failed to get reply");
  221. if (AtomicIncrement(ReplyCount) == 1) {
  222. TestSync->WaitForAndIncrement(1);
  223. job->SendReply(new TExampleResponse(&Proto.ResponseCount));
  224. } else {
  225. TestSync->WaitForAndIncrement(3);
  226. }
  227. }
  228. TJobHandler HandleReplies(TBusJob* job, TBusMessage* mess) {
  229. Y_UNUSED(mess);
  230. job->Cancel(MESSAGE_UNKNOWN);
  231. return nullptr;
  232. }
  233. TBusServerSessionPtr CreateExtSession(TBusMessageQueue& queue) override {
  234. TExampleClientModule::CreateExtSession(queue);
  235. TBusServerSessionConfig sessionConfig;
  236. return ServerSession = CreateDefaultDestination(queue, &Proto, sessionConfig);
  237. }
  238. };
  239. Y_UNIT_TEST(SendReplyCalledBeforeAllRepliesReceived) {
  240. TTestSync testSync;
  241. TSlowReplyServer slowReplyServer(&testSync);
  242. TModuleThatSendsReplyEarly module(&testSync, slowReplyServer.ServerSession->GetActualListenPort());
  243. module.StartModule();
  244. TExampleClient client;
  245. TNetAddr addr("127.0.0.1", module.ServerSession->GetActualListenPort());
  246. client.SendMessagesWaitReplies(1, &addr);
  247. testSync.WaitForAndIncrement(2);
  248. module.Shutdown();
  249. }
  250. struct TShutdownCalledBeforeReplyReceivedModule: public TExampleClientModule {
  251. unsigned ServerPort;
  252. TTestSync TestSync;
  253. TShutdownCalledBeforeReplyReceivedModule(unsigned serverPort)
  254. : ServerPort(serverPort)
  255. {
  256. }
  257. TJobHandler Start(TBusJob* job, TBusMessage*) override {
  258. TestSync.CheckAndIncrement(0);
  259. job->Send(new TExampleRequest(&Proto.RequestCount), Source,
  260. TReplyHandler(&TShutdownCalledBeforeReplyReceivedModule::HandleReply),
  261. 0, TNetAddr("localhost", ServerPort));
  262. return &TShutdownCalledBeforeReplyReceivedModule::End;
  263. }
  264. void HandleReply(TBusJob*, EMessageStatus status, TBusMessage*, TBusMessage*) {
  265. Y_ABORT_UNLESS(status == MESSAGE_SHUTDOWN, "got %s", ToCString(status));
  266. TestSync.CheckAndIncrement(1);
  267. }
  268. TJobHandler End(TBusJob* job, TBusMessage*) {
  269. TestSync.CheckAndIncrement(2);
  270. job->Cancel(MESSAGE_SHUTDOWN);
  271. return nullptr;
  272. }
  273. };
  274. Y_UNIT_TEST(ShutdownCalledBeforeReplyReceived) {
  275. TExampleServer server;
  276. server.ForgetRequest = true;
  277. TShutdownCalledBeforeReplyReceivedModule module(server.GetActualListenPort());
  278. module.StartModule();
  279. module.StartJob(new TExampleRequest(&module.Proto.RequestCount));
  280. server.TestSync.WaitFor(1);
  281. module.Shutdown();
  282. module.TestSync.CheckAndIncrement(3);
  283. }
  284. }