Browse Source

Update to shell around task.

Brian Aker 12 years ago
parent
commit
b1541f4b29

+ 17 - 26
libgearman-1.0/interface/task.h

@@ -58,35 +58,26 @@ enum gearman_task_kind_t {
   GEARMAN_TASK_KIND_DO
 };
 
+#ifdef __cplusplus
+struct Task;
+#endif
+
 struct gearman_task_st
 {
   struct {
-    bool allocated;
-    bool send_in_use;
-    bool is_known;
-    bool is_running;
-    bool was_reduced;
-    bool is_paused;
+    bool is_allocated;
     bool is_initialized;
   } options;
-  enum gearman_task_kind_t type;
-  enum gearman_task_state_t state;
-  uint32_t magic_;
-  uint32_t created_id;
-  uint32_t numerator;
-  uint32_t denominator;
-  uint32_t client_count;
-  gearman_client_st *client;
-  gearman_task_st *next;
-  gearman_task_st *prev;
-  void *context;
-  gearman_connection_st *con;
-  gearman_packet_st *recv;
-  gearman_packet_st send;
-  struct gearman_actions_t func;
-  gearman_return_t result_rc;
-  struct gearman_result_st *result_ptr;
-  char job_handle[GEARMAN_JOB_HANDLE_SIZE];
-  size_t unique_length;
-  char unique[GEARMAN_MAX_UNIQUE_SIZE];
+  void *_impl;
+#ifdef __cplusplus
+  struct Task* impl() const
+  {
+    return (Task*)(_impl);
+  }
+
+  void impl(Task* impl_)
+  {
+    _impl= impl_;
+  }
+#endif
 };

+ 2 - 1
libgearman-1.0/task_attr.h

@@ -66,7 +66,8 @@ struct gearman_task_attr_t {
 extern "C" {
 #endif
 
-#define  gearman_next(X) (X) ? (X)->next : NULL
+GEARMAN_API
+  gearman_task_st *gearman_next(gearman_task_st *task);
 
 GEARMAN_API
   gearman_task_attr_t gearman_task_attr_init(gearman_job_priority_t priority);

+ 25 - 18
libgearman/actions.cc

@@ -47,27 +47,28 @@
 
 struct gearman_result_st;
 
-static gearman_return_t _client_pause_data(gearman_task_st *task)
+static gearman_return_t _client_pause_data(gearman_task_st* shell)
 {
+  Task* task= shell->impl();
   if (task->options.is_paused)
   {
     task->options.is_paused= false;
     return GEARMAN_SUCCESS;
   }
 
-  if (gearman_task_data_size(task))
+  if (gearman_task_data_size(shell))
   {
-    if (gearman_task_result(task))
+    if (gearman_task_result(shell))
     {
-      gearman_task_result(task)->clear();
+      gearman_task_result(shell)->clear();
     }
     else
     {
-      task->result_ptr= new (std::nothrow) gearman_result_st(gearman_task_data_size(task));
+      task->result_ptr= new (std::nothrow) gearman_result_st(gearman_task_data_size(shell));
     }
     assert_msg(task->result_ptr, "programmer error, result_ptr has not been allocated for task");
 
-    gearman_string_append(gearman_task_mutable_result(task)->string(), static_cast<const char*>(gearman_task_data(task)), gearman_task_data_size(task));
+    gearman_string_append(gearman_task_mutable_result(shell)->string(), static_cast<const char*>(gearman_task_data(shell)), gearman_task_data_size(shell));
   }
 
   if (task->recv->command == GEARMAN_COMMAND_WORK_DATA)
@@ -92,8 +93,9 @@ static gearman_return_t _client_pause_complete(gearman_task_st *task)
 }
 
 
-static gearman_return_t _client_pause_status(gearman_task_st *task)
+static gearman_return_t _client_pause_status(gearman_task_st* shell)
 {
+  Task* task= shell->impl();
   assert_msg(task->recv->command == GEARMAN_COMMAND_WORK_STATUS or
              task->recv->command == GEARMAN_COMMAND_STATUS_RES_UNIQUE or
              task->recv->command == GEARMAN_COMMAND_STATUS_RES, "status has been called out of order for task, or was registered to run on non-status callback, see gearman_actions_t(3)");
@@ -107,8 +109,9 @@ static gearman_return_t _client_pause_status(gearman_task_st *task)
   return GEARMAN_PAUSE;
 }
 
-static gearman_return_t _client_pause_fail(gearman_task_st *task)
+static gearman_return_t _client_pause_fail(gearman_task_st* shell)
 {
+  Task* task= shell->impl();
   assert_msg(task->recv->command == GEARMAN_COMMAND_WORK_FAIL, 
              "fail callback has been called out of order for task, or was registered to run on non-fail callback, see gearman_actions_t(3)");
   if (task->options.is_paused)
@@ -121,39 +124,43 @@ static gearman_return_t _client_pause_fail(gearman_task_st *task)
   return GEARMAN_PAUSE;
 }
 
-static gearman_return_t _client_do_data(gearman_task_st *task)
+static gearman_return_t _client_do_data(gearman_task_st* shell)
 {
-  if (gearman_task_data_size(task))
+  Task* task= shell->impl();
+
+  if (gearman_task_data_size(shell))
   {
-    if (gearman_task_result(task) == NULL)
+    if (gearman_task_result(shell) == NULL)
     {
-      task->result_ptr= new (std::nothrow) gearman_result_st(gearman_task_data_size(task));
+      task->result_ptr= new (std::nothrow) gearman_result_st(gearman_task_data_size(shell));
       if (task->result_ptr == NULL)
       {
         return GEARMAN_MEMORY_ALLOCATION_FAILURE;
       }
     }
 
-    gearman_string_append(gearman_task_mutable_result(task)->string(), static_cast<const char*>(gearman_task_data(task)), gearman_task_data_size(task));
+    gearman_string_append(gearman_task_mutable_result(shell)->string(), static_cast<const char*>(gearman_task_data(shell)), gearman_task_data_size(shell));
   }
 
   return GEARMAN_SUCCESS;
 }
 
-static gearman_return_t _client_do_complete(gearman_task_st *task)
+static gearman_return_t _client_do_complete(gearman_task_st *shell)
 {
-  if (gearman_task_data_size(task))
+  Task* task= shell->impl();
+
+  if (gearman_task_data_size(shell))
   {
-    if (gearman_task_result(task) == NULL)
+    if (gearman_task_result(shell) == NULL)
     {
-      task->result_ptr= new (std::nothrow) gearman_result_st(gearman_task_data_size(task));
+      task->result_ptr= new (std::nothrow) gearman_result_st(gearman_task_data_size(shell));
       if (task->result_ptr == NULL)
       {
         return GEARMAN_MEMORY_ALLOCATION_FAILURE;
       }
     }
 
-    gearman_string_append(gearman_task_mutable_result(task)->string(), static_cast<const char*>(gearman_task_data(task)), gearman_task_data_size(task));
+    gearman_string_append(gearman_task_mutable_result(shell)->string(), static_cast<const char*>(gearman_task_data(shell)), gearman_task_data_size(shell));
   }
 
   task->result_rc= GEARMAN_SUCCESS;

+ 21 - 15
libgearman/add.cc

@@ -99,7 +99,7 @@ gearman_task_st *add_task_ptr(gearman_client_st& client,
 }
 
 gearman_task_st *add_task(gearman_client_st& client,
-                          gearman_task_st *task,
+                          gearman_task_st *task_shell,
                           void *context,
                           gearman_command_t command,
                           const gearman_string_t &function,
@@ -135,12 +135,18 @@ gearman_task_st *add_task(gearman_client_st& client,
     return NULL;
   }
 
-  task= gearman_task_internal_create(&client, task);
-  if (task == NULL)
+  task_shell= gearman_task_internal_create(client, task_shell);
+  if (task_shell == NULL or task_shell->impl() == NULL)
   {
     gearman_error(client.universal, GEARMAN_MEMORY_ALLOCATION_FAILURE, "");
     return NULL;
   }
+  assert(task_shell);
+  assert(task_shell->impl());
+  assert(task_shell->impl()->client);
+  assert(task_shell->impl()->client == &client);
+
+  Task* task= task_shell->impl();
 
   task->context= context;
   task->func= actions;
@@ -242,10 +248,10 @@ gearman_task_st *add_task(gearman_client_st& client,
     client.running_tasks++;
     task->options.send_in_use= true;
 
-    return task;
+    return task->shell();
   }
 
-  gearman_task_free(task);
+  gearman_task_free(task->shell());
 
   return NULL;
 }
@@ -292,15 +298,15 @@ gearman_task_st *add_reducer_task(gearman_client_st *client,
     return NULL;
   }
 
-  gearman_task_st *task= gearman_task_internal_create(client, NULL);
+  gearman_task_st *task= gearman_task_internal_create(*client, NULL);
   if (task == NULL)
   {
     gearman_error(client->universal, GEARMAN_MEMORY_ALLOCATION_FAILURE, "");
     return NULL;
   }
 
-  task->context= context;
-  task->func= actions;
+  task->impl()->context= context;
+  task->impl()->func= actions;
 
   /**
     @todo fix it so that NULL is done by default by the API not by happenstance.
@@ -328,14 +334,14 @@ gearman_task_st *add_reducer_task(gearman_client_st *client,
   {
     args[1]= gearman_c_str(unique);
     args_size[1]= gearman_size(unique) + 1;
-    strncpy(task->unique, gearman_c_str(unique), gearman_size(unique));
+    strncpy(task->impl()->unique, gearman_c_str(unique), gearman_size(unique));
   }
   else
   {
     safe_uuid_generate(uuid);
-    uuid_unparse(uuid, task->unique);
-    task->unique[GEARMAN_MAX_UUID_SIZE]= 0;
-    args[1]= task->unique;
+    uuid_unparse(uuid, task->impl()->unique);
+    task->impl()->unique[GEARMAN_MAX_UUID_SIZE]= 0;
+    args[1]= task->impl()->unique;
     args_size[1]= GEARMAN_MAX_UUID_SIZE +1; // +1 is for the needed null
   }
 
@@ -372,14 +378,14 @@ gearman_task_st *add_reducer_task(gearman_client_st *client,
   args_size[4]= gearman_size(workload);
 
   gearman_return_t rc;
-  if (gearman_success(rc= gearman_packet_create_args(client->universal, task->send,
+  if (gearman_success(rc= gearman_packet_create_args(client->universal, task->impl()->send,
                                                      GEARMAN_MAGIC_REQUEST, command,
                                                      args, args_size,
                                                      5)))
   {
     client->new_tasks++;
     client->running_tasks++;
-    task->options.send_in_use= true;
+    task->impl()->options.send_in_use= true;
   }
   else
   {
@@ -387,7 +393,7 @@ gearman_task_st *add_reducer_task(gearman_client_st *client,
     gearman_task_free(task);
     task= NULL;
   }
-  task->type= GEARMAN_TASK_KIND_EXECUTE;
+  task->impl()->type= GEARMAN_TASK_KIND_EXECUTE;
 
   return task;
 }

+ 113 - 94
libgearman/client.cc

@@ -142,18 +142,23 @@ static void *_client_do(gearman_client_st *client, gearman_command_t command,
   gearman_string_t workload= { static_cast<const char*>(workload_str), workload_size };
 
   gearman_task_st do_task;
-  gearman_task_st *do_task_ptr= add_task(*client, &do_task, NULL, command,
-                                         function,
-                                         local_unique,
-                                         workload,
-                                         time_t(0),
-                                         gearman_actions_do_default());
-  if (do_task_ptr == NULL)
-  {
-    *ret_ptr= gearman_universal_error_code(client->universal);
-    return NULL;
+  {
+    gearman_task_st *do_task_ptr= add_task(*client, &do_task, NULL, command,
+                                           function,
+                                           local_unique,
+                                           workload,
+                                           time_t(0),
+                                           gearman_actions_do_default());
+    if (do_task_ptr == NULL)
+    {
+      *ret_ptr= gearman_universal_error_code(client->universal);
+      return NULL;
+    }
+    assert(do_task.impl());
+    assert(&do_task == do_task_ptr);
   }
-  do_task_ptr->type= GEARMAN_TASK_KIND_DO;
+
+  do_task.impl()->type= GEARMAN_TASK_KIND_DO;
 
   gearman_return_t ret= gearman_client_run_block_tasks(client);
 
@@ -173,14 +178,14 @@ static void *_client_do(gearman_client_st *client, gearman_command_t command,
     *ret_ptr= ret;
     *result_size= 0;
   }
-  else if (gearman_success(ret) and do_task_ptr->result_rc == GEARMAN_SUCCESS)
+  else if (gearman_success(ret) and do_task.impl()->result_rc == GEARMAN_SUCCESS)
   {
-    *ret_ptr= do_task_ptr->result_rc;
-    if (gearman_task_result(do_task_ptr))
+    *ret_ptr= do_task.impl()->result_rc;
+    if (gearman_task_result(&do_task))
     {
       if (gearman_has_allocator(client->universal))
       {
-        gearman_string_t result= gearman_result_string(do_task_ptr->result_ptr);
+        gearman_string_t result= gearman_result_string(do_task.impl()->result_ptr);
         returnable= static_cast<char *>(gearman_malloc(client->universal, gearman_size(result) +1));
         if (returnable == NULL)
         {
@@ -196,7 +201,7 @@ static void *_client_do(gearman_client_st *client, gearman_command_t command,
       }
       else
       {
-        gearman_string_t result= gearman_result_take_string(do_task_ptr->result_ptr);
+        gearman_string_t result= gearman_result_take_string(do_task.impl()->result_ptr);
         *result_size= gearman_size(result);
         returnable= const_cast<char *>(gearman_c_str(result));
       }
@@ -208,9 +213,9 @@ static void *_client_do(gearman_client_st *client, gearman_command_t command,
   }
   else // gearman_client_run_tasks() was successful, but the task was not
   {
-    gearman_error(client->universal, do_task_ptr->result_rc, "occured during gearman_client_run_tasks()");
+    gearman_error(client->universal, do_task.impl()->result_rc, "occured during gearman_client_run_tasks()");
 
-    *ret_ptr= do_task_ptr->result_rc;
+    *ret_ptr= do_task.impl()->result_rc;
     *result_size= 0;
   }
 
@@ -245,28 +250,32 @@ static gearman_return_t _client_do_background(gearman_client_st *client,
 
   client->_do_handle[0]= 0; // Reset the job_handle we store in client
 
-  gearman_task_st do_task, *do_task_ptr;
-  do_task_ptr= add_task(*client, &do_task, 
-                        client, 
-                        command,
-                        function,
-                        unique,
-                        workload,
-                        time_t(0),
-                        gearman_actions_do_default());
-  if (not do_task_ptr)
+  gearman_task_st do_task;
   {
-    return gearman_universal_error_code(client->universal);
+    gearman_task_st* do_task_ptr= add_task(*client, &do_task, 
+                                           client, 
+                                           command,
+                                           function,
+                                           unique,
+                                           workload,
+                                           time_t(0),
+                                           gearman_actions_do_default());
+    if (do_task_ptr == NULL)
+    {
+      return gearman_universal_error_code(client->universal);
+    }
+    assert(do_task_ptr);
+    assert(&do_task == do_task_ptr);
   }
-  do_task_ptr->type= GEARMAN_TASK_KIND_DO;
+  do_task.impl()->type= GEARMAN_TASK_KIND_DO;
 
   gearman_return_t ret= gearman_client_run_block_tasks(client);
 
   if (job_handle)
   {
-    strncpy(job_handle, do_task.job_handle, GEARMAN_JOB_HANDLE_SIZE);
+    strncpy(job_handle, do_task.impl()->job_handle, GEARMAN_JOB_HANDLE_SIZE);
   }
-  strncpy(client->_do_handle, do_task.job_handle, GEARMAN_JOB_HANDLE_SIZE);
+  strncpy(client->_do_handle, do_task.impl()->job_handle, GEARMAN_JOB_HANDLE_SIZE);
   client->new_tasks= 0;
   client->running_tasks= 0;
   gearman_task_free(&do_task);
@@ -634,7 +643,7 @@ size_t gearman_client_count_tasks(gearman_client_st *client)
   size_t count= 1;
   gearman_task_st *search= client->task_list;
 
-  while ((search= search->next))
+  while ((search= search->impl()->next))
   {
     count++;
   }
@@ -758,18 +767,23 @@ gearman_status_t gearman_client_unique_status(gearman_client_st *client,
 
   gearman_return_t ret;
   gearman_task_st do_task;
-  gearman_task_st *do_task_ptr= gearman_client_add_task_status_by_unique(client,
-                                                                         &do_task,
-                                                                         unique, &ret);
-  if (gearman_failed(ret))
   {
-    gearman_status_set_return(status, ret);
-    return status;
+    gearman_task_st *do_task_ptr= gearman_client_add_task_status_by_unique(client,
+                                                                           &do_task,
+                                                                           unique, &ret);
+    if (gearman_failed(ret))
+    {
+      gearman_status_set_return(status, ret);
+      return status;
+    }
+    assert(do_task_ptr);
+    assert(&do_task == do_task_ptr);
   }
-  assert(do_task_ptr);
-  do_task_ptr->type= GEARMAN_TASK_KIND_DO;
 
-  gearman_task_clear_fn(do_task_ptr);
+  Task* task= do_task.impl();
+  task->type= GEARMAN_TASK_KIND_DO;
+
+  gearman_task_clear_fn(&do_task);
 
   ret= gearman_client_run_block_tasks(client);
 
@@ -779,26 +793,26 @@ gearman_status_t gearman_client_unique_status(gearman_client_st *client,
   if (gearman_success(ret))
   {
     gearman_status_set(status,
-                       do_task.options.is_known,
-                       do_task.options.is_running,
-                       do_task.numerator,
-                       do_task.denominator,
-                       do_task.client_count);
+                       task->options.is_known,
+                       task->options.is_running,
+                       task->numerator,
+                       task->denominator,
+                       task->client_count);
 
     if (gearman_status_is_known(status) == false and gearman_status_is_running(status) == false)
     {
-      if (do_task.options.is_running) 
+      if (task->options.is_running) 
       {
         ret= GEARMAN_IN_PROGRESS;
       }
-      else if (do_task.options.is_known)
+      else if (task->options.is_known)
       {
         ret= GEARMAN_JOB_EXISTS;
       }
     }
   }
 
-  gearman_task_free(do_task_ptr);
+  gearman_task_free(&do_task);
 
   gearman_status_set_return(status, ret);
 
@@ -821,16 +835,18 @@ gearman_return_t gearman_client_job_status(gearman_client_st *client,
   universal_reset_error(client->universal);
 
   gearman_task_st do_task;
-  gearman_task_st *do_task_ptr= gearman_client_add_task_status(client, &do_task, client,
-                                                               job_handle, &ret);
-  if (gearman_failed(ret))
   {
-    return ret;
+    gearman_task_st *do_task_ptr= gearman_client_add_task_status(client, &do_task, client,
+                                                                 job_handle, &ret);
+    if (gearman_failed(ret))
+    {
+      return ret;
+    }
+    assert(do_task_ptr);
   }
-  assert(do_task_ptr);
-  do_task_ptr->type= GEARMAN_TASK_KIND_DO;
+  do_task.impl()->type= GEARMAN_TASK_KIND_DO;
 
-  gearman_task_clear_fn(do_task_ptr);
+  gearman_task_clear_fn(&do_task);
 
   ret= gearman_client_run_block_tasks(client);
 
@@ -841,31 +857,31 @@ gearman_return_t gearman_client_job_status(gearman_client_st *client,
   {
     if (is_known)
     {
-      *is_known= do_task.options.is_known;
+      *is_known= do_task.impl()->options.is_known;
     }
 
     if (is_running)
     {
-      *is_running= do_task.options.is_running;
+      *is_running= do_task.impl()->options.is_running;
     }
 
     if (numerator)
     {
-      *numerator= do_task.numerator;
+      *numerator= do_task.impl()->numerator;
     }
 
     if (denominator)
     {
-      *denominator= do_task.denominator;
+      *denominator= do_task.impl()->denominator;
     }
 
     if (is_known == false and is_running == false)
     {
-      if (do_task.options.is_running) 
+      if (do_task.impl()->options.is_running) 
       {
         ret= GEARMAN_IN_PROGRESS;
       }
-      else if (do_task.options.is_known)
+      else if (do_task.impl()->options.is_known)
       {
         ret= GEARMAN_JOB_EXISTS;
       }
@@ -893,7 +909,7 @@ gearman_return_t gearman_client_job_status(gearman_client_st *client,
       *denominator= 0;
     }
   }
-  gearman_task_free(do_task_ptr);
+  gearman_task_free(&do_task);
 
   return ret;
 }
@@ -1129,7 +1145,7 @@ gearman_task_st* gearman_client_add_task_low_background(gearman_client_st *clien
 }
 
 gearman_task_st *gearman_client_add_task_status(gearman_client_st *client,
-                                                gearman_task_st *task,
+                                                gearman_task_st *task_shell,
                                                 void *context,
                                                 const gearman_job_handle_t job_handle,
                                                 gearman_return_t *ret_ptr)
@@ -1149,12 +1165,14 @@ gearman_task_st *gearman_client_add_task_status(gearman_client_st *client,
     return NULL;
   }
 
-  if ((task= gearman_task_internal_create(client, task)) == NULL)
+  if ((task_shell= gearman_task_internal_create(*client, task_shell)) == NULL)
   {
     *ret_ptr= GEARMAN_MEMORY_ALLOCATION_FAILURE;
     return NULL;
   }
 
+  Task* task= task_shell->impl();
+
   task->context= context;
   snprintf(task->job_handle, GEARMAN_JOB_HANDLE_SIZE, "%s", job_handle);
 
@@ -1172,11 +1190,11 @@ gearman_task_st *gearman_client_add_task_status(gearman_client_st *client,
   }
   *ret_ptr= rc;
 
-  return task;
+  return task_shell;
 }
 
 gearman_task_st *gearman_client_add_task_status_by_unique(gearman_client_st *client,
-                                                          gearman_task_st *task_ptr,
+                                                          gearman_task_st *task_shell,
                                                           const char *unique_handle,
                                                           gearman_return_t *ret_ptr)
 {
@@ -1208,13 +1226,14 @@ gearman_task_st *gearman_client_add_task_status_by_unique(gearman_client_st *cli
     return NULL;
   }
 
-  gearman_task_st *task;
-  if ((task= gearman_task_internal_create(client, task_ptr)) == NULL)
+  if ((task_shell= gearman_task_internal_create(*client, task_shell)) == NULL)
   {
     *ret_ptr= GEARMAN_MEMORY_ALLOCATION_FAILURE;
     return NULL;
   }
 
+  Task* task= task_shell->impl();
+
   task->unique_length= unique_length;
   memcpy(task->unique, unique_handle, unique_length);
   task->unique[task->unique_length]= 0;
@@ -1233,7 +1252,7 @@ gearman_task_st *gearman_client_add_task_status_by_unique(gearman_client_st *cli
   }
   *ret_ptr= rc;
 
-  return task;
+  return task_shell;
 }
 
 void gearman_client_set_workload_fn(gearman_client_st *client,
@@ -1329,9 +1348,9 @@ static inline gearman_return_t _client_run_tasks(gearman_client_st *client)
       if (client->new_tasks > 0 && ! (client->options.no_new))
       {
         for (client->task= client->task_list; client->task;
-             client->task= client->task->next)
+             client->task= client->task->impl()->next)
         {
-          if (client->task->state != GEARMAN_TASK_STATE_NEW)
+          if (client->task->impl()->state != GEARMAN_TASK_STATE_NEW)
           {
             continue;
           }
@@ -1343,8 +1362,8 @@ static inline gearman_return_t _client_run_tasks(gearman_client_st *client)
             break;
           }
 
-          assert_msg(client == client->task->client, "Programmer error, client and task member client are not the same");
-          gearman_return_t local_ret= _client_run_task(client->task);
+          assert_msg(client == client->task->impl()->client, "Programmer error, client and task member client are not the same");
+          gearman_return_t local_ret= _client_run_task(client->task->impl());
           if (gearman_failed(local_ret) and local_ret != GEARMAN_IO_WAIT)
           {
             client->state= GEARMAN_CLIENT_STATE_NEW;
@@ -1370,11 +1389,11 @@ static inline gearman_return_t _client_run_tasks(gearman_client_st *client)
         {
           /* Socket is ready for writing, continue submitting jobs. */
           for (client->task= client->task_list; client->task;
-               client->task= client->task->next)
+               client->task= client->task->impl()->next)
           {
-            if (client->task->con != client->con or
-                (client->task->state != GEARMAN_TASK_STATE_SUBMIT and
-                 client->task->state != GEARMAN_TASK_STATE_WORKLOAD))
+            if (client->task->impl()->con != client->con or
+                (client->task->impl()->state != GEARMAN_TASK_STATE_SUBMIT and
+                 client->task->impl()->state != GEARMAN_TASK_STATE_WORKLOAD))
             {
               continue;
             }
@@ -1385,8 +1404,8 @@ static inline gearman_return_t _client_run_tasks(gearman_client_st *client)
               client->state= GEARMAN_CLIENT_STATE_IDLE;
               break;
             }
-            assert_msg(client == client->task->client, "Programmer error, client and task member client are not the same");
-            gearman_return_t local_ret= _client_run_task(client->task);
+            assert_msg(client == client->task->impl()->client, "Programmer error, client and task member client are not the same");
+            gearman_return_t local_ret= _client_run_task(client->task->impl());
             if (local_ret == GEARMAN_COULD_NOT_CONNECT)
             {
               client->state= GEARMAN_CLIENT_STATE_IDLE;
@@ -1424,11 +1443,11 @@ static inline gearman_return_t _client_run_tasks(gearman_client_st *client)
             if (client->con->recv_state == GEARMAN_CON_RECV_STATE_READ_DATA)
             {
               for (client->task= client->task_list; client->task;
-                   client->task= client->task->next)
+                   client->task= client->task->impl()->next)
               {
-                if (client->task->con == client->con &&
-                    (client->task->state == GEARMAN_TASK_STATE_DATA or
-                     client->task->state == GEARMAN_TASK_STATE_COMPLETE))
+                if (client->task->impl()->con == client->con &&
+                    (client->task->impl()->state == GEARMAN_TASK_STATE_DATA or
+                     client->task->impl()->state == GEARMAN_TASK_STATE_COMPLETE))
                 {
                   break;
                 }
@@ -1475,9 +1494,9 @@ static inline gearman_return_t _client_run_tasks(gearman_client_st *client)
 
             /* We have a packet, see which task it belongs to. */
             for (client->task= client->task_list; client->task;
-                 client->task= client->task->next)
+                 client->task= client->task->impl()->next)
             {
-              if (client->task->con != client->con)
+              if (client->task->impl()->con != client->con)
               {
                 continue;
               }
@@ -1485,7 +1504,7 @@ static inline gearman_return_t _client_run_tasks(gearman_client_st *client)
               gearman_log_debug(&client->universal, "Got %s", gearman_strcommand(client->con->_packet.command));
               if (client->con->_packet.command == GEARMAN_COMMAND_JOB_CREATED)
               {
-                if (client->task->created_id != client->con->created_id)
+                if (client->task->impl()->created_id != client->con->created_id)
                 {
                   continue;
                 }
@@ -1519,13 +1538,13 @@ static inline gearman_return_t _client_run_tasks(gearman_client_st *client)
                                static_cast<char *>(client->con->_packet.arg[0]),
                                client->con->_packet.arg_size[0]) == 0))
               { }
-              else if (strncmp(client->task->job_handle,
+              else if (strncmp(client->task->impl()->job_handle,
                                static_cast<char *>(client->con->_packet.arg[0]),
                                client->con->_packet.arg_size[0]) ||
                        (client->con->_packet.command != GEARMAN_COMMAND_WORK_FAIL &&
-                        strlen(client->task->job_handle) != client->con->_packet.arg_size[0] - 1) ||
+                        strlen(client->task->impl()->job_handle) != client->con->_packet.arg_size[0] - 1) ||
                        (client->con->_packet.command == GEARMAN_COMMAND_WORK_FAIL &&
-                        strlen(client->task->job_handle) != client->con->_packet.arg_size[0]))
+                        strlen(client->task->impl()->job_handle) != client->con->_packet.arg_size[0]))
               {
                 continue;
               }
@@ -1542,13 +1561,13 @@ static inline gearman_return_t _client_run_tasks(gearman_client_st *client)
               continue;
             }
 
-            client->task->recv= &(client->con->_packet);
+            client->task->impl()->recv= &(client->con->_packet);
           }
 
   case GEARMAN_CLIENT_STATE_PACKET:
           /* Let task process job created or result packet. */
-          assert_msg(client == client->task->client, "Programmer error, client and task member client are not the same");
-          gearman_return_t local_ret= _client_run_task(client->task);
+          assert_msg(client == client->task->impl()->client, "Programmer error, client and task member client are not the same");
+          gearman_return_t local_ret= _client_run_task(client->task->impl());
           if (local_ret == GEARMAN_IO_WAIT)
           {
             break;

+ 1 - 0
libgearman/common.h

@@ -70,3 +70,4 @@ struct gearman_result_st;
 #include <libgearman/status.hpp>
 
 #include <libgearman/protocol/submit.h>
+#include <libgearman/interface/task.hpp>

+ 34 - 27
libgearman/do.cc

@@ -77,16 +77,20 @@ void *client_do(gearman_client_st *client, gearman_command_t command,
     return NULL;
   }
 
-  gearman_task_st *do_task_ptr= add_task(*client, &do_task, NULL, command,
-                                         function,
-                                         local_unique,
-                                         workload,
-                                         time_t(0),
-                                         gearman_actions_do_default());
-  if (do_task_ptr == NULL)
   {
-    *ret_ptr= gearman_universal_error_code(client->universal);
-    return NULL;
+    gearman_task_st *do_task_ptr= add_task(*client, &do_task, NULL, command,
+                                           function,
+                                           local_unique,
+                                           workload,
+                                           time_t(0),
+                                           gearman_actions_do_default());
+    if (do_task_ptr == NULL)
+    {
+      *ret_ptr= gearman_universal_error_code(client->universal);
+      return NULL;
+    }
+    assert(do_task_ptr);
+    assert(&do_task == do_task_ptr);
   }
 
   gearman_return_t ret= gearman_client_run_block_tasks(client);
@@ -104,12 +108,12 @@ void *client_do(gearman_client_st *client, gearman_command_t command,
   else // Now we check the task itself
   {
     assert(ret == GEARMAN_SUCCESS); // Programmer mistake
-    if (gearman_success(do_task_ptr->result_rc))
+    if (gearman_success(do_task.impl()->result_rc))
     {
-      *ret_ptr= do_task_ptr->result_rc;
-      if (gearman_task_result(do_task_ptr))
+      *ret_ptr= do_task.impl()->result_rc;
+      if (gearman_task_result(&do_task))
       {
-        gearman_string_t result= gearman_result_take_string(do_task_ptr->result_ptr);
+        gearman_string_t result= gearman_result_take_string(do_task.impl()->result_ptr);
         *result_size= gearman_size(result);
         returnable= gearman_c_str(result);
       }
@@ -118,9 +122,9 @@ void *client_do(gearman_client_st *client, gearman_command_t command,
     }
     else // gearman_client_run_block_tasks() was successful, but the task was not
     {
-      gearman_error(client->universal, do_task_ptr->result_rc, "occured during gearman_client_run_tasks()");
+      gearman_error(client->universal, do_task.impl()->result_rc, "occured during gearman_client_run_tasks()");
 
-      *ret_ptr= do_task_ptr->result_rc;
+      *ret_ptr= do_task.impl()->result_rc;
       *result_size= 0;
     }
   }
@@ -146,20 +150,23 @@ gearman_return_t client_do_background(gearman_client_st *client,
   }
 
   gearman_task_st do_task;
-  gearman_task_st *do_task_ptr= add_task(*client, &do_task, 
-                                         client, 
-                                         command,
-                                         function,
-                                         unique,
-                                         workload,
-                                         time_t(0),
-                                         gearman_actions_do_default());
-  if (do_task_ptr == NULL)
   {
-    return gearman_universal_error_code(client->universal);
+    gearman_task_st *do_task_ptr= add_task(*client, &do_task, 
+                                           client, 
+                                           command,
+                                           function,
+                                           unique,
+                                           workload,
+                                           time_t(0),
+                                           gearman_actions_do_default());
+    if (do_task_ptr == NULL)
+    {
+      return gearman_universal_error_code(client->universal);
+    }
+    assert(&do_task == do_task_ptr);
   }
 
-  gearman_task_clear_fn(do_task_ptr);
+  gearman_task_clear_fn(&do_task);
 
   gearman_return_t ret= gearman_client_run_block_tasks(client);
   assert(ret != GEARMAN_IO_WAIT);
@@ -167,7 +174,7 @@ gearman_return_t client_do_background(gearman_client_st *client,
   {
     if (job_handle)
     {
-      strncpy(job_handle, do_task.job_handle, GEARMAN_JOB_HANDLE_SIZE);
+      strncpy(job_handle, do_task.impl()->job_handle, GEARMAN_JOB_HANDLE_SIZE);
     }
     client->new_tasks= 0;
     client->running_tasks= 0;

+ 1 - 1
libgearman/execute.cc

@@ -150,7 +150,7 @@ gearman_task_st *gearman_execute(gearman_client_st *client,
     return NULL;
   }
 
-  task->type= GEARMAN_TASK_KIND_EXECUTE;
+  task->impl()->type= GEARMAN_TASK_KIND_EXECUTE;
   gearman_client_run_tasks(client);
 
   return task;

+ 128 - 0
libgearman/interface/task.hpp

@@ -0,0 +1,128 @@
+/*  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
+ * 
+ *  Gearmand client and server library.
+ *
+ *  Copyright (C) 2012 Data Differential, http://datadifferential.com/
+ *  All rights reserved.
+ *
+ *  Redistribution and use in source and binary forms, with or without
+ *  modification, are permitted provided that the following conditions are
+ *  met:
+ *
+ *      * Redistributions of source code must retain the above copyright
+ *  notice, this list of conditions and the following disclaimer.
+ *
+ *      * Redistributions in binary form must reproduce the above
+ *  copyright notice, this list of conditions and the following disclaimer
+ *  in the documentation and/or other materials provided with the
+ *  distribution.
+ *
+ *      * The names of its contributors may not be used to endorse or
+ *  promote products derived from this software without specific prior
+ *  written permission.
+ *
+ *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ *  "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ *  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ *  A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ *  OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ *  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ *  LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ *  DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ *  THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ *  (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ *  OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#pragma once
+
+#define TASK_MAGIC 134
+#define TASK_ANTI_MAGIC 157
+
+struct Task
+{
+  struct Options {
+    bool allocated;
+    bool send_in_use;
+    bool is_known;
+    bool is_running;
+    bool was_reduced;
+    bool is_paused;
+    bool is_initialized;
+
+    Options() :
+      allocated(true),
+      send_in_use(false),
+      is_known(false),
+      is_running(false),
+      was_reduced(false),
+      is_paused(false),
+      is_initialized(true)
+    { }
+  } options;
+  enum gearman_task_kind_t type;
+  enum gearman_task_state_t state;
+  uint32_t magic_;
+  uint32_t created_id;
+  uint32_t numerator;
+  uint32_t denominator;
+  uint32_t client_count;
+  gearman_client_st *client;
+  gearman_task_st *next;
+  gearman_task_st *prev;
+  void *context;
+  gearman_connection_st *con;
+  gearman_packet_st *recv;
+  gearman_packet_st send;
+  struct gearman_actions_t func;
+  gearman_return_t result_rc;
+  struct gearman_result_st *result_ptr;
+  gearman_task_st* _shell;
+  char job_handle[GEARMAN_JOB_HANDLE_SIZE];
+  size_t unique_length;
+  char unique[GEARMAN_MAX_UNIQUE_SIZE];
+
+  gearman_task_st* shell()
+  {
+    return _shell;
+  }
+
+  Task(gearman_client_st& client_, gearman_task_st* shell_) :
+    type(GEARMAN_TASK_KIND_ADD_TASK),
+    state(GEARMAN_TASK_STATE_NEW),
+    magic_(TASK_MAGIC),
+    created_id(0),
+    numerator(0),
+    denominator(0),
+    client_count(0),
+    client(&client_),
+    next(NULL),
+    prev(NULL),
+    context(NULL),
+    con(NULL),
+    recv(NULL),
+    func(client_.actions),
+    result_rc(GEARMAN_UNKNOWN_STATE),
+    result_ptr(NULL),
+    _shell(shell_),
+    unique_length(0)
+  {
+    job_handle[0]= 0;
+    unique[0]= 0;
+
+    // Add the task to the client
+    {
+      if (client_.task_list)
+      {
+        client_.task_list->impl()->prev= shell_;
+      }
+      next= client_.task_list;
+      prev= NULL;
+      client_.task_list= shell_;
+      client_.task_count++;
+    }
+
+    shell_->impl(this);
+  }
+};

+ 2 - 2
libgearman/job.cc

@@ -132,9 +132,9 @@ struct gearman_job_reducer_st {
     {
       do
       {
-        if (gearman_failed(check_task->result_rc))
+        if (gearman_failed(check_task->impl()->result_rc))
         {
-          return check_task->result_rc;
+          return check_task->impl()->result_rc;
         }
       } while ((check_task= gearman_next(check_task)));
 

Some files were not shown because too many files changed in this diff