synchandler.cpp 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198
  1. #include "remote_client_session.h"
  2. #include "remote_connection.h"
  3. #include "ybus.h"
  4. using namespace NBus;
  5. using namespace NBus::NPrivate;
  6. /////////////////////////////////////////////////////////////////
  7. /// Object that encapsulates all messgae data required for sending
  8. /// a message synchronously and receiving a reply. It includes:
  9. /// 1. ConditionVariable to wait on message reply
  10. /// 2. Lock used by condition variable
  11. /// 3. Message reply
  12. /// 4. Reply status
  13. struct TBusSyncMessageData {
  14. TCondVar ReplyEvent;
  15. TMutex ReplyLock;
  16. TBusMessage* Reply;
  17. EMessageStatus ReplyStatus;
  18. TBusSyncMessageData()
  19. : Reply(nullptr)
  20. , ReplyStatus(MESSAGE_DONT_ASK)
  21. {
  22. }
  23. };
  24. class TSyncHandler: public IBusClientHandler {
  25. public:
  26. TSyncHandler(bool expectReply = true)
  27. : ExpectReply(expectReply)
  28. , Session(nullptr)
  29. {
  30. }
  31. ~TSyncHandler() override {
  32. }
  33. void OnReply(TAutoPtr<TBusMessage> pMessage0, TAutoPtr<TBusMessage> pReply0) override {
  34. TBusMessage* pMessage = pMessage0.Release();
  35. TBusMessage* pReply = pReply0.Release();
  36. if (!ExpectReply) { // Maybe need VERIFY, but it will be better to support backward compatibility here.
  37. return;
  38. }
  39. TBusSyncMessageData* data = static_cast<TBusSyncMessageData*>(pMessage->Data);
  40. SignalResult(data, pReply, MESSAGE_OK);
  41. }
  42. void OnError(TAutoPtr<TBusMessage> pMessage0, EMessageStatus status) override {
  43. TBusMessage* pMessage = pMessage0.Release();
  44. TBusSyncMessageData* data = static_cast<TBusSyncMessageData*>(pMessage->Data);
  45. if (!data) {
  46. return;
  47. }
  48. SignalResult(data, /*pReply=*/nullptr, status);
  49. }
  50. void OnMessageSent(TBusMessage* pMessage) override {
  51. Y_UNUSED(pMessage);
  52. Y_ASSERT(ExpectReply);
  53. }
  54. void OnMessageSentOneWay(TAutoPtr<TBusMessage> pMessage) override {
  55. Y_ASSERT(!ExpectReply);
  56. TBusSyncMessageData* data = static_cast<TBusSyncMessageData*>(pMessage.Release()->Data);
  57. SignalResult(data, /*pReply=*/nullptr, MESSAGE_OK);
  58. }
  59. void SetSession(TRemoteClientSession* session) {
  60. if (!ExpectReply) {
  61. Session = session;
  62. }
  63. }
  64. private:
  65. void SignalResult(TBusSyncMessageData* data, TBusMessage* pReply, EMessageStatus status) const {
  66. Y_ABORT_UNLESS(data, "Message data is set to NULL.");
  67. TGuard<TMutex> G(data->ReplyLock);
  68. data->Reply = pReply;
  69. data->ReplyStatus = status;
  70. data->ReplyEvent.Signal();
  71. }
  72. private:
  73. // This is weird, because in regular client one-way-ness is selected per call, not per session.
  74. bool ExpectReply;
  75. TRemoteClientSession* Session;
  76. };
  77. namespace NBus {
  78. namespace NPrivate {
  79. #ifdef _MSC_VER
  80. #pragma warning(push)
  81. #pragma warning(disable : 4250) // 'NBus::NPrivate::TRemoteClientSession' : inherits 'NBus::NPrivate::TBusSessionImpl::NBus::NPrivate::TBusSessionImpl::GetConfig' via dominance
  82. #endif
  83. ///////////////////////////////////////////////////////////////////////////
  84. class TBusSyncSourceSessionImpl
  85. : private TSyncHandler
  86. // TODO: do not extend TRemoteClientSession
  87. ,
  88. public TRemoteClientSession {
  89. private:
  90. bool NeedReply;
  91. public:
  92. TBusSyncSourceSessionImpl(TBusMessageQueue* queue, TBusProtocol* proto, const TBusClientSessionConfig& config, bool needReply, const TString& name)
  93. : TSyncHandler(needReply)
  94. , TRemoteClientSession(queue, proto, this, config, name)
  95. , NeedReply(needReply)
  96. {
  97. SetSession(this);
  98. }
  99. TBusMessage* SendSyncMessage(TBusMessage* pMessage, EMessageStatus& status, const TNetAddr* addr = nullptr) {
  100. Y_ABORT_UNLESS(!Queue->GetExecutor()->IsInExecutorThread(),
  101. "SendSyncMessage must not be called from executor thread");
  102. TBusMessage* reply = nullptr;
  103. THolder<TBusSyncMessageData> data(new TBusSyncMessageData());
  104. pMessage->Data = data.Get();
  105. {
  106. TGuard<TMutex> G(data->ReplyLock);
  107. if (NeedReply) {
  108. status = SendMessage(pMessage, addr, false); // probably should be true
  109. } else {
  110. status = SendMessageOneWay(pMessage, addr);
  111. }
  112. if (status == MESSAGE_OK) {
  113. data->ReplyEvent.Wait(data->ReplyLock);
  114. TBusSyncMessageData* rdata = static_cast<TBusSyncMessageData*>(pMessage->Data);
  115. Y_ABORT_UNLESS(rdata == data.Get(), "Message data pointer should not be modified.");
  116. reply = rdata->Reply;
  117. status = rdata->ReplyStatus;
  118. }
  119. }
  120. // deletion of message and reply is a job of application.
  121. pMessage->Data = nullptr;
  122. return reply;
  123. }
  124. };
  125. #ifdef _MSC_VER
  126. #pragma warning(pop)
  127. #endif
  128. }
  129. }
  130. TBusSyncSourceSession::TBusSyncSourceSession(TIntrusivePtr< ::NBus::NPrivate::TBusSyncSourceSessionImpl> session)
  131. : Session(session)
  132. {
  133. }
  134. TBusSyncSourceSession::~TBusSyncSourceSession() {
  135. Shutdown();
  136. }
  137. void TBusSyncSourceSession::Shutdown() {
  138. Session->Shutdown();
  139. }
  140. TBusMessage* TBusSyncSourceSession::SendSyncMessage(TBusMessage* pMessage, EMessageStatus& status, const TNetAddr* addr) {
  141. return Session->SendSyncMessage(pMessage, status, addr);
  142. }
  143. int TBusSyncSourceSession::RegisterService(const char* hostname, TBusKey start, TBusKey end, EIpVersion ipVersion) {
  144. return Session->RegisterService(hostname, start, end, ipVersion);
  145. }
  146. int TBusSyncSourceSession::GetInFlight() {
  147. return Session->GetInFlight();
  148. }
  149. const TBusProtocol* TBusSyncSourceSession::GetProto() const {
  150. return Session->GetProto();
  151. }
  152. const TBusClientSession* TBusSyncSourceSession::GetBusClientSessionWorkaroundDoNotUse() const {
  153. return Session.Get();
  154. }
  155. TBusSyncClientSessionPtr TBusMessageQueue::CreateSyncSource(TBusProtocol* proto, const TBusClientSessionConfig& config, bool needReply, const TString& name) {
  156. TIntrusivePtr<TBusSyncSourceSessionImpl> session = new TBusSyncSourceSessionImpl(this, proto, config, needReply, name);
  157. Add(session.Get());
  158. return new TBusSyncSourceSession(session);
  159. }
  160. void TBusMessageQueue::Destroy(TBusSyncClientSessionPtr session) {
  161. Destroy(session->Session.Get());
  162. Y_UNUSED(session->Session.Release());
  163. }