Browse Source

Updating to shell worker interface.

Brian Aker 12 years ago
parent
commit
f56036ea63

+ 1 - 0
libgearman-1.0/include.am

@@ -6,6 +6,7 @@ include libgearman-1.0/t/include.am
 
 nobase_include_HEADERS+= libgearman-1.0/interface/status.h
 nobase_include_HEADERS+= libgearman-1.0/interface/task.h
+nobase_include_HEADERS+= libgearman-1.0/interface/worker.h
 nobase_include_HEADERS+= libgearman-1.0/status.h
 nobase_include_HEADERS+= \
 			 libgearman-1.0/actions.h \

+ 63 - 0
libgearman-1.0/interface/worker.h

@@ -0,0 +1,63 @@
+/*  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
+ * 
+ *  Gearmand client and server library.
+ *
+ *  Copyright (C) 2012 Data Differential, http://datadifferential.com/
+ *  All rights reserved.
+ *
+ *  Redistribution and use in source and binary forms, with or without
+ *  modification, are permitted provided that the following conditions are
+ *  met:
+ *
+ *      * Redistributions of source code must retain the above copyright
+ *  notice, this list of conditions and the following disclaimer.
+ *
+ *      * Redistributions in binary form must reproduce the above
+ *  copyright notice, this list of conditions and the following disclaimer
+ *  in the documentation and/or other materials provided with the
+ *  distribution.
+ *
+ *      * The names of its contributors may not be used to endorse or
+ *  promote products derived from this software without specific prior
+ *  written permission.
+ *
+ *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ *  "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ *  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ *  A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ *  OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ *  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ *  LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ *  DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ *  THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ *  (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ *  OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#pragma once
+
+#ifdef __cplusplus
+struct Worker;
+#endif
+
+struct gearman_worker_st
+{
+  struct {
+    bool is_allocated;
+    bool is_initialized;
+  } options;
+  void *_impl;
+#ifdef __cplusplus
+  struct Worker* impl() const
+  {
+    return (Worker*)(_impl);
+  }
+
+  void impl(Worker* impl_)
+  {
+    _impl= impl_;
+  }
+#endif
+};
+

+ 2 - 33
libgearman-1.0/worker.h

@@ -43,6 +43,8 @@
 
 #pragma once
 
+#include <libgearman-1.0/interface/worker.h>
+
 /** @addtogroup gearman_worker Worker Declarations
  *
  * This is the interface gearman workers should use.
@@ -67,39 +69,6 @@
     GEARMAN_WORKER_WORK_UNIVERSAL_FAIL
   };
 
-/**
- * @ingroup gearman_worker
- */
-struct gearman_worker_st
-{
-  struct {
-    bool allocated;
-    bool non_blocking;
-    bool packet_init;
-    bool change;
-    bool grab_uniq;
-    bool grab_all;
-    bool timeout_return;
-  } options;
-  enum gearman_worker_state_t state;
-  enum gearman_worker_universal_t work_state;
-  uint32_t function_count;
-  uint32_t job_count;
-  size_t work_result_size;
-  void *context;
-  gearman_connection_st *con;
-  gearman_job_st *job;
-  gearman_job_st *job_list;
-  struct _worker_function_st *function;
-  struct _worker_function_st *function_list;
-  struct _worker_function_st *work_function;
-  void *work_result;
-  struct gearman_universal_st universal;
-  gearman_packet_st grab_job;
-  gearman_packet_st pre_sleep;
-  gearman_job_st *work_job;
-};
-
 #ifdef __cplusplus
 #define gearman_has_reducer(A) (A) ? static_cast<bool>((A)->reducer.final_fn) : false
 #else

+ 1 - 0
libgearman/common.h

@@ -71,3 +71,4 @@ struct gearman_result_st;
 
 #include <libgearman/protocol/submit.h>
 #include <libgearman/interface/task.hpp>
+#include <libgearman/interface/worker.hpp>

+ 1 - 1
libgearman/function/function_v1.hpp

@@ -63,7 +63,7 @@ public:
     }
 
     job->error_code= GEARMAN_SUCCESS;
-    job->worker->work_result= _worker_fn(job, context_arg, &(job->worker->work_result_size), &job->error_code);
+    job->worker->impl()->work_result= _worker_fn(job, context_arg, &(job->worker->impl()->work_result_size), &job->error_code);
 
     if (job->error_code == GEARMAN_LOST_CONNECTION)
     {

+ 2 - 0
libgearman/include.am

@@ -12,6 +12,8 @@
 
 nobase_include_HEADERS+= libgearman/gearman.h
 
+noinst_HEADERS+= libgearman/interface/task.hpp
+noinst_HEADERS+= libgearman/interface/worker.hpp
 noinst_HEADERS+= libgearman/uuid.hpp
 noinst_HEADERS+= libgearman/job.hpp
 noinst_HEADERS+= libgearman/job.h

+ 105 - 0
libgearman/interface/worker.hpp

@@ -0,0 +1,105 @@
+/*  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
+ * 
+ *  Gearmand client and server library.
+ *
+ *  Copyright (C) 2012 Data Differential, http://datadifferential.com/
+ *  All rights reserved.
+ *
+ *  Redistribution and use in source and binary forms, with or without
+ *  modification, are permitted provided that the following conditions are
+ *  met:
+ *
+ *      * Redistributions of source code must retain the above copyright
+ *  notice, this list of conditions and the following disclaimer.
+ *
+ *      * Redistributions in binary form must reproduce the above
+ *  copyright notice, this list of conditions and the following disclaimer
+ *  in the documentation and/or other materials provided with the
+ *  distribution.
+ *
+ *      * The names of its contributors may not be used to endorse or
+ *  promote products derived from this software without specific prior
+ *  written permission.
+ *
+ *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ *  "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ *  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ *  A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ *  OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ *  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ *  LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ *  DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ *  THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ *  (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ *  OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#pragma once
+
+struct Worker
+{
+  struct Options {
+    bool is_allocated;
+    bool non_blocking;
+    bool packet_init;
+    bool change;
+    bool grab_uniq;
+    bool grab_all;
+    bool timeout_return;
+
+    Options() :
+      is_allocated(true),
+      non_blocking(false),
+      packet_init(false),
+      change(false),
+      grab_uniq(true),
+      grab_all(true),
+      timeout_return(false)
+    { }
+  } options;
+  enum gearman_worker_state_t state;
+  enum gearman_worker_universal_t work_state;
+  uint32_t function_count;
+  uint32_t job_count;
+  size_t work_result_size;
+  void *context;
+  gearman_connection_st *con;
+  gearman_job_st *job;
+  gearman_job_st *job_list;
+  struct _worker_function_st *function;
+  struct _worker_function_st *function_list;
+  struct _worker_function_st *work_function;
+  void *work_result;
+  struct gearman_universal_st universal;
+  gearman_packet_st grab_job;
+  gearman_packet_st pre_sleep;
+  gearman_job_st *work_job;
+  gearman_worker_st* _shell;
+
+  Worker(gearman_worker_st* shell_) :
+    state(GEARMAN_WORKER_STATE_START),
+    work_state(GEARMAN_WORKER_WORK_UNIVERSAL_GRAB_JOB),
+    function_count(0),
+    job_count(0),
+    work_result_size(0),
+    context(NULL),
+    con(NULL),
+    job(NULL),
+    job_list(NULL),
+    function(NULL),
+    function_list(NULL),
+    work_function(NULL),
+    work_result(NULL),
+    work_job(NULL),
+    _shell(shell_)
+  {
+  }
+
+
+  gearman_worker_st* shell()
+  {
+    return _shell;
+  }
+};
+

+ 1 - 0
libgearman/is.hpp

@@ -42,6 +42,7 @@
 #define gearman_is_initialized(__object) ((__object)->options.is_initialized)
 #define gearman_is_purging(__object) ((__object)->state.is_purging)
 #define gearman_is_processing_input(__object) ((__object)->state.is_processing_input)
+#define gearman_is_non_blocking(__object) ((__object)->options.non_blocking)
 #define gearman_set_purging(__object, __value) ((__object)->state.is_purging= (__value))
 #define gearman_set_processing_input(__object, __value) ((__object)->state.is_processing_input= (__value))
 #define gearman_set_initialized(__object, __value) ((__object)->options.is_initialized= (__value))

+ 19 - 19
libgearman/job.cc

@@ -173,9 +173,9 @@ gearman_job_st *gearman_job_create(gearman_worker_st *worker, gearman_job_st *jo
   else
   {
     job= new (std::nothrow) gearman_job_st;
-    if (not job)
+    if (job == NULL)
     {
-      gearman_perror(worker->universal, "new");
+      gearman_perror(worker->impl()->universal, "new");
       return NULL;
     }
 
@@ -190,14 +190,14 @@ gearman_job_st *gearman_job_create(gearman_worker_st *worker, gearman_job_st *jo
   job->reducer= NULL;
   job->error_code= GEARMAN_UNKNOWN_STATE;
 
-  if (worker->job_list)
+  if (worker->impl()->job_list)
   {
-    worker->job_list->prev= job;
+    worker->impl()->job_list->prev= job;
   }
-  job->next= worker->job_list;
+  job->next= worker->impl()->job_list;
   job->prev= NULL;
-  worker->job_list= job;
-  worker->job_count++;
+  worker->impl()->job_list= job;
+  worker->impl()->job_count++;
 
   job->con= NULL;
 
@@ -213,7 +213,7 @@ bool gearman_job_build_reducer(gearman_job_st *job, gearman_aggregator_fn *aggre
 
   gearman_string_t reducer_func= gearman_job_reducer_string(job);
 
-  job->reducer= new (std::nothrow) gearman_job_reducer_st(job->worker->universal, reducer_func, aggregator_fn);
+  job->reducer= new (std::nothrow) gearman_job_reducer_st(job->worker->impl()->universal, reducer_func, aggregator_fn);
   if (not job->reducer)
   {
     gearman_job_free(job);
@@ -249,7 +249,7 @@ gearman_return_t gearman_job_send_data(gearman_job_st *job, const void *data, si
     args_size[0]= job->assigned.arg_size[0];
     args[1]= data;
     args_size[1]= data_size;
-    gearman_return_t ret= gearman_packet_create_args(job->worker->universal, job->work,
+    gearman_return_t ret= gearman_packet_create_args(job->worker->impl()->universal, job->work,
                                                      GEARMAN_MAGIC_REQUEST,
                                                      GEARMAN_COMMAND_WORK_DATA,
                                                      args, args_size, 2);
@@ -279,7 +279,7 @@ gearman_return_t gearman_job_send_warning(gearman_job_st *job,
     args_size[1]= warning_size;
 
     gearman_return_t ret;
-    ret= gearman_packet_create_args(job->worker->universal, job->work,
+    ret= gearman_packet_create_args(job->worker->impl()->universal, job->work,
                                     GEARMAN_MAGIC_REQUEST,
                                     GEARMAN_COMMAND_WORK_WARNING,
                                     args, args_size, 2);
@@ -314,7 +314,7 @@ gearman_return_t gearman_job_send_status(gearman_job_st *job,
     args_size[2]= strlen(denominator_string);
 
     gearman_return_t ret;
-    ret= gearman_packet_create_args(job->worker->universal, job->work,
+    ret= gearman_packet_create_args(job->worker->impl()->universal, job->work,
                                     GEARMAN_MAGIC_REQUEST,
                                     GEARMAN_COMMAND_WORK_STATUS,
                                     args, args_size, 3);
@@ -360,7 +360,7 @@ gearman_return_t gearman_job_send_complete_fin(gearman_job_st *job,
     gearman_return_t rc= job->reducer->complete();
     if (gearman_failed(rc))
     {
-      return gearman_error(job->worker->universal, rc, "The reducer's complete() returned an error");
+      return gearman_error(job->worker->impl()->universal, rc, "The reducer's complete() returned an error");
     }
 
     gearman_vector_st *reduced_value= job->reducer->result.string();
@@ -386,7 +386,7 @@ gearman_return_t gearman_job_send_complete_fin(gearman_job_st *job,
 
     args[1]= result;
     args_size[1]= result_size;
-    gearman_return_t ret= gearman_packet_create_args(job->worker->universal, job->work,
+    gearman_return_t ret= gearman_packet_create_args(job->worker->impl()->universal, job->work,
                                                      GEARMAN_MAGIC_REQUEST,
                                                      GEARMAN_COMMAND_WORK_COMPLETE,
                                                      args, args_size, 2);
@@ -422,7 +422,7 @@ gearman_return_t gearman_job_send_exception(gearman_job_st *job,
     args[1]= exception;
     args_size[1]= exception_size;
 
-    gearman_return_t ret= gearman_packet_create_args(job->worker->universal, job->work,
+    gearman_return_t ret= gearman_packet_create_args(job->worker->impl()->universal, job->work,
                                                      GEARMAN_MAGIC_REQUEST,
                                                      GEARMAN_COMMAND_WORK_EXCEPTION,
                                                      args, args_size, 2);
@@ -455,7 +455,7 @@ gearman_return_t gearman_job_send_fail_fin(gearman_job_st *job)
   {
     args[0]= job->assigned.arg[0];
     args_size[0]= job->assigned.arg_size[0] - 1;
-    gearman_return_t ret= gearman_packet_create_args(job->worker->universal, job->work,
+    gearman_return_t ret= gearman_packet_create_args(job->worker->impl()->universal, job->work,
                                                      GEARMAN_MAGIC_REQUEST,
                                                      GEARMAN_COMMAND_WORK_FAIL,
                                                      args, args_size, 1);
@@ -559,9 +559,9 @@ void gearman_job_free(gearman_job_st *job)
     gearman_packet_free(&(job->work));
   }
 
-  if (job->worker->job_list == job)
+  if (job->worker->impl()->job_list == job)
   {
-    job->worker->job_list= job->next;
+    job->worker->impl()->job_list= job->next;
   }
 
   if (job->prev)
@@ -573,7 +573,7 @@ void gearman_job_free(gearman_job_st *job)
   {
     job->next->prev= job->prev;
   }
-  job->worker->job_count--;
+  job->worker->impl()->job_count--;
 
   delete job->reducer;
   job->reducer= NULL;
@@ -594,7 +594,7 @@ static gearman_return_t _job_send(gearman_job_st *job)
 
   while ((ret == GEARMAN_IO_WAIT) or (ret == GEARMAN_TIMEOUT))
   {
-    ret= gearman_wait(job->worker->universal);
+    ret= gearman_wait(job->worker->impl()->universal);
     if (ret == GEARMAN_SUCCESS)
     {
       ret= job->con->send_packet(job->work, true);

File diff suppressed because it is too large
+ 235 - 238
libgearman/worker.cc


Some files were not shown because too many files changed in this diff