moduletest.h 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221
  1. #pragma once
  2. ///////////////////////////////////////////////////////////////////
  3. /// \file
  4. /// \brief Example of using local session for communication.
  5. #include <library/cpp/messagebus/test/helper/alloc_counter.h>
  6. #include <library/cpp/messagebus/test/helper/example.h>
  7. #include <library/cpp/messagebus/test/helper/message_handler_error.h>
  8. #include <library/cpp/messagebus/ybus.h>
  9. #include <library/cpp/messagebus/oldmodule/module.h>
  10. namespace NBus {
  11. namespace NTest {
  12. using namespace std;
  13. #define TYPE_HOSTINFOREQUEST 100
  14. #define TYPE_HOSTINFORESPONSE 101
  15. ////////////////////////////////////////////////////////////////////
  16. /// \brief DupDetect protocol that common between client and server
  17. ////////////////////////////////////////////////////////////////////
  18. /// \brief HostInfo request class
  19. class THostInfoMessage: public TBusMessage {
  20. public:
  21. THostInfoMessage()
  22. : TBusMessage(TYPE_HOSTINFOREQUEST)
  23. {
  24. }
  25. THostInfoMessage(ECreateUninitialized)
  26. : TBusMessage(MESSAGE_CREATE_UNINITIALIZED)
  27. {
  28. }
  29. ~THostInfoMessage() override {
  30. }
  31. };
  32. ////////////////////////////////////////////////////////////////////
  33. /// \brief HostInfo reply class
  34. class THostInfoReply: public TBusMessage {
  35. public:
  36. THostInfoReply()
  37. : TBusMessage(TYPE_HOSTINFORESPONSE)
  38. {
  39. }
  40. THostInfoReply(ECreateUninitialized)
  41. : TBusMessage(MESSAGE_CREATE_UNINITIALIZED)
  42. {
  43. }
  44. ~THostInfoReply() override {
  45. }
  46. };
  47. ////////////////////////////////////////////////////////////////////
  48. /// \brief HostInfo protocol that common between client and server
  49. class THostInfoProtocol: public TBusProtocol {
  50. public:
  51. THostInfoProtocol()
  52. : TBusProtocol("HOSTINFO", 0)
  53. {
  54. }
  55. /// serialized protocol specific data into TBusData
  56. void Serialize(const TBusMessage* mess, TBuffer& data) override {
  57. Y_UNUSED(data);
  58. Y_UNUSED(mess);
  59. }
  60. /// deserialized TBusData into new instance of the message
  61. TAutoPtr<TBusMessage> Deserialize(ui16 messageType, TArrayRef<const char> payload) override {
  62. Y_UNUSED(payload);
  63. if (messageType == TYPE_HOSTINFOREQUEST) {
  64. return new THostInfoMessage(MESSAGE_CREATE_UNINITIALIZED);
  65. } else if (messageType == TYPE_HOSTINFORESPONSE) {
  66. return new THostInfoReply(MESSAGE_CREATE_UNINITIALIZED);
  67. } else {
  68. Y_ABORT("unknown");
  69. }
  70. }
  71. };
  72. //////////////////////////////////////////////////////////////
  73. /// \brief HostInfo handler (should convert it to module too)
  74. struct THostInfoHandler: public TBusServerHandlerError {
  75. TBusServerSessionPtr Session;
  76. TBusServerSessionConfig HostInfoConfig;
  77. THostInfoProtocol HostInfoProto;
  78. THostInfoHandler(TBusMessageQueue* queue) {
  79. Session = TBusServerSession::Create(&HostInfoProto, this, HostInfoConfig, queue);
  80. }
  81. void OnMessage(TOnMessageContext& mess) override {
  82. usleep(10 * 1000); /// pretend we are doing something
  83. TAutoPtr<THostInfoReply> reply(new THostInfoReply());
  84. mess.SendReplyMove(reply);
  85. }
  86. TNetAddr GetActualListenAddr() {
  87. return TNetAddr("localhost", Session->GetActualListenPort());
  88. }
  89. };
  90. //////////////////////////////////////////////////////////////
  91. /// \brief DupDetect handler (should convert it to module too)
  92. struct TDupDetectHandler: public TBusClientHandlerError {
  93. TNetAddr ServerAddr;
  94. TBusClientSessionPtr DupDetect;
  95. TBusClientSessionConfig DupDetectConfig;
  96. TExampleProtocol DupDetectProto;
  97. int NumMessages;
  98. int NumReplies;
  99. TDupDetectHandler(const TNetAddr& serverAddr, TBusMessageQueuePtr queue)
  100. : ServerAddr(serverAddr)
  101. {
  102. DupDetect = TBusClientSession::Create(&DupDetectProto, this, DupDetectConfig, queue);
  103. DupDetect->RegisterService("localhost");
  104. }
  105. void Work() {
  106. NumMessages = 10;
  107. NumReplies = 0;
  108. for (int i = 0; i < NumMessages; i++) {
  109. TExampleRequest* mess = new TExampleRequest(&DupDetectProto.RequestCount);
  110. DupDetect->SendMessage(mess, &ServerAddr);
  111. }
  112. }
  113. void OnReply(TAutoPtr<TBusMessage> mess, TAutoPtr<TBusMessage> reply) override {
  114. Y_UNUSED(mess);
  115. Y_UNUSED(reply);
  116. NumReplies++;
  117. }
  118. };
  119. /////////////////////////////////////////////////////////////////
  120. /// \brief DupDetect module
  121. struct TDupDetectModule: public TBusModule {
  122. TNetAddr HostInfoAddr;
  123. TBusClientSessionPtr HostInfoClientSession;
  124. TBusClientSessionConfig HostInfoConfig;
  125. THostInfoProtocol HostInfoProto;
  126. TExampleProtocol DupDetectProto;
  127. TBusServerSessionConfig DupDetectConfig;
  128. TNetAddr ListenAddr;
  129. TDupDetectModule(const TNetAddr& hostInfoAddr)
  130. : TBusModule("DUPDETECTMODULE")
  131. , HostInfoAddr(hostInfoAddr)
  132. {
  133. }
  134. bool Init(TBusMessageQueue* queue) {
  135. HostInfoClientSession = CreateDefaultSource(*queue, &HostInfoProto, HostInfoConfig);
  136. HostInfoClientSession->RegisterService("localhost");
  137. return TBusModule::CreatePrivateSessions(queue);
  138. }
  139. TBusServerSessionPtr CreateExtSession(TBusMessageQueue& queue) override {
  140. TBusServerSessionPtr session = CreateDefaultDestination(queue, &DupDetectProto, DupDetectConfig);
  141. ListenAddr = TNetAddr("localhost", session->GetActualListenPort());
  142. return session;
  143. }
  144. /// entry point into module, first function to call
  145. TJobHandler Start(TBusJob* job, TBusMessage* mess) override {
  146. TExampleRequest* dmess = dynamic_cast<TExampleRequest*>(mess);
  147. Y_UNUSED(dmess);
  148. THostInfoMessage* hmess = new THostInfoMessage();
  149. /// send message to imaginary hostinfo server
  150. job->Send(hmess, HostInfoClientSession, TReplyHandler(), 0, HostInfoAddr);
  151. return TJobHandler(&TDupDetectModule::ProcessHostInfo);
  152. }
  153. /// next handler is executed when all outstanding requests from previous handler is completed
  154. TJobHandler ProcessHostInfo(TBusJob* job, TBusMessage* mess) {
  155. TExampleRequest* dmess = dynamic_cast<TExampleRequest*>(mess);
  156. Y_UNUSED(dmess);
  157. THostInfoMessage* hmess = job->Get<THostInfoMessage>();
  158. THostInfoReply* hreply = job->Get<THostInfoReply>();
  159. EMessageStatus hstatus = job->GetStatus<THostInfoMessage>();
  160. Y_ASSERT(hmess != nullptr);
  161. Y_ASSERT(hreply != nullptr);
  162. Y_ASSERT(hstatus == MESSAGE_OK);
  163. return TJobHandler(&TDupDetectModule::Finish);
  164. }
  165. /// last handler sends reply and returns NULL
  166. TJobHandler Finish(TBusJob* job, TBusMessage* mess) {
  167. Y_UNUSED(mess);
  168. TExampleResponse* reply = new TExampleResponse(&DupDetectProto.ResponseCount);
  169. job->SendReply(reply);
  170. return nullptr;
  171. }
  172. };
  173. }
  174. }