Browse Source

Adjust abstract around connection/packet

Brian Aker 12 years ago
parent
commit
9e0dc21f99

+ 2 - 3
libgearman-server/connection.c

@@ -62,7 +62,7 @@ static gearman_server_con_st * _server_con_create(gearman_server_thread_st *thre
   }
   else
   {
-    con= (gearman_server_con_st *)malloc(sizeof(gearman_server_con_st));
+    con= build_gearman_server_con_st();
     if (con == NULL)
     {
       gearmand_perror("malloc");
@@ -246,8 +246,7 @@ void gearman_server_con_free(gearman_server_con_st *con)
     return;
   }
 
-  gearmand_debug("free");
-  free(con);
+  destroy_gearman_server_con_st(con);
 }
 
 gearmand_io_st *gearman_server_con_con(gearman_server_con_st *con)

+ 1 - 0
libgearman-server/connection_plus.cc

@@ -46,6 +46,7 @@ gearman_server_con_st* build_gearman_server_con_st(void)
 
 void destroy_gearman_server_con_st(gearman_server_con_st* arg)
 {
+  gearmand_debug("delete gearman_server_con_st");
   delete arg;
 }
 

+ 6 - 12
libgearman-server/gearmand.cc

@@ -1039,16 +1039,10 @@ static bool gearman_server_create(gearman_server_st *server,
 
 static void gearman_server_free(gearman_server_st *server)
 {
-  uint32_t key;
-  gearman_server_packet_st *packet;
-  gearman_server_job_st *job;
-  gearman_server_client_st *client;
-  gearman_server_worker_st *worker;
-
   /* All threads should be cleaned up before calling this. */
   assert(server->thread_list == NULL);
 
-  for (key= 0; key < GEARMAND_JOB_HASH_SIZE; key++)
+  for (uint32_t key= 0; key < GEARMAND_JOB_HASH_SIZE; key++)
   {
     while (server->job_hash[key] != NULL)
     {
@@ -1063,28 +1057,28 @@ static void gearman_server_free(gearman_server_st *server)
 
   while (server->free_packet_list != NULL)
   {
-    packet= server->free_packet_list;
+    gearman_server_packet_st *packet= server->free_packet_list;
     server->free_packet_list= packet->next;
-    free(packet);
+    delete packet;
   }
 
   while (server->free_job_list != NULL)
   {
-    job= server->free_job_list;
+    gearman_server_job_st* job= server->free_job_list;
     server->free_job_list= job->next;
     free(job);
   }
 
   while (server->free_client_list != NULL)
   {
-    client= server->free_client_list;
+    gearman_server_client_st* client= server->free_client_list;
     server->free_client_list= client->con_next;
     free(client);
   }
 
   while (server->free_worker_list != NULL)
   {
-    worker= server->free_worker_list;
+    gearman_server_worker_st* worker= server->free_worker_list;
     server->free_worker_list= worker->con_next;
     free(worker);
   }

+ 10 - 6
libgearman-server/io.cc

@@ -137,9 +137,13 @@ gearmand_error_t gearmand_connection_recv_data(gearman_server_con_st *con, void
   if (connection->recv_buffer_size > 0)
   {
     if (connection->recv_buffer_size < data_size)
+    {
       recv_size= connection->recv_buffer_size;
+    }
     else
+    {
       recv_size= data_size;
+    }
 
     memcpy(data, connection->recv_buffer_ptr, recv_size);
     connection->recv_buffer_ptr+= recv_size;
@@ -195,17 +199,17 @@ static gearmand_error_t _connection_flush(gearman_server_con_st *con)
         }
         else if (write_size == -1)
         {
-          gearmand_error_t gret;
-
           switch (errno)
           {
           case EAGAIN:
-            gret= gearmand_io_set_events(con, POLLOUT);
-            if (gret != GEARMAN_SUCCESS)
             {
-              return gret;
+              gearmand_error_t gret= gearmand_io_set_events(con, POLLOUT);
+              if (gret != GEARMAN_SUCCESS)
+              {
+                return gret;
+              }
+              return GEARMAN_IO_WAIT;
             }
-            return GEARMAN_IO_WAIT;
 
           case EINTR:
             continue;

+ 73 - 29
libgearman-server/packet.cc

@@ -19,6 +19,7 @@
 #include <libgearman-server/fifo.h>
 #include <cassert>
 #include <cstring>
+#include <memory>
 
 #ifndef __INTEL_COMPILER
 #pragma GCC diagnostic ignored "-Wold-style-cast"
@@ -55,10 +56,10 @@ gearman_server_packet_create(gearman_server_thread_st *thread,
 
   if (server_packet == NULL)
   {
-    server_packet= (gearman_server_packet_st *)malloc(sizeof(gearman_server_packet_st));
+    server_packet= new (std::nothrow) gearman_server_packet_st;
     if (server_packet == NULL)
     {
-      gearmand_perror("malloc");
+      gearmand_perror("new() gearman_server_packet_st");
       return NULL;
     }
   }
@@ -68,6 +69,11 @@ gearman_server_packet_create(gearman_server_thread_st *thread,
   return server_packet;
 }
 
+void destroy_gearman_server_packet_st(gearman_server_packet_st *packet)
+{
+  delete packet;
+}
+
 void gearman_server_packet_free(gearman_server_packet_st *packet,
                                 gearman_server_thread_st *thread,
                                 bool from_thread)
@@ -82,8 +88,8 @@ void gearman_server_packet_free(gearman_server_packet_st *packet,
     }
     else
     {
-      gearmand_debug("free");
-      free(packet);
+      gearmand_debug("delete() gearman_server_packet_st");
+      delete packet;
     }
   }
   else
@@ -96,8 +102,8 @@ void gearman_server_packet_free(gearman_server_packet_st *packet,
     }
     else
     {
-      gearmand_debug("free");
-      free(packet);
+      gearmand_debug("delete() gearman_server_packet_st");
+      delete packet;
     }
   }
 }
@@ -112,8 +118,10 @@ gearmand_error_t gearman_server_io_packet_add(gearman_server_con_st *con,
   va_list ap;
 
   server_packet= gearman_server_packet_create(con->thread, false);
-  if (not server_packet)
+  if (server_packet == NULL)
+  {
     return GEARMAN_MEMORY_ALLOCATION_FAILURE;
+  }
 
   gearmand_packet_init(&(server_packet->packet), magic, command);
 
@@ -150,9 +158,18 @@ gearmand_error_t gearman_server_io_packet_add(gearman_server_con_st *con,
     server_packet->packet.options.free_data= true;
   }
 
-  (void) pthread_mutex_lock(&con->thread->lock);
-  gearmand_server_con_fifo_add(con, server_packet);
-  (void) pthread_mutex_unlock(&con->thread->lock);
+  if (pthread_mutex_lock(&con->thread->lock) == 0)
+  {
+    gearmand_server_con_fifo_add(con, server_packet);
+    if (pthread_mutex_unlock(&con->thread->lock) != 0)
+    {
+      gearmand_fatal("pthread_mutex_unlock()");
+    }
+  }
+  else
+  {
+    gearmand_fatal("pthread_mutex_lock()");
+  }
 
   gearman_server_con_io_add(con);
 
@@ -165,9 +182,18 @@ void gearman_server_io_packet_remove(gearman_server_con_st *con)
 
   gearmand_packet_free(&(server_packet->packet));
 
-  (void) pthread_mutex_lock(&con->thread->lock);
-  gearmand_server_con_fifo_free(con, server_packet);
-  (void) pthread_mutex_unlock(&con->thread->lock);
+  if (pthread_mutex_lock(&con->thread->lock) == 0)
+  {
+    gearmand_server_con_fifo_free(con, server_packet);
+    if (pthread_mutex_unlock(&con->thread->lock) != 0)
+    {
+      gearmand_fatal("pthread_mutex_unlock()");
+    }
+  }
+  else
+  {
+    gearmand_fatal("pthread_mutex_lock()");
+  }
 
   gearman_server_packet_free(server_packet, con->thread, true);
 }
@@ -175,9 +201,18 @@ void gearman_server_io_packet_remove(gearman_server_con_st *con)
 void gearman_server_proc_packet_add(gearman_server_con_st *con,
                                     gearman_server_packet_st *packet)
 {
-  (void) pthread_mutex_lock(&con->thread->lock);
-  gearmand_server_con_fifo_proc_add(con, packet);
-  (void) pthread_mutex_unlock(&con->thread->lock);
+  if (pthread_mutex_lock(&con->thread->lock) == 0)
+  {
+    gearmand_server_con_fifo_proc_add(con, packet);
+    if (pthread_mutex_unlock(&con->thread->lock) != 0)
+    {
+      gearmand_fatal("pthread_mutex_unlock()");
+    }
+  }
+  else
+  {
+    gearmand_fatal("pthread_mutex_lock()");
+  }
 
   gearman_server_con_proc_add(con);
 }
@@ -188,11 +223,22 @@ gearman_server_proc_packet_remove(gearman_server_con_st *con)
   gearman_server_packet_st *server_packet= con->proc_packet_list;
 
   if (server_packet == NULL)
+  {
     return NULL;
+  }
 
-  (void) pthread_mutex_lock(&con->thread->lock);
-  gearmand_server_con_fifo_proc_free(con, server_packet);
-  (void) pthread_mutex_unlock(&con->thread->lock);
+  if (pthread_mutex_lock(&con->thread->lock) == 0)
+  {
+    gearmand_server_con_fifo_proc_free(con, server_packet);
+    if (pthread_mutex_unlock(&con->thread->lock) != 0)
+    {
+      gearmand_fatal("pthread_mutex_unlock()");
+    }
+  }
+  else
+  {
+    gearmand_fatal("pthread_mutex_lock()");
+  }
 
   return server_packet;
 }
@@ -206,10 +252,8 @@ const char *gearmand_strcommand(gearmand_packet_st *packet)
 inline static gearmand_error_t packet_create_arg(gearmand_packet_st *packet,
                                                  const void *arg, size_t arg_size)
 {
-  size_t offset;
-
-  if (packet->argc == gearman_command_info(packet->command)->argc &&
-      (not (gearman_command_info(packet->command)->data) ||
+  if (packet->argc == gearman_command_info(packet->command)->argc and
+      (not (gearman_command_info(packet->command)->data) or
        packet->data))
   {
     gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "too many arguments for command(%s)", gearman_command_info(packet->command)->name);
@@ -224,7 +268,9 @@ inline static gearmand_error_t packet_create_arg(gearmand_packet_st *packet,
   }
 
   if (packet->args_size == 0 and packet->magic != GEARMAN_MAGIC_TEXT)
+  {
     packet->args_size= GEARMAN_PACKET_HEADER_SIZE;
+  }
 
   if ((packet->args_size + arg_size) < GEARMAN_ARGS_BUFFER_SIZE)
   {
@@ -241,7 +287,7 @@ inline static gearmand_error_t packet_create_arg(gearmand_packet_st *packet,
     else
     {
       char *new_args= (char *)realloc(packet->args, packet->args_size + arg_size);
-      if (not new_args)
+      if (new_args == NULL)
       {
         gearmand_perror("realloc");
         return GEARMAN_MEMORY_ALLOCATION_FAILURE;
@@ -255,6 +301,7 @@ inline static gearmand_error_t packet_create_arg(gearmand_packet_st *packet,
   packet->arg_size[packet->argc]= arg_size;
   packet->argc++;
 
+  size_t offset;
   if (packet->magic == GEARMAN_MAGIC_TEXT)
   {
     offset= 0;
@@ -322,9 +369,6 @@ void gearmand_packet_free(gearmand_packet_st *packet)
 
 gearmand_error_t gearmand_packet_pack_header(gearmand_packet_st *packet)
 {
-  uint64_t length_64;
-  uint32_t tmp;
-
   if (packet->magic == GEARMAN_MAGIC_TEXT)
   {
     packet->options.complete= true;
@@ -362,11 +406,11 @@ gearmand_error_t gearmand_packet_pack_header(gearmand_packet_st *packet)
     return GEARMAN_INVALID_COMMAND;
   }
 
-  tmp= packet->command;
+  uint32_t tmp= packet->command;
   tmp= htonl(tmp);
   memcpy(packet->args + 4, &tmp, 4);
 
-  length_64= packet->args_size + packet->data_size - GEARMAN_PACKET_HEADER_SIZE;
+  uint64_t length_64= packet->args_size + packet->data_size - GEARMAN_PACKET_HEADER_SIZE;
 
   // Check for overflow on 32bit(portable?).
   if (length_64 >= UINT32_MAX || length_64 < packet->data_size)

+ 2 - 0
libgearman-server/packet.h

@@ -116,6 +116,8 @@ GEARMAN_INTERNAL_API
 GEARMAN_INTERNAL_API
   gearmand_error_t gearmand_packet_pack_header(gearmand_packet_st *packet);
 
+void destroy_gearman_server_packet_st(gearman_server_packet_st *packet);
+
 /** @} */
 
 #ifdef __cplusplus

+ 6 - 7
libgearman-server/thread.c

@@ -112,9 +112,6 @@ bool gearman_server_thread_init(gearman_server_st *server,
 
 void gearman_server_thread_free(gearman_server_thread_st *thread)
 {
-  gearman_server_con_st *con;
-  gearman_server_packet_st *packet;
-
   _proc_thread_kill(Server);
   
   while (thread->con_list != NULL)
@@ -124,22 +121,24 @@ void gearman_server_thread_free(gearman_server_thread_st *thread)
 
   while (thread->free_con_list != NULL)
   {
-    con= thread->free_con_list;
+    gearman_server_con_st *con= thread->free_con_list;
     thread->free_con_list= con->next;
     gearmand_debug("free");
-    free(con);
+    destroy_gearman_server_con_st(con);
   }
 
   while (thread->free_packet_list != NULL)
   {
-    packet= thread->free_packet_list;
+    gearman_server_packet_st *packet= thread->free_packet_list;
     thread->free_packet_list= packet->next;
     gearmand_debug("free");
-    free(packet);
+    destroy_gearman_server_packet_st(packet);
   }
 
   if (thread->gearman != NULL)
+  {
     gearman_connection_list_free(thread->gearman);
+  }
 
   pthread_mutex_destroy(&(thread->lock));
 

+ 0 - 1
tests/libgearman-1.0/task.cc

@@ -53,7 +53,6 @@ using namespace libtest;
 
 test_return_t gearman_client_add_task_status_by_unique_NOT_FOUND_TEST(void *object)
 {
-  return TEST_SUCCESS;
   gearman_client_st *client= (gearman_client_st *)object;
   const char *worker_function= (const char *)gearman_client_context(client);