queue.cc 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817
  1. /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
  2. *
  3. * Gearmand client and server library.
  4. *
  5. * Copyright (C) 2011-2012 Data Differential, http://datadifferential.com/
  6. * Copyright (C) 2008 Brian Aker, Eric Day
  7. * All rights reserved.
  8. *
  9. * Redistribution and use in source and binary forms, with or without
  10. * modification, are permitted provided that the following conditions are
  11. * met:
  12. *
  13. * * Redistributions of source code must retain the above copyright
  14. * notice, this list of conditions and the following disclaimer.
  15. *
  16. * * Redistributions in binary form must reproduce the above
  17. * copyright notice, this list of conditions and the following disclaimer
  18. * in the documentation and/or other materials provided with the
  19. * distribution.
  20. *
  21. * * The names of its contributors may not be used to endorse or
  22. * promote products derived from this software without specific prior
  23. * written permission.
  24. *
  25. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  26. * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  27. * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  28. * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  29. * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  30. * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  31. * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  32. * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  33. * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  34. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  35. * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  36. *
  37. */
  38. /**
  39. * @file
  40. * @brief libdrizzle Queue Storage Definitions
  41. */
  42. #include <gear_config.h>
  43. #include <libgearman-server/common.h>
  44. #include <libgearman-server/byte.h>
  45. #include <libgearman-1.0/limits.h>
  46. #include <pwd.h>
  47. #include <sys/types.h>
  48. #include <unistd.h>
  49. #include <cerrno>
  50. #include <libgearman-server/plugins/queue/drizzle/queue.h>
  51. #include <libgearman-server/plugins/queue/base.h>
  52. #if defined(HAVE_LIBDRIZZLE) && HAVE_LIBDRIZZLE
  53. # include <libdrizzle-5.1/drizzle_client.h>
  54. using namespace gearmand_internal;
  55. using namespace gearmand;
  56. namespace gearmand { namespace plugins { namespace queue { class Drizzle; }}}
  57. static gearmand_error_t gearman_server_queue_libdrizzle_init(plugins::queue::Drizzle *queue, gearman_server_st *server);
  58. /**
  59. * @addtogroup plugins::queue::Drizzleatic Static libdrizzle Queue Storage Definitions
  60. * @ingroup gearman_queue_libdrizzle
  61. * @{
  62. */
  63. /**
  64. * Default values.
  65. */
  66. #define GEARMAN_QUEUE_LIBDRIZZLE_DEFAULT_DATABASE "gearman"
  67. #define GEARMAN_QUEUE_LIBDRIZZLE_DEFAULT_TABLE "queue"
  68. #define GEARMAN_QUEUE_QUERY_BUFFER 256
  69. static bool libdrizzle_failed(drizzle_return_t arg)
  70. {
  71. return arg != DRIZZLE_RETURN_OK;
  72. }
  73. typedef std::vector<char> vchar_t;
  74. ssize_t escape_string(vchar_t &destination, const char *from, size_t from_size)
  75. {
  76. if (from_size == 0)
  77. {
  78. return 0;
  79. }
  80. destination.reserve(from_size * 2);
  81. char newchar;
  82. const char *end;
  83. for (end= from + from_size; from < end; from++)
  84. {
  85. newchar= 0;
  86. /* All multi-byte UTF8 characters have the high bit set for all bytes. */
  87. if (!(*from & 0x80))
  88. {
  89. switch (*from)
  90. {
  91. case 0:
  92. newchar= '0';
  93. break;
  94. case '\n':
  95. newchar= 'n';
  96. break;
  97. case '\r':
  98. newchar= 'r';
  99. break;
  100. case '\032':
  101. newchar= 'Z';
  102. break;
  103. case '\\':
  104. newchar= '\\';
  105. break;
  106. case '\'':
  107. newchar= '\'';
  108. break;
  109. case '"':
  110. newchar= '"';
  111. break;
  112. default:
  113. break;
  114. }
  115. }
  116. if (newchar != '\0')
  117. {
  118. destination.push_back('\\');
  119. destination.push_back(newchar);
  120. }
  121. else
  122. {
  123. destination.push_back(*from);
  124. }
  125. }
  126. return destination.size();
  127. }
  128. namespace gearmand {
  129. namespace plugins {
  130. namespace queue {
  131. class Drizzle : public gearmand::plugins::Queue {
  132. public:
  133. Drizzle();
  134. ~Drizzle();
  135. gearmand_error_t initialize();
  136. drizzle_result_st *result()
  137. {
  138. return _result;
  139. }
  140. void resize_query(size_t arg)
  141. {
  142. _query.resize(arg);
  143. }
  144. char *query_ptr()
  145. {
  146. return &_query[0];
  147. }
  148. void set_epoch_support(bool arg)
  149. {
  150. _epoch_support= arg;
  151. }
  152. bool epoch_support()
  153. {
  154. return _epoch_support;
  155. }
  156. drizzle_st *drizzle;
  157. drizzle_st *insert_con;
  158. drizzle_result_st* _result;
  159. std::vector<char> _query;
  160. std::string host;
  161. std::string username;
  162. std::string password;
  163. std::string uds;
  164. std::string user;
  165. std::string schema;
  166. std::string table;
  167. bool mysql_protocol;
  168. in_port_t port;
  169. private:
  170. bool _epoch_support;
  171. };
  172. Drizzle::Drizzle () :
  173. Queue("libdrizzle"),
  174. drizzle(NULL),
  175. insert_con(NULL),
  176. _result(NULL),
  177. _query(),
  178. username(""),
  179. mysql_protocol(false),
  180. port(DRIZZLE_DEFAULT_TCP_PORT),
  181. _epoch_support(true)
  182. {
  183. command_line_options().add_options()
  184. ("libdrizzle-host", boost::program_options::value(&host)->default_value(DRIZZLE_DEFAULT_TCP_HOST), "Host of server.")
  185. ("libdrizzle-port", boost::program_options::value(&port)->default_value(DRIZZLE_DEFAULT_TCP_PORT), "Port of server. (by default Drizzle)")
  186. ("libdrizzle-uds", boost::program_options::value(&uds), "Unix domain socket for server.")
  187. ("libdrizzle-user", boost::program_options::value(&user)->default_value("root"), "User name for authentication.")
  188. ("libdrizzle-password", boost::program_options::value(&password), "Password for authentication.")
  189. ("libdrizzle-db", boost::program_options::value(&schema)->default_value(GEARMAN_QUEUE_LIBDRIZZLE_DEFAULT_DATABASE), "Database to use.")
  190. ("libdrizzle-table", boost::program_options::value(&table)->default_value(GEARMAN_QUEUE_LIBDRIZZLE_DEFAULT_TABLE), "Table to use.")
  191. ("libdrizzle-mysql", boost::program_options::bool_switch(&mysql_protocol)->default_value(false), "Use MySQL protocol.")
  192. ;
  193. drizzle_set_timeout(drizzle, -1);
  194. assert(drizzle_timeout(drizzle) == -1);
  195. }
  196. Drizzle::~Drizzle()
  197. {
  198. drizzle_quit(insert_con);
  199. drizzle_quit(drizzle);
  200. }
  201. gearmand_error_t Drizzle::initialize()
  202. {
  203. return gearman_server_queue_libdrizzle_init(this, &Gearmand()->server);
  204. }
  205. void initialize_drizzle()
  206. {
  207. static Drizzle local_instance;
  208. }
  209. } // namespace queue
  210. } // namespace plugins
  211. } // namespace gearmand
  212. /*
  213. * thin wrapper around drizzle_query to handle the server going away
  214. * this happens usually because of a low wait_timeout value
  215. * we attempt to connect back only once
  216. */
  217. static drizzle_result_st *_libdrizzle_query_with_retry(drizzle_st* con,
  218. const char *query, size_t query_size,
  219. drizzle_return_t& ret_ptr)
  220. {
  221. drizzle_result_st *query_result= NULL;
  222. ret_ptr= DRIZZLE_RETURN_LOST_CONNECTION;
  223. for (int retry= 0; (ret_ptr == DRIZZLE_RETURN_LOST_CONNECTION) && (retry < 2); ++retry)
  224. {
  225. query_result= drizzle_query(con, query, query_size, &ret_ptr);
  226. }
  227. if (libdrizzle_failed(ret_ptr))
  228. {
  229. if (ret_ptr == DRIZZLE_RETURN_COULD_NOT_CONNECT)
  230. {
  231. gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM,
  232. "Failed to connect to database instance. host: %s:%d user: %s schema: %s (%s)",
  233. drizzle_host(con),
  234. int(drizzle_port(con)),
  235. drizzle_user(con),
  236. drizzle_db(con),
  237. drizzle_error(con));
  238. }
  239. else
  240. {
  241. gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM,
  242. "libdrizled error '%s' executing '%.*s'",
  243. drizzle_error(con),
  244. query_size, query);
  245. }
  246. }
  247. return query_result;
  248. }
  249. /**
  250. * Query handling function.
  251. */
  252. static drizzle_return_t _libdrizzle_query(plugins::queue::Drizzle *queue,
  253. const char *query, size_t query_size)
  254. {
  255. drizzle_return_t ret;
  256. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "libdrizzle query: %.*s", (uint32_t)query_size, query);
  257. drizzle_result_st *result= _libdrizzle_query_with_retry(queue->drizzle, query, query_size, ret);
  258. if (libdrizzle_failed(ret))
  259. {
  260. return ret;
  261. }
  262. (void)result;
  263. return DRIZZLE_RETURN_OK;
  264. }
  265. static drizzle_return_t _libdrizzle_insert(plugins::queue::Drizzle *queue,
  266. const vchar_t& query)
  267. {
  268. drizzle_return_t ret;
  269. drizzle_result_st *result= _libdrizzle_query_with_retry(queue->insert_con, &query[0], query.size() -1, ret);
  270. if (libdrizzle_failed(ret))
  271. {
  272. return ret;
  273. }
  274. ret= drizzle_result_buffer(result);
  275. drizzle_result_free(result);
  276. return ret;
  277. }
  278. /* Queue callback functions. */
  279. static gearmand_error_t _libdrizzle_add(gearman_server_st *server,
  280. void *context,
  281. const char *unique, size_t unique_size,
  282. const char *function_name, size_t function_name_size,
  283. const void *data, size_t data_size,
  284. gearman_job_priority_t priority,
  285. int64_t when);
  286. static gearmand_error_t _libdrizzle_flush(gearman_server_st *gearman,
  287. void *context);
  288. static gearmand_error_t _libdrizzle_done(gearman_server_st *gearman,
  289. void *context, const char *unique, size_t unique_size,
  290. const char *function_name, size_t function_name_size);
  291. static gearmand_error_t _libdrizzle_replay(gearman_server_st *gearman,
  292. void *context,
  293. gearman_queue_add_fn *add_fn,
  294. void *add_context);
  295. /** @} */
  296. /*
  297. * Public definitions
  298. */
  299. #pragma GCC diagnostic ignored "-Wold-style-cast"
  300. gearmand_error_t gearman_server_queue_libdrizzle_init(plugins::queue::Drizzle *queue, gearman_server_st *server)
  301. {
  302. gearmand_info("Initializing libdrizzle module");
  303. #if 0
  304. if (queue->mysql_protocol)
  305. {
  306. drizzle_set_options(queue->drizzle, DRIZZLE_CON_MYSQL);
  307. drizzle_set_options(queue->insert_con, DRIZZLE_CON_MYSQL);
  308. }
  309. #endif
  310. if (queue->uds.empty())
  311. {
  312. queue->drizzle= drizzle_create(queue->host.c_str(), queue->port,
  313. queue->user.c_str(), queue->password.c_str(),
  314. "INFORMATION_SCHEMA",
  315. 0);
  316. queue->insert_con= drizzle_create(queue->host.c_str(), queue->port,
  317. queue->user.c_str(), queue->password.c_str(),
  318. "INFORMATION_SCHEMA",
  319. 0);
  320. }
  321. else
  322. {
  323. queue->drizzle= drizzle_create(queue->uds.c_str(), 0,
  324. queue->user.c_str(), queue->password.c_str(),
  325. "INFORMATION_SCHEMA",
  326. 0);
  327. queue->insert_con= drizzle_create(queue->uds.c_str(), 0,
  328. queue->user.c_str(), queue->password.c_str(),
  329. "INFORMATION_SCHEMA",
  330. 0);
  331. }
  332. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "Using '%s' as the username", queue->user.c_str());
  333. std::string query;
  334. {
  335. query.clear();
  336. query+= "CREATE SCHEMA IF NOT EXISTS " +queue->schema;
  337. if (libdrizzle_failed(_libdrizzle_query(queue, query.c_str(), query.size())))
  338. {
  339. return gearmand_gerror(drizzle_error(queue->drizzle), GEARMAN_QUEUE_ERROR);
  340. }
  341. if (libdrizzle_failed(drizzle_column_skip_all(queue->result())))
  342. {
  343. drizzle_result_free(queue->result());
  344. return gearmand_gerror(drizzle_error(queue->drizzle), GEARMAN_QUEUE_ERROR);
  345. }
  346. drizzle_result_free(queue->result());
  347. }
  348. // Look for schema
  349. {
  350. query.clear();
  351. query+= "SELECT SCHEMA_NAME FROM INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME = \"";
  352. query+= queue->schema;
  353. query+= "\"";
  354. if (libdrizzle_failed(_libdrizzle_query(queue, query.c_str(), query.size())))
  355. {
  356. return gearmand_gerror("Error occurred while searching for gearman queue schema", GEARMAN_QUEUE_ERROR);
  357. }
  358. if (libdrizzle_failed(drizzle_result_buffer(queue->result())))
  359. {
  360. return gearmand_gerror(drizzle_error(queue->drizzle), GEARMAN_QUEUE_ERROR);
  361. }
  362. if (drizzle_result_row_count(queue->result()) == 0)
  363. {
  364. return gearmand_gerror("Error occurred while search for gearman queue schema", GEARMAN_QUEUE_ERROR);
  365. }
  366. drizzle_result_free(queue->result());
  367. }
  368. drizzle_return_t ret;
  369. ret= drizzle_select_db(queue->drizzle, queue->schema.c_str());
  370. if (libdrizzle_failed(ret))
  371. {
  372. return gearmand_gerror(drizzle_error(queue->drizzle), GEARMAN_QUEUE_ERROR);
  373. }
  374. ret= drizzle_select_db(queue->insert_con, queue->schema.c_str());
  375. if (libdrizzle_failed(ret))
  376. {
  377. return gearmand_gerror(drizzle_error(queue->insert_con), GEARMAN_QUEUE_ERROR);
  378. }
  379. // We need to check and see if the tables exists, and if not create it
  380. query.clear();
  381. query+= "SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = \"" +queue->table +"\"";
  382. if (libdrizzle_failed(_libdrizzle_query(queue, query.c_str(), query.size())))
  383. {
  384. return gearmand_gerror(drizzle_error(queue->drizzle), GEARMAN_QUEUE_ERROR);
  385. }
  386. if (libdrizzle_failed(drizzle_result_buffer(queue->result())))
  387. {
  388. return gearmand_gerror(drizzle_error(queue->drizzle), GEARMAN_QUEUE_ERROR);
  389. }
  390. bool create_table= drizzle_result_row_count(queue->result());
  391. drizzle_result_free(queue->result());
  392. if (create_table == false)
  393. {
  394. gearmand_log_info("libdrizzle module creating table '%s.%s'",
  395. drizzle_db(queue->drizzle), queue->table.c_str());
  396. query.clear();
  397. query+= "CREATE TABLE " +queue->schema + "." +queue->table + "( unique_key VARCHAR(" + TOSTRING(GEARMAN_UNIQUE_SIZE) + "),";
  398. query+= "function_name VARCHAR(255), priority INT, data LONGBLOB, when_to_run BIGINT, unique key (unique_key, function_name))";
  399. if (libdrizzle_failed(_libdrizzle_query(queue, query.c_str(), query.size())))
  400. {
  401. return gearmand_gerror(drizzle_error(queue->drizzle), GEARMAN_QUEUE_ERROR);
  402. }
  403. if (libdrizzle_failed(drizzle_column_skip_all(queue->result())))
  404. {
  405. drizzle_result_free(queue->result());
  406. return gearmand_gerror(drizzle_error(queue->drizzle), GEARMAN_QUEUE_ERROR);
  407. }
  408. drizzle_result_free(queue->result());
  409. }
  410. else
  411. {
  412. gearmand_log_info("libdrizzle module using table '%s.%s'",
  413. drizzle_db(queue->drizzle), queue->table.c_str());
  414. query.clear();
  415. query+= "SELECT TABLE_NAME FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = \"" +queue->schema +"\" ";
  416. query+= "AND TABLE_NAME = \"" +queue->table +"\" AND COLUMN_NAME = \"when_to_run\"";
  417. if (libdrizzle_failed(_libdrizzle_query(queue, query.c_str(), query.size())))
  418. {
  419. return gearmand_gerror(drizzle_error(queue->drizzle), GEARMAN_QUEUE_ERROR);
  420. }
  421. if (libdrizzle_failed(drizzle_result_buffer(queue->result())))
  422. {
  423. return gearmand_gerror(drizzle_error(queue->drizzle), GEARMAN_QUEUE_ERROR);
  424. }
  425. if (drizzle_result_row_count(queue->result()) == 0)
  426. {
  427. gearmand_info("Current schema does not have when_to_run column");
  428. queue->set_epoch_support(false);
  429. }
  430. drizzle_result_free(queue->result());
  431. }
  432. gearman_server_set_queue(*server, queue, _libdrizzle_add, _libdrizzle_flush, _libdrizzle_done, _libdrizzle_replay);
  433. return GEARMAN_SUCCESS;
  434. }
  435. /*
  436. * Static definitions
  437. */
  438. static gearmand_error_t _libdrizzle_add(gearman_server_st *,
  439. void *context,
  440. const char *unique, size_t unique_size,
  441. const char *function_name, size_t function_name_size,
  442. const void *data, size_t data_size,
  443. gearman_job_priority_t priority,
  444. int64_t when)
  445. {
  446. plugins::queue::Drizzle *queue= (plugins::queue::Drizzle *)context;
  447. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "libdrizzle add: %.*s, %.*s(%u bytes)",
  448. uint32_t(function_name_size), function_name,
  449. uint32_t(unique_size), (char *)unique, uint32_t(data_size));
  450. if (when and queue->epoch_support() == false)
  451. {
  452. return gearmand_gerror("Table lacks when_to_run field", GEARMAN_QUEUE_ERROR);
  453. }
  454. vchar_t escaped_unique_name;
  455. vchar_t escaped_function_name;
  456. (void)escape_string(escaped_unique_name, (const char *)unique, unique_size);
  457. (void)escape_string(escaped_function_name, (const char *)function_name, function_name_size);
  458. // @todo test for overallocation failure
  459. size_t query_size;
  460. char query_buffer[GEARMAN_QUEUE_QUERY_BUFFER +(GEARMAN_MAX_UNIQUE_SIZE*2) +(GEARMAN_FUNCTION_MAX_SIZE*2)];
  461. if (queue->epoch_support())
  462. {
  463. query_size= (size_t)snprintf(query_buffer, sizeof(query_buffer),
  464. "INSERT INTO %.*s.%.*s SET priority=%u,when_to_run=%lld,unique_key='%.*s',function_name='%.*s',data='",
  465. int(queue->schema.size()), queue->schema.c_str(),
  466. int(queue->table.size()), queue->table.c_str(),
  467. uint32_t(priority),
  468. (long long unsigned int)when,
  469. int(escaped_unique_name.size()), &escaped_unique_name[0],
  470. int(escaped_function_name.size()), &escaped_function_name[0]
  471. );
  472. }
  473. else
  474. {
  475. query_size= (size_t)snprintf(query_buffer, sizeof(query_buffer),
  476. "INSERT INTO %.*s.%.*s SET priority=%u,unique_key='%.*s',function_name='%.*s',data='",
  477. int(queue->schema.size()), queue->schema.c_str(),
  478. int(queue->table.size()), queue->table.c_str(),
  479. uint32_t(priority),
  480. int(escaped_unique_name.size()), &escaped_unique_name[0],
  481. int(escaped_function_name.size()), &escaped_function_name[0]
  482. );
  483. }
  484. vchar_t query;
  485. query.resize(query_size);
  486. memcpy(&query[0], query_buffer, query_size);
  487. query_size+= size_t(escape_string(query, (const char *)data, data_size));
  488. query.push_back('\'');
  489. query.resize(query.size() +1);
  490. gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM, "%.*s", query.size() -1, &query[0]);
  491. if (libdrizzle_failed(_libdrizzle_insert(queue, query)))
  492. {
  493. return gearmand_gerror(drizzle_error(queue->drizzle), GEARMAN_QUEUE_ERROR);
  494. }
  495. return GEARMAN_SUCCESS;
  496. }
  497. static gearmand_error_t _libdrizzle_flush(gearman_server_st *, void *)
  498. {
  499. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "libdrizzle flush");
  500. return GEARMAN_SUCCESS;
  501. }
  502. static gearmand_error_t _libdrizzle_done(gearman_server_st *,
  503. void *context,
  504. const char *unique, size_t unique_size,
  505. const char *function_name, size_t function_name_size)
  506. {
  507. plugins::queue::Drizzle *queue= (plugins::queue::Drizzle *)context;
  508. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,
  509. "libdrizzle done: %.*s(%.*s)",
  510. uint32_t(function_name_size), (char *)function_name,
  511. uint32_t(unique_size), (char *)unique);
  512. vchar_t escaped_unique_name;
  513. vchar_t escaped_function_name;
  514. (void)escape_string(escaped_unique_name, (const char *)unique, unique_size);
  515. (void)escape_string(escaped_function_name, (const char *)function_name, function_name_size);
  516. vchar_t query;
  517. query.resize(escaped_unique_name.size()
  518. +escaped_function_name.size()
  519. +queue->schema.size()
  520. +queue->table.size()
  521. +GEARMAN_QUEUE_QUERY_BUFFER);
  522. int query_size= snprintf(&query[0], query.size(),
  523. "DELETE FROM %.*s.%.*s WHERE unique_key='%.*s' and function_name= '%.*s'",
  524. int(queue->schema.size()), queue->schema.c_str(),
  525. int(queue->table.size()), queue->table.c_str(),
  526. int(escaped_unique_name.size()), &escaped_unique_name[0],
  527. int(escaped_function_name.size()), &escaped_function_name[0]
  528. );
  529. if (query_size < 0 or size_t(query_size) > query.size())
  530. {
  531. return gearmand_gerror("snprintf(DELETE)", GEARMAN_MEMORY_ALLOCATION_FAILURE);
  532. }
  533. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "%.*", query_size, &query[0]);
  534. if (libdrizzle_failed(_libdrizzle_query(queue, &query[0], query_size)))
  535. {
  536. return gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "Error occured from %.*s", query_size, &query[0]);
  537. }
  538. drizzle_result_free(queue->result());
  539. return GEARMAN_SUCCESS;
  540. }
  541. #if 0
  542. static gearmand_error_t _dump_queue(plugins::queue::Drizzle *queue)
  543. {
  544. char query[DRIZZLE_MAX_TABLE_SIZE + GEARMAN_QUEUE_QUERY_BUFFER];
  545. size_t query_size;
  546. query_size= (size_t)snprintf(query, sizeof(query),
  547. "SELECT unique_key,function_name FROM %s",
  548. queue->table.c_str());
  549. if (_libdrizzle_query(NULL, queue, query, query_size) != DRIZZLE_RETURN_OK)
  550. {
  551. gearmand_log_error("drizzle_column_skip_all:%s", drizzle_error(queue->drizzle));
  552. return GEARMAN_QUEUE_ERROR;
  553. }
  554. if (drizzle_column_skip_all(queue->result()) != DRIZZLE_RETURN_OK)
  555. {
  556. drizzle_result_free(queue->result());
  557. gearmand_log_error("drizzle_column_skip_all:%s", drizzle_error(queue->drizzle));
  558. return GEARMAN_QUEUE_ERROR;
  559. }
  560. gearmand_debug("Shutting down with the following items left to be processed.");
  561. while (1)
  562. {
  563. drizzle_return_t ret;
  564. drizzle_row_t row= drizzle_row_buffer(queue->result(), &ret);
  565. if (ret != DRIZZLE_RETURN_OK)
  566. {
  567. drizzle_result_free(queue->result());
  568. gearmand_log_error("drizzle_row_buffer:%s", drizzle_error(queue->drizzle));
  569. return GEARMAN_QUEUE_ERROR;
  570. }
  571. if (row == NULL)
  572. break;
  573. size_t *field_sizes;
  574. field_sizes= drizzle_row_field_sizes(queue->result());
  575. gearmand_log_debug("\t unique: %.*s function: %.*s",
  576. (uint32_t)field_sizes[0], row[0],
  577. (uint32_t)field_sizes[1], row[1]
  578. );
  579. drizzle_row_free(queue->result(), row);
  580. }
  581. drizzle_result_free(queue->result());
  582. return GEARMAN_SUCCESS;
  583. }
  584. #endif
  585. static gearmand_error_t _libdrizzle_replay(gearman_server_st *server,
  586. void *context,
  587. gearman_queue_add_fn *add_fn,
  588. void *add_context)
  589. {
  590. plugins::queue::Drizzle *queue= (plugins::queue::Drizzle *)context;
  591. size_t *field_sizes;
  592. gearmand_error_t gret;
  593. gearmand_info("libdrizzle replay start");
  594. std::string query;
  595. if (queue->epoch_support())
  596. {
  597. query+= "SELECT unique_key,function_name,priority,data,when_to_run FROM ";
  598. query+= queue->schema;
  599. query+= ".";
  600. query+= queue->table;
  601. }
  602. else
  603. {
  604. query+= "SELECT unique_key,function_name,priority,data FROM ";
  605. query+= queue->schema;
  606. query+= ".";
  607. query+= queue->table;
  608. }
  609. if (libdrizzle_failed(_libdrizzle_query(queue, query.c_str(), query.size())))
  610. {
  611. return gearmand_gerror(drizzle_error(queue->drizzle), GEARMAN_QUEUE_ERROR);
  612. }
  613. if (libdrizzle_failed(drizzle_column_buffer(queue->result())))
  614. {
  615. drizzle_result_free(queue->result());
  616. return gearmand_gerror(drizzle_error(queue->drizzle), GEARMAN_QUEUE_ERROR);
  617. }
  618. while (1)
  619. {
  620. drizzle_return_t ret;
  621. drizzle_row_t row= drizzle_row_buffer(queue->result(), &ret);
  622. if (libdrizzle_failed(ret))
  623. {
  624. drizzle_result_free(queue->result());
  625. return gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "drizzle_row_buffer:%s", drizzle_error(queue->drizzle));
  626. }
  627. if (row == NULL)
  628. {
  629. break;
  630. }
  631. field_sizes= drizzle_row_field_sizes(queue->result());
  632. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "libdrizzle replay: %.*s", (uint32_t)field_sizes[0], row[0]);
  633. size_t data_size= field_sizes[3];
  634. /* need to make a copy here ... gearman_server_job_free will free it later */
  635. char *data= (char *)malloc(data_size);
  636. if (data == NULL)
  637. {
  638. gearmand_perror(errno, "Failed to allocate data while replaying the queue");
  639. drizzle_row_free(queue->result(), row);
  640. drizzle_result_free(queue->result());
  641. return GEARMAN_MEMORY_ALLOCATION_FAILURE;
  642. }
  643. memcpy(data, row[3], data_size);
  644. int64_t when;
  645. if (not queue->epoch_support())
  646. {
  647. when= 0;
  648. }
  649. else
  650. {
  651. when= atoi(row[4]);
  652. }
  653. gret= (*add_fn)(server, add_context, row[0], field_sizes[0],
  654. row[1], field_sizes[1],
  655. data, data_size, (gearman_job_priority_t)atoi(row[2]), when);
  656. if (gret != GEARMAN_SUCCESS)
  657. {
  658. drizzle_row_free(queue->result(), row);
  659. drizzle_result_free(queue->result());
  660. return gret;
  661. }
  662. drizzle_row_free(queue->result(), row);
  663. }
  664. drizzle_result_free(queue->result());
  665. return GEARMAN_SUCCESS;
  666. }
  667. #else // if defined(HAVE_LIBDRIZZLE) && HAVE_LIBDRIZZLE
  668. namespace gearmand {
  669. namespace plugins {
  670. namespace queue {
  671. void initialize_drizzle()
  672. {
  673. }
  674. } // namespace queue
  675. } // namespace plugins
  676. } // namespace gearmand
  677. #endif // if defined(HAVE_LIBDRIZZLE) && HAVE_LIBDRIZZLE