Browse Source

Adding reducer test for failure cases.

Brian Aker 14 years ago
parent
commit
bbbc9e338c
10 changed files with 69 additions and 23 deletions
  1. 1 1
      libgearman/add.cc
  2. 11 6
      libgearman/client.cc
  3. 8 1
      libgearman/job.cc
  4. 2 2
      libgearman/return.h
  5. 2 1
      libgearman/string.h
  6. 4 2
      libgearman/worker.cc
  7. 13 1
      libtest/test.h
  8. 5 5
      libtest/worker.cc
  9. 6 4
      tests/client_test.cc
  10. 17 0
      tests/execute.cc

+ 1 - 1
libgearman/add.cc

@@ -92,7 +92,7 @@ gearman_task_st *add_task(gearman_client_st *client,
     return NULL;
   }
 
-  gearman_string_t function= { gearman_string_make_from_cstr(function_name) };
+  gearman_string_t function= { gearman_string_param_cstr(function_name) };
   gearman_unique_t local_unique= gearman_unique_make(unique, unique ? strlen(unique) : 0);
   gearman_string_t workload= { static_cast<const char *>(workload_str), workload_size };
 

+ 11 - 6
libgearman/client.cc

@@ -641,7 +641,11 @@ gearman_task_st *gearman_client_execute_reduce(gearman_client_st *client,
   if (task)
   {
     // Run!
-    gearman_client_run_tasks(client);
+    if (gearman_failed(gearman_client_run_tasks(client)))
+    {
+      gearman_task_free(task);
+      task= NULL;
+    }
   }
 
   return task;
@@ -680,7 +684,7 @@ gearman_return_t gearman_client_do_background(gearman_client_st *client,
                                               size_t workload_size,
                                               char *job_handle)
 {
-  gearman_string_t function= { gearman_string_make_from_cstr(function_name) };
+  gearman_string_t function= { gearman_string_param_cstr(function_name) };
   gearman_unique_t local_unique= gearman_unique_make(unique, unique ? strlen(unique) : 0);
   gearman_string_t workload= { static_cast<const char*>(workload_str), workload_size };
 
@@ -698,7 +702,7 @@ gearman_return_t gearman_client_do_high_background(gearman_client_st *client,
                                                    size_t workload_size,
                                                    char *job_handle)
 {
-  gearman_string_t function= { gearman_string_make_from_cstr(function_name) };
+  gearman_string_t function= { gearman_string_param_cstr(function_name) };
   gearman_unique_t local_unique= gearman_unique_make(unique, unique ? strlen(unique) : 0);
   gearman_string_t workload= { static_cast<const char*>(workload_str), workload_size };
 
@@ -716,7 +720,7 @@ gearman_return_t gearman_client_do_low_background(gearman_client_st *client,
                                                   size_t workload_size,
                                                   char *job_handle)
 {
-  gearman_string_t function= { gearman_string_make_from_cstr(function_name) };
+  gearman_string_t function= { gearman_string_param_cstr(function_name) };
   gearman_unique_t local_unique= gearman_unique_make(unique, unique ? strlen(unique) : 0);
   gearman_string_t workload= { static_cast<const char*>(workload_str), workload_size };
 
@@ -1160,7 +1164,7 @@ static inline gearman_return_t _client_run_tasks(gearman_client_st *client)
               break;
             }
 
-            if (client->task == NULL)
+            if (not client->task)
             {
               /* The client has stopped waiting for the response, ignore it. */
               gearman_packet_free(&(client->con->_packet));
@@ -1251,6 +1255,7 @@ gearman_return_t gearman_client_run_tasks(gearman_client_st *client)
 
   if (gearman_failed(rc))
   {
+    gearman_gerror(client->universal, rc);
 #if 0
     if (rc != gearman_universal_error_code(client->universal))
     {
@@ -1594,7 +1599,7 @@ static void *_client_do(gearman_client_st *client, gearman_command_t command,
 {
   gearman_task_st do_task, *do_task_ptr;
   gearman_client_task_free_all(client);
-  gearman_string_t function= { gearman_string_make_from_cstr(function_name) };
+  gearman_string_t function= { gearman_string_param_cstr(function_name) };
   gearman_unique_t local_unique= gearman_unique_make(unique, unique ? strlen(unique) : 0);
   gearman_string_t workload= { static_cast<const char*>(workload_str), workload_size };
 

+ 8 - 1
libgearman/job.cc

@@ -127,6 +127,13 @@ struct gearman_job_reducer_st {
       return rc;
     }
 
+    gearman_task_st *check_task= client->task_list;
+    do
+    {
+      if (gearman_failed(check_task->result_rc))
+        return check_task->result_rc;
+    } while ((check_task= gearman_next(check_task)));
+
     if (aggregator_fn)
     {
       gearman_aggregator_st aggregator(client->context);
@@ -197,7 +204,7 @@ bool gearman_job_build_reducer(gearman_job_st *job, gearman_aggregator_fn *aggre
   if (job->reducer)
     return true;
 
-  gearman_string_t reducer_func= { gearman_string_make_from_cstr(gearman_job_reducer(job)) };
+  gearman_string_t reducer_func= { gearman_string_param_cstr(gearman_job_reducer(job)) };
 
   job->reducer= new (std::nothrow) gearman_job_reducer_st(job->worker->universal, reducer_func, aggregator_fn);
   if (not job->reducer)

+ 2 - 2
libgearman/return.h

@@ -96,8 +96,8 @@ enum gearman_return_t
   GEARMAN_MAX_RETURN /* Always add new error code before */
 };
 
-#define gearman_failed(X) (((X) != GEARMAN_SUCCESS) ? true : false)
-#define gearman_success(X) (((X) == GEARMAN_SUCCESS) ? true : false)
+#define gearman_failed(X) ((X) != GEARMAN_SUCCESS)
+#define gearman_success(X) ((X) == GEARMAN_SUCCESS)
 
 enum gearman_worker_error_t
 {

+ 2 - 1
libgearman/string.h

@@ -46,6 +46,8 @@ struct gearman_string_t {
 #define gearman_size(X) (X).size
 #define gearman_c_str(X) (X).c_str
 #define gearman_string_param(X) (X).c_str, (X).size
+#define gearman_string_param_null NULL, 0
+#define gearman_string_param_cstr(X) (X), ((X) ? strlen(X) : 0)
 
 #ifdef BUILDING_LIBGEARMAN
 
@@ -55,5 +57,4 @@ struct gearman_string_t {
 #define gearman_string_make(X) (X), (((size_t)((sizeof(X) - 1)))
 #endif // correct define
 
-#define gearman_string_make_from_cstr(X) (X), ((X) ? strlen(X) : 0)
 #endif // BUILDING_LIBGEARMAN

+ 4 - 2
libgearman/worker.cc

@@ -65,7 +65,7 @@ static inline struct _worker_function_st *_function_exist(gearman_worker_st *wor
   {
     if (function_length == function->function_length)
     {
-      if (! memcmp(function_name, function->function_name, function_length))
+      if (not memcmp(function_name, function->function_name, function_length))
         break;
     }
   }
@@ -535,7 +535,7 @@ gearman_job_st *gearman_worker_grab_job(gearman_worker_st *worker,
         worker->function= worker->function_list;
         while (worker->function)
         {
-          if (! (worker->function->options.change))
+          if (not (worker->function->options.change))
           {
             worker->function= worker->function->next;
             continue;
@@ -961,6 +961,8 @@ gearman_return_t gearman_worker_work(gearman_worker_st *worker)
         if (worker->work_job->error_code == GEARMAN_LOST_CONNECTION)
           break;
 
+        worker->work_state= GEARMAN_WORKER_WORK_UNIVERSAL_FAIL;
+
         return worker->work_job->error_code;
       }
     }

+ 13 - 1
libtest/test.h

@@ -158,11 +158,23 @@ void create_core(void);
 LIBTEST_INTERNAL_API
 const char *test_strerror(test_return_t code);
 
+#define test_assert_errno(A) \
+do \
+{ \
+  if ((A)) { \
+    fprintf(stderr, "\nAssertion failed at %s:%d: ", __FILE__, __LINE__);\
+    perror(#A); \
+    fprintf(stderr, "\n"); \
+    create_core(); \
+    assert((A)); \
+  } \
+} while (0)
+
 #define test_truth(A) \
 do \
 { \
   if (! (A)) { \
-    fprintf(stderr, "\nAssertion failed in %s:%d: %s\n", __FILE__, __LINE__, #A);\
+    fprintf(stderr, "\nAssertion failed at %s:%d: %s\n", __FILE__, __LINE__, #A);\
     create_core(); \
     return TEST_FAILURE; \
   } \

+ 5 - 5
libtest/worker.cc

@@ -6,7 +6,7 @@
  * the COPYING file in the parent directory for full text.
  */
 
-#include "config.h"
+#include <config.h>
 
 #include <cassert>
 #include <cstring>
@@ -16,9 +16,10 @@
 #include <unistd.h>
 #include <pthread.h>
 
-#include <stdio.h>
+#include <cstdio>
 
-#include "libtest/worker.h"
+#include <libtest/test.h>
+#include <libtest/worker.h>
 
 #ifndef __INTEL_COMPILER
 #pragma GCC diagnostic ignored "-Wold-style-cast"
@@ -124,8 +125,7 @@ static struct worker_handle_st *_test_worker_start(in_port_t port,
   foo->mapper_fn= mapper_fn;
   foo->aggregator_fn= aggregator_fn;
 
-  int rc= pthread_create(&handle->thread, &attr, thread_runner, foo);
-  assert(rc == 0);
+  test_assert_errno(pthread_create(&handle->thread, &attr, thread_runner, foo));
 
   pthread_attr_destroy(&attr);
 

+ 6 - 4
tests/client_test.cc

@@ -259,7 +259,7 @@ static test_return_t clone_test(void *object)
   return TEST_SUCCESS;
 }
 
-static test_return_t option_test(void *object __attribute__((unused)))
+static test_return_t option_test(void *)
 {
   gearman_client_st *gear;
   gearman_client_options_t default_options;
@@ -935,6 +935,7 @@ test_st gearman_client_do_tests[] ={
 
 test_st gearman_client_execute_tests[] ={
   {"gearman_client_execute()", 0, gearman_client_execute_test },
+  {"gearman_client_execute(GEARMAN_WORK_FAIL)", 0, gearman_client_execute_fail_test },
   {"gearman_client_execute() epoch", 0, gearman_client_execute_epoch_test },
   {"gearman_client_execute() timeout", 0, gearman_client_execute_timeout_test },
   {"gearman_client_execute() background", 0, gearman_client_execute_bg_test },
@@ -955,8 +956,9 @@ test_st gearman_client_do_job_handle_tests[] ={
   {0, 0, 0}
 };
 
-test_st gearman_worker_set_reducer_tests[] ={
-  {"gearman_worker_set_reducer()", 0, gearman_worker_set_reducer_test },
+test_st gearman_client_execute_reduces[] ={
+  {"gearman_worker_set_reducer()", 0, gearman_client_execute_reduce_basic },
+  {"gearman_worker_set_reducer() fail in reduction", 0, gearman_client_execute_reduce_fail_in_reduction },
   {0, 0, 0}
 };
 
@@ -980,7 +982,6 @@ test_st gearman_task_tests[] ={
 
 
 collection_st collection[] ={
-  {"gearman_worker_set_reducer()", 0, 0, gearman_worker_set_reducer_tests},
   {"gearman_client_st", 0, 0, tests},
   {"gearman_client_st chunky", pre_chunk, post_function_reset, tests}, // Test with a worker that will respond in part
   {"gearman_strerror", 0, 0, gearman_strerror_tests},
@@ -993,6 +994,7 @@ collection_st collection[] ={
   {"gearman_client_do_background", 0, 0, gearman_client_do_background_tests},
   {"gearman_client_set_server_option", 0, 0, gearman_client_set_server_option_tests},
   {"gearman_client_execute", 0, 0, gearman_client_execute_tests},
+  {"gearman_worker_set_reducer()", 0, 0, gearman_client_execute_reduces},
   {"client-logging", pre_logging, post_logging, tests_log},
   {0, 0, 0, 0}
 };

+ 17 - 0
tests/execute.cc

@@ -63,6 +63,23 @@ test_return_t gearman_client_execute_test(void *object)
   return TEST_SUCCESS;
 }
 
+test_return_t gearman_client_execute_fail_test(void *object)
+{
+  gearman_client_st *client= (gearman_client_st *)object;
+  const char *worker_function= (const char *)gearman_client_context(client);
+  assert(worker_function);
+
+  gearman_task_st *task;
+  gearman_argument_t value= gearman_argument_make(gearman_literal_param("fail"));
+
+  test_true_got(task= gearman_client_execute(client, gearman_c_str_param(worker_function), NULL, 0, NULL, &value), gearman_client_error(client));
+  test_compare_got(GEARMAN_WORK_FAIL, gearman_task_error(task), gearman_strerror(gearman_task_error(task)));
+
+  gearman_task_free(task);
+
+  return TEST_SUCCESS;
+}
+
 test_return_t gearman_client_execute_timeout_test(void *object)
 {
   gearman_client_st *client= (gearman_client_st *)object;

Some files were not shown because too many files changed in this diff