Browse Source

Fix test case around httpd.

Brian Aker 12 years ago
parent
commit
f162f378c4

+ 8 - 0
examples/reverse_worker.cc

@@ -110,6 +110,14 @@ static gearman_return_t reverse_worker(gearman_job_st *job, void *context)
     }
   }
 
+  if (options.chunk == false) // Chunk the result set
+  {
+    if (gearman_failed(gearman_job_send_data(job, result, workload_size)))
+    {
+      return GEARMAN_ERROR;
+    }
+  }
+
   if (options.verbose)
   {
     std::cout << "Job=" << gearman_job_handle(job);

+ 23 - 11
libgearman-server/connection.c

@@ -49,7 +49,8 @@ gearman_server_con_st *gearman_server_con_add(gearman_server_thread_st *thread,
   return con;
 }
 
-static gearman_server_con_st * _server_con_create(gearman_server_thread_st *thread, gearmand_con_st *dcon,
+static gearman_server_con_st * _server_con_create(gearman_server_thread_st *thread,
+                                                  gearmand_con_st *dcon,
                                                   gearmand_error_t *ret)
 {
   gearman_server_con_st *con;
@@ -120,15 +121,23 @@ static gearman_server_con_st * _server_con_create(gearman_server_thread_st *thre
   con->protocol= NULL;
 
   int error;
-  if (! (error= pthread_mutex_lock(&thread->lock)))
+  if ((error= pthread_mutex_lock(&thread->lock)) == 0)
   {
     GEARMAN_LIST_ADD(thread->con, con,);
-    (void) pthread_mutex_unlock(&thread->lock);
+    if ((error= pthread_mutex_unlock(&thread->lock)))
+    {
+      errno= error;
+      gearmand_log_fatal(GEARMAN_DEFAULT_LOG_PARAM, "pthread_mutex_unlock(%d), programming error, please report", error);
+      gearman_server_con_free(con);
+
+      *ret= GEARMAN_ERRNO;
+      return NULL;
+    }
   }
   else
   {
-    errno= error;
-    gearmand_perror("pthread_mutex_lock");
+    assert(error);
+    gearmand_log_fatal(GEARMAN_DEFAULT_LOG_PARAM, "pthread_mutex_lock(%d), programming error, please report", error);
     gearman_server_con_free(con);
 
     *ret= GEARMAN_ERRNO;
@@ -232,27 +241,30 @@ void gearman_server_con_free(gearman_server_con_st *con)
   if (thread->free_con_count < GEARMAN_MAX_FREE_SERVER_CON)
   {
     GEARMAN_LIST_ADD(thread->free_con, con,)
+
+    con->is_cleaned_up = true;
+    return;
   }
-  else
-  {
-    gearmand_debug("free");
-    free(con);
-  }
-  con->is_cleaned_up = true;
+
+  gearmand_debug("free");
+  free(con);
 }
 
 gearmand_io_st *gearman_server_con_con(gearman_server_con_st *con)
 {
+  assert(con);
   return &con->con;
 }
 
 gearmand_con_st *gearman_server_con_data(gearman_server_con_st *con)
 {
+  assert(con);
   return gearman_io_context(&(con->con));
 }
 
 const char *gearman_server_con_id(gearman_server_con_st *con)
 {
+  assert(con);
   return con->id;
 }
 

+ 3 - 0
libgearman-server/connection.h

@@ -164,6 +164,9 @@ void gearman_server_con_delete_timeout(gearman_server_con_st *con);
 
 void gearman_server_con_protocol_release(gearman_server_con_st *con);
 
+gearman_server_con_st* build_gearman_server_con_st(void);
+void destroy_gearman_server_con_st(gearman_server_con_st* arg);
+
 /** @} */
 
 #ifdef __cplusplus

+ 10 - 0
libgearman-server/connection_plus.cc

@@ -39,6 +39,16 @@
 #include <libgearman-server/common.h>
 #include <libgearman-server/plugins/base.h>
 
+gearman_server_con_st* build_gearman_server_con_st(void)
+{
+  return new (std::nothrow) gearman_server_con_st;
+}
+
+void destroy_gearman_server_con_st(gearman_server_con_st* arg)
+{
+  delete arg;
+}
+
 void gearmand_connection_set_protocol(gearman_server_con_st *connection,
                                       gearmand::protocol::Context* arg)
 {

+ 7 - 1
libgearman-server/io.cc

@@ -188,7 +188,7 @@ static gearmand_error_t _connection_flush(gearman_server_con_st *con)
 
         if (write_size == 0) // detect infinite loop?
         {
-          gearmand_log_debug("send() sent zero bytes to peer %s:%s",
+          gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "send() sent zero bytes to peer %s:%s",
                              connection->context == NULL ? "-" : connection->context->host,
                              connection->context == NULL ? "-" : connection->context->port);
           continue;
@@ -226,6 +226,12 @@ static gearmand_error_t _connection_flush(gearman_server_con_st *con)
           return GEARMAN_ERRNO;
         }
 
+        gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "send() %u bytes to peer %s:%s %.*s",
+                           uint32_t(write_size),
+                           connection->context == NULL ? "-" : connection->context->host,
+                           connection->context == NULL ? "-" : connection->context->port,
+                           int32_t(write_size), connection->send_buffer_ptr);
+
         connection->send_buffer_size-= static_cast<size_t>(write_size);
         if (connection->send_state == gearmand_io_st::GEARMAND_CON_SEND_UNIVERSAL_FLUSH_DATA)
         {

+ 4 - 2
libgearman-server/job_plus.cc

@@ -192,7 +192,9 @@ gearman_server_job_st *gearman_server_job_take(gearman_server_con_st *server_con
 {
   for (gearman_server_worker_st *server_worker= server_con->worker_list; server_worker; server_worker= server_worker->con_next)
   {
-    gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "Jobs available %lu", (unsigned long)(server_worker->function->job_count));
+    gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "Jobs available for %.*s: %lu",
+                       (int)server_worker->function->function_name_size, server_worker->function->function_name,
+                       (unsigned long)(server_worker->function->job_count));
     if (server_worker->function->job_count)
     {
       if (server_worker == NULL)
@@ -274,7 +276,7 @@ void *_proc(void *data)
 {
   gearman_server_st *server= (gearman_server_st *)data;
 
-  gearmand_initialize_thread_logging("[ proc ]");
+  gearmand_initialize_thread_logging("[  proc ]");
 
   while (1)
   {

+ 2 - 2
libgearman-server/packet.cc

@@ -307,14 +307,14 @@ void gearmand_packet_free(gearmand_packet_st *packet)
 {
   if (packet->args != packet->args_buffer && packet->args != NULL)
   {
-    gearmand_debug("free");
+    gearmand_debug("free packet's args");
     free(packet->args);
     packet->args= NULL;
   }
 
   if (packet->options.free_data && packet->data != NULL)
   {
-    gearmand_debug("free");
+    gearmand_debug("free() packet's data");
     free((void *)packet->data); //@todo fix the need for the casting.
     packet->data= NULL;
   }

+ 10 - 3
libgearman-server/plugins/protocol/gear/protocol.cc

@@ -56,11 +56,11 @@ static gearmand_error_t gearmand_packet_unpack_header(gearmand_packet_st *packet
 {
   uint32_t tmp;
 
-  if (not memcmp(packet->args, "\0REQ", 4))
+  if (memcmp(packet->args, "\0REQ", 4) == 0)
   {
     packet->magic= GEARMAN_MAGIC_REQUEST;
   }
-  else if (not memcmp(packet->args, "\0RES", 4))
+  else if (memcmp(packet->args, "\0RES", 4) == 0)
   {
     packet->magic= GEARMAN_MAGIC_RESPONSE;
   }
@@ -181,7 +181,7 @@ public:
 
     while (packet->argc != gearman_command_info(packet->command)->argc)
     {
-      if (packet->argc != (gearman_command_info(packet->command)->argc - 1) ||
+      if (packet->argc != (gearman_command_info(packet->command)->argc - 1) or
           gearman_command_info(packet->command)->data)
       {
         uint8_t* ptr= (uint8_t *)memchr(((uint8_t *)data) +used_size, 0,
@@ -232,6 +232,12 @@ public:
               void *data, const size_t data_size,
               gearmand_error_t& ret_ptr)
   {
+
+    gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,
+                       "Sending GEAR length:%"PRIu64" gearmand_command_t:%s",
+                       uint64_t(packet->data_size),
+                       gearman_strcommand(packet->command));
+
     if (packet->args_size == 0)
     {
       ret_ptr= GEARMAN_SUCCESS;
@@ -246,6 +252,7 @@ public:
 
     memcpy(data, packet->args, packet->args_size);
     ret_ptr= GEARMAN_SUCCESS;
+
     return packet->args_size;
   }
 

+ 114 - 61
libgearman-server/plugins/protocol/http/protocol.cc

@@ -75,6 +75,7 @@ public:
 
   HTTPtext() :
     _method(gearmand::protocol::httpd::TRACE),
+    _sent_header(false),
     _background(false),
     _keep_alive(false),
     _http_response(gearmand::protocol::httpd::HTTP_OK)
@@ -89,80 +90,128 @@ public:
     gearmand_debug("HTTP connection disconnected");
   }
 
-  size_t pack(const gearmand_packet_st *packet, gearman_server_con_st *connection,
-              void *data, const size_t data_size,
+  size_t pack(const gearmand_packet_st *packet,
+              gearman_server_con_st *connection,
+              void *send_buffer, const size_t send_buffer_size,
               gearmand_error_t& ret_ptr)
   {
-    if (packet->command != GEARMAN_COMMAND_WORK_COMPLETE and
-        packet->command != GEARMAN_COMMAND_WORK_FAIL and
-        packet->command != GEARMAN_COMMAND_ECHO_RES and
-        (background() == false or
-         packet->command != GEARMAN_COMMAND_JOB_CREATED))
+    switch (packet->command)
     {
-      gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,
-                         "Sending HTTP told to ignore packet: gearmand_command_t:%s", 
-                         gearman_strcommand(packet->command));
-      ret_ptr= GEARMAN_IGNORE_PACKET;
-      return 0;
+    case GEARMAN_COMMAND_WORK_DATA:
+      {
+        for (const char *ptr= packet->data; ptr <= (packet->data +packet->data_size) -2; ptr++)
+        {
+          content.push_back(*ptr);
+        }
+
+        gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "HTTP gearmand_command_t: GEARMAN_COMMAND_WORK_DATA length:%"PRIu64, uint64_t(content.size()));
+        ret_ptr= GEARMAN_IGNORE_PACKET;
+        return 0;
+      }
+
+    default:
+      assert(0);
+    case GEARMAN_COMMAND_WORK_FAIL:
+    case GEARMAN_COMMAND_ECHO_RES:
+    case GEARMAN_COMMAND_WORK_COMPLETE:
+      break;
+
+    case GEARMAN_COMMAND_JOB_CREATED:
+      {
+        gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,
+                           "Sending HTTP told to ignore packet: gearmand_command_t:%s", 
+                           gearman_strcommand(packet->command));
+        ret_ptr= GEARMAN_IGNORE_PACKET;
+        return 0;
+      }
     }
 
-    gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "Sending HTTP response: Content-length:%"PRIu64" gearmand_command_t:%s response:%s", 
-                       uint64_t(packet->data_size), gearman_strcommand(packet->command),
+    gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,
+                       "Sending HTTP response: Content-length:%"PRIu64" data_size:%"PRIu64" gearmand_command_t:%s response:%s", 
+                       uint64_t(content.size()),
+                       uint64_t(packet->data_size),
+                       gearman_strcommand(packet->command),
                        gearmand::protocol::httpd::response(response()));
 
-    size_t pack_size;
-    if (response() != gearmand::protocol::httpd::HTTP_OK)
+    size_t pack_size= 0;
+    if (_sent_header == false)
     {
-      pack_size= (size_t)snprintf((char *)data, data_size,
-                                  "HTTP/1.0 %u %s\r\n"
-                                  "Server: Gearman/" PACKAGE_VERSION "\r\n"
-                                  "Content-Length: 0\r\n"
-                                  "\r\n",
-                                  int(response()), gearmand::protocol::httpd::response(response()));
-    }
-    else if (method() == gearmand::protocol::httpd::HEAD)
-    {
-      pack_size= (size_t)snprintf((char *)data, data_size,
-                                  "HTTP/1.0 200 OK\r\n"
-                                  "X-Gearman-Job-Handle: %.*s\r\n"
-                                  "Content-Length: %"PRIu64"\r\n"
-                                  "Server: Gearman/" PACKAGE_VERSION "\r\n"
-                                  "\r\n",
-                                  packet->command == GEARMAN_COMMAND_JOB_CREATED ?  (int)packet->arg_size[0] : (int)packet->arg_size[0] - 1,
-                                  (const char *)packet->arg[0],
-                                  (uint64_t)packet->data_size);
-    }
-    else if (method() == gearmand::protocol::httpd::TRACE)
-    {
-      pack_size= (size_t)snprintf((char *)data, data_size,
-                                  "HTTP/1.0 200 OK\r\n"
-                                  "Server: Gearman/" PACKAGE_VERSION "\r\n"
-                                  "Connection: close\r\n"
-                                  "Content-Type: message/http\r\n"
-                                  "\r\n");
-    }
-    else
-    {
-      pack_size= (size_t)snprintf((char *)data, data_size,
-                                  "HTTP/1.0 200 OK\r\n"
-                                  "X-Gearman-Job-Handle: %.*s\r\n"
-                                  "X-Gearman-Command: %s\r\n"
-                                  "Content-Length: %"PRIu64"\r\n"
-                                  "Server: Gearman/" PACKAGE_VERSION "\r\n"
-                                  "\r\n",
-                                  packet->command == GEARMAN_COMMAND_JOB_CREATED ?  int(packet->arg_size[0]) : int(packet->arg_size[0] - 1), (const char *)packet->arg[0],
-                                  gearman_strcommand(packet->command),
-                                  uint64_t(packet->data_size));
+      if (response() != gearmand::protocol::httpd::HTTP_OK)
+      {
+        pack_size= (size_t)snprintf((char *)send_buffer, send_buffer_size,
+                                    "HTTP/1.0 %u %s\r\n"
+                                    "Server: Gearman/" PACKAGE_VERSION "\r\n"
+                                    "Content-Length: 0\r\n"
+                                    "\r\n",
+                                    int(response()), gearmand::protocol::httpd::response(response()));
+      }
+      else if (method() == gearmand::protocol::httpd::HEAD)
+      {
+        pack_size= (size_t)snprintf((char *)send_buffer, send_buffer_size,
+                                    "HTTP/1.0 200 OK\r\n"
+                                    "X-Gearman-Job-Handle: %.*s\r\n"
+                                    "Content-Length: %"PRIu64"\r\n"
+                                    "Server: Gearman/" PACKAGE_VERSION "\r\n"
+                                    "\r\n",
+                                    packet->command == GEARMAN_COMMAND_JOB_CREATED ?  (int)packet->arg_size[0] : (int)packet->arg_size[0] - 1,
+                                    (const char *)packet->arg[0],
+                                    (uint64_t)packet->data_size);
+      }
+      else if (method() == gearmand::protocol::httpd::TRACE)
+      {
+        pack_size= (size_t)snprintf((char *)send_buffer, send_buffer_size,
+                                    "HTTP/1.0 200 OK\r\n"
+                                    "Server: Gearman/" PACKAGE_VERSION "\r\n"
+                                    "Connection: close\r\n"
+                                    "Content-Type: message/http\r\n"
+                                    "\r\n");
+      }
+      else if (method() == gearmand::protocol::httpd::POST)
+      {
+        pack_size= (size_t)snprintf((char *)send_buffer, send_buffer_size,
+                                    "HTTP/1.0 200 OK\r\n"
+                                    "X-Gearman-Job-Handle: %.*s\r\n"
+                                    "X-Gearman-Command: %s\r\n"
+                                    "Content-Length: %"PRIu64"\r\n"
+                                    "Server: Gearman/" PACKAGE_VERSION "\r\n"
+                                    "\r\n%.*s",
+                                    packet->command == GEARMAN_COMMAND_JOB_CREATED ?  int(packet->arg_size[0]) : int(packet->arg_size[0] - 1),
+                                    (const char *)packet->arg[0],
+                                    gearman_strcommand(packet->command),
+                                    uint64_t(content.size()),
+                                    int(content.size()), &content[0]);
+      }
+      else
+      {
+        pack_size= (size_t)snprintf((char *)send_buffer, send_buffer_size,
+                                    "HTTP/1.0 200 OK\r\n"
+                                    "X-Gearman-Job-Handle: %.*s\r\n"
+                                    "X-Gearman-Command: %s\r\n"
+                                    "Content-Length: %"PRIu64"\r\n"
+                                    "Server: Gearman/" PACKAGE_VERSION "\r\n"
+                                    "\r\n",
+                                    packet->command == GEARMAN_COMMAND_JOB_CREATED ?  int(packet->arg_size[0]) : int(packet->arg_size[0] - 1),
+                                    (const char *)packet->arg[0],
+                                    gearman_strcommand(packet->command),
+                                    uint64_t(content.size()));
+      }
+
+      _sent_header= true;
     }
 
-    if (pack_size > data_size)
+    if (pack_size > send_buffer_size)
     {
       gearmand_debug("Sending HTTP had to flush");
       ret_ptr= GEARMAN_FLUSH_DATA;
       return 0;
     }
 
+    memcpy(send_buffer, &content[0], content.size());
+    pack_size+= content.size();
+
+#if 0
     if (keep_alive() == false)
+#endif
     {
       gearman_io_set_option(&connection->con, GEARMAND_CON_CLOSE_AFTER_FLUSH, true);
     }
@@ -242,7 +291,7 @@ public:
       }
     }
 
-    gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "METHOD: %s", str_method(method()));
+    gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "HTTP METHOD: %s", str_method(method()));
 
     while (*uri == ' ')
     {
@@ -265,7 +314,7 @@ public:
     }
 
     ptrdiff_t uri_size= version -uri;
-    gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "URI: %.*s", (int)uri_size, uri);
+    gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "HTTP URI: \"%.*s\"", (int)uri_size, uri);
     switch (method())
     {
     case gearmand::protocol::httpd::POST:
@@ -288,13 +337,13 @@ public:
     }
 
     size_t version_size= request_size - size_t(version - request);
-    if (false and version_size == 8 and 
+    if (version_size == 8 and 
         strncmp(version, "HTTP/1.1", 8) == 0)
     {
       set_keep_alive(true);
     }
     else if (version_size == 8 and 
-             strncmp(version, "HTTP/1.1", 8) == 0)
+             strncmp(version, "HTTP/1.0", 8) == 0)
     { }
     else
     {
@@ -499,8 +548,10 @@ public:
 
   void reset()
   {
+    _sent_header= false;
     _background= false;
     _keep_alive= false;
+    content.clear();
     _method= gearmand::protocol::httpd::TRACE;
     _http_response= gearmand::protocol::httpd::HTTP_OK;
   }
@@ -530,10 +581,12 @@ public:
 
 private:
   gearmand::protocol::httpd::method_t _method;
+  bool _sent_header;
   bool _background;
   bool _keep_alive;
   std::string global_port;
   gearmand::protocol::httpd::response_t _http_response;
+  std::vector<char> content;
 };
 
 static gearmand_error_t _http_con_add(gearman_server_con_st *connection)

+ 5 - 2
libgearman-server/server.c

@@ -207,7 +207,7 @@ gearmand_error_t gearman_server_run_command(gearman_server_con_st *server_con,
       }
 
       gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,
-                         "Received submission, %.*s/%.*s with %d arguments",
+                         "Received submission, function:%.*s unique:%.*s with %d arguments",
                          packet->arg_size[0], packet->arg[0],
                          packet->arg_size[1], packet->arg[1],
                          (int)packet->argc);
@@ -216,7 +216,7 @@ gearmand_error_t gearman_server_run_command(gearman_server_con_st *server_con,
       {
         sscanf((char *)packet->arg[2], "%lld", (long long *)&when);
         gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, 
-                           "Received EPOCH job submission, %.*s/%.*s, with data for %jd at %jd, args %d",
+                           "Received EPOCH job submission, function:%.*s unique:%.*s with data for %jd at %jd, args %d",
                            packet->arg_size[0], packet->arg[0],
                            packet->arg_size[1], packet->arg[1],
                            when, time(NULL),
@@ -453,6 +453,7 @@ gearmand_error_t gearman_server_run_command(gearman_server_con_st *server_con,
 
   /* Worker requests. */
   case GEARMAN_COMMAND_CAN_DO:
+    gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "Registering function: %.*s", packet->arg_size[0], packet->arg[0]);
     if (gearman_server_worker_add(server_con, (char *)(packet->arg[0]),
                                   packet->arg_size[0], 0) == NULL)
     {
@@ -462,6 +463,7 @@ gearmand_error_t gearman_server_run_command(gearman_server_con_st *server_con,
     break;
 
   case GEARMAN_COMMAND_CAN_DO_TIMEOUT:
+    gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "Registering function: %.*s with timeout", packet->arg_size[0], packet->arg[0]);
     if (gearman_server_worker_add(server_con, (char *)(packet->arg[0]),
                                   packet->arg_size[0] - 1,
                                   0) == NULL)
@@ -472,6 +474,7 @@ gearmand_error_t gearman_server_run_command(gearman_server_con_st *server_con,
     break;
 
   case GEARMAN_COMMAND_CANT_DO:
+    gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "Removing function: %.*s with timeout", packet->arg_size[0], packet->arg[0]);
     gearman_server_con_free_worker(server_con, (char *)(packet->arg[0]),
                                    packet->arg_size[0]);
     break;

Some files were not shown because too many files changed in this diff