12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034 |
- /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
- *
- * Gearmand client and server library.
- *
- * Copyright (C) 2011-2013 Data Differential, http://datadifferential.com/
- * Copyright (C) 2008 Brian Aker, Eric Day
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- *
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- *
- * * The names of its contributors may not be used to endorse or
- * promote products derived from this software without specific prior
- * written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
- /**
- * @file
- * @brief Connection Definitions
- */
- #include "gear_config.h"
- #include "libgearman-server/common.h"
- #include <libgearman-server/plugins/base.h>
- #include <cstring>
- #include <cerrno>
- #include <cassert>
- #ifndef SOCK_NONBLOCK
- # define SOCK_NONBLOCK 0
- #endif
- #ifndef MSG_DONTWAIT
- # define MSG_DONTWAIT 0
- #endif
- static void _connection_close(gearmand_io_st *connection)
- {
- if (connection->fd == INVALID_SOCKET)
- {
- return;
- }
- if (connection->options.external_fd)
- {
- connection->options.external_fd= false;
- }
- else
- {
- #if defined(HAVE_CYASSL) && HAVE_CYASSL
- if (connection->root and connection->root->_ssl)
- {
- CyaSSL_shutdown(connection->root->_ssl);
- CyaSSL_free(connection->root->_ssl);
- connection->root->_ssl= NULL;
- }
- #endif
- (void)gearmand_sockfd_close(connection->fd);
- assert_msg(false, "We should never have an internal fd");
- }
- connection->_state= gearmand_io_st::GEARMAND_CON_UNIVERSAL_INVALID;
- connection->fd= INVALID_SOCKET;
- connection->events= 0;
- connection->revents= 0;
- connection->send_state= gearmand_io_st::GEARMAND_CON_SEND_STATE_NONE;
- connection->send_buffer_ptr= connection->send_buffer;
- connection->send_buffer_size= 0;
- connection->send_data_size= 0;
- connection->send_data_offset= 0;
- connection->recv_state= gearmand_io_st::GEARMAND_CON_RECV_UNIVERSAL_NONE;
- if (connection->recv_packet != NULL)
- {
- gearmand_packet_free(connection->recv_packet);
- connection->recv_packet= NULL;
- }
- connection->recv_buffer_ptr= connection->recv_buffer;
- connection->recv_buffer_size= 0;
- }
- const char* gearmand_io_st::host() const
- {
- if (context)
- {
- return context->host;
- }
- return "-";
- }
- const char* gearmand_io_st::port() const
- {
- if (context)
- {
- return context->port;
- }
- return "-";
- }
- static size_t _connection_read(gearman_server_con_st *con, void *data, size_t data_size, gearmand_error_t &ret)
- {
- ssize_t read_size;
- gearmand_io_st *connection= &con->con;
- while (1)
- {
- #if defined(HAVE_CYASSL) && HAVE_CYASSL
- if (con->_ssl)
- {
- read_size= CyaSSL_recv(con->_ssl, data, data_size, MSG_DONTWAIT);
- }
- else
- #endif
- {
- read_size= recv(connection->fd, data, data_size, MSG_DONTWAIT);
- }
- if (read_size == 0)
- {
- ret= GEARMAND_LOST_CONNECTION;
- gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM, "Peer connection has called close()");
- _connection_close(connection);
- return 0;
- }
- else if (read_size == -1)
- {
- int local_errno= errno;
- switch (local_errno)
- {
- case EAGAIN:
- ret= gearmand_io_set_events(con, POLLIN);
- if (gearmand_failed(ret))
- {
- gearmand_perror(local_errno, "recv");
- return 0;
- }
- ret= GEARMAND_IO_WAIT;
- return 0;
- case EINTR:
- continue;
- case EPIPE:
- case ECONNRESET:
- case EHOSTDOWN:
- {
- ret= GEARMAND_LOST_CONNECTION;
- gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM, "Peer connection has called close()");
- _connection_close(connection);
- return 0;
- }
- default:
- ret= GEARMAND_ERRNO;
- }
- gearmand_perror(local_errno, "closing connection due to previous errno error");
- _connection_close(connection);
- return 0;
- }
- break;
- }
- ret= GEARMAND_SUCCESS;
- return size_t(read_size);
- }
- static gearmand_error_t gearmand_connection_recv_data(gearman_server_con_st *con, void *data, size_t data_size)
- {
- gearmand_io_st *connection= &con->con;
- if (connection->recv_data_size == 0)
- {
- return GEARMAND_SUCCESS;
- }
- if ((connection->recv_data_size - connection->recv_data_offset) < data_size)
- {
- data_size= connection->recv_data_size - connection->recv_data_offset;
- }
- size_t recv_size= 0;
- if (connection->recv_buffer_size > 0)
- {
- if (connection->recv_buffer_size < data_size)
- {
- recv_size= connection->recv_buffer_size;
- }
- else
- {
- recv_size= data_size;
- }
- memcpy(data, connection->recv_buffer_ptr, recv_size);
- connection->recv_buffer_ptr+= recv_size;
- connection->recv_buffer_size-= recv_size;
- }
- gearmand_error_t ret;
- if (data_size != recv_size)
- {
- recv_size+= _connection_read(con, ((uint8_t *)data) + recv_size, data_size - recv_size, ret);
- connection->recv_data_offset+= recv_size;
- }
- else
- {
- connection->recv_data_offset+= recv_size;
- ret= GEARMAND_SUCCESS;
- }
- if (connection->recv_data_size == connection->recv_data_offset)
- {
- connection->recv_data_size= 0;
- connection->recv_data_offset= 0;
- connection->recv_state= gearmand_io_st::GEARMAND_CON_RECV_UNIVERSAL_NONE;
- }
- return ret;
- }
- static gearmand_error_t _connection_flush(gearman_server_con_st *con)
- {
- gearmand_io_st *connection= &con->con;
- assert(connection->_state == gearmand_io_st::GEARMAND_CON_UNIVERSAL_CONNECTED);
- while (1)
- {
- switch (connection->_state)
- {
- case gearmand_io_st::GEARMAND_CON_UNIVERSAL_INVALID:
- assert(0);
- return GEARMAND_ERRNO;
- case gearmand_io_st::GEARMAND_CON_UNIVERSAL_CONNECTED:
- uint32_t loop_counter= 0;
- while (connection->send_buffer_size)
- {
- ssize_t write_size;
- #if defined(HAVE_CYASSL) && HAVE_CYASSL
- if (con->_ssl)
- {
- write_size= CyaSSL_send(con->_ssl, connection->send_buffer_ptr, connection->send_buffer_size, MSG_NOSIGNAL|MSG_DONTWAIT);
- // I consider this to be a bug in CyaSSL_send() that is uses a zero in this manner
- if (write_size <= 0)
- {
- int err;
- switch ((err= CyaSSL_get_error(con->_ssl, write_size)))
- {
- case SSL_ERROR_WANT_CONNECT:
- case SSL_ERROR_WANT_ACCEPT:
- write_size= -1;
- errno= EAGAIN;
- break;
- case SSL_ERROR_WANT_WRITE:
- case SSL_ERROR_WANT_READ:
- write_size= -1;
- errno= EAGAIN;
- break;
- default:
- {
- char errorString[80];
- CyaSSL_ERR_error_string(err, errorString);
- _connection_close(connection);
- return gearmand_log_gerror(GEARMAN_DEFAULT_LOG_PARAM, GEARMAND_LOST_CONNECTION, "SSL failure(%s)",
- errorString);
- }
- }
- }
- }
- else
- #endif
- {
- write_size= send(connection->fd, connection->send_buffer_ptr, connection->send_buffer_size, MSG_NOSIGNAL|MSG_DONTWAIT);
- }
- if (write_size == 0) // detect infinite loop?
- {
- ++loop_counter;
- gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "send() sent zero bytes of %u",
- uint32_t(connection->send_buffer_size));
- if (loop_counter > 5)
- {
- _connection_close(connection);
- return gearmand_log_gerror(GEARMAN_DEFAULT_LOG_PARAM, GEARMAND_LOST_CONNECTION, "send() failed to send data");
- }
- continue;
- }
- else if (write_size == -1)
- {
- int local_errno= errno;
- switch (local_errno)
- {
- #if defined(EWOULDBLOCK) && EWOULDBLOCK != EAGAIN
- case EWOULDBLOCK:
- #endif
- case EAGAIN:
- {
- gearmand_error_t gret= gearmand_io_set_events(con, POLLOUT);
- if (gret != GEARMAND_SUCCESS)
- {
- return gret;
- }
- return GEARMAND_IO_WAIT;
- }
- case EINTR:
- continue;
- case EPIPE:
- case ECONNRESET:
- case EHOSTDOWN:
- gearmand_perror(local_errno, "lost connection to client during send(EPIPE || ECONNRESET || EHOSTDOWN)");
- _connection_close(connection);
- return GEARMAND_LOST_CONNECTION;
- default:
- break;
- }
- gearmand_perror(local_errno, "send() failed, closing connection");
- _connection_close(connection);
- return GEARMAND_ERRNO;
- }
- gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "send() %u bytes to peer",
- uint32_t(write_size));
- connection->send_buffer_size-= static_cast<size_t>(write_size);
- if (connection->send_state == gearmand_io_st::GEARMAND_CON_SEND_UNIVERSAL_FLUSH_DATA)
- {
- connection->send_data_offset+= static_cast<size_t>(write_size);
- if (connection->send_data_offset == connection->send_data_size)
- {
- connection->send_data_size= 0;
- connection->send_data_offset= 0;
- break;
- }
- if (connection->send_buffer_size == 0)
- {
- return GEARMAND_SUCCESS;
- }
- }
- else if (connection->send_buffer_size == 0)
- {
- break;
- }
- connection->send_buffer_ptr+= write_size;
- }
- connection->send_state= gearmand_io_st::GEARMAND_CON_SEND_STATE_NONE;
- connection->send_buffer_ptr= connection->send_buffer;
- return GEARMAND_SUCCESS;
- }
- }
- }
- /**
- * @addtogroup gearmand_io_static Static Connection Declarations
- * @ingroup gearman_connection
- * @{
- */
- void gearmand_connection_init(gearmand_connection_list_st *gearman,
- gearmand_io_st *connection,
- gearmand_con_st *dcon,
- gearmand_connection_options_t *options)
- {
- assert(gearman);
- assert(connection);
- connection->options.ready= false;
- connection->options.packet_in_use= false;
- connection->options.external_fd= false;
- connection->options.close_after_flush= false;
- if (options)
- {
- while (*options != GEARMAND_CON_MAX)
- {
- gearman_io_set_option(connection, *options, true);
- options++;
- }
- }
- connection->_state= gearmand_io_st::GEARMAND_CON_UNIVERSAL_INVALID;
- connection->send_state= gearmand_io_st::GEARMAND_CON_SEND_STATE_NONE;
- connection->recv_state= gearmand_io_st::GEARMAND_CON_RECV_UNIVERSAL_NONE;
- connection->events= 0;
- connection->revents= 0;
- connection->fd= INVALID_SOCKET;
- connection->created_id= 0;
- connection->created_id_next= 0;
- connection->send_buffer_size= 0;
- connection->send_data_size= 0;
- connection->send_data_offset= 0;
- connection->recv_buffer_size= 0;
- connection->recv_data_size= 0;
- connection->recv_data_offset= 0;
- connection->universal= gearman;
- GEARMAND_LIST__ADD(gearman->con, connection);
- connection->context= dcon;
- connection->send_buffer_ptr= connection->send_buffer;
- connection->recv_packet= NULL;
- connection->recv_buffer_ptr= connection->recv_buffer;
- }
- void gearmand_connection_list_st::list_free()
- {
- while (con_list)
- {
- gearmand_io_free(con_list);
- }
- }
- gearmand_connection_list_st::gearmand_connection_list_st() :
- con_count(0),
- con_list(NULL),
- event_watch_fn(NULL),
- event_watch_context(NULL)
- {
- }
- void gearmand_connection_list_st::init(gearmand_event_watch_fn *watch_fn, void *watch_context)
- {
- ready_con_count= 0;
- ready_con_list= NULL;
- con_count= 0;
- con_list= NULL;
- event_watch_fn= watch_fn;
- event_watch_context= watch_context;
- }
- void gearmand_io_free(gearmand_io_st *connection)
- {
- if (connection->fd != INVALID_SOCKET)
- _connection_close(connection);
- if (connection->options.ready)
- {
- connection->options.ready= false;
- GEARMAND_LIST_DEL(connection->universal->ready_con, connection, ready_);
- }
- GEARMAND_LIST__DEL(connection->universal->con, connection);
- if (connection->options.packet_in_use)
- {
- gearmand_packet_free(&(connection->packet));
- }
- }
- gearmand_error_t gearman_io_set_option(gearmand_io_st *connection,
- gearmand_connection_options_t options,
- bool value)
- {
- switch (options)
- {
- case GEARMAND_CON_PACKET_IN_USE:
- connection->options.packet_in_use= value;
- break;
- case GEARMAND_CON_EXTERNAL_FD:
- connection->options.external_fd= value;
- break;
- case GEARMAND_CON_CLOSE_AFTER_FLUSH:
- connection->options.close_after_flush= value;
- break;
- case GEARMAND_CON_MAX:
- return GEARMAND_INVALID_COMMAND;
- }
- return GEARMAND_SUCCESS;
- }
- /**
- * Set socket options for a connection.
- */
- static gearmand_error_t _io_setsockopt(gearmand_io_st &connection);
- /** @} */
- /*
- * Public Definitions
- */
- gearmand_error_t gearman_io_set_fd(gearmand_io_st *connection, int fd)
- {
- assert(connection);
- connection->options.external_fd= true;
- connection->fd= fd;
- connection->_state= gearmand_io_st::GEARMAND_CON_UNIVERSAL_CONNECTED;
- return _io_setsockopt(*connection);
- }
- gearmand_con_st *gearman_io_context(const gearmand_io_st *connection)
- {
- return connection->context;
- }
- gearmand_error_t gearman_io_send(gearman_server_con_st *con,
- const gearmand_packet_st *packet, bool flush)
- {
- size_t send_size;
- gearmand_io_st *connection= &con->con;
- switch (connection->send_state)
- {
- case gearmand_io_st::GEARMAND_CON_SEND_STATE_NONE:
- if (! (packet->options.complete))
- {
- gearmand_error("packet not complete");
- return GEARMAND_INVALID_PACKET;
- }
- /* Pack first part of packet, which is everything but the payload. */
- while (1)
- {
- gearmand_error_t ret;
- send_size= con->protocol->pack(packet,
- con,
- connection->send_buffer +connection->send_buffer_size,
- GEARMAND_SEND_BUFFER_SIZE -connection->send_buffer_size,
- ret);
- if (ret == GEARMAND_SUCCESS)
- {
- connection->send_buffer_size+= send_size;
- break;
- }
- else if (ret == GEARMAND_IGNORE_PACKET)
- {
- return GEARMAND_SUCCESS;
- }
- else if (ret != GEARMAND_FLUSH_DATA)
- {
- return ret;
- }
- /* We were asked to flush when the buffer is already flushed! */
- if (connection->send_buffer_size == 0)
- {
- gearmand_error("send buffer too small");
- return GEARMAND_SEND_BUFFER_TOO_SMALL;
- }
- /* Flush buffer now if first part of packet won't fit in. */
- connection->send_state= gearmand_io_st::GEARMAND_CON_SEND_UNIVERSAL_PRE_FLUSH;
- case gearmand_io_st::GEARMAND_CON_SEND_UNIVERSAL_PRE_FLUSH:
- {
- gearmand_error_t local_ret;
- if ((local_ret= _connection_flush(con)) != GEARMAND_SUCCESS)
- {
- return local_ret;
- }
- }
- }
- /* Return here if we have no data to send. */
- if (packet->data_size == 0)
- {
- break;
- }
- /* If there is any room in the buffer, copy in data. */
- if (packet->data and (GEARMAND_SEND_BUFFER_SIZE - connection->send_buffer_size) > 0)
- {
- connection->send_data_offset= GEARMAND_SEND_BUFFER_SIZE - connection->send_buffer_size;
- if (connection->send_data_offset > packet->data_size)
- {
- connection->send_data_offset= packet->data_size;
- }
- memcpy(connection->send_buffer +connection->send_buffer_size,
- packet->data,
- connection->send_data_offset);
- connection->send_buffer_size+= connection->send_data_offset;
- /* Return if all data fit in the send buffer. */
- if (connection->send_data_offset == packet->data_size)
- {
- connection->send_data_offset= 0;
- break;
- }
- }
- /* Flush buffer now so we can start writing directly from data buffer. */
- connection->send_state= gearmand_io_st::GEARMAND_CON_SEND_UNIVERSAL_FORCE_FLUSH;
- case gearmand_io_st::GEARMAND_CON_SEND_UNIVERSAL_FORCE_FLUSH:
- {
- gearmand_error_t local_ret;
- if ((local_ret= _connection_flush(con)) != GEARMAND_SUCCESS)
- {
- return local_ret;
- }
- }
- connection->send_data_size= packet->data_size;
- /* If this is NULL, then ?? function will be used. */
- if (packet->data == NULL)
- {
- connection->send_state= gearmand_io_st::GEARMAND_CON_SEND_UNIVERSAL_FLUSH_DATA;
- return GEARMAND_SUCCESS;
- }
- /* Copy into the buffer if it fits, otherwise flush from packet buffer. */
- connection->send_buffer_size= packet->data_size - connection->send_data_offset;
- if (connection->send_buffer_size < GEARMAND_SEND_BUFFER_SIZE)
- {
- memcpy(connection->send_buffer,
- packet->data + connection->send_data_offset,
- connection->send_buffer_size);
- connection->send_data_size= 0;
- connection->send_data_offset= 0;
- break;
- }
- connection->send_buffer_ptr= const_cast<char *>(packet->data) + connection->send_data_offset;
- connection->send_state= gearmand_io_st::GEARMAND_CON_SEND_UNIVERSAL_FLUSH_DATA;
- case gearmand_io_st::GEARMAND_CON_SEND_UNIVERSAL_FLUSH:
- case gearmand_io_st::GEARMAND_CON_SEND_UNIVERSAL_FLUSH_DATA:
- {
- gearmand_error_t local_ret= _connection_flush(con);
- if (local_ret == GEARMAND_SUCCESS and
- connection->options.close_after_flush)
- {
- _connection_close(connection);
- local_ret= GEARMAND_LOST_CONNECTION;
- gearmand_debug("closing connection after flush by request");
- }
- return local_ret;
- }
- }
- if (flush)
- {
- connection->send_state= gearmand_io_st::GEARMAND_CON_SEND_UNIVERSAL_FLUSH;
- gearmand_error_t local_ret= _connection_flush(con);
- if (local_ret == GEARMAND_SUCCESS and connection->options.close_after_flush)
- {
- _connection_close(connection);
- local_ret= GEARMAND_LOST_CONNECTION;
- gearmand_debug("closing connection after flush by request");
- }
- return local_ret;
- }
- connection->send_state= gearmand_io_st::GEARMAND_CON_SEND_STATE_NONE;
- return GEARMAND_SUCCESS;
- }
- #pragma GCC diagnostic push
- #ifndef __INTEL_COMPILER
- #pragma GCC diagnostic ignored "-Wold-style-cast"
- #endif
- gearmand_error_t gearman_io_recv(gearman_server_con_st *con, bool recv_data)
- {
- gearmand_io_st *connection= &con->con;
- gearmand_packet_st *packet= &(con->packet->packet);
- switch (connection->recv_state)
- {
- case gearmand_io_st::GEARMAND_CON_RECV_UNIVERSAL_NONE:
- if (connection->_state != gearmand_io_st::GEARMAND_CON_UNIVERSAL_CONNECTED)
- {
- gearmand_error("not connected");
- return GEARMAND_NOT_CONNECTED;
- }
- connection->recv_packet= packet;
- // The options being passed in are just defaults.
- connection->recv_packet->reset(GEARMAN_MAGIC_TEXT, GEARMAN_COMMAND_TEXT);
- connection->recv_state= gearmand_io_st::GEARMAND_CON_RECV_UNIVERSAL_READ;
- case gearmand_io_st::GEARMAND_CON_RECV_UNIVERSAL_READ:
- while (1)
- {
- gearmand_error_t ret;
- if (connection->recv_buffer_size > 0)
- {
- assert(con->protocol);
- size_t recv_size= con->protocol->unpack(connection->recv_packet,
- con,
- connection->recv_buffer_ptr,
- connection->recv_buffer_size, ret);
- connection->recv_buffer_ptr+= recv_size;
- connection->recv_buffer_size-= recv_size;
- if (gearmand_success(ret))
- {
- break;
- }
- else if (ret != GEARMAND_IO_WAIT)
- {
- gearmand_gerror_warn("protocol failure, closing connection", ret);
- _connection_close(connection);
- return ret;
- }
- }
- /* Shift buffer contents if needed. */
- if (connection->recv_buffer_size > 0)
- {
- memmove(connection->recv_buffer, connection->recv_buffer_ptr, connection->recv_buffer_size);
- }
- connection->recv_buffer_ptr= connection->recv_buffer;
- size_t recv_size= _connection_read(con, connection->recv_buffer + connection->recv_buffer_size,
- GEARMAND_RECV_BUFFER_SIZE - connection->recv_buffer_size, ret);
- if (gearmand_failed(ret))
- {
- // GEARMAND_LOST_CONNECTION is not worth a warning, clients/workers just
- // drop connections for close.
- if (ret != GEARMAND_LOST_CONNECTION)
- {
- gearmand_gerror_warn("Failed while in _connection_read()", ret);
- }
- return ret;
- }
- gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "read %lu bytes",
- (unsigned long)recv_size);
- connection->recv_buffer_size+= recv_size;
- }
- if (packet->data_size == 0)
- {
- connection->recv_state= gearmand_io_st::GEARMAND_CON_RECV_UNIVERSAL_NONE;
- break;
- }
- connection->recv_data_size= packet->data_size;
- if (not recv_data)
- {
- connection->recv_state= gearmand_io_st::GEARMAND_CON_RECV_STATE_READ_DATA;
- break;
- }
- packet->data= static_cast<char *>(realloc(NULL, packet->data_size));
- if (not packet->data)
- {
- // Server up the memory error first, in case _connection_close()
- // creates any.
- gearmand_merror("realloc", char, packet->data_size);
- _connection_close(connection);
- return GEARMAND_MEMORY_ALLOCATION_FAILURE;
- }
- packet->options.free_data= true;
- connection->recv_state= gearmand_io_st::GEARMAND_CON_RECV_STATE_READ_DATA;
- case gearmand_io_st::GEARMAND_CON_RECV_STATE_READ_DATA:
- while (connection->recv_data_size)
- {
- gearmand_error_t ret;
- ret= gearmand_connection_recv_data(con,
- ((uint8_t *)(packet->data)) +
- connection->recv_data_offset,
- packet->data_size -
- connection->recv_data_offset);
- if (gearmand_failed(ret))
- {
- return ret;
- }
- }
- connection->recv_state= gearmand_io_st::GEARMAND_CON_RECV_UNIVERSAL_NONE;
- break;
- }
- packet= connection->recv_packet;
- connection->recv_packet= NULL;
- return GEARMAND_SUCCESS;
- }
- gearmand_error_t gearmand_io_set_events(gearman_server_con_st *con, short events)
- {
- gearmand_io_st *connection= &con->con;
- if ((connection->events | events) == connection->events)
- {
- return GEARMAND_SUCCESS;
- }
- connection->events|= events;
- if (connection->universal->event_watch_fn)
- {
- gearmand_error_t ret= connection->universal->event_watch_fn(connection, connection->events,
- (void *)connection->universal->event_watch_context);
- if (gearmand_failed(ret))
- {
- gearmand_gerror_warn("event watch failed, closing connection", ret);
- _connection_close(connection);
- return ret;
- }
- }
- return GEARMAND_SUCCESS;
- }
- gearmand_error_t gearmand_io_set_revents(gearman_server_con_st *con, short revents)
- {
- gearmand_io_st *connection= &con->con;
- if (revents != 0)
- {
- connection->options.ready= true;
- GEARMAND_LIST_ADD(connection->universal->ready_con, connection, ready_);
- }
- connection->revents= revents;
- /* Remove external POLLOUT watch if we didn't ask for it. Otherwise we spin
- forever until another POLLIN state change. This is much more efficient
- than removing POLLOUT on every state change since some external polling
- mechanisms need to use a system call to change flags (like Linux epoll). */
- if (revents & POLLOUT && !(connection->events & POLLOUT) &&
- connection->universal->event_watch_fn != NULL)
- {
- gearmand_error_t ret= connection->universal->event_watch_fn(connection, connection->events,
- (void *)connection->universal->event_watch_context);
- if (gearmand_failed(ret))
- {
- gearmand_gerror_warn("event watch failed, closing connection", ret);
- _connection_close(connection);
- return ret;
- }
- }
- connection->events&= (short)~revents;
- return GEARMAND_SUCCESS;
- }
- /*
- * Static Definitions
- */
- static gearmand_error_t _io_setsockopt(gearmand_io_st &connection)
- {
- {
- int setting= 1;
- if (setsockopt(connection.fd, IPPROTO_TCP, TCP_NODELAY, &setting, (socklen_t)sizeof(int)) and errno != EOPNOTSUPP)
- {
- return gearmand_perror(errno, "setsockopt(TCP_NODELAY)");
- }
- }
- {
- struct linger linger;
- linger.l_onoff= 1;
- linger.l_linger= GEARMAND_DEFAULT_SOCKET_TIMEOUT;
- if (setsockopt(connection.fd, SOL_SOCKET, SO_LINGER, &linger, (socklen_t)sizeof(struct linger)))
- {
- return gearmand_perror(errno, "setsockopt(SO_LINGER)");
- }
- }
- #if defined(__MACH__) && defined(__APPLE__) || defined(__FreeBSD__)
- {
- int setting= 1;
- // This is not considered a fatal error
- if (setsockopt(connection.fd, SOL_SOCKET, SO_NOSIGPIPE, (void *)&setting, sizeof(int)))
- {
- gearmand_perror(errno, "setsockopt(SO_NOSIGPIPE)");
- }
- }
- #endif
- #if 0
- if (0)
- {
- struct timeval waittime;
- waittime.tv_sec= GEARMAND_DEFAULT_SOCKET_TIMEOUT;
- waittime.tv_usec= 0;
- if (setsockopt(connection.fd, SOL_SOCKET, SO_SNDTIMEO, &waittime, (socklen_t)sizeof(struct timeval)) and errno != ENOPROTOOPT)
- {
- return gearmand_perror(errno, "setsockopt(SO_SNDTIMEO)");
- }
- if (setsockopt(connection.fd, SOL_SOCKET, SO_RCVTIMEO, &waittime, (socklen_t)sizeof(struct timeval)) and errno != ENOPROTOOPT)
- {
- return gearmand_perror(errno, "setsockopt(SO_RCVTIMEO)");
- }
- }
- #endif
- #if 0
- if (0)
- {
- int setting= GEARMAND_DEFAULT_SOCKET_SEND_SIZE;
- if (setsockopt(connection.fd, SOL_SOCKET, SO_SNDBUF, &setting, (socklen_t)sizeof(int)))
- {
- return gearmand_perror(errno, "setsockopr(SO_SNDBUF)");
- }
- setting= GEARMAND_DEFAULT_SOCKET_RECV_SIZE;
- if (setsockopt(connection.fd, SOL_SOCKET, SO_RCVBUF, &setting, (socklen_t)sizeof(int)))
- {
- return gearmand_perror(errno, "setsockopt(SO_RCVBUF)");
- }
- }
- #endif
- if (SOCK_NONBLOCK == 0)
- {
- gearmand_error_t local_ret;
- if ((local_ret= gearmand_sockfd_nonblock(connection.fd)))
- {
- return local_ret;
- }
- }
- return GEARMAND_SUCCESS;
- }
- gearmand_error_t gearmand_sockfd_nonblock(const int& sockfd)
- {
- int flags;
- do
- {
- flags= fcntl(sockfd, F_GETFL, 0);
- } while (flags == -1 and (errno == EINTR or errno == EAGAIN));
- if (flags == -1)
- {
- return gearmand_perror(errno, "fcntl(F_GETFL)");
- }
- else if ((flags & O_NONBLOCK) == 0)
- {
- int retval;
- do
- {
- retval= fcntl(sockfd, F_SETFL, flags | O_NONBLOCK);
- } while (retval == -1 and (errno == EINTR or errno == EAGAIN));
- if (retval == -1)
- {
- return gearmand_perror(errno, "fcntl(F_SETFL)");
- }
- }
- return GEARMAND_SUCCESS;
- }
- void gearmand_sockfd_close(int& sockfd)
- {
- if (sockfd != INVALID_SOCKET)
- {
- /* in case of death shutdown to avoid blocking at close() */
- if (shutdown(sockfd, SHUT_RDWR) == SOCKET_ERROR && get_socket_errno() != ENOTCONN)
- {
- gearmand_perror(errno, "shutdown");
- assert(errno != ENOTSOCK);
- }
- else if (closesocket(sockfd) == SOCKET_ERROR)
- {
- gearmand_perror(errno, "close");
- }
- sockfd= INVALID_SOCKET;
- }
- else
- {
- gearmand_debug("gearmand_sockfd_close() called with an invalid socket, this was probably ok");
- }
- }
- void gearmand_pipe_close(int& pipefd)
- {
- if (pipefd == INVALID_SOCKET)
- {
- gearmand_error("gearmand_pipe_close() called with an invalid socket");
- return;
- }
- if (closesocket(pipefd) == SOCKET_ERROR)
- {
- gearmand_perror(errno, "close");
- }
- pipefd= -1;
- }
- #pragma GCC diagnostic pop
|