Browse Source

Test pthread calls a bit more, some abstraction.

Brian Aker 12 years ago
parent
commit
c95f2797e0

+ 0 - 4
libgearman-server/gearmand_con.c

@@ -271,7 +271,3 @@ void gearmand_con_check_queue(gearmand_thread_st *thread)
     }
   }
 }
-
-/*
- * Private definitions
- */

+ 32 - 85
libgearman-server/job.c

@@ -64,10 +64,38 @@
  * Get a server job structure from the unique ID. If data_size is non-zero,
  * then unique points to the workload data and not a real unique key.
  */
-static gearman_server_job_st *
-_server_job_get_unique(gearman_server_st *server, uint32_t unique_key,
-                       gearman_server_function_st *server_function,
-                       const char *unique, size_t data_size);
+static gearman_server_job_st * _server_job_get_unique(gearman_server_st *server, uint32_t unique_key,
+                                                      gearman_server_function_st *server_function,
+                                                      const char *unique, size_t data_size)
+{
+  gearman_server_job_st *server_job;
+
+  for (server_job= server->unique_hash[unique_key % GEARMAND_JOB_HASH_SIZE];
+       server_job != NULL; server_job= server_job->unique_next)
+  {
+    if (data_size == 0)
+    {
+      if (server_job->function == server_function &&
+          server_job->unique_key == unique_key &&
+          !strcmp(server_job->unique, unique))
+      {
+        return server_job;
+      }
+    }
+    else
+    {
+      if (server_job->function == server_function &&
+          server_job->unique_key == unique_key &&
+          server_job->data_size == data_size &&
+          memcmp(server_job->data, unique, data_size) == 0)
+      {
+        return server_job;
+      }
+    }
+  }
+
+  return NULL;
+}
 
 /** @} */
 
@@ -277,53 +305,6 @@ gearman_server_job_add_reducer(gearman_server_st *server,
   return server_job;
 }
 
-gearman_server_job_st *
-gearman_server_job_create(gearman_server_st *server)
-{
-  gearman_server_job_st *server_job;
-
-  if (server->free_job_count > 0)
-  {
-    server_job= server->free_job_list;
-    gearmand_server_free_job_list_free(server, server_job);
-  }
-  else
-  {
-    server_job= build_gearman_server_job_st();
-    if (server_job == NULL)
-    {
-      return NULL;
-    }
-  }
-
-  server_job->ignore_job= false;
-  server_job->job_queued= false;
-  server_job->retries= 0;
-  server_job->priority= 0;
-  server_job->job_handle_key= 0;
-  server_job->unique_key= 0;
-  server_job->client_count= 0;
-  server_job->numerator= 0;
-  server_job->denominator= 0;
-  server_job->data_size= 0;
-  server_job->next= NULL;
-  server_job->prev= NULL;
-  server_job->unique_next= NULL;
-  server_job->unique_prev= NULL;
-  server_job->worker_next= NULL;
-  server_job->worker_prev= NULL;
-  server_job->function= NULL;
-  server_job->function_next= NULL;
-  server_job->data= NULL;
-  server_job->client_list= NULL;
-  server_job->worker= NULL;
-  server_job->job_handle[0]= 0;
-  server_job->unique[0]= 0;
-  server_job->unique_length= 0;
-
-  return server_job;
-}
-
 void gearman_server_job_free(gearman_server_job_st *server_job)
 {
   if (server_job == NULL)
@@ -473,37 +454,3 @@ gearmand_error_t gearman_server_job_queue(gearman_server_job_st *job)
 /*
  * Private definitions
  */
-
-static gearman_server_job_st *
-_server_job_get_unique(gearman_server_st *server, uint32_t unique_key,
-                       gearman_server_function_st *server_function,
-                       const char *unique, size_t data_size)
-{
-  gearman_server_job_st *server_job;
-
-  for (server_job= server->unique_hash[unique_key % GEARMAND_JOB_HASH_SIZE];
-       server_job != NULL; server_job= server_job->unique_next)
-  {
-    if (data_size == 0)
-    {
-      if (server_job->function == server_function &&
-          server_job->unique_key == unique_key &&
-          !strcmp(server_job->unique, unique))
-      {
-        return server_job;
-      }
-    }
-    else
-    {
-      if (server_job->function == server_function &&
-          server_job->unique_key == unique_key &&
-          server_job->data_size == data_size &&
-          !memcmp(server_job->data, unique, data_size))
-      {
-        return server_job;
-      }
-    }
-  }
-
-  return NULL;
-}

+ 0 - 2
libgearman-server/job.h

@@ -112,8 +112,6 @@ gearman_server_job_st *gearman_server_job_get_by_unique(gearman_server_st *serve
 
 /** @} */
 
-gearman_server_job_st* build_gearman_server_job_st(void);
-
 void destroy_gearman_server_job_st(gearman_server_job_st *);
 
 #ifdef __cplusplus

+ 64 - 3
libgearman-server/job_plus.cc

@@ -36,10 +36,13 @@
  */
 
 #include <config.h>
+
 #include <libgearman-server/common.h>
+#include <libgearman-server/list.h>
 
 #include <cstring>
 #include <memory>
+#include <cassert>
 
 /**
  * Generate hash key for job handles and unique IDs.
@@ -292,19 +295,31 @@ void *_proc(void *data)
 
   while (1)
   {
-    (void) pthread_mutex_lock(&(server->proc_lock));
+    if (pthread_mutex_lock(&(server->proc_lock)) == -1)
+    {
+      gearmand_fatal("pthread_mutex_lock()");
+      return NULL;
+    }
+
     while (server->proc_wakeup == false)
     {
       if (server->proc_shutdown)
       {
-        (void) pthread_mutex_unlock(&(server->proc_lock));
+        if (pthread_mutex_unlock(&(server->proc_lock)) == -1)
+        {
+          gearmand_fatal("pthread_mutex_unlock()");
+          assert(!"pthread_mutex_lock");
+        }
         return NULL;
       }
 
       (void) pthread_cond_wait(&(server->proc_cond), &(server->proc_lock));
     }
     server->proc_wakeup= false;
-    (void) pthread_mutex_unlock(&(server->proc_lock));
+    if (pthread_mutex_unlock(&(server->proc_lock)) == -1)
+    {
+      gearmand_fatal("pthread_mutex_unlock()");
+    }
 
     for (gearman_server_thread_st *thread= server->thread_list; thread != NULL; thread= thread->next)
     {
@@ -350,3 +365,49 @@ void *_proc(void *data)
     }
   }
 }
+
+gearman_server_job_st * gearman_server_job_create(gearman_server_st *server)
+{
+  gearman_server_job_st *server_job;
+
+  if (server->free_job_count > 0)
+  {
+    server_job= server->free_job_list;
+    gearmand_server_free_job_list_free(server, server_job);
+  }
+  else
+  {
+    server_job= new (std::nothrow) gearman_server_job_st;
+    if (server_job == NULL)
+    {
+      return NULL;
+    }
+  }
+
+  server_job->ignore_job= false;
+  server_job->job_queued= false;
+  server_job->retries= 0;
+  server_job->priority= GEARMAND_JOB_PRIORITY_NORMAL;
+  server_job->job_handle_key= 0;
+  server_job->unique_key= 0;
+  server_job->client_count= 0;
+  server_job->numerator= 0;
+  server_job->denominator= 0;
+  server_job->data_size= 0;
+  server_job->next= NULL;
+  server_job->prev= NULL;
+  server_job->unique_next= NULL;
+  server_job->unique_prev= NULL;
+  server_job->worker_next= NULL;
+  server_job->worker_prev= NULL;
+  server_job->function= NULL;
+  server_job->function_next= NULL;
+  server_job->data= NULL;
+  server_job->client_list= NULL;
+  server_job->worker= NULL;
+  server_job->job_handle[0]= 0;
+  server_job->unique[0]= 0;
+  server_job->unique_length= 0;
+
+  return server_job;
+}

+ 1 - 1
libgearman-server/packet.cc

@@ -78,7 +78,7 @@ void gearman_server_packet_free(gearman_server_packet_st *packet,
                                 gearman_server_thread_st *thread,
                                 bool from_thread)
 {
-  if (from_thread && Server->flags.threaded)
+  if (from_thread and Server->flags.threaded)
   {
     if (thread->free_packet_count < GEARMAN_MAX_FREE_SERVER_PACKET)
     {

+ 31 - 7
libgearman-server/text.cc

@@ -101,7 +101,11 @@ gearmand_error_t server_run_text(gearman_server_con_st *server_con,
             char *new_data= (char *)realloc(data, total + GEARMAN_TEXT_RESPONSE_SIZE);
             if (new_data == NULL)
             {
-              (void) pthread_mutex_unlock(&thread->lock);
+              if (pthread_mutex_unlock(&(thread->lock)) == -1)
+              {
+                gearmand_fatal("pthread_mutex_unlock()");
+                assert(!"pthread_mutex_lock");
+              }
               gearmand_perror("realloc");
               gearmand_debug("free");
               free(data);
@@ -117,7 +121,11 @@ gearmand_error_t server_run_text(gearman_server_con_st *server_con,
 
           if ((size_t)sn_checked_length > total - size || sn_checked_length < 0)
           {
-            (void) pthread_mutex_unlock(&thread->lock);
+            if (pthread_mutex_unlock(&(thread->lock)) == -1)
+            {
+              gearmand_fatal("pthread_mutex_unlock()");
+              assert(!"pthread_mutex_lock");
+            }
             gearmand_debug("free");
             free(data);
             gearmand_perror("snprintf");
@@ -138,7 +146,11 @@ gearmand_error_t server_run_text(gearman_server_con_st *server_con,
 
             if ((size_t)checked_length > total - size || checked_length < 0)
             {
-              (void) pthread_mutex_unlock(&thread->lock);
+              if (pthread_mutex_unlock(&(thread->lock)) == -1)
+              {
+                gearmand_fatal("pthread_mutex_unlock()");
+                assert(!"pthread_mutex_lock");
+              }
               gearmand_debug("free");
               free(data);
               gearmand_perror("snprintf");
@@ -158,7 +170,11 @@ gearmand_error_t server_run_text(gearman_server_con_st *server_con,
           int checked_length= snprintf(data + size, total - size, "\n");
           if ((size_t)checked_length > total - size || checked_length < 0)
           {
-            (void) pthread_mutex_unlock(&thread->lock);
+            if (pthread_mutex_unlock(&(thread->lock)) == -1)
+            {
+              gearmand_fatal("pthread_mutex_unlock()");
+              assert(!"pthread_mutex_lock");
+            }
             gearmand_debug("free");
             free(data);
             gearmand_perror("snprintf");
@@ -167,7 +183,11 @@ gearmand_error_t server_run_text(gearman_server_con_st *server_con,
           size+= (size_t)checked_length;
         }
 
-        (void) pthread_mutex_unlock(&thread->lock);
+        if (pthread_mutex_unlock(&(thread->lock)) == -1)
+        {
+          gearmand_fatal("pthread_mutex_unlock()");
+          assert(!"pthread_mutex_lock");
+        }
       }
       else
       {
@@ -417,10 +437,14 @@ gearmand_error_t server_run_text(gearman_server_con_st *server_con,
   server_packet->packet.data_size= strlen(data);
 
   int error;
-  if (! (error= pthread_mutex_lock(&server_con->thread->lock)))
+  if ((error= pthread_mutex_lock(&server_con->thread->lock)) == 0)
   {
     GEARMAN_FIFO_ADD(server_con->io_packet, server_packet,);
-    (void) pthread_mutex_unlock(&server_con->thread->lock);
+    if (pthread_mutex_unlock(&(server_con->thread->lock)) == -1)
+    {
+      gearmand_fatal("pthread_mutex_unlock()");
+      assert(!"pthread_mutex_lock");
+    }
   }
   else
   {