remote_connection.cpp 36 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974
  1. #include "remote_connection.h"
  2. #include "key_value_printer.h"
  3. #include "mb_lwtrace.h"
  4. #include "network.h"
  5. #include "remote_client_connection.h"
  6. #include "remote_client_session.h"
  7. #include "remote_server_session.h"
  8. #include "session_impl.h"
  9. #include <library/cpp/messagebus/actor/what_thread_does.h>
  10. #include <util/generic/cast.h>
  11. #include <util/network/init.h>
  12. #include <library/cpp/deprecated/atomic/atomic.h>
  13. LWTRACE_USING(LWTRACE_MESSAGEBUS_PROVIDER)
  14. using namespace NActor;
  15. using namespace NBus;
  16. using namespace NBus::NPrivate;
  17. namespace NBus {
  18. namespace NPrivate {
  19. TRemoteConnection::TRemoteConnection(TRemoteSessionPtr session, ui64 connectionId, TNetAddr addr)
  20. : TActor<TRemoteConnection, TWriterTag>(session->Queue->WorkQueue.Get())
  21. , TActor<TRemoteConnection, TReaderTag>(session->Queue->WorkQueue.Get())
  22. , TScheduleActor<TRemoteConnection, TWriterTag>(&session->Queue->Scheduler)
  23. , Session(session)
  24. , Proto(session->Proto)
  25. , Config(session->Config)
  26. , RemovedFromSession(false)
  27. , ConnectionId(connectionId)
  28. , PeerAddr(addr)
  29. , PeerAddrSocketAddr(addr)
  30. , CreatedTime(TInstant::Now())
  31. , ReturnConnectFailedImmediately(false)
  32. , GranStatus(Config.Secret.StatusFlushPeriod)
  33. , QuotaMsg(!Session->IsSource_, Config.PerConnectionMaxInFlight, 0)
  34. , QuotaBytes(!Session->IsSource_, Config.PerConnectionMaxInFlightBySize, 0)
  35. , MaxBufferSize(session->Config.MaxBufferSize)
  36. , ShutdownReason(MESSAGE_OK)
  37. {
  38. WriterData.Status.ConnectionId = connectionId;
  39. WriterData.Status.PeerAddr = PeerAddr;
  40. ReaderData.Status.ConnectionId = connectionId;
  41. const TInstant now = TInstant::Now();
  42. WriterFillStatus();
  43. GranStatus.Writer.Update(WriterData.Status, now, true);
  44. GranStatus.Reader.Update(ReaderData.Status, now, true);
  45. }
  46. TRemoteConnection::~TRemoteConnection() {
  47. Y_ABORT_UNLESS(ReplyQueue.IsEmpty());
  48. }
  49. TRemoteConnection::TWriterData::TWriterData()
  50. : Down(0)
  51. , SocketVersion(0)
  52. , InFlight(0)
  53. , AwakeFlags(0)
  54. , State(WRITER_FILLING)
  55. {
  56. }
  57. TRemoteConnection::TWriterData::~TWriterData() {
  58. Y_ABORT_UNLESS(AtomicGet(Down));
  59. Y_ABORT_UNLESS(SendQueue.Empty());
  60. }
  61. bool TRemoteConnection::TReaderData::HasBytesInBuf(size_t bytes) noexcept {
  62. size_t left = Buffer.Size() - Offset;
  63. return (MoreBytes = left >= bytes ? 0 : bytes - left) == 0;
  64. }
  65. void TRemoteConnection::TWriterData::SetChannel(NEventLoop::TChannelPtr channel) {
  66. Y_ABORT_UNLESS(!Channel, "must not have channel");
  67. Y_ABORT_UNLESS(Buffer.GetBuffer().Empty() && Buffer.LeftSize() == 0, "buffer must be empty");
  68. Y_ABORT_UNLESS(State == WRITER_FILLING, "state must be initial");
  69. Channel = channel;
  70. }
  71. void TRemoteConnection::TReaderData::SetChannel(NEventLoop::TChannelPtr channel) {
  72. Y_ABORT_UNLESS(!Channel, "must not have channel");
  73. Y_ABORT_UNLESS(Buffer.Empty(), "buffer must be empty");
  74. Channel = channel;
  75. }
  76. void TRemoteConnection::TWriterData::DropChannel() {
  77. if (!!Channel) {
  78. Channel->Unregister();
  79. Channel.Drop();
  80. }
  81. Buffer.Reset();
  82. State = WRITER_FILLING;
  83. }
  84. void TRemoteConnection::TReaderData::DropChannel() {
  85. // TODO: make Drop call Unregister
  86. if (!!Channel) {
  87. Channel->Unregister();
  88. Channel.Drop();
  89. }
  90. Buffer.Reset();
  91. Offset = 0;
  92. }
  93. TRemoteConnection::TReaderData::TReaderData()
  94. : Down(0)
  95. , SocketVersion(0)
  96. , Offset(0)
  97. , MoreBytes(0)
  98. {
  99. }
  100. TRemoteConnection::TReaderData::~TReaderData() {
  101. Y_ABORT_UNLESS(AtomicGet(Down));
  102. }
  103. void TRemoteConnection::Send(TNonDestroyingAutoPtr<TBusMessage> msg) {
  104. BeforeSendQueue.Enqueue(msg.Release());
  105. AtomicIncrement(WriterData.InFlight);
  106. ScheduleWrite();
  107. }
  108. void TRemoteConnection::ClearOutgoingQueue(TMessagesPtrs& result, bool reconnect) {
  109. if (!reconnect) {
  110. // Do not clear send queue if reconnecting
  111. WriterData.SendQueue.Clear(&result);
  112. }
  113. }
  114. void TRemoteConnection::Shutdown(EMessageStatus status) {
  115. ScheduleShutdown(status);
  116. ReaderData.ShutdownComplete.WaitI();
  117. WriterData.ShutdownComplete.WaitI();
  118. }
  119. void TRemoteConnection::TryConnect() {
  120. Y_ABORT("TryConnect is client connection only operation");
  121. }
  122. void TRemoteConnection::ScheduleRead() {
  123. GetReaderActor()->Schedule();
  124. }
  125. void TRemoteConnection::ScheduleWrite() {
  126. GetWriterActor()->Schedule();
  127. }
  128. void TRemoteConnection::WriterRotateCounters() {
  129. if (!WriterData.TimeToRotateCounters.FetchTask()) {
  130. return;
  131. }
  132. WriterData.Status.DurationCounterPrev = WriterData.Status.DurationCounter;
  133. Reset(WriterData.Status.DurationCounter);
  134. }
  135. void TRemoteConnection::WriterSendStatus(TInstant now, bool force) {
  136. GranStatus.Writer.Update(std::bind(&TRemoteConnection::WriterGetStatus, this), now, force);
  137. }
  138. void TRemoteConnection::ReaderSendStatus(TInstant now, bool force) {
  139. GranStatus.Reader.Update(std::bind(&TRemoteConnection::ReaderFillStatus, this), now, force);
  140. }
  141. const TRemoteConnectionReaderStatus& TRemoteConnection::ReaderFillStatus() {
  142. ReaderData.Status.BufferSize = ReaderData.Buffer.Capacity();
  143. ReaderData.Status.QuotaMsg = QuotaMsg.Tokens();
  144. ReaderData.Status.QuotaBytes = QuotaBytes.Tokens();
  145. return ReaderData.Status;
  146. }
  147. void TRemoteConnection::ProcessItem(TReaderTag, ::NActor::TDefaultTag, TWriterToReaderSocketMessage readSocket) {
  148. if (AtomicGet(ReaderData.Down)) {
  149. ReaderData.Status.Fd = INVALID_SOCKET;
  150. return;
  151. }
  152. ReaderData.DropChannel();
  153. ReaderData.Status.Fd = readSocket.Socket;
  154. ReaderData.SocketVersion = readSocket.SocketVersion;
  155. if (readSocket.Socket != INVALID_SOCKET) {
  156. ReaderData.SetChannel(Session->ReadEventLoop.Register(readSocket.Socket, this, ReadCookie));
  157. ReaderData.Channel->EnableRead();
  158. }
  159. }
  160. void TRemoteConnection::ProcessItem(TWriterTag, TReconnectTag, ui32 socketVersion) {
  161. Y_ABORT_UNLESS(socketVersion <= WriterData.SocketVersion, "something weird");
  162. if (WriterData.SocketVersion != socketVersion) {
  163. return;
  164. }
  165. Y_ABORT_UNLESS(WriterData.Status.Connected, "must be connected at this point");
  166. Y_ABORT_UNLESS(!!WriterData.Channel, "must have channel at this point");
  167. WriterData.Status.Connected = false;
  168. WriterData.DropChannel();
  169. WriterData.Status.MyAddr = TNetAddr();
  170. ++WriterData.SocketVersion;
  171. LastConnectAttempt = TInstant();
  172. TMessagesPtrs cleared;
  173. ClearOutgoingQueue(cleared, true);
  174. WriterErrorMessages(cleared, MESSAGE_DELIVERY_FAILED);
  175. FireClientConnectionEvent(TClientConnectionEvent::DISCONNECTED);
  176. ReaderGetSocketQueue()->EnqueueAndSchedule(TWriterToReaderSocketMessage(INVALID_SOCKET, WriterData.SocketVersion));
  177. }
  178. void TRemoteConnection::ProcessItem(TWriterTag, TWakeReaderTag, ui32 awakeFlags) {
  179. WriterData.AwakeFlags |= awakeFlags;
  180. ReadQuotaWakeup();
  181. }
  182. void TRemoteConnection::Act(TReaderTag) {
  183. TInstant now = TInstant::Now();
  184. ReaderData.Status.Acts += 1;
  185. ReaderGetSocketQueue()->DequeueAllLikelyEmpty();
  186. if (AtomicGet(ReaderData.Down)) {
  187. ReaderData.DropChannel();
  188. ReaderProcessStatusDown();
  189. ReaderData.ShutdownComplete.Signal();
  190. } else if (!!ReaderData.Channel) {
  191. Y_ASSERT(ReaderData.ReadMessages.empty());
  192. for (int i = 0;; ++i) {
  193. if (i == 100) {
  194. // perform other tasks
  195. GetReaderActor()->AddTaskFromActorLoop();
  196. break;
  197. }
  198. if (NeedInterruptRead()) {
  199. ReaderData.Channel->EnableRead();
  200. break;
  201. }
  202. if (!ReaderFillBuffer())
  203. break;
  204. if (!ReaderProcessBuffer())
  205. break;
  206. }
  207. ReaderFlushMessages();
  208. }
  209. ReaderSendStatus(now);
  210. }
  211. bool TRemoteConnection::QuotaAcquire(size_t msg, size_t bytes) {
  212. ui32 wakeFlags = 0;
  213. if (!QuotaMsg.Acquire(msg))
  214. wakeFlags |= WAKE_QUOTA_MSG;
  215. else if (!QuotaBytes.Acquire(bytes))
  216. wakeFlags |= WAKE_QUOTA_BYTES;
  217. if (wakeFlags) {
  218. ReaderData.Status.QuotaExhausted++;
  219. WriterGetWakeQueue()->EnqueueAndSchedule(wakeFlags);
  220. }
  221. return wakeFlags == 0;
  222. }
  223. void TRemoteConnection::QuotaConsume(size_t msg, size_t bytes) {
  224. QuotaMsg.Consume(msg);
  225. QuotaBytes.Consume(bytes);
  226. }
  227. void TRemoteConnection::QuotaReturnSelf(size_t items, size_t bytes) {
  228. if (QuotaReturnValues(items, bytes))
  229. ReadQuotaWakeup();
  230. }
  231. void TRemoteConnection::QuotaReturnAside(size_t items, size_t bytes) {
  232. if (QuotaReturnValues(items, bytes) && !AtomicGet(WriterData.Down))
  233. WriterGetWakeQueue()->EnqueueAndSchedule(0x0);
  234. }
  235. bool TRemoteConnection::QuotaReturnValues(size_t items, size_t bytes) {
  236. bool rMsg = QuotaMsg.Return(items);
  237. bool rBytes = QuotaBytes.Return(bytes);
  238. return rMsg || rBytes;
  239. }
  240. void TRemoteConnection::ReadQuotaWakeup() {
  241. const ui32 mask = WriterData.AwakeFlags & WriteWakeFlags();
  242. if (mask && mask == WriterData.AwakeFlags) {
  243. WriterData.Status.ReaderWakeups++;
  244. WriterData.AwakeFlags = 0;
  245. ScheduleRead();
  246. }
  247. }
  248. ui32 TRemoteConnection::WriteWakeFlags() const {
  249. ui32 awakeFlags = 0;
  250. if (QuotaMsg.IsAboveWake())
  251. awakeFlags |= WAKE_QUOTA_MSG;
  252. if (QuotaBytes.IsAboveWake())
  253. awakeFlags |= WAKE_QUOTA_BYTES;
  254. return awakeFlags;
  255. }
  256. bool TRemoteConnection::ReaderProcessBuffer() {
  257. TInstant now = TInstant::Now();
  258. for (;;) {
  259. if (!ReaderData.HasBytesInBuf(sizeof(TBusHeader))) {
  260. break;
  261. }
  262. TBusHeader header(MakeArrayRef(ReaderData.Buffer.Data() + ReaderData.Offset, ReaderData.Buffer.Size() - ReaderData.Offset));
  263. if (header.Size < sizeof(TBusHeader)) {
  264. LWPROBE(Error, ToString(MESSAGE_HEADER_CORRUPTED), ToString(PeerAddr), ToString(header.Size));
  265. ReaderData.Status.Incremental.StatusCounter[MESSAGE_HEADER_CORRUPTED] += 1;
  266. ScheduleShutdownOnServerOrReconnectOnClient(MESSAGE_HEADER_CORRUPTED, false);
  267. return false;
  268. }
  269. if (!IsVersionNegotiation(header) && !IsBusKeyValid(header.Id)) {
  270. LWPROBE(Error, ToString(MESSAGE_HEADER_CORRUPTED), ToString(PeerAddr), ToString(header.Size));
  271. ReaderData.Status.Incremental.StatusCounter[MESSAGE_HEADER_CORRUPTED] += 1;
  272. ScheduleShutdownOnServerOrReconnectOnClient(MESSAGE_HEADER_CORRUPTED, false);
  273. return false;
  274. }
  275. if (header.Size > Config.MaxMessageSize) {
  276. LWPROBE(Error, ToString(MESSAGE_MESSAGE_TOO_LARGE), ToString(PeerAddr), ToString(header.Size));
  277. ReaderData.Status.Incremental.StatusCounter[MESSAGE_MESSAGE_TOO_LARGE] += 1;
  278. ScheduleShutdownOnServerOrReconnectOnClient(MESSAGE_MESSAGE_TOO_LARGE, false);
  279. return false;
  280. }
  281. if (!ReaderData.HasBytesInBuf(header.Size)) {
  282. if (ReaderData.Offset == 0) {
  283. ReaderData.Buffer.Reserve(header.Size);
  284. }
  285. break;
  286. }
  287. if (!QuotaAcquire(1, header.Size))
  288. return false;
  289. if (!MessageRead(MakeArrayRef(ReaderData.Buffer.Data() + ReaderData.Offset, header.Size), now)) {
  290. return false;
  291. }
  292. ReaderData.Offset += header.Size;
  293. }
  294. ReaderData.Buffer.ChopHead(ReaderData.Offset);
  295. ReaderData.Offset = 0;
  296. if (ReaderData.Buffer.Capacity() > MaxBufferSize && ReaderData.Buffer.Size() <= MaxBufferSize) {
  297. ReaderData.Status.Incremental.BufferDrops += 1;
  298. TBuffer temp;
  299. // probably should use another constant
  300. temp.Reserve(Config.DefaultBufferSize);
  301. temp.Append(ReaderData.Buffer.Data(), ReaderData.Buffer.Size());
  302. ReaderData.Buffer.Swap(temp);
  303. }
  304. return true;
  305. }
  306. bool TRemoteConnection::ReaderFillBuffer() {
  307. if (!ReaderData.BufferMore())
  308. return true;
  309. if (ReaderData.Buffer.Avail() == 0) {
  310. if (ReaderData.Buffer.Size() == 0) {
  311. ReaderData.Buffer.Reserve(Config.DefaultBufferSize);
  312. } else {
  313. ReaderData.Buffer.Reserve(ReaderData.Buffer.Size() * 2);
  314. }
  315. }
  316. Y_ASSERT(ReaderData.Buffer.Avail() > 0);
  317. ssize_t bytes;
  318. {
  319. TWhatThreadDoesPushPop pp("recv syscall");
  320. bytes = SocketRecv(ReaderData.Channel->GetSocket(), TArrayRef<char>(ReaderData.Buffer.Pos(), ReaderData.Buffer.Avail()));
  321. }
  322. if (bytes < 0) {
  323. if (WouldBlock()) {
  324. ReaderData.Channel->EnableRead();
  325. return false;
  326. } else {
  327. ReaderData.Channel->DisableRead();
  328. ScheduleShutdownOnServerOrReconnectOnClient(MESSAGE_DELIVERY_FAILED, false);
  329. return false;
  330. }
  331. }
  332. if (bytes == 0) {
  333. ReaderData.Channel->DisableRead();
  334. // TODO: incorrect: it is possible that only input is shutdown, and output is available
  335. ScheduleShutdownOnServerOrReconnectOnClient(MESSAGE_DELIVERY_FAILED, false);
  336. return false;
  337. }
  338. ReaderData.Status.Incremental.NetworkOps += 1;
  339. ReaderData.Buffer.Advance(bytes);
  340. ReaderData.MoreBytes = 0;
  341. return true;
  342. }
  343. void TRemoteConnection::ClearBeforeSendQueue(EMessageStatus reason) {
  344. BeforeSendQueue.DequeueAll(std::bind(&TRemoteConnection::WriterBeforeWriteErrorMessage, this, std::placeholders::_1, reason));
  345. }
  346. void TRemoteConnection::ClearReplyQueue(EMessageStatus reason) {
  347. TVectorSwaps<TBusMessagePtrAndHeader> replyQueueTemp;
  348. Y_ASSERT(replyQueueTemp.empty());
  349. ReplyQueue.DequeueAllSingleConsumer(&replyQueueTemp);
  350. TVector<TBusMessage*> messages;
  351. for (TVectorSwaps<TBusMessagePtrAndHeader>::reverse_iterator message = replyQueueTemp.rbegin();
  352. message != replyQueueTemp.rend(); ++message) {
  353. messages.push_back(message->MessagePtr.Release());
  354. }
  355. WriterErrorMessages(messages, reason);
  356. replyQueueTemp.clear();
  357. }
  358. void TRemoteConnection::ProcessBeforeSendQueueMessage(TBusMessage* message, TInstant now) {
  359. // legacy clients expect this field to be set
  360. if (!Session->IsSource_) {
  361. message->SendTime = now.MilliSeconds();
  362. }
  363. WriterData.SendQueue.PushBack(message);
  364. }
  365. void TRemoteConnection::ProcessBeforeSendQueue(TInstant now) {
  366. BeforeSendQueue.DequeueAll(std::bind(&TRemoteConnection::ProcessBeforeSendQueueMessage, this, std::placeholders::_1, now));
  367. }
  368. void TRemoteConnection::WriterFillInFlight() {
  369. // this is hack for TLoadBalancedProtocol
  370. WriterFillStatus();
  371. AtomicSet(WriterData.InFlight, WriterData.Status.GetInFlight());
  372. }
  373. const TRemoteConnectionWriterStatus& TRemoteConnection::WriterGetStatus() {
  374. WriterRotateCounters();
  375. WriterFillStatus();
  376. return WriterData.Status;
  377. }
  378. void TRemoteConnection::WriterFillStatus() {
  379. if (!!WriterData.Channel) {
  380. WriterData.Status.Fd = WriterData.Channel->GetSocket();
  381. } else {
  382. WriterData.Status.Fd = INVALID_SOCKET;
  383. }
  384. WriterData.Status.BufferSize = WriterData.Buffer.Capacity();
  385. WriterData.Status.SendQueueSize = WriterData.SendQueue.Size();
  386. WriterData.Status.State = WriterData.State;
  387. }
  388. void TRemoteConnection::WriterProcessStatusDown() {
  389. Session->GetDeadConnectionWriterStatusQueue()->EnqueueAndSchedule(WriterData.Status.Incremental);
  390. Reset(WriterData.Status.Incremental);
  391. }
  392. void TRemoteConnection::ReaderProcessStatusDown() {
  393. Session->GetDeadConnectionReaderStatusQueue()->EnqueueAndSchedule(ReaderData.Status.Incremental);
  394. Reset(ReaderData.Status.Incremental);
  395. }
  396. void TRemoteConnection::ProcessWriterDown() {
  397. if (!RemovedFromSession) {
  398. Session->GetRemoveConnectionQueue()->EnqueueAndSchedule(this);
  399. if (Session->IsSource_) {
  400. if (WriterData.Status.Connected) {
  401. FireClientConnectionEvent(TClientConnectionEvent::DISCONNECTED);
  402. }
  403. }
  404. LWPROBE(Disconnected, ToString(PeerAddr));
  405. RemovedFromSession = true;
  406. }
  407. WriterData.DropChannel();
  408. DropEnqueuedData(ShutdownReason, MESSAGE_SHUTDOWN);
  409. WriterProcessStatusDown();
  410. WriterData.ShutdownComplete.Signal();
  411. }
  412. void TRemoteConnection::DropEnqueuedData(EMessageStatus reason, EMessageStatus reasonForQueues) {
  413. ClearReplyQueue(reasonForQueues);
  414. ClearBeforeSendQueue(reasonForQueues);
  415. WriterGetReconnectQueue()->Clear();
  416. WriterGetWakeQueue()->Clear();
  417. TMessagesPtrs cleared;
  418. ClearOutgoingQueue(cleared, false);
  419. if (!Session->IsSource_) {
  420. for (auto& i : cleared) {
  421. TBusMessagePtrAndHeader h(i);
  422. CheckedCast<TRemoteServerSession*>(Session.Get())->ReleaseInWorkResponses(MakeArrayRef(&h, 1));
  423. // assignment back is weird
  424. i = h.MessagePtr.Release();
  425. // and this part is not batch
  426. }
  427. }
  428. WriterErrorMessages(cleared, reason);
  429. }
  430. void TRemoteConnection::BeforeTryWrite() {
  431. }
  432. void TRemoteConnection::Act(TWriterTag) {
  433. TInstant now = TInstant::Now();
  434. WriterData.Status.Acts += 1;
  435. if (Y_UNLIKELY(AtomicGet(WriterData.Down))) {
  436. // dump status must work even if WriterDown
  437. WriterSendStatus(now, true);
  438. ProcessWriterDown();
  439. return;
  440. }
  441. ProcessBeforeSendQueue(now);
  442. BeforeTryWrite();
  443. WriterFillInFlight();
  444. WriterGetReconnectQueue()->DequeueAllLikelyEmpty();
  445. if (!WriterData.Status.Connected) {
  446. TryConnect();
  447. } else {
  448. for (int i = 0;; ++i) {
  449. if (i == 100) {
  450. // perform other tasks
  451. GetWriterActor()->AddTaskFromActorLoop();
  452. break;
  453. }
  454. if (WriterData.State == WRITER_FILLING) {
  455. WriterFillBuffer();
  456. if (WriterData.State == WRITER_FILLING) {
  457. WriterData.Channel->DisableWrite();
  458. break;
  459. }
  460. Y_ASSERT(!WriterData.Buffer.Empty());
  461. }
  462. if (WriterData.State == WRITER_FLUSHING) {
  463. WriterFlushBuffer();
  464. if (WriterData.State == WRITER_FLUSHING) {
  465. break;
  466. }
  467. }
  468. }
  469. }
  470. WriterGetWakeQueue()->DequeueAllLikelyEmpty();
  471. WriterSendStatus(now);
  472. }
  473. void TRemoteConnection::WriterFlushBuffer() {
  474. Y_ASSERT(WriterData.State == WRITER_FLUSHING);
  475. Y_ASSERT(!WriterData.Buffer.Empty());
  476. WriterData.CorkUntil = TInstant::Zero();
  477. while (!WriterData.Buffer.Empty()) {
  478. ssize_t bytes;
  479. {
  480. TWhatThreadDoesPushPop pp("send syscall");
  481. bytes = SocketSend(WriterData.Channel->GetSocket(), TArrayRef<const char>(WriterData.Buffer.LeftPos(), WriterData.Buffer.Size()));
  482. }
  483. if (bytes < 0) {
  484. if (WouldBlock()) {
  485. WriterData.Channel->EnableWrite();
  486. return;
  487. } else {
  488. WriterData.Channel->DisableWrite();
  489. ScheduleShutdownOnServerOrReconnectOnClient(MESSAGE_DELIVERY_FAILED, true);
  490. return;
  491. }
  492. }
  493. WriterData.Status.Incremental.NetworkOps += 1;
  494. WriterData.Buffer.LeftProceed(bytes);
  495. }
  496. WriterData.Buffer.Clear();
  497. if (WriterData.Buffer.Capacity() > MaxBufferSize) {
  498. WriterData.Status.Incremental.BufferDrops += 1;
  499. WriterData.Buffer.Reset();
  500. }
  501. WriterData.State = WRITER_FILLING;
  502. }
  503. void TRemoteConnection::ScheduleShutdownOnServerOrReconnectOnClient(EMessageStatus status, bool writer) {
  504. if (Session->IsSource_) {
  505. WriterGetReconnectQueue()->EnqueueAndSchedule(writer ? WriterData.SocketVersion : ReaderData.SocketVersion);
  506. } else {
  507. ScheduleShutdown(status);
  508. }
  509. }
  510. void TRemoteConnection::ScheduleShutdown(EMessageStatus status) {
  511. ShutdownReason = status;
  512. AtomicSet(ReaderData.Down, 1);
  513. ScheduleRead();
  514. AtomicSet(WriterData.Down, 1);
  515. ScheduleWrite();
  516. }
  517. void TRemoteConnection::CallSerialize(TBusMessage* msg, TBuffer& buffer) const {
  518. size_t posForAssertion = buffer.Size();
  519. Proto->Serialize(msg, buffer);
  520. Y_ABORT_UNLESS(buffer.Size() >= posForAssertion,
  521. "incorrect Serialize implementation, pos before serialize: %d, pos after serialize: %d",
  522. int(posForAssertion), int(buffer.Size()));
  523. }
  524. namespace {
  525. inline void WriteHeader(const TBusHeader& header, TBuffer& data) {
  526. data.Reserve(data.Size() + sizeof(TBusHeader));
  527. /// \todo hton instead of memcpy
  528. memcpy(data.Data() + data.Size(), &header, sizeof(TBusHeader));
  529. data.Advance(sizeof(TBusHeader));
  530. }
  531. inline void WriteDummyHeader(TBuffer& data) {
  532. data.Resize(data.Size() + sizeof(TBusHeader));
  533. }
  534. }
  535. void TRemoteConnection::SerializeMessage(TBusMessage* msg, TBuffer* data, TMessageCounter* counter) const {
  536. size_t pos = data->Size();
  537. size_t dataSize;
  538. bool compressionRequested = msg->IsCompressed();
  539. if (compressionRequested) {
  540. TBuffer compdata;
  541. TBuffer plaindata;
  542. CallSerialize(msg, plaindata);
  543. dataSize = sizeof(TBusHeader) + plaindata.Size();
  544. NCodecs::TCodecPtr c = Proto->GetTransportCodec();
  545. c->Encode(TStringBuf{plaindata.data(), plaindata.size()}, compdata);
  546. if (compdata.Size() < plaindata.Size()) {
  547. plaindata.Clear();
  548. msg->GetHeader()->Size = sizeof(TBusHeader) + compdata.Size();
  549. WriteHeader(*msg->GetHeader(), *data);
  550. data->Append(compdata.Data(), compdata.Size());
  551. } else {
  552. compdata.Clear();
  553. msg->SetCompressed(false);
  554. msg->GetHeader()->Size = sizeof(TBusHeader) + plaindata.Size();
  555. WriteHeader(*msg->GetHeader(), *data);
  556. data->Append(plaindata.Data(), plaindata.Size());
  557. }
  558. } else {
  559. WriteDummyHeader(*data);
  560. CallSerialize(msg, *data);
  561. dataSize = msg->GetHeader()->Size = data->Size() - pos;
  562. data->Proceed(pos);
  563. WriteHeader(*msg->GetHeader(), *data);
  564. data->Proceed(pos + msg->GetHeader()->Size);
  565. }
  566. Y_ASSERT(msg->GetHeader()->Size == data->Size() - pos);
  567. counter->AddMessage(dataSize, data->Size() - pos, msg->IsCompressed(), compressionRequested);
  568. }
  569. TBusMessage* TRemoteConnection::DeserializeMessage(TArrayRef<const char> dataRef, const TBusHeader* header, TMessageCounter* messageCounter, EMessageStatus* status) const {
  570. size_t dataSize;
  571. TBusMessage* message;
  572. if (header->FlagsInternal & MESSAGE_COMPRESS_INTERNAL) {
  573. TBuffer msg;
  574. {
  575. TBuffer plaindata;
  576. NCodecs::TCodecPtr c = Proto->GetTransportCodec();
  577. try {
  578. TArrayRef<const char> payload = TBusMessage::GetPayload(dataRef);
  579. c->Decode(TStringBuf{payload.data(), payload.size()}, plaindata);
  580. } catch (...) {
  581. // catch all, because
  582. // http://nga.at.yandex-team.ru/replies.xml?item_no=3884
  583. *status = MESSAGE_DECOMPRESS_ERROR;
  584. return nullptr;
  585. }
  586. msg.Append(dataRef.data(), sizeof(TBusHeader));
  587. msg.Append(plaindata.Data(), plaindata.Size());
  588. }
  589. TArrayRef<const char> msgRef(msg.Data(), msg.Size());
  590. dataSize = sizeof(TBusHeader) + msgRef.size();
  591. // TODO: track error types
  592. message = Proto->Deserialize(header->Type, msgRef.Slice(sizeof(TBusHeader))).Release();
  593. if (!message) {
  594. *status = MESSAGE_DESERIALIZE_ERROR;
  595. return nullptr;
  596. }
  597. *message->GetHeader() = *header;
  598. message->SetCompressed(true);
  599. } else {
  600. dataSize = dataRef.size();
  601. message = Proto->Deserialize(header->Type, dataRef.Slice(sizeof(TBusHeader))).Release();
  602. if (!message) {
  603. *status = MESSAGE_DESERIALIZE_ERROR;
  604. return nullptr;
  605. }
  606. *message->GetHeader() = *header;
  607. }
  608. messageCounter->AddMessage(dataSize, dataRef.size(), header->FlagsInternal & MESSAGE_COMPRESS_INTERNAL, false);
  609. return message;
  610. }
  611. void TRemoteConnection::ResetOneWayFlag(TArrayRef<TBusMessage*> messages) {
  612. for (auto message : messages) {
  613. message->LocalFlags &= ~MESSAGE_ONE_WAY_INTERNAL;
  614. }
  615. }
  616. void TRemoteConnection::ReaderFlushMessages() {
  617. if (!ReaderData.ReadMessages.empty()) {
  618. Session->OnMessageReceived(this, ReaderData.ReadMessages);
  619. ReaderData.ReadMessages.clear();
  620. }
  621. }
  622. // @return false if actor should break
  623. bool TRemoteConnection::MessageRead(TArrayRef<const char> readDataRef, TInstant now) {
  624. TBusHeader header(readDataRef);
  625. Y_ASSERT(readDataRef.size() == header.Size);
  626. if (header.GetVersionInternal() != YBUS_VERSION) {
  627. ReaderProcessMessageUnknownVersion(readDataRef);
  628. return true;
  629. }
  630. EMessageStatus deserializeFailureStatus = MESSAGE_OK;
  631. TBusMessage* r = DeserializeMessage(readDataRef, &header, &ReaderData.Status.Incremental.MessageCounter, &deserializeFailureStatus);
  632. if (!r) {
  633. Y_ABORT_UNLESS(deserializeFailureStatus != MESSAGE_OK, "state check");
  634. LWPROBE(Error, ToString(deserializeFailureStatus), ToString(PeerAddr), "");
  635. ReaderData.Status.Incremental.StatusCounter[deserializeFailureStatus] += 1;
  636. ScheduleShutdownOnServerOrReconnectOnClient(deserializeFailureStatus, false);
  637. return false;
  638. }
  639. LWPROBE(Read, r->GetHeader()->Size);
  640. r->ReplyTo = PeerAddrSocketAddr;
  641. TBusMessagePtrAndHeader h(r);
  642. r->RecvTime = now;
  643. QuotaConsume(1, header.Size);
  644. ReaderData.ReadMessages.push_back(h);
  645. if (ReaderData.ReadMessages.size() >= 100) {
  646. ReaderFlushMessages();
  647. }
  648. return true;
  649. }
  650. void TRemoteConnection::WriterFillBuffer() {
  651. Y_ASSERT(WriterData.State == WRITER_FILLING);
  652. Y_ASSERT(WriterData.Buffer.LeftSize() == 0);
  653. if (Y_UNLIKELY(!WrongVersionRequests.IsEmpty())) {
  654. TVector<TBusHeader> headers;
  655. WrongVersionRequests.DequeueAllSingleConsumer(&headers);
  656. for (TVector<TBusHeader>::reverse_iterator header = headers.rbegin();
  657. header != headers.rend(); ++header) {
  658. TBusHeader response = *header;
  659. response.SendTime = NBus::Now();
  660. response.Size = sizeof(TBusHeader);
  661. response.FlagsInternal = 0;
  662. response.SetVersionInternal(YBUS_VERSION);
  663. WriteHeader(response, WriterData.Buffer.GetBuffer());
  664. }
  665. Y_ASSERT(!WriterData.Buffer.Empty());
  666. WriterData.State = WRITER_FLUSHING;
  667. return;
  668. }
  669. TTempTlsVector<TBusMessagePtrAndHeader, void, TVectorSwaps> writeMessages;
  670. for (;;) {
  671. THolder<TBusMessage> writeMessage(WriterData.SendQueue.PopFront());
  672. if (!writeMessage) {
  673. break;
  674. }
  675. if (Config.Cork != TDuration::Zero()) {
  676. if (WriterData.CorkUntil == TInstant::Zero()) {
  677. WriterData.CorkUntil = TInstant::Now() + Config.Cork;
  678. }
  679. }
  680. size_t sizeBeforeSerialize = WriterData.Buffer.Size();
  681. TMessageCounter messageCounter = WriterData.Status.Incremental.MessageCounter;
  682. SerializeMessage(writeMessage.Get(), &WriterData.Buffer.GetBuffer(), &messageCounter);
  683. size_t written = WriterData.Buffer.Size() - sizeBeforeSerialize;
  684. if (written > Config.MaxMessageSize) {
  685. WriterData.Buffer.GetBuffer().EraseBack(written);
  686. WriterBeforeWriteErrorMessage(writeMessage.Release(), MESSAGE_MESSAGE_TOO_LARGE);
  687. continue;
  688. }
  689. WriterData.Status.Incremental.MessageCounter = messageCounter;
  690. TBusMessagePtrAndHeader h(writeMessage.Release());
  691. writeMessages.GetVector()->push_back(h);
  692. Y_ASSERT(!WriterData.Buffer.Empty());
  693. if (WriterData.Buffer.Size() >= Config.SendThreshold) {
  694. break;
  695. }
  696. }
  697. if (!WriterData.Buffer.Empty()) {
  698. if (WriterData.Buffer.Size() >= Config.SendThreshold) {
  699. WriterData.State = WRITER_FLUSHING;
  700. } else if (WriterData.CorkUntil == TInstant::Zero()) {
  701. WriterData.State = WRITER_FLUSHING;
  702. } else if (TInstant::Now() >= WriterData.CorkUntil) {
  703. WriterData.State = WRITER_FLUSHING;
  704. } else {
  705. // keep filling
  706. Y_ASSERT(WriterData.State == WRITER_FILLING);
  707. GetWriterSchedulerActor()->ScheduleAt(WriterData.CorkUntil);
  708. }
  709. } else {
  710. // keep filling
  711. Y_ASSERT(WriterData.State == WRITER_FILLING);
  712. }
  713. size_t bytes = MessageSize(*writeMessages.GetVector());
  714. QuotaReturnSelf(writeMessages.GetVector()->size(), bytes);
  715. // This is called before `send` syscall inducing latency
  716. MessageSent(*writeMessages.GetVector());
  717. }
  718. size_t TRemoteConnection::MessageSize(TArrayRef<TBusMessagePtrAndHeader> messages) {
  719. size_t size = 0;
  720. for (const auto& message : messages)
  721. size += message.MessagePtr->RequestSize;
  722. return size;
  723. }
  724. size_t TRemoteConnection::GetInFlight() {
  725. return AtomicGet(WriterData.InFlight);
  726. }
  727. size_t TRemoteConnection::GetConnectSyscallsNumForTest() {
  728. return WriterData.Status.ConnectSyscalls;
  729. }
  730. void TRemoteConnection::WriterBeforeWriteErrorMessage(TBusMessage* message, EMessageStatus status) {
  731. if (Session->IsSource_) {
  732. CheckedCast<TRemoteClientSession*>(Session.Get())->ReleaseInFlight({message});
  733. WriterErrorMessage(message, status);
  734. } else {
  735. TBusMessagePtrAndHeader h(message);
  736. CheckedCast<TRemoteServerSession*>(Session.Get())->ReleaseInWorkResponses(MakeArrayRef(&h, 1));
  737. WriterErrorMessage(h.MessagePtr.Release(), status);
  738. }
  739. }
  740. void TRemoteConnection::WriterErrorMessage(TNonDestroyingAutoPtr<TBusMessage> m, EMessageStatus status) {
  741. TBusMessage* released = m.Release();
  742. WriterErrorMessages(MakeArrayRef(&released, 1), status);
  743. }
  744. void TRemoteConnection::WriterErrorMessages(const TArrayRef<TBusMessage*> ms, EMessageStatus status) {
  745. ResetOneWayFlag(ms);
  746. WriterData.Status.Incremental.StatusCounter[status] += ms.size();
  747. for (auto m : ms) {
  748. Session->InvokeOnError(m, status);
  749. }
  750. }
  751. void TRemoteConnection::FireClientConnectionEvent(TClientConnectionEvent::EType type) {
  752. Y_ABORT_UNLESS(Session->IsSource_, "state check");
  753. TClientConnectionEvent event(type, ConnectionId, PeerAddr);
  754. TRemoteClientSession* session = CheckedCast<TRemoteClientSession*>(Session.Get());
  755. session->ClientHandler->OnClientConnectionEvent(event);
  756. }
  757. bool TRemoteConnection::IsAlive() const {
  758. return !AtomicGet(WriterData.Down);
  759. }
  760. }
  761. }