queue.cc 25 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819
  1. /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
  2. *
  3. * Gearmand client and server library.
  4. *
  5. * Copyright (C) 2011-2012 Data Differential, http://datadifferential.com/
  6. * Copyright (C) 2008 Brian Aker, Eric Day
  7. * All rights reserved.
  8. *
  9. * Redistribution and use in source and binary forms, with or without
  10. * modification, are permitted provided that the following conditions are
  11. * met:
  12. *
  13. * * Redistributions of source code must retain the above copyright
  14. * notice, this list of conditions and the following disclaimer.
  15. *
  16. * * Redistributions in binary form must reproduce the above
  17. * copyright notice, this list of conditions and the following disclaimer
  18. * in the documentation and/or other materials provided with the
  19. * distribution.
  20. *
  21. * * The names of its contributors may not be used to endorse or
  22. * promote products derived from this software without specific prior
  23. * written permission.
  24. *
  25. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  26. * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  27. * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  28. * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  29. * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  30. * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  31. * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  32. * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  33. * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  34. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  35. * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  36. *
  37. */
  38. /**
  39. * @file
  40. * @brief libdrizzle Queue Storage Definitions
  41. */
#include <gear_config.h>
#include <libgearman-server/common.h>
#include <libgearman-server/byte.h>
#include <libgearman-1.0/limits.h>
#include <pwd.h>
#include <sys/types.h>
#include <unistd.h>
#include <cerrno>
#include <cstdlib>
#include <libgearman-server/plugins/queue/drizzle/queue.h>
#include <libgearman-server/plugins/queue/base.h>
  52. #if defined(HAVE_LIBDRIZZLE) && HAVE_LIBDRIZZLE
  53. # include <libdrizzle-5.1/drizzle_client.h>
  54. using namespace gearmand_internal;
  55. using namespace gearmand;
  56. namespace gearmand { namespace plugins { namespace queue { class Drizzle; }}}
  57. static gearmand_error_t gearman_server_queue_libdrizzle_init(plugins::queue::Drizzle *queue, gearman_server_st *server);
/**
 * @addtogroup gearman_queue_libdrizzle_static Static libdrizzle Queue Storage Definitions
 * @ingroup gearman_queue_libdrizzle
 * @{
 */
  63. /**
  64. * Default values.
  65. */
  66. #define GEARMAND_QUEUE_LIBDRIZZLE_DEFAULT_DATABASE "gearman"
  67. #define GEARMAND_QUEUE_LIBDRIZZLE_DEFAULT_TABLE "queue"
  68. #define GEARMAND_QUEUE_QUERY_BUFFER 256
  69. static bool libdrizzle_failed(drizzle_return_t arg)
  70. {
  71. return arg != DRIZZLE_RETURN_OK;
  72. }
  73. typedef std::vector<char> vchar_t;
  74. ssize_t escape_string(vchar_t &destination, const char *from, size_t from_size)
  75. {
  76. if (from_size == 0)
  77. {
  78. return 0;
  79. }
  80. destination.reserve(from_size * 2);
  81. char newchar;
  82. const char *end;
  83. for (end= from + from_size; from < end; from++)
  84. {
  85. newchar= 0;
  86. /* All multi-byte UTF8 characters have the high bit set for all bytes. */
  87. if (!(*from & 0x80))
  88. {
  89. switch (*from)
  90. {
  91. case 0:
  92. newchar= '0';
  93. break;
  94. case '\n':
  95. newchar= 'n';
  96. break;
  97. case '\r':
  98. newchar= 'r';
  99. break;
  100. case '\032':
  101. newchar= 'Z';
  102. break;
  103. case '\\':
  104. newchar= '\\';
  105. break;
  106. case '\'':
  107. newchar= '\'';
  108. break;
  109. case '"':
  110. newchar= '"';
  111. break;
  112. default:
  113. break;
  114. }
  115. }
  116. if (newchar != '\0')
  117. {
  118. destination.push_back('\\');
  119. destination.push_back(newchar);
  120. }
  121. else
  122. {
  123. destination.push_back(*from);
  124. }
  125. }
  126. return destination.size();
  127. }
  128. namespace gearmand {
  129. namespace plugins {
  130. namespace queue {
  131. class Drizzle : public gearmand::plugins::Queue {
  132. public:
  133. Drizzle();
  134. ~Drizzle();
  135. gearmand_error_t initialize();
  136. drizzle_result_st *result()
  137. {
  138. return _result;
  139. }
  140. void resize_query(size_t arg)
  141. {
  142. _query.resize(arg);
  143. }
  144. char *query_ptr()
  145. {
  146. return &_query[0];
  147. }
  148. void set_epoch_support(bool arg)
  149. {
  150. _epoch_support= arg;
  151. }
  152. bool epoch_support()
  153. {
  154. return _epoch_support;
  155. }
  156. drizzle_st *drizzle;
  157. drizzle_st *insert_con;
  158. drizzle_result_st* _result;
  159. std::vector<char> _query;
  160. std::string host;
  161. std::string username;
  162. std::string password;
  163. std::string uds;
  164. std::string user;
  165. std::string schema;
  166. std::string table;
  167. bool mysql_protocol;
  168. in_port_t port;
  169. private:
  170. bool _epoch_support;
  171. };
  172. Drizzle::Drizzle () :
  173. Queue("libdrizzle"),
  174. drizzle(NULL),
  175. insert_con(NULL),
  176. _result(NULL),
  177. _query(),
  178. username(""),
  179. mysql_protocol(false),
  180. port(DRIZZLE_DEFAULT_TCP_PORT),
  181. _epoch_support(true)
  182. {
  183. command_line_options().add_options()
  184. ("libdrizzle-host", boost::program_options::value(&host)->default_value(DRIZZLE_DEFAULT_TCP_HOST), "Host of server.")
  185. ("libdrizzle-port", boost::program_options::value(&port)->default_value(DRIZZLE_DEFAULT_TCP_PORT), "Port of server. (by default Drizzle)")
  186. ("libdrizzle-uds", boost::program_options::value(&uds), "Unix domain socket for server.")
  187. ("libdrizzle-user", boost::program_options::value(&user)->default_value("root"), "User name for authentication.")
  188. ("libdrizzle-password", boost::program_options::value(&password), "Password for authentication.")
  189. ("libdrizzle-db", boost::program_options::value(&schema)->default_value(GEARMAND_QUEUE_LIBDRIZZLE_DEFAULT_DATABASE), "Database to use.")
  190. ("libdrizzle-table", boost::program_options::value(&table)->default_value(GEARMAND_QUEUE_LIBDRIZZLE_DEFAULT_TABLE), "Table to use.")
  191. ("libdrizzle-mysql", boost::program_options::bool_switch(&mysql_protocol)->default_value(false), "Use MySQL protocol.")
  192. ;
  193. drizzle_set_timeout(drizzle, -1);
  194. assert(drizzle_timeout(drizzle) == -1);
  195. }
  196. Drizzle::~Drizzle()
  197. {
  198. drizzle_quit(insert_con);
  199. drizzle_quit(drizzle);
  200. }
  201. gearmand_error_t Drizzle::initialize()
  202. {
  203. return gearman_server_queue_libdrizzle_init(this, &Gearmand()->server);
  204. }
  205. void initialize_drizzle()
  206. {
  207. static Drizzle local_instance;
  208. }
  209. } // namespace queue
  210. } // namespace plugins
  211. } // namespace gearmand
  212. /*
  213. * thin wrapper around drizzle_query to handle the server going away
  214. * this happens usually because of a low wait_timeout value
  215. * we attempt to connect back only once
  216. */
  217. static drizzle_result_st *_libdrizzle_query_with_retry(drizzle_st* con,
  218. const char *query, size_t query_size,
  219. drizzle_return_t& ret_ptr)
  220. {
  221. drizzle_result_st *query_result= NULL;
  222. ret_ptr= DRIZZLE_RETURN_LOST_CONNECTION;
  223. for (int retry= 0; (ret_ptr == DRIZZLE_RETURN_LOST_CONNECTION) && (retry < 2); ++retry)
  224. {
  225. query_result= drizzle_query(con, query, query_size, &ret_ptr);
  226. }
  227. if (libdrizzle_failed(ret_ptr))
  228. {
  229. if (ret_ptr == DRIZZLE_RETURN_COULD_NOT_CONNECT)
  230. {
  231. gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM,
  232. "Failed to connect to database instance. host: %s:%d user: %s schema: %s (%s)",
  233. drizzle_host(con),
  234. int(drizzle_port(con)),
  235. drizzle_user(con),
  236. drizzle_db(con),
  237. drizzle_error(con));
  238. }
  239. else
  240. {
  241. gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM,
  242. "libdrizled error '%s' executing '%.*s'",
  243. drizzle_error(con),
  244. query_size, query);
  245. }
  246. }
  247. return query_result;
  248. }
  249. /**
  250. * Query handling function.
  251. */
  252. static drizzle_return_t _libdrizzle_query(plugins::queue::Drizzle *queue,
  253. const char *query, size_t query_size)
  254. {
  255. drizzle_return_t ret;
  256. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "libdrizzle query: %.*s", (uint32_t)query_size, query);
  257. drizzle_result_st *result= _libdrizzle_query_with_retry(queue->drizzle, query, query_size, ret);
  258. if (libdrizzle_failed(ret))
  259. {
  260. return ret;
  261. }
  262. (void)result;
  263. return DRIZZLE_RETURN_OK;
  264. }
  265. static drizzle_return_t _libdrizzle_insert(plugins::queue::Drizzle *queue,
  266. const vchar_t& query)
  267. {
  268. drizzle_return_t ret;
  269. drizzle_result_st *result= _libdrizzle_query_with_retry(queue->insert_con, &query[0], query.size() -1, ret);
  270. if (libdrizzle_failed(ret))
  271. {
  272. return ret;
  273. }
  274. ret= drizzle_result_buffer(result);
  275. drizzle_result_free(result);
  276. return ret;
  277. }
  278. /* Queue callback functions. */
  279. static gearmand_error_t _libdrizzle_add(gearman_server_st *server,
  280. void *context,
  281. const char *unique, size_t unique_size,
  282. const char *function_name, size_t function_name_size,
  283. const void *data, size_t data_size,
  284. gearman_job_priority_t priority,
  285. int64_t when);
  286. static gearmand_error_t _libdrizzle_flush(gearman_server_st *gearman,
  287. void *context);
  288. static gearmand_error_t _libdrizzle_done(gearman_server_st *gearman,
  289. void *context, const char *unique, size_t unique_size,
  290. const char *function_name, size_t function_name_size);
  291. static gearmand_error_t _libdrizzle_replay(gearman_server_st *gearman,
  292. void *context,
  293. gearman_queue_add_fn *add_fn,
  294. void *add_context);
  295. /** @} */
  296. /*
  297. * Public definitions
  298. */
  299. #pragma GCC diagnostic push
  300. #pragma GCC diagnostic ignored "-Wold-style-cast"
  301. gearmand_error_t gearman_server_queue_libdrizzle_init(plugins::queue::Drizzle *queue, gearman_server_st *server)
  302. {
  303. gearmand_info("Initializing libdrizzle module");
  304. #if 0
  305. if (queue->mysql_protocol)
  306. {
  307. drizzle_set_options(queue->drizzle, DRIZZLE_CON_MYSQL);
  308. drizzle_set_options(queue->insert_con, DRIZZLE_CON_MYSQL);
  309. }
  310. #endif
  311. if (queue->uds.empty())
  312. {
  313. queue->drizzle= drizzle_create(queue->host.c_str(), queue->port,
  314. queue->user.c_str(), queue->password.c_str(),
  315. "INFORMATION_SCHEMA",
  316. 0);
  317. queue->insert_con= drizzle_create(queue->host.c_str(), queue->port,
  318. queue->user.c_str(), queue->password.c_str(),
  319. "INFORMATION_SCHEMA",
  320. 0);
  321. }
  322. else
  323. {
  324. queue->drizzle= drizzle_create(queue->uds.c_str(), 0,
  325. queue->user.c_str(), queue->password.c_str(),
  326. "INFORMATION_SCHEMA",
  327. 0);
  328. queue->insert_con= drizzle_create(queue->uds.c_str(), 0,
  329. queue->user.c_str(), queue->password.c_str(),
  330. "INFORMATION_SCHEMA",
  331. 0);
  332. }
  333. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "Using '%s' as the username", queue->user.c_str());
  334. std::string query;
  335. {
  336. query.clear();
  337. query+= "CREATE SCHEMA IF NOT EXISTS " +queue->schema;
  338. if (libdrizzle_failed(_libdrizzle_query(queue, query.c_str(), query.size())))
  339. {
  340. return gearmand_gerror(drizzle_error(queue->drizzle), GEARMAND_QUEUE_ERROR);
  341. }
  342. if (libdrizzle_failed(drizzle_column_skip_all(queue->result())))
  343. {
  344. drizzle_result_free(queue->result());
  345. return gearmand_gerror(drizzle_error(queue->drizzle), GEARMAND_QUEUE_ERROR);
  346. }
  347. drizzle_result_free(queue->result());
  348. }
  349. // Look for schema
  350. {
  351. query.clear();
  352. query+= "SELECT SCHEMA_NAME FROM INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME = \"";
  353. query+= queue->schema;
  354. query+= "\"";
  355. if (libdrizzle_failed(_libdrizzle_query(queue, query.c_str(), query.size())))
  356. {
  357. return gearmand_gerror("Error occurred while searching for gearman queue schema", GEARMAND_QUEUE_ERROR);
  358. }
  359. if (libdrizzle_failed(drizzle_result_buffer(queue->result())))
  360. {
  361. return gearmand_gerror(drizzle_error(queue->drizzle), GEARMAND_QUEUE_ERROR);
  362. }
  363. if (drizzle_result_row_count(queue->result()) == 0)
  364. {
  365. return gearmand_gerror("Error occurred while search for gearman queue schema", GEARMAND_QUEUE_ERROR);
  366. }
  367. drizzle_result_free(queue->result());
  368. }
  369. drizzle_return_t ret;
  370. ret= drizzle_select_db(queue->drizzle, queue->schema.c_str());
  371. if (libdrizzle_failed(ret))
  372. {
  373. return gearmand_gerror(drizzle_error(queue->drizzle), GEARMAND_QUEUE_ERROR);
  374. }
  375. ret= drizzle_select_db(queue->insert_con, queue->schema.c_str());
  376. if (libdrizzle_failed(ret))
  377. {
  378. return gearmand_gerror(drizzle_error(queue->insert_con), GEARMAND_QUEUE_ERROR);
  379. }
  380. // We need to check and see if the tables exists, and if not create it
  381. query.clear();
  382. query+= "SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = \"" +queue->table +"\"";
  383. if (libdrizzle_failed(_libdrizzle_query(queue, query.c_str(), query.size())))
  384. {
  385. return gearmand_gerror(drizzle_error(queue->drizzle), GEARMAND_QUEUE_ERROR);
  386. }
  387. if (libdrizzle_failed(drizzle_result_buffer(queue->result())))
  388. {
  389. return gearmand_gerror(drizzle_error(queue->drizzle), GEARMAND_QUEUE_ERROR);
  390. }
  391. bool create_table= drizzle_result_row_count(queue->result());
  392. drizzle_result_free(queue->result());
  393. if (create_table == false)
  394. {
  395. gearmand_log_info("libdrizzle module creating table '%s.%s'",
  396. drizzle_db(queue->drizzle), queue->table.c_str());
  397. query.clear();
  398. query+= "CREATE TABLE " +queue->schema + "." +queue->table + "( unique_key VARCHAR(" + TOSTRING(GEARMAN_UNIQUE_SIZE) + "),";
  399. query+= "function_name VARCHAR(255), priority INT, data LONGBLOB, when_to_run BIGINT, unique key (unique_key, function_name))";
  400. if (libdrizzle_failed(_libdrizzle_query(queue, query.c_str(), query.size())))
  401. {
  402. return gearmand_gerror(drizzle_error(queue->drizzle), GEARMAND_QUEUE_ERROR);
  403. }
  404. if (libdrizzle_failed(drizzle_column_skip_all(queue->result())))
  405. {
  406. drizzle_result_free(queue->result());
  407. return gearmand_gerror(drizzle_error(queue->drizzle), GEARMAND_QUEUE_ERROR);
  408. }
  409. drizzle_result_free(queue->result());
  410. }
  411. else
  412. {
  413. gearmand_log_info("libdrizzle module using table '%s.%s'",
  414. drizzle_db(queue->drizzle), queue->table.c_str());
  415. query.clear();
  416. query+= "SELECT TABLE_NAME FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = \"" +queue->schema +"\" ";
  417. query+= "AND TABLE_NAME = \"" +queue->table +"\" AND COLUMN_NAME = \"when_to_run\"";
  418. if (libdrizzle_failed(_libdrizzle_query(queue, query.c_str(), query.size())))
  419. {
  420. return gearmand_gerror(drizzle_error(queue->drizzle), GEARMAND_QUEUE_ERROR);
  421. }
  422. if (libdrizzle_failed(drizzle_result_buffer(queue->result())))
  423. {
  424. return gearmand_gerror(drizzle_error(queue->drizzle), GEARMAND_QUEUE_ERROR);
  425. }
  426. if (drizzle_result_row_count(queue->result()) == 0)
  427. {
  428. gearmand_info("Current schema does not have when_to_run column");
  429. queue->set_epoch_support(false);
  430. }
  431. drizzle_result_free(queue->result());
  432. }
  433. gearman_server_set_queue(*server, queue, _libdrizzle_add, _libdrizzle_flush, _libdrizzle_done, _libdrizzle_replay);
  434. return GEARMAND_SUCCESS;
  435. }
  436. /*
  437. * Static definitions
  438. */
  439. static gearmand_error_t _libdrizzle_add(gearman_server_st *,
  440. void *context,
  441. const char *unique, size_t unique_size,
  442. const char *function_name, size_t function_name_size,
  443. const void *data, size_t data_size,
  444. gearman_job_priority_t priority,
  445. int64_t when)
  446. {
  447. plugins::queue::Drizzle *queue= (plugins::queue::Drizzle *)context;
  448. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "libdrizzle add: %.*s, %.*s(%u bytes)",
  449. uint32_t(function_name_size), function_name,
  450. uint32_t(unique_size), (char *)unique, uint32_t(data_size));
  451. if (when and queue->epoch_support() == false)
  452. {
  453. return gearmand_gerror("Table lacks when_to_run field", GEARMAND_QUEUE_ERROR);
  454. }
  455. vchar_t escaped_unique_name;
  456. vchar_t escaped_function_name;
  457. (void)escape_string(escaped_unique_name, (const char *)unique, unique_size);
  458. (void)escape_string(escaped_function_name, (const char *)function_name, function_name_size);
  459. // @todo test for overallocation failure
  460. size_t query_size;
  461. char query_buffer[GEARMAND_QUEUE_QUERY_BUFFER +(GEARMAN_MAX_UNIQUE_SIZE*2) +(GEARMAN_FUNCTION_MAX_SIZE*2)];
  462. if (queue->epoch_support())
  463. {
  464. query_size= (size_t)snprintf(query_buffer, sizeof(query_buffer),
  465. "INSERT INTO %.*s.%.*s SET priority=%u,when_to_run=%lld,unique_key='%.*s',function_name='%.*s',data='",
  466. int(queue->schema.size()), queue->schema.c_str(),
  467. int(queue->table.size()), queue->table.c_str(),
  468. uint32_t(priority),
  469. (long long unsigned int)when,
  470. int(escaped_unique_name.size()), &escaped_unique_name[0],
  471. int(escaped_function_name.size()), &escaped_function_name[0]
  472. );
  473. }
  474. else
  475. {
  476. query_size= (size_t)snprintf(query_buffer, sizeof(query_buffer),
  477. "INSERT INTO %.*s.%.*s SET priority=%u,unique_key='%.*s',function_name='%.*s',data='",
  478. int(queue->schema.size()), queue->schema.c_str(),
  479. int(queue->table.size()), queue->table.c_str(),
  480. uint32_t(priority),
  481. int(escaped_unique_name.size()), &escaped_unique_name[0],
  482. int(escaped_function_name.size()), &escaped_function_name[0]
  483. );
  484. }
  485. vchar_t query;
  486. query.resize(query_size);
  487. memcpy(&query[0], query_buffer, query_size);
  488. query_size+= size_t(escape_string(query, (const char *)data, data_size));
  489. query.push_back('\'');
  490. query.resize(query.size() +1);
  491. gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM, "%.*s", query.size() -1, &query[0]);
  492. if (libdrizzle_failed(_libdrizzle_insert(queue, query)))
  493. {
  494. return gearmand_gerror(drizzle_error(queue->drizzle), GEARMAND_QUEUE_ERROR);
  495. }
  496. return GEARMAND_SUCCESS;
  497. }
  498. static gearmand_error_t _libdrizzle_flush(gearman_server_st *, void *)
  499. {
  500. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "libdrizzle flush");
  501. return GEARMAND_SUCCESS;
  502. }
  503. static gearmand_error_t _libdrizzle_done(gearman_server_st *,
  504. void *context,
  505. const char *unique, size_t unique_size,
  506. const char *function_name, size_t function_name_size)
  507. {
  508. plugins::queue::Drizzle *queue= (plugins::queue::Drizzle *)context;
  509. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,
  510. "libdrizzle done: %.*s(%.*s)",
  511. uint32_t(function_name_size), (char *)function_name,
  512. uint32_t(unique_size), (char *)unique);
  513. vchar_t escaped_unique_name;
  514. vchar_t escaped_function_name;
  515. (void)escape_string(escaped_unique_name, (const char *)unique, unique_size);
  516. (void)escape_string(escaped_function_name, (const char *)function_name, function_name_size);
  517. vchar_t query;
  518. query.resize(escaped_unique_name.size()
  519. +escaped_function_name.size()
  520. +queue->schema.size()
  521. +queue->table.size()
  522. +GEARMAND_QUEUE_QUERY_BUFFER);
  523. int query_size= snprintf(&query[0], query.size(),
  524. "DELETE FROM %.*s.%.*s WHERE unique_key='%.*s' and function_name= '%.*s'",
  525. int(queue->schema.size()), queue->schema.c_str(),
  526. int(queue->table.size()), queue->table.c_str(),
  527. int(escaped_unique_name.size()), &escaped_unique_name[0],
  528. int(escaped_function_name.size()), &escaped_function_name[0]
  529. );
  530. if (query_size < 0 or size_t(query_size) > query.size())
  531. {
  532. return gearmand_gerror("snprintf(DELETE)", GEARMAND_MEMORY_ALLOCATION_FAILURE);
  533. }
  534. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "%.*", query_size, &query[0]);
  535. if (libdrizzle_failed(_libdrizzle_query(queue, &query[0], query_size)))
  536. {
  537. return gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "Error occurred from %.*s", query_size, &query[0]);
  538. }
  539. drizzle_result_free(queue->result());
  540. return GEARMAND_SUCCESS;
  541. }
  542. #if 0
  543. static gearmand_error_t _dump_queue(plugins::queue::Drizzle *queue)
  544. {
  545. char query[DRIZZLE_MAX_TABLE_SIZE + GEARMAND_QUEUE_QUERY_BUFFER];
  546. size_t query_size;
  547. query_size= (size_t)snprintf(query, sizeof(query),
  548. "SELECT unique_key,function_name FROM %s",
  549. queue->table.c_str());
  550. if (_libdrizzle_query(NULL, queue, query, query_size) != DRIZZLE_RETURN_OK)
  551. {
  552. gearmand_log_error("drizzle_column_skip_all:%s", drizzle_error(queue->drizzle));
  553. return GEARMAND_QUEUE_ERROR;
  554. }
  555. if (drizzle_column_skip_all(queue->result()) != DRIZZLE_RETURN_OK)
  556. {
  557. drizzle_result_free(queue->result());
  558. gearmand_log_error("drizzle_column_skip_all:%s", drizzle_error(queue->drizzle));
  559. return GEARMAND_QUEUE_ERROR;
  560. }
  561. gearmand_debug("Shutting down with the following items left to be processed.");
  562. while (1)
  563. {
  564. drizzle_return_t ret;
  565. drizzle_row_t row= drizzle_row_buffer(queue->result(), &ret);
  566. if (ret != DRIZZLE_RETURN_OK)
  567. {
  568. drizzle_result_free(queue->result());
  569. gearmand_log_error("drizzle_row_buffer:%s", drizzle_error(queue->drizzle));
  570. return GEARMAND_QUEUE_ERROR;
  571. }
  572. if (row == NULL)
  573. break;
  574. size_t *field_sizes;
  575. field_sizes= drizzle_row_field_sizes(queue->result());
  576. gearmand_log_debug("\t unique: %.*s function: %.*s",
  577. (uint32_t)field_sizes[0], row[0],
  578. (uint32_t)field_sizes[1], row[1]
  579. );
  580. drizzle_row_free(queue->result(), row);
  581. }
  582. drizzle_result_free(queue->result());
  583. return GEARMAND_SUCCESS;
  584. }
  585. #endif
  586. static gearmand_error_t _libdrizzle_replay(gearman_server_st *server,
  587. void *context,
  588. gearman_queue_add_fn *add_fn,
  589. void *add_context)
  590. {
  591. plugins::queue::Drizzle *queue= (plugins::queue::Drizzle *)context;
  592. size_t *field_sizes;
  593. gearmand_error_t gret;
  594. gearmand_info("libdrizzle replay start");
  595. std::string query;
  596. if (queue->epoch_support())
  597. {
  598. query+= "SELECT unique_key,function_name,priority,data,when_to_run FROM ";
  599. query+= queue->schema;
  600. query+= ".";
  601. query+= queue->table;
  602. }
  603. else
  604. {
  605. query+= "SELECT unique_key,function_name,priority,data FROM ";
  606. query+= queue->schema;
  607. query+= ".";
  608. query+= queue->table;
  609. }
  610. if (libdrizzle_failed(_libdrizzle_query(queue, query.c_str(), query.size())))
  611. {
  612. return gearmand_gerror(drizzle_error(queue->drizzle), GEARMAND_QUEUE_ERROR);
  613. }
  614. if (libdrizzle_failed(drizzle_column_buffer(queue->result())))
  615. {
  616. drizzle_result_free(queue->result());
  617. return gearmand_gerror(drizzle_error(queue->drizzle), GEARMAND_QUEUE_ERROR);
  618. }
  619. while (1)
  620. {
  621. drizzle_return_t ret;
  622. drizzle_row_t row= drizzle_row_buffer(queue->result(), &ret);
  623. if (libdrizzle_failed(ret))
  624. {
  625. drizzle_result_free(queue->result());
  626. return gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "drizzle_row_buffer:%s", drizzle_error(queue->drizzle));
  627. }
  628. if (row == NULL)
  629. {
  630. break;
  631. }
  632. field_sizes= drizzle_row_field_sizes(queue->result());
  633. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "libdrizzle replay: %.*s", (uint32_t)field_sizes[0], row[0]);
  634. size_t data_size= field_sizes[3];
  635. /* need to make a copy here ... gearman_server_job_free will free it later */
  636. char *data= (char *)malloc(data_size);
  637. if (data == NULL)
  638. {
  639. gearmand_perror(errno, "Failed to allocate data while replaying the queue");
  640. drizzle_row_free(queue->result(), row);
  641. drizzle_result_free(queue->result());
  642. return GEARMAND_MEMORY_ALLOCATION_FAILURE;
  643. }
  644. memcpy(data, row[3], data_size);
  645. int64_t when;
  646. if (not queue->epoch_support())
  647. {
  648. when= 0;
  649. }
  650. else
  651. {
  652. when= atoi(row[4]);
  653. }
  654. gret= (*add_fn)(server, add_context, row[0], field_sizes[0],
  655. row[1], field_sizes[1],
  656. data, data_size, (gearman_job_priority_t)atoi(row[2]), when);
  657. if (gret != GEARMAND_SUCCESS)
  658. {
  659. drizzle_row_free(queue->result(), row);
  660. drizzle_result_free(queue->result());
  661. return gret;
  662. }
  663. drizzle_row_free(queue->result(), row);
  664. }
  665. drizzle_result_free(queue->result());
  666. return GEARMAND_SUCCESS;
  667. }
  668. #pragma GCC diagnostic pop
  669. #else // if defined(HAVE_LIBDRIZZLE) && HAVE_LIBDRIZZLE
  670. namespace gearmand {
  671. namespace plugins {
  672. namespace queue {
  673. void initialize_drizzle()
  674. {
  675. }
  676. } // namespace queue
  677. } // namespace plugins
  678. } // namespace gearmand
  679. #endif // if defined(HAVE_LIBDRIZZLE) && HAVE_LIBDRIZZLE