queue.cc 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458
  1. /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
  2. *
  3. * Gearmand client and server library.
  4. *
  5. * Copyright (C) 2011-2012 Data Differential, http://datadifferential.com/
  6. * All rights reserved.
  7. *
  8. * Redistribution and use in source and binary forms, with or without
  9. * modification, are permitted provided that the following conditions are
  10. * met:
  11. *
  12. * * Redistributions of source code must retain the above copyright
  13. * notice, this list of conditions and the following disclaimer.
  14. *
  15. * * Redistributions in binary form must reproduce the above
  16. * copyright notice, this list of conditions and the following disclaimer
  17. * in the documentation and/or other materials provided with the
  18. * distribution.
  19. *
  20. * * The names of its contributors may not be used to endorse or
  21. * promote products derived from this software without specific prior
  22. * written permission.
  23. *
  24. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  25. * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  26. * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  27. * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  28. * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  29. * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  30. * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  31. * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  32. * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  33. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  34. * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  35. *
  36. */
  37. /**
  38. * @file
  39. * @brief Redis Queue Storage Definitions
  40. */
  41. #include <gear_config.h>
  42. #include <libgearman-server/plugins/queue/redis/queue.h>
  43. #if defined(GEARMAND_PLUGINS_QUEUE_REDIS_H)
  44. /* Queue callback functions. */
  45. static gearmand_error_t _hiredis_add(gearman_server_st *server, void *context,
  46. const char *unique,
  47. size_t unique_size,
  48. const char *function_name,
  49. size_t function_name_size,
  50. const void *data, size_t data_size,
  51. gearman_job_priority_t priority,
  52. int64_t when);
  53. static gearmand_error_t _hiredis_flush(gearman_server_st *server, void *context);
  54. static gearmand_error_t _hiredis_done(gearman_server_st *server, void *context,
  55. const char *unique,
  56. size_t unique_size,
  57. const char *function_name,
  58. size_t function_name_size);
  59. static gearmand_error_t _hiredis_replay(gearman_server_st *server, void *context,
  60. gearman_queue_add_fn *add_fn,
  61. void *add_context);
  62. /**
  63. * gearmand::plugins::queue::Hiredis::redis()
  64. *
  65. * returns _redis
  66. */
  67. redisContext* gearmand::plugins::queue::Hiredis::redis()
  68. {
  69. return this->_redis;
  70. }
  71. /*
  72. * gearmand::plugins::queue::Hiredis::hmset(vchar_t key, const void *data, size_t data_size, uint32_t priority)
  73. *
  74. * returns true if hiredis HMSET succeeded
  75. */
  76. bool gearmand::plugins::queue::Hiredis::hmset(vchar_t key, const void *data, size_t data_size, uint32_t priority) {
  77. redisContext* context = this->redis();
  78. const size_t argc = 6;
  79. std::string _priority = std::to_string((uint32_t)priority);
  80. const size_t argvlen[argc] = {
  81. (size_t)5,
  82. (size_t)key.size(),
  83. (size_t)4,
  84. (size_t)data_size,
  85. (size_t)8,
  86. _priority.size()
  87. };
  88. std::vector<const char*> argv {"HMSET"};
  89. argv.push_back( &key[0] );
  90. argv.push_back( "data" );
  91. argv.push_back( static_cast<const char*>(data) );
  92. argv.push_back( "priority" );
  93. argv.push_back( _priority.c_str() );
  94. redisReply *reply = (redisReply *)redisCommandArgv(context, static_cast<int>(argv.size()), &(argv[0]), &(argvlen[0]) );
  95. if (reply == nullptr)
  96. return false;
  97. bool res = (reply->type == REDIS_REPLY_STATUS);
  98. freeReplyObject(reply);
  99. return res;
  100. }
  101. /*
  102. * bool gearmand::plugins::queue::Hiredis::fetch(char *key, gearmand::plugins::queue::redis_record_t &req)
  103. *
  104. * fetch redis result for the key by HGETALL command and put it into the redis_record_t
  105. *
  106. * returns true on success
  107. */
  108. bool gearmand::plugins::queue::Hiredis::fetch(char *key, gearmand::plugins::queue::redis_record_t &req)
  109. {
  110. redisContext * context = this->redis();
  111. redisReply * reply = (redisReply*)redisCommand(context, "HGETALL %s", key);
  112. if (reply == nullptr)
  113. return false;
  114. //FIXME remove workaround
  115. if(reply->type == REDIS_REPLY_ERROR) {
  116. // workaround to ensure gearmand upgrade.
  117. // gearmand <=1.1.15 stores data in string, not in hash.
  118. gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM, "redis replies for HGETALL: %s", reply->str);
  119. reply = (redisReply*)redisCommand(context, "TYPE %s", key);
  120. if (reply == nullptr)
  121. return false;
  122. if(strcmp(reply->str, "string") != 0) {
  123. gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "unexpected type of the value stored in key: %s", reply->str);
  124. return false;
  125. }
  126. reply = (redisReply*)redisCommand(context, "GET %s", key);
  127. if (reply == nullptr)
  128. return false;
  129. std::string s{reply->str};
  130. req.data = s;
  131. req.priority = GEARMAN_JOB_PRIORITY_NORMAL;
  132. } else {
  133. // 2 x (key + value)
  134. assert(reply->elements == 4);
  135. auto fk = reply->element[0]->str;
  136. if(strcmp(fk, "data") == 0) {
  137. std::string s{reply->element[1]->str};
  138. req.data = s;
  139. req.priority = (uint32_t)std::stoi(reply->element[3]->str);
  140. } else if (strcmp(fk, "priority") == 0) {
  141. std::string s{reply->element[3]->str};
  142. req.data = s;
  143. req.priority = (uint32_t)std::stoi(reply->element[1]->str);
  144. } else {
  145. gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "unexpected key %s", fk);
  146. return false;
  147. }
  148. }
  149. freeReplyObject(reply);
  150. return true;
  151. }
  152. /**
  153. * gearmand::plugins::queue::Hiredis::Hiredis()
  154. *
  155. * setup server, service and password properties
  156. *
  157. */
  158. gearmand::plugins::queue::Hiredis::Hiredis() :
  159. Queue("redis"),
  160. _redis(nullptr),
  161. server("127.0.0.1"),
  162. service("6379"),
  163. prefix("_gear_")
  164. {
  165. command_line_options().add_options()
  166. ("redis-server", boost::program_options::value(&server), "Redis server")
  167. ("redis-port", boost::program_options::value(&service), "Redis server port/service")
  168. ("redis-password", boost::program_options::value(&password), "Redis server password/service")
  169. ("redis-prefix", boost::program_options::value(&prefix), "Prefix to use in redis keys");
  170. }
  171. /**
  172. * gearmand::plugins::queue::Hiredis::~Hiredis()
  173. *
  174. * free _redis context
  175. */
  176. gearmand::plugins::queue::Hiredis::~Hiredis()
  177. {
  178. if(this->_redis)
  179. redisFree(this->_redis);
  180. }
  181. gearmand_error_t gearmand::plugins::queue::Hiredis::initialize()
  182. {
  183. int service_port= atoi(service.c_str());
  184. if ((_redis= redisConnect(server.c_str(), service_port)) == nullptr)
  185. {
  186. return gearmand_log_gerror(
  187. GEARMAN_DEFAULT_LOG_PARAM,
  188. GEARMAND_QUEUE_ERROR,
  189. "Could not connect to redis server: %s", _redis->errstr);
  190. }
  191. if (password.size())
  192. {
  193. redisReply *reply = (redisReply*)redisCommand(_redis, "AUTH %s", password.c_str());
  194. if(reply == nullptr)
  195. {
  196. return gearmand_log_gerror(
  197. GEARMAN_DEFAULT_LOG_PARAM,
  198. GEARMAND_QUEUE_ERROR,
  199. "Failed to exec AUTH command, redis server reply: %s", _redis->errstr);
  200. }
  201. if(reply->type == REDIS_REPLY_ERROR)
  202. {
  203. gearmand_log_gerror(
  204. GEARMAN_DEFAULT_LOG_PARAM,
  205. GEARMAND_QUEUE_ERROR,
  206. "Could not pass redis server auth, redis server reply: %s", reply->str);
  207. freeReplyObject(reply);
  208. return GEARMAND_QUEUE_ERROR;
  209. }
  210. freeReplyObject(reply);
  211. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "Auth success");
  212. }
  213. gearmand_info("Initializing hiredis module");
  214. gearman_server_set_queue(Gearmand()->server, this, _hiredis_add, _hiredis_flush, _hiredis_done, _hiredis_replay);
  215. return GEARMAND_SUCCESS;
  216. }
  217. /**
  218. * define static gearmand::plugins::queue::Hiredis
  219. */
  220. void gearmand::plugins::queue::initialize_redis()
  221. {
  222. static gearmand::plugins::queue::Hiredis local_instance;
  223. }
  224. #define GEARMAND_KEY_LITERAL "%s-%.*s-%*s"
  225. static size_t build_key(vchar_t &key,
  226. const char *prefix,
  227. size_t prefix_size,
  228. const char *unique,
  229. size_t unique_size,
  230. const char *function_name,
  231. size_t function_name_size)
  232. {
  233. size_t buf_size = function_name_size + unique_size + prefix_size + 4;
  234. char buf[buf_size];
  235. // buf size is overestimated
  236. // so buf contains some \0 at the end
  237. int key_size= snprintf(buf, buf_size, GEARMAND_KEY_LITERAL,
  238. prefix,
  239. (int)function_name_size, function_name,
  240. (int)unique_size, unique);
  241. if (size_t(key_size) >= buf_size or key_size <= 0)
  242. {
  243. assert(0);
  244. return -1;
  245. }
  246. // std::string removes all \0 at the end of buf
  247. std::string s{buf};
  248. key.resize(0);
  249. std::copy(s.begin(), s.end(), std::back_inserter(key));
  250. return key.size();
  251. }
  252. /**
  253. * @addtogroup gearman_queue_hiredis hiredis Queue Storage Functions
  254. * @ingroup gearman_queue
  255. * @{
  256. */
  257. /*
  258. * Private declarations
  259. */
  260. #pragma GCC diagnostic push
  261. #pragma GCC diagnostic ignored "-Wold-style-cast"
  262. /*
  263. * Private definitions
  264. */
  265. static gearmand_error_t _hiredis_add(gearman_server_st *, void *context,
  266. const char *unique,
  267. size_t unique_size,
  268. const char *function_name,
  269. size_t function_name_size,
  270. const void *data, size_t data_size,
  271. gearman_job_priority_t priority,
  272. int64_t when)
  273. {
  274. if (when) // No support for EPOCH jobs
  275. {
  276. return gearmand_gerror("hiredis queue does not support epoch jobs", GEARMAND_QUEUE_ERROR);
  277. }
  278. gearmand_log_debug(
  279. GEARMAN_DEFAULT_LOG_PARAM,
  280. "hires add func: %.*s, unique: %.*s",
  281. (uint32_t)function_name_size, function_name,
  282. (uint32_t)unique_size, (char *)unique);
  283. gearmand::plugins::queue::Hiredis *queue= (gearmand::plugins::queue::Hiredis *)context;
  284. vchar_t key;
  285. build_key(key, queue->prefix.c_str(), queue->prefix.size(), unique, unique_size, function_name, function_name_size);
  286. gearmand_log_debug(
  287. GEARMAN_DEFAULT_LOG_PARAM,
  288. "hires key: %u", (uint32_t)key.size());
  289. if (queue->hmset(key, data, data_size, (uint32_t)priority))
  290. return GEARMAND_SUCCESS;
  291. return gearmand_log_gerror(
  292. GEARMAN_DEFAULT_LOG_PARAM,
  293. GEARMAND_QUEUE_ERROR,
  294. "failed to insert '%.*s' into redis", (uint32_t)key.size(), &key[0]);
  295. }
  296. static gearmand_error_t _hiredis_flush(gearman_server_st *, void *)
  297. {
  298. return GEARMAND_SUCCESS;
  299. }
  300. static gearmand_error_t _hiredis_done(gearman_server_st *, void *context,
  301. const char *unique,
  302. size_t unique_size,
  303. const char *function_name,
  304. size_t function_name_size)
  305. {
  306. gearmand::plugins::queue::Hiredis *queue= (gearmand::plugins::queue::Hiredis *)context;
  307. gearmand_log_debug(
  308. GEARMAN_DEFAULT_LOG_PARAM,
  309. "hires done: %.*s", (uint32_t)unique_size, (char *)unique);
  310. vchar_t key;
  311. build_key(key, queue->prefix.c_str(), queue->prefix.size(), unique, unique_size, function_name, function_name_size);
  312. redisReply *reply= (redisReply*)redisCommand(queue->redis(), "DEL %b", &key[0], key.size());
  313. if (reply == nullptr)
  314. {
  315. return gearmand_log_gerror(
  316. GEARMAN_DEFAULT_LOG_PARAM,
  317. GEARMAND_QUEUE_ERROR,
  318. "Failed to call DEL for key %s: %s", &key[0], queue->redis()->errstr);
  319. }
  320. freeReplyObject(reply);
  321. return GEARMAND_SUCCESS;
  322. }
  323. #pragma GCC diagnostic push
  324. #pragma GCC diagnostic ignored "-Wformat-nonliteral"
  325. static gearmand_error_t _hiredis_replay(gearman_server_st *server, void *context,
  326. gearman_queue_add_fn *add_fn,
  327. void *add_context)
  328. {
  329. gearmand::plugins::queue::Hiredis *queue= (gearmand::plugins::queue::Hiredis *)context;
  330. gearmand_info("hiredis replay start");
  331. redisReply *reply= (redisReply*)redisCommand(queue->redis(), "KEYS %s*", queue->prefix.c_str());
  332. if (reply == nullptr)
  333. {
  334. return gearmand_log_gerror(
  335. GEARMAN_DEFAULT_LOG_PARAM,
  336. GEARMAND_QUEUE_ERROR,
  337. "Failed to call KEYS during QUEUE replay: %s", queue->redis()->errstr);
  338. }
  339. for (size_t x= 0; x < reply->elements; x++)
  340. {
  341. char* prefix= (char*) malloc(queue->prefix.size() * sizeof(char));
  342. char function_name[GEARMAN_FUNCTION_MAX_SIZE];
  343. char unique[GEARMAN_MAX_UNIQUE_SIZE];
  344. char fmt_str[100] = "";
  345. int fmt_str_length= snprintf(fmt_str, sizeof(fmt_str), "%%%d[^-]-%%%d[^-]-%%%d[^*]",
  346. int(queue->prefix.size()),
  347. int(GEARMAN_FUNCTION_MAX_SIZE),
  348. int(GEARMAN_MAX_UNIQUE_SIZE));
  349. if (fmt_str_length <= 0 or size_t(fmt_str_length) >= sizeof(fmt_str))
  350. {
  351. free(prefix);
  352. assert(fmt_str_length != 1);
  353. return gearmand_gerror(
  354. "snprintf() failed to produce a valud fmt_str for redis key",
  355. GEARMAND_QUEUE_ERROR);
  356. }
  357. int ret= sscanf(reply->element[x]->str,
  358. fmt_str,
  359. prefix,
  360. function_name,
  361. unique);
  362. free(prefix);
  363. if (ret != 3)
  364. {
  365. continue;
  366. }
  367. gearmand::plugins::queue::redis_record_t record;
  368. if(!queue->fetch(reply->element[x]->str, record))
  369. {
  370. return gearmand_log_gerror(
  371. GEARMAN_DEFAULT_LOG_PARAM,
  372. GEARMAND_QUEUE_ERROR,
  373. "Failed to fetch data for the key: %s", reply->element[x]->str);
  374. }
  375. /* need to make a copy here ... gearman_server_job_free will free it later */
  376. char *data = strdup(record.data.c_str());
  377. size_t data_size = record.data.size();
  378. gearman_job_priority_t priority = static_cast<gearman_job_priority_t>(record.priority);
  379. (void)(add_fn)(server, add_context,
  380. unique, strlen(unique),
  381. function_name, strlen(function_name),
  382. data, data_size,
  383. priority, 0);
  384. }
  385. freeReplyObject(reply);
  386. return GEARMAND_SUCCESS;
  387. }
  388. #pragma GCC diagnostic pop
  389. #pragma GCC diagnostic pop
  390. #endif // defined(GEARMAND_PLUGINS_QUEUE_REDIS_H)