|
@@ -141,6 +141,7 @@ gearman_connection_st::gearman_connection_st(gearman_universal_st &universal_arg
|
|
|
{
|
|
|
options.ready= false;
|
|
|
options.packet_in_use= false;
|
|
|
+ options.server_options_sent= false;
|
|
|
|
|
|
if (options_args)
|
|
|
{
|
|
@@ -321,6 +322,8 @@ void gearman_connection_st::close_socket()
|
|
|
recv_buffer_ptr= recv_buffer;
|
|
|
recv_buffer_size= 0;
|
|
|
|
|
|
+ options.server_options_sent= false;
|
|
|
+
|
|
|
// created_id_next is incremented for every outbound packet (except status).
|
|
|
// created_id is incremented for every response packet received, and also when
|
|
|
// no packets are received due to an error. There are lots of such error paths
|
|
@@ -350,7 +353,83 @@ void gearman_connection_st::reset_addrinfo()
|
|
|
addrinfo_next= NULL;
|
|
|
}
|
|
|
|
|
|
+
|
|
|
+/*
|
|
|
+ * The send_packet() method does not only send the passed-in packet_arg. If there are any server options
|
|
|
+ * established, and they haven't yet been sent over, then these options are sent over first.
|
|
|
+ * Only if that succeeds is the packet_arg sent.
|
|
|
+ * The reason for this is server options are only set once by the client/worker. In the older code, this
|
|
|
+ * resulted in them being sent over exactly once. If the connection was dropped and rebuilt, then the options
|
|
|
+ * were not sent over again, rendering them moot. This way, we're guaranteed that the options are always sent
|
|
|
+ * at least once to a connected server.
|
|
|
+ */
|
|
|
gearman_return_t gearman_connection_st::send_packet(const gearman_packet_st& packet_arg, const bool flush_buffer)
|
|
|
+{
|
|
|
+ if (!options.server_options_sent)
|
|
|
+ {
|
|
|
+ for (gearman_server_options_st* head= universal.server_options_list;
|
|
|
+ head;
|
|
|
+ head= head->next)
|
|
|
+ {
|
|
|
+ gearman_packet_st message;
|
|
|
+ const void *args[]= { head->option };
|
|
|
+ size_t args_size[]= { head->option_length };
|
|
|
+ gearman_return_t ret= gearman_packet_create_args(universal, message, GEARMAN_MAGIC_REQUEST,
|
|
|
+ GEARMAN_COMMAND_OPTION_REQ, args, args_size, 1);
|
|
|
+
|
|
|
+ if (gearman_failed(ret))
|
|
|
+ {
|
|
|
+ gearman_packet_free(&message);
|
|
|
+ gearman_error(universal, GEARMAN_MEMORY_ALLOCATION_FAILURE, "gearman_packet_create_args()");
|
|
|
+ return GEARMAN_MEMORY_ALLOCATION_FAILURE;
|
|
|
+ }
|
|
|
+
|
|
|
+ PUSH_BLOCKING(universal);
|
|
|
+ OptionCheck check(universal);
|
|
|
+ ret= _send_packet(message, true);
|
|
|
+ if (gearman_failed(ret))
|
|
|
+ {
|
|
|
+ gearman_packet_free(&message);
|
|
|
+ gearman_error(universal, ret, "Failed to send server-options packet");
|
|
|
+ return ret;
|
|
|
+ }
|
|
|
+
|
|
|
+ options.packet_in_use= true;
|
|
|
+ gearman_packet_st *packet_ptr= receiving(_packet, ret, true);
|
|
|
+ if (packet_ptr == NULL)
|
|
|
+ {
|
|
|
+ gearman_packet_free(&message);
|
|
|
+ options.packet_in_use= false;
|
|
|
+ gearman_error(universal, ret, "Failed in receiving()");
|
|
|
+ return ret;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (gearman_failed(ret) ||
|
|
|
+ gearman_failed(ret= check.success(this)))
|
|
|
+ {
|
|
|
+ gearman_packet_free(&message);
|
|
|
+ free_private_packet();
|
|
|
+ reset_recv_packet();
|
|
|
+ gearman_error(universal, ret, "receiving()");
|
|
|
+ return ret;
|
|
|
+ }
|
|
|
+
|
|
|
+ free_private_packet();
|
|
|
+ reset_recv_packet();
|
|
|
+ gearman_packet_free(&message);
|
|
|
+ }
|
|
|
+
|
|
|
+ options.server_options_sent= true;
|
|
|
+ }
|
|
|
+
|
|
|
+ return _send_packet(packet_arg, flush_buffer);
|
|
|
+}
|
|
|
+
|
|
|
+/*
|
|
|
+ * This is the real implementation that actually sends a packet. Read the comments for send_packet() for why
|
|
|
+ * that is. Note that this is a private method. External callers should only call send_packet().
|
|
|
+ */
|
|
|
+gearman_return_t gearman_connection_st::_send_packet(const gearman_packet_st& packet_arg, const bool flush_buffer)
|
|
|
{
|
|
|
switch (send_state)
|
|
|
{
|