Browse Source

First part of error cleanup in Job (working on fixing EXCEPTION bug).

Brian Aker 11 years ago
parent
commit
3f61c71c30

+ 18 - 8
libgearman-server/queue.cc

@@ -61,7 +61,11 @@ gearmand_error_t gearman_queue_add(gearman_server_st *server,
 {
   assert(server->state.queue_startup == false);
   gearmand_error_t ret;
-  if (server->queue_version == QUEUE_VERSION_FUNCTION)
+  if (server->queue_version == QUEUE_VERSION_NONE)
+  {
+    return GEARMAND_SUCCESS;
+  }
+  else if (server->queue_version == QUEUE_VERSION_FUNCTION)
   {
     assert(server->queue.functions->_add_fn);
     ret= (*(server->queue.functions->_add_fn))(server,
@@ -114,7 +118,11 @@ gearmand_error_t gearman_queue_done(gearman_server_st *server,
                                     const char *function_name,
                                     size_t function_name_size)
 {
-  if (server->queue_version == QUEUE_VERSION_FUNCTION)
+  if (server->queue_version == QUEUE_VERSION_NONE)
+  {
+    return GEARMAND_SUCCESS;
+  }
+  else if (server->queue_version == QUEUE_VERSION_FUNCTION)
   {
     assert(server->queue.functions->_done_fn);
     return (*(server->queue.functions->_done_fn))(server,
@@ -123,12 +131,14 @@ gearmand_error_t gearman_queue_done(gearman_server_st *server,
                                                   function_name,
                                                   function_name_size);
   }
-
-  assert(server->queue.object);
-  return server->queue.object->done(server,
-                                    unique, unique_size,
-                                    function_name,
-                                    function_name_size);
+  else
+  {
+    assert(server->queue.object);
+    return server->queue.object->done(server,
+                                      unique, unique_size,
+                                      function_name,
+                                      function_name_size);
+  }
 }
 
 void gearman_server_save_job(gearman_server_st& server,

+ 20 - 0
libgearman-server/server.cc

@@ -827,6 +827,26 @@ gearmand_error_t gearman_server_run_command(gearman_server_con_st *server_con,
       {
         return gearmand_gerror("_server_queue_work_data", ret);
       }
+
+#if 0
+      /* Remove from persistent queue if one exists. */
+      if (server_job->job_queued)
+      {
+        ret= gearman_queue_done(Server,
+                                server_job->unique,
+                                server_job->unique_length,
+                                server_job->function->function_name,
+                                server_job->function->function_name_size);
+        if (gearmand_failed(ret))
+        {
+          gearmand_gerror("Remove from persistent queue", ret);
+          return ret;
+        }
+      }
+
+      /* Job is done, remove it. */
+      gearman_server_job_free(server_job);
+#endif
     }
 
     break;

+ 5 - 5
libgearman/function/function_v1.hpp

@@ -62,20 +62,20 @@ public:
       gearman_job_build_reducer(job, NULL);
     }
 
-    job->error_code= GEARMAN_SUCCESS;
-    job->_worker.work_result= _worker_fn(job, context_arg, &(job->_worker.work_result_size), &job->error_code);
+    job->_error_code= GEARMAN_SUCCESS;
+    job->_worker.work_result= _worker_fn(job, context_arg, &(job->_worker.work_result_size), &job->_error_code);
 
-    if (job->error_code == GEARMAN_LOST_CONNECTION)
+    if (job->_error_code == GEARMAN_LOST_CONNECTION)
     {
       return GEARMAN_FUNCTION_ERROR;
     }
 
-    if (job->error_code == GEARMAN_SHUTDOWN)
+    if (job->_error_code == GEARMAN_SHUTDOWN)
     {
       return GEARMAN_FUNCTION_SHUTDOWN;
     }
 
-    if (gearman_failed(job->error_code))
+    if (gearman_failed(job->_error_code))
     {
       return GEARMAN_FUNCTION_FATAL;
     }

+ 4 - 4
libgearman/function/function_v2.cc

@@ -56,19 +56,19 @@ gearman_function_error_t FunctionV2::callback(gearman_job_st* job, void *context
   switch (error)
   {
   case GEARMAN_SHUTDOWN:
-    job->error_code= GEARMAN_SUCCESS;
+    job->_error_code= GEARMAN_SUCCESS;
     return GEARMAN_FUNCTION_SHUTDOWN;
 
   case GEARMAN_FATAL:
-    job->error_code= GEARMAN_FATAL;
+    job->_error_code= GEARMAN_FATAL;
     return GEARMAN_FUNCTION_FATAL;
 
   case GEARMAN_ERROR:
-    job->error_code= GEARMAN_ERROR;
+    job->_error_code= GEARMAN_ERROR;
     return GEARMAN_FUNCTION_ERROR;
 
   case GEARMAN_SUCCESS:
-    job->error_code= GEARMAN_SUCCESS;
+    job->_error_code= GEARMAN_SUCCESS;
     return GEARMAN_FUNCTION_SUCCESS;
 
   case GEARMAN_IO_WAIT:

+ 4 - 4
libgearman/function/partition.cc

@@ -56,19 +56,19 @@ gearman_function_error_t Partition::callback(gearman_job_st* job, void *context_
   switch (error)
   {
   case GEARMAN_FATAL:
-    job->error_code= GEARMAN_FATAL;
+    job->_error_code= GEARMAN_FATAL;
     return GEARMAN_FUNCTION_FATAL;
 
   case GEARMAN_SHUTDOWN:
-    job->error_code= GEARMAN_SUCCESS;
+    job->_error_code= GEARMAN_SUCCESS;
     return GEARMAN_FUNCTION_SHUTDOWN;
 
   case GEARMAN_ERROR:
-    job->error_code= GEARMAN_ERROR;
+    job->_error_code= GEARMAN_ERROR;
     return GEARMAN_FUNCTION_ERROR;
 
   case GEARMAN_SUCCESS:
-    job->error_code= GEARMAN_SUCCESS;
+    job->_error_code= GEARMAN_SUCCESS;
     return GEARMAN_FUNCTION_SUCCESS;
 
   case GEARMAN_IO_WAIT:

+ 5 - 0
libgearman/interface/worker.hpp

@@ -132,6 +132,11 @@ struct Worker
     return universal.error();
   }
 
+  gearman_return_t error_code() const
+  {
+    return universal.error_code();
+  }
+
 private:
   gearman_worker_st* _shell;
   gearman_worker_st _owned_shell;

+ 8 - 3
libgearman/job.cc

@@ -191,7 +191,7 @@ gearman_job_st *gearman_job_create(gearman_worker_st *worker, gearman_job_st *jo
     job->options.finished= false;
 
     job->reducer= NULL;
-    job->error_code= GEARMAN_UNKNOWN_STATE;
+    job->_error_code= GEARMAN_UNKNOWN_STATE;
 
     if (job->_worker.job_list)
     {
@@ -456,7 +456,6 @@ gearman_return_t gearman_job_send_complete_fin(gearman_job_st *job,
       {
         return ret;
       }
-
       job->finished(true);
     }
 
@@ -488,7 +487,13 @@ gearman_return_t gearman_job_send_exception(gearman_job_st *job,
         job->options.work_in_use= true;
       }
 
-      return _job_send(job);
+      if (gearman_failed(_job_send(job)))
+      {
+        return job->error_code();
+      }
+#if 0
+      job->finished(true);
+#endif
     }
 
     return GEARMAN_SUCCESS;

+ 11 - 1
libgearman/job.hpp

@@ -69,10 +69,20 @@ struct gearman_job_st
   gearman_packet_st assigned;
   gearman_packet_st work;
   struct gearman_job_reducer_st *reducer;
-  gearman_return_t error_code;
+  gearman_return_t _error_code;
 
   gearman_universal_st& universal()
   {
     return _worker.universal;
   }
+
+  gearman_universal_st& universal() const
+  {
+    return _worker.universal;
+  }
+
+  gearman_return_t error_code() const
+  {
+    return universal().error_code();
+  }
 };

+ 15 - 15
libgearman/worker.cc

@@ -1043,19 +1043,19 @@ gearman_return_t gearman_worker_work(gearman_worker_st *worker)
                                                           static_cast<void *>(worker->impl()->work_function->context)))
           {
             case GEARMAN_FUNCTION_INVALID_ARGUMENT:
-              worker->impl()->work_job->error_code= gearman_error(worker->impl()->universal, GEARMAN_INVALID_ARGUMENT, "worker returned an invalid response, gearman_return_t");
+              worker->impl()->work_job->_error_code= gearman_error(worker->impl()->universal, GEARMAN_INVALID_ARGUMENT, "worker returned an invalid response, gearman_return_t");
             case GEARMAN_FUNCTION_FATAL:
               if (gearman_job_send_fail_fin(worker->impl()->work_job) == GEARMAN_LOST_CONNECTION) // If we fail this, we have no connection, @note this causes us to lose the current error
               {
-                worker->impl()->work_job->error_code= GEARMAN_LOST_CONNECTION;
+                worker->impl()->work_job->_error_code= GEARMAN_LOST_CONNECTION;
                 break;
               }
               worker->impl()->work_state= GEARMAN_WORKER_WORK_UNIVERSAL_FAIL;
-              return worker->impl()->work_job->error_code;
+              return worker->impl()->work_job->_error_code;
 
             case GEARMAN_FUNCTION_ERROR: // retry 
               gearman_reset(worker->impl()->universal);
-              worker->impl()->work_job->error_code= GEARMAN_LOST_CONNECTION;
+              worker->impl()->work_job->_error_code= GEARMAN_LOST_CONNECTION;
               break;
 
             case GEARMAN_FUNCTION_SHUTDOWN:
@@ -1065,7 +1065,7 @@ gearman_return_t gearman_worker_work(gearman_worker_st *worker)
               break;
           }
 
-          if (worker->impl()->work_job->error_code == GEARMAN_LOST_CONNECTION)
+          if (worker->impl()->work_job->_error_code == GEARMAN_LOST_CONNECTION)
           {
             break;
           }
@@ -1073,12 +1073,12 @@ gearman_return_t gearman_worker_work(gearman_worker_st *worker)
 
       case GEARMAN_WORKER_WORK_UNIVERSAL_COMPLETE:
         {
-          worker->impl()->work_job->error_code= gearman_job_send_complete_fin(worker->impl()->work_job,
+          worker->impl()->work_job->_error_code= gearman_job_send_complete_fin(worker->impl()->work_job,
                                                                               worker->impl()->work_result, worker->impl()->work_result_size);
-          if (worker->impl()->work_job->error_code == GEARMAN_IO_WAIT)
+          if (worker->impl()->work_job->_error_code == GEARMAN_IO_WAIT)
           {
             worker->impl()->work_state= GEARMAN_WORKER_WORK_UNIVERSAL_COMPLETE;
-            return gearman_error(worker->impl()->universal, worker->impl()->work_job->error_code,
+            return gearman_error(worker->impl()->universal, worker->impl()->work_job->_error_code,
                                  "A failure occurred after worker had successful complete, unless gearman_job_send_complete() was called directly by worker, client has not been informed of success.");
           }
 
@@ -1089,31 +1089,31 @@ gearman_return_t gearman_worker_work(gearman_worker_st *worker)
           }
 
           // If we lost the connection, we retry the work, otherwise we error
-          if (worker->impl()->work_job->error_code == GEARMAN_LOST_CONNECTION)
+          if (worker->impl()->work_job->_error_code == GEARMAN_LOST_CONNECTION)
           {
             break;
           }
-          else if (worker->impl()->work_job->error_code == GEARMAN_SHUTDOWN)
+          else if (worker->impl()->work_job->_error_code == GEARMAN_SHUTDOWN)
           { }
-          else if (gearman_failed(worker->impl()->work_job->error_code))
+          else if (gearman_failed(worker->impl()->work_job->_error_code))
           {
             worker->impl()->work_state= GEARMAN_WORKER_WORK_UNIVERSAL_FAIL;
 
-            return worker->impl()->work_job->error_code;
+            return worker->impl()->work_job->_error_code;
           }
         }
         break;
 
       case GEARMAN_WORKER_WORK_UNIVERSAL_FAIL:
         {
-          if (gearman_failed(worker->impl()->work_job->error_code= gearman_job_send_fail_fin(worker->impl()->work_job)))
+          if (gearman_failed(worker->impl()->work_job->_error_code= gearman_job_send_fail_fin(worker->impl()->work_job)))
           {
-            if (worker->impl()->work_job->error_code == GEARMAN_LOST_CONNECTION)
+            if (worker->impl()->work_job->_error_code == GEARMAN_LOST_CONNECTION)
             {
               break;
             }
 
-            return worker->impl()->work_job->error_code;
+            return worker->impl()->work_job->_error_code;
           }
         }
         break;

+ 16 - 3
tests/libgearman-1.0/worker_test.cc

@@ -759,6 +759,12 @@ static test_return_t GEARMAN_FAIL_return_TEST(void *)
   return TEST_SUCCESS;
 }
 
+static gearman_return_t exception_fn(gearman_task_st* task)
+{
+  Out << "Task Handle: " <<  gearman_task_job_handle(task);
+  return GEARMAN_SUCCESS;
+}
+
 static test_return_t gearman_job_send_exception_mass_TEST(void *)
 {
   gearman_function_t call_exception_WORKER_FN= gearman_function_create(call_exception_WORKER);
@@ -772,6 +778,10 @@ static test_return_t gearman_job_send_exception_mass_TEST(void *)
 
   std::vector<gearman_task_st*> tasks;
   libgearman::Client client(libtest::default_port());
+
+  gearman_exception_fn *func= exception_fn;
+  gearman_client_set_exception_fn(&client, func);
+
   for (size_t x= 0; x < 100; ++x)
   {
     char buffer[GEARMAN_MAXIMUM_INTEGER_DISPLAY_LENGTH];
@@ -1371,14 +1381,17 @@ static test_return_t _increase_TEST(gearman_function_t &func, gearman_client_opt
     max_block_size= 24;
   }
 
+  libtest::vchar_t workload;
+  libtest::vchar::make(workload, block_size);
+
   for (size_t x= 1; x < max_block_size; ++x)
   {
     if (valgrind_is_caller() and (x * block_size) > 15728640)
     {
       continue;
     }
-    libtest::vchar_t workload;
-    libtest::vchar::make(workload, x * block_size);
+
+    workload.resize(x * block_size);
 
     gearman_argument_t value= gearman_argument_make(0, 0, vchar_param(workload));
 
@@ -1403,7 +1416,7 @@ static test_return_t _increase_TEST(gearman_function_t &func, gearman_client_opt
                  gearman_task_return(task));
 
     gearman_result_st *result= gearman_task_result(task);
-    test_true(result);
+    ASSERT_TRUE(result);
     ASSERT_EQ(gearman_result_size(result), workload.size());
   }