session_impl.cpp 21 KB


  1. #include "session_impl.h"
  2. #include "acceptor.h"
  3. #include "network.h"
  4. #include "remote_client_connection.h"
  5. #include "remote_client_session.h"
  6. #include "remote_server_connection.h"
  7. #include "remote_server_session.h"
  8. #include "misc/weak_ptr.h"
  9. #include <util/generic/cast.h>
  10. using namespace NActor;
  11. using namespace NBus;
  12. using namespace NBus::NPrivate;
  13. using namespace NEventLoop;
  14. namespace {
  15. class TScheduleSession: public IScheduleItem {
  16. public:
  17. TScheduleSession(TBusSessionImpl* session, TInstant deadline)
  18. : IScheduleItem(deadline)
  19. , Session(session)
  20. , SessionImpl(session)
  21. {
  22. }
  23. void Do() override {
  24. TIntrusivePtr<TBusSession> session = Session.Get();
  25. if (!!session) {
  26. SessionImpl->Cron();
  27. }
  28. }
  29. private:
  30. TWeakPtr<TBusSession> Session;
  31. // Work around TWeakPtr limitation
  32. TBusSessionImpl* SessionImpl;
  33. };
  34. }
  35. TConnectionsAcceptorsSnapshot::TConnectionsAcceptorsSnapshot()
  36. : LastConnectionId(0)
  37. , LastAcceptorId(0)
  38. {
  39. }
  40. struct TBusSessionImpl::TImpl {
  41. TRemoteConnectionWriterIncrementalStatus DeadConnectionWriterStatusSummary;
  42. TRemoteConnectionReaderIncrementalStatus DeadConnectionReaderStatusSummary;
  43. TAcceptorStatus DeadAcceptorStatusSummary;
  44. };
  45. namespace {
  46. TBusSessionConfig SessionConfigFillDefaults(const TBusSessionConfig& config, const TString& name) {
  47. TBusSessionConfig copy = config;
  48. if (copy.TotalTimeout == 0 && copy.SendTimeout == 0) {
  49. copy.TotalTimeout = TDuration::Seconds(60).MilliSeconds();
  50. copy.SendTimeout = TDuration::Seconds(15).MilliSeconds();
  51. } else if (copy.TotalTimeout == 0) {
  52. Y_ASSERT(copy.SendTimeout != 0);
  53. copy.TotalTimeout = config.SendTimeout + TDuration::MilliSeconds(10).MilliSeconds();
  54. } else if (copy.SendTimeout == 0) {
  55. Y_ASSERT(copy.TotalTimeout != 0);
  56. if ((ui64)copy.TotalTimeout > (ui64)TDuration::MilliSeconds(10).MilliSeconds()) {
  57. copy.SendTimeout = copy.TotalTimeout - TDuration::MilliSeconds(10).MilliSeconds();
  58. } else {
  59. copy.SendTimeout = copy.TotalTimeout;
  60. }
  61. } else {
  62. Y_ASSERT(copy.TotalTimeout != 0);
  63. Y_ASSERT(copy.SendTimeout != 0);
  64. }
  65. if (copy.ConnectTimeout == 0) {
  66. copy.ConnectTimeout = copy.SendTimeout;
  67. }
  68. Y_ABORT_UNLESS(copy.SendTimeout > 0, "SendTimeout must be > 0");
  69. Y_ABORT_UNLESS(copy.TotalTimeout > 0, "TotalTimeout must be > 0");
  70. Y_ABORT_UNLESS(copy.ConnectTimeout > 0, "ConnectTimeout must be > 0");
  71. Y_ABORT_UNLESS(copy.TotalTimeout >= copy.SendTimeout, "TotalTimeout must be >= SendTimeout");
  72. if (!copy.Name) {
  73. copy.Name = name;
  74. }
  75. return copy;
  76. }
  77. }
  78. TBusSessionImpl::TBusSessionImpl(bool isSource, TBusMessageQueue* queue, TBusProtocol* proto,
  79. IBusErrorHandler* handler,
  80. const TBusSessionConfig& config, const TString& name)
  81. : TActor<TBusSessionImpl, TStatusTag>(queue->WorkQueue.Get())
  82. , TActor<TBusSessionImpl, TConnectionTag>(queue->WorkQueue.Get())
  83. , Impl(new TImpl)
  84. , IsSource_(isSource)
  85. , Queue(queue)
  86. , Proto(proto)
  87. , ProtoName(Proto->GetService())
  88. , ErrorHandler(handler)
  89. , HandlerUseCountHolder(&handler->UseCountChecker)
  90. , Config(SessionConfigFillDefaults(config, name))
  91. , WriteEventLoop("wr-el")
  92. , ReadEventLoop("rd-el")
  93. , LastAcceptorId(0)
  94. , LastConnectionId(0)
  95. , Down(0)
  96. {
  97. Impl->DeadAcceptorStatusSummary.Summary = true;
  98. ReadEventLoopThread.Reset(new NThreading::TLegacyFuture<void, false>(std::bind(&TEventLoop::Run, std::ref(ReadEventLoop))));
  99. WriteEventLoopThread.Reset(new NThreading::TLegacyFuture<void, false>(std::bind(&TEventLoop::Run, std::ref(WriteEventLoop))));
  100. Queue->Schedule(IScheduleItemAutoPtr(new TScheduleSession(this, TInstant::Now() + Config.Secret.TimeoutPeriod)));
  101. }
  102. TBusSessionImpl::~TBusSessionImpl() {
  103. Y_ABORT_UNLESS(Down);
  104. Y_ABORT_UNLESS(ShutdownCompleteEvent.WaitT(TDuration::Zero()));
  105. Y_ABORT_UNLESS(!WriteEventLoop.IsRunning());
  106. Y_ABORT_UNLESS(!ReadEventLoop.IsRunning());
  107. }
  108. TBusSessionStatus::TBusSessionStatus()
  109. : InFlightCount(0)
  110. , InFlightSize(0)
  111. , InputPaused(false)
  112. {
  113. }
  114. void TBusSessionImpl::Shutdown() {
  115. if (!AtomicCas(&Down, 1, 0)) {
  116. ShutdownCompleteEvent.WaitI();
  117. return;
  118. }
  119. Y_ABORT_UNLESS(Queue->IsRunning(), "Session must be shut down prior to queue shutdown");
  120. TUseAfterFreeCheckerGuard handlerAliveCheckedGuard(ErrorHandler->UseAfterFreeChecker);
  121. // For legacy clients that don't use smart pointers
  122. TIntrusivePtr<TBusSessionImpl> thiz(this);
  123. Queue->Remove(this);
  124. // shutdown event loops first, so they won't send more events
  125. // to acceptors and connections
  126. ReadEventLoop.Stop();
  127. WriteEventLoop.Stop();
  128. ReadEventLoopThread->Get();
  129. WriteEventLoopThread->Get();
  130. // shutdown acceptors before connections
  131. // so they won't create more connections
  132. TVector<TAcceptorPtr> acceptors;
  133. GetAcceptors(&acceptors);
  134. {
  135. TGuard<TMutex> guard(ConnectionsLock);
  136. Acceptors.clear();
  137. }
  138. for (auto& acceptor : acceptors) {
  139. acceptor->Shutdown();
  140. }
  141. // shutdown connections
  142. TVector<TRemoteConnectionPtr> cs;
  143. GetConnections(&cs);
  144. for (auto& c : cs) {
  145. c->Shutdown(MESSAGE_SHUTDOWN);
  146. }
  147. // shutdown connections actor
  148. // must shutdown after connections destroyed
  149. ConnectionsData.ShutdownState.ShutdownCommand();
  150. GetConnectionsActor()->Schedule();
  151. ConnectionsData.ShutdownState.ShutdownComplete.WaitI();
  152. // finally shutdown status actor
  153. StatusData.ShutdownState.ShutdownCommand();
  154. GetStatusActor()->Schedule();
  155. StatusData.ShutdownState.ShutdownComplete.WaitI();
  156. // Make sure no one references IMessageHandler after Shutdown()
  157. JobCount.WaitForZero();
  158. HandlerUseCountHolder.Reset();
  159. ShutdownCompleteEvent.Signal();
  160. }
  161. bool TBusSessionImpl::IsDown() {
  162. return static_cast<bool>(AtomicGet(Down));
  163. }
  164. size_t TBusSessionImpl::GetInFlightImpl(const TNetAddr& addr) const {
  165. TRemoteConnectionPtr conn = const_cast<TBusSessionImpl*>(this)->GetConnection(addr, false);
  166. if (!!conn) {
  167. return conn->GetInFlight();
  168. } else {
  169. return 0;
  170. }
  171. }
  172. void TBusSessionImpl::GetInFlightBulk(TArrayRef<const TNetAddr> addrs, TArrayRef<size_t> results) const {
  173. Y_ABORT_UNLESS(addrs.size() == results.size(), "input.size != output.size");
  174. for (size_t i = 0; i < addrs.size(); ++i) {
  175. results[i] = GetInFlightImpl(addrs[i]);
  176. }
  177. }
  178. size_t TBusSessionImpl::GetConnectSyscallsNumForTestImpl(const TNetAddr& addr) const {
  179. TRemoteConnectionPtr conn = const_cast<TBusSessionImpl*>(this)->GetConnection(addr, false);
  180. if (!!conn) {
  181. return conn->GetConnectSyscallsNumForTest();
  182. } else {
  183. return 0;
  184. }
  185. }
  186. void TBusSessionImpl::GetConnectSyscallsNumBulkForTest(TArrayRef<const TNetAddr> addrs, TArrayRef<size_t> results) const {
  187. Y_ABORT_UNLESS(addrs.size() == results.size(), "input.size != output.size");
  188. for (size_t i = 0; i < addrs.size(); ++i) {
  189. results[i] = GetConnectSyscallsNumForTestImpl(addrs[i]);
  190. }
  191. }
  192. void TBusSessionImpl::FillStatus() {
  193. }
  194. TSessionDumpStatus TBusSessionImpl::GetStatusRecordInternal() {
  195. // Probably useless, because it returns cached info now
  196. Y_ABORT_UNLESS(!Queue->GetExecutor()->IsInExecutorThread(),
  197. "GetStatus must not be called from executor thread");
  198. TGuard<TMutex> guard(StatusData.StatusDumpCachedMutex);
  199. // TODO: returns zeros for a second after start
  200. // (until first cron)
  201. return StatusData.StatusDumpCached;
  202. }
  203. TString TBusSessionImpl::GetStatus(ui16 flags) {
  204. Y_UNUSED(flags);
  205. return GetStatusRecordInternal().PrintToString();
  206. }
  207. TConnectionStatusMonRecord TBusSessionImpl::GetStatusProtobuf() {
  208. Y_ABORT_UNLESS(!Queue->GetExecutor()->IsInExecutorThread(),
  209. "GetStatus must not be called from executor thread");
  210. TGuard<TMutex> guard(StatusData.StatusDumpCachedMutex);
  211. return StatusData.StatusDumpCached.ConnectionStatusSummary.GetStatusProtobuf();
  212. }
  213. TString TBusSessionImpl::GetStatusSingleLine() {
  214. TSessionDumpStatus status = GetStatusRecordInternal();
  215. TStringStream ss;
  216. ss << "in-flight: " << status.Status.InFlightCount;
  217. if (IsSource_) {
  218. ss << " ack: " << status.ConnectionStatusSummary.WriterStatus.AckMessagesSize;
  219. }
  220. ss << " send-q: " << status.ConnectionStatusSummary.WriterStatus.SendQueueSize;
  221. return ss.Str();
  222. }
  223. void TBusSessionImpl::ProcessItem(TStatusTag, TDeadConnectionTag, const TRemoteConnectionWriterIncrementalStatus& connectionStatus) {
  224. Impl->DeadConnectionWriterStatusSummary += connectionStatus;
  225. }
  226. void TBusSessionImpl::ProcessItem(TStatusTag, TDeadConnectionTag, const TRemoteConnectionReaderIncrementalStatus& connectionStatus) {
  227. Impl->DeadConnectionReaderStatusSummary += connectionStatus;
  228. }
  229. void TBusSessionImpl::ProcessItem(TStatusTag, TDeadConnectionTag, const TAcceptorStatus& acceptorStatus) {
  230. Impl->DeadAcceptorStatusSummary += acceptorStatus;
  231. }
  232. void TBusSessionImpl::ProcessItem(TConnectionTag, ::NActor::TDefaultTag, const TOnAccept& onAccept) {
  233. TSocketHolder socket(onAccept.s);
  234. if (AtomicGet(Down)) {
  235. // do not create connections after shutdown initiated
  236. return;
  237. }
  238. //if (Connections.find(addr) != Connections.end()) {
  239. // TODO: it is possible
  240. // won't be a problem after socket address replaced with id
  241. //}
  242. TRemoteConnectionPtr c(new TRemoteServerConnection(VerifyDynamicCast<TRemoteServerSession*>(this), ++LastConnectionId, onAccept.addr));
  243. VerifyDynamicCast<TRemoteServerConnection*>(c.Get())->Init(socket.Release(), onAccept.now);
  244. InsertConnectionLockAcquired(c.Get());
  245. }
  246. void TBusSessionImpl::ProcessItem(TConnectionTag, TRemoveTag, TRemoteConnectionPtr c) {
  247. TAddrRemoteConnections::iterator it1 = Connections.find(c->PeerAddrSocketAddr);
  248. if (it1 != Connections.end()) {
  249. if (it1->second.Get() == c.Get()) {
  250. Connections.erase(it1);
  251. }
  252. }
  253. THashMap<ui64, TRemoteConnectionPtr>::iterator it2 = ConnectionsById.find(c->ConnectionId);
  254. if (it2 != ConnectionsById.end()) {
  255. ConnectionsById.erase(it2);
  256. }
  257. SendSnapshotToStatusActor();
  258. }
  259. void TBusSessionImpl::ProcessConnectionsAcceptorsShapshotQueueItem(TAtomicSharedPtr<TConnectionsAcceptorsSnapshot> snapshot) {
  260. for (TVector<TRemoteConnectionPtr>::const_iterator connection = snapshot->Connections.begin();
  261. connection != snapshot->Connections.end(); ++connection) {
  262. Y_ASSERT((*connection)->ConnectionId <= snapshot->LastConnectionId);
  263. }
  264. for (TVector<TAcceptorPtr>::const_iterator acceptor = snapshot->Acceptors.begin();
  265. acceptor != snapshot->Acceptors.end(); ++acceptor) {
  266. Y_ASSERT((*acceptor)->AcceptorId <= snapshot->LastAcceptorId);
  267. }
  268. StatusData.ConnectionsAcceptorsSnapshot = snapshot;
  269. }
  270. void TBusSessionImpl::StatusUpdateCachedDumpIfNecessary(TInstant now) {
  271. if (now - StatusData.StatusDumpCachedLastUpdate > Config.Secret.StatusFlushPeriod) {
  272. StatusUpdateCachedDump();
  273. StatusData.StatusDumpCachedLastUpdate = now;
  274. }
  275. }
  276. void TBusSessionImpl::StatusUpdateCachedDump() {
  277. TSessionDumpStatus r;
  278. if (AtomicGet(Down)) {
  279. r.Shutdown = true;
  280. TGuard<TMutex> guard(StatusData.StatusDumpCachedMutex);
  281. StatusData.StatusDumpCached = r;
  282. return;
  283. }
  284. // TODO: make thread-safe
  285. FillStatus();
  286. r.Status = StatusData.Status;
  287. {
  288. TStringStream ss;
  289. TString name = Config.Name;
  290. if (!name) {
  291. name = "unnamed";
  292. }
  293. ss << (IsSource_ ? "client" : "server") << " session " << name << ", proto " << Proto->GetService() << Endl;
  294. ss << "in flight: " << r.Status.InFlightCount;
  295. if (!IsSource_) {
  296. ss << ", " << r.Status.InFlightSize << "b";
  297. }
  298. if (r.Status.InputPaused) {
  299. ss << " (input paused)";
  300. }
  301. ss << "\n";
  302. r.Head = ss.Str();
  303. }
  304. TVector<TRemoteConnectionPtr>& connections = StatusData.ConnectionsAcceptorsSnapshot->Connections;
  305. TVector<TAcceptorPtr>& acceptors = StatusData.ConnectionsAcceptorsSnapshot->Acceptors;
  306. r.ConnectionStatusSummary = TRemoteConnectionStatus();
  307. r.ConnectionStatusSummary.Summary = true;
  308. r.ConnectionStatusSummary.Server = !IsSource_;
  309. r.ConnectionStatusSummary.WriterStatus.Incremental = Impl->DeadConnectionWriterStatusSummary;
  310. r.ConnectionStatusSummary.ReaderStatus.Incremental = Impl->DeadConnectionReaderStatusSummary;
  311. TAcceptorStatus acceptorStatusSummary = Impl->DeadAcceptorStatusSummary;
  312. {
  313. TStringStream ss;
  314. for (TVector<TAcceptorPtr>::const_iterator acceptor = acceptors.begin();
  315. acceptor != acceptors.end(); ++acceptor) {
  316. const TAcceptorStatus status = (*acceptor)->GranStatus.Listen.Get();
  317. acceptorStatusSummary += status;
  318. if (acceptor != acceptors.begin()) {
  319. ss << "\n";
  320. }
  321. ss << status.PrintToString();
  322. }
  323. r.Acceptors = ss.Str();
  324. }
  325. {
  326. TStringStream ss;
  327. for (TVector<TRemoteConnectionPtr>::const_iterator connection = connections.begin();
  328. connection != connections.end(); ++connection) {
  329. if (connection != connections.begin()) {
  330. ss << "\n";
  331. }
  332. TRemoteConnectionStatus status;
  333. status.Server = !IsSource_;
  334. status.ReaderStatus = (*connection)->GranStatus.Reader.Get();
  335. status.WriterStatus = (*connection)->GranStatus.Writer.Get();
  336. ss << status.PrintToString();
  337. r.ConnectionStatusSummary.ReaderStatus += status.ReaderStatus;
  338. r.ConnectionStatusSummary.WriterStatus += status.WriterStatus;
  339. }
  340. r.ConnectionsSummary = r.ConnectionStatusSummary.PrintToString();
  341. r.Connections = ss.Str();
  342. }
  343. r.Config = Config;
  344. TGuard<TMutex> guard(StatusData.StatusDumpCachedMutex);
  345. StatusData.StatusDumpCached = r;
  346. }
  347. TBusSessionImpl::TStatusData::TStatusData()
  348. : ConnectionsAcceptorsSnapshot(new TConnectionsAcceptorsSnapshot)
  349. {
  350. }
  351. void TBusSessionImpl::Act(TStatusTag) {
  352. TInstant now = TInstant::Now();
  353. EShutdownState shutdownState = StatusData.ShutdownState.State.Get();
  354. StatusData.ConnectionsAcceptorsSnapshotsQueue.DequeueAllLikelyEmpty(std::bind(&TBusSessionImpl::ProcessConnectionsAcceptorsShapshotQueueItem, this, std::placeholders::_1));
  355. GetDeadConnectionWriterStatusQueue()->DequeueAllLikelyEmpty();
  356. GetDeadConnectionReaderStatusQueue()->DequeueAllLikelyEmpty();
  357. GetDeadAcceptorStatusQueue()->DequeueAllLikelyEmpty();
  358. // TODO: check queues are empty if already stopped
  359. if (shutdownState != SS_RUNNING) {
  360. // important to beak cyclic link session -> connection -> session
  361. StatusData.ConnectionsAcceptorsSnapshot->Connections.clear();
  362. StatusData.ConnectionsAcceptorsSnapshot->Acceptors.clear();
  363. }
  364. if (shutdownState == SS_SHUTDOWN_COMMAND) {
  365. StatusData.ShutdownState.CompleteShutdown();
  366. }
  367. StatusUpdateCachedDumpIfNecessary(now);
  368. }
  369. TBusSessionImpl::TConnectionsData::TConnectionsData() {
  370. }
  371. void TBusSessionImpl::Act(TConnectionTag) {
  372. TConnectionsGuard guard(ConnectionsLock);
  373. EShutdownState shutdownState = ConnectionsData.ShutdownState.State.Get();
  374. if (shutdownState == SS_SHUTDOWN_COMPLETE) {
  375. Y_ABORT_UNLESS(GetRemoveConnectionQueue()->IsEmpty());
  376. Y_ABORT_UNLESS(GetOnAcceptQueue()->IsEmpty());
  377. }
  378. GetRemoveConnectionQueue()->DequeueAllLikelyEmpty();
  379. GetOnAcceptQueue()->DequeueAllLikelyEmpty();
  380. if (shutdownState == SS_SHUTDOWN_COMMAND) {
  381. ConnectionsData.ShutdownState.CompleteShutdown();
  382. }
  383. }
  384. void TBusSessionImpl::Listen(int port, TBusMessageQueue* q) {
  385. Listen(BindOnPort(port, Config.ReusePort).second, q);
  386. }
  387. void TBusSessionImpl::Listen(const TVector<TBindResult>& bindTo, TBusMessageQueue* q) {
  388. Y_ASSERT(q == Queue);
  389. int actualPort = -1;
  390. for (const TBindResult& br : bindTo) {
  391. if (actualPort == -1) {
  392. actualPort = br.Addr.GetPort();
  393. } else {
  394. Y_ABORT_UNLESS(actualPort == br.Addr.GetPort(), "state check");
  395. }
  396. if (Config.SocketToS >= 0) {
  397. SetSocketToS(*br.Socket, &(br.Addr), Config.SocketToS);
  398. }
  399. TAcceptorPtr acceptor(new TAcceptor(this, ++LastAcceptorId, br.Socket->Release(), br.Addr));
  400. TConnectionsGuard guard(ConnectionsLock);
  401. InsertAcceptorLockAcquired(acceptor.Get());
  402. }
  403. Config.ListenPort = actualPort;
  404. }
  405. void TBusSessionImpl::SendSnapshotToStatusActor() {
  406. //Y_ASSERT(ConnectionsLock.IsLocked());
  407. TAtomicSharedPtr<TConnectionsAcceptorsSnapshot> snapshot(new TConnectionsAcceptorsSnapshot);
  408. GetAcceptorsLockAquired(&snapshot->Acceptors);
  409. GetConnectionsLockAquired(&snapshot->Connections);
  410. snapshot->LastAcceptorId = LastAcceptorId;
  411. snapshot->LastConnectionId = LastConnectionId;
  412. StatusData.ConnectionsAcceptorsSnapshotsQueue.Enqueue(snapshot);
  413. GetStatusActor()->Schedule();
  414. }
  415. void TBusSessionImpl::InsertConnectionLockAcquired(TRemoteConnection* connection) {
  416. //Y_ASSERT(ConnectionsLock.IsLocked());
  417. Connections.insert(std::make_pair(connection->PeerAddrSocketAddr, connection));
  418. // connection for given adds may already exist at this point
  419. // (so we overwrite old connection)
  420. // after reconnect, if previous connections wasn't shutdown yet
  421. bool inserted2 = ConnectionsById.insert(std::make_pair(connection->ConnectionId, connection)).second;
  422. Y_ABORT_UNLESS(inserted2, "state check: must be inserted (2)");
  423. SendSnapshotToStatusActor();
  424. }
  425. void TBusSessionImpl::InsertAcceptorLockAcquired(TAcceptor* acceptor) {
  426. //Y_ASSERT(ConnectionsLock.IsLocked());
  427. Acceptors.push_back(acceptor);
  428. SendSnapshotToStatusActor();
  429. }
  430. void TBusSessionImpl::GetConnections(TVector<TRemoteConnectionPtr>* r) {
  431. TConnectionsGuard guard(ConnectionsLock);
  432. GetConnectionsLockAquired(r);
  433. }
  434. void TBusSessionImpl::GetAcceptors(TVector<TAcceptorPtr>* r) {
  435. TConnectionsGuard guard(ConnectionsLock);
  436. GetAcceptorsLockAquired(r);
  437. }
  438. void TBusSessionImpl::GetConnectionsLockAquired(TVector<TRemoteConnectionPtr>* r) {
  439. //Y_ASSERT(ConnectionsLock.IsLocked());
  440. r->reserve(Connections.size());
  441. for (auto& connection : Connections) {
  442. r->push_back(connection.second);
  443. }
  444. }
  445. void TBusSessionImpl::GetAcceptorsLockAquired(TVector<TAcceptorPtr>* r) {
  446. //Y_ASSERT(ConnectionsLock.IsLocked());
  447. r->reserve(Acceptors.size());
  448. for (auto& acceptor : Acceptors) {
  449. r->push_back(acceptor);
  450. }
  451. }
  452. TRemoteConnectionPtr TBusSessionImpl::GetConnectionById(ui64 id) {
  453. TConnectionsGuard guard(ConnectionsLock);
  454. THashMap<ui64, TRemoteConnectionPtr>::const_iterator it = ConnectionsById.find(id);
  455. if (it == ConnectionsById.end()) {
  456. return nullptr;
  457. } else {
  458. return it->second;
  459. }
  460. }
  461. TAcceptorPtr TBusSessionImpl::GetAcceptorById(ui64 id) {
  462. TGuard<TMutex> guard(ConnectionsLock);
  463. for (const auto& Acceptor : Acceptors) {
  464. if (Acceptor->AcceptorId == id) {
  465. return Acceptor;
  466. }
  467. }
  468. return nullptr;
  469. }
  470. void TBusSessionImpl::InvokeOnError(TNonDestroyingAutoPtr<TBusMessage> message, EMessageStatus status) {
  471. message->CheckClean();
  472. ErrorHandler->OnError(message, status);
  473. }
  474. TRemoteConnectionPtr TBusSessionImpl::GetConnection(const TBusSocketAddr& addr, bool create) {
  475. TConnectionsGuard guard(ConnectionsLock);
  476. TAddrRemoteConnections::const_iterator it = Connections.find(addr);
  477. if (it != Connections.end()) {
  478. return it->second;
  479. }
  480. if (!create) {
  481. return TRemoteConnectionPtr();
  482. }
  483. Y_ABORT_UNLESS(IsSource_, "must be source");
  484. TRemoteConnectionPtr c(new TRemoteClientConnection(VerifyDynamicCast<TRemoteClientSession*>(this), ++LastConnectionId, addr.ToNetAddr()));
  485. InsertConnectionLockAcquired(c.Get());
  486. return c;
  487. }
  488. void TBusSessionImpl::Cron() {
  489. TVector<TRemoteConnectionPtr> connections;
  490. GetConnections(&connections);
  491. for (const auto& it : connections) {
  492. TRemoteConnection* connection = it.Get();
  493. if (IsSource_) {
  494. VerifyDynamicCast<TRemoteClientConnection*>(connection)->ScheduleTimeoutMessages();
  495. } else {
  496. VerifyDynamicCast<TRemoteServerConnection*>(connection)->WriterData.TimeToRotateCounters.AddTask();
  497. // no schedule: do not rotate if there's no traffic
  498. }
  499. }
  500. // status updates are sent without scheduling
  501. GetStatusActor()->Schedule();
  502. Queue->Schedule(IScheduleItemAutoPtr(new TScheduleSession(this, TInstant::Now() + Config.Secret.TimeoutPeriod)));
  503. }
  504. TString TBusSessionImpl::GetNameInternal() {
  505. if (!!Config.Name) {
  506. return Config.Name;
  507. }
  508. return ProtoName;
  509. }