Browse Source

Restructure code so that a worker copies functions when cloned.

Brian Aker 11 years ago
parent
commit
5ddf48998a

+ 60 - 20
libgearman/function/base.hpp

@@ -66,52 +66,92 @@ struct _worker_function_st
 
   struct _worker_function_st *next;
   struct _worker_function_st *prev;
-  char *function_name;
-  size_t function_length;
-  void *context;
 
-  _worker_function_st(void *context_arg) : 
+  private:
+  char* _function_name;
+  size_t _function_length;
+  size_t _namespace_length;
+  void* _context;
+  struct gearman_function_t _function;
+  int _timeout;
+
+  public:
+
+  _worker_function_st(const gearman_function_t& function_, void *context_) : 
     next(NULL),
     prev(NULL),
-    function_name(NULL),
-    function_length(0),
-    context(context_arg)
+    _function_name(NULL),
+    _function_length(0),
+    _namespace_length(0),
+    _context(context_),
+    _function(function_),
+    _timeout(0)
   { }
 
   virtual bool has_callback() const= 0;
 
   virtual gearman_function_error_t callback(gearman_job_st* job, void *context_arg)= 0;
 
-  bool init(gearman_vector_st* namespace_arg, const char *name_arg, size_t size)
+  bool init(gearman_vector_st* namespace_,
+            const char *name_, const size_t size,
+            const int timeout_)
   {
-    function_length= gearman_string_length(namespace_arg) +size;
-    function_name= new (std::nothrow) char[function_length +1];
-    if (function_name == NULL)
+    _timeout= timeout_;
+
+    _namespace_length= gearman_string_length(namespace_);
+    _function_length= _namespace_length +size;
+    _function_name= new (std::nothrow) char[_function_length +1];
+    if (_function_name == NULL)
     {
       return false;
     }
 
-    char *ptr= function_name;
-    if (gearman_string_length(namespace_arg))
+    char *ptr= _function_name;
+    if (gearman_string_length(namespace_))
     {
-      memcpy(ptr, gearman_string_value(namespace_arg), gearman_string_length(namespace_arg));
-      ptr+= gearman_string_length(namespace_arg);
+      memcpy(ptr, gearman_string_value(namespace_), gearman_string_length(namespace_));
+      ptr+= gearman_string_length(namespace_);
     }
 
-    memcpy(ptr, name_arg, size);
-    function_name[function_length]= 0;
+    memcpy(ptr, name_, size);
+    _function_name[_function_length]= 0;
 
     return true;
   }
 
+  int timeout() const
+  {
+    return _timeout;
+  }
+
   const char *name() const
   {
-    return function_name;
+    return _function_name;
+  }
+
+  const char *function_name() const
+  {
+    return _function_name +_namespace_length;
+  }
+
+  size_t function_length() const
+  {
+    return length() -_namespace_length;
   }
 
   size_t length() const
   {
-    return function_length;
+    return _function_length;
+  }
+
+  const gearman_function_t& function()
+  {
+    return _function;
+  }
+
+  void* context()
+  {
+    return _context;
   }
 
   virtual ~_worker_function_st()
@@ -121,7 +161,7 @@ struct _worker_function_st
       gearman_packet_free(&_packet);
     }
 
-    delete [] function_name;
+    delete [] _function_name;
   }
 
   gearman_packet_st& packet()

+ 3 - 2
libgearman/function/function_v1.hpp

@@ -45,8 +45,9 @@ class FunctionV1: public _worker_function_st
   gearman_worker_fn *_worker_fn;
 
 public:
-  FunctionV1(gearman_worker_fn *worker_fn_arg, void *context_arg) :
-    _worker_function_st(context_arg),
+  FunctionV1(const gearman_function_t &function_,
+             gearman_worker_fn *worker_fn_arg, void *context_arg) :
+    _worker_function_st(function_, context_arg),
     _worker_fn(worker_fn_arg)
   { }
 

+ 3 - 2
libgearman/function/function_v2.hpp

@@ -45,8 +45,9 @@ class FunctionV2: public _worker_function_st
   gearman_function_fn *_function;
 
 public:
-  FunctionV2(gearman_function_fn *_function_arg, void *context_arg) :
-    _worker_function_st(context_arg),
+  FunctionV2(const gearman_function_t &function_,
+             gearman_function_fn *_function_arg, void *context_arg) :
+    _worker_function_st(function_, context_arg),
     _function(_function_arg)
   { }
 

+ 10 - 9
libgearman/function/make.cc

@@ -49,33 +49,34 @@
 
 _worker_function_st *make(gearman_vector_st* namespace_arg,
                           const char *name, size_t name_length, 
-                          const gearman_function_t &function_arg, 
-                          void *context_arg)
+                          const gearman_function_t &function_, 
+                          void *context_arg, const int timeout_)
 {
   _worker_function_st *function= NULL;
 
-  switch (function_arg.kind)
+  switch (function_.kind)
   {
     case GEARMAN_WORKER_FUNCTION_V1:
-      function= new (std::nothrow) FunctionV1(function_arg.callback.function_v1.func, context_arg);
+      function= new (std::nothrow) FunctionV1(function_, function_.callback.function_v1.func, context_arg);
       break;
 
     case GEARMAN_WORKER_FUNCTION_V2:
-      function= new (std::nothrow) FunctionV2(function_arg.callback.function_v2.func, context_arg);
+      function= new (std::nothrow) FunctionV2(function_, function_.callback.function_v2.func, context_arg);
       break;
 
     case GEARMAN_WORKER_FUNCTION_PARTITION:
-      function= new (std::nothrow) Partition(function_arg.callback.partitioner.func, 
-                                             function_arg.callback.partitioner.aggregator,
+      function= new (std::nothrow) Partition(function_,
+                                             function_.callback.partitioner.func, 
+                                             function_.callback.partitioner.aggregator,
                                              context_arg);
       break;
 
     case GEARMAN_WORKER_FUNCTION_NULL:
-      function= new (std::nothrow) Null(context_arg);
+      function= new (std::nothrow) Null(function_, context_arg);
       break;
   }
 
-  if (function and function->init(namespace_arg, name, name_length) == false)
+  if (function and function->init(namespace_arg, name, name_length, timeout_) == false)
   {
     delete function;
     return NULL;

+ 4 - 4
libgearman/function/make.hpp

@@ -39,7 +39,7 @@
 
 struct _worker_function_st;
 
-_worker_function_st *make(gearman_vector_st* namespace_arg,
-                          const char *name, size_t name_length, 
-                          const gearman_function_t &function_arg, 
-                          void *context_arg);
+_worker_function_st *make(gearman_vector_st*,
+                          const char*, size_t, 
+                          const gearman_function_t&, 
+                          void*, const int);

+ 2 - 2
libgearman/function/null.hpp

@@ -44,8 +44,8 @@ class Null: public _worker_function_st
 {
 
 public:
-  Null(void *context_arg) :
-    _worker_function_st(context_arg)
+  Null(const gearman_function_t &function_, void *context_arg) :
+    _worker_function_st(function_, context_arg)
   { }
 
   bool has_callback() const

+ 4 - 2
libgearman/function/partition.hpp

@@ -46,8 +46,10 @@ class Partition: public _worker_function_st
   gearman_aggregator_fn *aggregator_fn;
 
 public:
-  Partition(gearman_function_fn *partition_fn_arg, gearman_aggregator_fn *aggregator_fn_arg, void *context_arg) :
-    _worker_function_st(context_arg),
+  Partition(const gearman_function_t &function_,
+            gearman_function_fn *partition_fn_arg, gearman_aggregator_fn *aggregator_fn_arg,
+            void *context_arg) :
+    _worker_function_st(function_, context_arg),
     _partition_fn(partition_fn_arg),
     aggregator_fn(aggregator_fn_arg)
   { }

+ 23 - 11
libgearman/worker.cc

@@ -61,16 +61,16 @@
  * @{
  */
 
-static inline struct _worker_function_st *_function_exist(Worker* worker, const char *function_name, size_t function_length)
+static inline struct _worker_function_st *_function_exist(Worker* worker, const char *function_name, const size_t function_length)
 {
   struct _worker_function_st *function;
 
   for (function= worker->function_list; function;
        function= function->next)
   {
-    if (function_length == function->function_length)
+    if (function_length == function->function_length())
     {
-      if (memcmp(function_name, function->function_name, function_length) == 0)
+      if (memcmp(function_name, function->function_name(), function_length) == 0)
       {
         break;
       }
@@ -99,7 +99,7 @@ static gearman_return_t _worker_add_server(const char *host, in_port_t port, voi
  * Allocate and add a function to the register list.
  */
 static gearman_return_t _worker_function_create(Worker *worker,
-                                                const char *function_name, size_t function_length,
+                                                const char *function_name, const size_t function_length,
                                                 const gearman_function_t &function,
                                                 uint32_t timeout,
                                                 void *context);
@@ -164,6 +164,17 @@ gearman_worker_st *gearman_worker_clone(gearman_worker_st *worker_shell,
       gearman_worker_free(worker_shell);
       return NULL;
     }
+
+    for (struct _worker_function_st* function= source->function_list;
+         function;
+         function= function->next)
+    {
+      _worker_function_create(worker,
+                              function->function_name(), function->function_length(),
+                              function->function(),
+                              function->timeout(),
+                              function->context());
+    }
   }
 
   return worker_shell;
@@ -1092,8 +1103,7 @@ gearman_return_t gearman_worker_work(gearman_worker_st *worker_shell)
                worker->work_function;
                worker->work_function= worker->work_function->next)
           {
-            if (not strcmp(gearman_job_function_name(worker->work_job()),
-                           worker->work_function->function_name))
+            if (strcmp(gearman_job_function_name(worker->work_job()), worker->work_function->name()) == 0)
             {
               break;
             }
@@ -1117,7 +1127,7 @@ gearman_return_t gearman_worker_work(gearman_worker_st *worker_shell)
       case GEARMAN_WORKER_WORK_UNIVERSAL_FUNCTION:
         {
           switch (worker->work_function->callback(worker->work_job(),
-                                                          static_cast<void *>(worker->work_function->context)))
+                                                  static_cast<void *>(worker->work_function->context())))
           {
             case GEARMAN_FUNCTION_FATAL:
               if (gearman_job_send_fail_fin(worker->work_job()->impl()) == GEARMAN_LOST_CONNECTION) // If we fail this, we have no connection, @note this causes us to lose the current error
@@ -1283,7 +1293,7 @@ static gearman_return_t _worker_add_server(const char *host, in_port_t port, voi
 }
 
 static gearman_return_t _worker_function_create(Worker *worker,
-                                                const char *function_name, size_t function_length,
+                                                const char *function_name_, const size_t function_length_,
                                                 const gearman_function_t &function_arg,
                                                 uint32_t timeout,
                                                 void *context)
@@ -1291,9 +1301,9 @@ static gearman_return_t _worker_function_create(Worker *worker,
   const void *args[2];
   size_t args_size[2];
 
-  if (function_length == 0 or function_name == NULL or function_length > GEARMAN_FUNCTION_MAX_SIZE)
+  if (function_length_ == 0 or function_name_ == NULL or function_length_ > GEARMAN_FUNCTION_MAX_SIZE)
   {
-    if (function_length > GEARMAN_FUNCTION_MAX_SIZE)
+    if (function_length_ > GEARMAN_FUNCTION_MAX_SIZE)
     {
       gearman_error(worker->universal, GEARMAN_INVALID_ARGUMENT, "function name longer then GEARMAN_MAX_FUNCTION_SIZE");
     } 
@@ -1305,7 +1315,9 @@ static gearman_return_t _worker_function_create(Worker *worker,
     return GEARMAN_INVALID_ARGUMENT;
   }
 
-  _worker_function_st *function= make(worker->universal._namespace, function_name, function_length, function_arg, context);
+  _worker_function_st *function= make(worker->universal._namespace,
+                                      function_name_, function_length_,
+                                      function_arg, context, timeout);
   if (function == NULL)
   {
     gearman_perror(worker->universal, errno, "_worker_function_st::new()");

+ 25 - 0
libgearman/worker.hpp

@@ -83,6 +83,31 @@ public:
     gearman_worker_add_server(_worker, "localhost", arg);
   }
 
+  Worker(const Worker& worker_)
+  {
+    _worker= gearman_worker_clone(NULL, &worker_);
+
+    if (_worker == NULL)
+    {
+      throw std::runtime_error("gearman_worker_create() failed");
+    }
+    enable_logging();
+    enable_ssl();
+  }
+
+  void operator=(const Worker& worker_)
+  { 
+    gearman_worker_free(_worker);
+    _worker= gearman_worker_clone(NULL, &worker_);
+
+    if (_worker == NULL)
+    {
+      throw std::runtime_error("gearman_worker_create() failed");
+    }
+    enable_logging();
+    enable_ssl();
+  }
+
   gearman_worker_st* operator&() const
   { 
     return _worker;

+ 8 - 2
libtest/alarm.cc

@@ -44,7 +44,7 @@
 namespace libtest {
 
 #ifdef __APPLE__
-static const struct timeval default_it_value= { 1200, 0 };
+static const struct timeval default_it_value= { 2400, 0 };
 #else
 static const struct timeval default_it_value= { 600, 0 };
 #endif
@@ -58,10 +58,16 @@ void set_alarm()
 {
   if (gdb_is_caller() == false)
   {
-    if (setitimer(ITIMER_VIRTUAL, &defualt_timer, NULL) == -1)
+    struct itimerval old_timer;
+    if (setitimer(ITIMER_VIRTUAL, &defualt_timer, &old_timer) == -1)
     {
       Error << "setitimer() failed";
     }
+
+    if (old_timer.it_interval.tv_sec != 0 or old_timer.it_value.tv_sec != 0)
+    {
+      Error << "setitimer() return an old_timer structure which wasn't zero";
+    }
   }
 }
 

Some files were not shown because too many files changed in this diff