connection.cc 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968
  1. /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
  2. *
  3. * Gearmand client and server library.
  4. *
  5. * Copyright (C) 2011 Data Differential, http://datadifferential.com/
  6. * Copyright (C) 2008 Brian Aker, Eric Day
  7. * All rights reserved.
  8. *
  9. * Redistribution and use in source and binary forms, with or without
  10. * modification, are permitted provided that the following conditions are
  11. * met:
  12. *
  13. * * Redistributions of source code must retain the above copyright
  14. * notice, this list of conditions and the following disclaimer.
  15. *
  16. * * Redistributions in binary form must reproduce the above
  17. * copyright notice, this list of conditions and the following disclaimer
  18. * in the documentation and/or other materials provided with the
  19. * distribution.
  20. *
  21. * * The names of its contributors may not be used to endorse or
  22. * promote products derived from this software without specific prior
  23. * written permission.
  24. *
  25. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  26. * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  27. * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  28. * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  29. * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  30. * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  31. * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  32. * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  33. * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  34. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  35. * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  36. *
  37. */
  38. /**
  39. * @file
  40. * @brief Connection Definitions
  41. */
  42. #include <libgearman/common.h>
  43. #include <libgearman/connection.h>
  44. #include <libgearman/packet.hpp>
  45. #include <libgearman/universal.hpp>
  46. #include <libgearman/allocator.hpp>
  47. #include <cassert>
  48. #include <cerrno>
  49. #include <cstdio>
  50. #include <cstdlib>
  51. #include <cstring>
  52. #include <memory>
  53. #include <unistd.h>
  54. #if HAVE_NETINET_TCP_H
  55. #include <netinet/tcp.h> /* for TCP_NODELAY */
  56. #endif
  57. #ifdef HAVE_FCNTL_H
  58. #include <fcntl.h>
  59. #endif
  60. static gearman_return_t gearman_connection_set_option(gearman_connection_st *connection,
  61. gearman_connection_options_t options,
  62. bool value);
  63. /**
  64. * @addtogroup gearman_connection_static Static Connection Declarations
  65. * @ingroup gearman_connection
  66. * @{
  67. */
  68. gearman_connection_st::gearman_connection_st(gearman_universal_st &universal_arg,
  69. gearman_connection_options_t *options_args) :
  70. state(GEARMAN_CON_UNIVERSAL_ADDRINFO),
  71. send_state(GEARMAN_CON_SEND_STATE_NONE),
  72. recv_state(GEARMAN_CON_RECV_UNIVERSAL_NONE),
  73. port(0),
  74. events(0),
  75. revents(0),
  76. fd(-1),
  77. cached_errno(0),
  78. created_id(0),
  79. created_id_next(0),
  80. send_buffer_size(0),
  81. send_data_size(0),
  82. send_data_offset(0),
  83. recv_buffer_size(0),
  84. recv_data_size(0),
  85. recv_data_offset(0),
  86. universal(universal_arg)
  87. {
  88. options.ready= false;
  89. options.packet_in_use= false;
  90. if (options_args)
  91. {
  92. while (*options_args != GEARMAN_CON_MAX)
  93. {
  94. gearman_connection_set_option(this, *options_args, true);
  95. options_args++;
  96. }
  97. }
  98. if (universal.con_list)
  99. universal.con_list->prev= this;
  100. next= universal.con_list;
  101. prev= NULL;
  102. universal.con_list= this;
  103. universal.con_count++;
  104. context= NULL;
  105. addrinfo= NULL;
  106. addrinfo_next= NULL;
  107. send_buffer_ptr= send_buffer;
  108. recv_packet= NULL;
  109. recv_buffer_ptr= recv_buffer;
  110. host[0]= 0;
  111. }
  112. gearman_connection_st *gearman_connection_create(gearman_universal_st &universal,
  113. gearman_connection_options_t *options)
  114. {
  115. gearman_connection_st *connection= new (std::nothrow) gearman_connection_st(universal, options);
  116. if (not connection)
  117. {
  118. gearman_perror(universal, "gearman_connection_st new");
  119. return NULL;
  120. }
  121. return connection;
  122. }
  123. gearman_connection_st *gearman_connection_create_args(gearman_universal_st& universal,
  124. const char *host, in_port_t port)
  125. {
  126. gearman_connection_st *connection= gearman_connection_create(universal, NULL);
  127. if (not connection)
  128. return NULL;
  129. connection->set_host(host, port);
  130. return connection;
  131. }
  132. gearman_connection_st *gearman_connection_copy(gearman_universal_st& universal,
  133. const gearman_connection_st& from)
  134. {
  135. gearman_connection_st *connection= gearman_connection_create(universal, NULL);
  136. if (not connection)
  137. return connection;
  138. connection->options.ready= from.options.ready;
  139. connection->options.packet_in_use= from.options.packet_in_use;
  140. strcpy(connection->host, from.host);
  141. connection->port= from.port;
  142. return connection;
  143. }
  144. gearman_connection_st::~gearman_connection_st()
  145. {
  146. if (fd != -1)
  147. close();
  148. reset_addrinfo();
  149. { // Remove from universal list
  150. if (universal.con_list == this)
  151. universal.con_list= next;
  152. if (prev)
  153. prev->next= next;
  154. if (next)
  155. next->prev= prev;
  156. universal.con_count--;
  157. }
  158. if (options.packet_in_use)
  159. gearman_packet_free(&(_packet));
  160. }
  161. gearman_return_t gearman_connection_set_option(gearman_connection_st *connection,
  162. gearman_connection_options_t options,
  163. bool value)
  164. {
  165. switch (options)
  166. {
  167. case GEARMAN_CON_READY:
  168. connection->options.ready= value;
  169. break;
  170. case GEARMAN_CON_PACKET_IN_USE:
  171. connection->options.packet_in_use= value;
  172. break;
  173. case GEARMAN_CON_IGNORE_LOST_CONNECTION:
  174. break;
  175. case GEARMAN_CON_CLOSE_AFTER_FLUSH:
  176. break;
  177. case GEARMAN_CON_EXTERNAL_FD:
  178. case GEARMAN_CON_MAX:
  179. default:
  180. return GEARMAN_INVALID_COMMAND;
  181. }
  182. return GEARMAN_SUCCESS;
  183. }
  184. /**
  185. * Set socket options for a connection.
  186. */
  187. static gearman_return_t _con_setsockopt(gearman_connection_st *connection);
  188. /** @} */
  189. /*
  190. * Public Definitions
  191. */
  192. void gearman_connection_st::set_host(const char *host_arg, const in_port_t port_arg)
  193. {
  194. reset_addrinfo();
  195. strncpy(host, host_arg == NULL ? GEARMAN_DEFAULT_TCP_HOST : host_arg, NI_MAXHOST);
  196. host[NI_MAXHOST - 1]= 0;
  197. port= in_port_t(port_arg == 0 ? GEARMAN_DEFAULT_TCP_PORT : port_arg);
  198. }
  199. void gearman_connection_st::close()
  200. {
  201. if (fd == INVALID_SOCKET)
  202. return;
  203. /* in case of death shutdown to avoid blocking at close() */
  204. if (shutdown(fd, SHUT_RDWR) == SOCKET_ERROR && get_socket_errno() != ENOTCONN)
  205. {
  206. gearman_perror(universal, "shutdown");
  207. assert(errno != ENOTSOCK);
  208. return;
  209. }
  210. if (closesocket(fd) == SOCKET_ERROR)
  211. {
  212. gearman_perror(universal, "close");
  213. }
  214. state= GEARMAN_CON_UNIVERSAL_ADDRINFO;
  215. fd= INVALID_SOCKET;
  216. events= 0;
  217. revents= 0;
  218. send_state= GEARMAN_CON_SEND_STATE_NONE;
  219. send_buffer_ptr= send_buffer;
  220. send_buffer_size= 0;
  221. send_data_size= 0;
  222. send_data_offset= 0;
  223. recv_state= GEARMAN_CON_RECV_UNIVERSAL_NONE;
  224. if (recv_packet)
  225. {
  226. gearman_packet_free(recv_packet);
  227. recv_packet= NULL;
  228. }
  229. recv_buffer_ptr= recv_buffer;
  230. recv_buffer_size= 0;
  231. }
  232. void gearman_connection_st::reset_addrinfo()
  233. {
  234. if (addrinfo)
  235. {
  236. freeaddrinfo(addrinfo);
  237. addrinfo= NULL;
  238. }
  239. addrinfo_next= NULL;
  240. }
  241. gearman_return_t gearman_connection_st::send(const gearman_packet_st& packet_arg, const bool flush_buffer)
  242. {
  243. switch (send_state)
  244. {
  245. case GEARMAN_CON_SEND_STATE_NONE:
  246. if (not (packet_arg.options.complete))
  247. {
  248. return gearman_error(universal, GEARMAN_INVALID_PACKET, "packet not complete");
  249. }
  250. /* Pack first part of packet, which is everything but the payload. */
  251. while (1)
  252. {
  253. gearman_return_t rc;
  254. { // Scoping to shut compiler up about switch/case jump
  255. size_t send_size= gearman_packet_pack(packet_arg,
  256. send_buffer + send_buffer_size,
  257. GEARMAN_SEND_BUFFER_SIZE -
  258. send_buffer_size, rc);
  259. if (gearman_success(rc))
  260. {
  261. send_buffer_size+= send_size;
  262. break;
  263. }
  264. else if (rc == GEARMAN_IGNORE_PACKET)
  265. {
  266. return GEARMAN_SUCCESS;
  267. }
  268. else if (rc != GEARMAN_FLUSH_DATA)
  269. {
  270. return rc;
  271. }
  272. }
  273. /* We were asked to flush when the buffer is already flushed! */
  274. if (send_buffer_size == 0)
  275. {
  276. gearman_universal_set_error(universal, GEARMAN_SEND_BUFFER_TOO_SMALL, __func__, AT,
  277. "send buffer too small (%u)", GEARMAN_SEND_BUFFER_SIZE);
  278. return GEARMAN_SEND_BUFFER_TOO_SMALL;
  279. }
  280. /* Flush buffer now if first part of packet won't fit in. */
  281. send_state= GEARMAN_CON_SEND_UNIVERSAL_PRE_FLUSH;
  282. case GEARMAN_CON_SEND_UNIVERSAL_PRE_FLUSH:
  283. {
  284. gearman_return_t ret= flush();
  285. if (gearman_failed(ret))
  286. {
  287. return ret;
  288. }
  289. }
  290. }
  291. /* Return here if we have no data to send. */
  292. if (not packet_arg.data_size)
  293. {
  294. break;
  295. }
  296. /* If there is any room in the buffer, copy in data. */
  297. if (packet_arg.data and
  298. (GEARMAN_SEND_BUFFER_SIZE - send_buffer_size) > 0)
  299. {
  300. send_data_offset= GEARMAN_SEND_BUFFER_SIZE - send_buffer_size;
  301. if (send_data_offset > packet_arg.data_size)
  302. send_data_offset= packet_arg.data_size;
  303. memcpy(send_buffer + send_buffer_size, packet_arg.data, send_data_offset);
  304. send_buffer_size+= send_data_offset;
  305. /* Return if all data fit in the send buffer. */
  306. if (send_data_offset == packet_arg.data_size)
  307. {
  308. send_data_offset= 0;
  309. break;
  310. }
  311. }
  312. /* Flush buffer now so we can start writing directly from data buffer. */
  313. send_state= GEARMAN_CON_SEND_UNIVERSAL_FORCE_FLUSH;
  314. case GEARMAN_CON_SEND_UNIVERSAL_FORCE_FLUSH:
  315. {
  316. gearman_return_t ret= flush();
  317. if (gearman_failed(ret))
  318. return ret;
  319. }
  320. send_data_size= packet_arg.data_size;
  321. /* If this is NULL, then gearman_connection_send_data function will be used. */
  322. if (not packet_arg.data)
  323. {
  324. send_state= GEARMAN_CON_SEND_UNIVERSAL_FLUSH_DATA;
  325. return GEARMAN_SUCCESS;
  326. }
  327. /* Copy into the buffer if it fits, otherwise flush from packet buffer. */
  328. send_buffer_size= packet_arg.data_size - send_data_offset;
  329. if (send_buffer_size < GEARMAN_SEND_BUFFER_SIZE)
  330. {
  331. memcpy(send_buffer,
  332. static_cast<char *>(const_cast<void *>(packet_arg.data)) + send_data_offset,
  333. send_buffer_size);
  334. send_data_size= 0;
  335. send_data_offset= 0;
  336. break;
  337. }
  338. send_buffer_ptr= static_cast<char *>(const_cast<void *>(packet_arg.data)) + send_data_offset;
  339. send_state= GEARMAN_CON_SEND_UNIVERSAL_FLUSH_DATA;
  340. case GEARMAN_CON_SEND_UNIVERSAL_FLUSH:
  341. case GEARMAN_CON_SEND_UNIVERSAL_FLUSH_DATA:
  342. return flush();
  343. }
  344. if (flush_buffer)
  345. {
  346. send_state= GEARMAN_CON_SEND_UNIVERSAL_FLUSH;
  347. return flush();
  348. }
  349. send_state= GEARMAN_CON_SEND_STATE_NONE;
  350. return GEARMAN_SUCCESS;
  351. }
  352. size_t gearman_connection_st::send_and_flush(const void *data, size_t data_size, gearman_return_t *ret_ptr)
  353. {
  354. if (send_state != GEARMAN_CON_SEND_UNIVERSAL_FLUSH_DATA)
  355. {
  356. return gearman_error(universal, GEARMAN_NOT_FLUSHING, "not flushing");
  357. }
  358. if (data_size > (send_data_size - send_data_offset))
  359. {
  360. return gearman_error(universal, GEARMAN_DATA_TOO_LARGE, "data too large");
  361. }
  362. send_buffer_ptr= static_cast<char *>(const_cast<void *>(data));
  363. send_buffer_size= data_size;
  364. *ret_ptr= flush();
  365. return data_size -send_buffer_size;
  366. }
  367. gearman_return_t gearman_connection_st::flush()
  368. {
  369. while (1)
  370. {
  371. switch (state)
  372. {
  373. case GEARMAN_CON_UNIVERSAL_ADDRINFO:
  374. {
  375. if (addrinfo)
  376. {
  377. freeaddrinfo(addrinfo);
  378. addrinfo= NULL;
  379. }
  380. char port_str[NI_MAXSERV];
  381. snprintf(port_str, NI_MAXSERV, "%hu", uint16_t(port));
  382. struct addrinfo ai;
  383. memset(&ai, 0, sizeof(struct addrinfo));
  384. ai.ai_socktype= SOCK_STREAM;
  385. ai.ai_protocol= IPPROTO_TCP;
  386. int ret;
  387. if ((ret= getaddrinfo(host, port_str, &ai, &(addrinfo))))
  388. {
  389. gearman_universal_set_error(universal, GEARMAN_GETADDRINFO, AT, "getaddrinfo:%s", gai_strerror(ret));
  390. return GEARMAN_GETADDRINFO;
  391. }
  392. addrinfo_next= addrinfo;
  393. }
  394. case GEARMAN_CON_UNIVERSAL_CONNECT:
  395. if (fd != INVALID_SOCKET)
  396. {
  397. close();
  398. }
  399. if (addrinfo_next == NULL)
  400. {
  401. state= GEARMAN_CON_UNIVERSAL_ADDRINFO;
  402. return gearman_universal_set_error(universal, GEARMAN_COULD_NOT_CONNECT, GEARMAN_AT, "%s:%hu", host, uint16_t(port));
  403. }
  404. fd= socket(addrinfo_next->ai_family, addrinfo_next->ai_socktype, addrinfo_next->ai_protocol);
  405. if (fd == INVALID_SOCKET)
  406. {
  407. state= GEARMAN_CON_UNIVERSAL_ADDRINFO;
  408. return gearman_perror(universal, "socket");
  409. }
  410. {
  411. gearman_return_t gret= _con_setsockopt(this);
  412. if (gearman_failed(gret))
  413. {
  414. close();
  415. return gret;
  416. }
  417. }
  418. while (1)
  419. {
  420. if (not connect(fd, addrinfo_next->ai_addr, addrinfo_next->ai_addrlen))
  421. {
  422. state= GEARMAN_CON_UNIVERSAL_CONNECTED;
  423. addrinfo_next= NULL;
  424. break;
  425. }
  426. if (errno == EAGAIN || errno == EINTR)
  427. continue;
  428. if (errno == EINPROGRESS)
  429. {
  430. state= GEARMAN_CON_UNIVERSAL_CONNECTING;
  431. break;
  432. }
  433. if (errno == ECONNREFUSED || errno == ENETUNREACH || errno == ETIMEDOUT)
  434. {
  435. state= GEARMAN_CON_UNIVERSAL_CONNECT;
  436. addrinfo_next= addrinfo_next->ai_next;
  437. break;
  438. }
  439. gearman_perror(universal, "connect");
  440. close();
  441. return GEARMAN_ERRNO;
  442. }
  443. if (state != GEARMAN_CON_UNIVERSAL_CONNECTING)
  444. break;
  445. case GEARMAN_CON_UNIVERSAL_CONNECTING:
  446. while (1)
  447. {
  448. if (revents & (POLLERR | POLLHUP | POLLNVAL))
  449. {
  450. state= GEARMAN_CON_UNIVERSAL_CONNECT;
  451. addrinfo_next= addrinfo_next->ai_next;
  452. break;
  453. }
  454. else if (revents & POLLOUT)
  455. {
  456. state= GEARMAN_CON_UNIVERSAL_CONNECTED;
  457. break;
  458. }
  459. set_events(POLLOUT);
  460. if (gearman_universal_is_non_blocking(universal))
  461. {
  462. state= GEARMAN_CON_UNIVERSAL_CONNECTING;
  463. return gearman_gerror(universal, GEARMAN_IO_WAIT);
  464. }
  465. gearman_return_t gret= gearman_wait(universal);
  466. if (gearman_failed(gret))
  467. return gret;
  468. }
  469. if (state != GEARMAN_CON_UNIVERSAL_CONNECTED)
  470. break;
  471. case GEARMAN_CON_UNIVERSAL_CONNECTED:
  472. while (send_buffer_size != 0)
  473. {
  474. ssize_t write_size= ::send(fd, send_buffer_ptr, send_buffer_size,
  475. gearman_universal_is_non_blocking(universal) ? MSG_NOSIGNAL| MSG_DONTWAIT : MSG_NOSIGNAL);
  476. if (write_size == 0) // Zero value on send()
  477. { }
  478. else if (write_size == -1)
  479. {
  480. if (errno == EAGAIN)
  481. {
  482. set_events(POLLOUT);
  483. if (gearman_universal_is_non_blocking(universal))
  484. {
  485. gearman_gerror(universal, GEARMAN_IO_WAIT);
  486. return GEARMAN_IO_WAIT;
  487. }
  488. gearman_return_t gret= gearman_wait(universal);
  489. if (gearman_failed(gret))
  490. return gret;
  491. continue;
  492. }
  493. else if (errno == EINTR)
  494. {
  495. continue;
  496. }
  497. else if (errno == EPIPE || errno == ECONNRESET || errno == EHOSTDOWN)
  498. {
  499. gearman_perror(universal, "lost connection to server during send");
  500. close();
  501. return GEARMAN_LOST_CONNECTION;
  502. }
  503. gearman_perror(universal, "send");
  504. close();
  505. return GEARMAN_ERRNO;
  506. }
  507. send_buffer_size-= size_t(write_size);
  508. if (send_state == GEARMAN_CON_SEND_UNIVERSAL_FLUSH_DATA)
  509. {
  510. send_data_offset+= size_t(write_size);
  511. if (send_data_offset == send_data_size)
  512. {
  513. send_data_size= 0;
  514. send_data_offset= 0;
  515. break;
  516. }
  517. if (send_buffer_size == 0)
  518. return GEARMAN_SUCCESS;
  519. }
  520. else if (send_buffer_size == 0)
  521. {
  522. break;
  523. }
  524. send_buffer_ptr+= write_size;
  525. }
  526. send_state= GEARMAN_CON_SEND_STATE_NONE;
  527. send_buffer_ptr= send_buffer;
  528. return GEARMAN_SUCCESS;
  529. }
  530. }
  531. }
  532. gearman_packet_st *gearman_connection_st::receiving(gearman_packet_st& packet_arg,
  533. gearman_return_t& ret, const bool recv_data)
  534. {
  535. switch (recv_state)
  536. {
  537. case GEARMAN_CON_RECV_UNIVERSAL_NONE:
  538. if (state != GEARMAN_CON_UNIVERSAL_CONNECTED)
  539. {
  540. gearman_error(universal, GEARMAN_NOT_CONNECTED, "not connected");
  541. ret= GEARMAN_NOT_CONNECTED;
  542. return NULL;
  543. }
  544. recv_packet= gearman_packet_create(universal, &packet_arg);
  545. if (not recv_packet)
  546. {
  547. ret= GEARMAN_MEMORY_ALLOCATION_FAILURE;
  548. return NULL;
  549. }
  550. recv_state= GEARMAN_CON_RECV_UNIVERSAL_READ;
  551. case GEARMAN_CON_RECV_UNIVERSAL_READ:
  552. while (1)
  553. {
  554. if (recv_buffer_size > 0)
  555. {
  556. size_t recv_size= gearman_packet_unpack(*recv_packet,
  557. recv_buffer_ptr,
  558. recv_buffer_size, ret);
  559. recv_buffer_ptr+= recv_size;
  560. recv_buffer_size-= recv_size;
  561. if (gearman_success(ret))
  562. {
  563. break;
  564. }
  565. else if (ret != GEARMAN_IO_WAIT)
  566. {
  567. close();
  568. return NULL;
  569. }
  570. }
  571. /* Shift buffer contents if needed. */
  572. if (recv_buffer_size > 0)
  573. {
  574. memmove(recv_buffer, recv_buffer_ptr, recv_buffer_size);
  575. }
  576. recv_buffer_ptr= recv_buffer;
  577. size_t recv_size= recv(recv_buffer + recv_buffer_size, GEARMAN_RECV_BUFFER_SIZE - recv_buffer_size, ret);
  578. if (gearman_failed(ret))
  579. {
  580. return NULL;
  581. }
  582. recv_buffer_size+= recv_size;
  583. }
  584. if (packet_arg.data_size == 0)
  585. {
  586. recv_state= GEARMAN_CON_RECV_UNIVERSAL_NONE;
  587. break;
  588. }
  589. recv_data_size= packet_arg.data_size;
  590. if (not recv_data)
  591. {
  592. recv_state= GEARMAN_CON_RECV_STATE_READ_DATA;
  593. break;
  594. }
  595. assert(packet_arg.universal);
  596. packet_arg.data= gearman_malloc((*packet_arg.universal), packet_arg.data_size);
  597. if (not packet_arg.data)
  598. {
  599. ret= GEARMAN_MEMORY_ALLOCATION_FAILURE;
  600. close();
  601. return NULL;
  602. }
  603. packet_arg.options.free_data= true;
  604. recv_state= GEARMAN_CON_RECV_STATE_READ_DATA;
  605. case GEARMAN_CON_RECV_STATE_READ_DATA:
  606. while (recv_data_size)
  607. {
  608. (void)receiving(static_cast<uint8_t *>(const_cast<void *>(packet_arg.data)) +
  609. recv_data_offset,
  610. packet_arg.data_size -recv_data_offset, ret);
  611. if (gearman_failed(ret))
  612. {
  613. return NULL;
  614. }
  615. }
  616. recv_state= GEARMAN_CON_RECV_UNIVERSAL_NONE;
  617. break;
  618. }
  619. gearman_packet_st *tmp_packet_arg= recv_packet;
  620. recv_packet= NULL;
  621. return tmp_packet_arg;
  622. }
  623. size_t gearman_connection_st::receiving(void *data, size_t data_size, gearman_return_t& ret)
  624. {
  625. size_t recv_size= 0;
  626. if (recv_data_size == 0)
  627. {
  628. ret= GEARMAN_SUCCESS;
  629. return 0;
  630. }
  631. if ((recv_data_size - recv_data_offset) < data_size)
  632. data_size= recv_data_size - recv_data_offset;
  633. if (recv_buffer_size > 0)
  634. {
  635. if (recv_buffer_size < data_size)
  636. recv_size= recv_buffer_size;
  637. else
  638. recv_size= data_size;
  639. memcpy(data, recv_buffer_ptr, recv_size);
  640. recv_buffer_ptr+= recv_size;
  641. recv_buffer_size-= recv_size;
  642. }
  643. if (data_size != recv_size)
  644. {
  645. recv_size+= recv(static_cast<uint8_t *>(const_cast<void *>(data)) + recv_size, data_size - recv_size, ret);
  646. recv_data_offset+= recv_size;
  647. }
  648. else
  649. {
  650. recv_data_offset+= recv_size;
  651. ret= GEARMAN_SUCCESS;
  652. }
  653. if (recv_data_size == recv_data_offset)
  654. {
  655. recv_data_size= 0;
  656. recv_data_offset= 0;
  657. recv_state= GEARMAN_CON_RECV_UNIVERSAL_NONE;
  658. }
  659. return recv_size;
  660. }
  661. size_t gearman_connection_st::recv(void *data, size_t data_size, gearman_return_t& ret)
  662. {
  663. ssize_t read_size;
  664. while (1)
  665. {
  666. read_size= ::recv(fd, data, data_size, 0);
  667. if (read_size == 0)
  668. {
  669. gearman_error(universal, GEARMAN_LOST_CONNECTION, "lost connection to server (EOF)");
  670. close();
  671. ret= GEARMAN_LOST_CONNECTION;
  672. return 0;
  673. }
  674. else if (read_size == -1)
  675. {
  676. if (errno == EAGAIN)
  677. {
  678. set_events(POLLIN);
  679. if (gearman_universal_is_non_blocking(universal))
  680. {
  681. gearman_gerror(universal, GEARMAN_IO_WAIT);
  682. ret= GEARMAN_IO_WAIT;
  683. return 0;
  684. }
  685. ret= gearman_wait(universal);
  686. if (gearman_failed(ret))
  687. {
  688. return 0;
  689. }
  690. continue;
  691. }
  692. else if (errno == EINTR)
  693. {
  694. continue;
  695. }
  696. else if (errno == EPIPE || errno == ECONNRESET || errno == EHOSTDOWN)
  697. {
  698. gearman_perror(universal, "lost connection to server during read");
  699. ret= GEARMAN_LOST_CONNECTION;
  700. }
  701. else
  702. {
  703. gearman_perror(universal, "read");
  704. ret= GEARMAN_ERRNO;
  705. }
  706. close();
  707. return 0;
  708. }
  709. break;
  710. }
  711. ret= GEARMAN_SUCCESS;
  712. return size_t(read_size);
  713. }
  714. void gearman_connection_st::set_events(short arg)
  715. {
  716. if ((events | arg) == events)
  717. return;
  718. events|= arg;
  719. }
  720. void gearman_connection_st::set_revents(short arg)
  721. {
  722. if (arg)
  723. options.ready= true;
  724. revents= arg;
  725. events&= short(~arg);
  726. }
  727. /*
  728. * Static Definitions
  729. */
  730. static gearman_return_t _con_setsockopt(gearman_connection_st *connection)
  731. {
  732. int ret;
  733. struct linger linger;
  734. struct timeval waittime;
  735. ret= 1;
  736. ret= setsockopt(connection->fd, IPPROTO_TCP, TCP_NODELAY, &ret,
  737. socklen_t(sizeof(int)));
  738. if (ret == -1 && errno != EOPNOTSUPP)
  739. {
  740. gearman_perror(connection->universal, "setsockopt(TCP_NODELAY)");
  741. return GEARMAN_ERRNO;
  742. }
  743. linger.l_onoff= 1;
  744. linger.l_linger= GEARMAN_DEFAULT_SOCKET_TIMEOUT;
  745. ret= setsockopt(connection->fd, SOL_SOCKET, SO_LINGER, &linger,
  746. socklen_t(sizeof(struct linger)));
  747. if (ret == -1)
  748. {
  749. gearman_perror(connection->universal, "setsockopt(SO_LINGER)");
  750. return GEARMAN_ERRNO;
  751. }
  752. waittime.tv_sec= GEARMAN_DEFAULT_SOCKET_TIMEOUT;
  753. waittime.tv_usec= 0;
  754. ret= setsockopt(connection->fd, SOL_SOCKET, SO_SNDTIMEO, &waittime,
  755. socklen_t(sizeof(struct timeval)));
  756. if (ret == -1 && errno != ENOPROTOOPT)
  757. {
  758. gearman_perror(connection->universal, "setsockopt(SO_SNDTIMEO)");
  759. return GEARMAN_ERRNO;
  760. }
  761. {
  762. int optval= 1;
  763. ret= setsockopt(connection->fd, SOL_SOCKET, SO_KEEPALIVE, &optval, sizeof(optval));
  764. if (ret == -1 && errno != ENOPROTOOPT)
  765. {
  766. gearman_perror(connection->universal, "setsockopt(SO_RCVTIMEO)");
  767. return GEARMAN_ERRNO;
  768. }
  769. }
  770. ret= setsockopt(connection->fd, SOL_SOCKET, SO_RCVTIMEO, &waittime,
  771. socklen_t(sizeof(struct timeval)));
  772. if (ret == -1 && errno != ENOPROTOOPT)
  773. {
  774. gearman_perror(connection->universal, "setsockopt(SO_RCVTIMEO)");
  775. return GEARMAN_ERRNO;
  776. }
  777. ret= GEARMAN_DEFAULT_SOCKET_SEND_SIZE;
  778. ret= setsockopt(connection->fd, SOL_SOCKET, SO_SNDBUF, &ret, socklen_t(sizeof(int)));
  779. if (ret == -1)
  780. {
  781. gearman_perror(connection->universal, "setsockopt(SO_SNDBUF)");
  782. return GEARMAN_ERRNO;
  783. }
  784. #if defined(__MACH__) && defined(__APPLE__) || defined(__FreeBSD__)
  785. {
  786. ret= 1;
  787. setsockopt(connection->fd, SOL_SOCKET, SO_NOSIGPIPE, static_cast<void *>(&ret), sizeof(int));
  788. // This is not considered a fatal error
  789. if (ret == -1)
  790. {
  791. gearman_perror(connection->universal, "setsockopt(SO_NOSIGPIPE)");
  792. }
  793. }
  794. #endif
  795. ret= GEARMAN_DEFAULT_SOCKET_RECV_SIZE;
  796. ret= setsockopt(connection->fd, SOL_SOCKET, SO_RCVBUF, &ret, socklen_t(sizeof(int)));
  797. if (ret == -1)
  798. {
  799. gearman_perror(connection->universal, "setsockopt(SO_RCVBUF)");
  800. return GEARMAN_ERRNO;
  801. }
  802. ret= fcntl(connection->fd, F_GETFL, 0);
  803. if (ret == -1)
  804. {
  805. gearman_perror(connection->universal, "fcntl(F_GETFL)");
  806. return GEARMAN_ERRNO;
  807. }
  808. ret= fcntl(connection->fd, F_SETFL, ret | O_NONBLOCK);
  809. if (ret == -1)
  810. {
  811. gearman_perror(connection->universal, "fcntl(F_SETFL)");
  812. return GEARMAN_ERRNO;
  813. }
  814. return GEARMAN_SUCCESS;
  815. }