Browse Source

Fix remaining bit in connection

Brian Aker 14 years ago
parent
commit
ffb3686b3e

+ 2 - 1
libgearman/client.cc

@@ -1472,7 +1472,8 @@ static gearman_return_t _client_run_task(gearman_client_st *client, gearman_task
 
     client->options.no_new= false;
     task->state= GEARMAN_TASK_STATE_WORK;
-    return gearman_connection_set_events(task->con, POLLIN);
+    task->con->set_events(POLLIN);
+    return GEARMAN_SUCCESS;
 
   case GEARMAN_TASK_STATE_WORK:
     if (task->recv->command == GEARMAN_COMMAND_JOB_CREATED)

+ 57 - 70
libgearman/connection.cc

@@ -62,8 +62,6 @@
 #include <fcntl.h>
 #endif
 
-static void gearman_connection_reset_addrinfo(gearman_connection_st *connection);
-
 static gearman_return_t gearman_connection_set_option(gearman_connection_st *connection,
                                                       gearman_connection_options_t options,
                                                       bool value);
@@ -170,25 +168,28 @@ gearman_connection_st *gearman_connection_copy(gearman_universal_st& universal,
   return connection;
 }
 
-void gearman_connection_free(gearman_connection_st *connection)
+gearman_connection_st::~gearman_connection_st()
 {
-  if (connection->fd != -1)
-    connection->close();
+  if (fd != -1)
+    close();
+
+  reset_addrinfo();
+
+  { // Remove from universal list
+    if (universal.con_list == this)
+      universal.con_list= next;
 
-  gearman_connection_reset_addrinfo(connection);
+    if (prev)
+      prev->next= next;
 
-  if (connection->universal.con_list == connection)
-    connection->universal.con_list= connection->next;
-  if (connection->prev != NULL)
-    connection->prev->next= connection->next;
-  if (connection->next != NULL)
-    connection->next->prev= connection->prev;
-  connection->universal.con_count--;
+    if (next)
+      next->prev= prev;
 
-  if (connection->options.packet_in_use)
-    gearman_packet_free(&(connection->_packet));
+    universal.con_count--;
+  }
 
-  delete connection;
+  if (options.packet_in_use)
+    gearman_packet_free(&(_packet));
 }
 
 gearman_return_t gearman_connection_set_option(gearman_connection_st *connection,
@@ -231,7 +232,7 @@ static gearman_return_t _con_setsockopt(gearman_connection_st *connection);
 
 void gearman_connection_st::set_host(const char *host_arg, const in_port_t port_arg)
 {
-  gearman_connection_reset_addrinfo(this);
+  reset_addrinfo();
 
   strncpy(host, host_arg == NULL ? GEARMAN_DEFAULT_TCP_HOST : host_arg, NI_MAXHOST);
   host[NI_MAXHOST - 1]= 0;
@@ -279,15 +280,15 @@ void gearman_connection_st::close()
   recv_buffer_size= 0;
 }
 
-void gearman_connection_reset_addrinfo(gearman_connection_st *connection)
+void gearman_connection_st::reset_addrinfo()
 {
-  if (connection->addrinfo != NULL)
+  if (addrinfo)
   {
-    freeaddrinfo(connection->addrinfo);
-    connection->addrinfo= NULL;
+    freeaddrinfo(addrinfo);
+    addrinfo= NULL;
   }
 
-  connection->addrinfo_next= NULL;
+  addrinfo_next= NULL;
 }
 
 gearman_return_t gearman_connection_st::send(const gearman_packet_st *packet_arg, const bool flush_buffer)
@@ -568,9 +569,7 @@ gearman_return_t gearman_connection_st::flush()
           break;
         }
 
-        gearman_return_t gret= gearman_connection_set_events(this, POLLOUT);
-        if (gearman_failed(gret))
-          return gret;
+        set_events(POLLOUT);
 
         if (gearman_universal_is_non_blocking(universal))
         {
@@ -579,7 +578,7 @@ gearman_return_t gearman_connection_st::flush()
           return GEARMAN_IO_WAIT;
         }
 
-        gret= gearman_wait(universal);
+        gearman_return_t gret= gearman_wait(universal);
         if (gearman_failed(gret))
           return gret;
       }
@@ -647,9 +646,7 @@ gearman_return_t gearman_connection_st::flush()
         {
           if (errno == EAGAIN)
           {
-            gearman_return_t gret= gearman_connection_set_events(this, POLLOUT);
-            if (gearman_failed(gret))
-              return gret;
+            set_events(POLLOUT);
 
             if (gearman_universal_is_non_blocking(universal))
             {
@@ -657,7 +654,7 @@ gearman_return_t gearman_connection_st::flush()
               return GEARMAN_IO_WAIT;
             }
 
-            gret= gearman_wait(universal);
+            gearman_return_t gret= gearman_wait(universal);
             if (gearman_failed(gret))
               return gret;
 
@@ -758,8 +755,7 @@ gearman_packet_st *gearman_connection_st::recv(gearman_packet_st& packet_arg,
         memmove(recv_buffer, recv_buffer_ptr, recv_buffer_size);
       recv_buffer_ptr= recv_buffer;
 
-      size_t recv_size= gearman_connection_read(this, recv_buffer + recv_buffer_size,
-                                                GEARMAN_RECV_BUFFER_SIZE - recv_buffer_size, &ret);
+      size_t recv_size= read(recv_buffer + recv_buffer_size, GEARMAN_RECV_BUFFER_SIZE - recv_buffer_size, ret);
       if (gearman_failed(ret))
       {
         return NULL;
@@ -851,9 +847,7 @@ size_t gearman_connection_st::recv(void *data, size_t data_size, gearman_return_
 
   if (data_size != recv_size)
   {
-    recv_size+= gearman_connection_read(this,
-                                        static_cast<uint8_t *>(const_cast<void *>(data)) + recv_size,
-                                        data_size - recv_size, &ret);
+    recv_size+= read(static_cast<uint8_t *>(const_cast<void *>(data)) + recv_size, data_size - recv_size, ret);
     recv_data_offset+= recv_size;
   }
   else
@@ -872,41 +866,38 @@ size_t gearman_connection_st::recv(void *data, size_t data_size, gearman_return_
   return recv_size;
 }
 
-size_t gearman_connection_read(gearman_connection_st *connection, void *data, size_t data_size,
-                               gearman_return_t *ret_ptr)
+size_t gearman_connection_st::read(void *data, size_t data_size, gearman_return_t& ret)
 {
   ssize_t read_size;
 
   while (1)
   {
-    read_size= read(connection->fd, data, data_size);
+    read_size= ::read(fd, data, data_size);
     if (read_size == 0)
     {
-      if (not (connection->options.ignore_lost_connection))
+      if (not (options.ignore_lost_connection))
       {
-        gearman_error(&connection->universal, GEARMAN_LOST_CONNECTION, "lost connection to server (EOF)");
+        gearman_error(&universal, GEARMAN_LOST_CONNECTION, "lost connection to server (EOF)");
       }
-      connection->close();
-      *ret_ptr= GEARMAN_LOST_CONNECTION;
+      close();
+      ret= GEARMAN_LOST_CONNECTION;
       return 0;
     }
     else if (read_size == -1)
     {
       if (errno == EAGAIN)
       {
-        *ret_ptr= gearman_connection_set_events(connection, POLLIN);
-        if (*ret_ptr != GEARMAN_SUCCESS)
-          return 0;
+        set_events(POLLIN);
 
-        if (gearman_universal_is_non_blocking(connection->universal))
+        if (gearman_universal_is_non_blocking(universal))
         {
-          gearman_gerror(&connection->universal, GEARMAN_IO_WAIT);
-          *ret_ptr= GEARMAN_IO_WAIT;
+          gearman_gerror(&universal, GEARMAN_IO_WAIT);
+          ret= GEARMAN_IO_WAIT;
           return 0;
         }
 
-        *ret_ptr= gearman_wait(connection->universal);
-        if (gearman_failed(*ret_ptr))
+        ret= gearman_wait(universal);
+        if (gearman_failed(ret))
         {
           return 0;
         }
@@ -919,48 +910,44 @@ size_t gearman_connection_read(gearman_connection_st *connection, void *data, si
       }
       else if (errno == EPIPE || errno == ECONNRESET || errno == EHOSTDOWN)
       {
-        if (not (connection->options.ignore_lost_connection))
+        if (not (options.ignore_lost_connection))
         {
-          gearman_perror(connection->universal, "lost connection to server during read");
+          gearman_perror(universal, "lost connection to server during read");
         }
-        *ret_ptr= GEARMAN_LOST_CONNECTION;
+        ret= GEARMAN_LOST_CONNECTION;
       }
       else
       {
-        gearman_perror(connection->universal, "read");
-        *ret_ptr= GEARMAN_ERRNO;
+        gearman_perror(universal, "read");
+        ret= GEARMAN_ERRNO;
       }
 
-      connection->close();
+      close();
       return 0;
     }
 
     break;
   }
 
-  *ret_ptr= GEARMAN_SUCCESS;
+  ret= GEARMAN_SUCCESS;
   return size_t(read_size);
 }
 
-gearman_return_t gearman_connection_set_events(gearman_connection_st *connection, short events)
+void gearman_connection_st::set_events(short arg)
 {
-  if ((connection->events | events) == connection->events)
-    return GEARMAN_SUCCESS;
-
-  connection->events|= events;
+  if ((events | arg) == events)
+    return;
 
-  return GEARMAN_SUCCESS;
+  events|= arg;
 }
 
-gearman_return_t gearman_connection_set_revents(gearman_connection_st *connection, short revents)
+void gearman_connection_st::set_revents(short arg)
 {
-  if (revents != 0)
-    connection->options.ready= true;
+  if (arg)
+    options.ready= true;
 
-  connection->revents= revents;
-  connection->events&= short(~revents);
-
-  return GEARMAN_SUCCESS;
+  revents= arg;
+  events&= short(~arg);
 }
 
 /*

+ 3 - 54
libgearman/connection.h

@@ -36,10 +36,6 @@
  *
  */
 
-/**
- * @file
- * @brief Connection Declarations
- */
 
 #pragma once
 
@@ -47,17 +43,9 @@
 #include <sys/socket.h>
 #include <netdb.h>
 
-/**
- * @addtogroup gearman_con Connection Declarations
- * @ingroup gearman_universal
- *
- * This is a low level interface for gearman connections. This is used
- * internally by both client and worker interfaces, so you probably want to
- * look there first. This is usually used to write lower level clients, workers,
- * proxies, or your own server.
- *
- * @{
- */
+/*
+  Do not define these enum in your application. There are left publically due to one client.
+*/
 
 enum gearman_con_recv_t {
   GEARMAN_CON_RECV_UNIVERSAL_NONE,
@@ -79,42 +67,3 @@ enum gearman_con_universal_t {
   GEARMAN_CON_UNIVERSAL_CONNECTING,
   GEARMAN_CON_UNIVERSAL_CONNECTED
 };
-
-/**
- * @ingroup gearman_connection
- */
-
-#ifdef GEARMAN_CORE
-
-/**
- * Free a connection structure.
- *
- * @param[in] connection Structure previously initialized with gearman_connection_create(),
- *  gearman_connection_create_args(), or gearman_connection_clone().
- */
-GEARMAN_INTERNAL_API
-void gearman_connection_free(gearman_connection_st *connection);
-
-/**
- * Read data from a connection.
- */
-GEARMAN_INTERNAL_API
-size_t gearman_connection_read(gearman_connection_st *connection, void *data, size_t data_size,
-                               gearman_return_t *ret_ptr);
-
-/**
- * Set events to be watched for a connection.
- */
-GEARMAN_INTERNAL_API
-gearman_return_t gearman_connection_set_events(gearman_connection_st *connection, short events);
-
-/**
- * Set events that are ready for a connection. This is used with the external
- * event callbacks.
- */
-GEARMAN_INTERNAL_API
-gearman_return_t gearman_connection_set_revents(gearman_connection_st *connection, short revents);
-
-/** @} */
-
-#endif /* GEARMAN_CORE */

+ 13 - 0
libgearman/connection.hpp

@@ -81,6 +81,8 @@ struct gearman_connection_st
   gearman_connection_st(gearman_universal_st &universal_arg,
                         gearman_connection_options_t *options);
 
+  ~gearman_connection_st();
+
   void set_host( const char *host, const in_port_t port);
 
   gearman_return_t send(const gearman_packet_st *packet, const bool flush_buffer);
@@ -96,6 +98,17 @@ struct gearman_connection_st
 
   // Receive packet data from a connection.
   size_t recv(void *data, size_t data_size, gearman_return_t&);
+
+  size_t read(void *data, size_t data_size, gearman_return_t&);
+
+  // Set events to be watched for a connection.
+  void set_events(short events);
+
+ // Set events that are ready for a connection. This is used with the
+ // external event callbacks.
+  void set_revents(short revents);
+
+  void reset_addrinfo();
 };
 
 /**

+ 2 - 4
libgearman/universal.cc

@@ -149,7 +149,7 @@ void gearman_set_workload_free_fn(gearman_universal_st *universal,
 void gearman_free_all_cons(gearman_universal_st *universal)
 {
   while (universal->con_list)
-    gearman_connection_free(universal->con_list);
+    delete universal->con_list;
 }
 
 gearman_return_t gearman_flush_all(gearman_universal_st *universal)
@@ -234,9 +234,7 @@ gearman_return_t gearman_wait(gearman_universal_st& universal)
     if (con->events == 0)
       continue;
 
-    gearman_return_t gret= gearman_connection_set_revents(con, pfds[x].revents);
-    if (gearman_failed(gret))
-      return gret;
+    con->set_revents(pfds[x].revents);
 
     x++;
   }

+ 6 - 6
libgearman/worker.cc

@@ -541,6 +541,10 @@ gearman_job_st *gearman_worker_grab_job(gearman_worker_st *worker,
 {
   struct _worker_function_st *function;
   uint32_t active;
+  gearman_return_t unused;
+
+  if (not ret_ptr)
+    ret_ptr= &unused;
 
   while (1)
   {
@@ -761,12 +765,7 @@ gearman_job_st *gearman_worker_grab_job(gearman_worker_st *worker,
         if (worker->con->fd == -1)
           continue;
 
-        *ret_ptr= gearman_connection_set_events(worker->con, POLLIN);
-        if (gearman_failed(*ret_ptr))
-	{
-          return NULL;
-	}
-
+        worker->con->set_events(POLLIN);
         active++;
       }
 
@@ -1031,6 +1030,7 @@ static gearman_worker_st *_worker_allocate(gearman_worker_st *worker, bool is_cl
   worker->options.packet_init= false;
   worker->options.change= false;
   worker->options.grab_uniq= false;
+  worker->options.grab_all= false;
   worker->options.timeout_return= false;
 
   worker->state= GEARMAN_WORKER_STATE_START;

+ 35 - 5
tests/client_test.cc

@@ -1,9 +1,39 @@
-/* Gearman server and library
- * Copyright (C) 2008 Brian Aker, Eric Day
- * All rights reserved.
+/*  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
+ * 
+ *  Gearmand client and server library.
+ *
+ *  Copyright (C) 2011 Data Differential, http://datadifferential.com/
+ *  Copyright (C) 2008 Brian Aker, Eric Day
+ *  All rights reserved.
+ *
+ *  Redistribution and use in source and binary forms, with or without
+ *  modification, are permitted provided that the following conditions are
+ *  met:
+ *
+ *      * Redistributions of source code must retain the above copyright
+ *  notice, this list of conditions and the following disclaimer.
+ *
+ *      * Redistributions in binary form must reproduce the above
+ *  copyright notice, this list of conditions and the following disclaimer
+ *  in the documentation and/or other materials provided with the
+ *  distribution.
+ *
+ *      * The names of its contributors may not be used to endorse or
+ *  promote products derived from this software without specific prior
+ *  written permission.
+ *
+ *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ *  "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ *  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ *  A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ *  OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ *  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ *  LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ *  DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ *  THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ *  (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ *  OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  *
- * Use and distribution licensed under the BSD license.  See
- * the COPYING file in the parent directory for full text.
  */
 
 #include "config.h"

+ 2 - 2
tests/internals.cc

@@ -254,7 +254,7 @@ static test_return_t connection_init_test(void *)
   test_false(connection_ptr->options.ignore_lost_connection);
   test_false(connection_ptr->options.close_after_flush);
 
-  gearman_connection_free(connection_ptr);
+  delete connection_ptr;
 
   return TEST_SUCCESS;
 }
@@ -274,7 +274,7 @@ static test_return_t connection_alloc_test(void *)
   test_false(connection_ptr->options.ignore_lost_connection);
   test_false(connection_ptr->options.close_after_flush);
 
-  gearman_connection_free(connection_ptr);
+  delete connection_ptr;
 
   return TEST_SUCCESS;
 }

+ 2 - 2
tests/regression.cc

@@ -116,7 +116,7 @@ static test_return_t bug372074_test(void *)
 
     gearman_packet_free(&packet);
 
-    gearman_connection_free(con_ptr);
+    delete con_ptr;
 
     test_truth(con_ptr= gearman_connection_create(universal, NULL));
 
@@ -160,7 +160,7 @@ static test_return_t bug372074_test(void *)
 
     gearman_packet_free(&packet);
 
-    gearman_connection_free(con_ptr);
+    delete con_ptr;
   }
 
   gearman_universal_free(universal);

+ 2 - 2
tests/worker_test.cc

@@ -392,8 +392,8 @@ static test_return_t abandoned_worker_test(void *)
   worker2->recv(packet, ret, false);
   test_truth(not (ret != GEARMAN_SUCCESS || packet.command != GEARMAN_COMMAND_ERROR));
 
-  gearman_connection_free(worker1);
-  gearman_connection_free(worker2);
+  delete worker1;
+  delete worker2;
   gearman_packet_free(&packet);
   gearman_universal_free(universal);