queue.cc 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406
  1. /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
  2. *
  3. * Gearmand client and server library.
  4. *
  5. * Copyright (C) 2011 Data Differential, http://datadifferential.com/
  6. * Copyright (C) 2008 Brian Aker, Eric Day
  7. * All rights reserved.
  8. *
  9. * Redistribution and use in source and binary forms, with or without
  10. * modification, are permitted provided that the following conditions are
  11. * met:
  12. *
  13. * * Redistributions of source code must retain the above copyright
  14. * notice, this list of conditions and the following disclaimer.
  15. *
  16. * * Redistributions in binary form must reproduce the above
  17. * copyright notice, this list of conditions and the following disclaimer
  18. * in the documentation and/or other materials provided with the
  19. * distribution.
  20. *
  21. * * The names of its contributors may not be used to endorse or
  22. * promote products derived from this software without specific prior
  23. * written permission.
  24. *
  25. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  26. * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  27. * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  28. * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  29. * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  30. * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  31. * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  32. * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  33. * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  34. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  35. * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  36. *
  37. */
  38. /**
  39. * @file
  40. * @brief libpq Queue Storage Definitions
  41. */
  42. #include <gear_config.h>
  43. #include <libgearman-server/common.h>
  44. #include <libgearman-server/byte.h>
  45. #include <libgearman-server/plugins/queue/postgres/queue.h>
  46. #include <libgearman-server/plugins/queue/base.h>
  47. #pragma GCC diagnostic push
  48. #if defined(HAVE_LIBPQ) and HAVE_LIBPQ
  49. # pragma GCC diagnostic ignored "-Wundef"
  50. # include <libpq-fe.h>
  51. #endif
  52. #include <cerrno>
  53. /**
  54. * @addtogroup gearman_queue_libpq_static Static libpq Queue Storage Definitions
  55. * @ingroup gearman_queue_libpq
  56. * @{
  57. */
  58. /**
  59. * Default values.
  60. */
  61. #define GEARMAND_QUEUE_LIBPQ_DEFAULT_TABLE "queue"
  62. #define GEARMAND_QUEUE_QUERY_BUFFER 256
  63. namespace gearmand { namespace plugins { namespace queue { class Postgres; }}}
  64. static gearmand_error_t _initialize(gearman_server_st& server, gearmand::plugins::queue::Postgres *queue);
  65. namespace gearmand {
  66. namespace plugins {
  67. namespace queue {
  68. class Postgres : public gearmand::plugins::Queue {
  69. public:
  70. Postgres();
  71. ~Postgres();
  72. gearmand_error_t initialize();
  73. const std::string &insert()
  74. {
  75. return _insert_query;
  76. }
  77. const std::string &select()
  78. {
  79. return _select_query;
  80. }
  81. const std::string &create()
  82. {
  83. return _create_query;
  84. }
  85. PGconn *con;
  86. std::string postgres_connect_string;
  87. std::string table;
  88. std::vector<char> query_buffer;
  89. public:
  90. std::string _insert_query;
  91. std::string _select_query;
  92. std::string _create_query;
  93. };
  94. Postgres::Postgres() :
  95. Queue("Postgres"),
  96. con(NULL),
  97. postgres_connect_string(""),
  98. table(""),
  99. query_buffer()
  100. {
  101. command_line_options().add_options()
  102. ("libpq-conninfo", boost::program_options::value(&postgres_connect_string)->default_value(""), "PostgreSQL connection information string.")
  103. ("libpq-table", boost::program_options::value(&table)->default_value(GEARMAND_QUEUE_LIBPQ_DEFAULT_TABLE), "Table to use.");
  104. }
  105. Postgres::~Postgres ()
  106. {
  107. if (con)
  108. PQfinish(con);
  109. }
  110. gearmand_error_t Postgres::initialize()
  111. {
  112. // "IF NOT EXISTS" added in Postgres 9.1
  113. _create_query+= "CREATE TABLE IF NOT EXISTS " +table +" (unique_key VARCHAR" +"(" + TOSTRING(GEARMAN_UNIQUE_SIZE) +"), ";
  114. _create_query+= "function_name VARCHAR(255), priority INTEGER, data BYTEA, when_to_run INTEGER, UNIQUE (unique_key, function_name))";
  115. gearmand_error_t ret= _initialize(Gearmand()->server, this);
  116. _insert_query+= "INSERT INTO " +table +" (priority, unique_key, function_name, data, when_to_run) VALUES($1,$2,$3,$4::BYTEA,$5)";
  117. _select_query+= "SELECT unique_key,function_name,priority,data,when_to_run FROM " +table;
  118. return ret;
  119. }
  120. void initialize_postgres()
  121. {
  122. static Postgres local_instance;
  123. }
  124. } // namespace queue
  125. } // namespace plugins
  126. } // namespace gearmand
  127. /**
  128. * PostgreSQL notification callback.
  129. */
  130. static void _libpq_notice_processor(void *arg, const char *message);
  131. /* Queue callback functions. */
  132. static gearmand_error_t _libpq_add(gearman_server_st *server, void *context,
  133. const char *unique, size_t unique_size,
  134. const char *function_name,
  135. size_t function_name_size,
  136. const void *data, size_t data_size,
  137. gearman_job_priority_t priority,
  138. int64_t when);
  139. static gearmand_error_t _libpq_flush(gearman_server_st *server, void *context);
  140. static gearmand_error_t _libpq_done(gearman_server_st *server, void *context,
  141. const char *unique,
  142. size_t unique_size,
  143. const char *function_name,
  144. size_t function_name_size);
  145. static gearmand_error_t _libpq_replay(gearman_server_st *server, void *context,
  146. gearman_queue_add_fn *add_fn,
  147. void *add_context);
  148. /** @} */
  149. /*
  150. * Public definitions
  151. */
  152. #pragma GCC diagnostic push
  153. #pragma GCC diagnostic ignored "-Wold-style-cast"
  154. gearmand_error_t _initialize(gearman_server_st& server,
  155. gearmand::plugins::queue::Postgres *queue)
  156. {
  157. gearmand_info("Initializing libpq module");
  158. gearman_server_set_queue(server, queue, _libpq_add, _libpq_flush, _libpq_done, _libpq_replay);
  159. queue->con= PQconnectdb(queue->postgres_connect_string.c_str());
  160. if (queue->con == NULL || PQstatus(queue->con) != CONNECTION_OK)
  161. {
  162. gearman_server_set_queue(server, NULL, NULL, NULL, NULL, NULL);
  163. return gearmand_log_gerror(GEARMAN_DEFAULT_LOG_PARAM, GEARMAND_QUEUE_ERROR, "PQconnectdb: %s", PQerrorMessage(queue->con));
  164. }
  165. (void)PQsetNoticeProcessor(queue->con, _libpq_notice_processor, &server);
  166. std::string query("SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME ='" +queue->table + "'");
  167. PGresult* result= PQexec(queue->con, query.c_str());
  168. if (result == NULL || PQresultStatus(result) != PGRES_TUPLES_OK)
  169. {
  170. std::string error_string= "PQexec:";
  171. error_string+= PQerrorMessage(queue->con);
  172. gearmand_gerror(error_string.c_str(), GEARMAND_QUEUE_ERROR);
  173. PQclear(result);
  174. return GEARMAND_QUEUE_ERROR;
  175. }
  176. if (PQntuples(result) == 0)
  177. {
  178. PQclear(result);
  179. gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM, "libpq module creating table '%s'", queue->table.c_str());
  180. result= PQexec(queue->con, queue->create().c_str());
  181. if (result == NULL || PQresultStatus(result) != PGRES_COMMAND_OK)
  182. {
  183. gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM,
  184. "PQexec:%s", PQerrorMessage(queue->con));
  185. PQclear(result);
  186. gearman_server_set_queue(server, NULL, NULL, NULL, NULL, NULL);
  187. return GEARMAND_QUEUE_ERROR;
  188. }
  189. }
  190. PQclear(result);
  191. return GEARMAND_SUCCESS;
  192. }
  193. /*
  194. * Static definitions
  195. */
  196. static void _libpq_notice_processor(void *arg, const char *message)
  197. {
  198. (void)arg;
  199. gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM, "PostgreSQL %s", message);
  200. }
  201. static gearmand_error_t _libpq_add(gearman_server_st*, void *context,
  202. const char *unique, size_t unique_size,
  203. const char *function_name,
  204. size_t function_name_size,
  205. const void *data, size_t data_size,
  206. gearman_job_priority_t priority,
  207. int64_t when)
  208. {
  209. gearmand::plugins::queue::Postgres *queue= (gearmand::plugins::queue::Postgres *)context;
  210. char priority_buffer[GEARMAN_MAXIMUM_INTEGER_DISPLAY_LENGTH +1];
  211. int priority_buffer_length= snprintf(priority_buffer, sizeof(priority_buffer), "%u", static_cast<uint32_t>(priority));
  212. char when_buffer[GEARMAN_MAXIMUM_INTEGER_DISPLAY_LENGTH +1];
  213. int when_buffer_length= snprintf(when_buffer, sizeof(when_buffer), "%" PRId64, when);
  214. const char *param_values[]= {
  215. (char *)priority_buffer,
  216. (char *)unique,
  217. (char *)function_name,
  218. (char *)data,
  219. (char *)when_buffer };
  220. int param_lengths[]= {
  221. (int)priority_buffer_length,
  222. (int)unique_size,
  223. (int)function_name_size,
  224. (int)data_size,
  225. (int)when_buffer_length };
  226. int param_formats[] = { 0, 0, 0, 1, 0 };
  227. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "libpq add: %.*s", (uint32_t)unique_size, (char *)unique);
  228. PGresult *result= PQexecParams(queue->con, queue->insert().c_str(),
  229. gearmand_array_size(param_lengths),
  230. NULL, param_values, param_lengths, param_formats, 0);
  231. if (result == NULL || PQresultStatus(result) != PGRES_COMMAND_OK)
  232. {
  233. gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "PQexec:%s", PQerrorMessage(queue->con));
  234. PQclear(result);
  235. return GEARMAND_QUEUE_ERROR;
  236. }
  237. PQclear(result);
  238. return GEARMAND_SUCCESS;
  239. }
  240. static gearmand_error_t _libpq_flush(gearman_server_st *, void *)
  241. {
  242. gearmand_debug("libpq flush");
  243. return GEARMAND_SUCCESS;
  244. }
  245. static gearmand_error_t _libpq_done(gearman_server_st*, void *context,
  246. const char *unique,
  247. size_t unique_size,
  248. const char *function_name,
  249. size_t function_name_size)
  250. {
  251. (void)function_name_size;
  252. gearmand::plugins::queue::Postgres *queue= (gearmand::plugins::queue::Postgres *)context;
  253. PGresult *result;
  254. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "libpq done: %.*s", (uint32_t)unique_size, (char *)unique);
  255. std::string query;
  256. query.reserve(function_name_size +unique_size + 80);
  257. query+= "DELETE FROM ";
  258. query+= queue->table;
  259. query+= " WHERE unique_key='";
  260. query+= (const char *)unique;
  261. query+= "' AND function_name='";
  262. query+= (const char *)function_name;
  263. query+= "'";
  264. result= PQexec(queue->con, query.c_str());
  265. if (result == NULL || PQresultStatus(result) != PGRES_COMMAND_OK)
  266. {
  267. gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "PQexec:%s", PQerrorMessage(queue->con));
  268. PQclear(result);
  269. return GEARMAND_QUEUE_ERROR;
  270. }
  271. PQclear(result);
  272. return GEARMAND_SUCCESS;
  273. }
  274. static gearmand_error_t _libpq_replay(gearman_server_st *server, void *context,
  275. gearman_queue_add_fn *add_fn,
  276. void *add_context)
  277. {
  278. gearmand::plugins::queue::Postgres *queue= (gearmand::plugins::queue::Postgres *)context;
  279. gearmand_info("libpq replay start");
  280. std::string query("SELECT unique_key,function_name,priority,data,when_to_run FROM " + queue->table);
  281. PGresult *result= PQexecParams(queue->con, query.c_str(), 0, NULL, NULL, NULL, NULL, 1);
  282. if (result == NULL || PQresultStatus(result) != PGRES_TUPLES_OK)
  283. {
  284. gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "PQexecParams:%s", PQerrorMessage(queue->con));
  285. PQclear(result);
  286. return GEARMAND_QUEUE_ERROR;
  287. }
  288. for (int row= 0; row < PQntuples(result); row++)
  289. {
  290. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,
  291. "libpq replay: %.*s",
  292. PQgetlength(result, row, 0),
  293. PQgetvalue(result, row, 0));
  294. size_t data_length;
  295. char *data;
  296. if (PQgetlength(result, row, 3) == 0)
  297. {
  298. data= NULL;
  299. data_length= 0;
  300. }
  301. else
  302. {
  303. data_length= size_t(PQgetlength(result, row, 3));
  304. data= (char *)malloc(data_length);
  305. if (data == NULL)
  306. {
  307. PQclear(result);
  308. return gearmand_perror(errno, "malloc");
  309. }
  310. memcpy(data, PQgetvalue(result, row, 3), data_length);
  311. }
  312. gearmand_error_t ret;
  313. ret= (*add_fn)(server, add_context, PQgetvalue(result, row, 0),
  314. (size_t)PQgetlength(result, row, 0),
  315. PQgetvalue(result, row, 1),
  316. (size_t)PQgetlength(result, row, 1),
  317. data, data_length,
  318. (gearman_job_priority_t)atoi(PQgetvalue(result, row, 2)),
  319. atoll(PQgetvalue(result, row, 4)));
  320. if (gearmand_failed(ret))
  321. {
  322. PQclear(result);
  323. return ret;
  324. }
  325. }
  326. PQclear(result);
  327. return GEARMAND_SUCCESS;
  328. }
  329. #pragma GCC diagnostic pop
  330. #pragma GCC diagnostic pop