Browse Source

Fix for removing job from durable queue on cancel.

Brian Aker 11 years ago
parent
commit
1eed6f5f7a

+ 37 - 5
libgearman-server/gearmand_con.cc

@@ -44,6 +44,7 @@
 #include "gear_config.h"
 #include "libgearman-server/common.h"
 #include <libgearman-server/gearmand.h>
+#include <libgearman-server/queue.h>
 #include <cstring>
 
 #include <cerrno>
@@ -253,10 +254,11 @@ gearman_server_job_st *gearman_server_job_get(gearman_server_st *server,
   return NULL;
 }
 
-bool gearman_server_job_cancel(gearman_server_st& server,
-                               const char *job_handle,
-                               const size_t job_handle_length)
+gearmand_error_t gearman_server_job_cancel(gearman_server_st& server,
+                                           const char *job_handle,
+                                           const size_t job_handle_length)
 {
+  gearmand_error_t ret= GEARMAND_NO_JOBS;
   uint32_t key= _server_job_hash(job_handle, job_handle_length);
 
   gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "cancel: %.*s", int(job_handle_length), job_handle);
@@ -268,14 +270,44 @@ bool gearman_server_job_cancel(gearman_server_st& server,
     if (server_job->job_handle_key == key and
         strncmp(server_job->job_handle, job_handle, GEARMAND_JOB_HANDLE_SIZE) == 0)
     {
+      /* Queue the fail packet for all clients. */
+      for (gearman_server_client_st* client= server_job->client_list; client != NULL; client= client->job_next)
+      {
+        ret= gearman_server_io_packet_add(client->con, false,
+                                          GEARMAN_MAGIC_RESPONSE,
+                                          GEARMAN_COMMAND_WORK_FAIL,
+                                          server_job->job_handle,
+                                          (size_t)strlen(server_job->job_handle),
+                                          NULL);
+        if (gearmand_failed(ret))
+        {
+          gearmand_log_gerror_warn(GEARMAN_DEFAULT_LOG_PARAM, ret, "Failed to send WORK_FAIL packet to %s:%s", client->con->host(), client->con->port());
+        }
+      }
+
+      /* Remove from persistent queue if one exists. */
+      if (server_job->job_queued)
+      {
+        ret= gearman_queue_done(Server,
+                                server_job->unique,
+                                server_job->unique_length,
+                                server_job->function->function_name,
+                                server_job->function->function_name_size);
+        if (gearmand_failed(ret))
+        {
+          gearmand_gerror("Remove from persistent queue", ret);
+          return ret;
+        }
+      }
+
       server_job->ignore_job= true;
       server_job->job_queued= false;
 
-      return true;
+      return GEARMAND_SUCCESS;
     }
   }
 
-  return false;
+  return ret;
 }
 
 gearman_server_job_st * gearman_server_job_peek(gearman_server_con_st *server_con)

+ 4 - 3
libgearman-server/gearmand_con.h

@@ -78,9 +78,10 @@ gearmand_error_t gearmand_con_create(gearmand_st *gearmand, int&,
                                      const char *host, const char*,
                                      struct gearmand_port_st*);
 
-bool gearman_server_job_cancel(gearman_server_st& server,
-                               const char *job_handle,
-                               const size_t job_handle_length);
+GEARMAN_API
+gearmand_error_t gearman_server_job_cancel(gearman_server_st& server,
+                                           const char *job_handle,
+                                           const size_t job_handle_length);
 
 /**
  * Free resources used by a connection.

+ 1 - 2
libgearman-server/job.cc

@@ -403,8 +403,7 @@ gearmand_error_t gearman_server_job_queue(gearman_server_job_st *job)
                                                            GEARMAN_COMMAND_NOOP, NULL);
         if (gearmand_failed(ret))
         {
-          gearmand_gerror("gearman_server_io_packet_add", ret);
-          return ret;
+          gearmand_log_gerror_warn(GEARMAN_DEFAULT_LOG_PARAM, ret, "Failed to send NOOP packet to %s:%s", worker->con->host(), worker->con->port());
         }
 
         worker->con->is_noop_sent= true;

+ 7 - 1
libgearman-server/text.cc

@@ -144,10 +144,16 @@ gearmand_error_t server_run_text(gearman_server_con_st *server_con,
     if (packet->argc == 3
         and strcasecmp("job", (char *)(packet->arg[1])) == 0)
     {
-      if (gearman_server_job_cancel(Gearmand()->server, packet->arg[2], strlen(packet->arg[2])))
+      gearmand_error_t ret= gearman_server_job_cancel(Gearmand()->server, packet->arg[2], strlen(packet->arg[2]));
+
+      if (ret == GEARMAND_SUCCESS)
       {
         data.vec_printf(TEXT_SUCCESS);
       }
+      else if (ret != GEARMAND_NO_JOBS)
+      {
+        data.vec_printf(TEXT_ERROR_INTERNAL_ERROR);
+      }
       else
       {
         data.vec_printf(TEXT_ERROR_UNKNOWN_JOB);

+ 4 - 0
libtest/client.hpp

@@ -38,6 +38,9 @@
 
 #include "libgearman/ssl.h"
 
+#pragma GCC diagnostic push
+#pragma GCC diagnostic ignored "-Wunused-private-field"
+
 namespace libtest {
 
 class SimpleClient {
@@ -107,3 +110,4 @@ private:
 };
 
 } // namespace libtest
+#pragma GCC diagnostic pop

+ 5 - 0
util/instance.hpp

@@ -52,6 +52,9 @@
 
 struct addrinfo;
 
+#pragma GCC diagnostic push
+#pragma GCC diagnostic ignored "-Wunused-private-field"
+
 namespace datadifferential {
 namespace util {
 
@@ -163,3 +166,5 @@ private:
 
 } /* namespace util */
 } /* namespace datadifferential */
+
+#pragma GCC diagnostic pop