/* bufferevent_async.c */
  1. /*
  2. * Copyright (c) 2009-2012 Niels Provos and Nick Mathewson
  3. *
  4. * All rights reserved.
  5. *
  6. * Redistribution and use in source and binary forms, with or without
  7. * modification, are permitted provided that the following conditions
  8. * are met:
  9. * 1. Redistributions of source code must retain the above copyright
  10. * notice, this list of conditions and the following disclaimer.
  11. * 2. Redistributions in binary form must reproduce the above copyright
  12. * notice, this list of conditions and the following disclaimer in the
  13. * documentation and/or other materials provided with the distribution.
  14. * 3. The name of the author may not be used to endorse or promote products
  15. * derived from this software without specific prior written permission.
  16. *
  17. * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
  18. * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
  19. * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
  20. * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
  21. * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
  22. * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  23. * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  24. * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  25. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
  26. * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  27. */
  28. #include "event2/event-config.h"
  29. #include "evconfig-private.h"
  30. #ifdef EVENT__HAVE_SYS_TIME_H
  31. #include <sys/time.h>
  32. #endif
  33. #include <errno.h>
  34. #include <stdio.h>
  35. #include <stdlib.h>
  36. #include <string.h>
  37. #ifdef EVENT__HAVE_STDARG_H
  38. #include <stdarg.h>
  39. #endif
  40. #ifdef EVENT__HAVE_UNISTD_H
  41. #include <unistd.h>
  42. #endif
  43. #ifdef _WIN32
  44. #include <winsock2.h>
  45. #include <winerror.h>
  46. #include <ws2tcpip.h>
  47. #endif
  48. #include <sys/queue.h>
  49. #include "event2/util.h"
  50. #include "event2/bufferevent.h"
  51. #include "event2/buffer.h"
  52. #include "event2/bufferevent_struct.h"
  53. #include "event2/event.h"
  54. #include "event2/util.h"
  55. #include "event-internal.h"
  56. #include "log-internal.h"
  57. #include "mm-internal.h"
  58. #include "bufferevent-internal.h"
  59. #include "util-internal.h"
  60. #include "iocp-internal.h"
  61. #ifndef SO_UPDATE_CONNECT_CONTEXT
  62. /* Mingw is sometimes missing this */
  63. #define SO_UPDATE_CONNECT_CONTEXT 0x7010
  64. #endif
  65. /* prototypes */
  66. static int be_async_enable(struct bufferevent *, short);
  67. static int be_async_disable(struct bufferevent *, short);
  68. static void be_async_destruct(struct bufferevent *);
  69. static int be_async_flush(struct bufferevent *, short, enum bufferevent_flush_mode);
  70. static int be_async_ctrl(struct bufferevent *, enum bufferevent_ctrl_op, union bufferevent_ctrl_data *);
/* State for an IOCP-based ("async") bufferevent.  The generic
 * bufferevent_private must come first so the EVUTIL_UPCAST calls in this
 * file can recover this structure from a struct bufferevent pointer. */
struct bufferevent_async {
	struct bufferevent_private bev;
	/* One overlapped structure per kind of operation we can have
	 * outstanding; each maps back to this object via upcast_*(). */
	struct event_overlapped connect_overlapped;
	struct event_overlapped read_overlapped;
	struct event_overlapped write_overlapped;
	/* Number of bytes requested by the in-flight read/write; zero when
	 * no such operation is outstanding. */
	size_t read_in_progress;
	size_t write_in_progress;
	unsigned ok : 1;          /* Socket usable; cleared on error or EOF. */
	unsigned read_added : 1;  /* Holding a virtual event for reading. */
	unsigned write_added : 1; /* Holding a virtual event for writing. */
};
/* Method table mapping the generic bufferevent operations onto the
 * IOCP-based implementations below. */
const struct bufferevent_ops bufferevent_ops_async = {
	"socket_async",
	evutil_offsetof(struct bufferevent_async, bev.bev),
	be_async_enable,
	be_async_disable,
	NULL, /* Unlink */
	be_async_destruct,
	bufferevent_generic_adj_timeouts_,
	be_async_flush,
	be_async_ctrl,
};
  93. static inline void
  94. be_async_run_eventcb(struct bufferevent *bev, short what, int options)
  95. { bufferevent_run_eventcb_(bev, what, options|BEV_TRIG_DEFER_CALLBACKS); }
  96. static inline void
  97. be_async_trigger_nolock(struct bufferevent *bev, short what, int options)
  98. { bufferevent_trigger_nolock_(bev, what, options|BEV_TRIG_DEFER_CALLBACKS); }
  99. static inline int
  100. fatal_error(int err)
  101. {
  102. switch (err) {
  103. /* We may have already associated this fd with a port.
  104. * Let's hope it's this port, and that the error code
  105. * for doing this neer changes. */
  106. case ERROR_INVALID_PARAMETER:
  107. return 0;
  108. }
  109. return 1;
  110. }
  111. static inline struct bufferevent_async *
  112. upcast(struct bufferevent *bev)
  113. {
  114. struct bufferevent_async *bev_a;
  115. if (!BEV_IS_ASYNC(bev))
  116. return NULL;
  117. bev_a = EVUTIL_UPCAST(bev, struct bufferevent_async, bev.bev);
  118. return bev_a;
  119. }
  120. static inline struct bufferevent_async *
  121. upcast_connect(struct event_overlapped *eo)
  122. {
  123. struct bufferevent_async *bev_a;
  124. bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, connect_overlapped);
  125. EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev));
  126. return bev_a;
  127. }
  128. static inline struct bufferevent_async *
  129. upcast_read(struct event_overlapped *eo)
  130. {
  131. struct bufferevent_async *bev_a;
  132. bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, read_overlapped);
  133. EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev));
  134. return bev_a;
  135. }
  136. static inline struct bufferevent_async *
  137. upcast_write(struct event_overlapped *eo)
  138. {
  139. struct bufferevent_async *bev_a;
  140. bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, write_overlapped);
  141. EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev));
  142. return bev_a;
  143. }
  144. static void
  145. bev_async_del_write(struct bufferevent_async *beva)
  146. {
  147. struct bufferevent *bev = &beva->bev.bev;
  148. if (beva->write_added) {
  149. beva->write_added = 0;
  150. event_base_del_virtual_(bev->ev_base);
  151. }
  152. }
  153. static void
  154. bev_async_del_read(struct bufferevent_async *beva)
  155. {
  156. struct bufferevent *bev = &beva->bev.bev;
  157. if (beva->read_added) {
  158. beva->read_added = 0;
  159. event_base_del_virtual_(bev->ev_base);
  160. }
  161. }
  162. static void
  163. bev_async_add_write(struct bufferevent_async *beva)
  164. {
  165. struct bufferevent *bev = &beva->bev.bev;
  166. if (!beva->write_added) {
  167. beva->write_added = 1;
  168. event_base_add_virtual_(bev->ev_base);
  169. }
  170. }
  171. static void
  172. bev_async_add_read(struct bufferevent_async *beva)
  173. {
  174. struct bufferevent *bev = &beva->bev.bev;
  175. if (!beva->read_added) {
  176. beva->read_added = 1;
  177. event_base_add_virtual_(bev->ev_base);
  178. }
  179. }
/* Launch an overlapped write if one is permitted and useful right now;
 * otherwise drop the virtual write event.  Must be called with the
 * bufferevent lock held. */
static void
bev_async_consider_writing(struct bufferevent_async *beva)
{
	size_t at_most;
	int limit;
	struct bufferevent *bev = &beva->bev.bev;

	/* Don't write if there's a write in progress, or we do not
	 * want to write, or when there's nothing left to write. */
	if (beva->write_in_progress || beva->bev.connecting)
		return;
	if (!beva->ok || !(bev->enabled&EV_WRITE) ||
	    !evbuffer_get_length(bev->output)) {
		bev_async_del_write(beva);
		return;
	}

	at_most = evbuffer_get_length(bev->output);

	/* Clamp to the rate-limit allowance.  This is safe so long as
	 * bufferevent_get_write_max never returns more than INT_MAX.
	 * That's true for now. XXXX */
	limit = (int)bufferevent_get_write_max_(&beva->bev);
	if (at_most >= (size_t)limit && limit >= 0)
		at_most = limit;

	if (beva->bev.write_suspended) {
		bev_async_del_write(beva);
		return;
	}

	/* XXXX doesn't respect low-water mark very well. */
	/* Hold a reference across the overlapped operation; write_complete()
	 * releases it (or we release it ourselves on launch failure). */
	bufferevent_incref_(bev);
	if (evbuffer_launch_write_(bev->output, at_most,
	    &beva->write_overlapped)) {
		bufferevent_decref_(bev);
		beva->ok = 0;
		be_async_run_eventcb(bev, BEV_EVENT_ERROR, 0);
	} else {
		beva->write_in_progress = at_most;
		/* Charge the rate-limit bucket now; unused bytes are
		 * refunded in write_complete(). */
		bufferevent_decrement_write_buckets_(&beva->bev, at_most);
		bev_async_add_write(beva);
	}
}
/* Launch an overlapped read if one is permitted and useful right now;
 * otherwise drop the virtual read event.  Must be called with the
 * bufferevent lock held. */
static void
bev_async_consider_reading(struct bufferevent_async *beva)
{
	size_t cur_size;
	size_t read_high;
	size_t at_most;
	int limit;
	struct bufferevent *bev = &beva->bev.bev;

	/* Don't read if there is a read in progress, or we do not
	 * want to read. */
	if (beva->read_in_progress || beva->bev.connecting)
		return;
	if (!beva->ok || !(bev->enabled&EV_READ)) {
		bev_async_del_read(beva);
		return;
	}

	/* Don't read if we're full: respect the high watermark. */
	cur_size = evbuffer_get_length(bev->input);
	read_high = bev->wm_read.high;
	if (read_high) {
		if (cur_size >= read_high) {
			bev_async_del_read(beva);
			return;
		}
		at_most = read_high - cur_size;
	} else {
		at_most = 16384; /* FIXME totally magic. */
	}

	/* XXXX This over-commits. */
	/* XXXX see also note above on cast on bufferevent_get_write_max_() */
	limit = (int)bufferevent_get_read_max_(&beva->bev);
	if (at_most >= (size_t)limit && limit >= 0)
		at_most = limit;

	if (beva->bev.read_suspended) {
		bev_async_del_read(beva);
		return;
	}

	/* Hold a reference across the overlapped operation; read_complete()
	 * releases it (or we release it ourselves on launch failure). */
	bufferevent_incref_(bev);
	if (evbuffer_launch_read_(bev->input, at_most, &beva->read_overlapped)) {
		beva->ok = 0;
		be_async_run_eventcb(bev, BEV_EVENT_ERROR, 0);
		bufferevent_decref_(bev);
	} else {
		beva->read_in_progress = at_most;
		/* Charge the rate-limit bucket now; unused bytes are
		 * refunded in read_complete(). */
		bufferevent_decrement_read_buckets_(&beva->bev, at_most);
		bev_async_add_read(beva);
	}

	return;
}
  267. static void
  268. be_async_outbuf_callback(struct evbuffer *buf,
  269. const struct evbuffer_cb_info *cbinfo,
  270. void *arg)
  271. {
  272. struct bufferevent *bev = arg;
  273. struct bufferevent_async *bev_async = upcast(bev);
  274. /* If we added data to the outbuf and were not writing before,
  275. * we may want to write now. */
  276. bufferevent_incref_and_lock_(bev);
  277. if (cbinfo->n_added)
  278. bev_async_consider_writing(bev_async);
  279. bufferevent_decref_and_unlock_(bev);
  280. }
  281. static void
  282. be_async_inbuf_callback(struct evbuffer *buf,
  283. const struct evbuffer_cb_info *cbinfo,
  284. void *arg)
  285. {
  286. struct bufferevent *bev = arg;
  287. struct bufferevent_async *bev_async = upcast(bev);
  288. /* If we drained data from the inbuf and were not reading before,
  289. * we may want to read now */
  290. bufferevent_incref_and_lock_(bev);
  291. if (cbinfo->n_deleted)
  292. bev_async_consider_reading(bev_async);
  293. bufferevent_decref_and_unlock_(bev);
  294. }
/* Enable reading and/or writing ('what').  Returns -1 if the socket is
 * not usable, 0 otherwise. */
static int
be_async_enable(struct bufferevent *buf, short what)
{
	struct bufferevent_async *bev_async = upcast(buf);

	if (!bev_async->ok)
		return -1;

	if (bev_async->bev.connecting) {
		/* Don't launch anything during connection attempts. */
		return 0;
	}

	if (what & EV_READ)
		BEV_RESET_GENERIC_READ_TIMEOUT(buf);
	if (what & EV_WRITE)
		BEV_RESET_GENERIC_WRITE_TIMEOUT(buf);

	/* If we newly enable reading or writing, and we aren't reading or
	   writing already, consider launching a new read or write. */
	if (what & EV_READ)
		bev_async_consider_reading(bev_async);
	if (what & EV_WRITE)
		bev_async_consider_writing(bev_async);
	return 0;
}
/* Disable reading and/or writing ('what'): clear the matching timeout
 * and drop the virtual event.  Always returns 0. */
static int
be_async_disable(struct bufferevent *bev, short what)
{
	struct bufferevent_async *bev_async = upcast(bev);

	/* XXXX If we disable reading or writing, we may want to consider
	 * canceling any in-progress read or write operation, though it might
	 * not work. */

	if (what & EV_READ) {
		BEV_DEL_GENERIC_READ_TIMEOUT(bev);
		bev_async_del_read(bev_async);
	}
	if (what & EV_WRITE) {
		BEV_DEL_GENERIC_WRITE_TIMEOUT(bev);
		bev_async_del_write(bev_async);
	}

	return 0;
}
/* Tear down IOCP-specific state when the bufferevent is freed.  Any
 * overlapped read/write must already have completed. */
static void
be_async_destruct(struct bufferevent *bev)
{
	struct bufferevent_async *bev_async = upcast(bev);
	struct bufferevent_private *bev_p = BEV_UPCAST(bev);
	evutil_socket_t fd;

	EVUTIL_ASSERT(!upcast(bev)->write_in_progress &&
	    !upcast(bev)->read_in_progress);

	bev_async_del_read(bev_async);
	bev_async_del_write(bev_async);

	/* The input and output buffers were created with the same fd (see
	 * bufferevent_async_new_), so closing it via the input side once
	 * is sufficient. */
	fd = evbuffer_overlapped_get_fd_(bev->input);
	if (fd != (evutil_socket_t)EVUTIL_INVALID_SOCKET &&
	    (bev_p->options & BEV_OPT_CLOSE_ON_FREE)) {
		evutil_closesocket(fd);
		evbuffer_overlapped_set_fd_(bev->input, EVUTIL_INVALID_SOCKET);
	}
}
/* GetQueuedCompletionStatus doesn't reliably yield WSA error codes, so
 * we use WSAGetOverlappedResult to translate. */
static void
bev_async_set_wsa_error(struct bufferevent *bev, struct event_overlapped *eo)
{
	DWORD bytes, flags;
	evutil_socket_t fd;

	fd = evbuffer_overlapped_get_fd_(bev->input);
	/* Called purely for its side effect of updating the thread's WSA
	 * last-error; the return value and out-parameters are ignored. */
	WSAGetOverlappedResult(fd, &eo->overlapped, &bytes, FALSE, &flags);
}
  361. static int
  362. be_async_flush(struct bufferevent *bev, short what,
  363. enum bufferevent_flush_mode mode)
  364. {
  365. return 0;
  366. }
/* IOCP completion callback for a ConnectEx() launched by
 * bufferevent_async_connect_().  Reports the outcome to the user and
 * releases the reference and virtual event taken when connecting. */
static void
connect_complete(struct event_overlapped *eo, ev_uintptr_t key,
    ev_ssize_t nbytes, int ok)
{
	struct bufferevent_async *bev_a = upcast_connect(eo);
	struct bufferevent *bev = &bev_a->bev.bev;
	evutil_socket_t sock;

	BEV_LOCK(bev);
	EVUTIL_ASSERT(bev_a->bev.connecting);
	bev_a->bev.connecting = 0;

	sock = evbuffer_overlapped_get_fd_(bev_a->bev.bev.input);
	/* Finish the ConnectEx handshake so the socket behaves like one
	 * returned by connect(). */
	/* XXXX Handle error? */
	setsockopt(sock, SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, NULL, 0);

	if (ok)
		bufferevent_async_set_connected_(bev);
	else
		bev_async_set_wsa_error(bev, eo);

	be_async_run_eventcb(bev, ok ? BEV_EVENT_CONNECTED : BEV_EVENT_ERROR, 0);

	/* Matches the event_base_add_virtual_() in bufferevent_async_connect_(). */
	event_base_del_virtual_(bev->ev_base);

	bufferevent_decref_and_unlock_(bev);
}
/* IOCP completion callback for an overlapped read.  Commits the received
 * bytes, refunds any unused rate-limit allowance, and reports read,
 * error, or EOF (nbytes==0) to the user.  Releases the reference taken
 * in bev_async_consider_reading(). */
static void
read_complete(struct event_overlapped *eo, ev_uintptr_t key,
    ev_ssize_t nbytes, int ok)
{
	struct bufferevent_async *bev_a = upcast_read(eo);
	struct bufferevent *bev = &bev_a->bev.bev;
	short what = BEV_EVENT_READING;
	ev_ssize_t amount_unread;

	BEV_LOCK(bev);
	EVUTIL_ASSERT(bev_a->read_in_progress);

	amount_unread = bev_a->read_in_progress - nbytes;
	evbuffer_commit_read_(bev->input, nbytes);
	bev_a->read_in_progress = 0;
	if (amount_unread)
		/* Negative decrement: give back the bytes we charged in
		 * bev_async_consider_reading() but did not actually read. */
		bufferevent_decrement_read_buckets_(&bev_a->bev, -amount_unread);

	if (!ok)
		bev_async_set_wsa_error(bev, eo);

	if (bev_a->ok) {
		if (ok && nbytes) {
			BEV_RESET_GENERIC_READ_TIMEOUT(bev);
			be_async_trigger_nolock(bev, EV_READ, 0);
			bev_async_consider_reading(bev_a);
		} else if (!ok) {
			what |= BEV_EVENT_ERROR;
			bev_a->ok = 0;
			be_async_run_eventcb(bev, what, 0);
		} else if (!nbytes) {
			/* A successful zero-byte read means the peer closed. */
			what |= BEV_EVENT_EOF;
			bev_a->ok = 0;
			be_async_run_eventcb(bev, what, 0);
		}
	}

	bufferevent_decref_and_unlock_(bev);
}
/* IOCP completion callback for an overlapped write.  Commits the sent
 * bytes, refunds any unused rate-limit allowance, and reports write,
 * error, or EOF to the user.  Releases the reference taken in
 * bev_async_consider_writing(). */
static void
write_complete(struct event_overlapped *eo, ev_uintptr_t key,
    ev_ssize_t nbytes, int ok)
{
	struct bufferevent_async *bev_a = upcast_write(eo);
	struct bufferevent *bev = &bev_a->bev.bev;
	short what = BEV_EVENT_WRITING;
	ev_ssize_t amount_unwritten;

	BEV_LOCK(bev);
	EVUTIL_ASSERT(bev_a->write_in_progress);

	amount_unwritten = bev_a->write_in_progress - nbytes;
	evbuffer_commit_write_(bev->output, nbytes);
	bev_a->write_in_progress = 0;

	if (amount_unwritten)
		/* Negative decrement: give back the bytes we charged in
		 * bev_async_consider_writing() but did not actually write. */
		bufferevent_decrement_write_buckets_(&bev_a->bev,
		    -amount_unwritten);

	if (!ok)
		bev_async_set_wsa_error(bev, eo);

	if (bev_a->ok) {
		if (ok && nbytes) {
			BEV_RESET_GENERIC_WRITE_TIMEOUT(bev);
			be_async_trigger_nolock(bev, EV_WRITE, 0);
			bev_async_consider_writing(bev_a);
		} else if (!ok) {
			what |= BEV_EVENT_ERROR;
			bev_a->ok = 0;
			be_async_run_eventcb(bev, what, 0);
		} else if (!nbytes) {
			what |= BEV_EVENT_EOF;
			bev_a->ok = 0;
			be_async_run_eventcb(bev, what, 0);
		}
	}

	bufferevent_decref_and_unlock_(bev);
}
/* Construct a new IOCP-backed bufferevent on 'base' for socket 'fd'
 * (which may be invalid; a socket can be supplied later via
 * BEV_CTRL_SET_FD).  Returns NULL if the base has no IOCP port, the fd
 * cannot be associated with it, or allocation fails. */
struct bufferevent *
bufferevent_async_new_(struct event_base *base,
    evutil_socket_t fd, int options)
{
	struct bufferevent_async *bev_a;
	struct bufferevent *bev;
	struct event_iocp_port *iocp;

	/* Completions are delivered on IOCP threads, so locking is
	 * mandatory regardless of what the caller asked for. */
	options |= BEV_OPT_THREADSAFE;

	if (!(iocp = event_base_get_iocp_(base)))
		return NULL;

	if (fd >= 0 && event_iocp_port_associate_(iocp, fd, 1)<0) {
		/* Association may fail benignly if the fd was already
		 * associated; only give up on a fatal error. */
		if (fatal_error(GetLastError()))
			return NULL;
	}

	if (!(bev_a = mm_calloc(1, sizeof(struct bufferevent_async))))
		return NULL;

	bev = &bev_a->bev.bev;
	if (!(bev->input = evbuffer_overlapped_new_(fd))) {
		mm_free(bev_a);
		return NULL;
	}
	if (!(bev->output = evbuffer_overlapped_new_(fd))) {
		evbuffer_free(bev->input);
		mm_free(bev_a);
		return NULL;
	}

	if (bufferevent_init_common_(&bev_a->bev, base, &bufferevent_ops_async,
	    options)<0)
		goto err;

	evbuffer_add_cb(bev->input, be_async_inbuf_callback, bev);
	evbuffer_add_cb(bev->output, be_async_outbuf_callback, bev);

	event_overlapped_init_(&bev_a->connect_overlapped, connect_complete);
	event_overlapped_init_(&bev_a->read_overlapped, read_complete);
	event_overlapped_init_(&bev_a->write_overlapped, write_complete);

	bufferevent_init_generic_timeout_cbs_(bev);

	/* Not usable until there is an actual socket. */
	bev_a->ok = fd >= 0;

	return bev;

err:
	bufferevent_free(&bev_a->bev.bev);
	return NULL;
}
/* Mark the bufferevent as connected and usable; called once the
 * connection attempt has succeeded. */
void
bufferevent_async_set_connected_(struct bufferevent *bev)
{
	struct bufferevent_async *bev_async = upcast(bev);
	bev_async->ok = 1;
	/* Now's a good time to consider reading/writing */
	be_async_enable(bev, bev->enabled);
}
  506. int
  507. bufferevent_async_can_connect_(struct bufferevent *bev)
  508. {
  509. const struct win32_extension_fns *ext =
  510. event_get_win32_extension_fns_();
  511. if (BEV_IS_ASYNC(bev) &&
  512. event_base_get_iocp_(bev->ev_base) &&
  513. ext && ext->ConnectEx)
  514. return 1;
  515. return 0;
  516. }
/* Launch an overlapped connect on 'fd' to 'sa' using ConnectEx().
 * Returns 0 if the connect was launched (completion is reported via
 * connect_complete()), -1 on failure. */
int
bufferevent_async_connect_(struct bufferevent *bev, evutil_socket_t fd,
    const struct sockaddr *sa, int socklen)
{
	BOOL rc;
	struct bufferevent_async *bev_async = upcast(bev);
	struct sockaddr_storage ss;
	const struct win32_extension_fns *ext =
	    event_get_win32_extension_fns_();

	EVUTIL_ASSERT(ext && ext->ConnectEx && fd >= 0 && sa != NULL);

	/* ConnectEx() requires that the socket be bound to an address
	 * with bind() before using, otherwise it will fail. We attempt
	 * to issue a bind() here, taking into account that the error
	 * code is set to WSAEINVAL when the socket is already bound. */
	memset(&ss, 0, sizeof(ss));
	if (sa->sa_family == AF_INET) {
		struct sockaddr_in *sin = (struct sockaddr_in *)&ss;
		sin->sin_family = AF_INET;
		sin->sin_addr.s_addr = INADDR_ANY;
	} else if (sa->sa_family == AF_INET6) {
		struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&ss;
		sin6->sin6_family = AF_INET6;
		sin6->sin6_addr = in6addr_any;
	} else {
		/* Well, the user will have to bind() */
		return -1;
	}
	if (bind(fd, (struct sockaddr *)&ss, sizeof(ss)) < 0 &&
	    WSAGetLastError() != WSAEINVAL)
		return -1;

	/* Hold a virtual event and a reference until connect_complete()
	 * fires; both are released there, or below if the launch fails. */
	event_base_add_virtual_(bev->ev_base);
	bufferevent_incref_(bev);
	rc = ext->ConnectEx(fd, sa, socklen, NULL, 0, NULL,
	    &bev_async->connect_overlapped.overlapped);
	if (rc || WSAGetLastError() == ERROR_IO_PENDING)
		return 0;

	event_base_del_virtual_(bev->ev_base);
	bufferevent_decref_(bev);

	return -1;
}
  557. static int
  558. be_async_ctrl(struct bufferevent *bev, enum bufferevent_ctrl_op op,
  559. union bufferevent_ctrl_data *data)
  560. {
  561. switch (op) {
  562. case BEV_CTRL_GET_FD:
  563. data->fd = evbuffer_overlapped_get_fd_(bev->input);
  564. return 0;
  565. case BEV_CTRL_SET_FD: {
  566. struct bufferevent_async *bev_a = upcast(bev);
  567. struct event_iocp_port *iocp;
  568. if (data->fd == evbuffer_overlapped_get_fd_(bev->input))
  569. return 0;
  570. if (!(iocp = event_base_get_iocp_(bev->ev_base)))
  571. return -1;
  572. if (event_iocp_port_associate_(iocp, data->fd, 1) < 0) {
  573. if (fatal_error(GetLastError()))
  574. return -1;
  575. }
  576. evbuffer_overlapped_set_fd_(bev->input, data->fd);
  577. evbuffer_overlapped_set_fd_(bev->output, data->fd);
  578. bev_a->ok = data->fd >= 0;
  579. return 0;
  580. }
  581. case BEV_CTRL_CANCEL_ALL: {
  582. struct bufferevent_async *bev_a = upcast(bev);
  583. evutil_socket_t fd = evbuffer_overlapped_get_fd_(bev->input);
  584. if (fd != (evutil_socket_t)EVUTIL_INVALID_SOCKET &&
  585. (bev_a->bev.options & BEV_OPT_CLOSE_ON_FREE)) {
  586. closesocket(fd);
  587. evbuffer_overlapped_set_fd_(bev->input, EVUTIL_INVALID_SOCKET);
  588. }
  589. bev_a->ok = 0;
  590. return 0;
  591. }
  592. case BEV_CTRL_GET_UNDERLYING:
  593. default:
  594. return -1;
  595. }
  596. }