Browse Source

Implement cancel via job_handle in client.

Brian Aker 12 years ago
parent
commit
edf64b4ab3

+ 1 - 0
Makefile.am

@@ -40,6 +40,7 @@ include libtest/include.am
 include libhostile/include.am
 include libhashkit-1.0/include.am
 
+include libgearmancore/include.am
 include libhashkit/include.am
 include benchmark/include.am
 include bin/include.am

+ 1 - 0
libgearman-1.0/gearman.h

@@ -102,3 +102,4 @@
 #include <libgearman-1.0/client.h>
 #include <libgearman-1.0/connection.h>
 #include <libgearman-1.0/parse.h>
+#include <libgearman-1.0/cancel.h>

+ 1 - 0
libgearman-1.0/include.am

@@ -4,6 +4,7 @@
 
 include libgearman-1.0/t/include.am
 
+nobase_include_HEADERS+= libgearman-1.0/cancel.h
 nobase_include_HEADERS+= libgearman-1.0/interface/client.h
 nobase_include_HEADERS+= libgearman-1.0/interface/status.h
 nobase_include_HEADERS+= libgearman-1.0/interface/task.h

+ 2 - 2
libgearman-server/server.cc

@@ -136,7 +136,7 @@ gearmand_error_t gearman_server_run_command(gearman_server_con_st *server_con,
       {
         gearman_server_client_free(server_client);
         gearmand_gerror("unique value too large", GEARMAN_ARGUMENT_TOO_LARGE);
-        return _server_error_packet(server_con, "job failure", "Unique value too large");
+        return _server_error_packet(server_con, "job_failure", "Unique value too large");
       }
 
       gearman_job_priority_t map_priority= GEARMAN_JOB_PRIORITY_NORMAL;
@@ -258,7 +258,7 @@ gearmand_error_t gearman_server_run_command(gearman_server_con_st *server_con,
       {
         gearmand_gerror("unique value too large", GEARMAN_ARGUMENT_TOO_LARGE);
         gearman_server_client_free(server_client);
-        return _server_error_packet(server_con, "job failure", "Unique value too large");
+        return _server_error_packet(server_con, "job_failure", "Unique value too large");
       }
 
       /* Schedule job. */

+ 16 - 9
libgearman/check.cc

@@ -2,7 +2,7 @@
  * 
  *  Gearmand client and server library.
  *
- *  Copyright (C) 2011-2013 Data Differential, http://datadifferential.com/
+ *  Copyright (C) 2011-2013 Data Differential, http://datadifferential.com/ 
  *  All rights reserved.
  *
  *  Redistribution and use in source and binary forms, with or without
@@ -35,12 +35,6 @@
  *
  */
 
-
-/**
- * @file
- * @brief Gearman State Definitions
- */
-
 #include "gear_config.h"
 
 #include "libgearman/common.h"
@@ -69,9 +63,22 @@ gearman_return_t EchoCheck::success(gearman_connection_st* con)
   return GEARMAN_SUCCESS;
 }
 
-OptionCheck::OptionCheck(gearman_universal_st& universal_):
-    _universal(universal_)
+gearman_return_t CancelCheck::success(gearman_connection_st* con)
 {
+  if (con->_packet.command == GEARMAN_COMMAND_ERROR)
+  {
+    if (con->_packet.argc)
+    {
+      return gearman_universal_set_error(_universal, GEARMAN_SERVER_ERROR, GEARMAN_AT, "%d: %.*s:%.*s", con->_packet.argc,
+                                         con->_packet.arg_size[0], con->_packet.arg[0],
+                                         con->_packet.arg_size[1], con->_packet.arg[1]
+                                        );
+    }
+
+    return gearman_error(_universal, GEARMAN_SERVER_ERROR, "server lacks support for client's to cancel a job");
+  }
+
+  return GEARMAN_SUCCESS;
 }
 
 gearman_return_t OptionCheck::success(gearman_connection_st* con)

+ 17 - 1
libgearman/check.h

@@ -59,7 +59,23 @@ private:
 
 class OptionCheck : public Check {
 public:
-  OptionCheck(gearman_universal_st& universal_);
+  OptionCheck(gearman_universal_st& universal_):
+    _universal(universal_)
+    {
+    }
+
+  gearman_return_t success(gearman_connection_st* con);
+
+private:
+  gearman_universal_st& _universal;
+};
+
+class CancelCheck : public Check {
+public:
+  CancelCheck(gearman_universal_st& universal_):
+    _universal(universal_)
+    {
+    }
 
   gearman_return_t success(gearman_connection_st* con);
 

+ 13 - 0
libgearman/client.cc

@@ -614,6 +614,19 @@ gearman_return_t gearman_client_wait(gearman_client_st *client_shell)
   return GEARMAN_INVALID_ARGUMENT;
 }
 
+gearman_return_t gearman_client_cancel_job(gearman_client_st *client_shell,
+                                           gearman_job_handle_t job_handle)
+{
+  if (client_shell and client_shell->impl())
+  {
+    client_shell->impl()->universal.reset_error();
+
+    return cancel_job(client_shell->impl()->universal, job_handle);
+  }
+
+  return GEARMAN_INVALID_ARGUMENT;
+}
+
 void *gearman_client_do(gearman_client_st *client_shell,
                         const char *function,
                         const char *unique,

+ 1 - 22
libgearman/include.am

@@ -63,28 +63,6 @@ noinst_HEADERS+= \
 		 libgearman/universal.hpp \
 		 libgearman/vector.hpp
 
-noinst_LTLIBRARIES+= libgearman/libgearmancore.la
-
-libgearman_libgearmancore_la_SOURCES= \
-				      libgearman/allocator.cc \
-				      libgearman/backtrace.cc \
-				      libgearman/byteorder.cc \
-				      libgearman/check.cc \
-				      libgearman/command.cc \
-				      libgearman/connection.cc \
-				      libgearman/error.cc \
-				      libgearman/log.cc \
-				      libgearman/packet.cc \
-				      libgearman/strcommand.cc \
-				      libgearman/strerror.cc \
-				      libgearman/server_options.cc \
-				      libgearman/universal.cc \
-				      libgearman/vector.cc
-
-libgearman_libgearmancore_la_CXXFLAGS= -DBUILDING_LIBGEARMAN
-libgearman_libgearmancore_la_LIBADD= @DL_LIB@
-libgearman_libgearmancore_la_LIBADD+= libhashkit/libhashkit.la
-
 noinst_LTLIBRARIES+= libgearman/libgearman-vector.la
 libgearman_libgearman_vector_la_CXXFLAGS= -DBUILDING_LIBGEARMAN
 libgearman_libgearman_vector_la_LIBADD= @DL_LIB@
@@ -104,6 +82,7 @@ libgearman_libgearman_la_CXXFLAGS=
 libgearman_libgearman_la_LDFLAGS=
 
 lib_LTLIBRARIES+= libgearman/libgearman.la
+libgearman_libgearman_la_SOURCES+= libgearman/check.cc
 libgearman_libgearman_la_SOURCES+= \
 				  libgearman/actions.cc \
 				  libgearman/aggregator.cc \

+ 4 - 2
libgearman/job.cc

@@ -519,12 +519,14 @@ gearman_return_t gearman_job_send_fail_fin(gearman_job_st *job)
       job->options.work_in_use= true;
     }
 
-    gearman_return_t ret;
-    ret= _job_send(job);
+    gearman_return_t ret= _job_send(job);
     if (gearman_failed(ret))
+    {
       return ret;
+    }
 
     job->options.finished= true;
+
     return GEARMAN_SUCCESS;
   }
 

+ 40 - 39
libgearman/universal.cc

@@ -427,45 +427,6 @@ gearman_return_t gearman_set_identifier(gearman_universal_st& universal,
   return ret;
 }
 
-EchoCheck::EchoCheck(gearman_universal_st& universal_,
-    const void *workload_, const size_t workload_size_) :
-    _universal(universal_),
-    _workload(workload_),
-    _workload_size(workload_size_)
-{
-}
-
-gearman_return_t EchoCheck::success(gearman_connection_st* con)
-{
-  if (con->_packet.command != GEARMAN_COMMAND_ECHO_RES)
-  {
-    return gearman_error(_universal, GEARMAN_INVALID_COMMAND, "Wrong command sent in response to ECHO request");
-  }
-
-  if (con->_packet.data_size != _workload_size or
-      memcmp(_workload, con->_packet.data, _workload_size))
-  {
-    return gearman_error(_universal, GEARMAN_ECHO_DATA_CORRUPTION, "corruption during echo");
-  }
-
-  return GEARMAN_SUCCESS;
-}
-
-OptionCheck::OptionCheck(gearman_universal_st& universal_):
-    _universal(universal_)
-{
-}
-
-gearman_return_t OptionCheck::success(gearman_connection_st* con)
-{
-  if (con->_packet.command == GEARMAN_COMMAND_ERROR)
-  {
-    return gearman_error(_universal, GEARMAN_INVALID_SERVER_OPTION, "invalid server option");
-  }
-
-  return GEARMAN_SUCCESS;
-}
-
 static gearman_return_t connection_loop(gearman_universal_st& universal,
                                         const gearman_packet_st& message,
                                         Check& check)
@@ -571,6 +532,46 @@ gearman_return_t gearman_echo(gearman_universal_st& universal,
   return ret;
 }
 
+gearman_return_t cancel_job(gearman_universal_st& universal,
+                            gearman_job_handle_t job_handle)
+{
+  if (universal.has_connections() == false)
+  {
+    return gearman_universal_set_error(universal, GEARMAN_NO_SERVERS, GEARMAN_AT, "no servers provided");
+  }
+
+  const void *args[1];
+  size_t args_size[1];
+
+  args[0]= job_handle;
+  args_size[0]= strlen(job_handle);
+
+  gearman_packet_st cancel_packet;
+
+  gearman_return_t ret= gearman_packet_create_args(universal,
+                                                   cancel_packet,
+                                                   GEARMAN_MAGIC_REQUEST,
+                                                   GEARMAN_COMMAND_WORK_FAIL,
+                                                   args, args_size, 1);
+  if (gearman_success(ret))
+  {
+    PUSH_BLOCKING(universal);
+
+    CancelCheck check(universal);
+    ret= connection_loop(universal, cancel_packet, check);
+  }
+  else
+  {
+    gearman_packet_free(&cancel_packet);
+    gearman_error(universal, GEARMAN_MEMORY_ALLOCATION_FAILURE, "gearman_packet_create_args()");
+    return ret;
+  }
+
+  gearman_packet_free(&cancel_packet);
+
+  return ret;
+}
+
 void gearman_free_all_packets(gearman_universal_st &universal)
 {
   while (universal.packet_list)

Some files were not shown because too many files changed in this diff