remote_server_connection.cpp 2.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273
  1. #include "remote_server_connection.h"
  2. #include "mb_lwtrace.h"
  3. #include "remote_server_session.h"
  4. #include <util/generic/cast.h>
  5. LWTRACE_USING(LWTRACE_MESSAGEBUS_PROVIDER)
  6. using namespace NBus;
  7. using namespace NBus::NPrivate;
  8. TRemoteServerConnection::TRemoteServerConnection(TRemoteServerSessionPtr session, ui64 id, TNetAddr addr)
  9. : TRemoteConnection(session.Get(), id, addr)
  10. {
  11. }
  12. void TRemoteServerConnection::Init(SOCKET socket, TInstant now) {
  13. WriterData.Status.ConnectTime = now;
  14. WriterData.Status.Connected = true;
  15. Y_ABORT_UNLESS(socket != INVALID_SOCKET, "must be a valid socket");
  16. TSocket readSocket(socket);
  17. TSocket writeSocket = readSocket;
  18. // this must not be done in constructor, because if event loop is stopped,
  19. // this is deleted
  20. WriterData.SetChannel(Session->WriteEventLoop.Register(writeSocket, this, WriteCookie));
  21. WriterData.SocketVersion = 1;
  22. ReaderGetSocketQueue()->EnqueueAndSchedule(TWriterToReaderSocketMessage(readSocket, WriterData.SocketVersion));
  23. }
  24. TRemoteServerSession* TRemoteServerConnection::GetSession() {
  25. return CheckedCast<TRemoteServerSession*>(Session.Get());
  26. }
  27. void TRemoteServerConnection::HandleEvent(SOCKET socket, void* cookie) {
  28. Y_UNUSED(socket);
  29. Y_ASSERT(cookie == ReadCookie || cookie == WriteCookie);
  30. if (cookie == ReadCookie) {
  31. GetSession()->ServerOwnedMessages.Wait();
  32. ScheduleRead();
  33. } else {
  34. ScheduleWrite();
  35. }
  36. }
  37. bool TRemoteServerConnection::NeedInterruptRead() {
  38. return !GetSession()->ServerOwnedMessages.TryWait();
  39. }
  40. void TRemoteServerConnection::MessageSent(TArrayRef<TBusMessagePtrAndHeader> messages) {
  41. TInstant now = TInstant::Now();
  42. GetSession()->ReleaseInWorkResponses(messages);
  43. for (auto& message : messages) {
  44. TInstant recvTime = message.MessagePtr->RecvTime;
  45. GetSession()->ServerHandler->OnSent(message.MessagePtr.Release());
  46. TDuration d = now - recvTime;
  47. WriterData.Status.DurationCounter.AddDuration(d);
  48. WriterData.Status.Incremental.ProcessDurationHistogram.AddTime(d);
  49. }
  50. }
  51. void TRemoteServerConnection::ReaderProcessMessageUnknownVersion(TArrayRef<const char> dataRef) {
  52. TBusHeader header(dataRef);
  53. // TODO: full version hex
  54. LWPROBE(ServerUnknownVersion, ToString(PeerAddr), header.GetVersionInternal());
  55. WrongVersionRequests.Enqueue(header);
  56. GetWriterActor()->Schedule();
  57. }