Browse Source

Update to just use boost, not pthreads, for slave startup

Brian Aker 13 years ago
parent
commit
37d40c6ada
4 changed files with 95 additions and 127 deletions
  1. 1 2
      libtest/http.cc
  2. 0 1
      tests/include.am
  3. 89 118
      tests/start_worker.cc
  4. 5 6
      tests/start_worker.h

+ 1 - 2
libtest/http.cc

@@ -30,8 +30,6 @@ class CURL;
 #endif
 
 
-static pthread_once_t start_key_once= PTHREAD_ONCE_INIT;
-
 static void cleanup_curl(void)
 {
 #if defined(HAVE_CURL_CURL_H) && HAVE_CURL_CURL_H
@@ -54,6 +52,7 @@ static void initialize_curl_startup()
   }
 }
 
+static pthread_once_t start_key_once= PTHREAD_ONCE_INIT;
 void initialize_curl(void)
 {
   int ret;

+ 0 - 1
tests/include.am

@@ -33,7 +33,6 @@ tests_libstartworker_la_LIBADD=
 tests_libstartworker_la_CXXFLAGS+= $(PTHREAD_CFLAGS)
 tests_libstartworker_la_CXXFLAGS+= $(BOOST_CPPFLAGS)
 tests_libstartworker_la_LIBADD+= $(BOOST_THREAD_LDFLAGS)
-tests_libstartworker_la_LIBADD+= $(PTHREAD_LIBS)
 tests_libstartworker_la_LIBADD+= $(BOOST_THREAD_LIBS)
 tests_libstartworker_la_LIBADD+= libgearman/libgearman.la
 tests_libstartworker_la_SOURCES= tests/start_worker.cc

+ 89 - 118
tests/start_worker.cc

@@ -48,8 +48,6 @@
 #include <stdlib.h>
 #include <sys/wait.h>
 #include <unistd.h>
-#include <pthread.h>
-#include <boost/thread.hpp>
 
 
 #include <cstdio>
@@ -116,101 +114,92 @@ struct context_st {
   ~context_st()
   {
   }
+
 };
 
-extern "C" {
+static void thread_runner(context_st* con)
+{
+  std::auto_ptr<context_st> context(con);
+
+  assert(context.get());
+  if (context.get() == NULL)
+  {
+    Error << "context_st passed to function was NULL";
+    return;
+  }
+
+  assert (context->magic == CONTEXT_MAGIC_MARKER);
+  if (context->magic != CONTEXT_MAGIC_MARKER)
+  {
+    Error << "context_st had bad magic";
+    return;
+  }
+
+  Worker worker;
+  if (&worker == NULL)
+  {
+    Error << "Failed to create Worker";
+    return;
+  }
+
+  assert(context->handle);
+  if (context->handle == NULL)
+  {
+    Error << "Progammer error, no handle found";
+    return;
+  }
+  context->handle->set_worker_id(&worker);
+
+  if (context->namespace_key.empty() == false)
+  {
+    gearman_worker_set_namespace(&worker, context->namespace_key.c_str(), context->namespace_key.length());
+  }
+
+  if (gearman_failed(gearman_worker_add_server(&worker, NULL, context->port)))
+  {
+    Error << "gearman_worker_add_server()";
+    return;
+  }
 
-  static void *thread_runner(void *con)
+  // Check for a working server by "asking" it for an option
   {
-    do
+    size_t count= 5;
+    bool success= false;
+    while (--count and success == false)
     {
-      std::auto_ptr<context_st> context((context_st *)con);
-
-      assert(context.get());
-      if (context.get() == NULL)
-      {
-        Error << "context_st passed to function was NULL";
-        break;
-      }
-
-      assert (context->magic == CONTEXT_MAGIC_MARKER);
-      if (context->magic != CONTEXT_MAGIC_MARKER)
-      {
-        Error << "context_st had bad magic";
-        break;
-      }
-
-      Worker worker;
-      if (&worker == NULL)
-      {
-        Error << "Failed to create Worker";
-        break;
-      }
-
-      assert(context->handle);
-      if (context->handle == NULL)
-      {
-        Error << "Progammer error, no handle found";
-        break;
-      }
-      context->handle->set_worker_id(&worker);
-
-      if (context->namespace_key.empty() == false)
-      {
-        gearman_worker_set_namespace(&worker, context->namespace_key.c_str(), context->namespace_key.length());
-      }
-
-      if (gearman_failed(gearman_worker_add_server(&worker, NULL, context->port)))
-      {
-        Error << "gearman_worker_add_server()";
-        break;
-      }
-
-      // Check for a working server by "asking" it for an option
-      {
-        size_t count= 5;
-        bool success= false;
-        while (--count and success == false)
-        {
-          success= gearman_worker_set_server_option(&worker, test_literal_param("exceptions"));
-        }
-
-        if (success == false)
-        {
-          Error << "gearman_worker_set_server_option() failed";
-          break;
-        }
-      }
-
-      if (gearman_failed(gearman_worker_define_function(&worker,
-                                                        context->function_name.c_str(), context->function_name.length(),
-                                                        context->worker_fn,
-                                                        0, 
-                                                        context->context)))
-      {
-        Error << "Failed to add function " << context->function_name << "(" << gearman_worker_error(&worker) << ")";
-        break;
-      }
-
-      if (context->options != gearman_worker_options_t())
-      {
-        gearman_worker_add_options(&worker, context->options);
-      }
-
-      context->handle->wait();
-
-      gearman_return_t ret= GEARMAN_SUCCESS;
-      while (context->handle->is_shutdown() == false or ret != GEARMAN_SHUTDOWN)
-      {
-        ret= gearman_worker_work(&worker);
-      }
-
-    } while (0);
-
-    pthread_exit(0);
+      success= gearman_worker_set_server_option(&worker, test_literal_param("exceptions"));
+    }
+
+    if (success == false)
+    {
+      Error << "gearman_worker_set_server_option() failed";
+      return;
+    }
+  }
+
+  if (gearman_failed(gearman_worker_define_function(&worker,
+                                                    context->function_name.c_str(), context->function_name.length(),
+                                                    context->worker_fn,
+                                                    0, 
+                                                    context->context)))
+  {
+    Error << "Failed to add function " << context->function_name << "(" << gearman_worker_error(&worker) << ")";
+    return;
+  }
+
+  if (context->options != gearman_worker_options_t())
+  {
+    gearman_worker_add_options(&worker, context->options);
   }
 
-} // extern "C"
+  context->handle->wait();
+
+  gearman_return_t ret= GEARMAN_SUCCESS;
+  while (context->handle->is_shutdown() == false or ret != GEARMAN_SHUTDOWN)
+  {
+    ret= gearman_worker_work(&worker);
+  }
+}
 
 
 worker_handle_st *test_worker_start(in_port_t port, 
@@ -229,10 +218,9 @@ worker_handle_st *test_worker_start(in_port_t port,
                                       context_arg, options);
   fatal_assert(context);
 
-  int ret;
-  if ((ret= pthread_create(handle->thread(), NULL, thread_runner, context)))
+  handle->_thread= new boost::thread(thread_runner, context);
+  if (handle->_thread == NULL)
   {
-    Error << "pthread_create(" << strerror(ret) << ")";
     delete context;
     delete handle;
 
@@ -246,12 +234,7 @@ worker_handle_st *test_worker_start(in_port_t port,
 
 boost::barrier* worker_handle_st::sync_point()
 {
-  return _sync_point;
-}
-
-pthread_t* worker_handle_st::thread()
-{
-  return &_thread;
+  return &_sync_point;
 }
 
 void worker_handle_st::set_worker_id(gearman_worker_st* worker)
@@ -263,38 +246,30 @@ worker_handle_st::worker_handle_st() :
   failed_startup(false),
   _shutdown(false),
   _worker_id(gearman_id_t()),
-  _sync_point(new boost::barrier(2))
+  _sync_point(2)
 {
-  fatal_assert(pthread_mutex_init(&_shutdown_lock, NULL) == 0);
 }
 
 worker_handle_st::~worker_handle_st()
 {
   shutdown();
-  pthread_mutex_destroy(&_shutdown_lock);
-  delete _sync_point;
 }
 
 void worker_handle_st::wait()
 {
-  _sync_point->wait();
+  _sync_point.wait();
 }
 
 void worker_handle_st::set_shutdown()
 {
-  pthread_mutex_lock(&_shutdown_lock);
+  boost::mutex::scoped_lock(_shutdown_lock);
   _shutdown= true;
-  pthread_mutex_unlock(&_shutdown_lock);
 }
 
 bool worker_handle_st::is_shutdown()
 {
-  bool tmp;
-  pthread_mutex_lock(&_shutdown_lock);
-  tmp= _shutdown;
-  pthread_mutex_unlock(&_shutdown_lock);
-
-  return tmp;
+  boost::mutex::scoped_lock(_shutdown_lock);
+  return _shutdown;
 }
 
 
@@ -314,12 +289,8 @@ bool worker_handle_st::shutdown()
     return false;
   }
 
-  void *ret;
-  int error;
-  if ((error= pthread_join(_thread, &ret)))
-  {
-    Error << "pthread_join() failed " << strerror(error);
-  }
+  _thread->join();
+  delete _thread;
 
   return true;
 }

+ 5 - 6
tests/start_worker.h

@@ -37,12 +37,12 @@
 
 
 #include <sys/types.h>
-#include <pthread.h>
 
 #include <libgearman/gearman.h>
 #include <libtest/visibility.h>
 
-namespace boost { class barrier; }
+#include <boost/thread.hpp>
+
 
 struct worker_handle_st
 {
@@ -56,20 +56,19 @@ public:
 
   void set_worker_id(gearman_worker_st*);
 
-  pthread_t* thread();
   boost::barrier* sync_point();
 
   void wait();
 
 
   volatile bool failed_startup;
+  boost::thread* _thread;
 
 private:
-  pthread_t _thread;
   bool _shutdown;
-  pthread_mutex_t _shutdown_lock;
+  boost::mutex _shutdown_lock;
   gearman_id_t _worker_id;
-  boost::barrier* _sync_point;
+  boost::barrier _sync_point;
 };
 
 #pragma once