Browse Source

Add in workers that are v2 (ie, lets make sure we have tests on them).

Unified more of the internal interface for function execution.
Brian Aker 13 years ago
parent
commit
ebce745f74
10 changed files with 170 additions and 201 deletions
  1. 42 52
      docs/libgearman/gearman_return_t.rst
  2. 0 1
      libgearman/actions.cc
  3. 10 9
      libgearman/constants.h
  4. 33 76
      libtest/worker.cc
  5. 6 17
      libtest/worker.h
  6. 3 2
      tests/basic.cc
  7. 2 1
      tests/burnin.cc
  8. 66 37
      tests/client_test.cc
  9. 4 2
      tests/include.am
  10. 4 4
      tests/task.cc

+ 42 - 52
docs/libgearman/gearman_return_t.rst

@@ -37,90 +37,83 @@ You can print a text version of the error message with :c:func:`gearman_strerror
 occurred. This should be used for testing loops.
 
 Possible values of :c:type:`gearman_return_t`:
+++++++++++++++++++++++++++++++++++++++++++++++
 
 .. c:type:: GEARMAN_SUCCESS
 
-Success
+  Success
 
 .. c:type:: GEARMAN_NO_PENDING_TASKS
 
-:c:func:`gearman_client_run_tasks()` was called and it has completed all tasks assigned to the client.
+  :c:func:`gearman_client_run_tasks()` was called and it has completed all tasks assigned to the client.
 
 .. c:type:: GEARMAN_IO_WAIT 
 
-Blocking IO was found. gearman_continue() can be used to
-test for this.
+  Blocking IO was found. gearman_continue() can be used to test for this.
 
 .. c:type:: GEARMAN_ERRNO 
 
-System error occurred. Use either :c:func:`gearman_client_errno()` or :c:func:`gearman_worker_errno()` 
+  System error occurred. Use either :c:func:`gearman_client_errno()` or :c:func:`gearman_worker_errno()` 
 
 .. c:type:: GEARMAN_NO_ACTIVE_FDS 
 
-No active connections were available.  gearman_continue() can be used to test for this.
+   No active connections were available.  gearman_continue() can be used to test for this.
 
 .. c:type:: GEARMAN_GETADDRINFO 
 
-Name resolution failed for a host.
+   Name resolution failed for a host.
 
 .. c:type:: GEARMAN_NO_SERVERS 
 
-No servers have been provided for the client/worker.
+   No servers have been provided for the client/worker.
 
 .. c:type:: GEARMAN_LOST_CONNECTION 
 
-Connection was lost to the given server.
+   Connection was lost to the given server.
 
 .. c:type:: GEARMAN_MEMORY_ALLOCATION_FAILURE 
 
-Memory allocation failed.
+   Memory allocation failed.
 
 .. c:type:: GEARMAN_SERVER_ERROR 
 
-An error occurred on the server.
+   An error occurred on the server.
 
 .. c:type:: GEARMAN_NOT_CONNECTED 
 
-Client/Worker is not currently connected to the
-server.
+   Client/Worker is not currently connected to the server.
 
 .. c:type:: GEARMAN_COULD_NOT_CONNECT 
 
-Server name was valid, but a connection could not
-be made.
+   Server name was valid, but a connection could not be made.
 
 .. c:type:: GEARMAN_ECHO_DATA_CORRUPTION 
 
-Either :c:func:`gearman_client_echo()` or
-:c:func:`gearman_work_echo()` echo was unsuccessful because the data was returned from :program:`gearmand` corrupted.
+   Either :c:func:`gearman_client_echo()` or :c:func:`gearman_work_echo()` echo was unsuccessful because the data was returned from :program:`gearmand` corrupted.
 
 .. c:type:: GEARMAN_UNKNOWN_STATE 
 
-The gearman_return_t was never set.
+   The gearman_return_t was never set.
 
 .. c:type:: GEARMAN_FLUSH_DATA 
 
-Internal state, should never be seen by either client or worker.
+   Internal state, should never be seen by either client or worker.
 
 .. c:type:: GEARMAN_SEND_BUFFER_TOO_SMALL 
 
-Send buffer was too small.
+   Send buffer was too small.
 
 .. c:type:: GEARMAN_TIMEOUT 
 
-A timeout occurred when making a request to the server.
+   A timeout occurred when making a request to the server.
 
 .. c:type:: GEARMAN_ARGUMENT_TOO_LARGE 
 
-Argument was too large for the current buffer.
+   Argument was too large for the current buffer.
 
 .. c:type:: GEARMAN_INVALID_ARGUMENT 
 
-One of the arguments to the given API call was invalid. EINVAL will be set
-if :c:func:`gearman_client_error()` or :c:func:`gearman_worker_error()` were
-not settable. This can also be returned if
-:c:type:`GEARMAN_CLIENT_UNBUFFERED_RESULT` was set, but the client is not
-handling the data correctly.
+   One of the arguments to the given API call was invalid. EINVAL will be set if :c:func:`gearman_client_error()` or :c:func:`gearman_worker_error()` were not settable. This can also be returned if :c:type:`GEARMAN_CLIENT_UNBUFFERED_RESULT` was set, but the client is not handling the data correctly.
 
 
 ***********
@@ -129,20 +122,19 @@ CLIENT ONLY
 
 .. c:type:: GEARMAN_NEED_WORKLOAD_FN 
 
-A client was asked for work, but no :c:type:`gearman_workload_fn` callback was
-specified. See :c:func:`gearman_client_set_workload_fn()`
+   A client was asked for work, but no :c:type:`gearman_workload_fn` callback was specified. See :c:func:`gearman_client_set_workload_fn()`
 
 .. c:type:: GEARMAN_WORK_FAIL  
 
-A task has failed, and the worker has exited with an error or it called :c:func:`gearman_job_send_fail()`
+   A task has failed, and the worker has exited with an error or it called :c:func:`gearman_job_send_fail()`
 
 .. c:type:: GEARMAN_IN_PROGRESS
 
-:c:func:`gearman_client_job_status()` has been called for a :c:type:`gearman_job_handle_t` and the Job is currently being run by a worker.
+   :c:func:`gearman_client_job_status()` has been called for a :c:type:`gearman_job_handle_t` and the Job is currently being run by a worker.
 
 .. c:type:: GEARMAN_JOB_EXISTS
 
-:c:func:`gearman_client_job_status()` has been called for a :c:type:`gearman_job_handle_t` and the Job is currently known by a server, but is not being run by a worker.
+   :c:func:`gearman_client_job_status()` has been called for a :c:type:`gearman_job_handle_t` and the Job is currently known by a server, but is not being run by a worker.
 
 ***********
 WORKER ONLY
@@ -150,23 +142,23 @@ WORKER ONLY
 
 .. c:type:: GEARMAN_INVALID_FUNCTION_NAME 
 
-A worker was sent a request for a job that it did not have a valid function for.
+   A worker was sent a request for a job that it did not have a valid function for.
 
 .. c:type:: GEARMAN_INVALID_WORKER_FUNCTION 
 
-No callback was provided by the worker for a given function.
+   No callback was provided by the worker for a given function.
 
 .. c:type:: GEARMAN_NO_REGISTERED_FUNCTION 
 
-A request for removing a given function from a worker was invalid since that function did not exist.
+   A request for removing a given function from a worker was invalid since that function did not exist.
 
 .. c:type:: GEARMAN_NO_REGISTERED_FUNCTIONS 
 
-The worker has not registered any functions.
+   The worker has not registered any functions.
 
 .. c:type:: GEARMAN_NO_JOBS 
 
-No jobs were found for the worker.
+   No jobs were found for the worker.
 
 ****************
 WORKER TO CLIENT
@@ -177,31 +169,31 @@ value as return values to the calling client.
 
 .. c:type:: GEARMAN_WORK_DATA 
 
-Worker has sent a chunked piece of data to the client via :c:func:`gearman_job_send_data()`
+   Worker has sent a chunked piece of data to the client via :c:func:`gearman_job_send_data()`
 
 .. c:type:: GEARMAN_WORK_WARNING 
 
-Worker has issued a warning to the client via :c:func:`gearman_job_send_warning()`
+   Worker has issued a warning to the client via :c:func:`gearman_job_send_warning()`
 
 .. c:type:: GEARMAN_WORK_STATUS 
 
-Status has been updated by the worker via :c:func:`gearman_job_send_status()`
+   Status has been updated by the worker via :c:func:`gearman_job_send_status()`
 
 .. c:type:: GEARMAN_WORK_EXCEPTION 
 
-Worker has sent an exception the client via :c:func:`gearman_job_send_exception()`
+   Worker has sent an exception the client via :c:func:`gearman_job_send_exception()`
 
 .. c:type:: GEARMAN_WORK_FAIL  
 
-A task has failed, and the worker has exited with an error or it called :c:func:`gearman_job_send_fail()`
+   A task has failed, and the worker has exited with an error or it called :c:func:`gearman_job_send_fail()`
 
 .. c:type:: GEARMAN_WORK_ERROR  
 
-A task has had an error and will be retried.
+   A task has had an error and will be retried.
 
 .. c:type:: GEARMAN_PAUSE 
 
-Used only in custom application for client return based on :c:type:`GEARMAN_WORK_DATA`, :c:type:`GEARMAN_WORK_WARNING`, :c:type:`GEARMAN_WORK_EXCEPTION`, :c:type:`GEARMAN_WORK_FAIL`, or :c:type:`GEARMAN_WORK_STATUS`. :c:func:`gearman_continue()` can be used to check for this value.
+   Used only in custom application for client return based on :c:type:`GEARMAN_WORK_DATA`, :c:type:`GEARMAN_WORK_WARNING`, :c:type:`GEARMAN_WORK_EXCEPTION`, :c:type:`GEARMAN_WORK_FAIL`, or :c:type:`GEARMAN_WORK_STATUS`. :c:func:`gearman_continue()` can be used to check for this value.
 
 ****************
 WORKER TO CLIENT
@@ -211,21 +203,19 @@ Any function defined by :c:func:`gearman_worker_define_function()` may, and can
 
 .. c:type:: GEARMAN_SUCCESS 
 
-The function successfully completed the job.
+   The function successfully completed the job.
 
 .. c:type:: GEARMAN_FATAL  
 
-The function failed to complete the job.
+   The function failed to complete the job.
 
 .. c:type:: GEARMAN_ERROR  
 
-A task has had an error and will be retried.
+   A task has had an error and will be retried.
 
 .. c:type:: GEARMAN_SHUTDOWN  
 
-:c:type:`GEARMAN_SHUTDOWN` is a special case. If it is returned the client
-will be sent :c:type:`GEARMAN_SUCCESS`, but :c:func:`gearman_worker_work()`
-will exit with :c:type:`GEARMAN_SHUTDOWN`.
+   :c:type:`GEARMAN_SHUTDOWN` is a special case. If it is returned the client will be sent :c:type:`GEARMAN_SUCCESS`, but :c:func:`gearman_worker_work()` will exit with :c:type:`GEARMAN_SHUTDOWN`.
 
 
 *********
@@ -234,11 +224,11 @@ TASK ONLY
 
 .. c:type:: GEARMAN_NOT_FLUSHING
 
-:c:func:`gearman_task_send_workload()` failed, it was not in the correct state. 
+   :c:func:`gearman_task_send_workload()` failed, it was not in the correct state. 
 
 .. c:type:: GEARMAN_DATA_TOO_LARGE 
 
-:c:func:`gearman_task_send_workload()` failed, the data was too large to be sent.
+   :c:func:`gearman_task_send_workload()` failed, the data was too large to be sent.
 
 ********
 PROTOCOL

+ 0 - 1
libgearman/actions.cc

@@ -43,7 +43,6 @@
 #include <libgearman/result.hpp>
 #include <cassert>
 #include <memory>
-#include <iostream>
 
 struct gearman_result_st;
 

+ 10 - 9
libgearman/constants.h

@@ -57,19 +57,20 @@ extern "C" {
  */
 
 /* Defines. */
-#define GEARMAN_DEFAULT_TCP_HOST "127.0.0.1"
-#define GEARMAN_DEFAULT_SOCKET_TIMEOUT 10
-#define GEARMAN_DEFAULT_SOCKET_SEND_SIZE 32768
+#define GEARMAN_ARGS_BUFFER_SIZE 128
 #define GEARMAN_DEFAULT_SOCKET_RECV_SIZE 32768
-#define GEARMAN_MAX_ERROR_SIZE 1024
-#define GEARMAN_PACKET_HEADER_SIZE 12
+#define GEARMAN_DEFAULT_SOCKET_SEND_SIZE 32768
+#define GEARMAN_DEFAULT_SOCKET_TIMEOUT 10
+#define GEARMAN_DEFAULT_TCP_HOST "127.0.0.1"
 #define GEARMAN_JOB_HANDLE_SIZE 64
-#define GEARMAN_OPTION_SIZE 64
-#define GEARMAN_UNIQUE_SIZE 64
+#define GEARMAN_MAXIMUM_INTEGER_DISPLAY_LENGTH 20
 #define GEARMAN_MAX_COMMAND_ARGS 8
-#define GEARMAN_ARGS_BUFFER_SIZE 128
-#define GEARMAN_SEND_BUFFER_SIZE 8192
+#define GEARMAN_MAX_ERROR_SIZE 1024
+#define GEARMAN_OPTION_SIZE 64
+#define GEARMAN_PACKET_HEADER_SIZE 12
 #define GEARMAN_RECV_BUFFER_SIZE 8192
+#define GEARMAN_SEND_BUFFER_SIZE 8192
+#define GEARMAN_UNIQUE_SIZE 64
 #define GEARMAN_WORKER_WAIT_TIMEOUT (10 * 1000) /* Milliseconds */
 
 /**

+ 33 - 76
libtest/worker.cc

@@ -28,108 +28,93 @@
 struct context_st {
   in_port_t port;
   const char *function_name;
-  void *function_arg;
   struct worker_handle_st *handle;
   gearman_worker_options_t options;
-  gearman_worker_fn *worker_fn;
-  gearman_function_fn *partition_fn;
-  gearman_aggregator_fn *aggregator_fn;
+  gearman_function_t& worker_fn;
   const char *namespace_key;
+  void *context;
 
-  context_st() :
+  context_st(gearman_function_t& arg) :
     port(0),
-    function_arg(0),
     handle(0),
     options(gearman_worker_options_t()),
-    partition_fn(0),
-    aggregator_fn(0),
-    namespace_key(NULL)
+    worker_fn(arg),
+    namespace_key(NULL),
+    context(0)
   { }
 };
 
 static void *thread_runner(void *con)
 {
   struct context_st *context= (struct context_st *)con;
-  gearman_worker_st worker, *worker_ptr;
 
-  worker_ptr= gearman_worker_create(&worker);
-  assert(worker_ptr);
+  assert(context);
+
+  gearman_worker_st *worker= gearman_worker_create(NULL);
+  assert(worker);
 
   if (context->namespace_key)
-    gearman_worker_set_namespace(worker_ptr, context->namespace_key, strlen(context->namespace_key));
+  {
+    gearman_worker_set_namespace(worker, context->namespace_key, strlen(context->namespace_key));
+  }
 
-  gearman_return_t rc;
-  rc= gearman_worker_add_server(&worker, NULL, context->port);
+  gearman_return_t rc= gearman_worker_add_server(worker, NULL, context->port);
   assert(rc == GEARMAN_SUCCESS);
 
-  bool success= gearman_worker_set_server_option(&worker, gearman_literal_param("exceptions"));
+  bool success= gearman_worker_set_server_option(worker, gearman_literal_param("exceptions"));
   assert(success);
 
-  if (context->aggregator_fn)
-  {
-    gearman_function_t function= gearman_function_create_partition(context->partition_fn, context->aggregator_fn);
-    rc= gearman_worker_define_function(&worker,
-                                       context->function_name, strlen(context->function_name), 
-                                       function,
-                                       0, 
-                                       context->function_arg);
-  }
-  else
-  {
-    rc= gearman_worker_add_function(&worker, context->function_name, 0, context->worker_fn,
-                                    context->function_arg);
-  }
+  rc= gearman_worker_define_function(worker,
+				     context->function_name, strlen(context->function_name), 
+				     context->worker_fn,
+				     0, 
+				     context->context);
   assert(rc == GEARMAN_SUCCESS);
 
   if (context->options != gearman_worker_options_t())
   {
-    gearman_worker_add_options(&worker, context->options);
+    gearman_worker_add_options(worker, context->options);
   }
 
-  gearman_worker_set_timeout(&worker, 100);
+  gearman_worker_set_timeout(worker, 100);
 
   assert(context->handle);
 
   while (context->handle->shutdown == false)
   {
-    gearman_return_t ret= gearman_worker_work(&worker);
+    gearman_return_t ret= gearman_worker_work(worker);
     (void)ret;
   }
 
-  gearman_worker_free(&worker);
+  gearman_worker_free(worker);
 
   free(context);
 
   pthread_exit(0);
 }
 
-static struct worker_handle_st *_test_worker_start(in_port_t port, 
-                                                   const char *namespace_key,
-                                                   const char *function_name,
-                                                   gearman_worker_fn *worker_fn,
-                                                   gearman_function_fn *partition_fn,
-                                                   gearman_aggregator_fn *aggregator_fn,
-                                                   void *function_arg,
-                                                   gearman_worker_options_t options)
+struct worker_handle_st *test_worker_start(in_port_t port, 
+					   const char *namespace_key,
+					   const char *function_name,
+					   gearman_function_t &worker_fn,
+					   void *context,
+					   gearman_worker_options_t options)
 {
   pthread_attr_t attr;
 
   pthread_attr_init(&attr);
   pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
 
-  struct worker_handle_st *handle= (struct worker_handle_st *)calloc(1, sizeof(struct worker_handle_st));
+  worker_handle_st *handle= new worker_handle_st();
   assert(handle);
 
-  struct context_st *foo= (struct context_st *)calloc(1, sizeof(struct context_st));
+  context_st *foo= new context_st(worker_fn);
 
   foo->port= port;
   foo->function_name= function_name;
-  foo->worker_fn= worker_fn;
-  foo->function_arg= function_arg;
+  foo->context= context;
   foo->handle= handle;
   foo->options= options;
-  foo->partition_fn= partition_fn;
-  foo->aggregator_fn= aggregator_fn;
   foo->namespace_key= namespace_key;
 
   test_assert_errno(pthread_create(&handle->thread, &attr, thread_runner, foo));
@@ -147,34 +132,6 @@ static struct worker_handle_st *_test_worker_start(in_port_t port,
   return handle;
 }
 
-struct worker_handle_st *test_worker_start(in_port_t port,
-                                           const char *function_name,
-                                           gearman_worker_fn *worker_fn, 
-                                           void *function_arg, gearman_worker_options_t options)
-{
-  return _test_worker_start(port, NULL, function_name, worker_fn, NULL, NULL, function_arg, options);
-}
-
-struct worker_handle_st *test_worker_start_with_namespace(in_port_t port,
-                                           const char *function_name,
-                                           gearman_worker_fn *worker_fn, 
-                                           void *function_arg,
-                                           const char *namespace_key,
-                                           gearman_worker_options_t options)
-{
-  return _test_worker_start(port, namespace_key, function_name, worker_fn, NULL, NULL, function_arg, options);
-}
-
-struct worker_handle_st *test_worker_start_with_reducer(in_port_t port,
-                                                        const char *namespace_key,
-                                                        const char *function_name,
-                                                        gearman_function_fn *partition_fn, gearman_aggregator_fn *aggregator_fn,  
-                                                        void *function_arg,
-                                                        gearman_worker_options_t options)
-{
-  return _test_worker_start(port, namespace_key, function_name, NULL, partition_fn, aggregator_fn, function_arg, options);
-}
-
 void test_worker_stop(struct worker_handle_st *handle)
 {
   void *unused;

+ 6 - 17
libtest/worker.h

@@ -24,23 +24,12 @@ extern "C" {
 #endif
 
 LIBTEST_API
-  struct worker_handle_st *test_worker_start(in_port_t port, const char *function_name,
-                                             gearman_worker_fn *function, void *function_arg,
-                                             gearman_worker_options_t options);
-
-LIBTEST_API
-  struct worker_handle_st *test_worker_start_with_namespace(in_port_t port, const char *function_name,
-							    gearman_worker_fn *function, void *function_arg,
-							    const char *namespace_key,
-							    gearman_worker_options_t options);
-
-LIBTEST_API
-  struct worker_handle_st *test_worker_start_with_reducer(in_port_t port,
-							  const char *namespace_key,
-                                                          const char *function_name,
-                                                          gearman_function_fn *mapper_fn, gearman_aggregator_fn *aggregator_fn,  
-                                                          void *function_arg,
-                                                          gearman_worker_options_t options);
+  struct worker_handle_st *test_worker_start(in_port_t port, 
+					     const char *namespace_key,
+					     const char *function_name,
+					     gearman_function_t &worker_fn,
+					     void *context,
+					     gearman_worker_options_t options);
 
 LIBTEST_API
 void test_worker_stop(struct worker_handle_st *);

+ 3 - 2
tests/basic.cc

@@ -248,12 +248,13 @@ test_return_t lp_734663(void *object)
   struct worker_handle_st *worker_handle[NUMBER_OF_WORKERS];
 
   uint32_t counter= 0;
+  gearman_function_t counter_function_fn= gearman_function_create_v1(counter_function);
   for (uint32_t x= 0; x < NUMBER_OF_WORKERS; x++)
   {
-    worker_handle[x]= test_worker_start(test->port(), worker_function_name, counter_function, &counter, gearman_worker_options_t());
+    worker_handle[x]= test_worker_start(test->port(), NULL, worker_function_name, counter_function_fn, &counter, gearman_worker_options_t());
   }
 
-  time_t end_time= time(NULL) + 5;
+  time_t end_time= time(NULL) +5;
   time_t current_time= 0;
   while (counter < NUMBER_OF_JOBS || current_time < end_time)
   {

+ 2 - 1
tests/burnin.cc

@@ -202,7 +202,8 @@ void *world_create(test_return_t *error)
     return NULL;
   }
 
-  test->handle= test_worker_start(CLIENT_TEST_PORT, DEFAULT_WORKER_NAME, worker_fn, NULL, gearman_worker_options_t());
+  gearman_function_t func_arg= gearman_function_create_v1(worker_fn);
+  test->handle= test_worker_start(CLIENT_TEST_PORT, NULL, DEFAULT_WORKER_NAME, func_arg, NULL, gearman_worker_options_t());
   if (not test->handle)
   {
     *error= TEST_FAILURE;

+ 66 - 37
tests/client_test.cc

@@ -48,6 +48,8 @@
 #include <cstring>
 #include <ctime>
 #include <iostream>
+#include <vector>
+#include <boost/foreach.hpp>
 
 #define GEARMAN_CORE
 #include <libgearman/gearman.h>
@@ -76,6 +78,7 @@
 #include <tests/task.h>
 #include <tests/unique.h>
 #include <tests/workers.h>
+#include <tests/workers_v1.h>
 
 #ifndef __INTEL_COMPILER
 #pragma GCC diagnostic ignored "-Wold-style-cast"
@@ -86,24 +89,12 @@ struct client_test_st
   gearman_client_st *_client;
   bool _clone;
   pid_t gearmand_pid;
-  struct worker_handle_st *completion_worker;
-  struct worker_handle_st *chunky_worker;
-  struct worker_handle_st *unique_check;
-  struct worker_handle_st *split_worker;
-  struct worker_handle_st *namespace_completion_worker;
-  struct worker_handle_st *namespace_chunky_worker;
-  struct worker_handle_st *namespace_split_worker;
-  struct worker_handle_st* increment_reset_worker[10]; 
+  std::vector<worker_handle_st *> workers;
   const char *_worker_name;
 
   client_test_st() :
     _clone(true),
     gearmand_pid(-1),
-    completion_worker(NULL),
-    chunky_worker(NULL),
-    unique_check(NULL),
-    split_worker(NULL),
-    namespace_completion_worker(NULL),
     _worker_name(WORKER_FUNCTION_NAME)
   { 
     if (not (_client= gearman_client_create(NULL)))
@@ -116,21 +107,19 @@ struct client_test_st
   ~client_test_st()
   {
     test_gearmand_stop(gearmand_pid);
-    test_worker_stop(completion_worker);
-    test_worker_stop(chunky_worker);
-    test_worker_stop(unique_check);
-    test_worker_stop(split_worker);
-    test_worker_stop(namespace_completion_worker);
-    test_worker_stop(namespace_chunky_worker);
-    test_worker_stop(namespace_split_worker);
-
-    for (uint32_t x= 0; x < 10; x++)
+
+    BOOST_FOREACH(worker_handle_st *worker, workers)
     {
-      test_worker_stop(increment_reset_worker[x]);
+      test_worker_stop(worker);
     }
     gearman_client_free(_client);
   }
 
+  void push(worker_handle_st *arg)
+  {
+    workers.push_back(arg);
+  }
+
   const char *worker_name() const
   {
     return _worker_name;
@@ -819,8 +808,9 @@ static test_return_t bug_518512_test(void *)
   test_false(result);
   test_compare(0, result_size);
 
-  struct worker_handle_st *completion_worker= test_worker_start(CLIENT_TEST_PORT, "client_test_temp",
-                                                                client_test_temp_worker, NULL, gearman_worker_options_t());
+  gearman_function_t func_arg= gearman_function_create_v1(client_test_temp_worker);
+  struct worker_handle_st *completion_worker= test_worker_start(CLIENT_TEST_PORT, NULL, "client_test_temp",
+                                                                func_arg, NULL, gearman_worker_options_t());
 
   gearman_client_set_timeout(&client, -1);
   result= gearman_client_do(&client, "client_test_temp", NULL, NULL, 0,
@@ -848,10 +838,11 @@ static test_return_t loop_test(void *)
   pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
 
   struct worker_handle_st *handles[NUMBER_OF_WORKERS];
+  gearman_function_t func_arg= gearman_function_create_v1(client_test_temp_worker);
   for (size_t x= 0; x < NUMBER_OF_WORKERS; x++)
   {
-    handles[x]= test_worker_start(CLIENT_TEST_PORT, "client_test_temp",
-                                  client_test_temp_worker, NULL, gearman_worker_options_t());
+    handles[x]= test_worker_start(CLIENT_TEST_PORT, NULL, "client_test_temp",
+                                  func_arg, NULL, gearman_worker_options_t());
   }
 
   pthread_create(&one, &attr, client_thread, NULL);
@@ -1095,6 +1086,24 @@ static test_return_t pre_chunk(void *object)
   return TEST_SUCCESS;
 }
 
+static test_return_t pre_v2(void *object)
+{
+  client_test_st *all= (client_test_st *)object;
+
+  all->set_worker_name(WORKER_FUNCTION_NAME"_v2");
+
+  return TEST_SUCCESS;
+}
+
+static test_return_t pre_chunk_v2(void *object)
+{
+  client_test_st *all= (client_test_st *)object;
+
+  all->set_worker_name(WORKER_CHUNKED_FUNCTION_NAME"_v2");
+
+  return TEST_SUCCESS;
+}
+
 static test_return_t pre_free_tasks(void *object)
 {
   client_test_st *all= (client_test_st *)object;
@@ -1199,20 +1208,36 @@ void *world_create(test_return_t *error)
     return NULL;
   }
 
-  test->completion_worker= test_worker_start(CLIENT_TEST_PORT, WORKER_FUNCTION_NAME, echo_or_react_worker, NULL, gearman_worker_options_t());
-  test->chunky_worker= test_worker_start(CLIENT_TEST_PORT, WORKER_CHUNKED_FUNCTION_NAME, echo_or_react_chunk_worker, NULL, gearman_worker_options_t());
-  test->unique_check= test_worker_start(CLIENT_TEST_PORT, WORKER_UNIQUE_FUNCTION_NAME, unique_worker, NULL, GEARMAN_WORKER_GRAB_UNIQ);
-  test->split_worker= test_worker_start_with_reducer(CLIENT_TEST_PORT, NULL, WORKER_SPLIT_FUNCTION_NAME, split_worker, cat_aggregator_fn,  NULL, GEARMAN_WORKER_GRAB_ALL);
+  // Version 1 functions
+  gearman_function_t echo_react_fn_v1= gearman_function_create_v1(echo_or_react_worker);
+  test->push(test_worker_start(CLIENT_TEST_PORT, NULL, WORKER_FUNCTION_NAME, echo_react_fn_v1, NULL, gearman_worker_options_t()));
+  gearman_function_t echo_react_chunk_fn_v1= gearman_function_create_v1(echo_or_react_chunk_worker);
+  test->push(test_worker_start(CLIENT_TEST_PORT, NULL, WORKER_CHUNKED_FUNCTION_NAME, echo_react_chunk_fn_v1, NULL, gearman_worker_options_t()));
+
+  // Version 2 functsions
+  gearman_function_t echo_react_fn_v2= gearman_function_create(echo_or_react_worker_v2);
+  test->push(test_worker_start(CLIENT_TEST_PORT, NULL, WORKER_FUNCTION_NAME"_v2", echo_react_fn_v2, NULL, gearman_worker_options_t()));
+  gearman_function_t echo_react_chunk_fn_v2= gearman_function_create(echo_or_react_chunk_worker_v2);
+  test->push(test_worker_start(CLIENT_TEST_PORT, NULL, WORKER_CHUNKED_FUNCTION_NAME"_v2", echo_react_chunk_fn_v2, NULL, gearman_worker_options_t()));
+
+  gearman_function_t unique_worker_arg= gearman_function_create_v1(unique_worker);
+  test->push(test_worker_start(CLIENT_TEST_PORT, NULL, WORKER_UNIQUE_FUNCTION_NAME, unique_worker_arg, NULL, GEARMAN_WORKER_GRAB_UNIQ));
+
+  gearman_function_t split_worker_fn= gearman_function_create_partition(split_worker, cat_aggregator_fn);
+  test->push(test_worker_start(CLIENT_TEST_PORT, NULL, WORKER_SPLIT_FUNCTION_NAME, split_worker_fn,  NULL, GEARMAN_WORKER_GRAB_ALL));
 
-  test->namespace_completion_worker= test_worker_start_with_namespace(CLIENT_TEST_PORT, WORKER_FUNCTION_NAME, echo_or_react_worker, NULL, NAMESPACE_KEY, gearman_worker_options_t());
-  test->namespace_chunky_worker= test_worker_start_with_namespace(CLIENT_TEST_PORT, WORKER_CHUNKED_FUNCTION_NAME, echo_or_react_worker, NULL, NAMESPACE_KEY, gearman_worker_options_t());
-  test->namespace_split_worker= test_worker_start_with_reducer(CLIENT_TEST_PORT, NAMESPACE_KEY, WORKER_SPLIT_FUNCTION_NAME, split_worker, cat_aggregator_fn,  NULL, GEARMAN_WORKER_GRAB_ALL);
+  // Namespace versions of the above
+  test->push(test_worker_start(CLIENT_TEST_PORT, NAMESPACE_KEY, WORKER_FUNCTION_NAME, echo_react_fn_v1, NULL, gearman_worker_options_t()));
+  test->push(test_worker_start(CLIENT_TEST_PORT, NAMESPACE_KEY, WORKER_CHUNKED_FUNCTION_NAME, echo_react_chunk_fn_v1, NULL, gearman_worker_options_t()));
+  test->push(test_worker_start(CLIENT_TEST_PORT, NAMESPACE_KEY, WORKER_SPLIT_FUNCTION_NAME, split_worker_fn,  NULL, GEARMAN_WORKER_GRAB_ALL));
 
+  gearman_function_t increment_reset_worker_fn= gearman_function_create_v1(increment_reset_worker);
   for (uint32_t x= 0; x < 10; x++)
   {
-    test->increment_reset_worker[x]= test_worker_start(CLIENT_TEST_PORT, 
-                                                       "increment_reset_worker", increment_reset_worker, 
-                                                       NULL, gearman_worker_options_t());
+    test->push(test_worker_start(CLIENT_TEST_PORT, 
+                                 NULL,
+                                 "increment_reset_worker", increment_reset_worker_fn, 
+                                 NULL, gearman_worker_options_t()));
   }
 
   test->gearmand_pid= gearmand_pid;
@@ -1366,10 +1391,13 @@ collection_st collection[] ={
   {"gearman_client_st chunky", pre_chunk, post_function_reset, tests}, // Test with a worker that will respond in part
   {"gearman_strerror()", 0, 0, gearman_strerror_tests},
   {"gearman_task_add_task()", 0, 0, gearman_task_tests},
+  {"gearman_task_add_task() v2 workers", pre_v2, post_function_reset, gearman_task_tests},
   {"gearman_task_add_task() chunky", pre_chunk, post_function_reset, gearman_task_tests},
+  {"gearman_task_add_task() chunky v2 workers", pre_chunk_v2, post_function_reset, gearman_task_tests},
   {"gearman_task_add_task() namespace", pre_namespace, post_function_reset, gearman_task_tests},
   {"gearman_task_add_task(GEARMAN_CLIENT_FREE_TASKS)", pre_free_tasks, post_free_tasks, gearman_task_tests},
   {"gearman_task_add_task(GEARMAN_PAUSE)", pre_chunk, post_function_reset, gearman_task_pause_tests},
+  {"gearman_task_add_task(GEARMAN_PAUSE)", pre_chunk_v2, post_function_reset, gearman_task_pause_tests},
   {"unique", pre_unique, post_function_reset, unique_tests},
   {"gearman_client_set_workload_malloc_fn()", 0, 0, gearman_client_set_workload_malloc_fn_tests},
   {"gearman_client_do()", 0, 0, gearman_client_do_tests},
@@ -1384,6 +1412,7 @@ collection_st collection[] ={
   {"gearman_execute()", 0, 0, gearman_execute_tests},
   {"gearman_execute(GEARMAN_CLIENT_FREE_TASKS)", pre_free_tasks, post_free_tasks, gearman_execute_tests},
   {"gearman_execute() chunked return", pre_chunk, post_function_reset, gearman_execute_tests},
+  {"gearman_execute() chunked return", pre_chunk_v2, post_function_reset, gearman_execute_tests},
   {"gearman_execute_partition()", 0, 0, gearman_execute_partition_tests},
   {"gearman_execute_partition(GEARMAN_CLIENT_FREE_TASKS)", pre_free_tasks, post_free_tasks, gearman_execute_partition_tests},
   {"gearman_command_t", 0, 0, gearman_command_t_tests},

+ 4 - 2
tests/include.am

@@ -21,7 +21,8 @@ noinst_HEADERS+= \
 		 tests/server_options.h \
 		 tests/task.h \
 		 tests/unique.h \
-		 tests/workers.h
+		 tests/workers.h \
+		 tests/workers_v1.h
 
 CLIENT_LDADD= \
 	      libtest/libtest.la \
@@ -53,7 +54,8 @@ tests_client_test_SOURCES= \
 			   tests/server_options.cc \
 			   tests/task.cc \
 			   tests/unique.cc \
-			   tests/workers.cc
+			   tests/workers.cc \
+			   tests/workers_v1.cc
 tests_client_test_LDADD= ${CLIENT_LDADD}
 tests_client_test_DEPENDENCIES= ${CLIENT_LDADD}
 

+ 4 - 4
tests/task.cc

@@ -395,8 +395,8 @@ test_return_t gearman_client_add_task_pause_test(void *object)
   gearman_task_st *task= gearman_client_add_task(client, NULL, NULL,
                                                  worker_function, NULL, "dog", 3,
                                                  &ret);
-  test_true(client->actions.data_fn == pause_actions.data_fn);
-  test_true_got(gearman_success(ret), gearman_strerror(ret));
+  test_compare(client->actions.data_fn, pause_actions.data_fn);
+  test_compare_got(GEARMAN_SUCCESS, ret, gearman_strerror(ret));
   test_truth(task);
 
   do
@@ -405,9 +405,9 @@ test_return_t gearman_client_add_task_pause_test(void *object)
     uint32_t count= 0;
     do {
       count++;
-      test_true(client->actions.data_fn == pause_actions.data_fn);
+      test_compare(client->actions.data_fn, pause_actions.data_fn);
       ret= gearman_client_run_tasks(client);
-      test_true(client->actions.data_fn == pause_actions.data_fn);
+      test_compare(client->actions.data_fn, pause_actions.data_fn);
     } while (gearman_continue(ret));
 
     test_compare_got(GEARMAN_SUCCESS, ret, gearman_client_error(client));

Some files were not shown because too many files changed in this diff