// remote_server_session.cpp (6.7 KB)
  1. #include "remote_server_session.h"
  2. #include "remote_connection.h"
  3. #include "remote_server_connection.h"
  4. #include <library/cpp/messagebus/actor/temp_tls_vector.h>
  5. #include <util/generic/cast.h>
  6. #include <util/stream/output.h>
  7. #include <util/system/yassert.h>
  8. #include <typeinfo>
  9. using namespace NActor;
  10. using namespace NBus;
  11. using namespace NBus::NPrivate;
  12. TRemoteServerSession::TRemoteServerSession(TBusMessageQueue* queue,
  13. TBusProtocol* proto, IBusServerHandler* handler,
  14. const TBusServerSessionConfig& config, const TString& name)
  15. : TBusSessionImpl(false, queue, proto, handler, config, name)
  16. , ServerOwnedMessages(config.MaxInFlight, config.MaxInFlightBySize, "ServerOwnedMessages")
  17. , ServerHandler(handler)
  18. {
  19. if (config.PerConnectionMaxInFlightBySize > 0) {
  20. if (config.PerConnectionMaxInFlightBySize < config.MaxMessageSize)
  21. ythrow yexception()
  22. << "too low PerConnectionMaxInFlightBySize value";
  23. }
  24. }
  25. namespace NBus {
  26. namespace NPrivate {
  27. class TInvokeOnMessage: public IWorkItem {
  28. private:
  29. TRemoteServerSession* RemoteServerSession;
  30. TBusMessagePtrAndHeader Request;
  31. TIntrusivePtr<TRemoteServerConnection> Connection;
  32. public:
  33. TInvokeOnMessage(TRemoteServerSession* session, TBusMessagePtrAndHeader& request, TIntrusivePtr<TRemoteServerConnection>& connection)
  34. : RemoteServerSession(session)
  35. {
  36. Y_ASSERT(!!connection);
  37. Connection.Swap(connection);
  38. Request.Swap(request);
  39. }
  40. void DoWork() override {
  41. THolder<TInvokeOnMessage> holder(this);
  42. RemoteServerSession->InvokeOnMessage(Request, Connection);
  43. // TODO: TRemoteServerSessionSemaphore should be enough
  44. RemoteServerSession->JobCount.Decrement();
  45. }
  46. };
  47. }
  48. }
  49. void TRemoteServerSession::OnMessageReceived(TRemoteConnection* c, TVectorSwaps<TBusMessagePtrAndHeader>& messages) {
  50. AcquireInWorkRequests(messages);
  51. bool executeInPool = Config.ExecuteOnMessageInWorkerPool;
  52. TTempTlsVector< ::IWorkItem*> workQueueTemp;
  53. if (executeInPool) {
  54. workQueueTemp.GetVector()->reserve(messages.size());
  55. }
  56. for (auto& message : messages) {
  57. // TODO: incref once
  58. TIntrusivePtr<TRemoteServerConnection> connection(CheckedCast<TRemoteServerConnection*>(c));
  59. if (executeInPool) {
  60. workQueueTemp.GetVector()->push_back(new TInvokeOnMessage(this, message, connection));
  61. } else {
  62. InvokeOnMessage(message, connection);
  63. }
  64. }
  65. if (executeInPool) {
  66. JobCount.Add(workQueueTemp.GetVector()->size());
  67. Queue->EnqueueWork(*workQueueTemp.GetVector());
  68. }
  69. }
  70. void TRemoteServerSession::InvokeOnMessage(TBusMessagePtrAndHeader& request, TIntrusivePtr<TRemoteServerConnection>& conn) {
  71. if (Y_UNLIKELY(AtomicGet(Down))) {
  72. ReleaseInWorkRequests(*conn.Get(), request.MessagePtr.Get());
  73. InvokeOnError(request.MessagePtr.Release(), MESSAGE_SHUTDOWN);
  74. } else {
  75. TWhatThreadDoesPushPop pp("OnMessage");
  76. TBusIdentity ident;
  77. ident.Connection.Swap(conn);
  78. request.MessagePtr->GetIdentity(ident);
  79. Y_ASSERT(request.MessagePtr->LocalFlags & MESSAGE_IN_WORK);
  80. DoSwap(request.MessagePtr->LocalFlags, ident.LocalFlags);
  81. ident.RecvTime = request.MessagePtr->RecvTime;
  82. #ifndef NDEBUG
  83. auto& message = *request.MessagePtr;
  84. ident.SetMessageType(typeid(message));
  85. #endif
  86. TOnMessageContext context(request.MessagePtr.Release(), ident, this);
  87. ServerHandler->OnMessage(context);
  88. }
  89. }
  90. EMessageStatus TRemoteServerSession::ForgetRequest(const TBusIdentity& ident) {
  91. ReleaseInWork(const_cast<TBusIdentity&>(ident));
  92. return MESSAGE_OK;
  93. }
  94. EMessageStatus TRemoteServerSession::SendReply(const TBusIdentity& ident, TBusMessage* reply) {
  95. reply->CheckClean();
  96. ConvertInWork(const_cast<TBusIdentity&>(ident), reply);
  97. reply->RecvTime = ident.RecvTime;
  98. ident.Connection->Send(reply);
  99. return MESSAGE_OK;
  100. }
  101. int TRemoteServerSession::GetInFlight() const noexcept {
  102. return ServerOwnedMessages.GetCurrentCount();
  103. }
  104. void TRemoteServerSession::FillStatus() {
  105. TBusSessionImpl::FillStatus();
  106. // TODO: weird
  107. StatusData.Status.InFlightCount = ServerOwnedMessages.GetCurrentCount();
  108. StatusData.Status.InFlightSize = ServerOwnedMessages.GetCurrentSize();
  109. StatusData.Status.InputPaused = ServerOwnedMessages.IsLocked();
  110. }
  111. void TRemoteServerSession::AcquireInWorkRequests(TArrayRef<const TBusMessagePtrAndHeader> messages) {
  112. TAtomicBase size = 0;
  113. for (auto message = messages.begin(); message != messages.end(); ++message) {
  114. Y_ASSERT(!(message->MessagePtr->LocalFlags & MESSAGE_IN_WORK));
  115. message->MessagePtr->LocalFlags |= MESSAGE_IN_WORK;
  116. size += message->MessagePtr->GetHeader()->Size;
  117. }
  118. ServerOwnedMessages.IncrementMultiple(messages.size(), size);
  119. }
  120. void TRemoteServerSession::ReleaseInWorkResponses(TArrayRef<const TBusMessagePtrAndHeader> responses) {
  121. TAtomicBase size = 0;
  122. for (auto response = responses.begin(); response != responses.end(); ++response) {
  123. Y_ASSERT((response->MessagePtr->LocalFlags & MESSAGE_REPLY_IS_BEGING_SENT));
  124. response->MessagePtr->LocalFlags &= ~MESSAGE_REPLY_IS_BEGING_SENT;
  125. size += response->MessagePtr->RequestSize;
  126. }
  127. ServerOwnedMessages.ReleaseMultiple(responses.size(), size);
  128. }
  129. void TRemoteServerSession::ReleaseInWorkRequests(TRemoteConnection& con, TBusMessage* request) {
  130. Y_ASSERT((request->LocalFlags & MESSAGE_IN_WORK));
  131. request->LocalFlags &= ~MESSAGE_IN_WORK;
  132. const size_t size = request->GetHeader()->Size;
  133. con.QuotaReturnAside(1, size);
  134. ServerOwnedMessages.ReleaseMultiple(1, size);
  135. }
  136. void TRemoteServerSession::ReleaseInWork(TBusIdentity& ident) {
  137. ident.SetInWork(false);
  138. ident.Connection->QuotaReturnAside(1, ident.Size);
  139. ServerOwnedMessages.ReleaseMultiple(1, ident.Size);
  140. }
  141. void TRemoteServerSession::ConvertInWork(TBusIdentity& req, TBusMessage* reply) {
  142. reply->SetIdentity(req);
  143. req.SetInWork(false);
  144. Y_ASSERT(!(reply->LocalFlags & MESSAGE_REPLY_IS_BEGING_SENT));
  145. reply->LocalFlags |= MESSAGE_REPLY_IS_BEGING_SENT;
  146. reply->RequestSize = req.Size;
  147. }
  148. void TRemoteServerSession::Shutdown() {
  149. ServerOwnedMessages.Stop();
  150. TBusSessionImpl::Shutdown();
  151. }
  152. void TRemoteServerSession::PauseInput(bool pause) {
  153. ServerOwnedMessages.PauseByUsed(pause);
  154. }
  155. unsigned TRemoteServerSession::GetActualListenPort() {
  156. Y_ABORT_UNLESS(Config.ListenPort > 0, "state check");
  157. return Config.ListenPort;
  158. }