Browse Source

Work to support cancel.

Brian Aker 12 years ago
parent
commit
1df9dc425f

+ 2 - 2
benchmark/blobslap_worker.cc

@@ -2,7 +2,7 @@
  * 
  *  Gearmand client and server library.
  *
- *  Copyright (C) 2011 Data Differential, http://datadifferential.com/
+ *  Copyright (C) 2011-2013 Data Differential, http://datadifferential.com/
  *  Copyright (C) 2008 Brian Aker, Eric Day
  *  All rights reserved.
  *
@@ -271,7 +271,7 @@ static void *worker_fn(gearman_job_st *job, void *context,
 
   if (benchmark->verbose > 1)
   {
-    std::cout << "Job=%s (" << gearman_job_workload_size(job) << ")" << std::endl;
+    std::cout << "Job=" << gearman_job_handle(job) << " (" << gearman_job_workload_size(job) << ")" << std::endl;
   }
 
   *ret_ptr= GEARMAN_SUCCESS;

+ 26 - 1
libgearman-server/gearmand_con.cc

@@ -2,7 +2,7 @@
  * 
  *  Gearmand client and server library.
  *
- *  Copyright (C) 2011-2012 Data Differential, http://datadifferential.com/
+ *  Copyright (C) 2011-2013 Data Differential, http://datadifferential.com/
  *  Copyright (C) 2008 Brian Aker, Eric Day
  *  All rights reserved.
  *
@@ -234,6 +234,31 @@ gearman_server_job_st *gearman_server_job_get(gearman_server_st *server,
   return NULL;
 }
 
+bool gearman_server_job_cancel(gearman_server_st& server,
+                               const char *job_handle,
+                               const size_t job_handle_length)
+{
+  uint32_t key= _server_job_hash(job_handle, job_handle_length);
+
+  gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "cancel: %.*s", int(job_handle_length), job_handle);
+
+  for (gearman_server_job_st *server_job= server.job_hash[key % server.hashtable_buckets];
+       server_job != NULL;
+       server_job= server_job->next)
+  {
+    if (server_job->job_handle_key == key and
+        strncmp(server_job->job_handle, job_handle, GEARMAND_JOB_HANDLE_SIZE) == 0)
+    {
+      server_job->ignore_job= true;
+      server_job->job_queued= false;
+
+      return true;
+    }
+  }
+
+  return false;
+}
+
 gearman_server_job_st * gearman_server_job_peek(gearman_server_con_st *server_con)
 {
   for (gearman_server_worker_st *server_worker= server_con->worker_list;

+ 4 - 0
libgearman-server/gearmand_con.h

@@ -78,6 +78,10 @@ gearmand_error_t gearmand_con_create(gearmand_st *gearmand, int fd,
                                      const char *host, const char *port,
                                      gearmand_connection_add_fn *add_fn);
 
+bool gearman_server_job_cancel(gearman_server_st& server,
+                               const char *job_handle,
+                               const size_t job_handle_length);
+
 /**
  * Free resources used by a connection.
  * @param dcon Connection previously initialized with gearmand_con_create.

+ 3 - 0
libgearman-server/job.cc

@@ -236,6 +236,9 @@ gearman_server_job_add_reducer(gearman_server_st *server,
     key= key % server->hashtable_buckets;
     GEARMAN_HASH__ADD(server->job, key, server_job);
 
+    gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "JOB %s :%u",
+                       server_job->job_handle, server_job->job_handle_key);
+
     if (server->state.queue_startup)
     {
       server_job->job_queued= true;

+ 6 - 6
libgearman-server/server.cc

@@ -196,13 +196,13 @@ gearmand_error_t gearman_server_run_command(gearman_server_con_st *server_con,
     {
       gearman_job_priority_t priority;
 
-      if (packet->command == GEARMAN_COMMAND_SUBMIT_JOB ||
-          packet->command == GEARMAN_COMMAND_SUBMIT_JOB_BG ||
+      if (packet->command == GEARMAN_COMMAND_SUBMIT_JOB or
+          packet->command == GEARMAN_COMMAND_SUBMIT_JOB_BG or
           packet->command == GEARMAN_COMMAND_SUBMIT_JOB_EPOCH)
       {
         priority= GEARMAN_JOB_PRIORITY_NORMAL;
       }
-      else if (packet->command == GEARMAN_COMMAND_SUBMIT_JOB_HIGH ||
+      else if (packet->command == GEARMAN_COMMAND_SUBMIT_JOB_HIGH or
                packet->command == GEARMAN_COMMAND_SUBMIT_JOB_HIGH_BG)
       {
         priority= GEARMAN_JOB_PRIORITY_HIGH;
@@ -212,9 +212,9 @@ gearmand_error_t gearman_server_run_command(gearman_server_con_st *server_con,
         priority= GEARMAN_JOB_PRIORITY_LOW;
       }
 
-      if (packet->command == GEARMAN_COMMAND_SUBMIT_JOB_BG ||
-          packet->command == GEARMAN_COMMAND_SUBMIT_JOB_HIGH_BG ||
-          packet->command == GEARMAN_COMMAND_SUBMIT_JOB_LOW_BG ||
+      if (packet->command == GEARMAN_COMMAND_SUBMIT_JOB_BG or
+          packet->command == GEARMAN_COMMAND_SUBMIT_JOB_HIGH_BG or
+          packet->command == GEARMAN_COMMAND_SUBMIT_JOB_LOW_BG or
           packet->command == GEARMAN_COMMAND_SUBMIT_JOB_EPOCH)
       {
         server_client= NULL;

+ 10 - 2
libgearman-server/text.cc

@@ -135,7 +135,14 @@ gearmand_error_t server_run_text(gearman_server_con_st *server_con,
     if (packet->argc == 3
         and strcasecmp("job", (char *)(packet->arg[1])) == 0)
     {
-      data.vec_printf(TEXT_ERROR_UNKNOWN_JOB);
+      if (gearman_server_job_cancel(Gearmand()->server, packet->arg[2], strlen(packet->arg[2])))
+      {
+        data.vec_printf(TEXT_SUCCESS);
+      }
+      else
+      {
+        data.vec_printf(TEXT_ERROR_UNKNOWN_JOB);
+      }
     }
   }
   else if (packet->argc >= 2 and strcasecmp("show", (char *)(packet->arg[0])) == 0)
@@ -165,7 +172,8 @@ gearmand_error_t server_run_text(gearman_server_con_st *server_con,
              server_job != NULL;
              server_job= server_job->next)
         {
-          data.vec_append_printf("%s\n", server_job->job_handle);
+          data.vec_append_printf("%s\t%u\t%u\t%u\n", server_job->job_handle, uint32_t(server_job->retries),
+                                 uint32_t(server_job->ignore_job), uint32_t(server_job->job_queued));
         }
       }