io.cc 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829
  1. /* Gearman server and library
  2. * Copyright (C) 2008 Brian Aker, Eric Day
  3. * All rights reserved.
  4. *
  5. * Use and distribution licensed under the BSD license. See
  6. * the COPYING file in the parent directory for full text.
  7. */
  8. /**
  9. * @file
  10. * @brief Connection Definitions
  11. */
  12. #include <config.h>
  13. #include <libgearman-server/common.h>
  14. #include <cstring>
  15. #include <cerrno>
  16. #include <cassert>
  17. static void _connection_close(gearmand_io_st *connection)
  18. {
  19. if (connection->fd == INVALID_SOCKET)
  20. {
  21. return;
  22. }
  23. if (connection->options.external_fd)
  24. {
  25. connection->options.external_fd= false;
  26. }
  27. else
  28. {
  29. (void)gearmand_sockfd_close(connection->fd);
  30. assert(! "We should never have an internal fd");
  31. }
  32. connection->_state= gearmand_io_st::GEARMAND_CON_UNIVERSAL_INVALID;
  33. connection->fd= INVALID_SOCKET;
  34. connection->events= 0;
  35. connection->revents= 0;
  36. connection->send_state= gearmand_io_st::GEARMAND_CON_SEND_STATE_NONE;
  37. connection->send_buffer_ptr= connection->send_buffer;
  38. connection->send_buffer_size= 0;
  39. connection->send_data_size= 0;
  40. connection->send_data_offset= 0;
  41. connection->recv_state= gearmand_io_st::GEARMAND_CON_RECV_UNIVERSAL_NONE;
  42. if (connection->recv_packet != NULL)
  43. {
  44. gearmand_packet_free(connection->recv_packet);
  45. connection->recv_packet= NULL;
  46. }
  47. connection->recv_buffer_ptr= connection->recv_buffer;
  48. connection->recv_buffer_size= 0;
  49. }
  50. static size_t _connection_read(gearman_server_con_st *con, void *data, size_t data_size, gearmand_error_t &ret)
  51. {
  52. ssize_t read_size;
  53. gearmand_io_st *connection= &con->con;
  54. while (1)
  55. {
  56. read_size= recv(connection->fd, data, data_size, MSG_DONTWAIT);
  57. if (read_size == 0)
  58. {
  59. ret= GEARMAN_LOST_CONNECTION;
  60. gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM,
  61. "Peer connection has called close() %s:%s",
  62. connection->context == NULL ? "-" : connection->context->host,
  63. connection->context == NULL ? "-" : connection->context->port);
  64. _connection_close(connection);
  65. return 0;
  66. }
  67. else if (read_size == -1)
  68. {
  69. switch (errno)
  70. {
  71. case EAGAIN:
  72. ret= gearmand_io_set_events(con, POLLIN);
  73. if (gearmand_failed(ret))
  74. {
  75. gearmand_perror("recv");
  76. return 0;
  77. }
  78. ret= GEARMAN_IO_WAIT;
  79. return 0;
  80. case EINTR:
  81. continue;
  82. case EPIPE:
  83. case ECONNRESET:
  84. case EHOSTDOWN:
  85. gearmand_perror("lost connection to client recv(EPIPE || ECONNRESET || EHOSTDOWN)");
  86. ret= GEARMAN_LOST_CONNECTION;
  87. break;
  88. default:
  89. gearmand_perror("recv");
  90. ret= GEARMAN_ERRNO;
  91. }
  92. gearmand_error("closing connection due to previous errno error");
  93. _connection_close(connection);
  94. return 0;
  95. }
  96. break;
  97. }
  98. ret= GEARMAN_SUCCESS;
  99. return size_t(read_size);
  100. }
  101. gearmand_error_t gearmand_connection_recv_data(gearman_server_con_st *con, void *data, size_t data_size)
  102. {
  103. gearmand_io_st *connection= &con->con;
  104. if (connection->recv_data_size == 0)
  105. {
  106. return GEARMAN_SUCCESS;
  107. }
  108. if ((connection->recv_data_size - connection->recv_data_offset) < data_size)
  109. {
  110. data_size= connection->recv_data_size - connection->recv_data_offset;
  111. }
  112. size_t recv_size= 0;
  113. if (connection->recv_buffer_size > 0)
  114. {
  115. if (connection->recv_buffer_size < data_size)
  116. recv_size= connection->recv_buffer_size;
  117. else
  118. recv_size= data_size;
  119. memcpy(data, connection->recv_buffer_ptr, recv_size);
  120. connection->recv_buffer_ptr+= recv_size;
  121. connection->recv_buffer_size-= recv_size;
  122. }
  123. gearmand_error_t ret;
  124. if (data_size != recv_size)
  125. {
  126. recv_size+= _connection_read(con, ((uint8_t *)data) + recv_size, data_size - recv_size, ret);
  127. connection->recv_data_offset+= recv_size;
  128. }
  129. else
  130. {
  131. connection->recv_data_offset+= recv_size;
  132. ret= GEARMAN_SUCCESS;
  133. }
  134. if (connection->recv_data_size == connection->recv_data_offset)
  135. {
  136. connection->recv_data_size= 0;
  137. connection->recv_data_offset= 0;
  138. connection->recv_state= gearmand_io_st::GEARMAND_CON_RECV_UNIVERSAL_NONE;
  139. }
  140. return ret;
  141. }
  142. static gearmand_error_t _connection_flush(gearman_server_con_st *con)
  143. {
  144. gearmand_io_st *connection= &con->con;
  145. assert(connection->_state == gearmand_io_st::GEARMAND_CON_UNIVERSAL_CONNECTED);
  146. while (1)
  147. {
  148. switch (connection->_state)
  149. {
  150. case gearmand_io_st::GEARMAND_CON_UNIVERSAL_INVALID:
  151. assert(0);
  152. return GEARMAN_ERRNO;
  153. case gearmand_io_st::GEARMAND_CON_UNIVERSAL_CONNECTED:
  154. while (connection->send_buffer_size)
  155. {
  156. ssize_t write_size= send(connection->fd, connection->send_buffer_ptr, connection->send_buffer_size, MSG_NOSIGNAL|MSG_DONTWAIT);
  157. if (write_size == 0) // detect infinite loop?
  158. {
  159. gearmand_log_debug("send() sent zero bytes to peer %s:%s",
  160. connection->context == NULL ? "-" : connection->context->host,
  161. connection->context == NULL ? "-" : connection->context->port);
  162. continue;
  163. }
  164. else if (write_size == -1)
  165. {
  166. gearmand_error_t gret;
  167. switch (errno)
  168. {
  169. case EAGAIN:
  170. gret= gearmand_io_set_events(con, POLLOUT);
  171. if (gret != GEARMAN_SUCCESS)
  172. {
  173. return gret;
  174. }
  175. return GEARMAN_IO_WAIT;
  176. case EINTR:
  177. continue;
  178. case EPIPE:
  179. case ECONNRESET:
  180. case EHOSTDOWN:
  181. gearmand_perror("lost connection to client during send(EPIPE || ECONNRESET || EHOSTDOWN)");
  182. _connection_close(connection);
  183. return GEARMAN_LOST_CONNECTION;
  184. default:
  185. break;
  186. }
  187. gearmand_perror("send() failed, closing connection");
  188. _connection_close(connection);
  189. return GEARMAN_ERRNO;
  190. }
  191. connection->send_buffer_size-= static_cast<size_t>(write_size);
  192. if (connection->send_state == gearmand_io_st::GEARMAND_CON_SEND_UNIVERSAL_FLUSH_DATA)
  193. {
  194. connection->send_data_offset+= static_cast<size_t>(write_size);
  195. if (connection->send_data_offset == connection->send_data_size)
  196. {
  197. connection->send_data_size= 0;
  198. connection->send_data_offset= 0;
  199. break;
  200. }
  201. if (connection->send_buffer_size == 0)
  202. {
  203. return GEARMAN_SUCCESS;
  204. }
  205. }
  206. else if (connection->send_buffer_size == 0)
  207. {
  208. break;
  209. }
  210. connection->send_buffer_ptr+= write_size;
  211. }
  212. connection->send_state= gearmand_io_st::GEARMAND_CON_SEND_STATE_NONE;
  213. connection->send_buffer_ptr= connection->send_buffer;
  214. return GEARMAN_SUCCESS;
  215. }
  216. }
  217. }
  218. /**
  219. * @addtogroup gearmand_io_static Static Connection Declarations
  220. * @ingroup gearman_connection
  221. * @{
  222. */
  223. void gearmand_connection_init(gearmand_connection_list_st *gearman,
  224. gearmand_io_st *connection,
  225. gearmand_con_st *dcon,
  226. gearmand_connection_options_t *options)
  227. {
  228. assert(gearman);
  229. assert(connection);
  230. connection->options.ready= false;
  231. connection->options.packet_in_use= false;
  232. connection->options.external_fd= false;
  233. connection->options.close_after_flush= false;
  234. if (options)
  235. {
  236. while (*options != GEARMAND_CON_MAX)
  237. {
  238. gearman_io_set_option(connection, *options, true);
  239. options++;
  240. }
  241. }
  242. connection->_state= gearmand_io_st::GEARMAND_CON_UNIVERSAL_INVALID;
  243. connection->send_state= gearmand_io_st::GEARMAND_CON_SEND_STATE_NONE;
  244. connection->recv_state= gearmand_io_st::GEARMAND_CON_RECV_UNIVERSAL_NONE;
  245. connection->events= 0;
  246. connection->revents= 0;
  247. connection->fd= INVALID_SOCKET;
  248. connection->created_id= 0;
  249. connection->created_id_next= 0;
  250. connection->send_buffer_size= 0;
  251. connection->send_data_size= 0;
  252. connection->send_data_offset= 0;
  253. connection->recv_buffer_size= 0;
  254. connection->recv_data_size= 0;
  255. connection->recv_data_offset= 0;
  256. connection->universal= gearman;
  257. if (gearman->con_list != NULL)
  258. gearman->con_list->prev= connection;
  259. connection->next= gearman->con_list;
  260. connection->prev= NULL;
  261. gearman->con_list= connection;
  262. gearman->con_count++;
  263. connection->context= dcon;
  264. connection->send_buffer_ptr= connection->send_buffer;
  265. connection->recv_packet= NULL;
  266. connection->recv_buffer_ptr= connection->recv_buffer;
  267. }
  268. void gearmand_io_free(gearmand_io_st *connection)
  269. {
  270. if (connection->fd != INVALID_SOCKET)
  271. _connection_close(connection);
  272. {
  273. if (connection->universal->con_list == connection)
  274. connection->universal->con_list= connection->next;
  275. if (connection->prev != NULL)
  276. connection->prev->next= connection->next;
  277. if (connection->next != NULL)
  278. connection->next->prev= connection->prev;
  279. connection->universal->con_count--;
  280. }
  281. if (connection->options.packet_in_use)
  282. {
  283. gearmand_packet_free(&(connection->packet));
  284. }
  285. }
  286. gearmand_error_t gearman_io_set_option(gearmand_io_st *connection,
  287. gearmand_connection_options_t options,
  288. bool value)
  289. {
  290. switch (options)
  291. {
  292. case GEARMAND_CON_READY:
  293. connection->options.ready= value;
  294. break;
  295. case GEARMAND_CON_PACKET_IN_USE:
  296. connection->options.packet_in_use= value;
  297. break;
  298. case GEARMAND_CON_EXTERNAL_FD:
  299. connection->options.external_fd= value;
  300. break;
  301. case GEARMAND_CON_CLOSE_AFTER_FLUSH:
  302. connection->options.close_after_flush= value;
  303. break;
  304. case GEARMAND_CON_MAX:
  305. return GEARMAN_INVALID_COMMAND;
  306. }
  307. return GEARMAN_SUCCESS;
  308. }
  309. /**
  310. * Set socket options for a connection.
  311. */
  312. static gearmand_error_t _io_setsockopt(gearmand_io_st &connection);
  313. /** @} */
  314. /*
  315. * Public Definitions
  316. */
  317. gearmand_error_t gearman_io_set_fd(gearmand_io_st *connection, int fd)
  318. {
  319. assert(connection);
  320. connection->options.external_fd= true;
  321. connection->fd= fd;
  322. connection->_state= gearmand_io_st::GEARMAND_CON_UNIVERSAL_CONNECTED;
  323. return _io_setsockopt(*connection);
  324. }
  325. gearmand_con_st *gearman_io_context(const gearmand_io_st *connection)
  326. {
  327. return connection->context;
  328. }
  329. gearmand_error_t gearman_io_send(gearman_server_con_st *con,
  330. const gearmand_packet_st *packet, bool flush)
  331. {
  332. gearmand_error_t ret;
  333. size_t send_size;
  334. gearmand_io_st *connection= &con->con;
  335. switch (connection->send_state)
  336. {
  337. case gearmand_io_st::GEARMAND_CON_SEND_STATE_NONE:
  338. if (! (packet->options.complete))
  339. {
  340. gearmand_error("packet not complete");
  341. return GEARMAN_INVALID_PACKET;
  342. }
  343. /* Pack first part of packet, which is everything but the payload. */
  344. while (1)
  345. {
  346. send_size= con->protocol.packet_pack_fn(packet, con,
  347. connection->send_buffer + connection->send_buffer_size,
  348. GEARMAN_SEND_BUFFER_SIZE -
  349. connection->send_buffer_size, &ret);
  350. if (ret == GEARMAN_SUCCESS)
  351. {
  352. connection->send_buffer_size+= send_size;
  353. break;
  354. }
  355. else if (ret == GEARMAN_IGNORE_PACKET)
  356. {
  357. return GEARMAN_SUCCESS;
  358. }
  359. else if (ret != GEARMAN_FLUSH_DATA)
  360. {
  361. return ret;
  362. }
  363. /* We were asked to flush when the buffer is already flushed! */
  364. if (connection->send_buffer_size == 0)
  365. {
  366. gearmand_error("send buffer too small");
  367. return GEARMAN_SEND_BUFFER_TOO_SMALL;
  368. }
  369. /* Flush buffer now if first part of packet won't fit in. */
  370. connection->send_state= gearmand_io_st::GEARMAND_CON_SEND_UNIVERSAL_PRE_FLUSH;
  371. case gearmand_io_st::GEARMAND_CON_SEND_UNIVERSAL_PRE_FLUSH:
  372. ret= _connection_flush(con);
  373. if (ret != GEARMAN_SUCCESS)
  374. {
  375. return ret;
  376. }
  377. }
  378. /* Return here if we have no data to send. */
  379. if (packet->data_size == 0)
  380. break;
  381. /* If there is any room in the buffer, copy in data. */
  382. if (packet->data and (GEARMAN_SEND_BUFFER_SIZE - connection->send_buffer_size) > 0)
  383. {
  384. connection->send_data_offset= GEARMAN_SEND_BUFFER_SIZE - connection->send_buffer_size;
  385. if (connection->send_data_offset > packet->data_size)
  386. connection->send_data_offset= packet->data_size;
  387. memcpy(connection->send_buffer + connection->send_buffer_size, packet->data,
  388. connection->send_data_offset);
  389. connection->send_buffer_size+= connection->send_data_offset;
  390. /* Return if all data fit in the send buffer. */
  391. if (connection->send_data_offset == packet->data_size)
  392. {
  393. connection->send_data_offset= 0;
  394. break;
  395. }
  396. }
  397. /* Flush buffer now so we can start writing directly from data buffer. */
  398. connection->send_state= gearmand_io_st::GEARMAND_CON_SEND_UNIVERSAL_FORCE_FLUSH;
  399. case gearmand_io_st::GEARMAND_CON_SEND_UNIVERSAL_FORCE_FLUSH:
  400. ret= _connection_flush(con);
  401. if (ret != GEARMAN_SUCCESS)
  402. {
  403. return ret;
  404. }
  405. connection->send_data_size= packet->data_size;
  406. /* If this is NULL, then ?? function will be used. */
  407. if (packet->data == NULL)
  408. {
  409. connection->send_state= gearmand_io_st::GEARMAND_CON_SEND_UNIVERSAL_FLUSH_DATA;
  410. return GEARMAN_SUCCESS;
  411. }
  412. /* Copy into the buffer if it fits, otherwise flush from packet buffer. */
  413. connection->send_buffer_size= packet->data_size - connection->send_data_offset;
  414. if (connection->send_buffer_size < GEARMAN_SEND_BUFFER_SIZE)
  415. {
  416. memcpy(connection->send_buffer,
  417. packet->data + connection->send_data_offset,
  418. connection->send_buffer_size);
  419. connection->send_data_size= 0;
  420. connection->send_data_offset= 0;
  421. break;
  422. }
  423. connection->send_buffer_ptr= const_cast<char *>(packet->data) + connection->send_data_offset;
  424. connection->send_state= gearmand_io_st::GEARMAND_CON_SEND_UNIVERSAL_FLUSH_DATA;
  425. case gearmand_io_st::GEARMAND_CON_SEND_UNIVERSAL_FLUSH:
  426. case gearmand_io_st::GEARMAND_CON_SEND_UNIVERSAL_FLUSH_DATA:
  427. ret= _connection_flush(con);
  428. if (ret == GEARMAN_SUCCESS && connection->options.close_after_flush)
  429. {
  430. _connection_close(connection);
  431. ret= GEARMAN_LOST_CONNECTION;
  432. gearmand_gerror_warn("failure while flusing data, closing connection", ret);
  433. }
  434. return ret;
  435. }
  436. if (flush)
  437. {
  438. connection->send_state= gearmand_io_st::GEARMAND_CON_SEND_UNIVERSAL_FLUSH;
  439. ret= _connection_flush(con);
  440. if (ret == GEARMAN_SUCCESS && connection->options.close_after_flush)
  441. {
  442. _connection_close(connection);
  443. ret= GEARMAN_LOST_CONNECTION;
  444. gearmand_gerror_warn("failure while flusing data, closing connection", ret);
  445. }
  446. return ret;
  447. }
  448. connection->send_state= gearmand_io_st::GEARMAND_CON_SEND_STATE_NONE;
  449. return GEARMAN_SUCCESS;
  450. }
  451. #ifndef __INTEL_COMPILER
  452. #pragma GCC diagnostic ignored "-Wold-style-cast"
  453. #endif
  454. gearmand_error_t gearman_io_recv(gearman_server_con_st *con, bool recv_data)
  455. {
  456. gearmand_io_st *connection= &con->con;
  457. gearmand_packet_st *packet= &(con->packet->packet);
  458. switch (connection->recv_state)
  459. {
  460. case gearmand_io_st::GEARMAND_CON_RECV_UNIVERSAL_NONE:
  461. if (connection->_state != gearmand_io_st::GEARMAND_CON_UNIVERSAL_CONNECTED)
  462. {
  463. gearmand_error("not connected");
  464. return GEARMAN_NOT_CONNECTED;
  465. }
  466. connection->recv_packet= packet;
  467. // The options being passed in are just defaults.
  468. gearmand_packet_init(connection->recv_packet, GEARMAN_MAGIC_TEXT, GEARMAN_COMMAND_TEXT);
  469. connection->recv_state= gearmand_io_st::GEARMAND_CON_RECV_UNIVERSAL_READ;
  470. case gearmand_io_st::GEARMAND_CON_RECV_UNIVERSAL_READ:
  471. while (1)
  472. {
  473. gearmand_error_t ret;
  474. if (connection->recv_buffer_size > 0)
  475. {
  476. size_t recv_size= con->protocol.packet_unpack_fn(connection->recv_packet, con,
  477. connection->recv_buffer_ptr,
  478. connection->recv_buffer_size, &ret);
  479. connection->recv_buffer_ptr+= recv_size;
  480. connection->recv_buffer_size-= recv_size;
  481. if (gearmand_success(ret))
  482. {
  483. break;
  484. }
  485. else if (ret != GEARMAN_IO_WAIT)
  486. {
  487. gearmand_gerror_warn("protocol failure, closing connection", ret);
  488. _connection_close(connection);
  489. return ret;
  490. }
  491. }
  492. /* Shift buffer contents if needed. */
  493. if (connection->recv_buffer_size > 0)
  494. {
  495. memmove(connection->recv_buffer, connection->recv_buffer_ptr, connection->recv_buffer_size);
  496. }
  497. connection->recv_buffer_ptr= connection->recv_buffer;
  498. size_t recv_size= _connection_read(con, connection->recv_buffer + connection->recv_buffer_size,
  499. GEARMAN_RECV_BUFFER_SIZE - connection->recv_buffer_size, ret);
  500. if (gearmand_failed(ret))
  501. {
  502. // GEARMAN_LOST_CONNECTION is not worth a warning, clients/workers just
  503. // drop connections for close.
  504. if (ret != GEARMAN_LOST_CONNECTION)
  505. {
  506. gearmand_gerror_warn("Failed while in _connection_read()", ret);
  507. }
  508. return ret;
  509. }
  510. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "read %lu bytes", (unsigned long)recv_size);
  511. connection->recv_buffer_size+= recv_size;
  512. }
  513. if (packet->data_size == 0)
  514. {
  515. connection->recv_state= gearmand_io_st::GEARMAND_CON_RECV_UNIVERSAL_NONE;
  516. break;
  517. }
  518. connection->recv_data_size= packet->data_size;
  519. if (not recv_data)
  520. {
  521. connection->recv_state= gearmand_io_st::GEARMAND_CON_RECV_STATE_READ_DATA;
  522. break;
  523. }
  524. packet->data= static_cast<char *>(malloc(packet->data_size));
  525. if (not packet->data)
  526. {
  527. // Server up the memory error first, in case _connection_close()
  528. // creates any.
  529. gearmand_merror("malloc", char, packet->data_size);
  530. _connection_close(connection);
  531. return GEARMAN_MEMORY_ALLOCATION_FAILURE;
  532. }
  533. packet->options.free_data= true;
  534. connection->recv_state= gearmand_io_st::GEARMAND_CON_RECV_STATE_READ_DATA;
  535. case gearmand_io_st::GEARMAND_CON_RECV_STATE_READ_DATA:
  536. while (connection->recv_data_size)
  537. {
  538. gearmand_error_t ret;
  539. ret= gearmand_connection_recv_data(con,
  540. ((uint8_t *)(packet->data)) +
  541. connection->recv_data_offset,
  542. packet->data_size -
  543. connection->recv_data_offset);
  544. if (gearmand_failed(ret))
  545. {
  546. return ret;
  547. }
  548. }
  549. connection->recv_state= gearmand_io_st::GEARMAND_CON_RECV_UNIVERSAL_NONE;
  550. break;
  551. }
  552. packet= connection->recv_packet;
  553. connection->recv_packet= NULL;
  554. return GEARMAN_SUCCESS;
  555. }
  556. gearmand_error_t gearmand_io_set_events(gearman_server_con_st *con, short events)
  557. {
  558. gearmand_io_st *connection= &con->con;
  559. if ((connection->events | events) == connection->events)
  560. {
  561. return GEARMAN_SUCCESS;
  562. }
  563. connection->events|= events;
  564. if (connection->universal->event_watch_fn)
  565. {
  566. gearmand_error_t ret= connection->universal->event_watch_fn(connection, connection->events,
  567. (void *)connection->universal->event_watch_context);
  568. if (gearmand_failed(ret))
  569. {
  570. gearmand_gerror_warn("event watch failed, closing connection", ret);
  571. _connection_close(connection);
  572. return ret;
  573. }
  574. }
  575. return GEARMAN_SUCCESS;
  576. }
  577. gearmand_error_t gearmand_io_set_revents(gearman_server_con_st *con, short revents)
  578. {
  579. gearmand_io_st *connection= &con->con;
  580. if (revents != 0)
  581. connection->options.ready= true;
  582. connection->revents= revents;
  583. /* Remove external POLLOUT watch if we didn't ask for it. Otherwise we spin
  584. forever until another POLLIN state change. This is much more efficient
  585. than removing POLLOUT on every state change since some external polling
  586. mechanisms need to use a system call to change flags (like Linux epoll). */
  587. if (revents & POLLOUT && !(connection->events & POLLOUT) &&
  588. connection->universal->event_watch_fn != NULL)
  589. {
  590. gearmand_error_t ret= connection->universal->event_watch_fn(connection, connection->events,
  591. (void *)connection->universal->event_watch_context);
  592. if (gearmand_failed(ret))
  593. {
  594. gearmand_gerror_warn("event watch failed, closing connection", ret);
  595. _connection_close(connection);
  596. return ret;
  597. }
  598. }
  599. connection->events&= (short)~revents;
  600. return GEARMAN_SUCCESS;
  601. }
  602. /*
  603. * Static Definitions
  604. */
  605. static gearmand_error_t _io_setsockopt(gearmand_io_st &connection)
  606. {
  607. struct linger linger;
  608. struct timeval waittime;
  609. int setting= 1;
  610. if (setsockopt(connection.fd, IPPROTO_TCP, TCP_NODELAY, &setting, (socklen_t)sizeof(int)) and errno != EOPNOTSUPP)
  611. {
  612. gearmand_perror("setsockopt(TCP_NODELAY)");
  613. return GEARMAN_ERRNO;
  614. }
  615. linger.l_onoff= 1;
  616. linger.l_linger= GEARMAN_DEFAULT_SOCKET_TIMEOUT;
  617. if (setsockopt(connection.fd, SOL_SOCKET, SO_LINGER, &linger, (socklen_t)sizeof(struct linger)))
  618. {
  619. gearmand_perror("setsockopt(SO_LINGER)");
  620. return GEARMAN_ERRNO;
  621. }
  622. #if defined(__MACH__) && defined(__APPLE__) || defined(__FreeBSD__)
  623. {
  624. setting= 1;
  625. // This is not considered a fatal error
  626. if (setsockopt(connection.fd, SOL_SOCKET, SO_NOSIGPIPE, (void *)&setting, sizeof(int)))
  627. {
  628. gearmand_perror("setsockopt(SO_NOSIGPIPE)");
  629. }
  630. }
  631. #endif
  632. waittime.tv_sec= GEARMAN_DEFAULT_SOCKET_TIMEOUT;
  633. waittime.tv_usec= 0;
  634. if (setsockopt(connection.fd, SOL_SOCKET, SO_SNDTIMEO, &waittime, (socklen_t)sizeof(struct timeval)) and errno != ENOPROTOOPT)
  635. {
  636. gearmand_perror("setsockopt(SO_SNDTIMEO)");
  637. return GEARMAN_ERRNO;
  638. }
  639. if (setsockopt(connection.fd, SOL_SOCKET, SO_RCVTIMEO, &waittime, (socklen_t)sizeof(struct timeval)) and errno != ENOPROTOOPT)
  640. {
  641. gearmand_error("setsockopt(SO_RCVTIMEO)");
  642. return GEARMAN_ERRNO;
  643. }
  644. setting= GEARMAN_DEFAULT_SOCKET_SEND_SIZE;
  645. if (setsockopt(connection.fd, SOL_SOCKET, SO_SNDBUF, &setting, (socklen_t)sizeof(int)))
  646. {
  647. gearmand_perror("setsockopt(SO_SNDBUF)");
  648. return GEARMAN_ERRNO;
  649. }
  650. setting= GEARMAN_DEFAULT_SOCKET_RECV_SIZE;
  651. if (setsockopt(connection.fd, SOL_SOCKET, SO_RCVBUF, &setting, (socklen_t)sizeof(int)))
  652. {
  653. gearmand_perror("setsockopt(SO_RCVBUF)");
  654. return GEARMAN_ERRNO;
  655. }
  656. int fcntl_flags;
  657. if ((fcntl_flags= fcntl(connection.fd, F_GETFL, 0)) == -1)
  658. {
  659. gearmand_perror("fcntl(F_GETFL)");
  660. return GEARMAN_ERRNO;
  661. }
  662. if ((fcntl(connection.fd, F_SETFL, fcntl_flags | O_NONBLOCK) == -1))
  663. {
  664. gearmand_perror("fcntl(F_SETFL)");
  665. return GEARMAN_ERRNO;
  666. }
  667. return GEARMAN_SUCCESS;
  668. }
  669. void gearmand_sockfd_close(int sockfd)
  670. {
  671. if (sockfd == INVALID_SOCKET)
  672. return;
  673. /* in case of death shutdown to avoid blocking at close() */
  674. if (shutdown(sockfd, SHUT_RDWR) == SOCKET_ERROR && get_socket_errno() != ENOTCONN)
  675. {
  676. gearmand_perror("shutdown");
  677. assert(errno != ENOTSOCK);
  678. return;
  679. }
  680. if (closesocket(sockfd) == SOCKET_ERROR)
  681. {
  682. gearmand_perror("close");
  683. }
  684. }
  685. void gearmand_pipe_close(int pipefd)
  686. {
  687. if (pipefd == INVALID_SOCKET)
  688. {
  689. return;
  690. }
  691. if (closesocket(pipefd) == SOCKET_ERROR)
  692. {
  693. gearmand_perror("close");
  694. }
  695. }