queue.cc 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521
  1. /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
  2. *
  3. * Gearmand client and server library.
  4. *
  5. * Copyright (C) 2011-2012 Data Differential, http://datadifferential.com/
  6. * Copyright (C) 2011 Oleksiy Krivoshey
  7. * All rights reserved.
  8. *
  9. * Redistribution and use in source and binary forms, with or without
  10. * modification, are permitted provided that the following conditions are
  11. * met:
  12. *
  13. * * Redistributions of source code must retain the above copyright
  14. * notice, this list of conditions and the following disclaimer.
  15. *
  16. * * Redistributions in binary form must reproduce the above
  17. * copyright notice, this list of conditions and the following disclaimer
  18. * in the documentation and/or other materials provided with the
  19. * distribution.
  20. *
  21. * * The names of its contributors may not be used to endorse or
  22. * promote products derived from this software without specific prior
  23. * written permission.
  24. *
  25. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  26. * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  27. * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  28. * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  29. * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  30. * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  31. * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  32. * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  33. * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  34. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  35. * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  36. *
  37. */
  38. #include <gear_config.h>
  39. #include <libgearman-server/common.h>
  40. #include <libgearman-server/byte.h>
  41. #include <libgearman-server/plugins/queue/mysql/queue.h>
  42. #include <libgearman-server/plugins/queue/base.h>
  43. #include <mysql.h>
  44. #include <errmsg.h>
  45. #include <cerrno>
  46. /**
  47. * Default values.
  48. */
  49. #define GEARMAND_QUEUE_MYSQL_DEFAULT_TABLE "gearman_queue"
  50. namespace gearmand { namespace plugins { namespace queue { class MySQL; } } }
  51. static gearmand_error_t _initialize(gearman_server_st& server, gearmand::plugins::queue::MySQL *queue);
  52. namespace gearmand
  53. {
  54. namespace plugins
  55. {
  56. namespace queue
  57. {
  58. class MySQL : public gearmand::plugins::Queue {
  59. public:
  60. MySQL();
  61. ~MySQL();
  62. gearmand_error_t initialize();
  63. gearmand_error_t prepareAddStatement();
  64. gearmand_error_t prepareDoneStatement();
  65. MYSQL *con;
  66. MYSQL_STMT *add_stmt;
  67. MYSQL_STMT *done_stmt;
  68. std::string mysql_host;
  69. std::string mysql_user;
  70. std::string mysql_password;
  71. std::string mysql_db;
  72. std::string mysql_table;
  73. in_port_t port() const
  74. {
  75. return _port;
  76. }
  77. private:
  78. in_port_t _port;
  79. };
  80. MySQL::MySQL() :
  81. Queue("MySQL"),
  82. con(NULL),
  83. add_stmt(NULL),
  84. done_stmt(NULL)
  85. {
  86. command_line_options().add_options()
  87. ("mysql-host", boost::program_options::value(&mysql_host)->default_value("localhost"), "MySQL host.")
  88. ("mysql-port", boost::program_options::value(&_port)->default_value(3306), "Port of server. (by default 3306)")
  89. ("mysql-user", boost::program_options::value(&mysql_user)->default_value(""), "MySQL user.")
  90. ("mysql-password", boost::program_options::value(&mysql_password)->default_value(""), "MySQL user password.")
  91. ("mysql-db", boost::program_options::value(&mysql_db)->default_value(""), "MySQL database.")
  92. ("mysql-table", boost::program_options::value(&mysql_table)->default_value(GEARMAND_QUEUE_MYSQL_DEFAULT_TABLE), "MySQL table name.");
  93. }
  94. MySQL::~MySQL()
  95. {
  96. if (add_stmt)
  97. {
  98. mysql_stmt_close(add_stmt);
  99. }
  100. if (con)
  101. {
  102. mysql_close(con);
  103. }
  104. }
  105. gearmand_error_t MySQL::initialize()
  106. {
  107. return _initialize(Gearmand()->server, this);
  108. }
  109. gearmand_error_t MySQL::prepareAddStatement()
  110. {
  111. char query_buffer[1024];
  112. if ((this->add_stmt= mysql_stmt_init(this->con)) == NULL)
  113. {
  114. gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "mysql_stmt_init failed: %s", mysql_error(this->con));
  115. return GEARMAND_QUEUE_ERROR;
  116. }
  117. int query_buffer_length= snprintf(query_buffer, sizeof(query_buffer),
  118. "INSERT INTO %s "
  119. "(unique_key, function_name, priority, data, when_to_run) "
  120. "VALUES(?, ?, ?, ?, ?)", this->mysql_table.c_str());
  121. if (mysql_stmt_prepare(this->add_stmt, query_buffer, query_buffer_length))
  122. {
  123. gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "mysql_stmt_prepare failed: %s", mysql_error(this->con));
  124. return GEARMAND_QUEUE_ERROR;
  125. }
  126. return GEARMAND_SUCCESS;
  127. }
  128. gearmand_error_t MySQL::prepareDoneStatement()
  129. {
  130. char query_buffer[1024];
  131. if ((this->done_stmt= mysql_stmt_init(this->con)) == NULL)
  132. {
  133. gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "mysql_stmt_init failed: %s", mysql_error(this->con));
  134. return GEARMAND_QUEUE_ERROR;
  135. }
  136. int query_buffer_length= snprintf(query_buffer, sizeof(query_buffer),
  137. "DELETE FROM %s "
  138. "WHERE unique_key=? "
  139. "AND function_name=?", this->mysql_table.c_str());
  140. if (mysql_stmt_prepare(this->done_stmt, query_buffer, query_buffer_length))
  141. {
  142. gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "mysql_stmt_prepare failed: %s", mysql_error(this->con));
  143. return GEARMAND_QUEUE_ERROR;
  144. }
  145. return GEARMAND_SUCCESS;
  146. }
  147. void initialize_mysql()
  148. {
  149. static MySQL local_instance;
  150. }
  151. } // namespace queue
  152. } // namespace plugin
  153. } // namespace gearmand
  154. /* Queue callback functions. */
  155. static gearmand_error_t _mysql_queue_add(gearman_server_st *server, void *context,
  156. const char *unique, size_t unique_size,
  157. const char *function_name,
  158. size_t function_name_size,
  159. const void *data, size_t data_size,
  160. gearman_job_priority_t priority,
  161. int64_t when);
  162. static gearmand_error_t _mysql_queue_flush(gearman_server_st *server, void *context);
  163. static gearmand_error_t _mysql_queue_done(gearman_server_st *server, void *context,
  164. const char *unique,
  165. size_t unique_size,
  166. const char *function_name,
  167. size_t function_name_size);
  168. static gearmand_error_t _mysql_queue_replay(gearman_server_st *server, void *context,
  169. gearman_queue_add_fn *add_fn,
  170. void *add_context);
  171. gearmand_error_t _initialize(gearman_server_st& server, gearmand::plugins::queue::MySQL *queue)
  172. {
  173. MYSQL_RES * result;
  174. my_bool my_true= true;
  175. gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM,"Initializing MySQL module");
  176. gearman_server_set_queue(server, queue, _mysql_queue_add, _mysql_queue_flush, _mysql_queue_done, _mysql_queue_replay);
  177. queue->con= mysql_init(queue->con);
  178. mysql_options(queue->con, MYSQL_READ_DEFAULT_GROUP, "gearmand");
  179. if (!mysql_real_connect(queue->con,
  180. queue->mysql_host.c_str(),
  181. queue->mysql_user.c_str(),
  182. queue->mysql_password.c_str(),
  183. queue->mysql_db.c_str(),
  184. queue->port(), NULL, 0))
  185. {
  186. gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "Failed to connect to database: %s", mysql_error(queue->con));
  187. return GEARMAND_QUEUE_ERROR;
  188. }
  189. mysql_options(queue->con, MYSQL_OPT_RECONNECT, &my_true);
  190. if (!(result= mysql_list_tables(queue->con, queue->mysql_table.c_str())))
  191. {
  192. gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "mysql_list_tables failed: %s", mysql_error(queue->con));
  193. return GEARMAND_QUEUE_ERROR;
  194. }
  195. if (mysql_num_rows(result) == 0)
  196. {
  197. char query_buffer[1024];
  198. int query_buffer_length= snprintf(query_buffer, sizeof(query_buffer),
  199. "CREATE TABLE %s"
  200. "("
  201. "unique_key VARCHAR(%d),"
  202. "function_name VARCHAR(255),"
  203. "priority INT,"
  204. "data LONGBLOB,"
  205. "when_to_run INT,"
  206. "unique key (unique_key, function_name)"
  207. ")",
  208. queue->mysql_table.c_str(), GEARMAN_UNIQUE_SIZE);
  209. gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM,"MySQL module: creating table %s", queue->mysql_table.c_str());
  210. if (mysql_real_query(queue->con, query_buffer, query_buffer_length))
  211. {
  212. gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "MySQL module: create table failed: %s", mysql_error(queue->con));
  213. return GEARMAND_QUEUE_ERROR;
  214. }
  215. }
  216. mysql_free_result(result);
  217. if (queue->prepareAddStatement() == GEARMAND_QUEUE_ERROR)
  218. {
  219. return GEARMAND_QUEUE_ERROR;
  220. }
  221. if (queue->prepareDoneStatement() == GEARMAND_QUEUE_ERROR)
  222. {
  223. return GEARMAND_QUEUE_ERROR;
  224. }
  225. return GEARMAND_SUCCESS;
  226. }
  227. /*
  228. * Static definitions
  229. */
  230. static gearmand_error_t _mysql_queue_add(gearman_server_st *, void *context,
  231. const char *unique, size_t unique_size,
  232. const char *function_name,
  233. size_t function_name_size,
  234. const void *data, size_t data_size,
  235. gearman_job_priority_t priority,
  236. int64_t when)
  237. {
  238. MYSQL_BIND bind[5];
  239. gearmand::plugins::queue::MySQL *queue= (gearmand::plugins::queue::MySQL *)context;
  240. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,"MySQL queue add: %.*s %.*s", (uint32_t) unique_size, (char *) unique,
  241. (uint32_t) function_name_size, (char *) function_name);
  242. bind[0].buffer_type= MYSQL_TYPE_STRING;
  243. bind[0].buffer= (char *)unique;
  244. bind[0].buffer_length= unique_size;
  245. bind[0].is_null= 0;
  246. bind[0].length= (unsigned long*)&unique_size;
  247. bind[1].buffer_type= MYSQL_TYPE_STRING;
  248. bind[1].buffer= (char *)function_name;
  249. bind[1].buffer_length= function_name_size;
  250. bind[1].is_null= 0;
  251. bind[1].length= (unsigned long*)&function_name_size;
  252. bind[2].buffer_type= MYSQL_TYPE_LONG;
  253. bind[2].buffer= (char *)&priority;
  254. bind[2].is_null= 0;
  255. bind[2].length= 0;
  256. bind[3].buffer_type= MYSQL_TYPE_LONG_BLOB;
  257. bind[3].buffer= (char *)data;
  258. bind[3].buffer_length= data_size;
  259. bind[3].is_null= 0;
  260. bind[3].length= (unsigned long*)&data_size;
  261. bind[4].buffer_type= MYSQL_TYPE_LONG;
  262. bind[4].buffer= (char *)&when;
  263. bind[4].is_null= 0;
  264. bind[4].length= 0;
  265. while(1)
  266. {
  267. if (mysql_stmt_bind_param(queue->add_stmt, bind))
  268. {
  269. if ( mysql_stmt_errno(queue->add_stmt) == CR_NO_PREPARE_STMT )
  270. {
  271. if (queue->prepareAddStatement() == GEARMAND_QUEUE_ERROR)
  272. {
  273. return GEARMAND_QUEUE_ERROR;
  274. }
  275. continue;
  276. }
  277. else
  278. {
  279. gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "mysql_stmt_bind_param failed: %s", mysql_error(queue->con));
  280. return GEARMAND_QUEUE_ERROR;
  281. }
  282. }
  283. if (mysql_stmt_execute(queue->add_stmt))
  284. {
  285. if ( mysql_stmt_errno(queue->add_stmt) == CR_SERVER_LOST )
  286. {
  287. mysql_stmt_close(queue->add_stmt);
  288. if (queue->prepareAddStatement() != GEARMAND_QUEUE_ERROR)
  289. {
  290. continue;
  291. }
  292. }
  293. gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "mysql_stmt_execute failed: %s", mysql_error(queue->con));
  294. return GEARMAND_QUEUE_ERROR;
  295. }
  296. break;
  297. }
  298. return GEARMAND_SUCCESS;
  299. }
  300. static gearmand_error_t _mysql_queue_flush(gearman_server_st*, void*)
  301. {
  302. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,"MySQL queue flush");
  303. return GEARMAND_SUCCESS;
  304. }
  305. static gearmand_error_t _mysql_queue_done(gearman_server_st*, void *context,
  306. const char *unique,
  307. size_t unique_size,
  308. const char *function_name,
  309. size_t function_name_size)
  310. {
  311. MYSQL_BIND bind[2];
  312. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,"MySQL queue done: %.*s %.*s", (uint32_t) unique_size, (char *) unique,
  313. (uint32_t) function_name_size, (char *) function_name);
  314. gearmand::plugins::queue::MySQL *queue= (gearmand::plugins::queue::MySQL *)context;
  315. bind[0].buffer_type= MYSQL_TYPE_STRING;
  316. bind[0].buffer= (char *)unique;
  317. bind[0].buffer_length= unique_size;
  318. bind[0].is_null= 0;
  319. bind[0].length= (unsigned long*)&unique_size;
  320. bind[1].buffer_type= MYSQL_TYPE_STRING;
  321. bind[1].buffer= (char *)function_name;
  322. bind[1].buffer_length= function_name_size;
  323. bind[1].is_null= 0;
  324. bind[1].length= (unsigned long*)&function_name_size;
  325. while(1)
  326. {
  327. if (mysql_stmt_bind_param(queue->done_stmt, bind))
  328. {
  329. if ( mysql_stmt_errno(queue->done_stmt) == CR_NO_PREPARE_STMT )
  330. {
  331. if (queue->prepareDoneStatement() == GEARMAND_QUEUE_ERROR)
  332. {
  333. return GEARMAND_QUEUE_ERROR;
  334. }
  335. continue;
  336. }
  337. else
  338. {
  339. gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "mysql_stmt_bind_param failed: %s", mysql_error(queue->con));
  340. return GEARMAND_QUEUE_ERROR;
  341. }
  342. }
  343. if (mysql_stmt_execute(queue->done_stmt))
  344. {
  345. if ( mysql_stmt_errno(queue->done_stmt) == CR_SERVER_LOST )
  346. {
  347. mysql_stmt_close(queue->done_stmt);
  348. if (queue->prepareDoneStatement() != GEARMAND_QUEUE_ERROR)
  349. {
  350. continue;
  351. }
  352. }
  353. gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "mysql_stmt_execute failed: %s", mysql_error(queue->con));
  354. return GEARMAND_QUEUE_ERROR;
  355. }
  356. break;
  357. }
  358. return GEARMAND_SUCCESS;
  359. }
  360. static gearmand_error_t _mysql_queue_replay(gearman_server_st* server, void *context,
  361. gearman_queue_add_fn *add_fn,
  362. void *add_context)
  363. {
  364. MYSQL_RES * result;
  365. MYSQL_ROW row;
  366. char query_buffer[1024];
  367. gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM,"MySQL queue replay");
  368. gearmand::plugins::queue::MySQL *queue= (gearmand::plugins::queue::MySQL *)context;
  369. int query_buffer_length= snprintf(query_buffer, sizeof(query_buffer),
  370. "SELECT unique_key, function_name, data, priority, when_to_run FROM %s",
  371. queue->mysql_table.c_str());
  372. if (mysql_real_query(queue->con, query_buffer, query_buffer_length))
  373. {
  374. gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "mysql_real_query failed: %s", mysql_error(queue->con));
  375. return GEARMAND_QUEUE_ERROR;
  376. }
  377. if (!(result= mysql_store_result(queue->con)))
  378. {
  379. gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "mysql_store_result failed: %s", mysql_error(queue->con));
  380. return GEARMAND_QUEUE_ERROR;
  381. }
  382. if (mysql_num_fields(result) < 5)
  383. {
  384. gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "MySQL queue: insufficient row fields in queue table");
  385. return GEARMAND_QUEUE_ERROR;
  386. }
  387. gearmand_error_t ret= GEARMAND_SUCCESS;
  388. while ((row= mysql_fetch_row(result)))
  389. {
  390. unsigned long *lengths;
  391. gearman_job_priority_t priority= (gearman_job_priority_t)0;
  392. int when= 0;
  393. lengths= mysql_fetch_lengths(result);
  394. /* need to make a copy here ... gearman_server_job_free will free it later */
  395. size_t data_size= lengths[2];
  396. char * data= (char *)malloc(data_size);
  397. if (data == NULL)
  398. {
  399. return gearmand_perror(errno, "malloc failed");
  400. }
  401. memcpy(data, row[2], data_size);
  402. if (lengths[3])
  403. {
  404. priority= (gearman_job_priority_t) atoi(row[3]);
  405. }
  406. if (lengths[4])
  407. {
  408. when= atoi(row[4]);
  409. }
  410. ret= (*add_fn)(server, add_context,
  411. row[0], (size_t) lengths[0],
  412. row[1], (size_t) lengths[1],
  413. data, data_size,
  414. priority,
  415. when);
  416. if (ret != GEARMAND_SUCCESS)
  417. {
  418. break;
  419. }
  420. }
  421. mysql_free_result(result);
  422. return ret;
  423. }