// sockpool.h (5.2 KB)
  1. #pragma once
  2. #include "impl.h"
  3. #include "network.h"
  4. #include <util/network/address.h>
  5. #include <util/network/socket.h>
  6. #include <util/system/mutex.h>
  7. extern void SetCommonSockOpts(SOCKET sock, const struct sockaddr* sa = nullptr);
  8. class TSocketPool;
  9. class TPooledSocket {
  10. class TImpl: public TIntrusiveListItem<TImpl>, public TSimpleRefCount<TImpl, TImpl> {
  11. public:
  12. TImpl(SOCKET fd, TSocketPool* pool) noexcept
  13. : Pool_(pool)
  14. , IsKeepAlive_(false)
  15. , Fd_(fd)
  16. {
  17. Touch();
  18. }
  19. static void Destroy(TImpl* impl) noexcept {
  20. impl->DoDestroy();
  21. }
  22. void DoDestroy() noexcept {
  23. if (!Closed() && IsKeepAlive() && IsInGoodState()) {
  24. ReturnToPool();
  25. } else {
  26. delete this;
  27. }
  28. }
  29. bool IsKeepAlive() const noexcept {
  30. return IsKeepAlive_;
  31. }
  32. void SetKeepAlive(bool ka) {
  33. ::SetKeepAlive(Fd_, ka);
  34. IsKeepAlive_ = ka;
  35. }
  36. SOCKET Socket() const noexcept {
  37. return Fd_;
  38. }
  39. bool Closed() const noexcept {
  40. return Fd_.Closed();
  41. }
  42. void Close() noexcept {
  43. Fd_.Close();
  44. }
  45. bool IsInGoodState() const noexcept {
  46. int err = 0;
  47. socklen_t len = sizeof(err);
  48. getsockopt(Fd_, SOL_SOCKET, SO_ERROR, (char*)&err, &len);
  49. return !err;
  50. }
  51. bool IsOpen() const noexcept {
  52. return IsInGoodState() && IsNotSocketClosedByOtherSide(Fd_);
  53. }
  54. void Touch() noexcept {
  55. TouchTime_ = TInstant::Now();
  56. }
  57. const TInstant& LastTouch() const noexcept {
  58. return TouchTime_;
  59. }
  60. private:
  61. inline void ReturnToPool() noexcept;
  62. private:
  63. TSocketPool* Pool_;
  64. bool IsKeepAlive_;
  65. TSocketHolder Fd_;
  66. TInstant TouchTime_;
  67. };
  68. friend class TSocketPool;
  69. public:
  70. TPooledSocket()
  71. : Impl_(nullptr)
  72. {
  73. }
  74. TPooledSocket(TImpl* impl)
  75. : Impl_(impl)
  76. {
  77. }
  78. ~TPooledSocket() {
  79. if (UncaughtException() && !!Impl_) {
  80. Close();
  81. }
  82. }
  83. operator SOCKET() const noexcept {
  84. return Impl_->Socket();
  85. }
  86. void SetKeepAlive(bool ka) {
  87. Impl_->SetKeepAlive(ka);
  88. }
  89. void Close() noexcept {
  90. Impl_->Close();
  91. }
  92. private:
  93. TIntrusivePtr<TImpl> Impl_;
  94. };
  95. struct TConnectData {
  96. TConnectData(TCont* cont, const TInstant& deadLine)
  97. : Cont(cont)
  98. , DeadLine(deadLine)
  99. {
  100. }
  101. TConnectData(TCont* cont, const TDuration& timeOut)
  102. : Cont(cont)
  103. , DeadLine(TInstant::Now() + timeOut)
  104. {
  105. }
  106. TCont* Cont;
  107. const TInstant DeadLine;
  108. };
  109. class TSocketPool {
  110. friend class TPooledSocket::TImpl;
  111. public:
  112. typedef TAtomicSharedPtr<NAddr::IRemoteAddr> TAddrRef;
  113. TSocketPool(int ip, int port)
  114. : Addr_(new NAddr::TIPv4Addr(TIpAddress((ui32)ip, (ui16)port)))
  115. {
  116. }
  117. TSocketPool(const TAddrRef& addr)
  118. : Addr_(addr)
  119. {
  120. }
  121. void EraseStale(const TInstant& maxAge) noexcept {
  122. TSockets toDelete;
  123. {
  124. TGuard<TMutex> guard(Mutex_);
  125. for (TSockets::TIterator it = Pool_.Begin(); it != Pool_.End();) {
  126. if (it->LastTouch() < maxAge) {
  127. toDelete.PushBack(&*(it++));
  128. } else {
  129. ++it;
  130. }
  131. }
  132. }
  133. }
  134. TPooledSocket Get(TConnectData* conn) {
  135. TPooledSocket ret;
  136. if (TPooledSocket::TImpl* alive = GetImpl()) {
  137. ret = TPooledSocket(alive);
  138. } else {
  139. ret = AllocateMore(conn);
  140. }
  141. ret.Impl_->Touch();
  142. return ret;
  143. }
  144. bool GetAlive(TPooledSocket& socket) {
  145. if (TPooledSocket::TImpl* alive = GetImpl()) {
  146. alive->Touch();
  147. socket = TPooledSocket(alive);
  148. return true;
  149. }
  150. return false;
  151. }
  152. private:
  153. TPooledSocket::TImpl* GetImpl() {
  154. TGuard<TMutex> guard(Mutex_);
  155. while (!Pool_.Empty()) {
  156. THolder<TPooledSocket::TImpl> ret(Pool_.PopFront());
  157. if (ret->IsOpen()) {
  158. return ret.Release();
  159. }
  160. }
  161. return nullptr;
  162. }
  163. void Release(TPooledSocket::TImpl* impl) noexcept {
  164. TGuard<TMutex> guard(Mutex_);
  165. Pool_.PushFront(impl);
  166. }
  167. TPooledSocket AllocateMore(TConnectData* conn);
  168. private:
  169. TAddrRef Addr_;
  170. using TSockets = TIntrusiveListWithAutoDelete<TPooledSocket::TImpl, TDelete>;
  171. TSockets Pool_;
  172. TMutex Mutex_;
  173. };
  174. inline void TPooledSocket::TImpl::ReturnToPool() noexcept {
  175. Pool_->Release(this);
  176. }
  177. class TContIO: public IInputStream, public IOutputStream {
  178. public:
  179. TContIO(SOCKET fd, TCont* cont)
  180. : Fd_(fd)
  181. , Cont_(cont)
  182. {
  183. }
  184. void DoWrite(const void* buf, size_t len) override {
  185. NCoro::WriteI(Cont_, Fd_, buf, len).Checked();
  186. }
  187. size_t DoRead(void* buf, size_t len) override {
  188. return NCoro::ReadI(Cont_, Fd_, buf, len).Checked();
  189. }
  190. SOCKET Fd() const noexcept {
  191. return Fd_;
  192. }
  193. private:
  194. SOCKET Fd_;
  195. TCont* Cont_;
  196. };