instance.cc 18 KB


  1. /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
  2. *
  3. * Gearmand client and server library.
  4. *
  5. * Copyright (C) 2012 Data Differential, http://datadifferential.com/
  6. * All rights reserved.
  7. *
  8. * Redistribution and use in source and binary forms, with or without
  9. * modification, are permitted provided that the following conditions are
  10. * met:
  11. *
  12. * * Redistributions of source code must retain the above copyright
  13. * notice, this list of conditions and the following disclaimer.
  14. *
  15. * * Redistributions in binary form must reproduce the above
  16. * copyright notice, this list of conditions and the following disclaimer
  17. * in the documentation and/or other materials provided with the
  18. * distribution.
  19. *
  20. * * The names of its contributors may not be used to endorse or
  21. * promote products derived from this software without specific prior
  22. * written permission.
  23. *
  24. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  25. * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  26. * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  27. * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  28. * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  29. * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  30. * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  31. * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  32. * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  33. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  34. * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  35. *
  36. */
  37. #include <gear_config.h>
  38. #include <libgearman-server/common.h>
  39. #include "libgearman-server/plugins/base.h"
  40. #include "libgearman-server/plugins/queue/sqlite/instance.hpp"
  41. #include <cerrno>
  42. namespace gearmand {
  43. namespace queue {
  44. Instance::Instance(const std::string& schema_, const std::string& table_):
  45. _epoch_support(true),
  46. _check_replay(false),
  47. _in_trans(0),
  48. _db(NULL),
  49. delete_sth(NULL),
  50. insert_sth(NULL),
  51. replay_sth(NULL),
  52. _schema(schema_),
  53. _table(table_)
  54. {
  55. _delete_query+= "DELETE FROM ";
  56. _delete_query+= _table;
  57. _delete_query+= " WHERE unique_key=? and function_name=?";
  58. if (_epoch_support)
  59. {
  60. _insert_query+= "INSERT OR REPLACE INTO ";
  61. _insert_query+= _table;
  62. _insert_query+= " (priority, unique_key, function_name, data, when_to_run) VALUES (?,?,?,?,?)";
  63. }
  64. else
  65. {
  66. _insert_query+= "INSERT OR REPLACE INTO ";
  67. _insert_query+= _table;
  68. _insert_query+= " (priority, unique_key, function_name, data) VALUES (?,?,?,?,?)";
  69. }
  70. }
  71. Instance::~Instance()
  72. {
  73. _sqlite3_finalize(delete_sth);
  74. delete_sth= NULL;
  75. _sqlite3_finalize(insert_sth);
  76. insert_sth= NULL;
  77. _sqlite3_finalize(replay_sth);
  78. replay_sth= NULL;
  79. assert(_db);
  80. if (_db)
  81. {
  82. if (sqlite3_close(_db) != SQLITE_OK)
  83. {
  84. gearmand_error(sqlite3_errmsg(_db));
  85. }
  86. _db= NULL;
  87. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "sqlite shutdown database");
  88. }
  89. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "sqlite shutdown");
  90. }
  91. void Instance::_sqlite3_finalize(sqlite3_stmt* sth)
  92. {
  93. if (sth)
  94. {
  95. if (sqlite3_finalize(sth) != SQLITE_OK )
  96. {
  97. gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "finalize error: %s", sqlite3_errmsg(_db));
  98. }
  99. }
  100. }
  101. bool Instance::_sqlite_prepare(const std::string& query, sqlite3_stmt ** sth)
  102. {
  103. reset_error();
  104. if (query.size() > UINT32_MAX)
  105. {
  106. _error_string= "query size too big";
  107. return false;
  108. }
  109. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "sqlite query: %s", query.c_str());
  110. if (sqlite3_prepare_v2(_db, query.c_str(), -1, sth, NULL) != SQLITE_OK)
  111. {
  112. _error_string= sqlite3_errmsg(_db);
  113. return false;
  114. }
  115. return true;
  116. }
  117. bool Instance::_sqlite_lock()
  118. {
  119. /* already in transaction? */
  120. if (_in_trans == 0)
  121. {
  122. if (_sqlite_dispatch("BEGIN TRANSACTION") == false)
  123. {
  124. return false;
  125. }
  126. }
  127. _in_trans++;
  128. return true;
  129. }
  /**
   * sqlite3_exec() row callback: adds one to the int counter passed as user
   * data each time it is invoked.
   *
   * @param rows_ pointer to the int counter.
   * @param argc  number of result columns; expected to be 1.
   * @return 0 always.
   */
  static int sql_count(void * rows_, int argc, char **, char **)
  {
    assert(argc == 1);
    (void)argc;

    int* const counter= static_cast<int*>(rows_);
    assert(counter);
    ++(*counter);

    return 0;
  }
  139. bool Instance::_sqlite_dispatch(const char* arg)
  140. {
  141. int count;
  142. return _sqlite_count(arg, count);
  143. }
  144. bool Instance::_sqlite_count(const char* arg, int& count)
  145. {
  146. reset_error();
  147. count= 0;
  148. char* error= NULL;
  149. int errcode= sqlite3_exec(_db, arg, sql_count, &count, &error);
  150. if (error != NULL or errcode != SQLITE_OK)
  151. {
  152. assert(errcode != SQLITE_OK);
  153. _error_string= error;
  154. sqlite3_free(error);
  155. return false;
  156. }
  157. return true;
  158. }
  159. bool Instance::_sqlite_count(const std::string& arg, int& count)
  160. {
  161. return _sqlite_count(arg.c_str(), count);
  162. }
  163. bool Instance::_sqlite_dispatch(const std::string& arg)
  164. {
  165. int count;
  166. return _sqlite_count(arg.c_str(), count);
  167. }
  168. bool Instance::_sqlite_commit()
  169. {
  170. /* not in transaction? */
  171. if (_in_trans)
  172. {
  173. if (_sqlite_dispatch("COMMIT") == false)
  174. {
  175. return false;
  176. }
  177. }
  178. _in_trans= 0;
  179. return true;
  180. }
  181. gearmand_error_t Instance::init()
  182. {
  183. gearmand_info("Initializing libsqlite3 module");
  184. if (_schema.empty())
  185. {
  186. return gearmand_gerror("missing required --libsqlite3-db=<dbfile> argument", GEARMAND_QUEUE_ERROR);
  187. }
  188. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "sqlite open: %s", _schema.c_str());
  189. assert(_db == NULL);
  190. if (sqlite3_open_v2(_schema.c_str(), &_db, SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE, NULL) != SQLITE_OK)
  191. {
  192. return gearmand_log_gerror(GEARMAN_DEFAULT_LOG_PARAM, GEARMAND_QUEUE_ERROR, "sqlite3_open failed with: %s", sqlite3_errmsg(_db));
  193. }
  194. if (_db == NULL)
  195. {
  196. return gearmand_log_gerror(GEARMAN_DEFAULT_LOG_PARAM, GEARMAND_QUEUE_ERROR, "Unknown error while opening up sqlite file");
  197. }
  198. // The only reason why we do this is because during testing we might read the
  199. // database which can cause a lock conflict.
  200. sqlite3_busy_timeout(_db, 6000);
  201. int rows;
  202. std::string check_table_str("SELECT 1 FROM sqlite_master WHERE type='table' AND name='");
  203. check_table_str+= _table;
  204. check_table_str+= "'";
  205. if (_sqlite_count(check_table_str, rows) == false)
  206. {
  207. return gearmand_gerror(_error_string.c_str(), GEARMAND_QUEUE_ERROR);
  208. }
  209. if (rows)
  210. {
  211. std::string query("SELECT when_to_run FROM ");
  212. query+= _table;
  213. sqlite3_stmt* select_sth= NULL;
  214. if (_sqlite_prepare(query, &select_sth) == false)
  215. {
  216. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,
  217. "Error from '%s': %s",
  218. query.c_str(),
  219. _error_string.c_str());
  220. query.clear();
  221. query+= "ALTER TABLE ";
  222. query+= _table;
  223. query+= " ADD COLUMN when_to_run INTEGER";
  224. if (_sqlite_dispatch(query) == false)
  225. {
  226. gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM,
  227. "Error from '%s': %s",
  228. query.c_str(),
  229. _error_string.c_str());
  230. gearmand_info("No epoch support in sqlite queue");
  231. _epoch_support= false;
  232. }
  233. }
  234. _sqlite3_finalize(select_sth);
  235. }
  236. else
  237. {
  238. std::string query("CREATE TABLE ");
  239. query+= _table;
  240. query+= " ( unique_key TEXT, function_name TEXT, priority INTEGER, data BLOB, when_to_run INTEGER, PRIMARY KEY (unique_key, function_name))";
  241. gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM, "sqlite module creating table '%s'", _table.c_str());
  242. if (_sqlite_dispatch(query) == false)
  243. {
  244. return gearmand_gerror(_error_string.c_str(), GEARMAND_QUEUE_ERROR);
  245. }
  246. }
  247. if (_sqlite_prepare(_delete_query, &delete_sth) == false)
  248. {
  249. return gearmand_log_gerror(GEARMAN_DEFAULT_LOG_PARAM, GEARMAND_QUEUE_ERROR,
  250. "DELETE PREPARE error: %s",
  251. _error_string.c_str());
  252. }
  253. if (_sqlite_prepare(_insert_query, &insert_sth) == false)
  254. {
  255. return gearmand_log_gerror(GEARMAN_DEFAULT_LOG_PARAM, GEARMAND_QUEUE_ERROR,
  256. "INSERT PREPARE: %s", _error_string.c_str());
  257. }
  258. {
  259. std::string query;
  260. if (_epoch_support)
  261. {
  262. query+= "SELECT unique_key,function_name,priority,data,when_to_run FROM ";
  263. }
  264. else
  265. {
  266. query+= "SELECT unique_key,function_name,priority,data FROM ";
  267. }
  268. query+= _table;
  269. if (_sqlite_prepare(query, &replay_sth) == false)
  270. {
  271. return gearmand_log_gerror(GEARMAN_DEFAULT_LOG_PARAM, GEARMAND_QUEUE_ERROR,
  272. "REPLAY PREPARE: %s", _error_string.c_str());
  273. }
  274. }
  275. return GEARMAND_SUCCESS;
  276. }
  277. bool Instance::_sqlite_rollback()
  278. {
  279. /* not in transaction? */
  280. if (_in_trans)
  281. {
  282. if (_sqlite_dispatch("ROLLBACK") == false)
  283. {
  284. return false;
  285. }
  286. }
  287. _in_trans= 0;
  288. return true;
  289. }
  290. gearmand_error_t Instance::add(gearman_server_st*,
  291. const char *unique, size_t unique_size,
  292. const char *function_name,
  293. size_t function_name_size,
  294. const void *data, size_t data_size,
  295. gearman_job_priority_t priority,
  296. int64_t when)
  297. {
  298. assert(_check_replay == false);
  299. if (when and _epoch_support == false)
  300. {
  301. return gearmand_gerror("Table lacks when_to_run field", GEARMAND_QUEUE_ERROR);
  302. }
  303. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,
  304. "sqlite add: priority: %d, unique_key: %.*s, function_name: %.*s when: %ld size: %u",
  305. int(priority),
  306. int(unique_size), (char*)unique,
  307. int(function_name_size), (char*)function_name,
  308. (long int)when,
  309. uint32_t(data_size));
  310. if (_sqlite_lock() == false)
  311. {
  312. return gearmand_gerror(_error_string.c_str(), GEARMAND_QUEUE_ERROR);
  313. }
  314. if (sqlite3_reset(insert_sth) != SQLITE_OK)
  315. {
  316. return gearmand_log_gerror(GEARMAN_DEFAULT_LOG_PARAM, GEARMAND_QUEUE_ERROR,
  317. "failed to reset INSERT prep statement: %s", sqlite3_errmsg(_db));
  318. }
  319. if (sqlite3_bind_int(insert_sth, 1, priority) != SQLITE_OK)
  320. {
  321. return gearmand_log_gerror(GEARMAN_DEFAULT_LOG_PARAM, GEARMAND_QUEUE_ERROR,
  322. "failed to bind priority [%d]: %s", priority, sqlite3_errmsg(_db));
  323. }
  324. if (sqlite3_bind_text(insert_sth, 2, (const char *)unique, (int)unique_size, SQLITE_TRANSIENT) != SQLITE_OK)
  325. {
  326. return gearmand_log_gerror(GEARMAN_DEFAULT_LOG_PARAM, GEARMAND_QUEUE_ERROR,
  327. "failed to bind unique [%.*s]: %s", (uint32_t)unique_size, (char*)unique, sqlite3_errmsg(_db));
  328. }
  329. if (sqlite3_bind_text(insert_sth, 3, (const char *)function_name, (int)function_name_size, SQLITE_TRANSIENT) != SQLITE_OK)
  330. {
  331. return gearmand_log_gerror(GEARMAN_DEFAULT_LOG_PARAM, GEARMAND_QUEUE_ERROR,
  332. "failed to bind function [%.*s]: %s", (uint32_t)function_name_size, (char*)function_name, sqlite3_errmsg(_db));
  333. }
  334. if (sqlite3_bind_blob(insert_sth, 4, data, (int)data_size, SQLITE_TRANSIENT) != SQLITE_OK)
  335. {
  336. return gearmand_log_gerror(GEARMAN_DEFAULT_LOG_PARAM, GEARMAND_QUEUE_ERROR,
  337. "failed to bind data blob: %s", sqlite3_errmsg(_db));
  338. }
  339. // epoch data
  340. if (sqlite3_bind_int64(insert_sth, 5, when) != SQLITE_OK)
  341. {
  342. return gearmand_log_gerror(GEARMAN_DEFAULT_LOG_PARAM, GEARMAND_QUEUE_ERROR,
  343. "failed to bind epoch int64_t(%ld): %s", (long int)when, sqlite3_errmsg(_db));
  344. }
  345. // INSERT happens here
  346. if (sqlite3_step(insert_sth) != SQLITE_DONE)
  347. {
  348. return gearmand_log_gerror(GEARMAN_DEFAULT_LOG_PARAM, GEARMAND_QUEUE_ERROR,
  349. "INSERT error: %s", sqlite3_errmsg(_db));
  350. }
  351. return GEARMAND_SUCCESS;
  352. }
  353. gearmand_error_t Instance::flush(gearman_server_st*)
  354. {
  355. gearmand_debug("sqlite flush");
  356. if (_sqlite_commit() == false)
  357. {
  358. return gearmand_log_gerror(GEARMAN_DEFAULT_LOG_PARAM, GEARMAND_QUEUE_ERROR,
  359. "COMMIT called on FLUSH error: %s",
  360. _error_string.c_str());
  361. }
  362. return GEARMAND_SUCCESS;
  363. }
  364. gearmand_error_t Instance::done(gearman_server_st*,
  365. const char *unique,
  366. size_t unique_size,
  367. const char *function_name,
  368. size_t function_name_size)
  369. {
  370. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,
  371. "sqlite done: unique_key: %.*s, function_name: %.*s",
  372. int(unique_size), (char*)unique,
  373. int(function_name_size), (char*)function_name);
  374. if (_sqlite_lock() == false)
  375. {
  376. return gearmand_gerror(_error_string.c_str(), GEARMAND_QUEUE_ERROR);
  377. }
  378. if (sqlite3_reset(delete_sth) != SQLITE_OK)
  379. {
  380. return gearmand_log_gerror(GEARMAN_DEFAULT_LOG_PARAM, GEARMAND_QUEUE_ERROR,
  381. "failed to reset DELETE prep statement: %s", sqlite3_errmsg(_db));
  382. }
  383. if (sqlite3_bind_text(delete_sth, 1, (const char *)unique, int(unique_size), SQLITE_TRANSIENT) != SQLITE_OK)
  384. {
  385. return gearmand_log_gerror(GEARMAN_DEFAULT_LOG_PARAM, GEARMAND_QUEUE_ERROR,
  386. "failed to bind unique [%.*s]: %s", uint32_t(unique_size), (char*)unique, sqlite3_errmsg(_db));
  387. }
  388. if (sqlite3_bind_text(delete_sth, 2, (const char *)function_name, int(function_name_size), SQLITE_TRANSIENT) != SQLITE_OK)
  389. {
  390. return gearmand_log_gerror(GEARMAN_DEFAULT_LOG_PARAM, GEARMAND_QUEUE_ERROR,
  391. "failed to bind function [%.*s]: %s", uint32_t(function_name_size), (char*)function_name, sqlite3_errmsg(_db));
  392. }
  393. // DELETE happens here
  394. if (sqlite3_step(delete_sth) != SQLITE_DONE)
  395. {
  396. return gearmand_log_gerror(GEARMAN_DEFAULT_LOG_PARAM, GEARMAND_QUEUE_ERROR,
  397. "DELETE error: %s",
  398. sqlite3_errmsg(_db));
  399. }
  400. if (_sqlite_commit() == false)
  401. {
  402. return gearmand_log_gerror(GEARMAN_DEFAULT_LOG_PARAM, GEARMAND_QUEUE_ERROR, "DELETE error: %s", _error_string.c_str());
  403. }
  404. return GEARMAND_SUCCESS;
  405. }
  406. gearmand_error_t Instance::replay(gearman_server_st *server)
  407. {
  408. gearmand_error_t ret;
  409. _check_replay= true;
  410. if (gearmand_failed(ret= replay_loop(server)))
  411. {
  412. if (_sqlite_rollback() == false)
  413. {
  414. gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "failed to rollback sqlite from failed replay error: %s", _error_string.c_str());
  415. }
  416. }
  417. _check_replay= false;
  418. return ret;
  419. }
  420. gearmand_error_t Instance::replay_loop(gearman_server_st *server)
  421. {
  422. gearmand_info("sqlite replay start");
  423. gearmand_error_t gret= GEARMAND_UNKNOWN_STATE;
  424. size_t row_count= 0;
  425. while (sqlite3_step(replay_sth) == SQLITE_ROW)
  426. {
  427. const char *unique, *function_name;
  428. size_t unique_size, function_name_size;
  429. row_count++;
  430. if (sqlite3_column_type(replay_sth, 0) == SQLITE_TEXT)
  431. {
  432. unique= (char *)sqlite3_column_text(replay_sth, 0);
  433. unique_size= size_t(sqlite3_column_bytes(replay_sth, 0));
  434. }
  435. else
  436. {
  437. return gearmand_log_gerror(GEARMAN_DEFAULT_LOG_PARAM, GEARMAND_QUEUE_ERROR, "column %d is not type TEXT: %d", 0, int(sqlite3_column_type(replay_sth, 0)));
  438. }
  439. if (sqlite3_column_type(replay_sth, 1) == SQLITE_TEXT)
  440. {
  441. function_name= (char *)sqlite3_column_text(replay_sth, 1);
  442. function_name_size= size_t(sqlite3_column_bytes(replay_sth, 1));
  443. }
  444. else
  445. {
  446. return gearmand_log_gerror(GEARMAN_DEFAULT_LOG_PARAM, GEARMAND_QUEUE_ERROR,
  447. "column %d is not type TEXT", 1);
  448. }
  449. gearman_job_priority_t priority;
  450. if (sqlite3_column_type(replay_sth, 2) == SQLITE_INTEGER)
  451. {
  452. priority= (gearman_job_priority_t)sqlite3_column_int64(replay_sth, 2);
  453. }
  454. else
  455. {
  456. return gearmand_log_gerror(GEARMAN_DEFAULT_LOG_PARAM, GEARMAND_QUEUE_ERROR,
  457. "column %d is not type INTEGER", 2);
  458. }
  459. if (sqlite3_column_type(replay_sth, 3) != SQLITE_BLOB)
  460. {
  461. return gearmand_log_gerror(GEARMAN_DEFAULT_LOG_PARAM, GEARMAND_QUEUE_ERROR, "column %d is not type TEXT", 3);
  462. }
  463. size_t data_size= (size_t)sqlite3_column_bytes(replay_sth, 3);
  464. char* data= (char*)malloc(data_size);
  465. /* need to make a copy here ... gearman_server_job_free will free it later */
  466. if (data == NULL)
  467. {
  468. return gearmand_perror(errno, "malloc");
  469. }
  470. memcpy(data, sqlite3_column_blob(replay_sth, 3), data_size);
  471. int64_t when;
  472. if (_epoch_support)
  473. {
  474. if (sqlite3_column_type(replay_sth, 4) == SQLITE_INTEGER)
  475. {
  476. when= int64_t(sqlite3_column_int64(replay_sth, 4));
  477. }
  478. else
  479. {
  480. return gearmand_log_gerror(GEARMAN_DEFAULT_LOG_PARAM, GEARMAND_QUEUE_ERROR, "column %d is not type INTEGER", 3);
  481. }
  482. }
  483. else
  484. {
  485. when= 0;
  486. }
  487. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,
  488. "sqlite replay: unique_key: %.*s, function_name: %.*s",
  489. int(unique_size), (char*)unique,
  490. int(function_name_size), (char*)function_name);
  491. gret= Instance::replay_add(server,
  492. NULL,
  493. unique, unique_size,
  494. function_name, function_name_size,
  495. data, data_size,
  496. priority, when);
  497. if (gearmand_failed(gret))
  498. {
  499. break;
  500. }
  501. }
  502. if (sqlite3_reset(replay_sth) != SQLITE_OK)
  503. {
  504. return gearmand_log_gerror(GEARMAN_DEFAULT_LOG_PARAM, GEARMAND_QUEUE_ERROR,
  505. "failed to reset REPLAY prep statement: %s", sqlite3_errmsg(_db));
  506. }
  507. if (row_count == 0)
  508. {
  509. return GEARMAND_SUCCESS;
  510. }
  511. return gret;
  512. }
  513. } // namespace queue
  514. } // namespace gearmand