buffer_iocp.c 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327
  1. /*
  2. * Copyright (c) 2009-2012 Niels Provos and Nick Mathewson
  3. *
  4. * Redistribution and use in source and binary forms, with or without
  5. * modification, are permitted provided that the following conditions
  6. * are met:
  7. * 1. Redistributions of source code must retain the above copyright
  8. * notice, this list of conditions and the following disclaimer.
  9. * 2. Redistributions in binary form must reproduce the above copyright
  10. * notice, this list of conditions and the following disclaimer in the
  11. * documentation and/or other materials provided with the distribution.
  12. * 3. The name of the author may not be used to endorse or promote products
  13. * derived from this software without specific prior written permission.
  14. *
  15. * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
  16. * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
  17. * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
  18. * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
  19. * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
  20. * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  21. * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  22. * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  23. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
  24. * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  25. */
  26. /**
  27. @file buffer_iocp.c
  28. This module implements overlapped read and write functions for evbuffer
  29. objects on Windows.
  30. */
  31. #include "event2/event-config.h"
  32. #include "evconfig-private.h"
  33. #include "event2/buffer.h"
  34. #include "event2/buffer_compat.h"
  35. #include "event2/util.h"
  36. #include "event2/thread.h"
  37. #include "util-internal.h"
  38. #include "evthread-internal.h"
  39. #include "evbuffer-internal.h"
  40. #include "iocp-internal.h"
  41. #include "mm-internal.h"
  42. #include <winsock2.h>
  43. #include <winerror.h>
  44. #include <windows.h>
  45. #include <stdio.h>
  46. #define MAX_WSABUFS 16
  47. /** An evbuffer that can handle overlapped IO. */
  48. struct evbuffer_overlapped {
  49. struct evbuffer buffer;
  50. /** The socket that we're doing overlapped IO on. */
  51. evutil_socket_t fd;
  52. /** pending I/O type */
  53. unsigned read_in_progress : 1;
  54. unsigned write_in_progress : 1;
  55. /** The first pinned chain in the buffer. */
  56. struct evbuffer_chain *first_pinned;
  57. /** How many chains are pinned; how many of the fields in buffers
  58. * are we using. */
  59. int n_buffers;
  60. WSABUF buffers[MAX_WSABUFS];
  61. };
  62. /** Given an evbuffer, return the correponding evbuffer structure, or NULL if
  63. * the evbuffer isn't overlapped. */
  64. static inline struct evbuffer_overlapped *
  65. upcast_evbuffer(struct evbuffer *buf)
  66. {
  67. if (!buf || !buf->is_overlapped)
  68. return NULL;
  69. return EVUTIL_UPCAST(buf, struct evbuffer_overlapped, buffer);
  70. }
  71. /** Unpin all the chains noted as pinned in 'eo'. */
  72. static void
  73. pin_release(struct evbuffer_overlapped *eo, unsigned flag)
  74. {
  75. int i;
  76. struct evbuffer_chain *next, *chain = eo->first_pinned;
  77. for (i = 0; i < eo->n_buffers; ++i) {
  78. EVUTIL_ASSERT(chain);
  79. next = chain->next;
  80. evbuffer_chain_unpin_(chain, flag);
  81. chain = next;
  82. }
  83. }
  84. void
  85. evbuffer_commit_read_(struct evbuffer *evbuf, ev_ssize_t nBytes)
  86. {
  87. struct evbuffer_overlapped *buf = upcast_evbuffer(evbuf);
  88. struct evbuffer_chain **chainp;
  89. size_t remaining, len;
  90. unsigned i;
  91. EVBUFFER_LOCK(evbuf);
  92. EVUTIL_ASSERT(buf->read_in_progress && !buf->write_in_progress);
  93. EVUTIL_ASSERT(nBytes >= 0); /* XXXX Can this be false? */
  94. evbuffer_unfreeze(evbuf, 0);
  95. chainp = evbuf->last_with_datap;
  96. if (!((*chainp)->flags & EVBUFFER_MEM_PINNED_R))
  97. chainp = &(*chainp)->next;
  98. remaining = nBytes;
  99. for (i = 0; remaining > 0 && i < (unsigned)buf->n_buffers; ++i) {
  100. EVUTIL_ASSERT(*chainp);
  101. len = buf->buffers[i].len;
  102. if (remaining < len)
  103. len = remaining;
  104. (*chainp)->off += len;
  105. evbuf->last_with_datap = chainp;
  106. remaining -= len;
  107. chainp = &(*chainp)->next;
  108. }
  109. pin_release(buf, EVBUFFER_MEM_PINNED_R);
  110. buf->read_in_progress = 0;
  111. evbuf->total_len += nBytes;
  112. evbuf->n_add_for_cb += nBytes;
  113. evbuffer_invoke_callbacks_(evbuf);
  114. evbuffer_decref_and_unlock_(evbuf);
  115. }
  116. void
  117. evbuffer_commit_write_(struct evbuffer *evbuf, ev_ssize_t nBytes)
  118. {
  119. struct evbuffer_overlapped *buf = upcast_evbuffer(evbuf);
  120. EVBUFFER_LOCK(evbuf);
  121. EVUTIL_ASSERT(buf->write_in_progress && !buf->read_in_progress);
  122. evbuffer_unfreeze(evbuf, 1);
  123. evbuffer_drain(evbuf, nBytes);
  124. pin_release(buf,EVBUFFER_MEM_PINNED_W);
  125. buf->write_in_progress = 0;
  126. evbuffer_decref_and_unlock_(evbuf);
  127. }
  128. struct evbuffer *
  129. evbuffer_overlapped_new_(evutil_socket_t fd)
  130. {
  131. struct evbuffer_overlapped *evo;
  132. evo = mm_calloc(1, sizeof(struct evbuffer_overlapped));
  133. if (!evo)
  134. return NULL;
  135. LIST_INIT(&evo->buffer.callbacks);
  136. evo->buffer.refcnt = 1;
  137. evo->buffer.last_with_datap = &evo->buffer.first;
  138. evo->buffer.is_overlapped = 1;
  139. evo->fd = fd;
  140. return &evo->buffer;
  141. }
  142. int
  143. evbuffer_launch_write_(struct evbuffer *buf, ev_ssize_t at_most,
  144. struct event_overlapped *ol)
  145. {
  146. struct evbuffer_overlapped *buf_o = upcast_evbuffer(buf);
  147. int r = -1;
  148. int i;
  149. struct evbuffer_chain *chain;
  150. DWORD bytesSent;
  151. if (!buf) {
  152. /* No buffer, or it isn't overlapped */
  153. return -1;
  154. }
  155. EVBUFFER_LOCK(buf);
  156. EVUTIL_ASSERT(!buf_o->read_in_progress);
  157. if (buf->freeze_start || buf_o->write_in_progress)
  158. goto done;
  159. if (!buf->total_len) {
  160. /* Nothing to write */
  161. r = 0;
  162. goto done;
  163. } else if (at_most < 0 || (size_t)at_most > buf->total_len) {
  164. at_most = buf->total_len;
  165. }
  166. evbuffer_freeze(buf, 1);
  167. buf_o->first_pinned = NULL;
  168. buf_o->n_buffers = 0;
  169. memset(buf_o->buffers, 0, sizeof(buf_o->buffers));
  170. chain = buf_o->first_pinned = buf->first;
  171. for (i=0; i < MAX_WSABUFS && chain; ++i, chain=chain->next) {
  172. WSABUF *b = &buf_o->buffers[i];
  173. b->buf = (char*)( chain->buffer + chain->misalign );
  174. evbuffer_chain_pin_(chain, EVBUFFER_MEM_PINNED_W);
  175. if ((size_t)at_most > chain->off) {
  176. /* XXXX Cast is safe for now, since win32 has no
  177. mmaped chains. But later, we need to have this
  178. add more WSAbufs if chain->off is greater than
  179. ULONG_MAX */
  180. b->len = (unsigned long)chain->off;
  181. at_most -= chain->off;
  182. } else {
  183. b->len = (unsigned long)at_most;
  184. ++i;
  185. break;
  186. }
  187. }
  188. buf_o->n_buffers = i;
  189. evbuffer_incref_(buf);
  190. if (WSASend(buf_o->fd, buf_o->buffers, i, &bytesSent, 0,
  191. &ol->overlapped, NULL)) {
  192. int error = WSAGetLastError();
  193. if (error != WSA_IO_PENDING) {
  194. /* An actual error. */
  195. pin_release(buf_o, EVBUFFER_MEM_PINNED_W);
  196. evbuffer_unfreeze(buf, 1);
  197. evbuffer_free(buf); /* decref */
  198. goto done;
  199. }
  200. }
  201. buf_o->write_in_progress = 1;
  202. r = 0;
  203. done:
  204. EVBUFFER_UNLOCK(buf);
  205. return r;
  206. }
  207. int
  208. evbuffer_launch_read_(struct evbuffer *buf, size_t at_most,
  209. struct event_overlapped *ol)
  210. {
  211. struct evbuffer_overlapped *buf_o = upcast_evbuffer(buf);
  212. int r = -1, i;
  213. int nvecs;
  214. int npin=0;
  215. struct evbuffer_chain *chain=NULL, **chainp;
  216. DWORD bytesRead;
  217. DWORD flags = 0;
  218. struct evbuffer_iovec vecs[MAX_WSABUFS];
  219. if (!buf_o)
  220. return -1;
  221. EVBUFFER_LOCK(buf);
  222. EVUTIL_ASSERT(!buf_o->write_in_progress);
  223. if (buf->freeze_end || buf_o->read_in_progress)
  224. goto done;
  225. buf_o->first_pinned = NULL;
  226. buf_o->n_buffers = 0;
  227. memset(buf_o->buffers, 0, sizeof(buf_o->buffers));
  228. if (evbuffer_expand_fast_(buf, at_most, MAX_WSABUFS) == -1)
  229. goto done;
  230. evbuffer_freeze(buf, 0);
  231. nvecs = evbuffer_read_setup_vecs_(buf, at_most,
  232. vecs, MAX_WSABUFS, &chainp, 1);
  233. for (i=0;i<nvecs;++i) {
  234. WSABUF_FROM_EVBUFFER_IOV(
  235. &buf_o->buffers[i],
  236. &vecs[i]);
  237. }
  238. buf_o->n_buffers = nvecs;
  239. buf_o->first_pinned = chain = *chainp;
  240. npin=0;
  241. for ( ; chain; chain = chain->next) {
  242. evbuffer_chain_pin_(chain, EVBUFFER_MEM_PINNED_R);
  243. ++npin;
  244. }
  245. EVUTIL_ASSERT(npin == nvecs);
  246. evbuffer_incref_(buf);
  247. if (WSARecv(buf_o->fd, buf_o->buffers, nvecs, &bytesRead, &flags,
  248. &ol->overlapped, NULL)) {
  249. int error = WSAGetLastError();
  250. if (error != WSA_IO_PENDING) {
  251. /* An actual error. */
  252. pin_release(buf_o, EVBUFFER_MEM_PINNED_R);
  253. evbuffer_unfreeze(buf, 0);
  254. evbuffer_free(buf); /* decref */
  255. goto done;
  256. }
  257. }
  258. buf_o->read_in_progress = 1;
  259. r = 0;
  260. done:
  261. EVBUFFER_UNLOCK(buf);
  262. return r;
  263. }
  264. evutil_socket_t
  265. evbuffer_overlapped_get_fd_(struct evbuffer *buf)
  266. {
  267. struct evbuffer_overlapped *buf_o = upcast_evbuffer(buf);
  268. return buf_o ? buf_o->fd : -1;
  269. }
  270. void
  271. evbuffer_overlapped_set_fd_(struct evbuffer *buf, evutil_socket_t fd)
  272. {
  273. struct evbuffer_overlapped *buf_o = upcast_evbuffer(buf);
  274. EVBUFFER_LOCK(buf);
  275. /* XXX is this right?, should it cancel current I/O operations? */
  276. if (buf_o)
  277. buf_o->fd = fd;
  278. EVBUFFER_UNLOCK(buf);
  279. }