io.cc 30 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087
  1. /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
  2. *
  3. * Gearmand client and server library.
  4. *
  5. * Copyright (C) 2011-2013 Data Differential, http://datadifferential.com/
  6. * Copyright (C) 2008 Brian Aker, Eric Day
  7. * All rights reserved.
  8. *
  9. * Redistribution and use in source and binary forms, with or without
  10. * modification, are permitted provided that the following conditions are
  11. * met:
  12. *
  13. * * Redistributions of source code must retain the above copyright
  14. * notice, this list of conditions and the following disclaimer.
  15. *
  16. * * Redistributions in binary form must reproduce the above
  17. * copyright notice, this list of conditions and the following disclaimer
  18. * in the documentation and/or other materials provided with the
  19. * distribution.
  20. *
  21. * * The names of its contributors may not be used to endorse or
  22. * promote products derived from this software without specific prior
  23. * written permission.
  24. *
  25. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  26. * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  27. * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  28. * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  29. * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  30. * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  31. * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  32. * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  33. * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  34. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  35. * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  36. *
  37. */
  38. /**
  39. * @file
  40. * @brief Connection Definitions
  41. */
  42. #include "gear_config.h"
  43. #include "libgearman-server/common.h"
  44. #include <libgearman-server/plugins/base.h>
  45. #include <cstring>
  46. #include <cerrno>
  47. #include <cassert>
  48. #ifndef SOCK_NONBLOCK
  49. # define SOCK_NONBLOCK 0
  50. #endif
  51. #ifndef MSG_DONTWAIT
  52. # define MSG_DONTWAIT 0
  53. #endif
  54. static void _connection_close(gearmand_io_st *connection)
  55. {
  56. if (connection->has_fd())
  57. {
  58. assert(connection->options.external_fd);
  59. if (connection->options.external_fd)
  60. {
  61. connection->options.external_fd= false;
  62. }
  63. else
  64. {
  65. gearmand_log_fatal(GEARMAN_DEFAULT_LOG_PARAM, "We should never have an internal fd");
  66. }
  67. connection->clear();
  68. connection->_state= gearmand_io_st::GEARMAND_CON_UNIVERSAL_INVALID;
  69. connection->send_state= gearmand_io_st::GEARMAND_CON_SEND_STATE_NONE;
  70. connection->send_buffer_ptr= connection->send_buffer;
  71. connection->send_buffer_size= 0;
  72. connection->send_data_size= 0;
  73. connection->send_data_offset= 0;
  74. connection->recv_state= gearmand_io_st::GEARMAND_CON_RECV_UNIVERSAL_NONE;
  75. if (connection->recv_packet != NULL)
  76. {
  77. gearmand_packet_free(connection->recv_packet);
  78. connection->recv_packet= NULL;
  79. }
  80. connection->recv_buffer_ptr= connection->recv_buffer;
  81. connection->recv_buffer_size= 0;
  82. }
  83. }
  84. const char* gearmand_io_st::host() const
  85. {
  86. if (context)
  87. {
  88. return context->host;
  89. }
  90. return "-";
  91. }
  92. const char* gearmand_io_st::port() const
  93. {
  94. if (context)
  95. {
  96. return context->port;
  97. }
  98. return "-";
  99. }
  100. #if __GNUC__ >= 7
  101. #pragma GCC diagnostic warning "-Wimplicit-fallthrough"
  102. #endif
  103. static size_t _connection_read(gearman_server_con_st *con, void *data, size_t data_size, gearmand_error_t &ret)
  104. {
  105. ssize_t read_size;
  106. gearmand_io_st *connection= &con->con;
  107. while (1)
  108. {
  109. #if defined(HAVE_SSL) && HAVE_SSL
  110. if (con->_ssl)
  111. {
  112. # if defined(HAVE_WOLFSSL) && HAVE_WOLFSSL
  113. read_size= wolfSSL_recv(con->_ssl, data, int(data_size), MSG_DONTWAIT);
  114. # else
  115. read_size= SSL_read(con->_ssl, data, int(data_size));
  116. # endif
  117. assert(HAVE_SSL); // Just to make sure if macro is aligned.
  118. int ssl_error;
  119. switch ((ssl_error= SSL_get_error(con->_ssl, int(read_size))))
  120. {
  121. case SSL_ERROR_NONE:
  122. break;
  123. case SSL_ERROR_ZERO_RETURN:
  124. read_size= 0;
  125. break;
  126. case SSL_ERROR_WANT_CONNECT:
  127. case SSL_ERROR_WANT_ACCEPT:
  128. case SSL_ERROR_WANT_READ:
  129. case SSL_ERROR_WANT_WRITE:
  130. case SSL_ERROR_WANT_X509_LOOKUP:
  131. read_size= SOCKET_ERROR;
  132. errno= EAGAIN;
  133. break;
  134. case SSL_ERROR_SYSCALL:
  135. if (errno) // If errno is really set, then let our normal error logic handle.
  136. {
  137. read_size= SOCKET_ERROR;
  138. break;
  139. }
  140. /* fall-thru */
  141. case SSL_ERROR_SSL:
  142. default:
  143. { // All other errors
  144. char errorString[SSL_ERROR_SIZE]= { 0 };
  145. ERR_error_string_n(ssl_error, errorString, sizeof(errorString));
  146. ret= GEARMAND_LOST_CONNECTION;
  147. gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM, "SSL failure(%s) errno:%d", errorString, errno);
  148. _connection_close(connection);
  149. return 0;
  150. }
  151. }
  152. }
  153. else
  154. #endif
  155. {
  156. read_size= recv(connection->fd(), data, data_size, MSG_DONTWAIT);
  157. }
  158. if (read_size == 0)
  159. {
  160. ret= GEARMAND_LOST_CONNECTION;
  161. gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM, "Peer connection has called close()");
  162. _connection_close(connection);
  163. return 0;
  164. }
  165. else if (read_size == SOCKET_ERROR)
  166. {
  167. int local_errno= errno;
  168. switch (local_errno)
  169. {
  170. case EAGAIN:
  171. ret= gearmand_io_set_events(con, POLLIN);
  172. if (gearmand_failed(ret))
  173. {
  174. gearmand_perror(local_errno, "recv");
  175. return 0;
  176. }
  177. ret= GEARMAND_IO_WAIT;
  178. return 0;
  179. case EINTR:
  180. continue;
  181. case EPIPE:
  182. case ECONNRESET:
  183. case EHOSTDOWN:
  184. {
  185. ret= GEARMAND_LOST_CONNECTION;
  186. gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM, "Peer connection has called close()");
  187. _connection_close(connection);
  188. return 0;
  189. }
  190. default:
  191. ret= GEARMAND_ERRNO;
  192. }
  193. gearmand_perror(local_errno, "closing connection due to previous errno error");
  194. _connection_close(connection);
  195. return 0;
  196. }
  197. break;
  198. }
  199. ret= GEARMAND_SUCCESS;
  200. return size_t(read_size);
  201. }
  202. static gearmand_error_t gearmand_connection_recv_data(gearman_server_con_st *con, void *data, size_t data_size)
  203. {
  204. gearmand_io_st *connection= &con->con;
  205. if (connection->recv_data_size == 0)
  206. {
  207. return GEARMAND_SUCCESS;
  208. }
  209. if ((connection->recv_data_size - connection->recv_data_offset) < data_size)
  210. {
  211. data_size= connection->recv_data_size - connection->recv_data_offset;
  212. }
  213. size_t recv_size= 0;
  214. if (connection->recv_buffer_size > 0)
  215. {
  216. if (connection->recv_buffer_size < data_size)
  217. {
  218. recv_size= connection->recv_buffer_size;
  219. }
  220. else
  221. {
  222. recv_size= data_size;
  223. }
  224. memcpy(data, connection->recv_buffer_ptr, recv_size);
  225. connection->recv_buffer_ptr+= recv_size;
  226. connection->recv_buffer_size-= recv_size;
  227. }
  228. gearmand_error_t ret;
  229. if (data_size != recv_size)
  230. {
  231. recv_size+= _connection_read(con, ((uint8_t *)data) + recv_size, data_size - recv_size, ret);
  232. connection->recv_data_offset+= recv_size;
  233. }
  234. else
  235. {
  236. connection->recv_data_offset+= recv_size;
  237. ret= GEARMAND_SUCCESS;
  238. }
  239. if (connection->recv_data_size == connection->recv_data_offset)
  240. {
  241. connection->recv_data_size= 0;
  242. connection->recv_data_offset= 0;
  243. connection->recv_state= gearmand_io_st::GEARMAND_CON_RECV_UNIVERSAL_NONE;
  244. }
  245. return ret;
  246. }
  247. static gearmand_error_t _connection_flush(gearman_server_con_st *con)
  248. {
  249. gearmand_io_st *connection= &con->con;
  250. assert(connection->_state == gearmand_io_st::GEARMAND_CON_UNIVERSAL_CONNECTED);
  251. while (1)
  252. {
  253. switch (connection->_state)
  254. {
  255. case gearmand_io_st::GEARMAND_CON_UNIVERSAL_INVALID:
  256. assert(0);
  257. return GEARMAND_ERRNO;
  258. case gearmand_io_st::GEARMAND_CON_UNIVERSAL_CONNECTED:
  259. uint32_t loop_counter= 0;
  260. while (connection->send_buffer_size)
  261. {
  262. ssize_t write_size;
  263. #if defined(HAVE_SSL) && HAVE_SSL
  264. if (con->_ssl)
  265. {
  266. #if defined(HAVE_WOLFSSL) && HAVE_WOLFSSL
  267. write_size= wolfSSL_send(con->_ssl, connection->send_buffer_ptr, int(connection->send_buffer_size), MSG_NOSIGNAL|MSG_DONTWAIT);
  268. #elif defined(HAVE_OPENSSL) && HAVE_OPENSSL
  269. write_size= SSL_write(con->_ssl, connection->send_buffer_ptr, int(connection->send_buffer_size));
  270. #endif
  271. assert(HAVE_SSL); // Just to make sure if macro is aligned.
  272. int ssl_error;
  273. switch ((ssl_error= SSL_get_error(con->_ssl, int(write_size))))
  274. {
  275. case SSL_ERROR_NONE:
  276. break;
  277. case SSL_ERROR_ZERO_RETURN:
  278. errno= ECONNRESET;
  279. write_size= SOCKET_ERROR;
  280. break;
  281. case SSL_ERROR_WANT_ACCEPT:
  282. case SSL_ERROR_WANT_CONNECT:
  283. case SSL_ERROR_WANT_READ:
  284. case SSL_ERROR_WANT_WRITE:
  285. case SSL_ERROR_WANT_X509_LOOKUP:
  286. write_size= SOCKET_ERROR;
  287. errno= EAGAIN;
  288. break;
  289. case SSL_ERROR_SYSCALL:
  290. if (errno) // If errno is really set, then let our normal error logic handle.
  291. {
  292. write_size= SOCKET_ERROR;
  293. break;
  294. }
  295. /* fall-thru */
  296. case SSL_ERROR_SSL:
  297. default:
  298. {
  299. char errorString[SSL_ERROR_SIZE]= { 0 };
  300. ERR_error_string_n(ssl_error, errorString, sizeof(errorString));
  301. _connection_close(connection);
  302. return gearmand_log_gerror(GEARMAN_DEFAULT_LOG_PARAM, GEARMAND_LOST_CONNECTION, "SSL failure(%s)",
  303. errorString);
  304. }
  305. }
  306. }
  307. else
  308. #endif
  309. {
  310. write_size= send(connection->fd(), connection->send_buffer_ptr, connection->send_buffer_size, MSG_NOSIGNAL|MSG_DONTWAIT);
  311. }
  312. if (write_size == 0) // detect infinite loop?
  313. {
  314. ++loop_counter;
  315. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "send() sent zero bytes of %u",
  316. uint32_t(connection->send_buffer_size));
  317. if (loop_counter > 5)
  318. {
  319. _connection_close(connection);
  320. return gearmand_log_gerror(GEARMAN_DEFAULT_LOG_PARAM, GEARMAND_LOST_CONNECTION, "send() failed to send data");
  321. }
  322. continue;
  323. }
  324. else if (write_size == SOCKET_ERROR)
  325. {
  326. int local_errno= errno;
  327. switch (local_errno)
  328. {
  329. #if defined(EWOULDBLOCK) && EWOULDBLOCK != EAGAIN
  330. case EWOULDBLOCK:
  331. #endif
  332. case EAGAIN:
  333. {
  334. gearmand_error_t gret= gearmand_io_set_events(con, POLLOUT);
  335. if (gret != GEARMAND_SUCCESS)
  336. {
  337. return gret;
  338. }
  339. return GEARMAND_IO_WAIT;
  340. }
  341. case EINTR:
  342. continue;
  343. case EPIPE:
  344. case ECONNRESET:
  345. case EHOSTDOWN:
  346. _connection_close(connection);
  347. return gearmand_perror(local_errno, "lost connection to client during send(EPIPE || ECONNRESET || EHOSTDOWN)");
  348. default:
  349. break;
  350. }
  351. _connection_close(connection);
  352. return gearmand_perror(local_errno, "send() failed, closing connection");
  353. }
  354. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "send() %u bytes to peer",
  355. uint32_t(write_size));
  356. connection->send_buffer_size-= static_cast<size_t>(write_size);
  357. if (connection->send_state == gearmand_io_st::GEARMAND_CON_SEND_UNIVERSAL_FLUSH_DATA)
  358. {
  359. connection->send_data_offset+= static_cast<size_t>(write_size);
  360. if (connection->send_data_offset == connection->send_data_size)
  361. {
  362. connection->send_data_size= 0;
  363. connection->send_data_offset= 0;
  364. break;
  365. }
  366. if (connection->send_buffer_size == 0)
  367. {
  368. return GEARMAND_SUCCESS;
  369. }
  370. }
  371. else if (connection->send_buffer_size == 0)
  372. {
  373. break;
  374. }
  375. connection->send_buffer_ptr+= write_size;
  376. }
  377. connection->send_state= gearmand_io_st::GEARMAND_CON_SEND_STATE_NONE;
  378. connection->send_buffer_ptr= connection->send_buffer;
  379. return GEARMAND_SUCCESS;
  380. }
  381. }
  382. }
  383. /**
  384. * @addtogroup gearmand_io_static Static Connection Declarations
  385. * @ingroup gearman_connection
  386. * @{
  387. */
  388. void gearmand_connection_init(gearmand_connection_list_st *gearman,
  389. gearmand_io_st *connection,
  390. gearmand_con_st *dcon,
  391. gearmand_connection_options_t *options)
  392. {
  393. assert(gearman);
  394. assert(connection);
  395. connection->options.ready= false;
  396. connection->options.packet_in_use= false;
  397. connection->options.external_fd= false;
  398. connection->options.close_after_flush= false;
  399. if (options)
  400. {
  401. while (*options != GEARMAND_CON_MAX)
  402. {
  403. gearman_io_set_option(connection, *options, true);
  404. options++;
  405. }
  406. }
  407. connection->_state= gearmand_io_st::GEARMAND_CON_UNIVERSAL_INVALID;
  408. connection->send_state= gearmand_io_st::GEARMAND_CON_SEND_STATE_NONE;
  409. connection->recv_state= gearmand_io_st::GEARMAND_CON_RECV_UNIVERSAL_NONE;
  410. connection->clear();
  411. connection->created_id= 0;
  412. connection->created_id_next= 0;
  413. connection->send_buffer_size= 0;
  414. connection->send_data_size= 0;
  415. connection->send_data_offset= 0;
  416. connection->recv_buffer_size= 0;
  417. connection->recv_data_size= 0;
  418. connection->recv_data_offset= 0;
  419. connection->universal= gearman;
  420. GEARMAND_LIST__ADD(gearman->con, connection);
  421. connection->context= dcon;
  422. connection->send_buffer_ptr= connection->send_buffer;
  423. connection->recv_packet= NULL;
  424. connection->recv_buffer_ptr= connection->recv_buffer;
  425. }
  426. void gearmand_connection_list_st::list_free()
  427. {
  428. while (con_list)
  429. {
  430. gearmand_io_free(con_list);
  431. }
  432. }
  433. gearmand_connection_list_st::gearmand_connection_list_st() :
  434. con_count(0),
  435. con_list(NULL),
  436. event_watch_fn(NULL),
  437. event_watch_context(NULL)
  438. {
  439. }
  440. void gearmand_connection_list_st::init(gearmand_event_watch_fn *watch_fn, void *watch_context)
  441. {
  442. ready_con_count= 0;
  443. ready_con_list= NULL;
  444. con_count= 0;
  445. con_list= NULL;
  446. event_watch_fn= watch_fn;
  447. event_watch_context= watch_context;
  448. }
  449. void gearmand_io_free(gearmand_io_st *connection)
  450. {
  451. if (connection->has_fd())
  452. {
  453. _connection_close(connection);
  454. }
  455. if (connection->options.ready)
  456. {
  457. connection->options.ready= false;
  458. GEARMAND_LIST_DEL(connection->universal->ready_con, connection, ready_);
  459. }
  460. GEARMAND_LIST__DEL(connection->universal->con, connection);
  461. if (connection->options.packet_in_use)
  462. {
  463. gearmand_packet_free(&(connection->packet));
  464. }
  465. }
  466. gearmand_error_t gearman_io_set_option(gearmand_io_st *connection,
  467. gearmand_connection_options_t options,
  468. bool value)
  469. {
  470. switch (options)
  471. {
  472. case GEARMAND_CON_PACKET_IN_USE:
  473. connection->options.packet_in_use= value;
  474. break;
  475. case GEARMAND_CON_EXTERNAL_FD:
  476. connection->options.external_fd= value;
  477. break;
  478. case GEARMAND_CON_CLOSE_AFTER_FLUSH:
  479. connection->options.close_after_flush= value;
  480. break;
  481. case GEARMAND_CON_MAX:
  482. return GEARMAND_INVALID_COMMAND;
  483. }
  484. return GEARMAND_SUCCESS;
  485. }
  486. /** @} */
  487. /*
  488. * Public Definitions
  489. */
  490. gearmand_error_t gearman_io_set_fd(gearmand_io_st *connection, int fd_)
  491. {
  492. assert(connection);
  493. return connection->set_fd(fd_);
  494. }
  495. gearmand_con_st *gearman_io_context(const gearmand_io_st *connection)
  496. {
  497. return connection->context;
  498. }
  499. #if __GNUC__ >= 7
  500. #pragma GCC diagnostic warning "-Wimplicit-fallthrough"
  501. #endif
  502. gearmand_error_t gearman_io_send(gearman_server_con_st *con,
  503. const gearmand_packet_st *packet, bool flush)
  504. {
  505. size_t send_size;
  506. gearmand_io_st *connection= &con->con;
  507. switch (connection->send_state)
  508. {
  509. case gearmand_io_st::GEARMAND_CON_SEND_STATE_NONE:
  510. if (! (packet->options.complete))
  511. {
  512. gearmand_error("packet not complete");
  513. return GEARMAND_INVALID_PACKET;
  514. }
  515. /* Pack first part of packet, which is everything but the payload. */
  516. while (1)
  517. {
  518. gearmand_error_t ret;
  519. send_size= con->protocol->pack(packet,
  520. con,
  521. connection->send_buffer +connection->send_buffer_size,
  522. GEARMAND_SEND_BUFFER_SIZE -connection->send_buffer_size,
  523. ret);
  524. if (ret == GEARMAND_SUCCESS)
  525. {
  526. connection->send_buffer_size+= send_size;
  527. break;
  528. }
  529. else if (ret == GEARMAND_IGNORE_PACKET)
  530. {
  531. return GEARMAND_SUCCESS;
  532. }
  533. else if (ret != GEARMAND_FLUSH_DATA)
  534. {
  535. return ret;
  536. }
  537. /* We were asked to flush when the buffer is already flushed! */
  538. if (connection->send_buffer_size == 0)
  539. {
  540. gearmand_error("send buffer too small");
  541. return GEARMAND_SEND_BUFFER_TOO_SMALL;
  542. }
  543. /* Flush buffer now if first part of packet won't fit in. */
  544. connection->send_state= gearmand_io_st::GEARMAND_CON_SEND_UNIVERSAL_PRE_FLUSH;
  545. case gearmand_io_st::GEARMAND_CON_SEND_UNIVERSAL_PRE_FLUSH:
  546. {
  547. gearmand_error_t local_ret;
  548. if ((local_ret= _connection_flush(con)) != GEARMAND_SUCCESS)
  549. {
  550. return local_ret;
  551. }
  552. }
  553. }
  554. /* Return here if we have no data to send. */
  555. if (packet->data_size == 0)
  556. {
  557. break;
  558. }
  559. /* If there is any room in the buffer, copy in data. */
  560. if (packet->data and (GEARMAND_SEND_BUFFER_SIZE - connection->send_buffer_size) > 0)
  561. {
  562. connection->send_data_offset= GEARMAND_SEND_BUFFER_SIZE - connection->send_buffer_size;
  563. if (connection->send_data_offset > packet->data_size)
  564. {
  565. connection->send_data_offset= packet->data_size;
  566. }
  567. memcpy(connection->send_buffer +connection->send_buffer_size,
  568. packet->data,
  569. connection->send_data_offset);
  570. connection->send_buffer_size+= connection->send_data_offset;
  571. /* Return if all data fit in the send buffer. */
  572. if (connection->send_data_offset == packet->data_size)
  573. {
  574. connection->send_data_offset= 0;
  575. break;
  576. }
  577. }
  578. /* Flush buffer now so we can start writing directly from data buffer. */
  579. connection->send_state= gearmand_io_st::GEARMAND_CON_SEND_UNIVERSAL_FORCE_FLUSH;
  580. /* fall-thru */
  581. case gearmand_io_st::GEARMAND_CON_SEND_UNIVERSAL_FORCE_FLUSH:
  582. {
  583. gearmand_error_t local_ret;
  584. if ((local_ret= _connection_flush(con)) != GEARMAND_SUCCESS)
  585. {
  586. return local_ret;
  587. }
  588. /* fall-thru */
  589. }
  590. connection->send_data_size= packet->data_size;
  591. /* If this is NULL, then ?? function will be used. */
  592. if (packet->data == NULL)
  593. {
  594. connection->send_state= gearmand_io_st::GEARMAND_CON_SEND_UNIVERSAL_FLUSH_DATA;
  595. return GEARMAND_SUCCESS;
  596. }
  597. /* Copy into the buffer if it fits, otherwise flush from packet buffer. */
  598. connection->send_buffer_size= packet->data_size - connection->send_data_offset;
  599. if (connection->send_buffer_size < GEARMAND_SEND_BUFFER_SIZE)
  600. {
  601. memcpy(connection->send_buffer,
  602. packet->data + connection->send_data_offset,
  603. connection->send_buffer_size);
  604. connection->send_data_size= 0;
  605. connection->send_data_offset= 0;
  606. break;
  607. }
  608. connection->send_buffer_ptr= const_cast<char *>(packet->data) + connection->send_data_offset;
  609. connection->send_state= gearmand_io_st::GEARMAND_CON_SEND_UNIVERSAL_FLUSH_DATA;
  610. /* fall-thru */
  611. case gearmand_io_st::GEARMAND_CON_SEND_UNIVERSAL_FLUSH:
  612. case gearmand_io_st::GEARMAND_CON_SEND_UNIVERSAL_FLUSH_DATA:
  613. {
  614. gearmand_error_t local_ret= _connection_flush(con);
  615. if (local_ret == GEARMAND_SUCCESS and
  616. connection->options.close_after_flush)
  617. {
  618. _connection_close(connection);
  619. local_ret= GEARMAND_LOST_CONNECTION;
  620. gearmand_debug("closing connection after flush by request");
  621. }
  622. return local_ret;
  623. }
  624. }
  625. /* fall-thru */
  626. if (flush)
  627. {
  628. connection->send_state= gearmand_io_st::GEARMAND_CON_SEND_UNIVERSAL_FLUSH;
  629. gearmand_error_t local_ret= _connection_flush(con);
  630. if (local_ret == GEARMAND_SUCCESS and connection->options.close_after_flush)
  631. {
  632. _connection_close(connection);
  633. local_ret= GEARMAND_LOST_CONNECTION;
  634. gearmand_debug("closing connection after flush by request");
  635. }
  636. return local_ret;
  637. }
  638. connection->send_state= gearmand_io_st::GEARMAND_CON_SEND_STATE_NONE;
  639. return GEARMAND_SUCCESS;
  640. }
  641. #pragma GCC diagnostic push
  642. #pragma GCC diagnostic ignored "-Wold-style-cast"
  643. gearmand_error_t gearman_io_recv(gearman_server_con_st *con, bool recv_data)
  644. {
  645. gearmand_io_st *connection= &con->con;
  646. gearmand_packet_st *packet= &(con->packet->packet);
  647. switch (connection->recv_state)
  648. {
  649. case gearmand_io_st::GEARMAND_CON_RECV_UNIVERSAL_NONE:
  650. if (connection->_state != gearmand_io_st::GEARMAND_CON_UNIVERSAL_CONNECTED)
  651. {
  652. gearmand_error("not connected");
  653. return GEARMAND_NOT_CONNECTED;
  654. }
  655. connection->recv_packet= packet;
  656. // The options being passed in are just defaults.
  657. connection->recv_packet->reset(GEARMAN_MAGIC_TEXT, GEARMAN_COMMAND_TEXT);
  658. connection->recv_state= gearmand_io_st::GEARMAND_CON_RECV_UNIVERSAL_READ;
  659. /* fall-thru */
  660. case gearmand_io_st::GEARMAND_CON_RECV_UNIVERSAL_READ:
  661. while (1)
  662. {
  663. gearmand_error_t ret;
  664. if (connection->recv_buffer_size > 0)
  665. {
  666. assert(con->protocol);
  667. size_t recv_size= con->protocol->unpack(connection->recv_packet,
  668. con,
  669. connection->recv_buffer_ptr,
  670. connection->recv_buffer_size, ret);
  671. connection->recv_buffer_ptr+= recv_size;
  672. connection->recv_buffer_size-= recv_size;
  673. if (gearmand_success(ret))
  674. {
  675. break;
  676. }
  677. else if (ret != GEARMAND_IO_WAIT)
  678. {
  679. gearmand_gerror_warn("protocol failure, closing connection", ret);
  680. _connection_close(connection);
  681. return ret;
  682. }
  683. }
  684. /* Shift buffer contents if needed. */
  685. if (connection->recv_buffer_size > 0)
  686. {
  687. memmove(connection->recv_buffer, connection->recv_buffer_ptr, connection->recv_buffer_size);
  688. }
  689. connection->recv_buffer_ptr= connection->recv_buffer;
  690. size_t recv_size= _connection_read(con, connection->recv_buffer + connection->recv_buffer_size,
  691. GEARMAND_RECV_BUFFER_SIZE - connection->recv_buffer_size, ret);
  692. if (gearmand_failed(ret))
  693. {
  694. // GEARMAND_LOST_CONNECTION is not worth a warning, clients/workers just
  695. // drop connections for close.
  696. if (ret != GEARMAND_LOST_CONNECTION)
  697. {
  698. gearmand_gerror_warn("Failed while in _connection_read()", ret);
  699. }
  700. return ret;
  701. }
  702. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "read %lu bytes",
  703. (unsigned long)recv_size);
  704. connection->recv_buffer_size+= recv_size;
  705. }
  706. if (packet->data_size == 0)
  707. {
  708. connection->recv_state= gearmand_io_st::GEARMAND_CON_RECV_UNIVERSAL_NONE;
  709. break;
  710. }
  711. connection->recv_data_size= packet->data_size;
  712. if (not recv_data)
  713. {
  714. connection->recv_state= gearmand_io_st::GEARMAND_CON_RECV_STATE_READ_DATA;
  715. break;
  716. }
  717. packet->data= static_cast<char *>(realloc(NULL, packet->data_size));
  718. if (not packet->data)
  719. {
  720. // Server up the memory error first, in case _connection_close()
  721. // creates any.
  722. gearmand_merror("realloc", char, packet->data_size);
  723. _connection_close(connection);
  724. return GEARMAND_MEMORY_ALLOCATION_FAILURE;
  725. }
  726. packet->options.free_data= true;
  727. connection->recv_state= gearmand_io_st::GEARMAND_CON_RECV_STATE_READ_DATA;
  728. /* fall-thru */
  729. case gearmand_io_st::GEARMAND_CON_RECV_STATE_READ_DATA:
  730. while (connection->recv_data_size)
  731. {
  732. gearmand_error_t ret;
  733. ret= gearmand_connection_recv_data(con,
  734. ((uint8_t *)(packet->data)) +
  735. connection->recv_data_offset,
  736. packet->data_size -
  737. connection->recv_data_offset);
  738. if (gearmand_failed(ret))
  739. {
  740. return ret;
  741. }
  742. }
  743. connection->recv_state= gearmand_io_st::GEARMAND_CON_RECV_UNIVERSAL_NONE;
  744. break;
  745. }
  746. packet= connection->recv_packet;
  747. connection->recv_packet= NULL;
  748. return GEARMAND_SUCCESS;
  749. }
  750. gearmand_error_t gearmand_io_set_events(gearman_server_con_st *con, short events)
  751. {
  752. gearmand_io_st *connection= &con->con;
  753. if ((connection->events | events) == connection->events)
  754. {
  755. return GEARMAND_SUCCESS;
  756. }
  757. connection->events|= events;
  758. if (connection->universal->event_watch_fn)
  759. {
  760. gearmand_error_t ret= connection->universal->event_watch_fn(connection, connection->events,
  761. (void *)connection->universal->event_watch_context);
  762. if (gearmand_failed(ret))
  763. {
  764. gearmand_gerror_warn("event watch failed, closing connection", ret);
  765. _connection_close(connection);
  766. return ret;
  767. }
  768. }
  769. return GEARMAND_SUCCESS;
  770. }
  771. gearmand_error_t gearmand_io_set_revents(gearman_server_con_st *con, short revents)
  772. {
  773. gearmand_io_st *connection= &con->con;
  774. if (revents != 0)
  775. {
  776. connection->options.ready= true;
  777. GEARMAND_LIST_ADD(connection->universal->ready_con, connection, ready_);
  778. }
  779. connection->revents= revents;
  780. /* Remove external POLLOUT watch if we didn't ask for it. Otherwise we spin
  781. forever until another POLLIN state change. This is much more efficient
  782. than removing POLLOUT on every state change since some external polling
  783. mechanisms need to use a system call to change flags (like Linux epoll). */
  784. if (revents & POLLOUT && !(connection->events & POLLOUT) &&
  785. connection->universal->event_watch_fn != NULL)
  786. {
  787. gearmand_error_t ret= connection->universal->event_watch_fn(connection, connection->events,
  788. (void *)connection->universal->event_watch_context);
  789. if (gearmand_failed(ret))
  790. {
  791. gearmand_gerror_warn("event watch failed, closing connection", ret);
  792. _connection_close(connection);
  793. return ret;
  794. }
  795. }
  796. connection->events&= (short)~revents;
  797. return GEARMAND_SUCCESS;
  798. }
  799. /*
  800. * Static Definitions
  801. */
  802. gearmand_error_t gearmand_io_st::_io_setsockopt()
  803. {
  804. {
  805. int setting= 1;
  806. if (setsockopt(_fd, IPPROTO_TCP, TCP_NODELAY, &setting, (socklen_t)sizeof(int)) and errno != EOPNOTSUPP)
  807. {
  808. return gearmand_perror(errno, "setsockopt(TCP_NODELAY)");
  809. }
  810. }
  811. {
  812. struct linger linger;
  813. linger.l_onoff= 1;
  814. linger.l_linger= GEARMAND_DEFAULT_SOCKET_TIMEOUT;
  815. if (setsockopt(_fd, SOL_SOCKET, SO_LINGER, &linger, (socklen_t)sizeof(struct linger)))
  816. {
  817. return gearmand_perror(errno, "setsockopt(SO_LINGER)");
  818. }
  819. }
  820. #if defined(__MACH__) && defined(__APPLE__) || defined(__FreeBSD__)
  821. {
  822. int setting= 1;
  823. // This is not considered a fatal error
  824. if (setsockopt(_fd, SOL_SOCKET, SO_NOSIGPIPE, (void *)&setting, sizeof(int)))
  825. {
  826. gearmand_perror(errno, "setsockopt(SO_NOSIGPIPE)");
  827. }
  828. }
  829. #endif
  830. #if 0
  831. if (0)
  832. {
  833. struct timeval waittime;
  834. waittime.tv_sec= GEARMAND_DEFAULT_SOCKET_TIMEOUT;
  835. waittime.tv_usec= 0;
  836. if (setsockopt(_fd, SOL_SOCKET, SO_SNDTIMEO, &waittime, (socklen_t)sizeof(struct timeval)) and errno != ENOPROTOOPT)
  837. {
  838. return gearmand_perror(errno, "setsockopt(SO_SNDTIMEO)");
  839. }
  840. if (setsockopt(_fd, SOL_SOCKET, SO_RCVTIMEO, &waittime, (socklen_t)sizeof(struct timeval)) and errno != ENOPROTOOPT)
  841. {
  842. return gearmand_perror(errno, "setsockopt(SO_RCVTIMEO)");
  843. }
  844. }
  845. #endif
  846. #if 0
  847. if (0)
  848. {
  849. int setting= GEARMAND_DEFAULT_SOCKET_SEND_SIZE;
  850. if (setsockopt(_fd, SOL_SOCKET, SO_SNDBUF, &setting, (socklen_t)sizeof(int)))
  851. {
  852. return gearmand_perror(errno, "setsockopr(SO_SNDBUF)");
  853. }
  854. setting= GEARMAND_DEFAULT_SOCKET_RECV_SIZE;
  855. if (setsockopt(_fd, SOL_SOCKET, SO_RCVBUF, &setting, (socklen_t)sizeof(int)))
  856. {
  857. return gearmand_perror(errno, "setsockopt(SO_RCVBUF)");
  858. }
  859. }
  860. #endif
  861. if (SOCK_NONBLOCK == 0)
  862. {
  863. gearmand_error_t local_ret;
  864. if ((local_ret= gearmand_sockfd_nonblock(_fd)))
  865. {
  866. return local_ret;
  867. }
  868. }
  869. return GEARMAND_SUCCESS;
  870. }
  871. gearmand_error_t gearmand_sockfd_nonblock(const int& sockfd)
  872. {
  873. int flags;
  874. do
  875. {
  876. flags= fcntl(sockfd, F_GETFL, 0);
  877. if (flags == -1)
  878. {
  879. gearmand_log_perror_warn(GEARMAN_DEFAULT_LOG_PARAM, flags, "gearmand_sockfd_nonblock");
  880. }
  881. } while (flags == -1 and (errno == EINTR or errno == EAGAIN));
  882. if (flags == -1)
  883. {
  884. return gearmand_perror(errno, "fcntl(F_GETFL)");
  885. }
  886. else if ((flags & O_NONBLOCK) == 0)
  887. {
  888. int retval;
  889. do
  890. {
  891. retval= fcntl(sockfd, F_SETFL, flags | O_NONBLOCK);
  892. if (retval == -1)
  893. {
  894. gearmand_log_perror_warn(GEARMAN_DEFAULT_LOG_PARAM, retval, "gearmand_sockfd_nonblock");
  895. }
  896. } while (retval == -1 and (errno == EINTR or errno == EAGAIN));
  897. if (retval == -1)
  898. {
  899. return gearmand_perror(errno, "fcntl(F_SETFL)");
  900. }
  901. }
  902. return GEARMAND_SUCCESS;
  903. }
  904. void gearmand_sockfd_close(int& sockfd)
  905. {
  906. if (sockfd != INVALID_SOCKET)
  907. {
  908. /* in case of death shutdown to avoid blocking at close() */
  909. if (shutdown(sockfd, SHUT_RDWR) == SOCKET_ERROR && get_socket_errno() != ENOTCONN)
  910. {
  911. gearmand_perror(errno, "shutdown");
  912. assert(errno != ENOTSOCK);
  913. }
  914. else if (closesocket(sockfd) == SOCKET_ERROR)
  915. {
  916. gearmand_perror(errno, "close");
  917. }
  918. sockfd= INVALID_SOCKET;
  919. }
  920. else
  921. {
  922. gearmand_debug("gearmand_sockfd_close() called with an invalid socket, this was probably ok");
  923. }
  924. }
  925. void gearmand_pipe_close(int& pipefd)
  926. {
  927. if (pipefd == INVALID_SOCKET)
  928. {
  929. gearmand_error("gearmand_pipe_close() called with an invalid socket");
  930. return;
  931. }
  932. if (closesocket(pipefd) == SOCKET_ERROR)
  933. {
  934. gearmand_perror(errno, "close");
  935. }
  936. pipefd= -1;
  937. }
  938. #pragma GCC diagnostic pop