/* vim:expandtab:shiftwidth=2:tabstop=2:smarttab: * * Gearmand client and server library. * * Copyright (C) 2011-2012 Data Differential, http://datadifferential.com/ * All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are * met: * * * Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * * * Redistributions in binary form must reproduce the above * copyright notice, this list of conditions and the following disclaimer * in the documentation and/or other materials provided with the * distribution. * * * The names of its contributors may not be used to endorse or * promote products derived from this software without specific prior * written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * */ /** * @file * @brief Redis Queue Storage Definitions */ #include <gear_config.h> #include <libgearman-server/common.h> #include <libgearman-server/plugins/queue/redis/queue.h> #include <libgearman-server/plugins/queue/base.h> #if defined(HAVE_HIREDIS) && HAVE_HIREDIS #include <hiredis/hiredis.h> /* Queue callback functions. */ static gearmand_error_t _hiredis_add(gearman_server_st *server, void *context, const char *unique, size_t unique_size, const char *function_name, size_t function_name_size, const void *data, size_t data_size, gearman_job_priority_t priority, int64_t when); static gearmand_error_t _hiredis_flush(gearman_server_st *server, void *context); static gearmand_error_t _hiredis_done(gearman_server_st *server, void *context, const char *unique, size_t unique_size, const char *function_name, size_t function_name_size); static gearmand_error_t _hiredis_replay(gearman_server_st *server, void *context, gearman_queue_add_fn *add_fn, void *add_context); namespace gearmand { namespace plugins { namespace queue { class Hiredis; }}} namespace gearmand { namespace plugins { namespace queue { class Hiredis : public Queue { public: Hiredis(); ~Hiredis(); gearmand_error_t initialize(); redisContext* redis() { return _redis; } std::string server; std::string service; private: redisContext *_redis; }; Hiredis::Hiredis() : Queue("redis"), server("127.0.0.1"), service("6379"), _redis(NULL) { command_line_options().add_options() ("redis-server", boost::program_options::value(&server), "Redis server") ("redis-port", boost::program_options::value(&service), "Redis server port/service"); } Hiredis::~Hiredis() { } gearmand_error_t Hiredis::initialize() { int service_port= atoi(service.c_str()); if ((_redis= redisConnect("127.0.0.1", service_port)) == NULL) { return gearmand_gerror("Could not connect to redis server", GEARMAND_QUEUE_ERROR); } gearmand_info("Initializing hiredis module"); gearman_server_set_queue(Gearmand()->server, this, _hiredis_add, _hiredis_flush, _hiredis_done, _hiredis_replay); return GEARMAND_SUCCESS; } void initialize_redis() { static Hiredis local_instance; } } // namespace queue } // namespace plugins } // namespace gearmand typedef std::vector<char> vchar_t; #define GEARMAND_QUEUE_GEARMAND_DEFAULT_PREFIX "_gear_" #define GEARMAND_QUEUE_GEARMAND_DEFAULT_PREFIX_SIZE sizeof(GEARMAND_QUEUE_GEARMAND_DEFAULT_PREFIX) #define GEARMAND_KEY_LITERAL "%s-%.*s-%*s" static size_t build_key(vchar_t &key, const char *unique, size_t unique_size, const char *function_name, size_t function_name_size) { key.resize(function_name_size +unique_size +GEARMAND_QUEUE_GEARMAND_DEFAULT_PREFIX_SIZE +4); int key_size= snprintf(&key[0], key.size(), GEARMAND_KEY_LITERAL, GEARMAND_QUEUE_GEARMAND_DEFAULT_PREFIX, (int)function_name_size, function_name, (int)unique_size, unique); if (size_t(key_size) >= key.size() or key_size <= 0) { assert(0); return -1; } return key.size(); } /** * @addtogroup gearman_queue_hiredis hiredis Queue Storage Functions * @ingroup gearman_queue * @{ */ /* * Private declarations */ #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wold-style-cast" /* * Private definitions */ static gearmand_error_t _hiredis_add(gearman_server_st *, void *context, const char *unique, size_t unique_size, const char *function_name, size_t function_name_size, const void *data, size_t data_size, gearman_job_priority_t, int64_t when) { gearmand::plugins::queue::Hiredis *queue= (gearmand::plugins::queue::Hiredis *)context; if (when) // No support for EPOCH jobs { return GEARMAND_QUEUE_ERROR; } gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "hires add: %.*s", (uint32_t)unique_size, (char *)unique); std::vector<char> key; build_key(key, unique, unique_size, function_name, function_name_size); gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "hires key: %u", (uint32_t)key.size()); redisReply *reply= (redisReply*)redisCommand(queue->redis(), "SET %b %b", &key[0], key.size(), data, data_size); gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "got reply"); if (reply == NULL) { return gearmand_log_gerror(GEARMAN_DEFAULT_LOG_PARAM, GEARMAND_QUEUE_ERROR, "failed to insert '%.*s' into redis", key.size(), &key[0]); } freeReplyObject(reply); return GEARMAND_SUCCESS; } static gearmand_error_t _hiredis_flush(gearman_server_st *, void *) { return GEARMAND_SUCCESS; } static gearmand_error_t _hiredis_done(gearman_server_st *, void *context, const char *unique, size_t unique_size, const char *function_name, size_t function_name_size) { gearmand::plugins::queue::Hiredis *queue= (gearmand::plugins::queue::Hiredis *)context; gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "hires done: %.*s", (uint32_t)unique_size, (char *)unique); std::vector<char> key; build_key(key, unique, unique_size, function_name, function_name_size); redisReply *reply= (redisReply*)redisCommand(queue->redis(), "DEL %b", &key[0], key.size()); if (reply == NULL) { return GEARMAND_QUEUE_ERROR; } freeReplyObject(reply); return GEARMAND_SUCCESS; } #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wformat-nonliteral" static gearmand_error_t _hiredis_replay(gearman_server_st *server, void *context, gearman_queue_add_fn *add_fn, void *add_context) { gearmand::plugins::queue::Hiredis *queue= (gearmand::plugins::queue::Hiredis *)context; gearmand_info("hiredis replay start"); redisReply *reply= (redisReply*)redisCommand(queue->redis(), "KEYS %s", GEARMAND_QUEUE_GEARMAND_DEFAULT_PREFIX); if (reply == NULL) { return gearmand_gerror("Failed to call KEYS during QUEUE replay", GEARMAND_QUEUE_ERROR); } for (size_t x= 0; x < reply->elements; x++) { char prefix[GEARMAND_QUEUE_GEARMAND_DEFAULT_PREFIX_SIZE]; char function_name[GEARMAN_FUNCTION_MAX_SIZE]; char unique[GEARMAN_MAX_UNIQUE_SIZE]; char fmt_str[100] = ""; int fmt_str_length= snprintf(fmt_str, sizeof(fmt_str), "%%%ds-%%%ds-%%%ds", int(GEARMAND_QUEUE_GEARMAND_DEFAULT_PREFIX_SIZE), int(GEARMAN_FUNCTION_MAX_SIZE), int(GEARMAN_MAX_UNIQUE_SIZE)); if (fmt_str_length <= 0 or size_t(fmt_str_length) >= sizeof(fmt_str)) { assert(fmt_str_length != 1); return gearmand_gerror("snprintf() failed to produce a valud fmt_str for redis key", GEARMAND_QUEUE_ERROR); } int ret= sscanf(reply->element[x]->str, fmt_str, prefix, function_name, unique); if (ret == 0) { continue; } redisReply *get_reply= (redisReply*)redisCommand(queue->redis(), "GET %s", reply->element[x]->str); if (get_reply == NULL) { continue; } (void)(add_fn)(server, add_context, unique, strlen(unique), function_name, strlen(function_name), get_reply->str, get_reply->len, GEARMAN_JOB_PRIORITY_NORMAL, 0); freeReplyObject(get_reply); } freeReplyObject(reply); return GEARMAND_SUCCESS; } #pragma GCC diagnostic pop #pragma GCC diagnostic pop #endif // defined(HAVE_HIREDIS) && HAVE_HIREDIS