remote_connection_status.cpp 8.2 KB


  1. #include "remote_connection_status.h"
  2. #include "key_value_printer.h"
  3. #include <library/cpp/messagebus/monitoring/mon_proto.pb.h>
  4. #include <util/stream/format.h>
  5. #include <util/stream/output.h>
  6. #include <util/system/yassert.h>
  7. using namespace NBus;
  8. using namespace NBus::NPrivate;
  // Merge helper: accumulates `that` into `thiz` using the type's own operator+=.
  template <class T>
  static void Add(T& thiz, const T& that) {
      thiz += that;
  }
  // Merge helper: raises `thiz` to `that` when `that` compares strictly greater;
  // otherwise `thiz` is left untouched.
  template <class T>
  static void Max(T& thiz, const T& that) {
      if (!(that > thiz)) {
          return;
      }
      thiz = that;
  }
  // Merge helper for fields that are not combined: `that` is ignored and `thiz`
  // is not modified. The only check is a Y_ASSERT that `thiz` still equals a
  // value-initialized T.
  template <class T>
  static void AssertZero(T& thiz, const T& that) {
      Y_UNUSED(that);
      Y_ASSERT(thiz == T());
  }
  24. TDurationCounter::TDurationCounter()
  25. : DURATION_COUNTER_MAP(STRUCT_FIELD_INIT_DEFAULT, COMMA)
  26. {
  27. }
  28. TDuration TDurationCounter::AvgDuration() const {
  29. if (Count == 0) {
  30. return TDuration::Zero();
  31. } else {
  32. return SumDuration / Count;
  33. }
  34. }
  35. TDurationCounter& TDurationCounter::operator+=(const TDurationCounter& that) {
  36. DURATION_COUNTER_MAP(STRUCT_FIELD_ADD, )
  37. return *this;
  38. }
  39. TString TDurationCounter::ToString() const {
  40. if (Count == 0) {
  41. return "0";
  42. } else {
  43. TStringStream ss;
  44. ss << "avg: " << AvgDuration() << ", max: " << MaxDuration << ", count: " << Count;
  45. return ss.Str();
  46. }
  47. }
  48. TRemoteConnectionStatusBase::TRemoteConnectionStatusBase()
  49. : REMOTE_CONNECTION_STATUS_BASE_MAP(STRUCT_FIELD_INIT_DEFAULT, COMMA)
  50. {
  51. }
  52. TRemoteConnectionStatusBase& TRemoteConnectionStatusBase ::operator+=(const TRemoteConnectionStatusBase& that) {
  53. REMOTE_CONNECTION_STATUS_BASE_MAP(STRUCT_FIELD_ADD, )
  54. return *this;
  55. }
  56. TRemoteConnectionIncrementalStatusBase::TRemoteConnectionIncrementalStatusBase()
  57. : REMOTE_CONNECTION_INCREMENTAL_STATUS_BASE_MAP(STRUCT_FIELD_INIT_DEFAULT, COMMA)
  58. {
  59. }
  60. TRemoteConnectionIncrementalStatusBase& TRemoteConnectionIncrementalStatusBase::operator+=(
  61. const TRemoteConnectionIncrementalStatusBase& that) {
  62. REMOTE_CONNECTION_INCREMENTAL_STATUS_BASE_MAP(STRUCT_FIELD_ADD, )
  63. return *this;
  64. }
  65. TRemoteConnectionReaderIncrementalStatus::TRemoteConnectionReaderIncrementalStatus()
  66. : REMOTE_CONNECTION_READER_INCREMENTAL_STATUS_MAP(STRUCT_FIELD_INIT_DEFAULT, COMMA)
  67. {
  68. }
  69. TRemoteConnectionReaderIncrementalStatus& TRemoteConnectionReaderIncrementalStatus::operator+=(
  70. const TRemoteConnectionReaderIncrementalStatus& that) {
  71. TRemoteConnectionIncrementalStatusBase::operator+=(that);
  72. REMOTE_CONNECTION_READER_INCREMENTAL_STATUS_MAP(STRUCT_FIELD_ADD, )
  73. return *this;
  74. }
  75. TRemoteConnectionReaderStatus::TRemoteConnectionReaderStatus()
  76. : REMOTE_CONNECTION_READER_STATUS_MAP(STRUCT_FIELD_INIT_DEFAULT, COMMA)
  77. {
  78. }
  79. TRemoteConnectionReaderStatus& TRemoteConnectionReaderStatus::operator+=(const TRemoteConnectionReaderStatus& that) {
  80. TRemoteConnectionStatusBase::operator+=(that);
  81. REMOTE_CONNECTION_READER_STATUS_MAP(STRUCT_FIELD_ADD, )
  82. return *this;
  83. }
  84. TRemoteConnectionWriterIncrementalStatus::TRemoteConnectionWriterIncrementalStatus()
  85. : REMOTE_CONNECTION_WRITER_INCREMENTAL_STATUS(STRUCT_FIELD_INIT_DEFAULT, COMMA)
  86. {
  87. }
  88. TRemoteConnectionWriterIncrementalStatus& TRemoteConnectionWriterIncrementalStatus::operator+=(
  89. const TRemoteConnectionWriterIncrementalStatus& that) {
  90. TRemoteConnectionIncrementalStatusBase::operator+=(that);
  91. REMOTE_CONNECTION_WRITER_INCREMENTAL_STATUS(STRUCT_FIELD_ADD, )
  92. return *this;
  93. }
  94. TRemoteConnectionWriterStatus::TRemoteConnectionWriterStatus()
  95. : REMOTE_CONNECTION_WRITER_STATUS(STRUCT_FIELD_INIT_DEFAULT, COMMA)
  96. {
  97. }
  98. TRemoteConnectionWriterStatus& TRemoteConnectionWriterStatus::operator+=(const TRemoteConnectionWriterStatus& that) {
  99. TRemoteConnectionStatusBase::operator+=(that);
  100. REMOTE_CONNECTION_WRITER_STATUS(STRUCT_FIELD_ADD, )
  101. return *this;
  102. }
  103. size_t TRemoteConnectionWriterStatus::GetInFlight() const {
  104. return SendQueueSize + AckMessagesSize;
  105. }
  106. TConnectionStatusMonRecord TRemoteConnectionStatus::GetStatusProtobuf() const {
  107. TConnectionStatusMonRecord status;
  108. // TODO: fill unfilled fields
  109. status.SetSendQueueSize(WriterStatus.SendQueueSize);
  110. status.SetAckMessagesSize(WriterStatus.AckMessagesSize);
  111. // status.SetErrorCount();
  112. // status.SetWriteBytes();
  113. // status.SetWriteBytesCompressed();
  114. // status.SetWriteMessages();
  115. status.SetWriteSyscalls(WriterStatus.Incremental.NetworkOps);
  116. status.SetWriteActs(WriterStatus.Acts);
  117. // status.SetReadBytes();
  118. // status.SetReadBytesCompressed();
  119. // status.SetReadMessages();
  120. status.SetReadSyscalls(ReaderStatus.Incremental.NetworkOps);
  121. status.SetReadActs(ReaderStatus.Acts);
  122. TMessageStatusCounter sumStatusCounter;
  123. sumStatusCounter += WriterStatus.Incremental.StatusCounter;
  124. sumStatusCounter += ReaderStatus.Incremental.StatusCounter;
  125. sumStatusCounter.FillErrorsProtobuf(&status);
  126. return status;
  127. }
  128. TString TRemoteConnectionStatus::PrintToString() const {
  129. TStringStream ss;
  130. TKeyValuePrinter p;
  131. if (!Summary) {
  132. // TODO: print MyAddr too, but only if it is set
  133. ss << WriterStatus.PeerAddr << " (" << WriterStatus.ConnectionId << ")"
  134. << ", writefd=" << WriterStatus.Fd
  135. << ", readfd=" << ReaderStatus.Fd
  136. << Endl;
  137. if (WriterStatus.Connected) {
  138. p.AddRow("connect time", WriterStatus.ConnectTime.ToString());
  139. p.AddRow("writer state", ToCString(WriterStatus.State));
  140. } else {
  141. ss << "not connected";
  142. if (WriterStatus.ConnectError != 0) {
  143. ss << ", last connect error: " << LastSystemErrorText(WriterStatus.ConnectError);
  144. }
  145. ss << Endl;
  146. }
  147. }
  148. if (!Server) {
  149. p.AddRow("connect syscalls", WriterStatus.ConnectSyscalls);
  150. }
  151. p.AddRow("send queue", LeftPad(WriterStatus.SendQueueSize, 6));
  152. if (Server) {
  153. p.AddRow("quota msg", LeftPad(ReaderStatus.QuotaMsg, 6));
  154. p.AddRow("quota bytes", LeftPad(ReaderStatus.QuotaBytes, 6));
  155. p.AddRow("quota exhausted", LeftPad(ReaderStatus.QuotaExhausted, 6));
  156. p.AddRow("reader wakeups", LeftPad(WriterStatus.ReaderWakeups, 6));
  157. } else {
  158. p.AddRow("ack messages", LeftPad(WriterStatus.AckMessagesSize, 6));
  159. }
  160. p.AddRow("written", WriterStatus.Incremental.MessageCounter.ToString(false));
  161. p.AddRow("read", ReaderStatus.Incremental.MessageCounter.ToString(true));
  162. p.AddRow("write syscalls", LeftPad(WriterStatus.Incremental.NetworkOps, 12));
  163. p.AddRow("read syscalls", LeftPad(ReaderStatus.Incremental.NetworkOps, 12));
  164. p.AddRow("write acts", LeftPad(WriterStatus.Acts, 12));
  165. p.AddRow("read acts", LeftPad(ReaderStatus.Acts, 12));
  166. p.AddRow("write buffer cap", LeftPad(WriterStatus.BufferSize, 12));
  167. p.AddRow("read buffer cap", LeftPad(ReaderStatus.BufferSize, 12));
  168. p.AddRow("write buffer drops", LeftPad(WriterStatus.Incremental.BufferDrops, 10));
  169. p.AddRow("read buffer drops", LeftPad(ReaderStatus.Incremental.BufferDrops, 10));
  170. if (Server) {
  171. p.AddRow("process dur", WriterStatus.DurationCounterPrev.ToString());
  172. }
  173. ss << p.PrintToString();
  174. if (false && Server) {
  175. ss << "time histogram:\n";
  176. ss << WriterStatus.Incremental.ProcessDurationHistogram.PrintToString();
  177. }
  178. TMessageStatusCounter sumStatusCounter;
  179. sumStatusCounter += WriterStatus.Incremental.StatusCounter;
  180. sumStatusCounter += ReaderStatus.Incremental.StatusCounter;
  181. ss << sumStatusCounter.PrintToString();
  182. return ss.Str();
  183. }
  184. TRemoteConnectionStatus::TRemoteConnectionStatus()
  185. : REMOTE_CONNECTION_STATUS_MAP(STRUCT_FIELD_INIT_DEFAULT, COMMA)
  186. {
  187. }
  188. TString TSessionDumpStatus::PrintToString() const {
  189. if (Shutdown) {
  190. return "shutdown";
  191. }
  192. TStringStream ss;
  193. ss << Head;
  194. if (ConnectionStatusSummary.Server) {
  195. ss << "\n";
  196. ss << Acceptors;
  197. }
  198. ss << "\n";
  199. ss << "connections summary:" << Endl;
  200. ss << ConnectionsSummary;
  201. if (!!Connections) {
  202. ss << "\n";
  203. ss << Connections;
  204. }
  205. ss << "\n";
  206. ss << Config.PrintToString();
  207. return ss.Str();
  208. }
  209. TString TBusMessageQueueStatus::PrintToString() const {
  210. TStringStream ss;
  211. ss << "work queue:\n";
  212. ss << ExecutorStatus.Status;
  213. ss << "\n";
  214. ss << "queue config:\n";
  215. ss << Config.PrintToString();
  216. return ss.Str();
  217. }