io.cc 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955
  1. /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
  2. *
  3. * Gearmand client and server library.
  4. *
  5. * Copyright (C) 2011-2013 Data Differential, http://datadifferential.com/
  6. * Copyright (C) 2008 Brian Aker, Eric Day
  7. * All rights reserved.
  8. *
  9. * Redistribution and use in source and binary forms, with or without
  10. * modification, are permitted provided that the following conditions are
  11. * met:
  12. *
  13. * * Redistributions of source code must retain the above copyright
  14. * notice, this list of conditions and the following disclaimer.
  15. *
  16. * * Redistributions in binary form must reproduce the above
  17. * copyright notice, this list of conditions and the following disclaimer
  18. * in the documentation and/or other materials provided with the
  19. * distribution.
  20. *
  21. * * The names of its contributors may not be used to endorse or
  22. * promote products derived from this software without specific prior
  23. * written permission.
  24. *
  25. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  26. * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  27. * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  28. * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  29. * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  30. * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  31. * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  32. * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  33. * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  34. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  35. * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  36. *
  37. */
  38. /**
  39. * @file
  40. * @brief Connection Definitions
  41. */
  42. #include "gear_config.h"
  43. #include "libgearman-server/common.h"
  44. #include <libgearman-server/plugins/base.h>
  45. #include <cstring>
  46. #include <cerrno>
  47. #include <cassert>
  48. #ifndef SOCK_NONBLOCK
  49. # define SOCK_NONBLOCK 0
  50. #endif
  51. static void _connection_close(gearmand_io_st *connection)
  52. {
  53. if (connection->fd == INVALID_SOCKET)
  54. {
  55. return;
  56. }
  57. if (connection->options.external_fd)
  58. {
  59. connection->options.external_fd= false;
  60. }
  61. else
  62. {
  63. #if defined(HAVE_CYASSL) && HAVE_CYASSL
  64. if (connection->root and connection->root->_ssl)
  65. {
  66. CyaSSL_shutdown(connection->root->_ssl);
  67. }
  68. #endif
  69. (void)gearmand_sockfd_close(connection->fd);
  70. assert_msg(false, "We should never have an internal fd");
  71. }
  72. connection->_state= gearmand_io_st::GEARMAND_CON_UNIVERSAL_INVALID;
  73. connection->fd= INVALID_SOCKET;
  74. connection->events= 0;
  75. connection->revents= 0;
  76. connection->send_state= gearmand_io_st::GEARMAND_CON_SEND_STATE_NONE;
  77. connection->send_buffer_ptr= connection->send_buffer;
  78. connection->send_buffer_size= 0;
  79. connection->send_data_size= 0;
  80. connection->send_data_offset= 0;
  81. connection->recv_state= gearmand_io_st::GEARMAND_CON_RECV_UNIVERSAL_NONE;
  82. if (connection->recv_packet != NULL)
  83. {
  84. gearmand_packet_free(connection->recv_packet);
  85. connection->recv_packet= NULL;
  86. }
  87. connection->recv_buffer_ptr= connection->recv_buffer;
  88. connection->recv_buffer_size= 0;
  89. }
  90. static size_t _connection_read(gearman_server_con_st *con, void *data, size_t data_size, gearmand_error_t &ret)
  91. {
  92. ssize_t read_size;
  93. gearmand_io_st *connection= &con->con;
  94. while (1)
  95. {
  96. #if defined(HAVE_CYASSL) && HAVE_CYASSL
  97. read_size= CyaSSL_recv(con->_ssl, data, data_size, MSG_DONTWAIT);
  98. #else
  99. read_size= recv(connection->fd, data, data_size, MSG_DONTWAIT);
  100. #endif
  101. if (read_size == 0)
  102. {
  103. ret= GEARMAND_LOST_CONNECTION;
  104. gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM,
  105. "Peer connection has called close() %s:%s",
  106. connection->context == NULL ? "-" : connection->context->host,
  107. connection->context == NULL ? "-" : connection->context->port);
  108. _connection_close(connection);
  109. return 0;
  110. }
  111. else if (read_size == -1)
  112. {
  113. int local_errno= errno;
  114. switch (local_errno)
  115. {
  116. case EAGAIN:
  117. ret= gearmand_io_set_events(con, POLLIN);
  118. if (gearmand_failed(ret))
  119. {
  120. gearmand_perror(local_errno, "recv");
  121. return 0;
  122. }
  123. ret= GEARMAND_IO_WAIT;
  124. return 0;
  125. case EINTR:
  126. continue;
  127. case EPIPE:
  128. case ECONNRESET:
  129. case EHOSTDOWN:
  130. {
  131. ret= GEARMAND_LOST_CONNECTION;
  132. gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM,
  133. "Peer connection has called close() %s:%s",
  134. connection->context == NULL ? "-" : connection->context->host,
  135. connection->context == NULL ? "-" : connection->context->port);
  136. _connection_close(connection);
  137. return 0;
  138. }
  139. default:
  140. ret= GEARMAND_ERRNO;
  141. }
  142. gearmand_perror(local_errno, "closing connection due to previous errno error");
  143. _connection_close(connection);
  144. return 0;
  145. }
  146. break;
  147. }
  148. ret= GEARMAND_SUCCESS;
  149. return size_t(read_size);
  150. }
  151. static gearmand_error_t gearmand_connection_recv_data(gearman_server_con_st *con, void *data, size_t data_size)
  152. {
  153. gearmand_io_st *connection= &con->con;
  154. if (connection->recv_data_size == 0)
  155. {
  156. return GEARMAND_SUCCESS;
  157. }
  158. if ((connection->recv_data_size - connection->recv_data_offset) < data_size)
  159. {
  160. data_size= connection->recv_data_size - connection->recv_data_offset;
  161. }
  162. size_t recv_size= 0;
  163. if (connection->recv_buffer_size > 0)
  164. {
  165. if (connection->recv_buffer_size < data_size)
  166. {
  167. recv_size= connection->recv_buffer_size;
  168. }
  169. else
  170. {
  171. recv_size= data_size;
  172. }
  173. memcpy(data, connection->recv_buffer_ptr, recv_size);
  174. connection->recv_buffer_ptr+= recv_size;
  175. connection->recv_buffer_size-= recv_size;
  176. }
  177. gearmand_error_t ret;
  178. if (data_size != recv_size)
  179. {
  180. recv_size+= _connection_read(con, ((uint8_t *)data) + recv_size, data_size - recv_size, ret);
  181. connection->recv_data_offset+= recv_size;
  182. }
  183. else
  184. {
  185. connection->recv_data_offset+= recv_size;
  186. ret= GEARMAND_SUCCESS;
  187. }
  188. if (connection->recv_data_size == connection->recv_data_offset)
  189. {
  190. connection->recv_data_size= 0;
  191. connection->recv_data_offset= 0;
  192. connection->recv_state= gearmand_io_st::GEARMAND_CON_RECV_UNIVERSAL_NONE;
  193. }
  194. return ret;
  195. }
  196. static gearmand_error_t _connection_flush(gearman_server_con_st *con)
  197. {
  198. gearmand_io_st *connection= &con->con;
  199. assert(connection->_state == gearmand_io_st::GEARMAND_CON_UNIVERSAL_CONNECTED);
  200. while (1)
  201. {
  202. switch (connection->_state)
  203. {
  204. case gearmand_io_st::GEARMAND_CON_UNIVERSAL_INVALID:
  205. assert(0);
  206. return GEARMAND_ERRNO;
  207. case gearmand_io_st::GEARMAND_CON_UNIVERSAL_CONNECTED:
  208. while (connection->send_buffer_size)
  209. {
  210. ssize_t write_size;
  211. #if defined(HAVE_CYASSL) && HAVE_CYASSL
  212. write_size= CyaSSL_send(con->_ssl, connection->send_buffer_ptr, connection->send_buffer_size, MSG_NOSIGNAL|MSG_DONTWAIT);
  213. #else
  214. write_size= send(connection->fd, connection->send_buffer_ptr, connection->send_buffer_size, MSG_NOSIGNAL|MSG_DONTWAIT);
  215. #endif
  216. if (write_size == 0) // detect infinite loop?
  217. {
  218. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "send() sent zero bytes to peer %s:%s",
  219. connection->context == NULL ? "-" : connection->context->host,
  220. connection->context == NULL ? "-" : connection->context->port);
  221. continue;
  222. }
  223. else if (write_size == -1)
  224. {
  225. int local_errno= errno;
  226. switch (local_errno)
  227. {
  228. case EAGAIN:
  229. {
  230. gearmand_error_t gret= gearmand_io_set_events(con, POLLOUT);
  231. if (gret != GEARMAND_SUCCESS)
  232. {
  233. return gret;
  234. }
  235. return GEARMAND_IO_WAIT;
  236. }
  237. case EINTR:
  238. continue;
  239. case EPIPE:
  240. case ECONNRESET:
  241. case EHOSTDOWN:
  242. gearmand_perror(local_errno, "lost connection to client during send(EPIPE || ECONNRESET || EHOSTDOWN)");
  243. _connection_close(connection);
  244. return GEARMAND_LOST_CONNECTION;
  245. default:
  246. break;
  247. }
  248. gearmand_perror(local_errno, "send() failed, closing connection");
  249. _connection_close(connection);
  250. return GEARMAND_ERRNO;
  251. }
  252. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "send() %u bytes to peer %s:%s",
  253. uint32_t(write_size),
  254. connection->context == NULL ? "-" : connection->context->host,
  255. connection->context == NULL ? "-" : connection->context->port);
  256. connection->send_buffer_size-= static_cast<size_t>(write_size);
  257. if (connection->send_state == gearmand_io_st::GEARMAND_CON_SEND_UNIVERSAL_FLUSH_DATA)
  258. {
  259. connection->send_data_offset+= static_cast<size_t>(write_size);
  260. if (connection->send_data_offset == connection->send_data_size)
  261. {
  262. connection->send_data_size= 0;
  263. connection->send_data_offset= 0;
  264. break;
  265. }
  266. if (connection->send_buffer_size == 0)
  267. {
  268. return GEARMAND_SUCCESS;
  269. }
  270. }
  271. else if (connection->send_buffer_size == 0)
  272. {
  273. break;
  274. }
  275. connection->send_buffer_ptr+= write_size;
  276. }
  277. connection->send_state= gearmand_io_st::GEARMAND_CON_SEND_STATE_NONE;
  278. connection->send_buffer_ptr= connection->send_buffer;
  279. return GEARMAND_SUCCESS;
  280. }
  281. }
  282. }
  283. /**
  284. * @addtogroup gearmand_io_static Static Connection Declarations
  285. * @ingroup gearman_connection
  286. * @{
  287. */
  288. void gearmand_connection_init(gearmand_connection_list_st *gearman,
  289. gearmand_io_st *connection,
  290. gearmand_con_st *dcon,
  291. gearmand_connection_options_t *options)
  292. {
  293. assert(gearman);
  294. assert(connection);
  295. connection->options.ready= false;
  296. connection->options.packet_in_use= false;
  297. connection->options.external_fd= false;
  298. connection->options.close_after_flush= false;
  299. if (options)
  300. {
  301. while (*options != GEARMAND_CON_MAX)
  302. {
  303. gearman_io_set_option(connection, *options, true);
  304. options++;
  305. }
  306. }
  307. connection->_state= gearmand_io_st::GEARMAND_CON_UNIVERSAL_INVALID;
  308. connection->send_state= gearmand_io_st::GEARMAND_CON_SEND_STATE_NONE;
  309. connection->recv_state= gearmand_io_st::GEARMAND_CON_RECV_UNIVERSAL_NONE;
  310. connection->events= 0;
  311. connection->revents= 0;
  312. connection->fd= INVALID_SOCKET;
  313. connection->created_id= 0;
  314. connection->created_id_next= 0;
  315. connection->send_buffer_size= 0;
  316. connection->send_data_size= 0;
  317. connection->send_data_offset= 0;
  318. connection->recv_buffer_size= 0;
  319. connection->recv_data_size= 0;
  320. connection->recv_data_offset= 0;
  321. connection->universal= gearman;
  322. GEARMAND_LIST__ADD(gearman->con, connection);
  323. connection->context= dcon;
  324. connection->send_buffer_ptr= connection->send_buffer;
  325. connection->recv_packet= NULL;
  326. connection->recv_buffer_ptr= connection->recv_buffer;
  327. }
  328. void gearmand_connection_list_st::list_free()
  329. {
  330. while (con_list)
  331. {
  332. gearmand_io_free(con_list);
  333. }
  334. }
  335. gearmand_connection_list_st::gearmand_connection_list_st() :
  336. con_count(0),
  337. con_list(NULL),
  338. event_watch_fn(NULL),
  339. event_watch_context(NULL)
  340. {
  341. }
  342. void gearmand_connection_list_st::init(gearmand_event_watch_fn *watch_fn, void *watch_context)
  343. {
  344. ready_con_count= 0;
  345. ready_con_list= NULL;
  346. con_count= 0;
  347. con_list= NULL;
  348. event_watch_fn= watch_fn;
  349. event_watch_context= watch_context;
  350. }
  351. void gearmand_io_free(gearmand_io_st *connection)
  352. {
  353. if (connection->fd != INVALID_SOCKET)
  354. _connection_close(connection);
  355. if (connection->options.ready)
  356. {
  357. connection->options.ready= false;
  358. GEARMAND_LIST_DEL(connection->universal->ready_con, connection, ready_);
  359. }
  360. GEARMAND_LIST__DEL(connection->universal->con, connection);
  361. if (connection->options.packet_in_use)
  362. {
  363. gearmand_packet_free(&(connection->packet));
  364. }
  365. }
  366. gearmand_error_t gearman_io_set_option(gearmand_io_st *connection,
  367. gearmand_connection_options_t options,
  368. bool value)
  369. {
  370. switch (options)
  371. {
  372. case GEARMAND_CON_PACKET_IN_USE:
  373. connection->options.packet_in_use= value;
  374. break;
  375. case GEARMAND_CON_EXTERNAL_FD:
  376. connection->options.external_fd= value;
  377. break;
  378. case GEARMAND_CON_CLOSE_AFTER_FLUSH:
  379. connection->options.close_after_flush= value;
  380. break;
  381. case GEARMAND_CON_MAX:
  382. return GEARMAND_INVALID_COMMAND;
  383. }
  384. return GEARMAND_SUCCESS;
  385. }
  386. /**
  387. * Set socket options for a connection.
  388. */
  389. static gearmand_error_t _io_setsockopt(gearmand_io_st &connection);
  390. /** @} */
  391. /*
  392. * Public Definitions
  393. */
  394. gearmand_error_t gearman_io_set_fd(gearmand_io_st *connection, int fd)
  395. {
  396. assert(connection);
  397. connection->options.external_fd= true;
  398. connection->fd= fd;
  399. connection->_state= gearmand_io_st::GEARMAND_CON_UNIVERSAL_CONNECTED;
  400. return _io_setsockopt(*connection);
  401. }
  402. gearmand_con_st *gearman_io_context(const gearmand_io_st *connection)
  403. {
  404. return connection->context;
  405. }
  406. gearmand_error_t gearman_io_send(gearman_server_con_st *con,
  407. const gearmand_packet_st *packet, bool flush)
  408. {
  409. size_t send_size;
  410. gearmand_io_st *connection= &con->con;
  411. switch (connection->send_state)
  412. {
  413. case gearmand_io_st::GEARMAND_CON_SEND_STATE_NONE:
  414. if (! (packet->options.complete))
  415. {
  416. gearmand_error("packet not complete");
  417. return GEARMAND_INVALID_PACKET;
  418. }
  419. /* Pack first part of packet, which is everything but the payload. */
  420. while (1)
  421. {
  422. gearmand_error_t ret;
  423. send_size= con->protocol->pack(packet,
  424. con,
  425. connection->send_buffer +connection->send_buffer_size,
  426. GEARMAND_SEND_BUFFER_SIZE -connection->send_buffer_size,
  427. ret);
  428. if (ret == GEARMAND_SUCCESS)
  429. {
  430. connection->send_buffer_size+= send_size;
  431. break;
  432. }
  433. else if (ret == GEARMAND_IGNORE_PACKET)
  434. {
  435. return GEARMAND_SUCCESS;
  436. }
  437. else if (ret != GEARMAND_FLUSH_DATA)
  438. {
  439. return ret;
  440. }
  441. /* We were asked to flush when the buffer is already flushed! */
  442. if (connection->send_buffer_size == 0)
  443. {
  444. gearmand_error("send buffer too small");
  445. return GEARMAND_SEND_BUFFER_TOO_SMALL;
  446. }
  447. /* Flush buffer now if first part of packet won't fit in. */
  448. connection->send_state= gearmand_io_st::GEARMAND_CON_SEND_UNIVERSAL_PRE_FLUSH;
  449. case gearmand_io_st::GEARMAND_CON_SEND_UNIVERSAL_PRE_FLUSH:
  450. {
  451. gearmand_error_t local_ret;
  452. if ((local_ret= _connection_flush(con)) != GEARMAND_SUCCESS)
  453. {
  454. return local_ret;
  455. }
  456. }
  457. }
  458. /* Return here if we have no data to send. */
  459. if (packet->data_size == 0)
  460. {
  461. break;
  462. }
  463. /* If there is any room in the buffer, copy in data. */
  464. if (packet->data and (GEARMAND_SEND_BUFFER_SIZE - connection->send_buffer_size) > 0)
  465. {
  466. connection->send_data_offset= GEARMAND_SEND_BUFFER_SIZE - connection->send_buffer_size;
  467. if (connection->send_data_offset > packet->data_size)
  468. {
  469. connection->send_data_offset= packet->data_size;
  470. }
  471. memcpy(connection->send_buffer +connection->send_buffer_size,
  472. packet->data,
  473. connection->send_data_offset);
  474. connection->send_buffer_size+= connection->send_data_offset;
  475. /* Return if all data fit in the send buffer. */
  476. if (connection->send_data_offset == packet->data_size)
  477. {
  478. connection->send_data_offset= 0;
  479. break;
  480. }
  481. }
  482. /* Flush buffer now so we can start writing directly from data buffer. */
  483. connection->send_state= gearmand_io_st::GEARMAND_CON_SEND_UNIVERSAL_FORCE_FLUSH;
  484. case gearmand_io_st::GEARMAND_CON_SEND_UNIVERSAL_FORCE_FLUSH:
  485. {
  486. gearmand_error_t local_ret;
  487. if ((local_ret= _connection_flush(con)) != GEARMAND_SUCCESS)
  488. {
  489. return local_ret;
  490. }
  491. }
  492. connection->send_data_size= packet->data_size;
  493. /* If this is NULL, then ?? function will be used. */
  494. if (packet->data == NULL)
  495. {
  496. connection->send_state= gearmand_io_st::GEARMAND_CON_SEND_UNIVERSAL_FLUSH_DATA;
  497. return GEARMAND_SUCCESS;
  498. }
  499. /* Copy into the buffer if it fits, otherwise flush from packet buffer. */
  500. connection->send_buffer_size= packet->data_size - connection->send_data_offset;
  501. if (connection->send_buffer_size < GEARMAND_SEND_BUFFER_SIZE)
  502. {
  503. memcpy(connection->send_buffer,
  504. packet->data + connection->send_data_offset,
  505. connection->send_buffer_size);
  506. connection->send_data_size= 0;
  507. connection->send_data_offset= 0;
  508. break;
  509. }
  510. connection->send_buffer_ptr= const_cast<char *>(packet->data) + connection->send_data_offset;
  511. connection->send_state= gearmand_io_st::GEARMAND_CON_SEND_UNIVERSAL_FLUSH_DATA;
  512. case gearmand_io_st::GEARMAND_CON_SEND_UNIVERSAL_FLUSH:
  513. case gearmand_io_st::GEARMAND_CON_SEND_UNIVERSAL_FLUSH_DATA:
  514. {
  515. gearmand_error_t local_ret= _connection_flush(con);
  516. if (local_ret == GEARMAND_SUCCESS and
  517. connection->options.close_after_flush)
  518. {
  519. _connection_close(connection);
  520. local_ret= GEARMAND_LOST_CONNECTION;
  521. gearmand_debug("closing connection after flush by request");
  522. }
  523. return local_ret;
  524. }
  525. }
  526. if (flush)
  527. {
  528. connection->send_state= gearmand_io_st::GEARMAND_CON_SEND_UNIVERSAL_FLUSH;
  529. gearmand_error_t local_ret= _connection_flush(con);
  530. if (local_ret == GEARMAND_SUCCESS and connection->options.close_after_flush)
  531. {
  532. _connection_close(connection);
  533. local_ret= GEARMAND_LOST_CONNECTION;
  534. gearmand_debug("closing connection after flush by request");
  535. }
  536. return local_ret;
  537. }
  538. connection->send_state= gearmand_io_st::GEARMAND_CON_SEND_STATE_NONE;
  539. return GEARMAND_SUCCESS;
  540. }
  541. #pragma GCC diagnostic push
  542. #ifndef __INTEL_COMPILER
  543. #pragma GCC diagnostic ignored "-Wold-style-cast"
  544. #endif
  545. gearmand_error_t gearman_io_recv(gearman_server_con_st *con, bool recv_data)
  546. {
  547. gearmand_io_st *connection= &con->con;
  548. gearmand_packet_st *packet= &(con->packet->packet);
  549. switch (connection->recv_state)
  550. {
  551. case gearmand_io_st::GEARMAND_CON_RECV_UNIVERSAL_NONE:
  552. if (connection->_state != gearmand_io_st::GEARMAND_CON_UNIVERSAL_CONNECTED)
  553. {
  554. gearmand_error("not connected");
  555. return GEARMAND_NOT_CONNECTED;
  556. }
  557. connection->recv_packet= packet;
  558. // The options being passed in are just defaults.
  559. connection->recv_packet->reset(GEARMAN_MAGIC_TEXT, GEARMAN_COMMAND_TEXT);
  560. connection->recv_state= gearmand_io_st::GEARMAND_CON_RECV_UNIVERSAL_READ;
  561. case gearmand_io_st::GEARMAND_CON_RECV_UNIVERSAL_READ:
  562. while (1)
  563. {
  564. gearmand_error_t ret;
  565. if (connection->recv_buffer_size > 0)
  566. {
  567. assert(con->protocol);
  568. size_t recv_size= con->protocol->unpack(connection->recv_packet,
  569. con,
  570. connection->recv_buffer_ptr,
  571. connection->recv_buffer_size, ret);
  572. connection->recv_buffer_ptr+= recv_size;
  573. connection->recv_buffer_size-= recv_size;
  574. if (gearmand_success(ret))
  575. {
  576. break;
  577. }
  578. else if (ret != GEARMAND_IO_WAIT)
  579. {
  580. gearmand_gerror_warn("protocol failure, closing connection", ret);
  581. _connection_close(connection);
  582. return ret;
  583. }
  584. }
  585. /* Shift buffer contents if needed. */
  586. if (connection->recv_buffer_size > 0)
  587. {
  588. memmove(connection->recv_buffer, connection->recv_buffer_ptr, connection->recv_buffer_size);
  589. }
  590. connection->recv_buffer_ptr= connection->recv_buffer;
  591. size_t recv_size= _connection_read(con, connection->recv_buffer + connection->recv_buffer_size,
  592. GEARMAND_RECV_BUFFER_SIZE - connection->recv_buffer_size, ret);
  593. if (gearmand_failed(ret))
  594. {
  595. // GEARMAND_LOST_CONNECTION is not worth a warning, clients/workers just
  596. // drop connections for close.
  597. if (ret != GEARMAND_LOST_CONNECTION)
  598. {
  599. gearmand_gerror_warn("Failed while in _connection_read()", ret);
  600. }
  601. return ret;
  602. }
  603. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "read %lu bytes", (unsigned long)recv_size);
  604. connection->recv_buffer_size+= recv_size;
  605. }
  606. if (packet->data_size == 0)
  607. {
  608. connection->recv_state= gearmand_io_st::GEARMAND_CON_RECV_UNIVERSAL_NONE;
  609. break;
  610. }
  611. connection->recv_data_size= packet->data_size;
  612. if (not recv_data)
  613. {
  614. connection->recv_state= gearmand_io_st::GEARMAND_CON_RECV_STATE_READ_DATA;
  615. break;
  616. }
  617. packet->data= static_cast<char *>(realloc(NULL, packet->data_size));
  618. if (not packet->data)
  619. {
  620. // Server up the memory error first, in case _connection_close()
  621. // creates any.
  622. gearmand_merror("realloc", char, packet->data_size);
  623. _connection_close(connection);
  624. return GEARMAND_MEMORY_ALLOCATION_FAILURE;
  625. }
  626. packet->options.free_data= true;
  627. connection->recv_state= gearmand_io_st::GEARMAND_CON_RECV_STATE_READ_DATA;
  628. case gearmand_io_st::GEARMAND_CON_RECV_STATE_READ_DATA:
  629. while (connection->recv_data_size)
  630. {
  631. gearmand_error_t ret;
  632. ret= gearmand_connection_recv_data(con,
  633. ((uint8_t *)(packet->data)) +
  634. connection->recv_data_offset,
  635. packet->data_size -
  636. connection->recv_data_offset);
  637. if (gearmand_failed(ret))
  638. {
  639. return ret;
  640. }
  641. }
  642. connection->recv_state= gearmand_io_st::GEARMAND_CON_RECV_UNIVERSAL_NONE;
  643. break;
  644. }
  645. packet= connection->recv_packet;
  646. connection->recv_packet= NULL;
  647. return GEARMAND_SUCCESS;
  648. }
  649. gearmand_error_t gearmand_io_set_events(gearman_server_con_st *con, short events)
  650. {
  651. gearmand_io_st *connection= &con->con;
  652. if ((connection->events | events) == connection->events)
  653. {
  654. return GEARMAND_SUCCESS;
  655. }
  656. connection->events|= events;
  657. if (connection->universal->event_watch_fn)
  658. {
  659. gearmand_error_t ret= connection->universal->event_watch_fn(connection, connection->events,
  660. (void *)connection->universal->event_watch_context);
  661. if (gearmand_failed(ret))
  662. {
  663. gearmand_gerror_warn("event watch failed, closing connection", ret);
  664. _connection_close(connection);
  665. return ret;
  666. }
  667. }
  668. return GEARMAND_SUCCESS;
  669. }
  670. gearmand_error_t gearmand_io_set_revents(gearman_server_con_st *con, short revents)
  671. {
  672. gearmand_io_st *connection= &con->con;
  673. if (revents != 0)
  674. {
  675. connection->options.ready= true;
  676. GEARMAND_LIST_ADD(connection->universal->ready_con, connection, ready_);
  677. }
  678. connection->revents= revents;
  679. /* Remove external POLLOUT watch if we didn't ask for it. Otherwise we spin
  680. forever until another POLLIN state change. This is much more efficient
  681. than removing POLLOUT on every state change since some external polling
  682. mechanisms need to use a system call to change flags (like Linux epoll). */
  683. if (revents & POLLOUT && !(connection->events & POLLOUT) &&
  684. connection->universal->event_watch_fn != NULL)
  685. {
  686. gearmand_error_t ret= connection->universal->event_watch_fn(connection, connection->events,
  687. (void *)connection->universal->event_watch_context);
  688. if (gearmand_failed(ret))
  689. {
  690. gearmand_gerror_warn("event watch failed, closing connection", ret);
  691. _connection_close(connection);
  692. return ret;
  693. }
  694. }
  695. connection->events&= (short)~revents;
  696. return GEARMAND_SUCCESS;
  697. }
  698. /*
  699. * Static Definitions
  700. */
  701. static gearmand_error_t _io_setsockopt(gearmand_io_st &connection)
  702. {
  703. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "setsockopt() fd:%d", connection.fd);
  704. {
  705. int setting= 1;
  706. if (setsockopt(connection.fd, IPPROTO_TCP, TCP_NODELAY, &setting, (socklen_t)sizeof(int)) and errno != EOPNOTSUPP)
  707. {
  708. return gearmand_perror(errno, "setsockopt(TCP_NODELAY)");
  709. }
  710. }
  711. {
  712. struct linger linger;
  713. linger.l_onoff= 1;
  714. linger.l_linger= GEARMAND_DEFAULT_SOCKET_TIMEOUT;
  715. if (setsockopt(connection.fd, SOL_SOCKET, SO_LINGER, &linger, (socklen_t)sizeof(struct linger)))
  716. {
  717. return gearmand_perror(errno, "setsockopt(SO_LINGER)");
  718. }
  719. }
  720. #if defined(__MACH__) && defined(__APPLE__) || defined(__FreeBSD__)
  721. {
  722. int setting= 1;
  723. // This is not considered a fatal error
  724. if (setsockopt(connection.fd, SOL_SOCKET, SO_NOSIGPIPE, (void *)&setting, sizeof(int)))
  725. {
  726. gearmand_perror(errno, "setsockopt(SO_NOSIGPIPE)");
  727. }
  728. }
  729. #endif
  730. #if 0
  731. if (0)
  732. {
  733. struct timeval waittime;
  734. waittime.tv_sec= GEARMAND_DEFAULT_SOCKET_TIMEOUT;
  735. waittime.tv_usec= 0;
  736. if (setsockopt(connection.fd, SOL_SOCKET, SO_SNDTIMEO, &waittime, (socklen_t)sizeof(struct timeval)) and errno != ENOPROTOOPT)
  737. {
  738. return gearmand_perror(errno, "setsockopt(SO_SNDTIMEO)");
  739. }
  740. if (setsockopt(connection.fd, SOL_SOCKET, SO_RCVTIMEO, &waittime, (socklen_t)sizeof(struct timeval)) and errno != ENOPROTOOPT)
  741. {
  742. return gearmand_perror(errno, "setsockopt(SO_RCVTIMEO)");
  743. }
  744. }
  745. #endif
  746. #if 0
  747. if (0)
  748. {
  749. int setting= GEARMAND_DEFAULT_SOCKET_SEND_SIZE;
  750. if (setsockopt(connection.fd, SOL_SOCKET, SO_SNDBUF, &setting, (socklen_t)sizeof(int)))
  751. {
  752. return gearmand_perror(errno, "setsockopr(SO_SNDBUF)");
  753. }
  754. setting= GEARMAND_DEFAULT_SOCKET_RECV_SIZE;
  755. if (setsockopt(connection.fd, SOL_SOCKET, SO_RCVBUF, &setting, (socklen_t)sizeof(int)))
  756. {
  757. return gearmand_perror(errno, "setsockopt(SO_RCVBUF)");
  758. }
  759. }
  760. #endif
  761. if (SOCK_NONBLOCK == 0)
  762. {
  763. int flags;
  764. do
  765. {
  766. flags= fcntl(connection.fd, F_GETFL, 0);
  767. } while (flags == -1 and (errno == EINTR or errno == EAGAIN));
  768. if (flags == -1)
  769. {
  770. return gearmand_perror(errno, "fcntl(F_GETFL)");
  771. }
  772. else if ((flags & O_NONBLOCK) == 0)
  773. {
  774. int retval;
  775. do
  776. {
  777. retval= fcntl(connection.fd, F_SETFL, flags | O_NONBLOCK);
  778. } while (retval == -1 and (errno == EINTR or errno == EAGAIN));
  779. if (retval == -1)
  780. {
  781. return gearmand_perror(errno, "fcntl(F_SETFL)");
  782. }
  783. }
  784. }
  785. return GEARMAND_SUCCESS;
  786. }
  787. void gearmand_sockfd_close(int& sockfd)
  788. {
  789. if (sockfd == INVALID_SOCKET)
  790. {
  791. gearmand_error("gearmand_sockfd_close() called with an invalid socket");
  792. return;
  793. }
  794. /* in case of death shutdown to avoid blocking at close() */
  795. if (shutdown(sockfd, SHUT_RDWR) == SOCKET_ERROR && get_socket_errno() != ENOTCONN)
  796. {
  797. gearmand_perror(errno, "shutdown");
  798. assert(errno != ENOTSOCK);
  799. }
  800. else if (closesocket(sockfd) == SOCKET_ERROR)
  801. {
  802. gearmand_perror(errno, "close");
  803. }
  804. sockfd= INVALID_SOCKET;
  805. }
  806. void gearmand_pipe_close(int& pipefd)
  807. {
  808. if (pipefd == INVALID_SOCKET)
  809. {
  810. gearmand_error("gearmand_pipe_close() called with an invalid socket");
  811. return;
  812. }
  813. if (closesocket(pipefd) == SOCKET_ERROR)
  814. {
  815. gearmand_perror(errno, "close");
  816. }
  817. pipefd= -1;
  818. }
  819. #pragma GCC diagnostic pop