123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657 |
- /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
- *
- * Gearmand client and server library.
- *
- * Copyright (C) 2011-2012 Data Differential, http://datadifferential.com/
- * Copyright (C) 2008 Brian Aker, Eric Day
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- *
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- *
- * * The names of its contributors may not be used to endorse or
- * promote products derived from this software without specific prior
- * written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
- /**
- * @file
- * @brief Gearmand Thread Definitions
- */
- #include "gear_config.h"
- #include "libgearman-server/common.h"
- #include <libgearman-server/gearmand.h>
- #include <cassert>
- #include <cerrno>
- #include <memory>
- #include <csignal>
- /*
- * Private declarations
- */
namespace
{

#if defined(HAVE_PTHREAD_TIMEDJOIN_NP) && HAVE_PTHREAD_TIMEDJOIN_NP
/**
 * Store the current wall-clock time in @a ts.
 *
 * Uses clock_gettime(CLOCK_REALTIME) when HAVE_CLOCK_GETTIME is set at
 * build time, otherwise gettimeofday() converted with TIMEVAL_TO_TIMESPEC.
 * Only compiled when pthread_timedjoin_np() is available.
 *
 * @param ts Receives the current time.
 * @return true on success; false after the failing call has been reported
 *         through gearmand_perror().
 */
bool fill_timespec(struct timespec& ts)
{
#if defined(HAVE_CLOCK_GETTIME) && HAVE_CLOCK_GETTIME
  // HAVE_CLOCK_GETTIME is necessarily non-zero in this branch, so no
  // additional runtime test is needed (not built on OSX, etc.).
  if (clock_gettime(CLOCK_REALTIME, &ts) != -1)
  {
    return true;
  }

  gearmand_perror(errno, "clock_gettime(CLOCK_REALTIME)");
  return false;
#else
  struct timeval now;
  if (gettimeofday(&now, NULL) != -1)
  {
    TIMEVAL_TO_TIMESPEC(&now, &ts);
    return true;
  }

  gearmand_perror(errno, "gettimeofday()");
  return false;
#endif
}
#endif // defined(HAVE_PTHREAD_TIMEDJOIN_NP) && HAVE_PTHREAD_TIMEDJOIN_NP

}
- /**
- * @addtogroup gearmand_thread_private Private Gearmand Thread Functions
- * @ingroup gearmand_thread
- * @{
- */
- static void *_thread(void *data);
- static void _log(const char *line, gearmand_verbose_t verbose, gearmand_thread_st *dthread);
- static void _run(gearman_server_thread_st *thread, void *fn_arg);
- static gearmand_error_t _wakeup_init(gearmand_thread_st *thread);
- static void _wakeup_close(gearmand_thread_st *thread);
- static void _wakeup_clear(gearmand_thread_st *thread);
- static void _wakeup_event(int fd, short events, void *arg);
- static void _clear_events(gearmand_thread_st *thread);
- namespace {
- gearmand_error_t gearmand_connection_watch(gearmand_io_st *con, short events, void *)
- {
- short set_events= 0;
- gearmand_con_st* dcon= gearman_io_context(con);
- if (events & POLLIN)
- {
- set_events|= EV_READ;
- }
- if (events & POLLOUT)
- {
- set_events|= EV_WRITE;
- }
- if (dcon->last_events != set_events)
- {
- if (dcon->last_events)
- {
- if (event_del(&(dcon->event)) == -1)
- {
- gearmand_perror(errno, "event_del");
- assert_msg(false, "event_del");
- }
- }
- event_set(&(dcon->event), dcon->fd, set_events | EV_PERSIST, _con_ready, dcon);
- if (event_base_set(dcon->thread->base, &(dcon->event)) == -1)
- {
- gearmand_perror(errno, "event_base_set");
- assert_msg(false, "event_del");
- }
- if (event_add(&(dcon->event), NULL) == -1)
- {
- gearmand_perror(errno, "event_add");
- return GEARMAND_EVENT;
- }
- dcon->last_events= set_events;
- }
- gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,
- "%15s:%5s Watching %6s %s",
- dcon->host, dcon->port,
- events & POLLIN ? "POLLIN" : "",
- events & POLLOUT ? "POLLOUT" : "");
- return GEARMAND_SUCCESS;
- }
- }
- gearmand_thread_st::gearmand_thread_st(gearmand_st& gearmand_):
- is_thread_lock{false},
- is_wakeup_event{false},
- count{0},
- dcon_count{0},
- dcon_add_count{0},
- free_dcon_count{0},
- wakeup_fd{},
- #if __GNUC__ == 4 && __GNUC_MINOR__ < 9
- _gearmand(gearmand_),
- #else
- _gearmand{gearmand_},
- #endif
- next{nullptr},
- prev{nullptr},
- base{nullptr},
- dcon_list{nullptr},
- dcon_add_list{nullptr},
- free_dcon_list{nullptr},
- server_thread{},
- id{},
- lock{}
- //TODO uniform initialisation doesn't help with wakeup_event
- {
- }
- /** @} */
- /*
- * Public definitions
- */
- gearmand_error_t gearmand_thread_create(gearmand_st& gearmand)
- {
- gearmand_thread_st* thread= new (std::nothrow) gearmand_thread_st(gearmand);
- if (!thread)
- {
- return gearmand_merror("new", gearmand_thread_st, 1);
- }
- if (! gearman_server_thread_init(gearmand_server(&gearmand), &(thread->server_thread),
- _log, thread, gearmand_connection_watch))
- {
- delete thread;
- gearmand_fatal("gearman_server_thread_init(NULL)");
- return GEARMAND_MEMORY_ALLOCATION_FAILURE;
- }
- thread->is_thread_lock= false;
- thread->is_wakeup_event= false;
- thread->count= 0;
- thread->dcon_count= 0;
- thread->dcon_add_count= 0;
- thread->free_dcon_count= 0;
- thread->wakeup_fd[0]= -1;
- thread->wakeup_fd[1]= -1;
- GEARMAND_LIST__ADD(Gearmand()->thread, thread);
- thread->dcon_list= nullptr;
- thread->dcon_add_list= nullptr;
- thread->free_dcon_list= nullptr;
- /* If we have no threads, we still create a fake thread that uses the main
- libevent instance. Otherwise create a libevent instance for each thread. */
- if (gearmand.threads == 0)
- {
- thread->base= gearmand.base;
- }
- else
- {
- gearmand_debug("Initializing libevent for IO thread");
- assert(thread->base == nullptr);
- thread->base= event_base_new();
- if (!thread->base)
- {
- gearmand_thread_free(thread);
- gearmand_fatal("event_base_new()");
- return GEARMAND_EVENT;
- }
- }
- gearmand_error_t ret;
- if (gearmand_failed(ret= _wakeup_init(thread)))
- {
- gearmand_thread_free(thread);
- return ret;
- }
- /* If we are not running multi-threaded, just return the thread context. */
- if (gearmand.threads == 0)
- {
- return GEARMAND_SUCCESS;
- }
- thread->count= gearmand.thread_count;
- int pthread_ret= pthread_mutex_init(&(thread->lock), nullptr);
- if (pthread_ret != 0)
- {
- thread->count= 0;
- gearmand_thread_free(thread);
- return gearmand_fatal_perror(pthread_ret, "pthread_mutex_init");
- }
- thread->is_thread_lock= true;
- thread->server_thread.run(_run, thread);
- pthread_ret= pthread_create(&(thread->id), nullptr, _thread, thread);
- if (pthread_ret != 0)
- {
- thread->count= 0;
- gearmand_thread_free(thread);
- return gearmand_perror(pthread_ret, "pthread_create");
- }
- gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "Thread %u created", thread->count);
- return GEARMAND_SUCCESS;
- }
- void gearmand_thread_free(gearmand_thread_st *thread)
- {
- if (thread)
- {
- if (Gearmand()->threads and thread->count > 0)
- {
- gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "Shutting down thread %u", thread->count);
- gearmand_thread_wakeup(thread, GEARMAND_WAKEUP_SHUTDOWN);
- int pthread_error= -1;
- #if defined(HAVE_PTHREAD_TIMEDJOIN_NP) && HAVE_PTHREAD_TIMEDJOIN_NP
- {
- struct timespec ts;
- if (fill_timespec(ts))
- {
- ts.tv_sec+= 300;
- pthread_error= pthread_timedjoin_np(thread->id, nullptr, &ts);
- if (pthread_error)
- {
- gearmand_perror(pthread_error, "pthread_timedjoin_np");
- }
- }
- if (pthread_error != 0)
- {
- pthread_error= pthread_kill(thread->id, SIGQUIT);
- if (pthread_error)
- {
- gearmand_perror(pthread_error, "pthread_kill(, SIGQUIT)");
- }
- pthread_error= pthread_join(thread->id, nullptr);
- }
- }
- #else
- pthread_error= pthread_join(thread->id, nullptr);
- #endif
- if (pthread_error)
- {
- gearmand_perror(pthread_error, "pthread_join");
- }
- }
- if (thread->is_thread_lock)
- {
- int pthread_error;
- if ((pthread_error= pthread_mutex_destroy(&(thread->lock))))
- {
- gearmand_perror(pthread_error, "pthread_mutex_destroy");
- }
- }
- _wakeup_close(thread);
- while (thread->dcon_list)
- {
- gearmand_con_free(thread->dcon_list);
- }
- while (thread->dcon_add_list)
- {
- gearmand_con_st* dcon= thread->dcon_add_list;
- thread->dcon_add_list= dcon->next;
- dcon->close_socket();
- delete dcon;
- }
- while (thread->free_dcon_list)
- {
- gearmand_con_st* dcon= thread->free_dcon_list;
- thread->free_dcon_list= dcon->next;
- delete dcon;
- }
- gearman_server_thread_free(&(thread->server_thread));
- GEARMAND_LIST__DEL(Gearmand()->thread, thread);
- if (Gearmand()->threads > 0)
- {
- if (thread->base)
- {
- if (Gearmand()->base == thread->base)
- {
- Gearmand()->base= nullptr;
- }
- event_base_free(thread->base);
- thread->base= nullptr;
- }
- gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "Thread %u shutdown complete", thread->count);
- }
- delete thread;
- }
- }
- void gearmand_thread_wakeup(gearmand_thread_st *thread,
- gearmand_wakeup_t wakeup)
- {
- uint8_t buffer= wakeup;
- /* If this fails, there is not much we can really do. This should never fail
- though if the main gearmand thread is still active. */
- int limit= 5;
- ssize_t written;
- while (--limit)
- {
- if ((written= write(thread->wakeup_fd[1], &buffer, 1)) != 1)
- {
- if (written < 0)
- {
- switch (errno)
- {
- case EINTR:
- continue;
- default:
- break;
- }
- gearmand_perror(errno, gearmand_strwakeup(wakeup));
- }
- else
- {
- gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM,
- "gearmand_wakeup() incorrectly wrote %lu bytes of data.", (unsigned long)written);
- }
- }
- break;
- }
- }
- void gearmand_thread_run(gearmand_thread_st *thread)
- {
- while (1)
- {
- gearmand_error_t ret;
- gearmand_con_st *dcon= gearman_server_thread_run(&(thread->server_thread), &ret);
- if (ret == GEARMAND_SUCCESS or
- ret == GEARMAND_IO_WAIT or
- ret == GEARMAND_SHUTDOWN_GRACEFUL)
- {
- return;
- }
- if (!dcon)
- {
- /* We either got a GEARMAND_SHUTDOWN or some other fatal internal error.
- Either way, we want to shut the server down. */
- gearmand_wakeup(Gearmand(), GEARMAND_WAKEUP_SHUTDOWN);
- return;
- }
- gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM, "Disconnected %s:%s", dcon->host, dcon->port);
- gearmand_con_free(dcon);
- }
- }
- #pragma GCC diagnostic push
- #ifndef __INTEL_COMPILER
- # pragma GCC diagnostic ignored "-Wold-style-cast"
- #endif
- /*
- * Private definitions
- */
- static void *_thread(void *data)
- {
- gearmand_thread_st *thread= (gearmand_thread_st *)data;
- char buffer[BUFSIZ];
- int length= snprintf(buffer, sizeof(buffer), "[%6u ]", thread->count);
- if (length <= 0 or sizeof(length) >= sizeof(buffer))
- {
- assert(0);
- buffer[0]= 0;
- }
- (void)gearmand_initialize_thread_logging(buffer);
- gearmand_debug("Entering thread event loop");
- if (event_base_loop(thread->base, 0) == -1)
- {
- gearmand_fatal("event_base_loop(-1)");
- Gearmand()->ret= GEARMAND_EVENT;
- }
- gearmand_debug("Exiting thread event loop");
- return nullptr;
- }
- static void _log(const char *line, gearmand_verbose_t verbose, gearmand_thread_st*)
- {
- if (Gearmand())
- {
- (*Gearmand()->log_fn)(line, verbose, (void *)Gearmand()->log_context);
- }
- }
- static void _run(gearman_server_thread_st*, void *fn_arg)
- {
- if (fn_arg)
- {
- gearmand_thread_st *dthread= (gearmand_thread_st*)fn_arg;
- gearmand_thread_wakeup(dthread, GEARMAND_WAKEUP_RUN);
- }
- }
- static gearmand_error_t _wakeup_init(gearmand_thread_st *thread)
- {
- gearmand_debug("Creating IO thread wakeup pipe");
- #if defined(HAVE_PIPE2) && HAVE_PIPE2
- if (pipe2(thread->wakeup_fd, O_NONBLOCK) == -1)
- {
- return gearmand_perror(errno, "pipe");
- }
- #else
- if (pipe(thread->wakeup_fd) == -1)
- {
- return gearmand_perror(errno, "pipe");
- }
- gearmand_error_t local_ret;
- if ((local_ret= gearmand_sockfd_nonblock(thread->wakeup_fd[0])))
- {
- return local_ret;
- }
- #endif
- event_set(&(thread->wakeup_event), thread->wakeup_fd[0], EV_READ | EV_PERSIST,
- _wakeup_event, thread);
- if (event_base_set(thread->base, &(thread->wakeup_event)) == -1)
- {
- gearmand_perror(errno, "event_base_set");
- }
- if (event_add(&(thread->wakeup_event), nullptr) < 0)
- {
- gearmand_perror(errno, "event_add");
- return GEARMAND_EVENT;
- }
- thread->is_wakeup_event= true;
- return GEARMAND_SUCCESS;
- }
- static void _wakeup_close(gearmand_thread_st *thread)
- {
- _wakeup_clear(thread);
- if (thread->wakeup_fd[0] >= 0)
- {
- gearmand_debug("Closing IO thread wakeup pipe");
- gearmand_pipe_close(thread->wakeup_fd[0]);
- thread->wakeup_fd[0]= -1;
- gearmand_pipe_close(thread->wakeup_fd[1]);
- thread->wakeup_fd[1]= -1;
- }
- }
- static void _wakeup_clear(gearmand_thread_st *thread)
- {
- if (thread->is_wakeup_event)
- {
- gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "Clearing event for IO thread wakeup pipe %u", thread->count);
- if (event_del(&(thread->wakeup_event)) < 0)
- {
- gearmand_perror(errno, "event_del() failure, shutdown may hang");
- }
- thread->is_wakeup_event= false;
- }
- }
- #pragma GCC diagnostic push
- #pragma GCC diagnostic ignored "-Wunreachable-code"
- static void _wakeup_event(int fd, short events __attribute__ ((unused)), void *arg)
- {
- gearmand_thread_st *thread= (gearmand_thread_st *)arg;
- uint8_t buffer[GEARMAND_PIPE_BUFFER_SIZE];
- ssize_t ret;
- while (1)
- {
- ret= read(fd, buffer, GEARMAND_PIPE_BUFFER_SIZE);
- if (ret == 0)
- {
- _clear_events(thread);
- gearmand_fatal("read(EOF)");
- Gearmand()->ret= GEARMAND_PIPE_EOF;
- return;
- }
- else if (ret == -1)
- {
- int local_errno= errno;
- if (local_errno == EINTR)
- {
- continue;
- }
- if (local_errno == EAGAIN)
- {
- break;
- }
- _clear_events(thread);
- gearmand_perror(local_errno, "_wakeup_event:read");
- Gearmand()->ret= GEARMAND_ERRNO;
- return;
- }
- for (ssize_t x= 0; x < ret; x++)
- {
- switch ((gearmand_wakeup_t)buffer[x])
- {
- case GEARMAND_WAKEUP_PAUSE:
- gearmand_debug("Received PAUSE wakeup event");
- break;
- case GEARMAND_WAKEUP_SHUTDOWN_GRACEFUL:
- gearmand_debug("Received SHUTDOWN_GRACEFUL wakeup event");
- if (gearman_server_shutdown_graceful(&(Gearmand()->server)) == GEARMAND_SHUTDOWN)
- {
- gearmand_wakeup(Gearmand(), GEARMAND_WAKEUP_SHUTDOWN);
- }
- break;
- case GEARMAND_WAKEUP_SHUTDOWN:
- gearmand_debug("Received SHUTDOWN wakeup event");
- _clear_events(thread);
- break;
- case GEARMAND_WAKEUP_CON:
- gearmand_debug("Received CON wakeup event");
- gearmand_con_check_queue(thread);
- break;
- case GEARMAND_WAKEUP_RUN:
- gearmand_debug("Received RUN wakeup event");
- gearmand_thread_run(thread);
- break;
- default:
- gearmand_log_fatal(GEARMAN_DEFAULT_LOG_PARAM, "Received unknown wakeup event (%u)", buffer[x]);
- _clear_events(thread);
- Gearmand()->ret= GEARMAND_UNKNOWN_STATE;
- break;
- }
- }
- }
- }
- #pragma GCC diagnostic pop
- static void _clear_events(gearmand_thread_st *thread)
- {
- _wakeup_clear(thread);
- while (thread->dcon_list)
- {
- gearmand_con_free(thread->dcon_list);
- }
- }
- #pragma GCC diagnostic pop
|