tcp_socket_impl.h 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332
  1. #pragma once
  2. #include "asio.h"
  3. #include "io_service_impl.h"
  4. #include <sys/uio.h>
  5. #if defined(_bionic_)
  6. # define IOV_MAX 1024
  7. #endif
  8. namespace NAsio {
  9. // ownership/keep-alive references:
  10. // Handlers <- TOperation...(TFdOperation) <- TPollFdEventHandler <- TIOService
  11. class TSocketOperation: public TFdOperation {
  12. public:
  13. TSocketOperation(TTcpSocket::TImpl& s, TPollType pt, TInstant deadline);
  14. protected:
  15. TTcpSocket::TImpl& S_;
  16. };
  17. class TOperationConnect: public TSocketOperation {
  18. public:
  19. TOperationConnect(TTcpSocket::TImpl& s, TTcpSocket::TConnectHandler h, TInstant deadline)
  20. : TSocketOperation(s, PollWrite, deadline)
  21. , H_(h)
  22. {
  23. }
  24. bool Execute(int errorCode) override {
  25. H_(errorCode, *this);
  26. return true;
  27. }
  28. TTcpSocket::TConnectHandler H_;
  29. };
  30. class TOperationConnectFailed: public TSocketOperation {
  31. public:
  32. TOperationConnectFailed(TTcpSocket::TImpl& s, TTcpSocket::TConnectHandler h, int errorCode, TInstant deadline)
  33. : TSocketOperation(s, PollWrite, deadline)
  34. , H_(h)
  35. , ErrorCode_(errorCode)
  36. {
  37. Speculative_ = true;
  38. }
  39. bool Execute(int errorCode) override {
  40. Y_UNUSED(errorCode);
  41. H_(ErrorCode_, *this);
  42. return true;
  43. }
  44. TTcpSocket::TConnectHandler H_;
  45. int ErrorCode_;
  46. };
  47. class TOperationWrite: public TSocketOperation {
  48. public:
  49. TOperationWrite(TTcpSocket::TImpl& s, NAsio::TTcpSocket::TSendedData& buffs, TTcpSocket::TWriteHandler h, TInstant deadline)
  50. : TSocketOperation(s, PollWrite, deadline)
  51. , H_(h)
  52. , Buffs_(buffs)
  53. , Written_(0)
  54. {
  55. Speculative_ = true;
  56. }
  57. //return true, if not need write more data
  58. bool Execute(int errorCode) override;
  59. private:
  60. TTcpSocket::TWriteHandler H_;
  61. NAsio::TTcpSocket::TSendedData Buffs_;
  62. size_t Written_;
  63. };
  64. class TOperationWriteVector: public TSocketOperation {
  65. public:
  66. TOperationWriteVector(TTcpSocket::TImpl& s, TContIOVector* v, TTcpSocket::TWriteHandler h, TInstant deadline)
  67. : TSocketOperation(s, PollWrite, deadline)
  68. , H_(h)
  69. , V_(*v)
  70. , Written_(0)
  71. {
  72. Speculative_ = true;
  73. }
  74. //return true, if not need write more data
  75. bool Execute(int errorCode) override;
  76. private:
  77. TTcpSocket::TWriteHandler H_;
  78. TContIOVector& V_;
  79. size_t Written_;
  80. };
  81. class TOperationReadSome: public TSocketOperation {
  82. public:
  83. TOperationReadSome(TTcpSocket::TImpl& s, void* buff, size_t size, TTcpSocket::TReadHandler h, TInstant deadline)
  84. : TSocketOperation(s, PollRead, deadline)
  85. , H_(h)
  86. , Buff_(static_cast<char*>(buff))
  87. , Size_(size)
  88. {
  89. }
  90. //return true, if not need read more data
  91. bool Execute(int errorCode) override;
  92. protected:
  93. TTcpSocket::TReadHandler H_;
  94. char* Buff_;
  95. size_t Size_;
  96. };
  97. class TOperationRead: public TOperationReadSome {
  98. public:
  99. TOperationRead(TTcpSocket::TImpl& s, void* buff, size_t size, TTcpSocket::TReadHandler h, TInstant deadline)
  100. : TOperationReadSome(s, buff, size, h, deadline)
  101. , Read_(0)
  102. {
  103. }
  104. bool Execute(int errorCode) override;
  105. private:
  106. size_t Read_;
  107. };
  108. class TOperationPoll: public TSocketOperation {
  109. public:
  110. TOperationPoll(TTcpSocket::TImpl& s, TPollType pt, TTcpSocket::TPollHandler h, TInstant deadline)
  111. : TSocketOperation(s, pt, deadline)
  112. , H_(h)
  113. {
  114. }
  115. bool Execute(int errorCode) override {
  116. H_(errorCode, *this);
  117. return true;
  118. }
  119. private:
  120. TTcpSocket::TPollHandler H_;
  121. };
  122. template <class T>
  123. class TOperationCancel: public TNoneOperation {
  124. public:
  125. TOperationCancel(T* s)
  126. : TNoneOperation()
  127. , S_(s)
  128. {
  129. Speculative_ = true;
  130. }
  131. ~TOperationCancel() override {
  132. }
  133. private:
  134. bool Execute(int errorCode) override {
  135. Y_UNUSED(errorCode);
  136. if (!errorCode && S_->Fd() != INVALID_SOCKET) {
  137. S_->GetIOServiceImpl().CancelFdOp(S_->Fd());
  138. }
  139. return true;
  140. }
  141. TIntrusivePtr<T> S_;
  142. };
  143. class TTcpSocket::TImpl: public TNonCopyable, public TThrRefBase {
  144. public:
  145. typedef TTcpSocket::TSendedData TSendedData;
  146. TImpl(TIOService::TImpl& srv) noexcept
  147. : Srv_(srv)
  148. {
  149. }
  150. ~TImpl() override {
  151. DBGOUT("TSocket::~TImpl()");
  152. }
  153. void Assign(SOCKET fd, TEndpoint ep) {
  154. TSocketHolder(fd).Swap(S_);
  155. RemoteEndpoint_ = ep;
  156. }
  157. void AsyncConnect(const TEndpoint& ep, TTcpSocket::TConnectHandler h, TInstant deadline) {
  158. TSocketHolder s(socket(ep.SockAddr()->sa_family, SOCK_STREAM, 0));
  159. if (Y_UNLIKELY(s == INVALID_SOCKET || Srv_.HasAbort())) {
  160. throw TSystemError() << TStringBuf("can't create socket");
  161. }
  162. SetNonBlock(s);
  163. int err;
  164. do {
  165. err = connect(s, ep.SockAddr(), (int)ep.SockAddrLen());
  166. if (Y_LIKELY(err)) {
  167. err = LastSystemError();
  168. }
  169. #if defined(_freebsd_)
  170. if (Y_UNLIKELY(err == EINTR)) {
  171. err = EINPROGRESS;
  172. }
  173. } while (0);
  174. #elif defined(_linux_)
  175. } while (Y_UNLIKELY(err == EINTR));
  176. #else
  177. } while (0);
  178. #endif
  179. RemoteEndpoint_ = ep;
  180. S_.Swap(s);
  181. DBGOUT("AsyncConnect(): " << err);
  182. if (Y_LIKELY(err == EINPROGRESS || err == EWOULDBLOCK || err == 0)) {
  183. Srv_.ScheduleOp(new TOperationConnect(*this, h, deadline)); //set callback
  184. } else {
  185. Srv_.ScheduleOp(new TOperationConnectFailed(*this, h, err, deadline)); //set callback
  186. }
  187. }
  188. inline void AsyncWrite(TSendedData& d, TTcpSocket::TWriteHandler h, TInstant deadline) {
  189. Srv_.ScheduleOp(new TOperationWrite(*this, d, h, deadline));
  190. }
  191. inline void AsyncWrite(TContIOVector* v, TTcpSocket::TWriteHandler h, TInstant deadline) {
  192. Srv_.ScheduleOp(new TOperationWriteVector(*this, v, h, deadline));
  193. }
  194. inline void AsyncRead(void* buff, size_t size, TTcpSocket::TReadHandler h, TInstant deadline) {
  195. Srv_.ScheduleOp(new TOperationRead(*this, buff, size, h, deadline));
  196. }
  197. inline void AsyncReadSome(void* buff, size_t size, TTcpSocket::TReadHandler h, TInstant deadline) {
  198. Srv_.ScheduleOp(new TOperationReadSome(*this, buff, size, h, deadline));
  199. }
  200. inline void AsyncPollWrite(TTcpSocket::TPollHandler h, TInstant deadline) {
  201. Srv_.ScheduleOp(new TOperationPoll(*this, TOperationPoll::PollWrite, h, deadline));
  202. }
  203. inline void AsyncPollRead(TTcpSocket::TPollHandler h, TInstant deadline) {
  204. Srv_.ScheduleOp(new TOperationPoll(*this, TOperationPoll::PollRead, h, deadline));
  205. }
  206. inline void AsyncCancel() {
  207. if (Y_UNLIKELY(Srv_.HasAbort())) {
  208. return;
  209. }
  210. Srv_.ScheduleOp(new TOperationCancel<TTcpSocket::TImpl>(this));
  211. }
  212. inline bool SysCallHasResult(ssize_t& n, TErrorCode& ec) noexcept {
  213. if (n >= 0) {
  214. return true;
  215. }
  216. int errn = LastSystemError();
  217. if (errn == EINTR) {
  218. return false;
  219. }
  220. ec.Assign(errn);
  221. n = 0;
  222. return true;
  223. }
  224. size_t WriteSome(TContIOVector& iov, TErrorCode& ec) noexcept {
  225. for (;;) {
  226. ssize_t n = writev(S_, (const iovec*)iov.Parts(), Min(IOV_MAX, (int)iov.Count()));
  227. DBGOUT("WriteSome(): n=" << n);
  228. if (SysCallHasResult(n, ec)) {
  229. return n;
  230. }
  231. }
  232. }
  233. size_t WriteSome(const void* buff, size_t size, TErrorCode& ec) noexcept {
  234. for (;;) {
  235. ssize_t n = send(S_, (char*)buff, size, 0);
  236. DBGOUT("WriteSome(): n=" << n);
  237. if (SysCallHasResult(n, ec)) {
  238. return n;
  239. }
  240. }
  241. }
  242. size_t ReadSome(void* buff, size_t size, TErrorCode& ec) noexcept {
  243. for (;;) {
  244. ssize_t n = recv(S_, (char*)buff, size, 0);
  245. DBGOUT("ReadSome(): n=" << n);
  246. if (SysCallHasResult(n, ec)) {
  247. return n;
  248. }
  249. }
  250. }
  251. inline void Shutdown(TTcpSocket::TShutdownMode mode, TErrorCode& ec) {
  252. if (shutdown(S_, mode)) {
  253. ec.Assign(LastSystemError());
  254. }
  255. }
  256. TIOService::TImpl& GetIOServiceImpl() const noexcept {
  257. return Srv_;
  258. }
  259. inline SOCKET Fd() const noexcept {
  260. return S_;
  261. }
  262. TEndpoint RemoteEndpoint() const {
  263. return RemoteEndpoint_;
  264. }
  265. private:
  266. TIOService::TImpl& Srv_;
  267. TSocketHolder S_;
  268. TEndpoint RemoteEndpoint_;
  269. };
  270. }