queue.cc 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405
  1. /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
  2. *
  3. * Gearmand client and server library.
  4. *
  5. * Copyright (C) 2011 Data Differential, http://datadifferential.com/
  6. * Copyright (C) 2008 Brian Aker, Eric Day
  7. * All rights reserved.
  8. *
  9. * Redistribution and use in source and binary forms, with or without
  10. * modification, are permitted provided that the following conditions are
  11. * met:
  12. *
  13. * * Redistributions of source code must retain the above copyright
  14. * notice, this list of conditions and the following disclaimer.
  15. *
  16. * * Redistributions in binary form must reproduce the above
  17. * copyright notice, this list of conditions and the following disclaimer
  18. * in the documentation and/or other materials provided with the
  19. * distribution.
  20. *
  21. * * The names of its contributors may not be used to endorse or
  22. * promote products derived from this software without specific prior
  23. * written permission.
  24. *
  25. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  26. * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  27. * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  28. * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  29. * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  30. * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  31. * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  32. * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  33. * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  34. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  35. * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  36. *
  37. */
  38. /**
  39. * @file
  40. * @brief libpq Queue Storage Definitions
  41. */
  42. #include <gear_config.h>
  43. #include <libgearman-server/common.h>
  44. #include <libgearman-server/byte.h>
  45. #include <libgearman-server/plugins/queue/postgres/queue.h>
  46. #include <libgearman-server/plugins/queue/base.h>
  47. #pragma GCC diagnostic push
  48. #if defined(HAVE_LIBPQ) and HAVE_LIBPQ
  49. # pragma GCC diagnostic ignored "-Wundef"
  50. # include <libpq-fe.h>
  51. #endif
  52. #include <cerrno>
  53. /**
  54. * @addtogroup gearman_queue_libpq_static Static libpq Queue Storage Definitions
  55. * @ingroup gearman_queue_libpq
  56. * @{
  57. */
  58. /**
  59. * Default values.
  60. */
  61. #define GEARMAND_QUEUE_LIBPQ_DEFAULT_TABLE "queue"
  62. #define GEARMAND_QUEUE_QUERY_BUFFER 256
  63. namespace gearmand { namespace plugins { namespace queue { class Postgres; }}}
  64. static gearmand_error_t _initialize(gearman_server_st& server, gearmand::plugins::queue::Postgres *queue);
  65. namespace gearmand {
  66. namespace plugins {
  67. namespace queue {
  68. class Postgres : public gearmand::plugins::Queue {
  69. public:
  70. Postgres();
  71. ~Postgres();
  72. gearmand_error_t initialize();
  73. const std::string &insert()
  74. {
  75. return _insert_query;
  76. }
  77. const std::string &select()
  78. {
  79. return _select_query;
  80. }
  81. const std::string &create()
  82. {
  83. return _create_query;
  84. }
  85. PGconn *con;
  86. std::string postgres_connect_string;
  87. std::string table;
  88. std::vector<char> query_buffer;
  89. public:
  90. std::string _insert_query;
  91. std::string _select_query;
  92. std::string _create_query;
  93. };
  94. Postgres::Postgres() :
  95. Queue("Postgres"),
  96. con(NULL),
  97. postgres_connect_string(""),
  98. table(""),
  99. query_buffer()
  100. {
  101. command_line_options().add_options()
  102. ("libpq-conninfo", boost::program_options::value(&postgres_connect_string)->default_value(""), "PostgreSQL connection information string.")
  103. ("libpq-table", boost::program_options::value(&table)->default_value(GEARMAND_QUEUE_LIBPQ_DEFAULT_TABLE), "Table to use.");
  104. }
  105. Postgres::~Postgres ()
  106. {
  107. if (con)
  108. PQfinish(con);
  109. }
  110. gearmand_error_t Postgres::initialize()
  111. {
  112. _create_query+= "CREATE TABLE " +table +" (unique_key VARCHAR" +"(" + TOSTRING(GEARMAN_UNIQUE_SIZE) +"), ";
  113. _create_query+= "function_name VARCHAR(255), priority INTEGER, data BYTEA, when_to_run INTEGER, UNIQUE (unique_key, function_name))";
  114. gearmand_error_t ret= _initialize(Gearmand()->server, this);
  115. _insert_query+= "INSERT INTO " +table +" (priority, unique_key, function_name, data, when_to_run) VALUES($1,$2,$3,$4::BYTEA,$5)";
  116. _select_query+= "SELECT unique_key,function_name,priority,data,when_to_run FROM " +table;
  117. return ret;
  118. }
  119. void initialize_postgres()
  120. {
  121. static Postgres local_instance;
  122. }
  123. } // namespace queue
  124. } // namespace plugins
  125. } // namespace gearmand
  126. /**
  127. * PostgreSQL notification callback.
  128. */
  129. static void _libpq_notice_processor(void *arg, const char *message);
  130. /* Queue callback functions. */
  131. static gearmand_error_t _libpq_add(gearman_server_st *server, void *context,
  132. const char *unique, size_t unique_size,
  133. const char *function_name,
  134. size_t function_name_size,
  135. const void *data, size_t data_size,
  136. gearman_job_priority_t priority,
  137. int64_t when);
  138. static gearmand_error_t _libpq_flush(gearman_server_st *server, void *context);
  139. static gearmand_error_t _libpq_done(gearman_server_st *server, void *context,
  140. const char *unique,
  141. size_t unique_size,
  142. const char *function_name,
  143. size_t function_name_size);
  144. static gearmand_error_t _libpq_replay(gearman_server_st *server, void *context,
  145. gearman_queue_add_fn *add_fn,
  146. void *add_context);
  147. /** @} */
  148. /*
  149. * Public definitions
  150. */
  151. #pragma GCC diagnostic push
  152. #pragma GCC diagnostic ignored "-Wold-style-cast"
  153. gearmand_error_t _initialize(gearman_server_st& server,
  154. gearmand::plugins::queue::Postgres *queue)
  155. {
  156. gearmand_info("Initializing libpq module");
  157. gearman_server_set_queue(server, queue, _libpq_add, _libpq_flush, _libpq_done, _libpq_replay);
  158. queue->con= PQconnectdb(queue->postgres_connect_string.c_str());
  159. if (queue->con == NULL || PQstatus(queue->con) != CONNECTION_OK)
  160. {
  161. gearman_server_set_queue(server, NULL, NULL, NULL, NULL, NULL);
  162. return gearmand_log_gerror(GEARMAN_DEFAULT_LOG_PARAM, GEARMAND_QUEUE_ERROR, "PQconnectdb: %s", PQerrorMessage(queue->con));
  163. }
  164. (void)PQsetNoticeProcessor(queue->con, _libpq_notice_processor, &server);
  165. std::string query("SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME ='" +queue->table + "'");
  166. PGresult* result= PQexec(queue->con, query.c_str());
  167. if (result == NULL || PQresultStatus(result) != PGRES_TUPLES_OK)
  168. {
  169. std::string error_string= "PQexec:";
  170. error_string+= PQerrorMessage(queue->con);
  171. gearmand_gerror(error_string.c_str(), GEARMAND_QUEUE_ERROR);
  172. PQclear(result);
  173. return GEARMAND_QUEUE_ERROR;
  174. }
  175. if (PQntuples(result) == 0)
  176. {
  177. PQclear(result);
  178. gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM, "libpq module creating table '%s'", queue->table.c_str());
  179. result= PQexec(queue->con, queue->create().c_str());
  180. if (result == NULL || PQresultStatus(result) != PGRES_COMMAND_OK)
  181. {
  182. gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM,
  183. "PQexec:%s", PQerrorMessage(queue->con));
  184. PQclear(result);
  185. gearman_server_set_queue(server, NULL, NULL, NULL, NULL, NULL);
  186. return GEARMAND_QUEUE_ERROR;
  187. }
  188. }
  189. PQclear(result);
  190. return GEARMAND_SUCCESS;
  191. }
  192. /*
  193. * Static definitions
  194. */
  195. static void _libpq_notice_processor(void *arg, const char *message)
  196. {
  197. (void)arg;
  198. gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM, "PostgreSQL %s", message);
  199. }
  200. static gearmand_error_t _libpq_add(gearman_server_st*, void *context,
  201. const char *unique, size_t unique_size,
  202. const char *function_name,
  203. size_t function_name_size,
  204. const void *data, size_t data_size,
  205. gearman_job_priority_t priority,
  206. int64_t when)
  207. {
  208. gearmand::plugins::queue::Postgres *queue= (gearmand::plugins::queue::Postgres *)context;
  209. char priority_buffer[GEARMAN_MAXIMUM_INTEGER_DISPLAY_LENGTH +1];
  210. int priority_buffer_length= snprintf(priority_buffer, sizeof(priority_buffer), "%u", static_cast<uint32_t>(priority));
  211. char when_buffer[GEARMAN_MAXIMUM_INTEGER_DISPLAY_LENGTH +1];
  212. int when_buffer_length= snprintf(when_buffer, sizeof(when_buffer), "%" PRId64, when);
  213. const char *param_values[]= {
  214. (char *)priority_buffer,
  215. (char *)unique,
  216. (char *)function_name,
  217. (char *)data,
  218. (char *)when_buffer };
  219. int param_lengths[]= {
  220. (int)priority_buffer_length,
  221. (int)unique_size,
  222. (int)function_name_size,
  223. (int)data_size,
  224. (int)when_buffer_length };
  225. int param_formats[] = { 0, 0, 0, 1, 0 };
  226. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "libpq add: %.*s", (uint32_t)unique_size, (char *)unique);
  227. PGresult *result= PQexecParams(queue->con, queue->insert().c_str(),
  228. gearmand_array_size(param_lengths),
  229. NULL, param_values, param_lengths, param_formats, 0);
  230. if (result == NULL || PQresultStatus(result) != PGRES_COMMAND_OK)
  231. {
  232. gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "PQexec:%s", PQerrorMessage(queue->con));
  233. PQclear(result);
  234. return GEARMAND_QUEUE_ERROR;
  235. }
  236. PQclear(result);
  237. return GEARMAND_SUCCESS;
  238. }
  239. static gearmand_error_t _libpq_flush(gearman_server_st *, void *)
  240. {
  241. gearmand_debug("libpq flush");
  242. return GEARMAND_SUCCESS;
  243. }
  244. static gearmand_error_t _libpq_done(gearman_server_st*, void *context,
  245. const char *unique,
  246. size_t unique_size,
  247. const char *function_name,
  248. size_t function_name_size)
  249. {
  250. (void)function_name_size;
  251. gearmand::plugins::queue::Postgres *queue= (gearmand::plugins::queue::Postgres *)context;
  252. PGresult *result;
  253. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "libpq done: %.*s", (uint32_t)unique_size, (char *)unique);
  254. std::string query;
  255. query.reserve(function_name_size +unique_size + 80);
  256. query+= "DELETE FROM ";
  257. query+= queue->table;
  258. query+= " WHERE unique_key='";
  259. query+= (const char *)unique;
  260. query+= "' AND function_name='";
  261. query+= (const char *)function_name;
  262. query+= "'";
  263. result= PQexec(queue->con, query.c_str());
  264. if (result == NULL || PQresultStatus(result) != PGRES_COMMAND_OK)
  265. {
  266. gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "PQexec:%s", PQerrorMessage(queue->con));
  267. PQclear(result);
  268. return GEARMAND_QUEUE_ERROR;
  269. }
  270. PQclear(result);
  271. return GEARMAND_SUCCESS;
  272. }
  273. static gearmand_error_t _libpq_replay(gearman_server_st *server, void *context,
  274. gearman_queue_add_fn *add_fn,
  275. void *add_context)
  276. {
  277. gearmand::plugins::queue::Postgres *queue= (gearmand::plugins::queue::Postgres *)context;
  278. gearmand_info("libpq replay start");
  279. std::string query("SELECT unique_key,function_name,priority,data,when_to_run FROM " + queue->table);
  280. PGresult *result= PQexecParams(queue->con, query.c_str(), 0, NULL, NULL, NULL, NULL, 1);
  281. if (result == NULL || PQresultStatus(result) != PGRES_TUPLES_OK)
  282. {
  283. gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "PQexecParams:%s", PQerrorMessage(queue->con));
  284. PQclear(result);
  285. return GEARMAND_QUEUE_ERROR;
  286. }
  287. for (int row= 0; row < PQntuples(result); row++)
  288. {
  289. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,
  290. "libpq replay: %.*s",
  291. PQgetlength(result, row, 0),
  292. PQgetvalue(result, row, 0));
  293. size_t data_length;
  294. char *data;
  295. if (PQgetlength(result, row, 3) == 0)
  296. {
  297. data= NULL;
  298. data_length= 0;
  299. }
  300. else
  301. {
  302. data_length= size_t(PQgetlength(result, row, 3));
  303. data= (char *)malloc(data_length);
  304. if (data == NULL)
  305. {
  306. PQclear(result);
  307. return gearmand_perror(errno, "malloc");
  308. }
  309. memcpy(data, PQgetvalue(result, row, 3), data_length);
  310. }
  311. gearmand_error_t ret;
  312. ret= (*add_fn)(server, add_context, PQgetvalue(result, row, 0),
  313. (size_t)PQgetlength(result, row, 0),
  314. PQgetvalue(result, row, 1),
  315. (size_t)PQgetlength(result, row, 1),
  316. data, data_length,
  317. (gearman_job_priority_t)atoi(PQgetvalue(result, row, 2)),
  318. atoll(PQgetvalue(result, row, 4)));
  319. if (gearmand_failed(ret))
  320. {
  321. PQclear(result);
  322. return ret;
  323. }
  324. }
  325. PQclear(result);
  326. return GEARMAND_SUCCESS;
  327. }
  328. #pragma GCC diagnostic pop
  329. #pragma GCC diagnostic pop