123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396 |
- /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
- *
- * Gearmand client and server library.
- *
- * Copyright (C) 2011-2013 Data Differential, http://datadifferential.com/
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- *
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- *
- * * The names of its contributors may not be used to endorse or
- * promote products derived from this software without specific prior
- * written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
- #include "gear_config.h"
- #include <libtest/test.hpp>
- #include <libhostile/hostile.h>
- #include <cassert>
- #include <cstring>
- #include <memory>
- #include <signal.h>
- #include <stdlib.h>
- #include <sys/wait.h>
- #include <unistd.h>
- #include <cstdio>
- #include <tests/start_worker.h>
- #include <util/instance.hpp>
- using namespace libtest;
- using namespace datadifferential;
- #include "libgearman/worker.hpp"
- using namespace org::gearmand;
- #ifndef __INTEL_COMPILER
- #pragma GCC diagnostic ignored "-Wold-style-cast"
- #endif
- #define CONTEXT_MAGIC_MARKER 45
- struct context_st {
- worker_handle_st *handle;
- private:
- libgearman::Worker _worker;
- public:
- int magic;
- libtest::thread::Barrier* _sync_point;
- context_st(worker_handle_st* handle_,
- const libgearman::Worker& worker_) :
- handle(handle_),
- _worker(worker_),
- magic(CONTEXT_MAGIC_MARKER),
- _sync_point(handle_->sync_point())
- {
- }
- const libgearman::Worker& worker() const
- {
- return _worker;
- }
- void wait(void)
- {
- if (_sync_point)
- {
- _sync_point->wait();
- _sync_point= NULL;
- }
- }
- void fail(void)
- {
- handle->failed_startup= true;
- wait();
- }
- ~context_st()
- {
- }
- };
- static void thread_runner(context_st* con)
- {
- std::unique_ptr<context_st> context(con);
- assert(context.get());
- assert(context.get() == con);
- if (context.get() == NULL)
- {
- Error << "context_st passed to function was NULL";
- context->fail();
- return;
- }
- assert (context->magic == CONTEXT_MAGIC_MARKER);
- if (context->magic != CONTEXT_MAGIC_MARKER)
- {
- Error << "context_st had bad magic";
- context->fail();
- return;
- }
- libgearman::Worker worker(context->worker());
- if (&worker == NULL)
- {
- Error << "Failed to create Worker";
- context->fail();
- return;
- }
- assert(context->handle);
- if (context->handle == NULL)
- {
- Error << "Progammer error, no handle found";
- context->fail();
- return;
- }
- context->handle->set_worker_id(&worker);
- // Set worker id
- {
- if (gearman_failed(gearman_worker_set_identifier(&worker, gearman_literal_param("start_worker"))))
- {
- context->fail();
- return;
- }
- }
- // Check for a working server by pinging it with echo
- if (gearman_failed(gearman_worker_echo(&worker, gearman_literal_param("start_worker"))))
- {
- context->fail();
- return;
- }
- context->wait();
- gearman_return_t ret= GEARMAN_SUCCESS;
- while (context->handle->is_shutdown() == false)
- {
- ret= gearman_worker_work(&worker);
- if (ret == GEARMAN_NO_REGISTERED_FUNCTIONS)
- {
- context->handle->set_shutdown();
- continue;
- }
- if (ret == GEARMAN_SHUTDOWN)
- {
- gearman_return_t unreg_ret;
- if (gearman_failed((unreg_ret= gearman_worker_unregister_all(&worker))))
- {
- Error << "Failed to unregister " << gearman_strerror(unreg_ret);
- }
- continue;
- }
- if (ret != GEARMAN_SUCCESS and ret != GEARMAN_INVALID_ARGUMENT and ret != GEARMAN_WORK_FAIL)
- {
- context->handle->error();
- #if 0
- Error << context->function_name << ": " << gearman_strerror(ret) << ": " << gearman_worker_error(&worker);
- #endif
- }
- }
- }
- worker_handle_st *worker_run(const libgearman::Worker& worker_)
- {
- worker_handle_st *handle= new worker_handle_st();
- fatal_assert(handle);
- context_st *context= new context_st(handle, worker_);
- fatal_assert(context);
- handle->_thread= boost::shared_ptr<libtest::thread::Thread>(new libtest::thread::Thread(thread_runner, context));
- if (bool(handle->_thread) == false)
- {
- delete context;
- delete handle;
- FATAL("Could not allocate worker");
- }
- handle->wait();
- return handle;
- }
- worker_handle_st *test_worker_start(in_port_t port,
- const char *namespace_key,
- const char *function_name,
- const gearman_function_t &worker_fn,
- void *context_,
- gearman_worker_options_t options,
- int timeout)
- {
- worker_handle_st *handle= new worker_handle_st();
- fatal_assert(handle);
- libgearman::Worker worker(port);
- if (namespace_key)
- {
- gearman_worker_set_namespace(&worker, namespace_key, strlen(namespace_key));
- }
- // Set worker id
- {
- if (gearman_failed(gearman_worker_set_identifier(&worker, gearman_literal_param("start_worker"))))
- {
- delete handle;
- FATAL("gearman_worker_set_identifier() failed");
- }
- }
- // Check for a working server by pinging it with echo
- if (gearman_failed(gearman_worker_echo(&worker, gearman_literal_param("start_worker"))))
- {
- delete handle;
- FATAL("gearman_worker_echo() failed");
- }
- if (gearman_failed(gearman_worker_define_function(&worker,
- function_name, strlen(function_name),
- worker_fn,
- timeout,
- context_)))
- {
- delete handle;
- FATAL("gearman_worker_define_function() failed");
- }
- if (options != gearman_worker_options_t())
- {
- gearman_worker_add_options(&worker, options);
- }
- context_st *context= new context_st(handle, worker);
- fatal_assert(context);
- handle->_thread= boost::shared_ptr<libtest::thread::Thread>(new libtest::thread::Thread(thread_runner, context));
- if (bool(handle->_thread) == false)
- {
- delete context;
- delete handle;
- FATAL("Could not allocate worker");
- }
- handle->wait();
- return handle;
- }
- libtest::thread::Barrier* worker_handle_st::sync_point()
- {
- return &_sync_point;
- }
- void worker_handle_st::set_worker_id(gearman_worker_st* worker)
- {
- _worker_id= gearman_worker_id(worker);
- }
- worker_handle_st::worker_handle_st() :
- failed_startup(false),
- _shutdown(false),
- _error_count(0),
- _worker_id(gearman_id_t()),
- _sync_point(2)
- {
- }
- worker_handle_st::~worker_handle_st()
- {
- shutdown();
- }
- void worker_handle_st::wait()
- {
- _sync_point.wait();
- }
- void worker_handle_st::set_shutdown()
- {
- libtest::thread::ScopedLock l(_shutdown_lock);
- _shutdown= true;
- }
- bool worker_handle_st::is_shutdown()
- {
- libtest::thread::ScopedLock l(_shutdown_lock);
- return _shutdown;
- }
- bool worker_handle_st::shutdown()
- {
- if (is_shutdown())
- {
- return true;
- }
- set_shutdown();
- // This block issues an error, but the error is not fatal
- gearman_return_t rc;
- if (gearman_failed(rc= gearman_kill(_worker_id, GEARMAN_KILL)))
- {
- Error << "failed to shutdown " << rc;
- }
- _thread->join();
- return true;
- }
- bool worker_handle_st::check()
- {
- gearman_return_t rc;
- if (gearman_failed(rc= gearman_kill(_worker_id, GEARMAN_SIGNAL_CHECK)))
- {
- return false;
- }
- return true;
- }
- worker_handles_st::worker_handles_st()
- {
- }
- worker_handles_st::~worker_handles_st()
- {
- reset();
- }
- // Warning, this will not clean up memory
- void worker_handles_st::kill_all()
- {
- assert(valgrind_is_caller() == false);
- _workers.clear();
- }
- void worker_handles_st::reset()
- {
- for (std::vector<worker_handle_st *>::iterator iter= _workers.begin();
- iter != _workers.end();
- ++iter)
- {
- delete *iter;
- }
- _workers.clear();
- }
- void worker_handles_st::push(worker_handle_st *arg)
- {
- _workers.push_back(arg);
- }
|