Browse Source

Fix issue where identifier was not being set on reconnect.

Brian Aker 12 years ago
parent
commit
f207b85e92

+ 8 - 12
libgearman-server/server.cc

@@ -68,8 +68,14 @@
  * Queue an error packet.
  */
 static gearmand_error_t _server_error_packet(gearman_server_con_st *server_con,
-                                             const char *error_code,
-                                             const char *error_string);
+                                             const char *error_code, const char *error_string)
+{
+  return gearman_server_io_packet_add(server_con, false, GEARMAN_MAGIC_RESPONSE,
+                                      GEARMAN_COMMAND_ERROR, error_code,
+                                      (size_t)(strlen(error_code) + 1),
+                                      error_string,
+                                      (size_t)strlen(error_string), NULL);
+}
 
 /**
  * Send work result packets with data back to clients.
@@ -982,16 +988,6 @@ gearmand_error_t Context::replay_add(gearman_server_st *server,
 /*
  * Private definitions
  */
-static gearmand_error_t _server_error_packet(gearman_server_con_st *server_con,
-                                             const char *error_code,
-                                             const char *error_string)
-{
-  return gearman_server_io_packet_add(server_con, false, GEARMAN_MAGIC_RESPONSE,
-                                      GEARMAN_COMMAND_ERROR, error_code,
-                                      (size_t)(strlen(error_code) + 1),
-                                      error_string,
-                                      (size_t)strlen(error_string), NULL);
-}
 
 static gearmand_error_t
 _server_queue_work_data(gearman_server_job_st *server_job,

+ 9 - 0
libgearman-server/text.cc

@@ -39,6 +39,7 @@
 
 #include "libgearman-server/common.h"
 #include "libgearman-server/log.h"
+#include "libgearman/command.h"
 #include "libgearman/vector.hpp"
 
 #include <cassert>
@@ -65,10 +66,18 @@ gearmand_error_t server_run_text(gearman_server_con_st *server_con,
                        int(packet->argc));
   }
 
+#if 0
+  const struct gearman_command_info_st *command= NULL;
+#endif
   if (packet->argc == 0)
   {
     data.vec_printf(TEXT_ERROR_UNKNOWN_COMMAND, 4, "NULL");
   }
+#if 0
+  else if ((command= gearman_command_lookup((char *)(packet->arg[0]), packet->arg_size[0])))
+  {
+  }
+#endif
   else if (strcasecmp("workers", (char *)(packet->arg[0])) == 0)
   {
     for (gearman_server_thread_st *thread= Server->thread_list;

+ 13 - 3
libgearman/command.cc

@@ -40,10 +40,12 @@
 #include <libgearman/common.h>
 
 #include <libgearman-1.0/visibility.h>
-#include <libgearman/command.h>
+#include <libgearman/command.hpp>
 
 #include "libgearman/assert.hpp"
 
+using namespace org::gearman;
+
 /**
  * Command info. Update GEARMAN_MAX_COMMAND_ARGS to the largest number in the
  * args column.
@@ -95,9 +97,17 @@ gearman_command_info_st gearmand_command_info_list[GEARMAN_COMMAND_MAX]=
   { "STATUS_RES_UNIQUE", GEARMAN_COMMAND_STATUS_RES_UNIQUE, 6, false }
 };
 
-gearman_command_info_st *gearman_command_info(gearman_command_t command)
+const gearman_command_info_st *gearman_command_info(gearman_command_t command)
 {
   assert(command >= GEARMAN_COMMAND_TEXT);
   assert(command < GEARMAN_COMMAND_MAX);
-  return &gearmand_command_info_list[command];
+  const struct gearman_command_info_st* command_info= &gearmand_command_info_list[command];
+  assert(command_info->code == command);
+  return command_info;
+}
+
+const struct gearman_command_info_st * gearman_command_lookup (register const char *str, register unsigned int len)
+{
+  const struct gearman_command_string_st* com_str=  command_string2command_code (str, len);
+  return gearman_command_info(com_str->code);
 }

+ 3 - 1
libgearman/command.h

@@ -53,7 +53,9 @@ struct gearman_command_info_st
 extern "C" {
 #endif
 
-struct gearman_command_info_st *gearman_command_info(gearman_command_t command);
+const struct gearman_command_info_st *gearman_command_info(gearman_command_t command);
+
+const struct gearman_command_info_st * gearman_command_lookup (register const char *str, register unsigned int len);
 
 #ifdef __cplusplus
 }

+ 45 - 0
libgearman/connection.cc

@@ -399,6 +399,41 @@ void gearman_connection_st::reset_addrinfo()
   addrinfo_next= NULL;
 }
 
+gearman_return_t gearman_connection_st::send_identifier(void)
+{
+  if (universal._identifier)
+  {
+    const void* id= (void*)universal._identifier->value();
+    size_t id_size= universal._identifier->size();
+
+    gearman_packet_st packet;
+    gearman_return_t ret= gearman_packet_create_args(universal, packet, GEARMAN_MAGIC_REQUEST,
+        GEARMAN_COMMAND_SET_CLIENT_ID,
+        (const void**)&id, &id_size, 1);
+
+    if (gearman_success(ret))
+    {
+      PUSH_BLOCKING(universal);
+
+      gearman_return_t local_ret= send_packet(packet, true);
+      if (gearman_failed(local_ret))
+      {
+        ret= local_ret;
+      }
+      else
+      {
+        options.identifier_sent= true;
+      }
+    }
+
+    gearman_packet_free(&packet);
+
+    return ret;
+  }
+
+  return GEARMAN_SUCCESS;
+}
+
 
 /*
  * The send_packet() method does not only send the passed-in packet_arg. If there are any server options
@@ -411,6 +446,16 @@ void gearman_connection_st::reset_addrinfo()
  */
 gearman_return_t gearman_connection_st::send_packet(const gearman_packet_st& packet_arg, const bool flush_buffer)
 {
+  if (options.identifier_sent == false)
+  {
+    gearman_return_t ret= send_identifier();
+    if (gearman_failed(ret))
+    {
+      return ret;
+    }
+    options.identifier_sent= true;
+  }
+
   if (options.server_options_sent == false)
   {
     for (gearman_server_options_st* head= universal.server_options_list;

+ 5 - 0
libgearman/connection.hpp

@@ -46,11 +46,13 @@ struct gearman_connection_st
 {
   struct Options {
     bool server_options_sent;
+    bool identifier_sent;
     bool ready;
     bool packet_in_use;
 
     Options() :
       server_options_sent(false),
+      identifier_sent(false),
       ready(false),
       packet_in_use(false)
     { }
@@ -133,11 +135,14 @@ public:
 
   gearman_connection_st(const gearman_connection_st&);
 
+  gearman_return_t send_identifier(void);
+
 private:
   gearman_return_t _send_packet(const gearman_packet_st&, const bool flush_buffer);
   gearman_return_t set_socket_options();
   size_t recv_socket(void *data, size_t data_size, gearman_return_t&);
   gearman_return_t connect_poll();
+
   gearman_packet_st *_recv_packet;
 };
 

+ 16 - 0
libgearman/include.am

@@ -28,6 +28,7 @@ noinst_HEADERS+= libgearman/job.h
 noinst_HEADERS+= libgearman/job.hpp
 noinst_HEADERS+= libgearman/ostream.hpp
 noinst_HEADERS+= libgearman/error_code.h
+noinst_HEADERS+= libgearman/command.hpp
 noinst_HEADERS+= libgearman/uuid.hpp
 noinst_HEADERS+= \
 		 libgearman/actions.hpp \
@@ -169,3 +170,18 @@ libgearman/error_code.hpp: libgearman/error_code.gperf
 	  rm $@t; \
 	  touch $@; \
 	  fi
+
+EXTRA_DIST+= libgearman/command.gperf
+BUILT_SOURCES+= libgearman/command.hpp
+libgearman_libgearman_la_SOURCES+= libgearman/command.hpp
+libgearman/command.hpp: libgearman/command.gperf
+	if $(GPERF) $(GPERFFLAGS) -D --struct-type \
+	  libgearman/command.gperf >$@t; then \
+	  mv $@t $@; \
+	  elif $(GPERF) --version >/dev/null 2>&1; then \
+	  rm $@t; \
+	  exit 1; \
+	  else \
+	  rm $@t; \
+	  touch $@; \
+	  fi

+ 10 - 0
libgearman/interface/universal.hpp

@@ -73,6 +73,7 @@ struct gearman_universal_st
   gearman_log_fn *log_fn;
   void *log_context;
   gearman_allocator_t allocator;
+  struct gearman_vector_st *_identifier;
   struct gearman_vector_st *_namespace;
   struct error_st {
     gearman_return_t rc;
@@ -174,6 +175,7 @@ struct gearman_universal_st
     log_fn(NULL),
     log_context(NULL),
     allocator(gearman_default_allocator()),
+    _identifier(NULL),
     _namespace(NULL)
   {
     wakeup_fd[0]= INVALID_SOCKET;
@@ -191,6 +193,14 @@ struct gearman_universal_st
       }
     }
   }
+
+  ~gearman_universal_st()
+  {
+    gearman_string_free(_identifier);
+    gearman_string_free(_namespace);
+  }
+
+  void identifier(const char *identifier_, const size_t identifier_size_);
 };
 
 static inline bool gearman_universal_is_non_blocking(gearman_universal_st &self)

+ 19 - 20
libgearman/universal.cc

@@ -100,6 +100,7 @@ void gearman_universal_clone(gearman_universal_st &destination, const gearman_un
   destination.timeout= source.timeout;
 
   destination._namespace= gearman_string_clone(source._namespace);
+  destination._identifier= gearman_string_clone(source._identifier);
   destination.verbose= source.verbose;
   destination.log_fn= source.log_fn;
   destination.log_context= source.log_context;
@@ -119,7 +120,6 @@ void gearman_universal_free(gearman_universal_st &universal)
 {
   gearman_free_all_cons(universal);
   gearman_free_all_packets(universal);
-  gearman_string_free(universal._namespace);
 
   if (universal.pfds)
   {
@@ -377,6 +377,19 @@ gearman_connection_st *gearman_ready(gearman_universal_st& universal)
   return NULL;
 }
 
+void gearman_universal_st::identifier(const char *identifier_, const size_t identifier_size_)
+{
+  gearman_string_free(_identifier);
+  if (identifier_ and identifier_size_)
+  {
+    _identifier= gearman_string_create(NULL, identifier_, identifier_size_);
+  }
+  else
+  {
+    _identifier= NULL;
+  }
+}
+
 gearman_return_t gearman_set_identifier(gearman_universal_st& universal,
                                         const char *id, size_t id_size)
 {
@@ -403,28 +416,14 @@ gearman_return_t gearman_set_identifier(gearman_universal_st& universal,
     }
   }
 
-  gearman_packet_st packet;
-  gearman_return_t ret= gearman_packet_create_args(universal, packet, GEARMAN_MAGIC_REQUEST,
-                                                   GEARMAN_COMMAND_SET_CLIENT_ID,
-                                                   (const void**)&id, &id_size, 1);
-  if (gearman_success(ret))
-  {
-    PUSH_BLOCKING(universal);
+  universal.identifier(id, id_size);
 
-    for (gearman_connection_st *con= universal.con_list; con; con= con->next)
-    {
-      gearman_return_t local_ret= con->send_packet(packet, true);
-      if (gearman_failed(local_ret))
-      {
-        ret= local_ret;
-      }
-
-    }
+  for (gearman_connection_st *con= universal.con_list; con; con= con->next)
+  {
+    con->send_identifier();
   }
 
-  gearman_packet_free(&packet);
-
-  return ret;
+  return GEARMAN_SUCCESS;
 }
 
 static gearman_return_t connection_loop(gearman_universal_st& universal,

+ 6 - 6
libgearman/worker.cc

@@ -1281,7 +1281,7 @@ gearman_return_t gearman_worker_set_memory_allocators(gearman_worker_st *worker,
                                                       gearman_calloc_fn *calloc_fn,
                                                       void *context)
 {
-  if (worker)
+  if (worker and worker->impl())
   {
     return gearman_set_memory_allocator(worker->impl()->universal.allocator, malloc_fn, free_fn, realloc_fn, calloc_fn, context);
   }
@@ -1302,7 +1302,7 @@ bool gearman_worker_set_server_option(gearman_worker_st *worker_shell, const cha
 
 void gearman_worker_set_namespace(gearman_worker_st *self, const char *namespace_key, size_t namespace_key_size)
 {
-  if (self)
+  if (self and self->impl())
   {
     gearman_universal_set_namespace(self->impl()->universal, namespace_key, namespace_key_size);
   }
@@ -1332,7 +1332,7 @@ gearman_worker_st *gearman_job_clone_worker(gearman_job_st *job)
 gearman_return_t gearman_worker_set_identifier(gearman_worker_st *worker,
                                                const char *id, size_t id_size)
 {
-  if (worker)
+  if (worker and worker->impl())
   {
     return gearman_set_identifier(worker->impl()->universal, id, id_size);
   }
@@ -1340,11 +1340,11 @@ gearman_return_t gearman_worker_set_identifier(gearman_worker_st *worker,
   return GEARMAN_INVALID_ARGUMENT;
 }
 
-const char *gearman_worker_namespace(gearman_worker_st *self)
+const char *gearman_worker_namespace(gearman_worker_st* worker)
 {
-  if (self)
+  if (worker and worker->impl())
   {
-    return gearman_univeral_namespace(self->impl()->universal);
+    return gearman_univeral_namespace(worker->impl()->universal);
   }
 
   return NULL;