remote_client_session.cpp 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127
  1. #include "remote_client_session.h"
  2. #include "mb_lwtrace.h"
  3. #include "remote_client_connection.h"
  4. #include <library/cpp/messagebus/scheduler/scheduler.h>
  5. #include <util/generic/cast.h>
  6. #include <util/system/defaults.h>
  7. LWTRACE_USING(LWTRACE_MESSAGEBUS_PROVIDER)
  8. using namespace NBus;
  9. using namespace NBus::NPrivate;
  10. TRemoteClientSession::TRemoteClientSession(TBusMessageQueue* queue,
  11. TBusProtocol* proto, IBusClientHandler* handler,
  12. const TBusClientSessionConfig& config, const TString& name)
  13. : TBusSessionImpl(true, queue, proto, handler, config, name)
  14. , ClientRemoteInFlight(config.MaxInFlight, "ClientRemoteInFlight")
  15. , ClientHandler(handler)
  16. {
  17. }
  18. TRemoteClientSession::~TRemoteClientSession() {
  19. //Cerr << "~TRemoteClientSession" << Endl;
  20. }
  21. void TRemoteClientSession::OnMessageReceived(TRemoteConnection* c, TVectorSwaps<TBusMessagePtrAndHeader>& newMsg) {
  22. TAutoPtr<TVectorSwaps<TBusMessagePtrAndHeader>> temp(new TVectorSwaps<TBusMessagePtrAndHeader>);
  23. temp->swap(newMsg);
  24. c->ReplyQueue.EnqueueAll(temp);
  25. c->ScheduleWrite();
  26. }
  27. EMessageStatus TRemoteClientSession::SendMessageImpl(TBusMessage* msg, const TNetAddr* addr, bool wait, bool oneWay) {
  28. if (Y_UNLIKELY(IsDown())) {
  29. return MESSAGE_SHUTDOWN;
  30. }
  31. TBusSocketAddr resolvedAddr;
  32. EMessageStatus ret = GetMessageDestination(msg, addr, &resolvedAddr);
  33. if (ret != MESSAGE_OK) {
  34. return ret;
  35. }
  36. msg->ReplyTo = resolvedAddr;
  37. TRemoteConnectionPtr c = ((TBusSessionImpl*)this)->GetConnection(resolvedAddr, true);
  38. Y_ASSERT(!!c);
  39. return CheckedCast<TRemoteClientConnection*>(c.Get())->SendMessageImpl(msg, wait, oneWay);
  40. }
  41. EMessageStatus TRemoteClientSession::SendMessage(TBusMessage* msg, const TNetAddr* addr, bool wait) {
  42. return SendMessageImpl(msg, addr, wait, false);
  43. }
  44. EMessageStatus TRemoteClientSession::SendMessageOneWay(TBusMessage* pMes, const TNetAddr* addr, bool wait) {
  45. return SendMessageImpl(pMes, addr, wait, true);
  46. }
  47. int TRemoteClientSession::GetInFlight() const noexcept {
  48. return ClientRemoteInFlight.GetCurrent();
  49. }
  50. void TRemoteClientSession::FillStatus() {
  51. TBusSessionImpl::FillStatus();
  52. StatusData.Status.InFlightCount = ClientRemoteInFlight.GetCurrent();
  53. StatusData.Status.InputPaused = false;
  54. }
  55. void TRemoteClientSession::AcquireInFlight(TArrayRef<TBusMessage* const> messages) {
  56. for (auto message : messages) {
  57. Y_ASSERT(!(message->LocalFlags & MESSAGE_IN_FLIGHT_ON_CLIENT));
  58. message->LocalFlags |= MESSAGE_IN_FLIGHT_ON_CLIENT;
  59. }
  60. ClientRemoteInFlight.IncrementMultiple(messages.size());
  61. }
  62. void TRemoteClientSession::ReleaseInFlight(TArrayRef<TBusMessage* const> messages) {
  63. for (auto message : messages) {
  64. Y_ASSERT(message->LocalFlags & MESSAGE_IN_FLIGHT_ON_CLIENT);
  65. message->LocalFlags &= ~MESSAGE_IN_FLIGHT_ON_CLIENT;
  66. }
  67. ClientRemoteInFlight.ReleaseMultiple(messages.size());
  68. }
  69. void TRemoteClientSession::ReleaseInFlightAndCallOnReply(TNonDestroyingAutoPtr<TBusMessage> request, TBusMessagePtrAndHeader& response) {
  70. ReleaseInFlight({request.Get()});
  71. if (Y_UNLIKELY(AtomicGet(Down))) {
  72. InvokeOnError(request, MESSAGE_SHUTDOWN);
  73. InvokeOnError(response.MessagePtr.Release(), MESSAGE_SHUTDOWN);
  74. TRemoteConnectionReaderIncrementalStatus counter;
  75. LWPROBE(Error, ToString(MESSAGE_SHUTDOWN), "", "");
  76. counter.StatusCounter[MESSAGE_SHUTDOWN] += 1;
  77. GetDeadConnectionReaderStatusQueue()->EnqueueAndSchedule(counter);
  78. } else {
  79. TWhatThreadDoesPushPop pp("OnReply");
  80. ClientHandler->OnReply(request, response.MessagePtr.Release());
  81. }
  82. }
  83. EMessageStatus TRemoteClientSession::GetMessageDestination(TBusMessage* mess, const TNetAddr* addrp, TBusSocketAddr* dest) {
  84. if (addrp) {
  85. *dest = *addrp;
  86. } else {
  87. TNetAddr tmp;
  88. EMessageStatus ret = const_cast<TBusProtocol*>(GetProto())->GetDestination(this, mess, GetQueue()->GetLocator(), &tmp);
  89. if (ret != MESSAGE_OK) {
  90. return ret;
  91. }
  92. *dest = tmp;
  93. }
  94. return MESSAGE_OK;
  95. }
  96. void TRemoteClientSession::OpenConnection(const TNetAddr& addr) {
  97. GetConnection(addr)->OpenConnection();
  98. }
  99. TBusClientConnectionPtr TRemoteClientSession::GetConnection(const TNetAddr& addr) {
  100. // TODO: GetConnection should not open
  101. return CheckedCast<TRemoteClientConnection*>(((TBusSessionImpl*)this)->GetConnection(addr, true).Get());
  102. }