Browse Source

Partial commit.

Brian Aker 14 years ago
parent
commit
573f997f94
6 changed files with 104 additions and 80 deletions
  1. 2 2
      benchmark/blobslap_client.c
  2. 22 20
      libgearman/client.cc
  3. 5 5
      libgearman/worker.cc
  4. 1 0
      libtest/worker.cc
  5. 68 50
      tests/burnin.cc
  6. 6 3
      tests/include.am

+ 2 - 2
benchmark/blobslap_client.c

@@ -216,8 +216,7 @@ int main(int argc, char *argv[])
     gearman_client_set_complete_fn(&client, _complete);
     gearman_client_set_fail_fn(&client, _fail);
 
-    gearman_return_t ret;
-    ret= gearman_client_run_tasks(&client);
+    gearman_return_t ret= gearman_client_run_tasks(&client);
 
     if (ret != GEARMAN_SUCCESS && ret != GEARMAN_LOST_CONNECTION)
     {
@@ -318,6 +317,7 @@ static gearman_return_t _complete(gearman_task_st *task)
     size= gearman_task_recv_data(task, buffer, BLOBSLAP_BUFFER_SIZE, &ret);
     if (ret != GEARMAN_SUCCESS)
       return ret;
+
     if (size == 0)
       break;
   }

+ 22 - 20
libgearman/client.cc

@@ -1051,7 +1051,7 @@ static inline void _pop_non_blocking(gearman_client_st *client)
 
 static inline gearman_return_t _client_run_tasks(gearman_client_st *client)
 {
-  gearman_return_t ret;
+  gearman_return_t ret= GEARMAN_MAX_RETURN;
 
   switch(client->state)
   {
@@ -1068,21 +1068,21 @@ static inline gearman_return_t _client_run_tasks(gearman_client_st *client)
             continue;
 
   case GEARMAN_CLIENT_STATE_NEW:
-          ret= _client_run_task(client, client->task);
-          if (ret != GEARMAN_SUCCESS && ret != GEARMAN_IO_WAIT)
+          gearman_return_t local_ret= _client_run_task(client, client->task);
+          if (gearman_failed(ret) and local_ret != GEARMAN_IO_WAIT)
           {
             client->state= GEARMAN_CLIENT_STATE_NEW;
 
-            return ret;
+            return local_ret;
           }
         }
 
         if (client->new_tasks == 0)
         {
-          ret= gearman_flush_all(&client->universal);
-          if (gearman_failed(ret))
+          gearman_return_t local_ret= gearman_flush_all(&client->universal);
+          if (gearman_failed(local_ret))
           {
-            return ret;
+            return local_ret;
           }
         }
       }
@@ -1104,11 +1104,11 @@ static inline gearman_return_t _client_run_tasks(gearman_client_st *client)
             }
 
   case GEARMAN_CLIENT_STATE_SUBMIT:
-            ret= _client_run_task(client, client->task);
-            if (ret != GEARMAN_SUCCESS && ret != GEARMAN_IO_WAIT)
+            gearman_return_t local_ret= _client_run_task(client, client->task);
+            if (gearman_failed(local_ret) and local_ret != GEARMAN_IO_WAIT)
             {
               client->state= GEARMAN_CLIENT_STATE_SUBMIT;
-              return ret;
+              return local_ret;
             }
           }
         }
@@ -1154,6 +1154,7 @@ static inline gearman_return_t _client_run_tasks(gearman_client_st *client)
 
           if (client->task == NULL)
           {
+            assert(ret != GEARMAN_MAX_RETURN);
 
             /* Check the return of the gearman_connection_recv() calls above. */
             if (gearman_failed(ret))
@@ -1223,14 +1224,14 @@ static inline gearman_return_t _client_run_tasks(gearman_client_st *client)
 
   case GEARMAN_CLIENT_STATE_PACKET:
           /* Let task process job created or result packet. */
-          ret= _client_run_task(client, client->task);
-          if (ret == GEARMAN_IO_WAIT)
+          gearman_return_t local_ret= _client_run_task(client, client->task);
+          if (local_ret == GEARMAN_IO_WAIT)
             break;
 
-          if (gearman_failed(ret))
+          if (gearman_failed(local_ret))
           {
             client->state= GEARMAN_CLIENT_STATE_PACKET;
-            return ret;
+            return local_ret;
           }
 
           /* Clean up the packet. */
@@ -1261,8 +1262,8 @@ static inline gearman_return_t _client_run_tasks(gearman_client_st *client)
       }
 
       /* Wait for activity on one of the connections. */
-      ret= gearman_wait(&client->universal);
-      if (ret != GEARMAN_SUCCESS && ret != GEARMAN_IO_WAIT)
+      gearman_return_t local_ret= gearman_wait(&client->universal);
+      if (gearman_failed(local_ret) and local_ret != GEARMAN_IO_WAIT)
       {
         client->state= GEARMAN_CLIENT_STATE_IDLE;
 
@@ -1300,11 +1301,10 @@ gearman_return_t gearman_client_run_tasks(gearman_client_st *client)
 
   if (gearman_failed(rc))
   {
-    gearman_error(&client->universal, rc, "occured during gearman_client_run_tasks()");
-    return rc;
+    assert(gearman_universal_error_code(&client->universal) == rc);
   }
 
-  return GEARMAN_SUCCESS;
+  return rc;
 }
 
 /*
@@ -1316,7 +1316,7 @@ static gearman_client_st *_client_allocate(gearman_client_st *client, bool is_cl
   if (client == NULL)
   {
     client= new (std::nothrow) gearman_client_st;
-    if (client == NULL)
+    if (not client)
       return NULL;
 
     client->options.allocated= true;
@@ -1573,6 +1573,8 @@ static gearman_return_t _client_run_task(gearman_client_st *client, gearman_task
     }
     else if (task->recv->command == GEARMAN_COMMAND_WORK_COMPLETE)
     {
+      task->result_rc= GEARMAN_SUCCESS;
+
   case GEARMAN_TASK_STATE_COMPLETE:
       if (task->func.complete_fn)
       {

+ 5 - 5
libgearman/worker.cc

@@ -901,20 +901,20 @@ gearman_return_t gearman_worker_work(gearman_worker_st *worker)
         gearman_job_build_reducer(worker->work_job, worker->work_function->mapper_fn);
       }
 
-      gearman_return_t ret;
+      gearman_return_t ret= GEARMAN_WORK_FAIL;
       worker->work_result= worker->work_function->worker_fn(worker->work_job,
 							    static_cast<void *>(worker->work_function->context),
 							    &(worker->work_result_size), &ret);
       if (ret == GEARMAN_WORK_FAIL)
       {
-	ret= gearman_job_send_fail(worker->work_job);
-	if (gearman_failed(ret))
+	gearman_return_t rc= gearman_job_send_fail(worker->work_job);
+	if (gearman_failed(rc))
 	{
-	  if (ret == GEARMAN_LOST_CONNECTION)
+	  if (rc == GEARMAN_LOST_CONNECTION)
 	    break;
 
 	  worker->work_state= GEARMAN_WORKER_WORK_UNIVERSAL_FAIL;
-	  return ret;
+	  return rc;
 	}
 
 	break;

+ 1 - 0
libtest/worker.cc

@@ -92,6 +92,7 @@ static void *thread_runner(void *con)
 
   pthread_exit(0);
 }
+
 struct worker_handle_st *test_worker_start(in_port_t port, const char *function_name,
                                            gearman_worker_fn *function, void *function_arg,
                                            gearman_worker_options_t options)

+ 68 - 50
tests/burnin.cc

@@ -12,11 +12,12 @@
 # undef NDEBUG
 #endif
 
-#include <assert.h>
-#include <stdio.h>
-#include <stdlib.h>
-#include <string.h>
-#include <errno.h>
+#include <cassert>
+#include <cerrno>
+#include <cstdio>
+#include <cstdlib>
+#include <cstring>
+#include <iostream>
 
 #include <libgearman/gearman.h>
 
@@ -24,16 +25,20 @@
 #include <libtest/server.h>
 #include <libtest/worker.h>
 
-#define CLIENT_TEST_PORT 32123
+#define CLIENT_TEST_PORT 32143
 
 #define DEFAULT_WORKER_NAME "burnin"
 
-typedef struct
-{
+struct client_test_st {
   gearman_client_st client;
   pid_t gearmand_pid;
   struct worker_handle_st *handle;
-} client_test_st;
+
+  client_test_st():
+    gearmand_pid(-1),
+    handle(NULL)
+  { }
+};
 
 struct client_context_st {
   int latch;
@@ -42,11 +47,27 @@ struct client_context_st {
   size_t num_tasks;
   size_t count;
   char *blob;
+
+  client_context_st():
+    latch(0),
+    min_size(1024),
+    max_size(1024 *2),
+    num_tasks(10),
+    count(0), // 1000
+    blob(NULL)
+  { }
 };
 
 void *world_create(test_return_t *error);
 test_return_t world_destroy(void *object);
 
+static gearman_return_t _complete(gearman_task_st *task)
+{
+  std::cerr << "Completed " << gearman_task_job_handle(task) << std::endl;
+
+  return GEARMAN_SUCCESS;
+}
+
 #ifndef __INTEL_COMPILER
 #pragma GCC diagnostic ignored "-Wold-style-cast"
 #endif
@@ -61,6 +82,10 @@ static test_return_t burnin_test(void *object)
   gearman_task_st *tasks= (gearman_task_st *)calloc(context->num_tasks, sizeof(gearman_task_st));
   test_true_got(tasks, strerror(errno));
 
+  gearman_client_set_complete_fn(client, _complete);
+
+  test_true_got(gearman_success(gearman_client_echo(client, gearman_literal_param("echo_test"))), gearman_client_error(client));
+
   do
   {
     for (uint32_t x= 0; x < context->num_tasks; x++)
@@ -81,36 +106,35 @@ static test_return_t burnin_test(void *object)
         blob_size= (blob_size % (context->max_size - context->min_size)) + context->min_size;
       }
 
+      gearman_task_st *task_ptr;
       gearman_return_t ret;
       if (context->latch)
       {
-        (void)gearman_client_add_task_background(client, &(tasks[x]),
-                                                 NULL, DEFAULT_WORKER_NAME, NULL,
-                                                 (void *)context->blob, blob_size, &ret);
+        task_ptr= gearman_client_add_task_background(client, &(tasks[x]),
+                                                     NULL, DEFAULT_WORKER_NAME, NULL,
+                                                     (void *)context->blob, blob_size, &ret);
       }
       else
       {
-        (void)gearman_client_add_task(client, &(tasks[x]), NULL,
-                                      DEFAULT_WORKER_NAME, NULL, (void *)context->blob, blob_size,
-                                      &ret);
+        task_ptr= gearman_client_add_task(client, &(tasks[x]), NULL,
+                                          DEFAULT_WORKER_NAME, NULL, (void *)context->blob, blob_size,
+                                          &ret);
       }
 
-      if (gearman_failed(ret))
-      {
-        if (ret == GEARMAN_LOST_CONNECTION)
-          continue;
-
-        test_true_got(false, gearman_client_error(client));
-      }
+      test_true_got(gearman_success(ret), gearman_client_error(client));
+      test_truth(task_ptr);
+      std::cerr << "Added job " << x << std::endl;
     }
 
-    gearman_return_t ret;
-    ret= gearman_client_run_tasks(client);
-
-    if (ret != GEARMAN_SUCCESS && ret != GEARMAN_LOST_CONNECTION)
+    gearman_client_set_timeout(client, 4000);
+    gearman_return_t ret= gearman_client_run_tasks(client);
+    for (uint32_t x= 0; x < context->num_tasks; x++)
     {
-      test_true_got(false, gearman_client_error(client));
+      std::cerr << gearman_strerror(gearman_task_error(&tasks[x])) << std::endl;
     }
+    std::cerr << "run tasks left " << client->new_tasks << std::endl;
+
+    test_true_got(gearman_success(ret), gearman_client_error(client));
 
     for (uint32_t x= 0; x < context->num_tasks; x++)
     {
@@ -130,14 +154,9 @@ static test_return_t setup(void *object)
 {
   gearman_client_st *client= (gearman_client_st *)object;
 
-  struct client_context_st *context= (struct client_context_st *)calloc(1, sizeof(struct client_context_st));
+  struct client_context_st *context= new client_context_st;
   test_true_got(context, strerror(errno));
 
-  context->min_size= 1024;
-  context->max_size= context->min_size *2;
-  context->num_tasks= 10;
-  context->count= 1000;
-
   context->blob= (char *)malloc(context->max_size);
   test_true_got(context->blob, strerror(errno));
   memset(context->blob, 'x', context->max_size); 
@@ -154,25 +173,22 @@ static test_return_t cleanup(void *object)
   struct client_context_st *context= (struct client_context_st *)gearman_client_context(client);
 
   free(context->blob);
+  delete(context);
 
   return TEST_SUCCESS;
 }
 
 
-static void *worker_fn(gearman_job_st *job, void *context,
+static void *worker_fn(gearman_job_st *, void *,
                        size_t *result_size, gearman_return_t *ret_ptr)
 {
-  (void)job;
-  (void)context;
-  (void)result_size;
-
+  result_size= 0;
   *ret_ptr= GEARMAN_SUCCESS;
   return NULL;
 }
 
 void *world_create(test_return_t *error)
 {
-  client_test_st *test;
   pid_t gearmand_pid;
 
   /**
@@ -181,8 +197,8 @@ void *world_create(test_return_t *error)
    */
   const char *argv[1]= { "client_gearmand" };
 
-  test= (client_test_st *)calloc(1, sizeof(client_test_st));
-  if (! test)
+  client_test_st *test= new client_test_st;
+  if (not test)
   {
     *error= TEST_MEMORY_ALLOCATION_FAILURE;
     return NULL;
@@ -191,25 +207,27 @@ void *world_create(test_return_t *error)
   /**
     We start up everything before we allocate so that we don't have to track memory in the forked process.
   */
-  gearmand_pid= test_gearmand_start(CLIENT_TEST_PORT, 1, argv);
-  
-  if (gearmand_pid == -1)
+  test->gearmand_pid= gearmand_pid= test_gearmand_start(CLIENT_TEST_PORT, 1, argv);
+  if (test->gearmand_pid == -1)
   {
     *error= TEST_FAILURE;
     return NULL;
   }
 
-  test->handle= test_worker_start(CLIENT_TEST_PORT, "burnin", worker_fn, NULL, gearman_worker_options_t());
-
-  test->gearmand_pid= gearmand_pid;
+  test->handle= test_worker_start(CLIENT_TEST_PORT, DEFAULT_WORKER_NAME, worker_fn, NULL, gearman_worker_options_t());
+  if (not test->handle)
+  {
+    *error= TEST_FAILURE;
+    return NULL;
+  }
 
-  if (gearman_client_create(&(test->client)) == NULL)
+  if (not gearman_client_create(&(test->client)))
   {
     *error= TEST_FAILURE;
     return NULL;
   }
 
-  if (gearman_client_add_server(&(test->client), NULL, CLIENT_TEST_PORT) != GEARMAN_SUCCESS)
+  if (gearman_failed(gearman_client_add_server(&(test->client), NULL, CLIENT_TEST_PORT)))
   {
     *error= TEST_FAILURE;
     return NULL;
@@ -226,7 +244,7 @@ test_return_t world_destroy(void *object)
   gearman_client_free(&(test->client));
   test_gearmand_stop(test->gearmand_pid);
   test_worker_stop(test->handle);
-  free(test);
+  delete test;
 
   return TEST_SUCCESS;
 }

+ 6 - 3
tests/include.am

@@ -76,14 +76,14 @@ tests_worker_test_LDADD= \
 tests_cpp_test_SOURCES= tests/cpp_test.cc
 tests_cpp_test_LDADD= ${CLIENT_LDADD}
 
-test-burnin:
+test-burnin: tests/burnin_test
 	@tests/burnin_test $(ARG1) $(ARG2)
 
 test-client: tests/client_test
 	@tests/client_test $(ARG1) $(ARG2)
 
-test-round-robin:
-	tests/round_robin_test $(ARG1) $(ARG2)
+test-round-robin: tests/round_robin_test
+	@tests/round_robin_test $(ARG1) $(ARG2)
 
 test-worker: tests/worker_test
 	@tests/worker_test $(ARG1) $(ARG2)
@@ -107,6 +107,9 @@ gdb-worker: ${noinst_PROGRAMS}
 gdb-internals: ${noinst_PROGRAMS}
 	$(LIBTOOL) --mode=execute gdb tests/internals_test
 
+gdb-burnin: ${noinst_PROGRAMS}
+	$(LIBTOOL) --mode=execute gdb tests/burnin_test
+
 valgrind-client:
 	$(VALGRIND_COMMAND) tests/client_test $(ARG1) $(ARG2)