Browse Source

Add prefix support in redis queue plugin

Felix Aronsson 6 years ago
parent
commit
929f245e94

+ 18 - 13
libgearman-server/plugins/queue/redis/queue.cc

@@ -183,12 +183,14 @@ gearmand::plugins::queue::Hiredis::Hiredis() :
   Queue("redis"),
   _redis(nullptr),
   server("127.0.0.1"),
-  service("6379")
+  service("6379"),
+  prefix("_gear_")
 {
   command_line_options().add_options()
     ("redis-server", boost::program_options::value(&server), "Redis server")
     ("redis-port", boost::program_options::value(&service), "Redis server port/service")
-    ("redis-password", boost::program_options::value(&password), "Redis server password/service");
+    ("redis-password", boost::program_options::value(&password), "Redis server password/service")
+    ("redis-prefix", boost::program_options::value(&prefix), "Prefix to use in redis keys");
 }
 
 /**
@@ -254,22 +256,22 @@ void gearmand::plugins::queue::initialize_redis()
   static gearmand::plugins::queue::Hiredis local_instance;
 }
 
-#define GEARMAND_QUEUE_GEARMAND_DEFAULT_PREFIX "_gear_"
-#define GEARMAND_QUEUE_GEARMAND_DEFAULT_PREFIX_SIZE sizeof(GEARMAND_QUEUE_GEARMAND_DEFAULT_PREFIX)
 #define GEARMAND_KEY_LITERAL "%s-%.*s-%*s"
 
 static size_t build_key(vchar_t &key,
+                        const char *prefix,
+                        size_t prefix_size,
                         const char *unique,
                         size_t unique_size,
                         const char *function_name,
                         size_t function_name_size)
 {
-  size_t buf_size = function_name_size + unique_size + GEARMAND_QUEUE_GEARMAND_DEFAULT_PREFIX_SIZE + 4;
+  size_t buf_size = function_name_size + unique_size + prefix_size + 4;
   char buf[buf_size];
   // buf size is overestimated
   // so buf contains some \0 at the end
   int key_size= snprintf(buf, buf_size, GEARMAND_KEY_LITERAL,
-                         GEARMAND_QUEUE_GEARMAND_DEFAULT_PREFIX,
+                         prefix,
                          (int)function_name_size, function_name,
                          (int)unique_size, unique);
   if (size_t(key_size) >= buf_size or key_size <= 0)
@@ -325,14 +327,14 @@ static gearmand_error_t _hiredis_add(gearman_server_st *, void *context,
     (uint32_t)function_name_size, function_name,
     (uint32_t)unique_size, (char *)unique);
 
+  gearmand::plugins::queue::Hiredis *queue= (gearmand::plugins::queue::Hiredis *)context;
+
   vchar_t key;
-  build_key(key, unique, unique_size, function_name, function_name_size);
+  build_key(key, queue->prefix.c_str(), queue->prefix.size(), unique, unique_size, function_name, function_name_size);
 
   gearmand_log_debug(
     GEARMAN_DEFAULT_LOG_PARAM,
     "hires key: %u", (uint32_t)key.size());
-
-  gearmand::plugins::queue::Hiredis *queue= (gearmand::plugins::queue::Hiredis *)context;
   if (queue->hmset(key, data, data_size, (uint32_t)priority))
     return GEARMAND_SUCCESS;
 
@@ -360,7 +362,7 @@ static gearmand_error_t _hiredis_done(gearman_server_st *, void *context,
     "hires done: %.*s", (uint32_t)unique_size, (char *)unique);
 
   vchar_t key;
-  build_key(key, unique, unique_size, function_name, function_name_size);
+  build_key(key, queue->prefix.c_str(), queue->prefix.size(), unique, unique_size, function_name, function_name_size);
 
   redisReply *reply= (redisReply*)redisCommand(queue->redis(), "DEL %b", &key[0], key.size());
   if (reply == nullptr)
@@ -386,7 +388,7 @@ static gearmand_error_t _hiredis_replay(gearman_server_st *server, void *context
 
   gearmand_info("hiredis replay start");
 
-  redisReply *reply= (redisReply*)redisCommand(queue->redis(), "KEYS %s*", GEARMAND_QUEUE_GEARMAND_DEFAULT_PREFIX);
+  redisReply *reply= (redisReply*)redisCommand(queue->redis(), "KEYS %s*", queue->prefix.c_str());
   if (reply == nullptr)
   {
     return gearmand_log_gerror(
@@ -397,17 +399,18 @@ static gearmand_error_t _hiredis_replay(gearman_server_st *server, void *context
 
   for (size_t x= 0; x < reply->elements; x++)
   {
-    char prefix[GEARMAND_QUEUE_GEARMAND_DEFAULT_PREFIX_SIZE];
+    char* prefix= (char*) malloc(queue->prefix.size() * sizeof(char));
     char function_name[GEARMAN_FUNCTION_MAX_SIZE];
     char unique[GEARMAN_MAX_UNIQUE_SIZE];
 
     char fmt_str[100] = "";
     int fmt_str_length= snprintf(fmt_str, sizeof(fmt_str), "%%%d[^-]-%%%d[^-]-%%%ds",
-                                 int(GEARMAND_QUEUE_GEARMAND_DEFAULT_PREFIX_SIZE),
+                                 int(queue->prefix.size()),
                                  int(GEARMAN_FUNCTION_MAX_SIZE),
                                  int(GEARMAN_MAX_UNIQUE_SIZE));
     if (fmt_str_length <= 0 or size_t(fmt_str_length) >= sizeof(fmt_str))
     {
+      free(prefix);
       assert(fmt_str_length != 1);
       return gearmand_gerror(
         "snprintf() failed to produce a valud fmt_str for redis key",
@@ -418,6 +421,8 @@ static gearmand_error_t _hiredis_replay(gearman_server_st *server, void *context
                     prefix,
                     function_name,
                     unique);
+
+    free(prefix);
     if (ret == 0)
     {
       continue;

+ 1 - 0
libgearman-server/plugins/queue/redis/queue.h

@@ -71,6 +71,7 @@ class Hiredis : public Queue {
     std::string server;
     std::string service;
     std::string password;
+    std::string prefix;
 
     Hiredis();
     ~Hiredis();

+ 14 - 0
tests/redis.cc

@@ -81,6 +81,19 @@ static test_return_t collection_init(void *object)
   return TEST_SUCCESS;
 }
 
+static test_return_t collection_init_with_prefix(void *object)
+{
+  SKIP_IF(true);
+  Context *test= (Context *)object;
+  assert(test);
+
+  const char *argv[]= { "--queue-type=redis", "--redis-prefix=_prefix_", 0 };
+
+  test->initialize(argv);
+
+  return TEST_SUCCESS;
+}
+
 static test_return_t collection_cleanup(void *object)
 {
   Context *test= (Context *)object;
@@ -142,6 +155,7 @@ test_st regressions[] ={
 collection_st collection[] ={
   {"gearmand redis options", 0, 0, gearmand_basic_option_tests},
   {"redis queue", collection_init, collection_cleanup, tests},
+  {"redis queue with prefix", collection_init_with_prefix, collection_cleanup, tests},
   {"regressions", collection_init, collection_cleanup, regressions},
   {0, 0, 0, 0}
 };