db_api.c 75 KB


  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. /** @file db_api.c
  3. * @brief This is the file implementing the API to the
  4. * logs management database.
  5. */
  6. #include "daemon/common.h"
  7. #include "db_api.h"
  8. #include <inttypes.h>
  9. #include <stdio.h>
  10. #include "circular_buffer.h"
  11. #include "helper.h"
  12. #include "lz4.h"
  13. #include "parser.h"
  14. #define MAIN_DB "main.db" /**< Primary DB with metadata for all the logs managemt collections **/
  15. #define MAIN_COLLECTIONS_TABLE "LogCollections" /*< Table name where logs collections metadata is stored in MAIN_DB **/
  16. #define BLOB_STORE_FILENAME "logs.bin." /*< Filename of BLOBs where logs are stored in **/
  17. #define METADATA_DB_FILENAME "metadata.db" /**< Metadata DB for each log collection **/
  18. #define LOGS_TABLE "Logs" /*< Table name where logs metadata is stored in METADATA_DB_FILENAME **/
  19. #define BLOBS_TABLE "Blobs" /*< Table name where BLOBs metadata is stored in METADATA_DB_FILENAME **/
  20. #define LOGS_MANAG_DB_VERSION 1
  21. static sqlite3 *main_db = NULL; /**< SQLite DB handler for MAIN_DB **/
  22. static char *main_db_dir = NULL; /**< Directory where all the log management databases and log blobs are stored in **/
  23. static char *main_db_path = NULL; /**< Path of MAIN_DB **/
  24. /* -------------------------------------------------------------------------- */
  25. /* Database migrations */
  26. /* -------------------------------------------------------------------------- */
  27. /**
  28. * @brief No-op database migration, just to bump up starting version.
  29. * @param database Unused
  30. * @param name Unused
  31. * @return Always 0.
  32. */
  33. static int do_migration_noop(sqlite3 *database, const char *name){
  34. UNUSED(database);
  35. UNUSED(name);
  36. collector_info("Running database migration %s", name);
  37. return 0;
  38. }
  39. typedef struct database_func_migration_list{
  40. char *name;
  41. int (*func)(sqlite3 *database, const char *name);
  42. } DATABASE_FUNC_MIGRATION_LIST;
  43. DATABASE_FUNC_MIGRATION_LIST migration_list_main_db[] = {
  44. {.name = MAIN_DB" v0 to v1", .func = do_migration_noop},
  45. // the terminator of this array
  46. {.name = NULL, .func = NULL}
  47. };
  48. DATABASE_FUNC_MIGRATION_LIST migration_list_metadata_db[] = {
  49. {.name = METADATA_DB_FILENAME " v0 to v1", .func = do_migration_noop},
  50. // the terminator of this array
  51. {.name = NULL, .func = NULL}
  52. };
  53. typedef enum {
  54. ERR_TYPE_OTHER,
  55. ERR_TYPE_SQLITE,
  56. ERR_TYPE_LIBUV,
  57. } logs_manag_db_error_t;
  58. /**
  59. * @brief Logs a database error
  60. * @param[in] log_source Log source that caused the error
  61. * @param[in] error_type Type of error
  62. * @param[in] rc Error code
  63. * @param[in] line Line number where the error occurred (__LINE__)
  64. * @param[in] file Source file where the error occurred (__FILE__)
  65. * @param[in] func Function where the error occurred (__FUNCTION__)
  66. */
  67. static void throw_error(const char *const log_source,
  68. const logs_manag_db_error_t error_type,
  69. const int rc, const int line,
  70. const char *const file, const char *const func){
  71. collector_error("[%s]: %s database error: (%d) %s (%s:%s:%d))",
  72. log_source ? log_source : "-",
  73. error_type == ERR_TYPE_OTHER ? "" : ERR_TYPE_SQLITE ? "SQLite" : "libuv",
  74. rc, error_type == ERR_TYPE_OTHER ? "" : ERR_TYPE_SQLITE ? sqlite3_errstr(rc) : uv_strerror(rc),
  75. file, func, line);
  76. }
  77. /**
  78. * @brief Get or set user_version of database.
  79. * @param db SQLite database to act upon.
  80. * @param set_user_version If <= 0, just get user_version. Otherwise, set
  81. * user_version first, before returning it.
  82. * @return Database user_version or -1 in case of error.
  83. */
  84. int db_user_version(sqlite3 *const db, const int set_user_version){
  85. if(unlikely(!db)) return -1;
  86. int rc = 0;
  87. if(set_user_version <= 0){
  88. sqlite3_stmt *stmt_get_user_version;
  89. rc = sqlite3_prepare_v2(db, "PRAGMA user_version;", -1, &stmt_get_user_version, NULL);
  90. if (unlikely(SQLITE_OK != rc)) {
  91. throw_error(NULL, ERR_TYPE_SQLITE, rc, __LINE__, __FILE__, __FUNCTION__);
  92. return -1;
  93. }
  94. rc = sqlite3_step(stmt_get_user_version);
  95. if (unlikely(SQLITE_ROW != rc)) {
  96. throw_error(NULL, ERR_TYPE_SQLITE, rc, __LINE__, __FILE__, __FUNCTION__);
  97. return -1;
  98. }
  99. int current_user_version = sqlite3_column_int(stmt_get_user_version, 0);
  100. rc = sqlite3_finalize(stmt_get_user_version);
  101. if (unlikely(SQLITE_OK != rc)) {
  102. throw_error(NULL, ERR_TYPE_SQLITE, rc, __LINE__, __FILE__, __FUNCTION__);
  103. return -1;
  104. }
  105. return current_user_version;
  106. } else {
  107. char buf[25];
  108. snprintfz(buf, 25, "PRAGMA user_version=%d;", set_user_version);
  109. rc = sqlite3_exec(db, buf, NULL, NULL, NULL);
  110. if (unlikely(SQLITE_OK!= rc)) {
  111. throw_error(NULL, ERR_TYPE_SQLITE, rc, __LINE__, __FILE__, __FUNCTION__);
  112. return -1;
  113. }
  114. return set_user_version;
  115. }
  116. }
  117. static void db_writer_db_mode_none(void *arg){
  118. struct File_info *const p_file_info = (struct File_info *) arg;
  119. Circ_buff_item_t *item;
  120. while(__atomic_load_n(&p_file_info->state, __ATOMIC_RELAXED) == LOG_SRC_READY){
  121. uv_rwlock_rdlock(&p_file_info->circ_buff->buff_realloc_rwlock);
  122. do{ item = circ_buff_read_item(p_file_info->circ_buff);} while(item);
  123. circ_buff_read_done(p_file_info->circ_buff);
  124. uv_rwlock_rdunlock(&p_file_info->circ_buff->buff_realloc_rwlock);
  125. for(int i = 0; i < p_file_info->buff_flush_to_db_interval * 4; i++){
  126. if(__atomic_load_n(&p_file_info->state, __ATOMIC_RELAXED) != LOG_SRC_READY)
  127. break;
  128. sleep_usec(250 * USEC_PER_MS);
  129. }
  130. }
  131. }
  132. #define return_db_writer_db_mode_none(p_file_info, do_mut_unlock) do { \
  133. p_file_info->db_mode = LOGS_MANAG_DB_MODE_NONE; \
  134. freez((void *) p_file_info->db_dir); \
  135. p_file_info->db_dir = strdupz(""); \
  136. freez((void *) p_file_info->db_metadata); \
  137. p_file_info->db_metadata = NULL; \
  138. sqlite3_finalize(stmt_logs_insert); \
  139. sqlite3_finalize(stmt_blobs_get_total_filesize); \
  140. sqlite3_finalize(stmt_blobs_update); \
  141. sqlite3_finalize(stmt_blobs_set_zero_filesize); \
  142. sqlite3_finalize(stmt_logs_delete); \
  143. if(do_mut_unlock){ \
  144. uv_mutex_unlock(p_file_info->db_mut); \
  145. uv_rwlock_rdunlock(&p_file_info->circ_buff->buff_realloc_rwlock); \
  146. } \
  147. if(__atomic_load_n(&p_file_info->state, __ATOMIC_RELAXED) == LOG_SRC_READY) \
  148. return fatal_assert(!uv_thread_create( p_file_info->db_writer_thread, \
  149. db_writer_db_mode_none, \
  150. p_file_info)); \
  151. } while(0)
  152. static void db_writer_db_mode_full(void *arg){
  153. int rc = 0;
  154. struct File_info *const p_file_info = (struct File_info *) arg;
  155. sqlite3_stmt *stmt_logs_insert = NULL;
  156. sqlite3_stmt *stmt_blobs_get_total_filesize = NULL;
  157. sqlite3_stmt *stmt_blobs_update = NULL;
  158. sqlite3_stmt *stmt_blobs_set_zero_filesize = NULL;
  159. sqlite3_stmt *stmt_logs_delete = NULL;
  160. /* Prepare LOGS_TABLE INSERT statement */
  161. rc = sqlite3_prepare_v2(p_file_info->db,
  162. "INSERT INTO " LOGS_TABLE "("
  163. "FK_BLOB_Id,"
  164. "BLOB_Offset,"
  165. "Timestamp,"
  166. "Msg_compr_size,"
  167. "Msg_decompr_size,"
  168. "Num_lines"
  169. ") VALUES (?,?,?,?,?,?) ;",
  170. -1, &stmt_logs_insert, NULL);
  171. if (unlikely(SQLITE_OK != rc)) {
  172. throw_error(p_file_info->chartname, ERR_TYPE_SQLITE, rc, __LINE__, __FILE__, __FUNCTION__);
  173. return_db_writer_db_mode_none(p_file_info, 0);
  174. }
  175. /* Prepare BLOBS_TABLE get total filesize statement */
  176. rc = sqlite3_prepare_v2(p_file_info->db,
  177. "SELECT SUM(Filesize) FROM " BLOBS_TABLE " ;",
  178. -1, &stmt_blobs_get_total_filesize, NULL);
  179. if (unlikely(SQLITE_OK != rc)) {
  180. throw_error(p_file_info->chartname, ERR_TYPE_SQLITE, rc, __LINE__, __FILE__, __FUNCTION__);
  181. return_db_writer_db_mode_none(p_file_info, 0);
  182. }
  183. /* Prepare BLOBS_TABLE UPDATE statement */
  184. rc = sqlite3_prepare_v2(p_file_info->db,
  185. "UPDATE " BLOBS_TABLE
  186. " SET Filesize = Filesize + ?"
  187. " WHERE Id = ? ;",
  188. -1, &stmt_blobs_update, NULL);
  189. if (unlikely(SQLITE_OK != rc)) {
  190. throw_error(p_file_info->chartname, ERR_TYPE_SQLITE, rc, __LINE__, __FILE__, __FUNCTION__);
  191. return_db_writer_db_mode_none(p_file_info, 0);
  192. }
  193. /* Prepare BLOBS_TABLE UPDATE SET zero filesize statement */
  194. rc = sqlite3_prepare_v2(p_file_info->db,
  195. "UPDATE " BLOBS_TABLE
  196. " SET Filesize = 0"
  197. " WHERE Id = ? ;",
  198. -1, &stmt_blobs_set_zero_filesize, NULL);
  199. if (unlikely(SQLITE_OK != rc)) {
  200. throw_error(p_file_info->chartname, ERR_TYPE_SQLITE, rc, __LINE__, __FILE__, __FUNCTION__);
  201. return_db_writer_db_mode_none(p_file_info, 0);
  202. }
  203. /* Prepare LOGS_TABLE DELETE statement */
  204. rc = sqlite3_prepare_v2(p_file_info->db,
  205. "DELETE FROM " LOGS_TABLE
  206. " WHERE FK_BLOB_Id = ? ;",
  207. -1, &stmt_logs_delete, NULL);
  208. if (unlikely(SQLITE_OK != rc)) {
  209. throw_error(p_file_info->chartname, ERR_TYPE_SQLITE, rc, __LINE__, __FILE__, __FUNCTION__);
  210. return_db_writer_db_mode_none(p_file_info, 0);
  211. }
  212. /* Get initial filesize of logs.bin.0 BLOB */
  213. sqlite3_stmt *stmt_retrieve_filesize_from_id = NULL;
  214. if(unlikely(
  215. SQLITE_OK != (rc = sqlite3_prepare_v2(p_file_info->db,
  216. "SELECT Filesize FROM " BLOBS_TABLE
  217. " WHERE Id = ? ;",
  218. -1, &stmt_retrieve_filesize_from_id, NULL)) ||
  219. SQLITE_OK != (rc = sqlite3_bind_int(stmt_retrieve_filesize_from_id, 1,
  220. p_file_info->blob_write_handle_offset)) ||
  221. SQLITE_ROW != (rc = sqlite3_step(stmt_retrieve_filesize_from_id))
  222. )){
  223. throw_error(p_file_info->chartname, ERR_TYPE_SQLITE, rc, __LINE__, __FILE__, __FUNCTION__);
  224. return_db_writer_db_mode_none(p_file_info, 0);
  225. }
  226. int64_t blob_filesize = (int64_t) sqlite3_column_int64(stmt_retrieve_filesize_from_id, 0);
  227. rc = sqlite3_finalize(stmt_retrieve_filesize_from_id);
  228. if (unlikely(SQLITE_OK != rc)) {
  229. throw_error(p_file_info->chartname, ERR_TYPE_SQLITE, rc, __LINE__, __FILE__, __FUNCTION__);
  230. return_db_writer_db_mode_none(p_file_info, 0);
  231. }
  232. struct timespec ts_db_write_start, ts_db_write_end, ts_db_rotate_end;
  233. while(__atomic_load_n(&p_file_info->state, __ATOMIC_RELAXED) == LOG_SRC_READY){
  234. clock_gettime(CLOCK_THREAD_CPUTIME_ID, &ts_db_write_start);
  235. uv_rwlock_rdlock(&p_file_info->circ_buff->buff_realloc_rwlock);
  236. uv_mutex_lock(p_file_info->db_mut);
  237. /* ---------------------------------------------------------------------
  238. * Read items from circular buffer and store them in disk BLOBs.
  239. * After that, SQLite metadata is updated.
  240. * ------------------------------------------------------------------ */
  241. Circ_buff_item_t *item = circ_buff_read_item(p_file_info->circ_buff);
  242. while (item) {
  243. m_assert(TEST_MS_TIMESTAMP_VALID(item->timestamp), "item->timestamp == 0");
  244. m_assert(item->text_compressed_size != 0, "item->text_compressed_size == 0");
  245. m_assert(item->text_size != 0, "item->text_size == 0");
  246. /* Write logs in BLOB */
  247. uv_fs_t write_req;
  248. uv_buf_t uv_buf = uv_buf_init((char *) item->text_compressed, (unsigned int) item->text_compressed_size);
  249. rc = uv_fs_write( NULL, &write_req,
  250. p_file_info->blob_handles[p_file_info->blob_write_handle_offset],
  251. &uv_buf, 1, blob_filesize, NULL); // Write synchronously at the end of the BLOB file
  252. uv_fs_req_cleanup(&write_req);
  253. if(unlikely(rc < 0)){
  254. throw_error(p_file_info->chartname, ERR_TYPE_LIBUV, rc, __LINE__, __FILE__, __FUNCTION__);
  255. circ_buff_read_done(p_file_info->circ_buff);
  256. return_db_writer_db_mode_none(p_file_info, 1);
  257. }
  258. /* Ensure data is flushed to BLOB via fdatasync() */
  259. uv_fs_t dsync_req;
  260. rc = uv_fs_fdatasync( NULL, &dsync_req,
  261. p_file_info->blob_handles[p_file_info->blob_write_handle_offset], NULL);
  262. uv_fs_req_cleanup(&dsync_req);
  263. if (unlikely(rc)){
  264. throw_error(p_file_info->chartname, ERR_TYPE_LIBUV, rc, __LINE__, __FILE__, __FUNCTION__);
  265. circ_buff_read_done(p_file_info->circ_buff);
  266. return_db_writer_db_mode_none(p_file_info, 1);
  267. }
  268. if(unlikely(
  269. /* Write metadata of logs in LOGS_TABLE */
  270. SQLITE_OK != (rc = sqlite3_exec(p_file_info->db, "BEGIN TRANSACTION;", NULL, NULL, NULL)) ||
  271. SQLITE_OK != (rc = sqlite3_bind_int(stmt_logs_insert, 1, p_file_info->blob_write_handle_offset)) ||
  272. SQLITE_OK != (rc = sqlite3_bind_int64(stmt_logs_insert, 2, (sqlite3_int64) blob_filesize)) ||
  273. SQLITE_OK != (rc = sqlite3_bind_int64(stmt_logs_insert, 3, (sqlite3_int64) item->timestamp)) ||
  274. SQLITE_OK != (rc = sqlite3_bind_int64(stmt_logs_insert, 4, (sqlite3_int64) item->text_compressed_size)) ||
  275. SQLITE_OK != (rc = sqlite3_bind_int64(stmt_logs_insert, 5, (sqlite3_int64)item->text_size)) ||
  276. SQLITE_OK != (rc = sqlite3_bind_int64(stmt_logs_insert, 6, (sqlite3_int64)item->num_lines)) ||
  277. SQLITE_DONE != (rc = sqlite3_step(stmt_logs_insert)) ||
  278. SQLITE_OK != (rc = sqlite3_reset(stmt_logs_insert)) ||
  279. /* Update metadata of BLOBs filesize in BLOBS_TABLE */
  280. SQLITE_OK != (rc = sqlite3_bind_int64(stmt_blobs_update, 1, (sqlite3_int64)item->text_compressed_size)) ||
  281. SQLITE_OK != (rc = sqlite3_bind_int(stmt_blobs_update, 2, p_file_info->blob_write_handle_offset)) ||
  282. SQLITE_DONE != (rc = sqlite3_step(stmt_blobs_update)) ||
  283. SQLITE_OK != (rc = sqlite3_reset(stmt_blobs_update)) ||
  284. SQLITE_OK != (rc = sqlite3_exec(p_file_info->db, "END TRANSACTION;", NULL, NULL, NULL))
  285. )) {
  286. throw_error(p_file_info->chartname, ERR_TYPE_SQLITE, rc, __LINE__, __FILE__, __FUNCTION__);
  287. rc = sqlite3_exec(p_file_info->db, "ROLLBACK;", NULL, NULL, NULL);
  288. if (unlikely(SQLITE_OK != rc))
  289. throw_error(p_file_info->chartname, ERR_TYPE_SQLITE, rc, __LINE__, __FILE__, __FUNCTION__);
  290. circ_buff_read_done(p_file_info->circ_buff);
  291. return_db_writer_db_mode_none(p_file_info, 1);
  292. }
  293. /* TODO: Should we log it if there is a fatal error in the transaction,
  294. * as there will be a mismatch between BLOBs and SQLite metadata? */
  295. /* Increase BLOB offset and read next log message until no more messages in buff */
  296. blob_filesize += (int64_t) item->text_compressed_size;
  297. item = circ_buff_read_item(p_file_info->circ_buff);
  298. }
  299. circ_buff_read_done(p_file_info->circ_buff);
  300. clock_gettime(CLOCK_THREAD_CPUTIME_ID, &ts_db_write_end);
  301. /* ---------------------------------------------------------------------
  302. * If the filesize of the current write-to BLOB is >
  303. * p_file_info->blob_max_size, then perform a BLOBs rotation.
  304. * ------------------------------------------------------------------ */
  305. if(blob_filesize > p_file_info->blob_max_size){
  306. uv_fs_t rename_req;
  307. char old_path[FILENAME_MAX + 1], new_path[FILENAME_MAX + 1];
  308. /* Rotate path of BLOBs */
  309. for(int i = BLOB_MAX_FILES - 1; i >= 0; i--){
  310. snprintfz(old_path, FILENAME_MAX, "%s" BLOB_STORE_FILENAME "%d", p_file_info->db_dir, i);
  311. snprintfz(new_path, FILENAME_MAX, "%s" BLOB_STORE_FILENAME "%d", p_file_info->db_dir, i + 1);
  312. rc = uv_fs_rename(NULL, &rename_req, old_path, new_path, NULL);
  313. uv_fs_req_cleanup(&rename_req);
  314. if (unlikely(rc)){
  315. //TODO: This error case needs better handling, as it will result in mismatch with sqlite metadata.
  316. // We probably require a WAL or something similar.
  317. throw_error(p_file_info->chartname, ERR_TYPE_LIBUV, rc, __LINE__, __FILE__, __FUNCTION__);
  318. return_db_writer_db_mode_none(p_file_info, 1);
  319. }
  320. }
  321. /* Replace the maximum number with 0 in BLOB files. */
  322. snprintfz(old_path, FILENAME_MAX, "%s" BLOB_STORE_FILENAME "%d", p_file_info->db_dir, BLOB_MAX_FILES);
  323. snprintfz(new_path, FILENAME_MAX, "%s" BLOB_STORE_FILENAME "%d", p_file_info->db_dir, 0);
  324. rc = uv_fs_rename(NULL, &rename_req, old_path, new_path, NULL);
  325. uv_fs_req_cleanup(&rename_req);
  326. if (unlikely(rc)){
  327. //TODO: This error case needs better handling, as it will result in mismatch with sqlite metadata.
  328. // We probably require a WAL or something similar.
  329. throw_error(p_file_info->chartname, ERR_TYPE_LIBUV, rc, __LINE__, __FILE__, __FUNCTION__);
  330. return_db_writer_db_mode_none(p_file_info, 1);
  331. }
  332. /* Rotate BLOBS_TABLE Filenames */
  333. rc = sqlite3_exec(p_file_info->db,
  334. "UPDATE " BLOBS_TABLE
  335. " SET Filename = REPLACE( "
  336. " Filename, "
  337. " substr(Filename, -1), "
  338. " case when "
  339. " (cast(substr(Filename, -1) AS INTEGER) < (" LOGS_MANAG_STR(BLOB_MAX_FILES) " - 1)) then "
  340. " substr(Filename, -1) + 1 else 0 end);",
  341. NULL, NULL, NULL);
  342. if (unlikely(rc != SQLITE_OK)) {
  343. throw_error(p_file_info->chartname, ERR_TYPE_SQLITE, rc, __LINE__, __FILE__, __FUNCTION__);
  344. //TODO: Undo rotation if possible?
  345. return_db_writer_db_mode_none(p_file_info, 1);
  346. }
  347. /* -----------------------------------------------------------------
  348. * (a) Update blob_write_handle_offset,
  349. * (b) truncate new write-to BLOB,
  350. * (c) update filesize of truncated BLOB in SQLite DB,
  351. * (d) delete respective logs in LOGS_TABLE for the truncated BLOB and
  352. * (e) reset blob_filesize
  353. * -------------------------------------------------------------- */
  354. /* (a) */
  355. p_file_info->blob_write_handle_offset =
  356. p_file_info->blob_write_handle_offset == 1 ? BLOB_MAX_FILES : p_file_info->blob_write_handle_offset - 1;
  357. /* (b) */
  358. uv_fs_t trunc_req;
  359. rc = uv_fs_ftruncate(NULL, &trunc_req, p_file_info->blob_handles[p_file_info->blob_write_handle_offset], 0, NULL);
  360. uv_fs_req_cleanup(&trunc_req);
  361. if (unlikely(rc)){
  362. //TODO: This error case needs better handling, as it will result in mismatch with sqlite metadata.
  363. // We probably require a WAL or something similar.
  364. throw_error(p_file_info->chartname, ERR_TYPE_LIBUV, rc, __LINE__, __FILE__, __FUNCTION__);
  365. return_db_writer_db_mode_none(p_file_info, 1);
  366. }
  367. /* (c) */
  368. if(unlikely(
  369. SQLITE_OK != (rc = sqlite3_exec(p_file_info->db, "BEGIN TRANSACTION;", NULL, NULL, NULL)) ||
  370. SQLITE_OK != (rc = sqlite3_bind_int(stmt_blobs_set_zero_filesize, 1, p_file_info->blob_write_handle_offset)) ||
  371. SQLITE_DONE != (rc = sqlite3_step(stmt_blobs_set_zero_filesize)) ||
  372. SQLITE_OK != (rc = sqlite3_reset(stmt_blobs_set_zero_filesize)) ||
  373. /* (d) */
  374. SQLITE_OK != (rc = sqlite3_bind_int(stmt_logs_delete, 1, p_file_info->blob_write_handle_offset)) ||
  375. SQLITE_DONE != (rc = sqlite3_step(stmt_logs_delete)) ||
  376. SQLITE_OK != (rc = sqlite3_reset(stmt_logs_delete)) ||
  377. SQLITE_OK != (rc = sqlite3_exec(p_file_info->db, "END TRANSACTION;", NULL, NULL, NULL))
  378. )) {
  379. throw_error(p_file_info->chartname, ERR_TYPE_SQLITE, rc, __LINE__, __FILE__, __FUNCTION__);
  380. rc = sqlite3_exec(p_file_info->db, "ROLLBACK;", NULL, NULL, NULL);
  381. if (unlikely(SQLITE_OK != rc))
  382. throw_error(p_file_info->chartname, ERR_TYPE_SQLITE, rc, __LINE__, __FILE__, __FUNCTION__);
  383. return_db_writer_db_mode_none(p_file_info, 1);
  384. }
  385. /* (e) */
  386. blob_filesize = 0;
  387. }
  388. clock_gettime(CLOCK_THREAD_CPUTIME_ID, &ts_db_rotate_end);
  389. /* Update database write & rotate timings for this log source */
  390. __atomic_store_n(&p_file_info->db_write_duration,
  391. (ts_db_write_end.tv_sec - ts_db_write_start.tv_sec) * NSEC_PER_SEC +
  392. (ts_db_write_end.tv_nsec - ts_db_write_start.tv_nsec), __ATOMIC_RELAXED);
  393. __atomic_store_n(&p_file_info->db_rotate_duration,
  394. (ts_db_rotate_end.tv_sec - ts_db_write_end.tv_sec) * NSEC_PER_SEC +
  395. (ts_db_rotate_end.tv_nsec - ts_db_write_end.tv_nsec), __ATOMIC_RELAXED);
  396. /* Update total disk usage of all BLOBs for this log source */
  397. rc = sqlite3_step(stmt_blobs_get_total_filesize);
  398. if (unlikely(SQLITE_ROW != rc)) {
  399. throw_error(p_file_info->chartname, ERR_TYPE_SQLITE, rc, __LINE__, __FILE__, __FUNCTION__);
  400. return_db_writer_db_mode_none(p_file_info, 1);
  401. }
  402. __atomic_store_n(&p_file_info->blob_total_size, sqlite3_column_int64(stmt_blobs_get_total_filesize, 0), __ATOMIC_RELAXED);
  403. rc = sqlite3_reset(stmt_blobs_get_total_filesize);
  404. if (unlikely(SQLITE_OK != rc)) {
  405. throw_error(p_file_info->chartname, ERR_TYPE_SQLITE, rc, __LINE__, __FILE__, __FUNCTION__);
  406. return_db_writer_db_mode_none(p_file_info, 1);
  407. }
  408. // TODO: Can uv_mutex_unlock(p_file_info->db_mut) be moved before if(blob_filesize > p_file_info-> blob_max_size) ?
  409. uv_mutex_unlock(p_file_info->db_mut);
  410. uv_rwlock_rdunlock(&p_file_info->circ_buff->buff_realloc_rwlock);
  411. for(int i = 0; i < p_file_info->buff_flush_to_db_interval * 4; i++){
  412. if(__atomic_load_n(&p_file_info->state, __ATOMIC_RELAXED) != LOG_SRC_READY)
  413. break;
  414. sleep_usec(250 * USEC_PER_MS);
  415. }
  416. }
  417. return_db_writer_db_mode_none(p_file_info, 0);
  418. }
  419. inline void db_set_main_dir(char *const dir){
  420. main_db_dir = dir;
  421. }
  422. int db_init() {
  423. int rc = 0;
  424. char *err_msg = 0;
  425. uv_fs_t mkdir_req;
  426. if(unlikely(!main_db_dir || !*main_db_dir)){
  427. rc = -1;
  428. collector_error("main_db_dir is unset");
  429. throw_error(NULL, ERR_TYPE_OTHER, rc, __LINE__, __FILE__, __FUNCTION__);
  430. goto return_error;
  431. }
  432. size_t main_db_path_len = strlen(main_db_dir) + sizeof(MAIN_DB) + 1;
  433. main_db_path = mallocz(main_db_path_len);
  434. snprintfz(main_db_path, main_db_path_len, "%s/" MAIN_DB, main_db_dir);
  435. /* Create databases directory if it doesn't exist. */
  436. rc = uv_fs_mkdir(NULL, &mkdir_req, main_db_dir, 0775, NULL);
  437. uv_fs_req_cleanup(&mkdir_req);
  438. if(rc == 0) collector_info("DB directory created: %s", main_db_dir);
  439. else if (rc == UV_EEXIST) collector_info("DB directory %s found", main_db_dir);
  440. else {
  441. throw_error(NULL, ERR_TYPE_LIBUV, rc, __LINE__, __FILE__, __FUNCTION__);
  442. goto return_error;
  443. }
  444. /* Create or open main db */
  445. rc = sqlite3_open(main_db_path, &main_db);
  446. if (unlikely(rc != SQLITE_OK)){
  447. throw_error(MAIN_DB, ERR_TYPE_SQLITE, rc, __LINE__, __FILE__, __FUNCTION__);
  448. goto return_error;
  449. }
  450. /* Configure main database */
  451. rc = sqlite3_exec(main_db,
  452. "PRAGMA auto_vacuum = INCREMENTAL;"
  453. "PRAGMA synchronous = 1;"
  454. "PRAGMA journal_mode = WAL;"
  455. "PRAGMA temp_store = MEMORY;"
  456. "PRAGMA foreign_keys = ON;",
  457. 0, 0, &err_msg);
  458. if (unlikely(rc != SQLITE_OK)) {
  459. collector_error("Failed to configure database, SQL error: %s\n", err_msg);
  460. throw_error(MAIN_DB, ERR_TYPE_SQLITE, rc, __LINE__, __FILE__, __FUNCTION__);
  461. goto return_error;
  462. } else collector_info("%s configured successfully", MAIN_DB);
  463. /* Execute pending main database migrations */
  464. int main_db_ver = db_user_version(main_db, -1);
  465. if (likely(LOGS_MANAG_DB_VERSION == main_db_ver))
  466. collector_info("Logs management %s database version is %d (no migration needed)", MAIN_DB, main_db_ver);
  467. else {
  468. for(int ver = main_db_ver; ver < LOGS_MANAG_DB_VERSION && migration_list_main_db[ver].func; ver++){
  469. rc = (migration_list_main_db[ver].func)(main_db, migration_list_main_db[ver].name);
  470. if (unlikely(rc)){
  471. collector_error("Logs management %s database migration from version %d to version %d failed", MAIN_DB, ver, ver + 1);
  472. throw_error(MAIN_DB, ERR_TYPE_SQLITE, rc, __LINE__, __FILE__, __FUNCTION__);
  473. goto return_error;
  474. }
  475. db_user_version(main_db, ver + 1);
  476. }
  477. }
  478. /* Create new main DB LogCollections table if it doesn't exist */
  479. rc = sqlite3_exec(main_db,
  480. "CREATE TABLE IF NOT EXISTS " MAIN_COLLECTIONS_TABLE "("
  481. "Id INTEGER PRIMARY KEY,"
  482. "Stream_Tag TEXT NOT NULL,"
  483. "Log_Source_Path TEXT NOT NULL,"
  484. "Type INTEGER NOT NULL,"
  485. "DB_Dir TEXT NOT NULL,"
  486. "UNIQUE(Stream_Tag, DB_Dir) "
  487. ");",
  488. 0, 0, &err_msg);
  489. if (unlikely(SQLITE_OK != rc)) {
  490. collector_error("Failed to create table" MAIN_COLLECTIONS_TABLE "SQL error: %s", err_msg);
  491. throw_error(MAIN_DB, ERR_TYPE_SQLITE, rc, __LINE__, __FILE__, __FUNCTION__);
  492. goto return_error;
  493. }
  494. sqlite3_stmt *stmt_search_if_log_source_exists = NULL;
  495. rc = sqlite3_prepare_v2(main_db,
  496. "SELECT COUNT(*), Id, DB_Dir FROM " MAIN_COLLECTIONS_TABLE
  497. " WHERE Stream_Tag = ? AND Log_Source_Path = ? AND Type = ? ;",
  498. -1, &stmt_search_if_log_source_exists, NULL);
  499. if (unlikely(SQLITE_OK != rc)){
  500. throw_error(MAIN_DB, ERR_TYPE_SQLITE, rc, __LINE__, __FILE__, __FUNCTION__);
  501. goto return_error;
  502. }
  503. sqlite3_stmt *stmt_insert_log_collection_metadata = NULL;
  504. rc = sqlite3_prepare_v2(main_db,
  505. "INSERT INTO " MAIN_COLLECTIONS_TABLE
  506. " (Stream_Tag, Log_Source_Path, Type, DB_Dir) VALUES (?,?,?,?) ;",
  507. -1, &stmt_insert_log_collection_metadata, NULL);
  508. if (unlikely(SQLITE_OK != rc)){
  509. throw_error(MAIN_DB, ERR_TYPE_SQLITE, rc, __LINE__, __FILE__, __FUNCTION__);
  510. goto return_error;
  511. }
  512. for (int i = 0; i < p_file_infos_arr->count; i++) {
  513. struct File_info *const p_file_info = p_file_infos_arr->data[i];
  514. if(p_file_info->db_mode == LOGS_MANAG_DB_MODE_NONE){
  515. p_file_info->db_dir = strdupz("");
  516. p_file_info->db_writer_thread = mallocz(sizeof(uv_thread_t));
  517. rc = uv_thread_create(p_file_info->db_writer_thread, db_writer_db_mode_none, p_file_info);
  518. if (unlikely(rc)){
  519. throw_error(p_file_info->chartname, ERR_TYPE_LIBUV, rc, __LINE__, __FILE__, __FUNCTION__);
  520. goto return_error;
  521. }
  522. }
  523. else if(p_file_info->db_mode == LOGS_MANAG_DB_MODE_FULL){
  524. p_file_info->db_mut = mallocz(sizeof(uv_mutex_t));
  525. rc = uv_mutex_init(p_file_info->db_mut);
  526. if (unlikely(rc)) fatal("Failed to initialize uv_mutex_t");
  527. uv_mutex_lock(p_file_info->db_mut);
  528. // This error check will be used a lot, so define it here.
  529. #define do_sqlite_error_check(p_file_info, rc, rc_expctd) do { \
  530. if(unlikely(rc_expctd != rc)) { \
  531. throw_error(p_file_info->chartname, ERR_TYPE_SQLITE, rc, __LINE__, __FILE__, __FUNCTION__);\
  532. uv_mutex_unlock(p_file_info->db_mut); \
  533. goto return_error; \
  534. } \
  535. } while(0)
  536. if(unlikely(
  537. SQLITE_OK != (rc = sqlite3_bind_text(stmt_search_if_log_source_exists, 1, p_file_info->stream_guid, -1, NULL)) ||
  538. SQLITE_OK != (rc = sqlite3_bind_text(stmt_search_if_log_source_exists, 2, p_file_info->filename, -1, NULL)) ||
  539. SQLITE_OK != (rc = sqlite3_bind_int(stmt_search_if_log_source_exists, 3, p_file_info->log_type)) ||
  540. /* COUNT(*) query should always return SQLITE_ROW */
  541. SQLITE_ROW != (rc = sqlite3_step(stmt_search_if_log_source_exists)))){
  542. throw_error(p_file_info->chartname, ERR_TYPE_SQLITE, rc, __LINE__, __FILE__, __FUNCTION__);
  543. uv_mutex_unlock(p_file_info->db_mut);
  544. goto return_error;
  545. }
  546. const int log_source_occurences = sqlite3_column_int(stmt_search_if_log_source_exists, 0);
  547. switch (log_source_occurences) {
  548. case 0: { /* Log collection metadata not found in main DB - create a new record */
  549. /* Create directory of collection of logs for the particular
  550. * log source (in the form of a UUID) and bind it. */
  551. uuid_t uuid;
  552. uuid_generate(uuid);
  553. char uuid_str[UUID_STR_LEN]; // ex. "1b4e28ba-2fa1-11d2-883f-0016d3cca427" + "\0"
  554. uuid_unparse_lower(uuid, uuid_str);
  555. p_file_info->db_dir = mallocz(snprintf(NULL, 0, "%s/%s/", main_db_dir, uuid_str) + 1);
  556. sprintf((char *) p_file_info->db_dir, "%s/%s/", main_db_dir, uuid_str);
  557. rc = uv_fs_mkdir(NULL, &mkdir_req, p_file_info->db_dir, 0775, NULL);
  558. uv_fs_req_cleanup(&mkdir_req);
  559. if (unlikely(rc)) {
  560. if(errno == EEXIST)
  561. collector_error("DB directory %s exists but not found in %s.\n", p_file_info->db_dir, MAIN_DB);
  562. throw_error(p_file_info->chartname, ERR_TYPE_LIBUV, rc, __LINE__, __FILE__, __FUNCTION__);
  563. uv_mutex_unlock(p_file_info->db_mut);
  564. goto return_error;
  565. }
  566. if(unlikely(
  567. SQLITE_OK != (rc = sqlite3_bind_text(stmt_insert_log_collection_metadata, 1, p_file_info->stream_guid, -1, NULL)) ||
  568. SQLITE_OK != (rc = sqlite3_bind_text(stmt_insert_log_collection_metadata, 2, p_file_info->filename, -1, NULL)) ||
  569. SQLITE_OK != (rc = sqlite3_bind_int(stmt_insert_log_collection_metadata, 3, p_file_info->log_type)) ||
  570. SQLITE_OK != (rc = sqlite3_bind_text(stmt_insert_log_collection_metadata, 4, p_file_info->db_dir, -1, NULL)) ||
  571. SQLITE_DONE != (rc = sqlite3_step(stmt_insert_log_collection_metadata)) ||
  572. SQLITE_OK != (rc = sqlite3_reset(stmt_insert_log_collection_metadata)))) {
  573. throw_error(p_file_info->chartname, ERR_TYPE_SQLITE, rc, __LINE__, __FILE__, __FUNCTION__);
  574. uv_mutex_unlock(p_file_info->db_mut);
  575. goto return_error;
  576. }
  577. break;
  578. }
  579. case 1: { /* File metadata found in DB */
  580. p_file_info->db_dir = mallocz((size_t)sqlite3_column_bytes(stmt_search_if_log_source_exists, 2) + 1);
  581. sprintf((char*) p_file_info->db_dir, "%s", sqlite3_column_text(stmt_search_if_log_source_exists, 2));
  582. break;
  583. }
  584. default: { /* Error, file metadata can exist either 0 or 1 times in DB */
  585. m_assert(0, "Same file stored in DB more than once!");
  586. collector_error("[%s]: Record encountered multiple times in DB " MAIN_COLLECTIONS_TABLE " table \n",
  587. p_file_info->filename);
  588. throw_error(p_file_info->chartname, ERR_TYPE_OTHER, rc, __LINE__, __FILE__, __FUNCTION__);
  589. uv_mutex_unlock(p_file_info->db_mut);
  590. goto return_error;
  591. }
  592. }
  593. rc = sqlite3_reset(stmt_search_if_log_source_exists);
  594. do_sqlite_error_check(p_file_info, rc, SQLITE_OK);
  595. /* Create or open metadata DBs for each log collection */
  596. p_file_info->db_metadata = mallocz(snprintf(NULL, 0, "%s" METADATA_DB_FILENAME, p_file_info->db_dir) + 1);
  597. sprintf((char *) p_file_info->db_metadata, "%s" METADATA_DB_FILENAME, p_file_info->db_dir);
  598. rc = sqlite3_open(p_file_info->db_metadata, &p_file_info->db);
  599. do_sqlite_error_check(p_file_info, rc, SQLITE_OK);
  600. /* Configure metadata DB */
  601. rc = sqlite3_exec(p_file_info->db,
  602. "PRAGMA auto_vacuum = INCREMENTAL;"
  603. "PRAGMA synchronous = 1;"
  604. "PRAGMA journal_mode = WAL;"
  605. "PRAGMA temp_store = MEMORY;"
  606. "PRAGMA foreign_keys = ON;",
  607. 0, 0, &err_msg);
  608. if (unlikely(rc != SQLITE_OK)) {
  609. collector_error("[%s]: Failed to configure database, SQL error: %s", p_file_info->filename, err_msg);
  610. throw_error(p_file_info->chartname, ERR_TYPE_SQLITE, rc, __LINE__, __FILE__, __FUNCTION__);
  611. uv_mutex_unlock(p_file_info->db_mut);
  612. goto return_error;
  613. }
  614. /* Execute pending metadata database migrations */
  615. collector_info("[%s]: About to execute " METADATA_DB_FILENAME " migrations", p_file_info->chartname);
  616. int metadata_db_ver = db_user_version(p_file_info->db, -1);
  617. if (likely(LOGS_MANAG_DB_VERSION == metadata_db_ver)) {
  618. collector_info( "[%s]: Logs management " METADATA_DB_FILENAME " database version is %d (no migration needed)",
  619. p_file_info->chartname, metadata_db_ver);
  620. } else {
  621. for(int ver = metadata_db_ver; ver < LOGS_MANAG_DB_VERSION && migration_list_metadata_db[ver].func; ver++){
  622. rc = (migration_list_metadata_db[ver].func)(p_file_info->db, migration_list_metadata_db[ver].name);
  623. if (unlikely(rc)){
  624. collector_error("[%s]: Logs management " METADATA_DB_FILENAME " database migration from version %d to version %d failed",
  625. p_file_info->chartname, ver, ver + 1);
  626. throw_error(MAIN_DB, ERR_TYPE_SQLITE, rc, __LINE__, __FILE__, __FUNCTION__);
  627. uv_mutex_unlock(p_file_info->db_mut);
  628. goto return_error;
  629. }
  630. db_user_version(p_file_info->db, ver + 1);
  631. }
  632. }
  633. /* -----------------------------------------------------------------
  634. * Create BLOBS_TABLE and LOGS_TABLE if they don't exist. Do it
  635. * as a transaction, so that it can all be rolled back if something
  636. * goes wrong.
  637. * -------------------------------------------------------------- */
  638. {
  639. rc = sqlite3_exec(p_file_info->db, "BEGIN TRANSACTION;", NULL, NULL, NULL);
  640. do_sqlite_error_check(p_file_info, rc, SQLITE_OK);
  641. /* Check if BLOBS_TABLE exists or not */
  642. sqlite3_stmt *stmt_check_if_BLOBS_TABLE_exists = NULL;
  643. rc = sqlite3_prepare_v2(p_file_info->db,
  644. "SELECT COUNT(*) FROM sqlite_master"
  645. " WHERE type='table' AND name='"BLOBS_TABLE"';",
  646. -1, &stmt_check_if_BLOBS_TABLE_exists, NULL);
  647. do_sqlite_error_check(p_file_info, rc, SQLITE_OK);
  648. rc = sqlite3_step(stmt_check_if_BLOBS_TABLE_exists);
  649. do_sqlite_error_check(p_file_info, rc, SQLITE_ROW);
  650. /* If BLOBS_TABLE doesn't exist, create and populate it */
  651. if(sqlite3_column_int(stmt_check_if_BLOBS_TABLE_exists, 0) == 0){
  652. /* 1. Create it */
  653. rc = sqlite3_exec(p_file_info->db,
  654. "CREATE TABLE IF NOT EXISTS " BLOBS_TABLE "("
  655. "Id INTEGER PRIMARY KEY,"
  656. "Filename TEXT NOT NULL,"
  657. "Filesize INTEGER NOT NULL"
  658. ");",
  659. 0, 0, &err_msg);
  660. if (unlikely(SQLITE_OK != rc)) {
  661. collector_error("[%s]: Failed to create " BLOBS_TABLE ", SQL error: %s", p_file_info->chartname, err_msg);
  662. throw_error(p_file_info->chartname, ERR_TYPE_SQLITE, rc, __LINE__, __FILE__, __FUNCTION__);
  663. uv_mutex_unlock(p_file_info->db_mut);
  664. goto return_error;
  665. } else collector_info("[%s]: Table " BLOBS_TABLE " created successfully", p_file_info->chartname);
  666. /* 2. Populate it */
  667. sqlite3_stmt *stmt_init_BLOBS_table = NULL;
  668. rc = sqlite3_prepare_v2(p_file_info->db,
  669. "INSERT INTO " BLOBS_TABLE
  670. " (Filename, Filesize) VALUES (?,?) ;",
  671. -1, &stmt_init_BLOBS_table, NULL);
  672. do_sqlite_error_check(p_file_info, rc, SQLITE_OK);
  673. for(int i = 0; i < BLOB_MAX_FILES; i++){
  674. char filename[FILENAME_MAX + 1];
  675. snprintfz(filename, FILENAME_MAX, BLOB_STORE_FILENAME "%d", i);
  676. if(unlikely(
  677. SQLITE_OK != (rc = sqlite3_bind_text(stmt_init_BLOBS_table, 1, filename, -1, NULL)) ||
  678. SQLITE_OK != (rc = sqlite3_bind_int64(stmt_init_BLOBS_table, 2, (sqlite3_int64) 0)) ||
  679. SQLITE_DONE != (rc = sqlite3_step(stmt_init_BLOBS_table)) ||
  680. SQLITE_OK != (rc = sqlite3_reset(stmt_init_BLOBS_table)))){
  681. throw_error(p_file_info->chartname, ERR_TYPE_SQLITE, rc, __LINE__, __FILE__, __FUNCTION__);
  682. uv_mutex_unlock(p_file_info->db_mut);
  683. goto return_error;
  684. }
  685. }
  686. rc = sqlite3_finalize(stmt_init_BLOBS_table);
  687. do_sqlite_error_check(p_file_info, rc, SQLITE_OK);
  688. }
  689. rc = sqlite3_finalize(stmt_check_if_BLOBS_TABLE_exists);
  690. do_sqlite_error_check(p_file_info, rc, SQLITE_OK);
  691. /* If LOGS_TABLE doesn't exist, create it */
  692. rc = sqlite3_exec(p_file_info->db,
  693. "CREATE TABLE IF NOT EXISTS " LOGS_TABLE "("
  694. "Id INTEGER PRIMARY KEY,"
  695. "FK_BLOB_Id INTEGER NOT NULL,"
  696. "BLOB_Offset INTEGER NOT NULL,"
  697. "Timestamp INTEGER NOT NULL,"
  698. "Msg_compr_size INTEGER NOT NULL,"
  699. "Msg_decompr_size INTEGER NOT NULL,"
  700. "Num_lines INTEGER NOT NULL,"
  701. "FOREIGN KEY (FK_BLOB_Id) REFERENCES " BLOBS_TABLE " (Id) ON DELETE CASCADE ON UPDATE CASCADE"
  702. ");",
  703. 0, 0, &err_msg);
  704. if (unlikely(SQLITE_OK != rc)) {
  705. collector_error("[%s]: Failed to create " LOGS_TABLE ", SQL error: %s", p_file_info->chartname, err_msg);
  706. throw_error(p_file_info->chartname, ERR_TYPE_SQLITE, rc, __LINE__, __FILE__, __FUNCTION__);
  707. uv_mutex_unlock(p_file_info->db_mut);
  708. goto return_error;
  709. } else collector_info("[%s]: Table " LOGS_TABLE " created successfully", p_file_info->chartname);
  710. /* Create index on LOGS_TABLE Timestamp
  711. * TODO: If this doesn't speed up queries, check SQLITE R*tree
  712. * module. Requires benchmarking with/without index. */
  713. rc = sqlite3_exec(p_file_info->db,
  714. "CREATE INDEX IF NOT EXISTS logs_timestamps_idx "
  715. "ON " LOGS_TABLE "(Timestamp);",
  716. 0, 0, &err_msg);
  717. if (unlikely(SQLITE_OK != rc)) {
  718. collector_error("[%s]: Failed to create logs_timestamps_idx, SQL error: %s", p_file_info->chartname, err_msg);
  719. throw_error(p_file_info->chartname, ERR_TYPE_SQLITE, rc, __LINE__, __FILE__, __FUNCTION__);
  720. uv_mutex_unlock(p_file_info->db_mut);
  721. goto return_error;
  722. } else collector_info("[%s]: logs_timestamps_idx created successfully", p_file_info->chartname);
  723. rc = sqlite3_exec(p_file_info->db, "END TRANSACTION;", NULL, NULL, NULL);
  724. do_sqlite_error_check(p_file_info, rc, SQLITE_OK);
  725. }
  726. /* -----------------------------------------------------------------
  727. * Remove excess BLOBs beyond BLOB_MAX_FILES (from both DB and disk
  728. * storage).
  729. *
  730. * This is useful if BLOB_MAX_FILES is reduced after an agent
  731. * restart (for example, if in the future it is not hardcoded,
  732. * but instead it is read from the configuration file). LOGS_TABLE
  733. * entries should be deleted automatically (due to ON DELETE CASCADE).
  734. * -------------------------------------------------------------- */
  735. {
  736. sqlite3_stmt *stmt_get_BLOBS_TABLE_size = NULL;
  737. rc = sqlite3_prepare_v2(p_file_info->db,
  738. "SELECT MAX(Id) FROM " BLOBS_TABLE ";",
  739. -1, &stmt_get_BLOBS_TABLE_size, NULL);
  740. do_sqlite_error_check(p_file_info, rc, SQLITE_OK);
  741. rc = sqlite3_step(stmt_get_BLOBS_TABLE_size);
  742. do_sqlite_error_check(p_file_info, rc, SQLITE_ROW);
  743. const int blobs_table_max_id = sqlite3_column_int(stmt_get_BLOBS_TABLE_size, 0);
  744. sqlite3_stmt *stmt_retrieve_filename_last_digits = NULL; // This statement retrieves the last digit(s) from the Filename column of BLOBS_TABLE
  745. rc = sqlite3_prepare_v2(p_file_info->db,
  746. "WITH split(word, str) AS ( SELECT '', (SELECT Filename FROM " BLOBS_TABLE " WHERE Id = ? ) || '.' "
  747. "UNION ALL SELECT substr(str, 0, instr(str, '.')), substr(str, instr(str, '.')+1) FROM split WHERE str!='' ) "
  748. "SELECT word FROM split WHERE word!='' ORDER BY LENGTH(str) LIMIT 1;",
  749. -1, &stmt_retrieve_filename_last_digits, NULL);
  750. do_sqlite_error_check(p_file_info, rc, SQLITE_OK);
  751. sqlite3_stmt *stmt_delete_row_by_id = NULL;
  752. rc = sqlite3_prepare_v2(p_file_info->db,
  753. "DELETE FROM " BLOBS_TABLE " WHERE Id = ?;",
  754. -1, &stmt_delete_row_by_id, NULL);
  755. do_sqlite_error_check(p_file_info, rc, SQLITE_OK);
  756. for (int id = 1; id <= blobs_table_max_id; id++){
  757. rc = sqlite3_bind_int(stmt_retrieve_filename_last_digits, 1, id);
  758. do_sqlite_error_check(p_file_info, rc, SQLITE_OK);
  759. rc = sqlite3_step(stmt_retrieve_filename_last_digits);
  760. do_sqlite_error_check(p_file_info, rc, SQLITE_ROW);
  761. int last_digits = sqlite3_column_int(stmt_retrieve_filename_last_digits, 0);
  762. rc = sqlite3_reset(stmt_retrieve_filename_last_digits);
  763. do_sqlite_error_check(p_file_info, rc, SQLITE_OK);
  764. /* If last_digits > BLOB_MAX_FILES - 1, then some BLOB files
  765. * will need to be removed (both from DB BLOBS_TABLE and
  766. * also from the disk). */
  767. if(last_digits > BLOB_MAX_FILES - 1){
  768. /* Delete BLOB file from filesystem */
  769. char blob_delete_path[FILENAME_MAX + 1];
  770. snprintfz(blob_delete_path, FILENAME_MAX, "%s" BLOB_STORE_FILENAME "%d", p_file_info->db_dir, last_digits);
  771. uv_fs_t unlink_req;
  772. rc = uv_fs_unlink(NULL, &unlink_req, blob_delete_path, NULL);
  773. uv_fs_req_cleanup(&unlink_req);
  774. if (unlikely(rc)) {
  775. // TODO: If there is an erro here, the entry won't be deleted from BLOBS_TABLE. What to do?
  776. throw_error(p_file_info->chartname, ERR_TYPE_LIBUV, rc, __LINE__, __FILE__, __FUNCTION__);
  777. uv_mutex_unlock(p_file_info->db_mut);
  778. goto return_error;
  779. }
  780. do_sqlite_error_check(p_file_info, rc, SQLITE_OK);
  781. /* Delete entry from DB BLOBS_TABLE */
  782. rc = sqlite3_bind_int(stmt_delete_row_by_id, 1, id);
  783. do_sqlite_error_check(p_file_info, rc, SQLITE_OK);
  784. rc = sqlite3_step(stmt_delete_row_by_id);
  785. do_sqlite_error_check(p_file_info, rc, SQLITE_DONE);
  786. rc = sqlite3_reset(stmt_delete_row_by_id);
  787. do_sqlite_error_check(p_file_info, rc, SQLITE_OK);
  788. }
  789. }
  790. rc = sqlite3_finalize(stmt_retrieve_filename_last_digits);
  791. do_sqlite_error_check(p_file_info, rc, SQLITE_OK);
  792. rc = sqlite3_finalize(stmt_delete_row_by_id);
  793. do_sqlite_error_check(p_file_info, rc, SQLITE_OK);
  794. /* -------------------------------------------------------------
  795. * BLOBS_TABLE ids after the deletion might not be contiguous.
  796. * This needs to be fixed, by having the ids updated.
  797. * LOGS_TABLE FKs will be updated automatically
  798. * (due to ON UPDATE CASCADE).
  799. * ---------------------------------------------------------- */
  800. int old_blobs_table_ids[BLOB_MAX_FILES];
  801. int off = 0;
  802. sqlite3_stmt *stmt_retrieve_all_ids = NULL;
  803. rc = sqlite3_prepare_v2(p_file_info->db,
  804. "SELECT Id FROM " BLOBS_TABLE " ORDER BY Id ASC;",
  805. -1, &stmt_retrieve_all_ids, NULL);
  806. do_sqlite_error_check(p_file_info, rc, SQLITE_OK);
  807. rc = sqlite3_step(stmt_retrieve_all_ids);
  808. while(rc == SQLITE_ROW){
  809. old_blobs_table_ids[off++] = sqlite3_column_int(stmt_retrieve_all_ids, 0);
  810. rc = sqlite3_step(stmt_retrieve_all_ids);
  811. }
  812. do_sqlite_error_check(p_file_info, rc, SQLITE_DONE);
  813. rc = sqlite3_finalize(stmt_retrieve_all_ids);
  814. do_sqlite_error_check(p_file_info, rc, SQLITE_OK);
  815. sqlite3_stmt *stmt_update_id = NULL;
  816. rc = sqlite3_prepare_v2(p_file_info->db,
  817. "UPDATE " BLOBS_TABLE " SET Id = ? WHERE Id = ?;",
  818. -1, &stmt_update_id, NULL);
  819. do_sqlite_error_check(p_file_info, rc, SQLITE_OK);
  820. for (int i = 0; i < BLOB_MAX_FILES; i++){
  821. if(unlikely(
  822. SQLITE_OK != (rc = sqlite3_bind_int(stmt_update_id, 1, i + 1)) ||
  823. SQLITE_OK != (rc = sqlite3_bind_int(stmt_update_id, 2, old_blobs_table_ids[i])) ||
  824. SQLITE_DONE != (rc = sqlite3_step(stmt_update_id)) ||
  825. SQLITE_OK != (rc = sqlite3_reset(stmt_update_id)))) {
  826. throw_error(p_file_info->chartname, ERR_TYPE_SQLITE, rc, __LINE__, __FILE__, __FUNCTION__);
  827. uv_mutex_unlock(p_file_info->db_mut);
  828. goto return_error;
  829. }
  830. }
  831. rc = sqlite3_finalize(stmt_update_id);
  832. do_sqlite_error_check(p_file_info, rc, SQLITE_OK);
  833. }
  834. /* -----------------------------------------------------------------
  835. * Traverse BLOBS_TABLE, open logs.bin.X files and store their
  836. * file handles in p_file_info array.
  837. * -------------------------------------------------------------- */
  838. sqlite3_stmt *stmt_retrieve_metadata_from_id = NULL;
  839. rc = sqlite3_prepare_v2(p_file_info->db,
  840. "SELECT Filename, Filesize FROM " BLOBS_TABLE
  841. " WHERE Id = ? ;",
  842. -1, &stmt_retrieve_metadata_from_id, NULL);
  843. do_sqlite_error_check(p_file_info, rc, SQLITE_OK);
  844. sqlite3_stmt *stmt_retrieve_total_logs_size = NULL;
  845. rc = sqlite3_prepare_v2(p_file_info->db,
  846. "SELECT SUM(Msg_compr_size) FROM " LOGS_TABLE
  847. " WHERE FK_BLOB_Id = ? GROUP BY FK_BLOB_Id ;",
  848. -1, &stmt_retrieve_total_logs_size, NULL);
  849. do_sqlite_error_check(p_file_info, rc, SQLITE_OK);
  850. uv_fs_t open_req;
  851. for(int id = 1; id <= BLOB_MAX_FILES; id++){
  852. /* Open BLOB file based on filename stored in BLOBS_TABLE. */
  853. rc = sqlite3_bind_int(stmt_retrieve_metadata_from_id, 1, id);
  854. do_sqlite_error_check(p_file_info, rc, SQLITE_OK);
  855. rc = sqlite3_step(stmt_retrieve_metadata_from_id);
  856. do_sqlite_error_check(p_file_info, rc, SQLITE_ROW);
  857. char filename[FILENAME_MAX + 1] = {0};
  858. snprintfz(filename, FILENAME_MAX, "%s%s", p_file_info->db_dir,
  859. sqlite3_column_text(stmt_retrieve_metadata_from_id, 0));
  860. rc = uv_fs_open(NULL, &open_req, filename,
  861. UV_FS_O_RDWR | UV_FS_O_CREAT | UV_FS_O_APPEND | UV_FS_O_RANDOM,
  862. 0644, NULL);
  863. if (unlikely(rc < 0)){
  864. uv_fs_req_cleanup(&open_req);
  865. throw_error(p_file_info->chartname, ERR_TYPE_LIBUV, rc, __LINE__, __FILE__, __FUNCTION__);
  866. uv_mutex_unlock(p_file_info->db_mut);
  867. goto return_error;
  868. }
  869. // open_req.result of a uv_fs_t is the file descriptor in case of the uv_fs_open
  870. p_file_info->blob_handles[id] = open_req.result;
  871. uv_fs_req_cleanup(&open_req);
  872. const int64_t metadata_filesize = (int64_t) sqlite3_column_int64(stmt_retrieve_metadata_from_id, 1);
  873. /* -------------------------------------------------------------
  874. * Retrieve total log messages compressed size from LOGS_TABLE
  875. * for current FK_BLOB_Id.
  876. * Only to assert whether correct - not used elsewhere.
  877. *
  878. * If no rows are returned, it means it is probably the initial
  879. * execution of the program so still valid (except if rc is other
  880. * than SQLITE_DONE, which is an error then).
  881. * ---------------------------------------------------------- */
  882. rc = sqlite3_bind_int(stmt_retrieve_total_logs_size, 1, id);
  883. do_sqlite_error_check(p_file_info, rc, SQLITE_OK);
  884. rc = sqlite3_step(stmt_retrieve_total_logs_size);
  885. if (SQLITE_ROW == rc){
  886. const int64_t total_logs_filesize = (int64_t) sqlite3_column_int64(stmt_retrieve_total_logs_size, 0);
  887. if(unlikely(total_logs_filesize != metadata_filesize)){
  888. throw_error(p_file_info->chartname, ERR_TYPE_OTHER, rc, __LINE__, __FILE__, __FUNCTION__);
  889. uv_mutex_unlock(p_file_info->db_mut);
  890. goto return_error;
  891. }
  892. } else do_sqlite_error_check(p_file_info, rc, SQLITE_DONE);
  893. /* Get filesize of BLOB file. */
  894. uv_fs_t stat_req;
  895. rc = uv_fs_stat(NULL, &stat_req, filename, NULL);
  896. if (unlikely(rc)){
  897. uv_fs_req_cleanup(&stat_req);
  898. throw_error(p_file_info->chartname, ERR_TYPE_LIBUV, rc, __LINE__, __FILE__, __FUNCTION__);
  899. uv_mutex_unlock(p_file_info->db_mut);
  900. goto return_error;
  901. }
  902. const int64_t blob_filesize = (int64_t) stat_req.statbuf.st_size;
  903. uv_fs_req_cleanup(&stat_req);
  904. do{
  905. /* Case 1: blob_filesize == metadata_filesize (equal, either both zero or not): All good */
  906. if(likely(blob_filesize == metadata_filesize))
  907. break;
  908. /* Case 2: blob_filesize == 0 && metadata_filesize > 0: fatal(), however could it mean that
  909. * EXT_BLOB_STORE_FILENAME was rotated but the SQLite metadata wasn't updated? So can it
  910. * maybe be recovered by un-rotating? Either way, treat as fatal error for now. */
  911. // TODO: Can we avoid fatal()?
  912. if(unlikely(blob_filesize == 0 && metadata_filesize > 0)){
  913. collector_error("[%s]: blob_filesize == 0 but metadata_filesize > 0 for '%s'\n",
  914. p_file_info->chartname, filename);
  915. throw_error(p_file_info->chartname, ERR_TYPE_OTHER, rc, __LINE__, __FILE__, __FUNCTION__);
  916. uv_mutex_unlock(p_file_info->db_mut);
  917. goto return_error;
  918. }
  919. /* Case 3: blob_filesize > metadata_filesize: Truncate binary to sqlite filesize, program
  920. * crashed or terminated after writing BLOBs to external file but before metadata was updated */
  921. if(unlikely(blob_filesize > metadata_filesize)){
  922. collector_info("[%s]: blob_filesize > metadata_filesize for '%s'. Will attempt to fix it.",
  923. p_file_info->chartname, filename);
  924. uv_fs_t trunc_req;
  925. rc = uv_fs_ftruncate(NULL, &trunc_req, p_file_info->blob_handles[id], metadata_filesize, NULL);
  926. uv_fs_req_cleanup(&trunc_req);
  927. if(unlikely(rc)) {
  928. throw_error(p_file_info->chartname, ERR_TYPE_LIBUV, rc, __LINE__, __FILE__, __FUNCTION__);
  929. uv_mutex_unlock(p_file_info->db_mut);
  930. goto return_error;
  931. }
  932. break;
  933. }
  934. /* Case 4: blob_filesize < metadata_filesize: unrecoverable,
  935. * maybe rotation went horrible wrong?
  936. * TODO: Delete external BLOB and clear metadata from DB,
  937. * start from clean state but the most recent logs. */
  938. if(unlikely(blob_filesize < metadata_filesize)){
  939. collector_info("[%s]: blob_filesize < metadata_filesize for '%s'.",
  940. p_file_info->chartname, filename);
  941. throw_error(p_file_info->chartname, ERR_TYPE_OTHER, rc, __LINE__, __FILE__, __FUNCTION__);
  942. uv_mutex_unlock(p_file_info->db_mut);
  943. goto return_error;
  944. }
  945. /* Case 5: default if none of the above, should never reach here, fatal() */
  946. m_assert(0, "Code should not reach here");
  947. throw_error(p_file_info->chartname, ERR_TYPE_OTHER, rc, __LINE__, __FILE__, __FUNCTION__);
  948. uv_mutex_unlock(p_file_info->db_mut);
  949. goto return_error;
  950. } while(0);
  951. /* Initialise blob_write_handle with logs.bin.0 */
  952. if(filename[strlen(filename) - 1] == '0')
  953. p_file_info->blob_write_handle_offset = id;
  954. rc = sqlite3_reset(stmt_retrieve_total_logs_size);
  955. do_sqlite_error_check(p_file_info, rc, SQLITE_OK);
  956. rc = sqlite3_reset(stmt_retrieve_metadata_from_id);
  957. do_sqlite_error_check(p_file_info, rc, SQLITE_OK);
  958. }
  959. rc = sqlite3_finalize(stmt_retrieve_metadata_from_id);
  960. do_sqlite_error_check(p_file_info, rc, SQLITE_OK);
  961. /* Prepare statements to be used in single database queries */
  962. rc = sqlite3_prepare_v2(p_file_info->db,
  963. "SELECT Timestamp, Msg_compr_size , Msg_decompr_size, "
  964. "BLOB_Offset, " BLOBS_TABLE".Id, Num_lines "
  965. "FROM " LOGS_TABLE " INNER JOIN " BLOBS_TABLE " "
  966. "ON " LOGS_TABLE ".FK_BLOB_Id = " BLOBS_TABLE ".Id "
  967. "WHERE Timestamp >= ? AND Timestamp <= ? "
  968. "ORDER BY Timestamp;",
  969. -1, &p_file_info->stmt_get_log_msg_metadata_asc, NULL);
  970. do_sqlite_error_check(p_file_info, rc, SQLITE_OK);
  971. rc = sqlite3_prepare_v2(p_file_info->db,
  972. "SELECT Timestamp, Msg_compr_size , Msg_decompr_size, "
  973. "BLOB_Offset, " BLOBS_TABLE".Id, Num_lines "
  974. "FROM " LOGS_TABLE " INNER JOIN " BLOBS_TABLE " "
  975. "ON " LOGS_TABLE ".FK_BLOB_Id = " BLOBS_TABLE ".Id "
  976. "WHERE Timestamp <= ? AND Timestamp >= ? "
  977. "ORDER BY Timestamp DESC;",
  978. -1, &p_file_info->stmt_get_log_msg_metadata_desc, NULL);
  979. do_sqlite_error_check(p_file_info, rc, SQLITE_OK);
  980. /* DB initialisation finished; release lock */
  981. uv_mutex_unlock(p_file_info->db_mut);
  982. /* Create synchronous writer thread, one for each log source */
  983. p_file_info->db_writer_thread = mallocz(sizeof(uv_thread_t));
  984. rc = uv_thread_create(p_file_info->db_writer_thread, db_writer_db_mode_full, p_file_info);
  985. if (unlikely(rc)){
  986. throw_error(p_file_info->chartname, ERR_TYPE_LIBUV, rc, __LINE__, __FILE__, __FUNCTION__);
  987. goto return_error;
  988. }
  989. }
  990. }
  991. rc = sqlite3_finalize(stmt_search_if_log_source_exists);
  992. if (unlikely(rc != SQLITE_OK)){
  993. throw_error(MAIN_DB, ERR_TYPE_SQLITE, rc, __LINE__, __FILE__, __FUNCTION__);
  994. // TODO: Some additional cleanup required here, e.g. terminate db_writer_thread.
  995. goto return_error;
  996. }
  997. rc = sqlite3_finalize(stmt_insert_log_collection_metadata);
  998. if (unlikely(rc != SQLITE_OK)){
  999. throw_error(MAIN_DB, ERR_TYPE_SQLITE, rc, __LINE__, __FILE__, __FUNCTION__);
  1000. // TODO: Some additional cleanup required here, e.g. terminate db_writer_thread.
  1001. goto return_error;
  1002. }
  1003. return 0;
  1004. return_error:
  1005. freez(main_db_path);
  1006. main_db_path = NULL;
  1007. sqlite3_close(main_db); // No-op if main_db == NULL
  1008. sqlite3_free(err_msg); // No-op if err_msg == NULL
  1009. m_assert(rc != 0, "rc should not be == 0 in case of error");
  1010. return rc == 0 ? -1 : rc;
  1011. }
  1012. /**
  1013. * @brief Search database(s) for logs
  1014. * @details This function searches one or more databases for any results
  1015. * matching the query parameters. If any results are found, it will decompress
  1016. * the text of each returned row and add it to the results buffer, up to a
  1017. * maximum amount of p_query_params->quota bytes (unless timed out).
  1018. * @todo Make decompress buffer static to reduce mallocs/frees.
  1019. * @todo Limit number of results returned through SQLite Query to speed up search?
  1020. */
  1021. void db_search(logs_query_params_t *const p_query_params, struct File_info *const p_file_infos[]) {
  1022. int rc = 0;
  1023. sqlite3_stmt *stmt_get_log_msg_metadata;
  1024. sqlite3 *dbt = NULL; // Used only when multiple DBs are searched
  1025. if(!p_file_infos[1]){ /* Single DB to be searched */
  1026. stmt_get_log_msg_metadata = p_query_params->order_by_asc ?
  1027. p_file_infos[0]->stmt_get_log_msg_metadata_asc : p_file_infos[0]->stmt_get_log_msg_metadata_desc;
  1028. if(unlikely(
  1029. SQLITE_OK != (rc = sqlite3_bind_int64(stmt_get_log_msg_metadata, 1, p_query_params->req_from_ts)) ||
  1030. SQLITE_OK != (rc = sqlite3_bind_int64(stmt_get_log_msg_metadata, 2, p_query_params->req_to_ts)) ||
  1031. (SQLITE_ROW != (rc = sqlite3_step(stmt_get_log_msg_metadata)) && (SQLITE_DONE != rc))
  1032. )){
  1033. throw_error(p_file_infos[0]->chartname, ERR_TYPE_SQLITE, rc, __LINE__, __FILE__, __FUNCTION__);
  1034. // TODO: If there are errors here, should db_writer_db_mode_full() be terminated?
  1035. sqlite3_reset(stmt_get_log_msg_metadata);
  1036. return;
  1037. }
  1038. } else { /* Multiple DBs to be searched */
  1039. sqlite3_stmt *stmt_attach_db;
  1040. sqlite3_stmt *stmt_create_tmp_view;
  1041. int pfi_off = 0;
  1042. /* Open a new DB connection on the first log source DB and attach other DBs */
  1043. if(unlikely(
  1044. SQLITE_OK != (rc = sqlite3_open_v2(p_file_infos[0]->db_metadata, &dbt, SQLITE_OPEN_READONLY, NULL)) ||
  1045. SQLITE_OK != (rc = sqlite3_prepare_v2(dbt,"ATTACH DATABASE ? AS ? ;", -1, &stmt_attach_db, NULL))
  1046. )){
  1047. throw_error(p_file_infos[0]->chartname, ERR_TYPE_SQLITE, rc, __LINE__, __FILE__, __FUNCTION__);
  1048. sqlite3_close_v2(dbt);
  1049. return;
  1050. }
  1051. for(pfi_off = 0; p_file_infos[pfi_off]; pfi_off++){
  1052. if(unlikely(
  1053. SQLITE_OK != (rc = sqlite3_bind_text(stmt_attach_db, 1, p_file_infos[pfi_off]->db_metadata, -1, NULL)) ||
  1054. SQLITE_OK != (rc = sqlite3_bind_int(stmt_attach_db, 2, pfi_off)) ||
  1055. SQLITE_DONE != (rc = sqlite3_step(stmt_attach_db)) ||
  1056. SQLITE_OK != (rc = sqlite3_reset(stmt_attach_db))
  1057. )){
  1058. throw_error(p_file_infos[pfi_off]->chartname, ERR_TYPE_SQLITE, rc, __LINE__, __FILE__, __FUNCTION__);
  1059. sqlite3_close_v2(dbt);
  1060. return;
  1061. }
  1062. }
  1063. /* Create temporary view, then prepare retrieval of metadata from
  1064. * TMP_VIEW_TABLE statement and execute search.
  1065. * TODO: Limit number of results returned through SQLite Query to speed up search? */
  1066. #define TMP_VIEW_TABLE "compound_view"
  1067. #define TMP_VIEW_QUERY_PREFIX "CREATE TEMP VIEW " TMP_VIEW_TABLE " AS SELECT * FROM (SELECT * FROM '0'."\
  1068. LOGS_TABLE " INNER JOIN (VALUES(0)) ORDER BY Timestamp) "
  1069. #define TMP_VIEW_QUERY_BODY_1 "UNION ALL SELECT * FROM (SELECT * FROM '"
  1070. #define TMP_VIEW_QUERY_BODY_2 "'." LOGS_TABLE " INNER JOIN (VALUES("
  1071. #define TMP_VIEW_QUERY_BODY_3 ")) ORDER BY Timestamp) "
  1072. #define TMP_VIEW_QUERY_POSTFIX "ORDER BY Timestamp;"
  1073. char tmp_view_query[sizeof(TMP_VIEW_QUERY_PREFIX) + (
  1074. sizeof(TMP_VIEW_QUERY_BODY_1) +
  1075. sizeof(TMP_VIEW_QUERY_BODY_2) +
  1076. sizeof(TMP_VIEW_QUERY_BODY_3) + 4
  1077. ) * (LOGS_MANAG_MAX_COMPOUND_QUERY_SOURCES - 1) +
  1078. sizeof(TMP_VIEW_QUERY_POSTFIX) +
  1079. 50 /* +50 bytes to play it safe */] = TMP_VIEW_QUERY_PREFIX;
  1080. int pos = sizeof(TMP_VIEW_QUERY_PREFIX) - 1;
  1081. for(pfi_off = 1; p_file_infos[pfi_off]; pfi_off++){ // Skip p_file_infos[0]
  1082. int n = snprintf(&tmp_view_query[pos], sizeof(tmp_view_query) - pos, "%s%d%s%d%s",
  1083. TMP_VIEW_QUERY_BODY_1, pfi_off,
  1084. TMP_VIEW_QUERY_BODY_2, pfi_off,
  1085. TMP_VIEW_QUERY_BODY_3);
  1086. if (n < 0 || n >= (int) sizeof(tmp_view_query) - pos){
  1087. throw_error(p_file_infos[pfi_off]->chartname, ERR_TYPE_OTHER, n, __LINE__, __FILE__, __FUNCTION__);
  1088. sqlite3_close_v2(dbt);
  1089. return;
  1090. }
  1091. pos += n;
  1092. }
  1093. snprintf(&tmp_view_query[pos], sizeof(tmp_view_query) - pos, "%s", TMP_VIEW_QUERY_POSTFIX);
  1094. if(unlikely(
  1095. SQLITE_OK != (rc = sqlite3_prepare_v2(dbt, tmp_view_query, -1, &stmt_create_tmp_view, NULL)) ||
  1096. SQLITE_DONE != (rc = sqlite3_step(stmt_create_tmp_view)) ||
  1097. SQLITE_OK != (rc = sqlite3_prepare_v2(dbt, p_query_params->order_by_asc ?
  1098. "SELECT Timestamp, Msg_compr_size , Msg_decompr_size, "
  1099. "BLOB_Offset, FK_BLOB_Id, Num_lines, column1 "
  1100. "FROM " TMP_VIEW_TABLE " "
  1101. "WHERE Timestamp >= ? AND Timestamp <= ?;" :
  1102. /* TODO: The following can also be done by defining
  1103. * a descending order tmp_view_query, which will
  1104. * probably be faster. Needs to be measured. */
  1105. "SELECT Timestamp, Msg_compr_size , Msg_decompr_size, "
  1106. "BLOB_Offset, FK_BLOB_Id, Num_lines, column1 "
  1107. "FROM " TMP_VIEW_TABLE " "
  1108. "WHERE Timestamp <= ? AND Timestamp >= ? ORDER BY Timestamp DESC;",
  1109. -1, &stmt_get_log_msg_metadata, NULL)) ||
  1110. SQLITE_OK != (rc = sqlite3_bind_int64(stmt_get_log_msg_metadata, 1,
  1111. (sqlite3_int64)p_query_params->req_from_ts)) ||
  1112. SQLITE_OK != (rc = sqlite3_bind_int64(stmt_get_log_msg_metadata, 2,
  1113. (sqlite3_int64)p_query_params->req_to_ts)) ||
  1114. (SQLITE_ROW != (rc = sqlite3_step(stmt_get_log_msg_metadata)) && (SQLITE_DONE != rc))
  1115. )){
  1116. throw_error(p_file_infos[0]->chartname, ERR_TYPE_SQLITE, rc, __LINE__, __FILE__, __FUNCTION__);
  1117. sqlite3_close_v2(dbt);
  1118. return;
  1119. }
  1120. }
  1121. Circ_buff_item_t tmp_itm = {0};
  1122. BUFFER *const res_buff = p_query_params->results_buff;
  1123. logs_query_res_hdr_t res_hdr = { // results header
  1124. .timestamp = p_query_params->act_to_ts,
  1125. .text_size = 0,
  1126. .matches = 0,
  1127. .log_source = "",
  1128. .log_type = "",
  1129. .basename = "",
  1130. .filename = "",
  1131. .chartname =""
  1132. };
  1133. size_t text_compressed_size_max = 0;
  1134. while (rc == SQLITE_ROW) {
  1135. /* Retrieve metadata from DB */
  1136. tmp_itm.timestamp = (msec_t)sqlite3_column_int64(stmt_get_log_msg_metadata, 0);
  1137. tmp_itm.text_compressed_size = (size_t)sqlite3_column_int64(stmt_get_log_msg_metadata, 1);
  1138. tmp_itm.text_size = (size_t)sqlite3_column_int64(stmt_get_log_msg_metadata, 2);
  1139. int64_t blob_offset = (int64_t) sqlite3_column_int64(stmt_get_log_msg_metadata, 3);
  1140. int blob_handles_offset = sqlite3_column_int(stmt_get_log_msg_metadata, 4);
  1141. unsigned long num_lines = (unsigned long) sqlite3_column_int64(stmt_get_log_msg_metadata, 5);
  1142. int db_off = p_file_infos[1] ? sqlite3_column_int(stmt_get_log_msg_metadata, 6) : 0;
  1143. /* If exceeding quota or timeout is reached and new timestamp
  1144. * is different than previous, terminate query. */
  1145. if((res_buff->len >= p_query_params->quota || terminate_logs_manag_query(p_query_params)) &&
  1146. tmp_itm.timestamp != res_hdr.timestamp){
  1147. p_query_params->act_to_ts = res_hdr.timestamp;
  1148. break;
  1149. }
  1150. res_hdr.timestamp = tmp_itm.timestamp;
  1151. snprintfz(res_hdr.log_source, sizeof(res_hdr.log_source), "%s", log_src_t_str[p_file_infos[db_off]->log_source]);
  1152. snprintfz(res_hdr.log_type, sizeof(res_hdr.log_type), "%s", log_src_type_t_str[p_file_infos[db_off]->log_type]);
  1153. snprintfz(res_hdr.basename, sizeof(res_hdr.basename), "%s", p_file_infos[db_off]->file_basename);
  1154. snprintfz(res_hdr.filename, sizeof(res_hdr.filename), "%s", p_file_infos[db_off]->filename);
  1155. snprintfz(res_hdr.chartname, sizeof(res_hdr.chartname), "%s", p_file_infos[db_off]->chartname);
  1156. /* Retrieve compressed log messages from BLOB file */
  1157. if(tmp_itm.text_compressed_size > text_compressed_size_max){
  1158. text_compressed_size_max = tmp_itm.text_compressed_size;
  1159. tmp_itm.text_compressed = reallocz(tmp_itm.text_compressed, text_compressed_size_max);
  1160. }
  1161. uv_fs_t read_req;
  1162. uv_buf_t uv_buf = uv_buf_init(tmp_itm.text_compressed, tmp_itm.text_compressed_size);
  1163. rc = uv_fs_read(NULL,
  1164. &read_req,
  1165. p_file_infos[db_off]->blob_handles[blob_handles_offset],
  1166. &uv_buf, 1, blob_offset, NULL);
  1167. uv_fs_req_cleanup(&read_req);
  1168. if (unlikely(rc < 0)){
  1169. throw_error(NULL, ERR_TYPE_LIBUV, rc, __LINE__, __FILE__, __FUNCTION__);
  1170. break;
  1171. }
  1172. /* Append retrieved results to BUFFER.
  1173. * In the case of search_keyword(), less than sizeof(res_hdr) + tmp_itm.text_size
  1174. *space may be required, but go for worst case scenario for now */
  1175. buffer_increase(res_buff, sizeof(res_hdr) + tmp_itm.text_size);
  1176. if(!p_query_params->keyword || !*p_query_params->keyword || !strcmp(p_query_params->keyword, " ")){
  1177. rc = LZ4_decompress_safe(tmp_itm.text_compressed,
  1178. &res_buff->buffer[res_buff->len + sizeof(res_hdr)],
  1179. tmp_itm.text_compressed_size,
  1180. tmp_itm.text_size);
  1181. if(unlikely(rc < 0)){
  1182. throw_error(p_file_infos[db_off]->chartname, ERR_TYPE_OTHER, rc, __LINE__, __FILE__, __FUNCTION__);
  1183. break;
  1184. }
  1185. res_hdr.matches = num_lines;
  1186. res_hdr.text_size = tmp_itm.text_size;
  1187. }
  1188. else {
  1189. tmp_itm.data = mallocz(tmp_itm.text_size);
  1190. rc = LZ4_decompress_safe(tmp_itm.text_compressed,
  1191. tmp_itm.data,
  1192. tmp_itm.text_compressed_size,
  1193. tmp_itm.text_size);
  1194. if(unlikely(rc < 0)){
  1195. freez(tmp_itm.data);
  1196. throw_error(p_file_infos[db_off]->chartname, ERR_TYPE_OTHER, rc, __LINE__, __FILE__, __FUNCTION__);
  1197. break;
  1198. }
  1199. res_hdr.matches = search_keyword( tmp_itm.data, tmp_itm.text_size,
  1200. &res_buff->buffer[res_buff->len + sizeof(res_hdr)],
  1201. &res_hdr.text_size, p_query_params->keyword, NULL,
  1202. p_query_params->ignore_case);
  1203. freez(tmp_itm.data);
  1204. m_assert( (res_hdr.matches > 0 && res_hdr.text_size > 0) ||
  1205. (res_hdr.matches == 0 && res_hdr.text_size == 0),
  1206. "res_hdr.matches and res_hdr.text_size must both be > 0 or == 0.");
  1207. if(unlikely(res_hdr.matches < 0)){ /* res_hdr.matches < 0 - error during keyword search */
  1208. throw_error(p_file_infos[db_off]->chartname, ERR_TYPE_LIBUV, rc, __LINE__, __FILE__, __FUNCTION__);
  1209. break;
  1210. }
  1211. }
  1212. if(res_hdr.text_size){
  1213. res_buff->buffer[res_buff->len + sizeof(res_hdr) + res_hdr.text_size - 1] = '\n'; // replace '\0' with '\n'
  1214. memcpy(&res_buff->buffer[res_buff->len], &res_hdr, sizeof(res_hdr));
  1215. res_buff->len += sizeof(res_hdr) + res_hdr.text_size;
  1216. p_query_params->num_lines += res_hdr.matches;
  1217. }
  1218. m_assert(TEST_MS_TIMESTAMP_VALID(res_hdr.timestamp), "res_hdr.timestamp is invalid");
  1219. rc = sqlite3_step(stmt_get_log_msg_metadata);
  1220. if (unlikely(rc != SQLITE_ROW && rc != SQLITE_DONE)){
  1221. throw_error(p_file_infos[db_off]->chartname, ERR_TYPE_SQLITE, rc, __LINE__, __FILE__, __FUNCTION__);
  1222. // TODO: If there are errors here, should db_writer_db_mode_full() be terminated?
  1223. break;
  1224. }
  1225. }
  1226. if(tmp_itm.text_compressed)
  1227. freez(tmp_itm.text_compressed);
  1228. if(p_file_infos[1])
  1229. rc = sqlite3_close_v2(dbt);
  1230. else
  1231. rc = sqlite3_reset(stmt_get_log_msg_metadata);
  1232. if (unlikely(SQLITE_OK != rc))
  1233. throw_error(p_file_infos[0]->chartname, ERR_TYPE_SQLITE, rc, __LINE__, __FILE__, __FUNCTION__);
  1234. }