Browse Source

Improve testing and assert around gearmand hostile test.

Brian Aker 11 years ago
parent
commit
7047951e8d

+ 48 - 36
libgearman/client.cc

@@ -1004,6 +1004,7 @@ void gearman_client_task_free_all(gearman_client_st *client_shell)
     Client* client= client_shell->impl();
     while (client->task_list)
     {
+      assert(client == client->task_list->impl()->client);
       gearman_task_free(client->task_list);
     }
   }
@@ -1013,7 +1014,7 @@ void gearman_client_task_free_all(gearman_client_st *client_shell)
 void gearman_client_set_task_context_free_fn(gearman_client_st *client,
                                              gearman_task_context_free_fn *function)
 {
-  if (client)
+  if (client and client->impl())
   {
     client->impl()->task_context_free_fn= function;
   }
@@ -1026,12 +1027,12 @@ gearman_return_t gearman_client_set_memory_allocators(gearman_client_st *client,
                                                       gearman_calloc_fn *calloc_fn,
                                                       void *context)
 {
-  if (client == NULL)
+  if (client and client->impl())
   {
-    return GEARMAN_INVALID_ARGUMENT;
+    return gearman_set_memory_allocator(client->impl()->universal.allocator, malloc_fn, free_fn, realloc_fn, calloc_fn, context);
   }
 
-  return gearman_set_memory_allocator(client->impl()->universal.allocator, malloc_fn, free_fn, realloc_fn, calloc_fn, context);
+  return GEARMAN_INVALID_ARGUMENT;
 }
 
 
@@ -1050,20 +1051,20 @@ gearman_task_st *gearman_client_add_task(gearman_client_st *client,
     ret_ptr= &unused;
   }
 
-  if (client == NULL or client->impl() == NULL)
+  if (client and client->impl())
   {
-    *ret_ptr= GEARMAN_INVALID_ARGUMENT;
-    return NULL;
+    return add_task_ptr(*(client->impl()), task,
+                        context, GEARMAN_COMMAND_SUBMIT_JOB,
+                        function,
+                        unique,
+                        workload, workload_size,
+                        time_t(0),
+                        *ret_ptr,
+                        client->impl()->actions);
   }
 
-  return add_task_ptr(*(client->impl()), task,
-                      context, GEARMAN_COMMAND_SUBMIT_JOB,
-                      function,
-                      unique,
-                      workload, workload_size,
-                      time_t(0),
-                      *ret_ptr,
-                      client->impl()->actions);
+  *ret_ptr= GEARMAN_INVALID_ARGUMENT;
+  return NULL;
 }
 
 gearman_task_st *gearman_client_add_task_high(gearman_client_st *client,
@@ -1080,20 +1081,20 @@ gearman_task_st *gearman_client_add_task_high(gearman_client_st *client,
     ret_ptr= &unused;
   }
 
-  if (client == NULL or client->impl() == NULL)
+  if (client and client->impl())
   {
-    *ret_ptr= GEARMAN_INVALID_ARGUMENT;
-    return NULL;
+    return add_task_ptr(*(client->impl()), task, context,
+                        GEARMAN_COMMAND_SUBMIT_JOB_HIGH,
+                        function,
+                        unique,
+                        workload, workload_size,
+                        time_t(0),
+                        *ret_ptr,
+                        client->impl()->actions);
   }
 
-  return add_task_ptr(*(client->impl()), task, context,
-                      GEARMAN_COMMAND_SUBMIT_JOB_HIGH,
-                      function,
-                      unique,
-                      workload, workload_size,
-                      time_t(0),
-                      *ret_ptr,
-                      client->impl()->actions);
+  *ret_ptr= GEARMAN_INVALID_ARGUMENT;
+  return NULL;
 }
 
 gearman_task_st *gearman_client_add_task_low(gearman_client_st *client,
@@ -1110,19 +1111,19 @@ gearman_task_st *gearman_client_add_task_low(gearman_client_st *client,
     ret_ptr= &unused;
   }
 
-  if (client == NULL or client->impl() == NULL)
+  if (client and client->impl())
   {
-    *ret_ptr= GEARMAN_INVALID_ARGUMENT;
-    return NULL;
+    return add_task_ptr(*(client->impl()), task, context, GEARMAN_COMMAND_SUBMIT_JOB_LOW,
+                        function,
+                        unique,
+                        workload, workload_size,
+                        time_t(0),
+                        *ret_ptr,
+                        client->impl()->actions);
   }
 
-  return add_task_ptr(*(client->impl()), task, context, GEARMAN_COMMAND_SUBMIT_JOB_LOW,
-                      function,
-                      unique,
-                      workload, workload_size,
-                      time_t(0),
-                      *ret_ptr,
-                      client->impl()->actions);
+  *ret_ptr= GEARMAN_INVALID_ARGUMENT;
+  return NULL;
 }
 
 gearman_task_st *gearman_client_add_task_background(gearman_client_st *client,
@@ -1850,6 +1851,17 @@ bool gearman_client_has_tasks(const gearman_client_st *client_shell)
 {
   if (client_shell and client_shell->impl())
   {
+#ifndef NDEBUG
+    if (client_shell->impl()->task_list)
+    {
+      assert(client_shell->impl()->task_count);
+    }
+    else
+    {
+      assert(client_shell->impl()->task_count == 0);
+    }
+#endif
+
     return bool(client_shell->impl()->task_list);
   }
 

+ 20 - 1
libgearman/interface/client.hpp

@@ -90,7 +90,11 @@ struct Client
   {
     _do_handle[0]= 0;
 
-    if (shell_)
+    if (shell_ and shell_ == &_owned_shell)
+    {
+      gearman_set_allocated(_shell, true);
+    }
+    else if (shell_)
     {
       gearman_set_allocated(_shell, false);
     }
@@ -108,6 +112,21 @@ struct Client
 
   ~Client()
   {
+    if (_shell)
+    {
+#ifndef NDBUG
+      if (_shell == &_owned_shell)
+      {
+        assert(gearman_is_allocated(_shell));
+      }
+      else // _shell != &_owned_shell
+#endif
+      {
+        gearman_set_allocated(_shell, false);
+        gearman_set_initialized(_shell, false);
+        _shell->_impl= NULL;
+      }
+    }
   }
 
   gearman_client_st* shell()

+ 5 - 0
libgearman/interface/task.hpp

@@ -61,6 +61,11 @@ struct Task
       is_paused(false),
       is_initialized(true)
     { }
+
+    ~Options()
+    {
+      is_initialized= false;
+    }
   } options;
   enum gearman_task_kind_t type;
   enum gearman_task_state_t state;

+ 63 - 45
libgearman/task.cc

@@ -77,62 +77,71 @@ void gearman_task_free(gearman_task_st *task_shell)
 {
   if (task_shell and task_shell->impl())
   {
+    assert(gearman_is_initialized(task_shell));
+    Task* task= task_shell->impl();
     if (gearman_is_initialized(task_shell))
     {
-      assert(task_shell->impl());
-      Task* task;
-      if ((task= task_shell->impl()))
+      assert(task->magic_ != TASK_ANTI_MAGIC);
+      assert(task->magic_ == TASK_MAGIC);
+      task->magic_= TASK_ANTI_MAGIC;
+      if (task->client)
       {
-        assert(task->magic_ != TASK_ANTI_MAGIC);
-        assert(task->magic_ == TASK_MAGIC);
-        task->magic_= TASK_ANTI_MAGIC;
+        if (task->options.send_in_use)
+        {
+          gearman_packet_free(&(task->send));
+        }
 
-        if (task->client)
+        if (task->type != GEARMAN_TASK_KIND_DO  and task->context and  task->client->task_context_free_fn)
         {
-          if (task->options.send_in_use)
-          {
-            gearman_packet_free(&(task->send));
-          }
-
-          if (task->type != GEARMAN_TASK_KIND_DO  and task->context and  task->client->task_context_free_fn)
-          {
-            task->client->task_context_free_fn(task_shell, static_cast<void *>(task->context));
-          }
-
-          if (task->client->task_list == task_shell)
-          {
-            task->client->task_list= task->next;
-          }
-
-          if (task->prev)
-          {
-            task->prev->impl()->next= task->next;
-          }
-
-          if (task->next)
-          {
-            task->next->impl()->prev= task->prev;
-          }
-
-          task->client->task_count--;
-
-          // If the task we are removing is a current task, remove it from the client
-          // structures.
-          if (task->client->task == task_shell)
-          {
-            task->client->task= NULL;
-          }
-          task->client= NULL;
+          task->client->task_context_free_fn(task_shell, static_cast<void *>(task->context));
         }
-        task->job_handle[0]= 0;
 
-        gearman_set_initialized(task, false);
+        if (task->client->task_list == task_shell)
+        {
+          task->client->task_list= task->next;
+        }
 
-        task_shell->_impl= NULL;
+        if (task->prev)
+        {
+          task->prev->impl()->next= task->next;
+        }
 
-        delete task;
+        if (task->next)
+        {
+          task->next->impl()->prev= task->prev;
+        }
+
+        task->client->task_count--;
+
+        // If the task we are removing is a current task, remove it from the client
+        // structures.
+        if (task->client->task == task_shell)
+        {
+          task->client->task= NULL;
+        }
+        task->client= NULL;
       }
+      task->job_handle[0]= 0;
+
+      gearman_set_initialized(task, false);
+      gearman_set_initialized(task_shell, false);
+
+      task_shell->_impl= NULL;
+
+      delete task;
     }
+    else
+    {
+      task->client= NULL;
+      gearman_set_initialized(task_shell, false);
+      task_shell->_impl= NULL;
+      delete task;
+    }
+  }
+  else if (task_shell)
+  {
+    gearman_set_initialized(task_shell, false);
+    task_shell->_impl= NULL;
   }
 }
 
@@ -428,6 +437,15 @@ gearman_return_t gearman_task_return(const gearman_task_st *task_shell)
 Task::~Task()
 {
   free_result();
+
+  if (_shell)
+  {
+    if (_shell != &_owned_shell)
+    {
+      gearman_set_allocated(_shell, false);
+    }
+    gearman_set_initialized(_shell, false);
+  }
 }
 
 void Task::result(gearman_result_st* result_)

+ 3 - 5
libgearman/worker.cc

@@ -892,13 +892,11 @@ gearman_job_st *gearman_worker_grab_job(gearman_worker_st *worker_shell,
           active= 0;
           for (worker->con= worker->universal.con_list; worker->con; worker->con= worker->con->next_connection())
           {
-            if (worker->con->socket_descriptor_is_valid() == false)
+            if (worker->con->socket_descriptor_is_valid())
             {
-              continue;
+              worker->con->set_events(POLLIN);
+              active++;
             }
-
-            worker->con->set_events(POLLIN);
-            active++;
           }
 
           if ((&worker->universal)->options.non_blocking)

+ 21 - 17
libhostile/recv.c

@@ -99,28 +99,32 @@ ssize_t recv(int sockfd, void *buf, size_t len, int flags)
   (void) pthread_once(&function_lookup_once, set_local);
 
   bool corrupt= false;
-  if (is_called() == false && __function.frequency)
+  if (sockfd != -1)
   {
-    if (false)
-    { }
-    else if (--not_until < 0 && rand() % __function.frequency)
+    if (is_called() == false && __function.frequency)
     {
-      __function._used++;
-      shutdown(sockfd, SHUT_RDWR);
-      close(sockfd);
-
-      if (rand() % 2)
-      {
-        errno= ECONNREFUSED;
-        return -1;
-      }
-      else
+      if (false)
+      { }
+      else if (--not_until < 0 && rand() % __function.frequency)
       {
-        errno= 0;
+        __function._used++;
+        int tmp_sockfd= dup(sockfd);
+        shutdown(tmp_sockfd, SHUT_RDWR);
+        close(tmp_sockfd);
+
+        if (rand() % 2)
+        {
+          errno= ECONNREFUSED;
+          return -1;
+        }
+        else
+        {
+          errno= 0;
 #if 0
-        perror("HOSTILE CLOSE() of socket during recv()");
+          perror("HOSTILE CLOSE() of socket during recv()");
 #endif
-        return 0;
+          return 0; // Simulate EOF
+        }
       }
     }
   }

+ 12 - 8
libhostile/send.c

@@ -79,17 +79,21 @@ ssize_t send(int sockfd, const void *buf, size_t len, int flags)
 
   (void) pthread_once(&function_lookup_once, set_local);
 
-  if (is_called() == false)
+  if (sockfd != -1)
   {
-    if (__function.frequency)
+    if (is_called() == false)
     {
-      if (--not_until < 0 && random() % __function.frequency)
+      if (__function.frequency)
       {
-        __function._used++;
-        shutdown(sockfd, SHUT_RDWR);
-        close(sockfd);
-        errno= ECONNRESET;
-        return -1;
+        if (--not_until < 0 && random() % __function.frequency)
+        {
+          __function._used++;
+          int tmp_sockfd= dup(sockfd);
+          shutdown(tmp_sockfd, SHUT_RDWR);
+          close(tmp_sockfd);
+          errno= ECONNRESET;
+          return -1;
+        }
       }
     }
   }

+ 11 - 7
tests/hostile.cc

@@ -125,21 +125,22 @@ extern "C" {
     {
       libgearman::Client client(current_server());
 
+      libtest::vchar_t payload;
+      payload.resize(success->payload_size);
+
       gearman_client_set_timeout(&client, 1000);
       for (size_t x= 0; x < 100; x++)
       {
         int oldstate;
         pthread_setcanceltype(PTHREAD_CANCEL_DISABLE, &oldstate);
-        libtest::vchar_t payload;
-        payload.resize(success->payload_size);
         gearman_return_t rc;
         void *value= gearman_client_do(&client, WORKER_FUNCTION_NAME,
                                        NULL,
                                        &payload[0], 
                                        payload.size() ? random() % payload.size() : 0,
                                        NULL, &rc);
-        pthread_setcanceltype(oldstate, NULL);
 
+        fatal_assert(gearman_client_has_tasks(&client) == false);
         if (gearman_success(rc))
         {
           success->increment();
@@ -149,7 +150,10 @@ extern "C" {
         {
           free(value);
         }
+        pthread_setcanceltype(oldstate, NULL);
       }
+      
+      fatal_assert(gearman_client_has_tasks(&client) == false);
     }
 
     pthread_exit(0);
@@ -195,9 +199,9 @@ static bool join_thread(pthread_t& thread_arg)
 {
   int error;
 
+#if defined(HAVE_PTHREAD_TIMEDJOIN_NP) && HAVE_PTHREAD_TIMEDJOIN_NP
   if (HAVE_PTHREAD_TIMEDJOIN_NP)
   {
-#if defined(HAVE_PTHREAD_TIMEDJOIN_NP) && HAVE_PTHREAD_TIMEDJOIN_NP
     int limit= 2;
     while (--limit)
     {
@@ -228,10 +232,10 @@ static bool join_thread(pthread_t& thread_arg)
       Error << "pthread_cancel() " << strerror(error);
       return false;
     }
-#endif
 
     return true;
   }
+#endif
 
   if ((error= pthread_join(thread_arg, NULL)) != 0)
   {
@@ -272,13 +276,13 @@ static test_return_t worker_ramp_exec(const size_t payload_size)
   std::vector<client_thread_context_st>  success;
   success.resize(children.size());
 
-  for (size_t x= 0; x < children.size(); x++)
+  for (size_t x= 0; x < children.size(); ++x)
   {
     success[x].payload_size= payload_size;
     pthread_create(&children[x], NULL, client_thread, &success[x]);
   }
   
-  for (size_t x= 0; x < children.size(); x++)
+  for (size_t x= 0; x < children.size(); ++x)
   {
     pthread_t& thread= children[x];
     bool join_success= false;

+ 3 - 0
tests/include.am

@@ -153,6 +153,9 @@ noinst_PROGRAMS+=t/c
 valgrind-c: t/c
 	@$(VALGRIND_COMMAND) t/c
 
+gdb-c: t/c
+	@$(GDB_COMMAND) t/c
+
 test-cycle: t/cycle gearmand/gearmand
 	@t/cycle