queue.cc 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531
  1. /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
  2. *
  3. * Gearmand client and server library.
  4. *
  5. * Copyright (C) 2011-2012 Data Differential, http://datadifferential.com/
  6. * Copyright (C) 2011 Oleksiy Krivoshey
  7. * All rights reserved.
  8. *
  9. * Redistribution and use in source and binary forms, with or without
  10. * modification, are permitted provided that the following conditions are
  11. * met:
  12. *
  13. * * Redistributions of source code must retain the above copyright
  14. * notice, this list of conditions and the following disclaimer.
  15. *
  16. * * Redistributions in binary form must reproduce the above
  17. * copyright notice, this list of conditions and the following disclaimer
  18. * in the documentation and/or other materials provided with the
  19. * distribution.
  20. *
  21. * * The names of its contributors may not be used to endorse or
  22. * promote products derived from this software without specific prior
  23. * written permission.
  24. *
  25. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  26. * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  27. * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  28. * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  29. * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  30. * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  31. * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  32. * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  33. * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  34. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  35. * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  36. *
  37. */
  38. #include <gear_config.h>
  39. #include <libgearman-server/common.h>
  40. #include <libgearman-server/byte.h>
  41. #include <libgearman-server/plugins/queue/mysql/queue.h>
  42. #include <libgearman-server/plugins/queue/base.h>
  43. #include <mysql.h>
  44. #if !defined(MARIADB_BASE_VERSION) && !defined(MARIADB_VERSION_ID) && \
  45. MYSQL_VERSION_ID >= 80001 && MYSQL_VERSION_ID != 80002
  46. typedef bool my_bool;
  47. #endif
  48. #include <errmsg.h>
  49. #include <cerrno>
  50. /**
  51. * Default values.
  52. */
  53. #define GEARMAND_QUEUE_MYSQL_DEFAULT_TABLE "gearman_queue"
  54. namespace gearmand { namespace plugins { namespace queue { class MySQL; } } }
  55. static gearmand_error_t _initialize(gearman_server_st& server, gearmand::plugins::queue::MySQL *queue);
  56. namespace gearmand
  57. {
  58. namespace plugins
  59. {
  60. namespace queue
  61. {
  62. class MySQL : public gearmand::plugins::Queue {
  63. public:
  64. MySQL();
  65. ~MySQL();
  66. gearmand_error_t initialize();
  67. gearmand_error_t prepareAddStatement();
  68. gearmand_error_t prepareDoneStatement();
  69. MYSQL *con;
  70. MYSQL_STMT *add_stmt;
  71. MYSQL_STMT *done_stmt;
  72. std::string mysql_host;
  73. std::string mysql_user;
  74. std::string mysql_password;
  75. std::string mysql_db;
  76. std::string mysql_table;
  77. in_port_t port() const
  78. {
  79. return _port;
  80. }
  81. private:
  82. in_port_t _port;
  83. };
  84. MySQL::MySQL() :
  85. Queue("MySQL"),
  86. con(NULL),
  87. add_stmt(NULL),
  88. done_stmt(NULL)
  89. {
  90. command_line_options().add_options()
  91. ("mysql-host", boost::program_options::value(&mysql_host)->default_value("localhost"), "MySQL host.")
  92. ("mysql-port", boost::program_options::value(&_port)->default_value(3306), "Port of server. (by default 3306)")
  93. ("mysql-user", boost::program_options::value(&mysql_user)->default_value(""), "MySQL user.")
  94. ("mysql-password", boost::program_options::value(&mysql_password)->default_value(""), "MySQL user password.")
  95. ("mysql-db", boost::program_options::value(&mysql_db)->default_value(""), "MySQL database.")
  96. ("mysql-table", boost::program_options::value(&mysql_table)->default_value(GEARMAND_QUEUE_MYSQL_DEFAULT_TABLE), "MySQL table name.");
  97. }
  98. MySQL::~MySQL()
  99. {
  100. if (add_stmt)
  101. {
  102. mysql_stmt_close(add_stmt);
  103. }
  104. if (con)
  105. {
  106. mysql_close(con);
  107. }
  108. }
  109. gearmand_error_t MySQL::initialize()
  110. {
  111. return _initialize(Gearmand()->server, this);
  112. }
  113. gearmand_error_t MySQL::prepareAddStatement()
  114. {
  115. char query_buffer[1024];
  116. if ((this->add_stmt= mysql_stmt_init(this->con)) == NULL)
  117. {
  118. gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "mysql_stmt_init failed: %s", mysql_error(this->con));
  119. return GEARMAND_QUEUE_ERROR;
  120. }
  121. int query_buffer_length= snprintf(query_buffer, sizeof(query_buffer),
  122. "INSERT INTO %s "
  123. "(unique_key, function_name, priority, data, when_to_run) "
  124. "VALUES(?, ?, ?, ?, ?)", this->mysql_table.c_str());
  125. if (mysql_stmt_prepare(this->add_stmt, query_buffer, query_buffer_length))
  126. {
  127. gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "mysql_stmt_prepare failed: %s", mysql_error(this->con));
  128. return GEARMAND_QUEUE_ERROR;
  129. }
  130. return GEARMAND_SUCCESS;
  131. }
  132. gearmand_error_t MySQL::prepareDoneStatement()
  133. {
  134. char query_buffer[1024];
  135. if ((this->done_stmt= mysql_stmt_init(this->con)) == NULL)
  136. {
  137. gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "mysql_stmt_init failed: %s", mysql_error(this->con));
  138. return GEARMAND_QUEUE_ERROR;
  139. }
  140. int query_buffer_length= snprintf(query_buffer, sizeof(query_buffer),
  141. "DELETE FROM %s "
  142. "WHERE unique_key=? "
  143. "AND function_name=?", this->mysql_table.c_str());
  144. if (mysql_stmt_prepare(this->done_stmt, query_buffer, query_buffer_length))
  145. {
  146. gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "mysql_stmt_prepare failed: %s", mysql_error(this->con));
  147. return GEARMAND_QUEUE_ERROR;
  148. }
  149. return GEARMAND_SUCCESS;
  150. }
  151. void initialize_mysql()
  152. {
  153. static MySQL local_instance;
  154. }
  155. } // namespace queue
  156. } // namespace plugin
  157. } // namespace gearmand
  158. /* Queue callback functions. */
  159. static gearmand_error_t _mysql_queue_add(gearman_server_st *server, void *context,
  160. const char *unique, size_t unique_size,
  161. const char *function_name,
  162. size_t function_name_size,
  163. const void *data, size_t data_size,
  164. gearman_job_priority_t priority,
  165. int64_t when);
  166. static gearmand_error_t _mysql_queue_flush(gearman_server_st *server, void *context);
  167. static gearmand_error_t _mysql_queue_done(gearman_server_st *server, void *context,
  168. const char *unique,
  169. size_t unique_size,
  170. const char *function_name,
  171. size_t function_name_size);
  172. static gearmand_error_t _mysql_queue_replay(gearman_server_st *server, void *context,
  173. gearman_queue_add_fn *add_fn,
  174. void *add_context);
  175. gearmand_error_t _initialize(gearman_server_st& server, gearmand::plugins::queue::MySQL *queue)
  176. {
  177. MYSQL_RES * result;
  178. my_bool my_true= true;
  179. gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM,"Initializing MySQL module");
  180. gearman_server_set_queue(server, queue, _mysql_queue_add, _mysql_queue_flush, _mysql_queue_done, _mysql_queue_replay);
  181. queue->con= mysql_init(queue->con);
  182. mysql_options(queue->con, MYSQL_READ_DEFAULT_GROUP, "gearmand");
  183. mysql_options(queue->con, MYSQL_OPT_RECONNECT, &my_true);
  184. if (!mysql_real_connect(queue->con,
  185. queue->mysql_host.c_str(),
  186. queue->mysql_user.c_str(),
  187. queue->mysql_password.c_str(),
  188. queue->mysql_db.c_str(),
  189. queue->port(), NULL, 0))
  190. {
  191. gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "Failed to connect to database: %s", mysql_error(queue->con));
  192. return GEARMAND_QUEUE_ERROR;
  193. }
  194. if (!(result= mysql_list_tables(queue->con, queue->mysql_table.c_str())))
  195. {
  196. gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "mysql_list_tables failed: %s", mysql_error(queue->con));
  197. return GEARMAND_QUEUE_ERROR;
  198. }
  199. if (mysql_num_rows(result) == 0)
  200. {
  201. char query_buffer[1024];
  202. int query_buffer_length= snprintf(query_buffer, sizeof(query_buffer),
  203. "CREATE TABLE %s"
  204. "("
  205. "unique_key VARCHAR(%d),"
  206. "function_name VARCHAR(255),"
  207. "priority INT,"
  208. "data LONGBLOB,"
  209. "when_to_run INT,"
  210. "unique key (unique_key, function_name)"
  211. ")",
  212. queue->mysql_table.c_str(), GEARMAN_UNIQUE_SIZE);
  213. gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM,"MySQL module: creating table %s", queue->mysql_table.c_str());
  214. if (mysql_real_query(queue->con, query_buffer, query_buffer_length))
  215. {
  216. gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "MySQL module: create table failed: %s", mysql_error(queue->con));
  217. return GEARMAND_QUEUE_ERROR;
  218. }
  219. }
  220. mysql_free_result(result);
  221. if (queue->prepareAddStatement() == GEARMAND_QUEUE_ERROR)
  222. {
  223. return GEARMAND_QUEUE_ERROR;
  224. }
  225. if (queue->prepareDoneStatement() == GEARMAND_QUEUE_ERROR)
  226. {
  227. return GEARMAND_QUEUE_ERROR;
  228. }
  229. return GEARMAND_SUCCESS;
  230. }
  231. /*
  232. * Static definitions
  233. */
  234. static gearmand_error_t _mysql_queue_add(gearman_server_st *, void *context,
  235. const char *unique, size_t unique_size,
  236. const char *function_name,
  237. size_t function_name_size,
  238. const void *data, size_t data_size,
  239. gearman_job_priority_t priority,
  240. int64_t when)
  241. {
  242. MYSQL_BIND bind[5];
  243. gearmand::plugins::queue::MySQL *queue= (gearmand::plugins::queue::MySQL *)context;
  244. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,"MySQL queue add: %.*s %.*s", (uint32_t) unique_size, (char *) unique,
  245. (uint32_t) function_name_size, (char *) function_name);
  246. mysql_ping(queue->con);
  247. bind[0].buffer_type= MYSQL_TYPE_STRING;
  248. bind[0].buffer= (char *)unique;
  249. bind[0].buffer_length= unique_size;
  250. bind[0].is_null= 0;
  251. bind[0].length= (unsigned long*)&unique_size;
  252. bind[1].buffer_type= MYSQL_TYPE_STRING;
  253. bind[1].buffer= (char *)function_name;
  254. bind[1].buffer_length= function_name_size;
  255. bind[1].is_null= 0;
  256. bind[1].length= (unsigned long*)&function_name_size;
  257. bind[2].buffer_type= MYSQL_TYPE_LONG;
  258. bind[2].buffer= (char *)&priority;
  259. bind[2].is_null= 0;
  260. bind[2].length= 0;
  261. bind[3].buffer_type= MYSQL_TYPE_LONG_BLOB;
  262. bind[3].buffer= (char *)data;
  263. bind[3].buffer_length= data_size;
  264. bind[3].is_null= 0;
  265. bind[3].length= (unsigned long*)&data_size;
  266. bind[4].buffer_type= MYSQL_TYPE_LONG;
  267. bind[4].buffer= (char *)&when;
  268. bind[4].is_null= 0;
  269. bind[4].length= 0;
  270. while(1)
  271. {
  272. if (mysql_stmt_bind_param(queue->add_stmt, bind))
  273. {
  274. if ( mysql_stmt_errno(queue->add_stmt) == CR_NO_PREPARE_STMT )
  275. {
  276. if (queue->prepareAddStatement() == GEARMAND_QUEUE_ERROR)
  277. {
  278. return GEARMAND_QUEUE_ERROR;
  279. }
  280. continue;
  281. }
  282. else
  283. {
  284. gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "mysql_stmt_bind_param failed: %s", mysql_error(queue->con));
  285. return GEARMAND_QUEUE_ERROR;
  286. }
  287. }
  288. if (mysql_stmt_execute(queue->add_stmt))
  289. {
  290. if ( mysql_stmt_errno(queue->add_stmt) == CR_SERVER_LOST )
  291. {
  292. mysql_stmt_close(queue->add_stmt);
  293. if (queue->prepareAddStatement() != GEARMAND_QUEUE_ERROR)
  294. {
  295. continue;
  296. }
  297. }
  298. gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "mysql_stmt_execute failed: %s", mysql_error(queue->con));
  299. return GEARMAND_QUEUE_ERROR;
  300. }
  301. break;
  302. }
  303. return GEARMAND_SUCCESS;
  304. }
  305. static gearmand_error_t _mysql_queue_flush(gearman_server_st*, void*)
  306. {
  307. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,"MySQL queue flush");
  308. return GEARMAND_SUCCESS;
  309. }
  310. static gearmand_error_t _mysql_queue_done(gearman_server_st*, void *context,
  311. const char *unique,
  312. size_t unique_size,
  313. const char *function_name,
  314. size_t function_name_size)
  315. {
  316. MYSQL_BIND bind[2];
  317. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,"MySQL queue done: %.*s %.*s", (uint32_t) unique_size, (char *) unique,
  318. (uint32_t) function_name_size, (char *) function_name);
  319. gearmand::plugins::queue::MySQL *queue= (gearmand::plugins::queue::MySQL *)context;
  320. mysql_ping(queue->con);
  321. bind[0].buffer_type= MYSQL_TYPE_STRING;
  322. bind[0].buffer= (char *)unique;
  323. bind[0].buffer_length= unique_size;
  324. bind[0].is_null= 0;
  325. bind[0].length= (unsigned long*)&unique_size;
  326. bind[1].buffer_type= MYSQL_TYPE_STRING;
  327. bind[1].buffer= (char *)function_name;
  328. bind[1].buffer_length= function_name_size;
  329. bind[1].is_null= 0;
  330. bind[1].length= (unsigned long*)&function_name_size;
  331. while(1)
  332. {
  333. if (mysql_stmt_bind_param(queue->done_stmt, bind))
  334. {
  335. if ( mysql_stmt_errno(queue->done_stmt) == CR_NO_PREPARE_STMT )
  336. {
  337. if (queue->prepareDoneStatement() == GEARMAND_QUEUE_ERROR)
  338. {
  339. return GEARMAND_QUEUE_ERROR;
  340. }
  341. continue;
  342. }
  343. else
  344. {
  345. gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "mysql_stmt_bind_param failed: %s", mysql_error(queue->con));
  346. return GEARMAND_QUEUE_ERROR;
  347. }
  348. }
  349. if (mysql_stmt_execute(queue->done_stmt))
  350. {
  351. if ( mysql_stmt_errno(queue->done_stmt) == CR_SERVER_LOST )
  352. {
  353. mysql_stmt_close(queue->done_stmt);
  354. if (queue->prepareDoneStatement() != GEARMAND_QUEUE_ERROR)
  355. {
  356. continue;
  357. }
  358. }
  359. gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "mysql_stmt_execute failed: %s", mysql_error(queue->con));
  360. return GEARMAND_QUEUE_ERROR;
  361. }
  362. break;
  363. }
  364. return GEARMAND_SUCCESS;
  365. }
  366. static gearmand_error_t _mysql_queue_replay(gearman_server_st* server, void *context,
  367. gearman_queue_add_fn *add_fn,
  368. void *add_context)
  369. {
  370. MYSQL_RES * result;
  371. MYSQL_ROW row;
  372. char query_buffer[1024];
  373. gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM,"MySQL queue replay");
  374. gearmand::plugins::queue::MySQL *queue= (gearmand::plugins::queue::MySQL *)context;
  375. int query_buffer_length= snprintf(query_buffer, sizeof(query_buffer),
  376. "SELECT unique_key, function_name, data, priority, when_to_run FROM %s",
  377. queue->mysql_table.c_str());
  378. mysql_ping(queue->con);
  379. if (mysql_real_query(queue->con, query_buffer, query_buffer_length))
  380. {
  381. gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "mysql_real_query failed: %s", mysql_error(queue->con));
  382. return GEARMAND_QUEUE_ERROR;
  383. }
  384. if (!(result= mysql_store_result(queue->con)))
  385. {
  386. gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "mysql_store_result failed: %s", mysql_error(queue->con));
  387. return GEARMAND_QUEUE_ERROR;
  388. }
  389. if (mysql_num_fields(result) < 5)
  390. {
  391. gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "MySQL queue: insufficient row fields in queue table");
  392. return GEARMAND_QUEUE_ERROR;
  393. }
  394. gearmand_error_t ret= GEARMAND_SUCCESS;
  395. while ((row= mysql_fetch_row(result)))
  396. {
  397. unsigned long *lengths;
  398. gearman_job_priority_t priority= (gearman_job_priority_t)0;
  399. int when= 0;
  400. lengths= mysql_fetch_lengths(result);
  401. /* need to make a copy here ... gearman_server_job_free will free it later */
  402. size_t data_size= lengths[2];
  403. char * data= (char *)malloc(data_size);
  404. if (data == NULL)
  405. {
  406. return gearmand_perror(errno, "malloc failed");
  407. }
  408. memcpy(data, row[2], data_size);
  409. if (lengths[3])
  410. {
  411. priority= (gearman_job_priority_t) atoi(row[3]);
  412. }
  413. if (lengths[4])
  414. {
  415. when= atoi(row[4]);
  416. }
  417. ret= (*add_fn)(server, add_context,
  418. row[0], (size_t) lengths[0],
  419. row[1], (size_t) lengths[1],
  420. data, data_size,
  421. priority,
  422. when);
  423. if (ret != GEARMAND_SUCCESS)
  424. {
  425. break;
  426. }
  427. }
  428. mysql_free_result(result);
  429. return ret;
  430. }