Browse Source

More cleanup from ssl testing.

Brian Aker 11 years ago
parent
commit
0e5b92b0a3

+ 2 - 1
libgearman-1.0/constants.h

@@ -133,7 +133,8 @@ typedef enum
   GEARMAN_WORKER_TIMEOUT_RETURN=   (1 << 8),
   GEARMAN_WORKER_GRAB_ALL=         (1 << 9),
   GEARMAN_WORKER_SSL=              (1 << 10),
-  GEARMAN_WORKER_MAX=   (1 << 11)
+  GEARMAN_WORKER_IDENTIFIER=       (1 << 11),
+  GEARMAN_WORKER_MAX=   (1 << 12)
 } gearman_worker_options_t;
 
 /* Types. */

+ 12 - 9
libgearman-server/connection.cc

@@ -47,7 +47,8 @@
 
 #include <string.h>
 #include <errno.h>
-#include <assert.h>
+#include <cassert>
+#include <algorithm> 
 
 static gearman_server_con_st * _server_con_create(gearman_server_thread_st *thread, gearmand_con_st *dcon,
                                                   gearmand_error_t& ret);
@@ -313,16 +314,18 @@ const char *gearman_server_con_id(gearman_server_con_st *con)
   return con->id;
 }
 
-void gearman_server_con_set_id(gearman_server_con_st *con, char *id,
-                               size_t size)
+void gearman_server_con_set_id(gearman_server_con_st *con,
+                               const char *id,
+                               const size_t size)
 {
-  if (size >= GEARMAND_SERVER_CON_ID_SIZE)
-  {
-    size= GEARMAND_SERVER_CON_ID_SIZE - 1;
-  }
+  size_t min_size= std::min(size, size_t(GEARMAND_SERVER_CON_ID_SIZE -1));
+
+  memcpy(con->id, id, min_size);
+  con->id[min_size]= 0;
 
-  memcpy(con->id, id, size);
-  con->id[size]= 0;
+  gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,
+                     "identifier set to %.*s", 
+                     min_size, con->id);
 }
 
 void gearman_server_con_free_worker(gearman_server_con_st *con,

+ 2 - 2
libgearman-server/connection.h

@@ -111,8 +111,8 @@ const char *gearman_server_con_id(gearman_server_con_st *con);
  * Set client id.
  */
 GEARMAN_API
-void gearman_server_con_set_id(gearman_server_con_st *con, char *id,
-                               size_t size);
+void gearman_server_con_set_id(gearman_server_con_st *con, const char *id,
+                               const size_t size);
 
 /**
  * Free server worker struction with name for a server connection.

+ 11 - 17
libgearman-server/gearmand.cc

@@ -861,15 +861,7 @@ static void _listen_event(int event_fd, short events __attribute__ ((unused)), v
     Gearmand()->ret= gearmand_perror(local_error, "accept");
     return;
   }
-  gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "accept() %d", fd);
-
-  {
-    int flags= 1;
-    if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &flags, sizeof(flags)) == -1)
-    {
-      gearmand_perror(errno, "setsockopt(SO_KEEPALIVE)");
-    }
-  }
+  gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "accept() fd:%d", fd);
 
   /* 
     Since this is numeric, it should never fail. Even if it did we don't want to really error from it.
@@ -887,6 +879,14 @@ static void _listen_event(int event_fd, short events __attribute__ ((unused)), v
 
   gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM, "Accepted connection from %s:%s", host, port_str);
 
+  {
+    int flags= 1;
+    if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &flags, sizeof(flags)) == -1)
+    {
+      gearmand_log_perror(GEARMAN_DEFAULT_LOG_PARAM, errno, "%s:%s setsockopt(SO_KEEPALIVE)", host, port_str);
+    }
+  }
+
   gearmand_error_t ret= gearmand_con_create(Gearmand(), fd, host, port_str, port);
   if (ret == GEARMAND_MEMORY_ALLOCATION_FAILURE)
   {
@@ -915,15 +915,9 @@ static gearmand_error_t _wakeup_init(gearmand_st *gearmand)
     return gearmand_fatal_perror(errno, "pipe(gearmand->wakeup_fd)");
   }
 
-  int returned_flags;
-  if ((returned_flags= fcntl(gearmand->wakeup_fd[0], F_GETFL, 0)) == -1)
-  {
-    return gearmand_fatal_perror(errno, "fcntl:F_GETFL");
-  }
-
-  if (fcntl(gearmand->wakeup_fd[0], F_SETFL, returned_flags | O_NONBLOCK) == -1)
+  if ((local_ret= gearmand_sockfd_nonblock(gearmand->wakeup_fd[0])))
   {
-    return gearmand_fatal_perror(errno, "fcntl(F_SETFL)");
+    return local_ret;
   }
 #endif
 

+ 3 - 9
libgearman-server/gearmand_thread.cc

@@ -505,16 +505,10 @@ static gearmand_error_t _wakeup_init(gearmand_thread_st *thread)
     return gearmand_perror(errno, "pipe");
   }
 
-  int ret= fcntl(thread->wakeup_fd[0], F_GETFL, 0);
-  if (ret == -1)
+  gearmand_error_t local_ret;
+  if ((local_ret= gearmand_sockfd_nonblock(gearmand->wakeup_fd[0])))
   {
-    return gearmand_perror(errno, "fcntl(F_GETFL)");
-  }
-
-  ret= fcntl(thread->wakeup_fd[0], F_SETFL, ret | O_NONBLOCK);
-  if (ret == -1)
-  {
-    return gearmand_perror(errno, "fcntl(F_SETFL)");
+    return local_ret;
   }
 #endif
 

+ 38 - 42
libgearman-server/io.cc

@@ -146,10 +146,7 @@ static size_t _connection_read(gearman_server_con_st *con, void *data, size_t da
     if (read_size == 0)
     {
       ret= GEARMAND_LOST_CONNECTION;
-      gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM, 
-                        "Peer connection has called close() %s:%s",
-                        connection->host(),
-                        connection->port());
+      gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM, "Peer connection has called close()");
       _connection_close(connection);
       return 0;
     }
@@ -177,10 +174,7 @@ static size_t _connection_read(gearman_server_con_st *con, void *data, size_t da
       case EHOSTDOWN:
         {
           ret= GEARMAND_LOST_CONNECTION;
-          gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM, 
-                            "Peer connection has called close() %s:%s",
-                            connection->host(),
-                            connection->port());
+          gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM, "Peer connection has called close()");
           _connection_close(connection);
           return 0;
         }
@@ -300,9 +294,7 @@ static gearmand_error_t _connection_flush(gearman_server_con_st *con)
                   char errorString[80];
                   CyaSSL_ERR_error_string(err, errorString);
                   _connection_close(connection);
-                  return gearmand_log_gerror(GEARMAN_DEFAULT_LOG_PARAM, GEARMAND_LOST_CONNECTION, "%s:%s SSL failure(%s)",
-                                             connection->host(),
-                                             connection->port(),
+                  return gearmand_log_gerror(GEARMAN_DEFAULT_LOG_PARAM, GEARMAND_LOST_CONNECTION, "SSL failure(%s)",
                                              errorString);
                 }
             }
@@ -317,10 +309,8 @@ static gearmand_error_t _connection_flush(gearman_server_con_st *con)
         if (write_size == 0) // detect infinite loop?
         {
           ++loop_counter;
-          gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "send() sent zero bytes of %u to peer %s:%s",
-                             uint32_t(connection->send_buffer_size),
-                             connection->host(),
-                             connection->port());
+          gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "send() sent zero bytes of %u",
+                             uint32_t(connection->send_buffer_size));
 
           if (loop_counter > 5)
           {
@@ -366,10 +356,8 @@ static gearmand_error_t _connection_flush(gearman_server_con_st *con)
           return GEARMAND_ERRNO;
         }
 
-        gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "send() %u bytes to peer %s:%s",
-                           uint32_t(write_size),
-                           connection->host(),
-                           connection->port());
+        gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "send() %u bytes to peer",
+                           uint32_t(write_size));
 
         connection->send_buffer_size-= static_cast<size_t>(write_size);
         if (connection->send_state == gearmand_io_st::GEARMAND_CON_SEND_UNIVERSAL_FLUSH_DATA)
@@ -775,9 +763,7 @@ gearmand_error_t gearman_io_recv(gearman_server_con_st *con, bool recv_data)
         }
         return ret;
       }
-      gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "%s:%s read %lu bytes",
-                         connection->host(),
-                         connection->port(),
+      gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "read %lu bytes",
                          (unsigned long)recv_size);
 
       connection->recv_buffer_size+= recv_size;
@@ -901,7 +887,6 @@ gearmand_error_t gearmand_io_set_revents(gearman_server_con_st *con, short reven
 
 static gearmand_error_t _io_setsockopt(gearmand_io_st &connection)
 {
-  gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "setsockopt() fd:%d", connection.fd);
   {
     int setting= 1;
     if (setsockopt(connection.fd, IPPROTO_TCP, TCP_NODELAY, &setting, (socklen_t)sizeof(int)) and errno != EOPNOTSUPP)
@@ -969,28 +954,39 @@ static gearmand_error_t _io_setsockopt(gearmand_io_st &connection)
 
   if (SOCK_NONBLOCK == 0)
   {
-    int flags;
-    do
-    {
-      flags= fcntl(connection.fd, F_GETFL, 0);
-    } while (flags == -1 and (errno == EINTR or errno == EAGAIN));
-
-    if (flags == -1)
+    gearmand_error_t local_ret;
+    if ((local_ret= gearmand_sockfd_nonblock(connection.fd)))
     {
-      return gearmand_perror(errno, "fcntl(F_GETFL)");
+      return local_ret;
     }
-    else if ((flags & O_NONBLOCK) == 0)
+  }
+
+  return GEARMAND_SUCCESS;
+}
+
+gearmand_error_t gearmand_sockfd_nonblock(const int& sockfd)
+{
+  int flags;
+  do
+  {
+    flags= fcntl(sockfd, F_GETFL, 0);
+  } while (flags == -1 and (errno == EINTR or errno == EAGAIN));
+
+  if (flags == -1)
+  {
+    return gearmand_perror(errno, "fcntl(F_GETFL)");
+  }
+  else if ((flags & O_NONBLOCK) == 0)
+  {
+    int retval;
+    do
     {
-      int retval;
-      do
-      {
-        retval= fcntl(connection.fd, F_SETFL, flags | O_NONBLOCK);
-      } while (retval == -1 and (errno == EINTR or errno == EAGAIN));
+      retval= fcntl(sockfd, F_SETFL, flags | O_NONBLOCK);
+    } while (retval == -1 and (errno == EINTR or errno == EAGAIN));
 
-      if (retval == -1)
-      {
-        return gearmand_perror(errno, "fcntl(F_SETFL)");
-      }
+    if (retval == -1)
+    {
+      return gearmand_perror(errno, "fcntl(F_SETFL)");
     }
   }
 
@@ -1016,7 +1012,7 @@ void gearmand_sockfd_close(int& sockfd)
   }
   else
   {
-    gearmand_warning("gearmand_sockfd_close() called with an invalid socket");
+    gearmand_debug("gearmand_sockfd_close() called with an invalid socket, this was probably ok");
   }
 }
 

+ 4 - 2
libgearman-server/io.h

@@ -131,9 +131,11 @@ gearmand_error_t gearmand_io_set_events(gearman_server_con_st *connection, short
  */
 gearmand_error_t gearmand_io_set_revents(gearman_server_con_st *connection, short revents);
 
-void gearmand_sockfd_close(int& sockfd);
+void gearmand_sockfd_close(int&);
 
-void gearmand_pipe_close(int& sockfd);
+void gearmand_pipe_close(int&);
+
+gearmand_error_t gearmand_sockfd_nonblock(const int&);
 
 /** @} */
 

+ 26 - 3
libgearman-server/plugins/protocol/gear/protocol.cc

@@ -47,6 +47,7 @@
 #include <libgearman-server/common.h>
 #include <libgearman/strcommand.h>
 #include <libgearman-server/packet.h>
+#include "libgearman/strcommand.h"
 
 #include <cstdio>
 #include <cstdlib>
@@ -103,9 +104,9 @@ public:
     return false;
   }
 
-  void notify(gearman_server_con_st *)
+  void notify(gearman_server_con_st* connection)
   {
-    gearmand_info("Gear connection disconnected");
+    gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM, "Gear connection disconnected: %s:%s", connection->host(), connection->port());
   }
 
   size_t unpack(gearmand_packet_st *packet,
@@ -257,6 +258,24 @@ public:
                          int(packet->arg_size[0]),
                          packet->arg[0]);
     }
+    else if (packet->command == GEARMAN_COMMAND_WORK_EXCEPTION)
+    {
+      gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,
+                         "GEAR gearmand_command_t: %s handle: %.*s exception: %.*s",
+                         gearman_strcommand(packet->command),
+                         int(packet->arg_size[0]),
+                         packet->arg[0],
+                         int(packet->data_size),
+                         packet->data);
+    }
+    else if (packet->command == GEARMAN_COMMAND_SET_CLIENT_ID)
+    {
+      gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,
+                         "GEAR gearmand_command_t: %s identifier: %.*s",
+                         gearman_strcommand(packet->command),
+                         int(packet->arg_size[0]),
+                         packet->arg[0]);
+    }
     else
     {
       gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,
@@ -353,9 +372,13 @@ static gearmand_error_t _gear_con_add(gearman_server_con_st *connection)
                                    cyassl_error_buffer, cyassl_error);
       }
     }
-    gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM, "GearSSL connection made: %d", connection->con.fd);
+    gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM, "GearSSL connection made: %s:%s", connection->host(), connection->port());
   }
+  else
 #endif
+  {
+    gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM, "Gear connection made: %s:%s", connection->host(), connection->port());
+  }
 
   connection->set_protocol(&gear_context);
 

+ 5 - 2
libgearman-server/server.cc

@@ -545,7 +545,7 @@ gearmand_error_t gearman_server_run_command(gearman_server_con_st *server_con,
       long timeout= strtol(strtol_buffer, &endptr, 10);
       if (timeout == LONG_MIN or timeout == LONG_MAX or errno != 0)
       {
-        return gearmand_log_perror(GEARMAN_DEFAULT_LOG_PARAM, errno, "GEARMAN_COMMAND_CAN_DO_TIMEOUT:strtol");
+        return gearmand_log_perror(GEARMAN_DEFAULT_LOG_PARAM, errno, "GEARMAN_COMMAND_CAN_DO_TIMEOUT:strtol: %s", strtol_buffer);
       }
 
       gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "Registering function: %.*s with timeout %dl",
@@ -1042,7 +1042,10 @@ _server_queue_work_data(gearman_server_job_st *server_job,
     }
     else if (command == GEARMAN_COMMAND_WORK_EXCEPTION)
     {
-      gearmand_debug("Sending GEARMAN_COMMAND_WORK_EXCEPTION");
+      gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,
+                         "%s:%s GEARMAN_COMMAND_WORK_EXCEPTION: %.*s",
+                         server_client->con->host(), server_client->con->port(),
+                         int(packet->data_size), packet->data);
     }
 
     uint8_t *data;

+ 5 - 8
libgearman-server/thread.cc

@@ -45,6 +45,7 @@
 #include "libgearman-server/common.h"
 
 #include <libgearman/command.h>
+#include "libgearman/strcommand.h"
 
 #ifdef __cplusplus
 # include <cassert>
@@ -308,10 +309,8 @@ static gearmand_error_t _thread_packet_read(gearman_server_con_st *con)
     }
 
     gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,
-                       "Received %s %s:%u",
-                       gearmand_strcommand(&con->packet->packet),
-                       con->_host == NULL ? "-" : con->_host,
-                       con->_port == NULL ? "-" : con->_port);
+                       "Received %s",
+                       gearmand_strcommand(&con->packet->packet));
 
     /* We read a complete packet. */
     if (Server->flags.threaded)
@@ -355,10 +354,8 @@ static gearmand_error_t _thread_packet_flush(gearman_server_con_st *con)
     }
 
     gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, 
-                       "Sent %s to %s:%d",
-                       gearman_command_info(con->io_packet_list->packet.command)->name,
-                       con->_host == NULL ? "-" : con->_host,
-                       con->_port == NULL ? "-" : con->_port);
+                       "Sent %s",
+                       gearman_strcommand(con->io_packet_list->packet.command));
 
     gearman_server_io_packet_remove(con);
   }

Some files were not shown because too many files changed in this diff