Browse Source

We now use accept4() if it is available.

Brian Aker 12 years ago
parent
commit
fa4d14ed1c

+ 26 - 15
benchmark/blobslap_client.cc

@@ -74,16 +74,16 @@ int main(int argc, char *argv[])
   size_t min_size= BLOBSLAP_DEFAULT_BLOB_MIN_SIZE;
   size_t max_size= BLOBSLAP_DEFAULT_BLOB_MAX_SIZE;
   unsigned long int count= 1;
-  gearman_client_st client;
+  gearman_client_st master_client;
   bool shutdown_worker= false;
 
-  if (not gearman_client_create(&client))
+  if (gearman_client_create(&master_client) == NULL)
   {
     std::cerr << "Failed to allocate memory for client" << std::endl;
     return EXIT_FAILURE;
   }
 
-  gearman_client_add_options(&client, GEARMAN_CLIENT_UNBUFFERED_RESULT);
+  gearman_client_add_options(&master_client, GEARMAN_CLIENT_UNBUFFERED_RESULT);
 
   while ((c= getopt(argc, argv, "bc:f:h:m:M:n:p:s:ve?")) != -1)
   {
@@ -103,9 +103,9 @@ int main(int argc, char *argv[])
 
     case 'h':
       {
-        if (gearman_failed(gearman_client_add_server(&client, host, port)))
+        if (gearman_failed(gearman_client_add_server(&master_client, host, port)))
         {
-          std::cerr << "Failed while adding server " << host << ":" << port << " :" << gearman_client_error(&client) << std::endl;
+          std::cerr << "Failed while adding server " << host << ":" << port << " :" << gearman_client_error(&master_client) << std::endl;
           exit(EXIT_FAILURE);
         }
       }
@@ -140,23 +140,23 @@ int main(int argc, char *argv[])
       break;
 
     case '?':
-      gearman_client_free(&client);
+      gearman_client_free(&master_client);
       _usage(argv[0]);
       exit(EXIT_SUCCESS);
       break;
 
     default:
-      gearman_client_free(&client);
+      gearman_client_free(&master_client);
       _usage(argv[0]);
       exit(EXIT_FAILURE);
     }
   }
 
-  if (not host)
+  if (host == NULL)
   {
-    if (gearman_failed(gearman_client_add_server(&client, NULL, port)))
+    if (gearman_failed(gearman_client_add_server(&master_client, NULL, port)))
     {
-      std::cerr << "Failing to add localhost:" << port << " :" << gearman_client_error(&client) << std::endl;
+      std::cerr << "Failing to add localhost:" << port << " :" << gearman_client_error(&master_client) << std::endl;
       exit(EXIT_FAILURE);
     }
   }
@@ -191,6 +191,13 @@ int main(int argc, char *argv[])
   bool error= false;
   do
   {
+    gearman_client_st client;
+    if (gearman_client_clone(&client, &master_client) == NULL)
+    {
+      std::cerr << "Failed to allocate client clone" << std::endl;
+      exit(EXIT_FAILURE);
+    }
+
     for (uint32_t x= 0; x < num_tasks; x++)
     {
       size_t blob_size;
@@ -208,7 +215,7 @@ int main(int argc, char *argv[])
 
         blob_size= (blob_size % (max_size - min_size)) + min_size;
       }
-      
+
       const char *blob_ptr= blob_size ? blob : NULL;
 
       gearman_return_t ret;
@@ -239,8 +246,8 @@ int main(int argc, char *argv[])
           std::cerr << "Task #" << x << " failed during gearman_client_add_task(" << gearman_strerror(ret) << " -> " << gearman_client_error(&client) << std::endl ;
         }
 
-	error= true;
-	goto exit_immediatly;
+        error= true;
+        goto exit_immediatly;
       }
     }
 
@@ -280,20 +287,24 @@ int main(int argc, char *argv[])
     }
 
     count--;
+
+    gearman_client_free(&client);
   } while (count or error);
 
 exit_immediatly:
   if (shutdown_worker)
   {
-    gearman_client_do(&client, "shutdown", 0, 0, 0, 0, 0);
+    gearman_client_do(&master_client, "shutdown", 0, 0, 0, 0, 0);
   }
 
   delete [] blob;
   delete [] tasks;
-  gearman_client_free(&client);
+  gearman_client_free(&master_client);
 
   if (benchmark.verbose)
+  {
     std::cout << "Successfully completed all tasks" << std::endl;
+  }
 
   return error ? EXIT_FAILURE : 0;
 }

+ 1 - 0
configure.ac

@@ -166,6 +166,7 @@ AC_FUNC_MEMCMP
 AC_FUNC_STRERROR_R
 AC_FUNC_VPRINTF
 AC_REPLACE_FNMATCH
+AC_CHECK_FUNCS([accept4])
 AC_CHECK_FUNCS([dup2])
 AC_CHECK_FUNCS([gettimeofday])
 AC_CHECK_FUNCS([memchr])

+ 23 - 12
libgearman-server/gearmand.cc

@@ -62,6 +62,10 @@
 
 using namespace gearmand;
 
+#ifndef SOCK_NONBLOCK 
+# define SOCK_NONBLOCK 0
+#endif
+
 /*
  * Private declarations
  */
@@ -791,29 +795,36 @@ static void _listen_event(int event_fd, short events __attribute__ ((unused)), v
   struct sockaddr sa;
 
   socklen_t sa_len= sizeof(sa);
-  int fd= accept(event_fd, &sa, &sa_len);
+  int fd= -1;
+#if defined(HAVE_ACCEPT4) && HAVE_ACCEPT4
+  fd= accept4(event_fd, &sa, &sa_len, 0); //  SOCK_NONBLOCK);
+#endif
+
+  if (fd == -1)
+  {
+    fd= accept(event_fd, &sa, &sa_len);
+  }
+
   if (fd == -1)
   {
     int local_error= errno;
 
-    if (local_error == EINTR)
+    switch (local_error)
     {
+    case EINTR:
       return;
-    }
-    else if (local_error == ECONNABORTED)
-    {
-      gearmand_perror(local_error, "accept");
-      return;
-    }
-    else if (local_error == EMFILE)
-    {
+
+    case ECONNABORTED:
+    case EMFILE:
       gearmand_perror(local_error, "accept");
       return;
+
+    default:
+      break;
     }
 
     _clear_events(Gearmand());
-    gearmand_perror(local_error, "accept");
-    Gearmand()->ret= GEARMAN_ERRNO;
+    Gearmand()->ret= gearmand_perror(local_error, "accept");
     return;
   }
 

+ 1 - 1
libgearman-server/gearmand_con.c

@@ -298,7 +298,7 @@ void gearmand_con_check_queue(gearmand_thread_st *thread)
       gearmand_error_t rc;
       if ((rc= _con_add(thread, dcon)) != GEARMAN_SUCCESS)
       {
-	gearmand_gerror("_con_add() has failed, please report any crashes that occur immediatly after this.", rc);
+        gearmand_gerror("_con_add() has failed, please report any crashes that occur immediatly after this.", rc);
         gearmand_con_free(dcon);
       }
     }

+ 1 - 0
libgearman-server/plugins/protocol/gear/protocol.cc

@@ -109,6 +109,7 @@ public:
                 gearmand_error_t& ret_ptr)
   {
     size_t used_size;
+    gearmand_info("Gear unpack");
 
     if (packet->args_size == 0)
     {

+ 52 - 8
libhostile/accept.c

@@ -50,12 +50,16 @@
 
 static int not_until= 500;
 
-static struct function_st __function;
+static struct function_st __function_accept;
+static struct function_st __function_accept4;
 
 static pthread_once_t function_lookup_once = PTHREAD_ONCE_INIT;
 static void set_local(void)
 {
-  __function= set_function("accept", "HOSTILE_ACCEPT");
+  __function_accept= set_function("accept", "HOSTILE_ACCEPT");
+#if defined(HAVE_ACCEPT4) && HAVE_ACCEPT4
+  __function_accept4= set_function("accept4", "HOSTILE_ACCEPT4");
+#endif
 }
 
 bool libhostile_is_accept(void)
@@ -64,7 +68,7 @@ bool libhostile_is_accept(void)
 
   (void) pthread_once(&function_lookup_once, set_local);
 
-  if (__function.frequency)
+  if (__function_accept.frequency)
   {
     return true;
   }
@@ -76,12 +80,12 @@ void set_accept_close(bool arg, int frequency, int not_until_arg)
 {
   if (arg)
   {
-    __function.frequency= frequency;
+    __function_accept.frequency= frequency;
     not_until= not_until_arg;
   }
   else
   {
-    __function.frequency= 0;
+    __function_accept.frequency= 0;
     not_until= 0;
   }
 }
@@ -95,9 +99,9 @@ int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen)
 
   if (is_called() == false)
   {
-    if (__function.frequency)
+    if (__function_accept.frequency)
     {
-      if (--not_until < 0 && rand() % __function.frequency)
+      if (--not_until < 0 && rand() % __function_accept.frequency)
       {
         if (rand() % 1)
         {
@@ -118,8 +122,48 @@ int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen)
   }
 
   set_called();
-  int ret= __function.function.accept(sockfd, addr, addrlen);
+  int ret= __function_accept.function.accept(sockfd, addr, addrlen);
   reset_called();
 
   return ret;
 }
+
+#if defined(HAVE_ACCEPT4) && HAVE_ACCEPT4
+int accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags)
+{
+
+  hostile_initialize();
+
+  (void) pthread_once(&function_lookup_once, set_local);
+
+  if (is_called() == false)
+  {
+    if (__function_accept4.frequency)
+    {
+      if (--not_until < 0 && rand() % __function_accept4.frequency)
+      {
+        if (rand() % 1)
+        {
+          shutdown(sockfd, SHUT_RDWR);
+          close(sockfd);
+          errno= ECONNABORTED;
+          return -1;
+        }
+        else
+        {
+          shutdown(sockfd, SHUT_RDWR);
+          close(sockfd);
+          errno= EMFILE;
+          return -1;
+        }
+      }
+    }
+  }
+
+  set_called();
+  int ret= __function_accept4.function.accept4(sockfd, addr, addrlen, flags);
+  reset_called();
+
+  return ret;
+}
+#endif

+ 1 - 0
libhostile/accept.h

@@ -40,3 +40,4 @@
 #include <sys/socket.h>
 
 typedef int (accept_fn)(int sockfd, struct sockaddr *addr, socklen_t *addrlen);
+typedef int (accept4_fn)(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags);

+ 1 - 0
libhostile/function.h

@@ -51,6 +51,7 @@
 
 union function_un {
   accept_fn *accept;
+  accept4_fn *accept4;
   connect_fn *connect;
   getaddrinfo_fn *getaddrinfo;
   malloc_fn *malloc;

+ 2 - 0
libhostile/pipe.c

@@ -57,7 +57,9 @@ static pthread_once_t function_lookup_once= PTHREAD_ONCE_INIT;
 static void set_local(void)
 {
   __function_pipe= set_function("pipe", "HOSTILE_PIPE");
+#if defined(HAVE_PIPE2) && HAVE_PIPE2
   __function_pipe2= set_function("pipe2", "HOSTILE_PIPE2");
+#endif
 }
 
 int pipe(int pipefd_arg[2])

+ 12 - 17
tests/hostile.cc

@@ -298,6 +298,12 @@ static test_return_t worker_ramp_10K_TEST(void *)
   return worker_ramp_exec(1024*10);
 }
 
+static test_return_t skip_SETUP(void*)
+{
+  SKIP_IF(true);
+  return TEST_SUCCESS;
+}
+
 static test_return_t worker_ramp_SETUP(void *object)
 {
   test_skip_valgrind();
@@ -494,23 +500,12 @@ static test_return_t connect_TEARDOWN(void* object)
 
 /*********************** World functions **************************************/
 
-static void *world_create(server_startup_st& servers, test_return_t& error)
+static void *world_create(server_startup_st& servers, test_return_t&)
 {
-  if (has_hostile())
-  {
-    hostile_server= libtest::get_free_port();
-    if (server_startup(servers, SERVER_TARGET, hostile_server, 0, NULL) == false)
-    {
-      hostile_server= 0;
-      error= TEST_FAILURE;
-      return NULL;
-    }
-  }
-  else
-  {
-    error= TEST_SKIPPED;
-    return NULL;
-  }
+  SKIP_IF(has_hostile() == false);
+
+  hostile_server= libtest::get_free_port();
+  ASSERT_TRUE(server_startup(servers, SERVER_TARGET, hostile_server, 0, NULL));
 
   return new worker_handles_st;
 }
@@ -537,7 +532,7 @@ test_st worker_TESTS[] ={
 };
 
 collection_st collection[] ={
-  {"dos", 0, 0, dos_TESTS },
+  {"dos", skip_SETUP, 0, dos_TESTS },
   {"plain", worker_ramp_SETUP, worker_ramp_TEARDOWN, worker_TESTS },
   {"plain against hostile server", hostile_gearmand_SETUP, worker_ramp_TEARDOWN, worker_TESTS },
   {"hostile recv()", recv_SETUP, resv_TEARDOWN, worker_TESTS },