Browse Source

Include unittest, updated benchmark, and added "workers" to gearadmin

Brian Aker 13 years ago
parent
commit
28bee728ef

+ 1 - 0
.bzrignore

@@ -106,3 +106,4 @@ tests/var/tmp/*
 tests/worker_test
 tests/worker_test.res
 unittests/unittests
+libtest/unittest

+ 1 - 0
ChangeLog

@@ -2,6 +2,7 @@
   * Defined workers can now return GEARMAN_SHUTDOWN.
   * Benchmark worker can now be told to shutdown.
   * Allocator code has been cleaned up (gearman_allocator_t).
+  * Added "workers" option to gearadmin
 
 0.22 Sun Jun 19 20:32:18 PDT 2011
   * Added gearman_work_map() for gearman_work_t describer.

+ 2 - 7
benchmark/benchmark.h

@@ -11,10 +11,9 @@
  * @brief Common benchmark header
  */
 
-#ifndef __GEARMAN_BENCHMARK_H__
-#define __GEARMAN_BENCHMARK_H__
+#pragma once
 
-#include "config.h"
+#include <config.h>
 
 #include <libgearman/gearman.h>
 
@@ -80,7 +79,3 @@ struct gearman_benchmark_st
  * Check and possibly print time.
  */
 void benchmark_check_time(gearman_benchmark_st *benchmark);
-
-/** @} */
-
-#endif /* __GEARMAN_BENCHMARK_H__ */

+ 123 - 93
benchmark/blobslap_worker.cc

@@ -1,94 +1,109 @@
-/* Gearman server and library
- * Copyright (C) 2008 Brian Aker, Eric Day
- * All rights reserved.
+/*  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
+ * 
+ *  Gearmand client and server library.
+ *
+ *  Copyright (C) 2011 Data Differential, http://datadifferential.com/
+ *  Copyright (C) 2008 Brian Aker, Eric Day
+ *  All rights reserved.
+ *
+ *  Redistribution and use in source and binary forms, with or without
+ *  modification, are permitted provided that the following conditions are
+ *  met:
+ *
+ *      * Redistributions of source code must retain the above copyright
+ *  notice, this list of conditions and the following disclaimer.
+ *
+ *      * Redistributions in binary form must reproduce the above
+ *  copyright notice, this list of conditions and the following disclaimer
+ *  in the documentation and/or other materials provided with the
+ *  distribution.
+ *
+ *      * The names of its contributors may not be used to endorse or
+ *  promote products derived from this software without specific prior
+ *  written permission.
+ *
+ *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ *  "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ *  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ *  A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ *  OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ *  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ *  LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ *  DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ *  THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ *  (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ *  OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  *
- * Use and distribution licensed under the BSD license.  See
- * the COPYING file in the parent directory for full text.
  */
 
+
 /**
  * @file
  * @brief Blob slap worker utility
  */
 
 #include <benchmark/benchmark.h>
+#include <boost/program_options.hpp>
 #include <cerrno>
 #include <cstdio>
 #include <climits>
 #include <iostream>
+#include <vector>
 #include "util/daemon.h"
 
 static void *worker_fn(gearman_job_st *job, void *context,
                        size_t *result_size, gearman_return_t *ret_ptr);
 
-static void usage(char *name);
-
-
 static gearman_return_t shutdown_fn(gearman_job_st*, void* /* context */)
 {
   return GEARMAN_SHUTDOWN;
 }
 
 
-int main(int argc, char *argv[])
+int main(int args, char *argv[])
 {
   gearman_benchmark_st benchmark;
-  int c;
-  char *host= NULL;
-  in_port_t port= 0;
-  char *function= NULL;
-  bool opt_daemon= false;
-  unsigned long int count= ULONG_MAX;
-  gearman_worker_st worker;
-
-  if (not gearman_worker_create(&worker))
+  bool opt_daemon;
+  bool opt_chunk;
+  bool opt_status;
+  bool opt_unique;
+  int32_t timeout;
+  uint32_t count= UINT_MAX;
+  in_port_t port;
+  std::string host;
+  std::vector<std::string>* functions= NULL;
+  std::string verbose_string;
+  boost::program_options::options_description desc("Options");
+  desc.add_options()
+    ("help", "Options related to the program.")
+    ("host,h", boost::program_options::value<std::string>(&host)->default_value("localhost"),"Connect to the host")
+    ("port,p", boost::program_options::value<in_port_t>(&port)->default_value(GEARMAN_DEFAULT_TCP_PORT), "Port number use for connection")
+    ("count,c", boost::program_options::value<uint32_t>(&count)->default_value(0), "Number of jobs to run before exiting")
+    ("timeout,u", boost::program_options::value<int32_t>(&timeout)->default_value(-1), "Timeout in milliseconds")
+    ("chunk", boost::program_options::bool_switch(&opt_chunk)->default_value(false), "Send result back in data chunks")
+    ("status,s", boost::program_options::bool_switch(&opt_status)->default_value(false), "Send status updates and sleep while running job")
+    ("unique,u", boost::program_options::bool_switch(&opt_unique)->default_value(false), "When grabbing jobs, grab the uniqie id")
+    ("daemon,d", boost::program_options::bool_switch(&opt_daemon)->default_value(false), "Daemonize")
+    ("function,f", boost::program_options::value(functions), "Function to use.")
+    ("verbose,v", boost::program_options::value(&verbose_string)->default_value("v"), "Increase verbosity level by one.")
+            ;
+
+  boost::program_options::variables_map vm;
+  try
   {
-    std::cerr << "Failed to allocate worker" << std::endl;
-    exit(EXIT_FAILURE);
+    boost::program_options::store(boost::program_options::parse_command_line(args, argv, desc), vm);
+    boost::program_options::notify(vm);
+  }
+  catch(std::exception &e)
+  { 
+    std::cout << e.what() << std::endl;
+    return EXIT_FAILURE;
   }
 
-  while ((c = getopt(argc, argv, "dc:f:h:p:v")) != -1)
+  if (vm.count("help"))
   {
-    switch(c)
-    {
-    case 'c':
-      count= strtoul(optarg, NULL, 10);
-      break;
-
-    case 'f':
-      function= optarg;
-      if (gearman_failed(gearman_worker_add_function(&worker, function, 0, worker_fn, &benchmark)))
-      {
-        std::cerr << "Failed adding function " << optarg << "() :" << gearman_worker_error(&worker) << std::endl;
-        exit(EXIT_FAILURE);
-      }
-      break;
-
-    case 'h':
-      host= optarg;
-      if (gearman_failed(gearman_worker_add_server(&worker, host, port)))
-      {
-        std::cerr << "Failed while adding server " << host << ":" << port << " :" << gearman_worker_error(&worker) << std::endl;
-        exit(EXIT_FAILURE);
-      }
-      break;
-
-    case 'p':
-      port= in_port_t(atoi(optarg));
-      break;
-
-    case 'd':
-      opt_daemon= true;
-      break;
-
-    case 'v':
-      benchmark.verbose++;
-      break;
-
-    default:
-      usage(argv[0]);
-      exit(EXIT_FAILURE);
-    }
+    std::cout << desc << std::endl;
+    return EXIT_SUCCESS;
   }
 
   if (opt_daemon)
@@ -97,58 +112,86 @@ int main(int argc, char *argv[])
   }
 
   if (opt_daemon)
+  {
     gearmand::daemon_is_ready(benchmark.verbose == 0);
+  }
 
-  if (not host)
+  gearman_worker_st *worker;
+  if (not (worker= gearman_worker_create(NULL)))
   {
-    if (gearman_failed(gearman_worker_add_server(&worker, NULL, port)))
-    {
-      std::cerr << "Failing to add localhost:" << port << " :" << gearman_worker_error(&worker) << std::endl;
-      exit(EXIT_FAILURE);
-    }
+    std::cerr << "Failed to allocate worker" << std::endl;
+    exit(EXIT_FAILURE);
   }
 
-  if (not function)
+
+  benchmark.verbose= static_cast<uint8_t>(verbose_string.length());
+
+  if (gearman_failed(gearman_worker_add_server(worker, host.c_str(), port)))
   {
-    if (gearman_failed(gearman_worker_add_function(&worker,
-                                                   GEARMAN_BENCHMARK_DEFAULT_FUNCTION, 0,
-                                                   worker_fn, &benchmark)))
-    {
-      std::cerr << "Failed to add default function: " << gearman_worker_error(&worker) << std::endl;
-      exit(EXIT_FAILURE);
-    }
+    std::cerr << "Failed while adding server " << host << ":" << port << " :" << gearman_worker_error(worker) << std::endl;
+    exit(EXIT_FAILURE);
   }
 
   gearman_function_t shutdown_function= gearman_function_create(shutdown_fn);
-  if (gearman_failed(gearman_worker_define_function(&worker,
+  if (gearman_failed(gearman_worker_define_function(worker,
 						    gearman_literal_param("shutdown"), 
 						    shutdown_function,
 						    0, 0)))
   {
-    std::cerr << "Failed to add default function: " << gearman_worker_error(&worker) << std::endl;
+    std::cerr << "Failed to add shutdown function: " << gearman_worker_error(worker) << std::endl;
     exit(EXIT_FAILURE);
   }
 
+  if (functions and functions->size())
+  {
+    for (std::vector<std::string>::iterator iter= functions->begin();
+         iter != functions->end();
+         iter++)
+    {
+      if (gearman_failed(gearman_worker_add_function(worker,
+                                                     (*iter).c_str(), 0,
+                                                     worker_fn, &benchmark)))
+      {
+        std::cerr << "Failed to add default function: " << gearman_worker_error(worker) << std::endl;
+        exit(EXIT_FAILURE);
+      }
+    }
+  }
+  else
+  {
+    if (gearman_failed(gearman_worker_add_function(worker,
+                                                   GEARMAN_BENCHMARK_DEFAULT_FUNCTION, 0,
+                                                   worker_fn, &benchmark)))
+    {
+      std::cerr << "Failed to add default function: " << gearman_worker_error(worker) << std::endl;
+      exit(EXIT_FAILURE);
+    }
+  }
+
+  gearman_worker_set_timeout(worker, timeout);
+
   do
   {
-    gearman_return_t rc= gearman_worker_work(&worker);
+    gearman_return_t rc= gearman_worker_work(worker);
 
     if (rc == GEARMAN_SHUTDOWN)
     {
       if (benchmark.verbose > 0)
+      {
         std::cerr << "shutdown" << std::endl;
+      }
       break;
     }
     else if (gearman_failed(rc))
     {
-      std::cerr << "gearman_worker_work(): " << gearman_worker_error(&worker) << std::endl;
+      std::cerr << "gearman_worker_work(): " << gearman_worker_error(worker) << std::endl;
       break;
     }
 
     count--;
   } while(count);
 
-  gearman_worker_free(&worker);
+  gearman_worker_free(worker);
 
   return 0;
 }
@@ -169,16 +212,3 @@ static void *worker_fn(gearman_job_st *job, void *context,
   *ret_ptr= GEARMAN_SUCCESS;
   return NULL;
 }
-
-static void usage(char *name)
-{
-  printf("\nusage: %s\n"
-         "\t[-c count] [-f function] [-h <host>] [-p <port>] [-v]\n\n", name);
-  printf("\t-c <count>    - number of jobs to run before exiting\n");
-  printf("\t-f <function> - function name for tasks, can specify many\n"
-         "\t                (default %s)\n",
-                            GEARMAN_BENCHMARK_DEFAULT_FUNCTION);
-  printf("\t-h <host>     - job server host, can specify many\n");
-  printf("\t-p <port>     - job server port\n");
-  printf("\t-v            - increase verbose level\n");
-}

+ 4 - 1
benchmark/include.am

@@ -28,7 +28,10 @@ benchmark_blobslap_worker_SOURCES= \
 				   benchmark/blobslap_worker.cc \
 				   util/daemon.cc
 
-benchmark_blobslap_worker_LDADD= benchmark/libbenchmark.la libgearman/libgearman.la
+benchmark_blobslap_worker_LDADD= \
+				 $(BOOST_PROGRAM_OPTIONS_LIBS) \
+				 benchmark/libbenchmark.la \
+				 libgearman/libgearman.la
 
 GEARMAND_BE_PIDFILE = ${abs_top_builddir}/tests/var/tmp/Begearmand.pid
 GEARMAND_BENCHMARK_PORT = 6000

+ 6 - 1
bin/gearadmin.cc

@@ -35,7 +35,7 @@
  *
  */
 
-#include "config.h"
+#include <config.h>
 
 #include <cstdio>
 #include <cstdlib>
@@ -147,6 +147,11 @@ int main(int args, char *argv[])
     instance.push(new Operation(STRING_WITH_LEN("status\r\n")));
   }
 
+  if (vm.count("workers"))
+  {
+    instance.push(new Operation(STRING_WITH_LEN("workers\r\n")));
+  }
+
   if (vm.count("server-version"))
   {
     instance.push(new Operation(STRING_WITH_LEN("version\r\n")));

+ 1 - 1
examples/reverse_worker.cc

@@ -38,7 +38,7 @@
 
 
 
-#include "config.h"
+#include <config.h>
 
 #include <cerrno>
 #include <cstdio>

+ 1 - 1
libgearman-server/job.c

@@ -603,7 +603,7 @@ gearmand_error_t gearman_server_job_queue(gearman_server_job_st *job)
         ret= gearman_server_io_packet_add(worker->con, false,
                                           GEARMAN_MAGIC_RESPONSE,
                                           GEARMAN_COMMAND_NOOP, NULL);
-        if (ret != GEARMAN_SUCCESS)
+        if (gearmand_failed(ret))
         {
           gearmand_gerror("gearman_server_io_packet_add", ret);
           return ret;

+ 5 - 3
libgearman/worker.cc

@@ -727,8 +727,10 @@ gearman_job_st *gearman_worker_grab_job(gearman_worker_st *worker,
       for (worker->con= (&worker->universal)->con_list; worker->con;
            worker->con= worker->con->next)
       {
-        if (worker->con->fd == -1)
+        if (worker->con->fd == INVALID_SOCKET)
+        {
           continue;
+        }
 
         *ret_ptr= worker->con->send(worker->pre_sleep, true);
         if (gearman_failed(*ret_ptr))
@@ -753,7 +755,7 @@ gearman_job_st *gearman_worker_grab_job(gearman_worker_st *worker,
       for (worker->con= worker->universal.con_list; worker->con;
            worker->con= worker->con->next)
       {
-        if (worker->con->fd == -1)
+        if (worker->con->fd == INVALID_SOCKET)
           continue;
 
         worker->con->set_events(POLLIN);
@@ -791,7 +793,7 @@ gearman_job_st *gearman_worker_grab_job(gearman_worker_st *worker,
       else
       {
         *ret_ptr= gearman_wait(worker->universal);
-        if (gearman_failed(*ret_ptr) and (*ret_ptr != GEARMAN_TIMEOUT || worker->options.timeout_return))
+        if (gearman_failed(*ret_ptr) and (*ret_ptr != GEARMAN_TIMEOUT or worker->options.timeout_return))
         {
           return NULL;
         }

+ 87 - 0
libtest/unittest.cc

@@ -0,0 +1,87 @@
+/*  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
+ * 
+ *  uTest self unit test.
+ *
+ *  Copyright (C) 2011 Data Differential, http://datadifferential.com/
+ *  All rights reserved.
+ *
+ *  Redistribution and use in source and binary forms, with or without
+ *  modification, are permitted provided that the following conditions are
+ *  met:
+ *
+ *      * Redistributions of source code must retain the above copyright
+ *  notice, this list of conditions and the following disclaimer.
+ *
+ *      * Redistributions in binary form must reproduce the above
+ *  copyright notice, this list of conditions and the following disclaimer
+ *  in the documentation and/or other materials provided with the
+ *  distribution.
+ *
+ *      * The names of its contributors may not be used to endorse or
+ *  promote products derived from this software without specific prior
+ *  written permission.
+ *
+ *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ *  "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ *  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ *  A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ *  OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ *  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ *  LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ *  DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ *  THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ *  (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ *  OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <config.h>
+#include <libtest/test.hpp>
+
+static test_return_t test_success_test(void *)
+{
+  return TEST_SUCCESS;
+}
+
+static test_return_t local_test(void *)
+{
+  char buffer[sizeof("LIBTEST_LOCAL=1")];
+
+  snprintf(buffer, sizeof(buffer), "%s", "LIBTEST_LOCAL=1");
+  test_compare(0, putenv(buffer));
+
+  test_true(test_is_local());
+
+  return TEST_SUCCESS;
+}
+
+static test_return_t local_not_test(void *)
+{
+  test_compare(0, unsetenv("LIBTEST_LOCAL"));
+
+  test_false(test_is_local());
+
+  return TEST_SUCCESS;
+}
+
+test_st tests_log[] ={
+  {"TEST_SUCCESS", 0, test_success_test },
+  {0, 0, 0}
+};
+
+test_st local_log[] ={
+  {"test_is_local()", 0, local_test },
+  {"test_is_local(NOT)", 0, local_not_test },
+  {0, 0, 0}
+};
+
+collection_st collection[] ={
+  {"return values", 0, 0, tests_log},
+  {"local", 0, 0, local_log},
+  {0, 0, 0, 0}
+};
+
+void get_world(Framework *arg)
+{
+  arg->collections= collection;
+}