logfile.c 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #include "metadatalog.h"
  3. #include "metalogpluginsd.h"
  4. static void mlf_record_block_insert(struct metadata_logfile *metalogfile, struct metalog_record_block *record_block)
  5. {
  6. if (likely(NULL != metalogfile->records.last)) {
  7. metalogfile->records.last->next = record_block;
  8. }
  9. if (unlikely(NULL == metalogfile->records.first)) {
  10. metalogfile->records.first = record_block;
  11. }
  12. metalogfile->records.last = record_block;
  13. }
  14. void mlf_record_insert(struct metadata_logfile *metalogfile, struct metalog_record *record)
  15. {
  16. struct metalog_record_block *record_block;
  17. struct metalog_instance *ctx = metalogfile->ctx;
  18. record_block = metalogfile->records.last;
  19. if (likely(NULL != record_block && record_block->records_nr < MAX_METALOG_RECORDS_PER_BLOCK)) {
  20. record_block->record_array[record_block->records_nr++] = *record;
  21. } else { /* Create new record block, the last one filled up */
  22. record_block = mallocz(sizeof(*record_block));
  23. record_block->records_nr = 1;
  24. record_block->record_array[0] = *record;
  25. record_block->next = NULL;
  26. mlf_record_block_insert(metalogfile, record_block);
  27. }
  28. rrd_atomic_fetch_add(&ctx->records_nr, 1);
  29. }
  30. struct metalog_record *mlf_record_get_first(struct metadata_logfile *metalogfile)
  31. {
  32. struct metalog_records *records = &metalogfile->records;
  33. struct metalog_record_block *record_block = metalogfile->records.first;
  34. records->iterator.current = record_block;
  35. records->iterator.record_i = 0;
  36. if (unlikely(NULL == record_block || !record_block->records_nr)) {
  37. error("Cannot iterate empty metadata log file %u-%u.", metalogfile->starting_fileno, metalogfile->fileno);
  38. return NULL;
  39. }
  40. return &record_block->record_array[0];
  41. }
  42. /* Must have called mlf_record_get_first before calling this function. */
  43. struct metalog_record *mlf_record_get_next(struct metadata_logfile *metalogfile)
  44. {
  45. struct metalog_records *records = &metalogfile->records;
  46. struct metalog_record_block *record_block = records->iterator.current;
  47. if (unlikely(NULL == record_block)) {
  48. return NULL;
  49. }
  50. if (++records->iterator.record_i >= record_block->records_nr) {
  51. record_block = record_block->next;
  52. if (unlikely(NULL == record_block || !record_block->records_nr)) {
  53. return NULL;
  54. }
  55. records->iterator.current = record_block;
  56. records->iterator.record_i = 0;
  57. return &record_block->record_array[0];
  58. }
  59. return &record_block->record_array[records->iterator.record_i];
  60. }
  61. static void flush_records_buffer_cb(uv_fs_t* req)
  62. {
  63. struct generic_io_descriptor *io_descr = req->data;
  64. struct metalog_worker_config *wc = req->loop->data;
  65. struct metalog_instance *ctx = wc->ctx;
  66. debug(D_METADATALOG, "%s: Metadata log file block was written to disk.", __func__);
  67. if (req->result < 0) {
  68. ++ctx->stats.io_errors;
  69. rrd_stat_atomic_add(&global_io_errors, 1);
  70. error("%s: uv_fs_write: %s", __func__, uv_strerror((int)req->result));
  71. } else {
  72. debug(D_METADATALOG, "%s: Metadata log file block was written to disk.", __func__);
  73. }
  74. uv_fs_req_cleanup(req);
  75. free(io_descr->buf);
  76. freez(io_descr);
  77. }
  78. /* Careful to always call this before creating a new metadata log file to finish writing the old one */
  79. void mlf_flush_records_buffer(struct metalog_worker_config *wc, struct metadata_record_commit_log *records_log,
  80. struct metadata_logfile_list *metadata_logfiles)
  81. {
  82. struct metalog_instance *ctx = wc->ctx;
  83. int ret;
  84. struct generic_io_descriptor *io_descr;
  85. unsigned pos, size;
  86. struct metadata_logfile *metalogfile;
  87. if (unlikely(NULL == records_log->buf || 0 == records_log->buf_pos)) {
  88. return;
  89. }
  90. /* care with outstanding records when switching metadata log files */
  91. metalogfile = metadata_logfiles->last;
  92. io_descr = mallocz(sizeof(*io_descr));
  93. pos = records_log->buf_pos;
  94. size = pos; /* no need to align the I/O when doing buffered writes */
  95. io_descr->buf = records_log->buf;
  96. io_descr->bytes = size;
  97. io_descr->pos = metalogfile->pos;
  98. io_descr->req.data = io_descr;
  99. io_descr->completion = NULL;
  100. io_descr->iov = uv_buf_init((void *)io_descr->buf, size);
  101. ret = uv_fs_write(wc->loop, &io_descr->req, metalogfile->file, &io_descr->iov, 1,
  102. metalogfile->pos, flush_records_buffer_cb);
  103. fatal_assert(-1 != ret);
  104. metalogfile->pos += size;
  105. rrd_atomic_fetch_add(&ctx->disk_space, size);
  106. records_log->buf = NULL;
  107. ctx->stats.io_write_bytes += size;
  108. ++ctx->stats.io_write_requests;
  109. }
  110. void *mlf_get_records_buffer(struct metalog_worker_config *wc, struct metadata_record_commit_log *records_log,
  111. struct metadata_logfile_list *metadata_logfiles, unsigned size)
  112. {
  113. int ret;
  114. unsigned buf_pos = 0, buf_size;
  115. fatal_assert(size);
  116. if (records_log->buf) {
  117. unsigned remaining;
  118. buf_pos = records_log->buf_pos;
  119. buf_size = records_log->buf_size;
  120. remaining = buf_size - buf_pos;
  121. if (size > remaining) {
  122. /* we need a new buffer */
  123. mlf_flush_records_buffer(wc, records_log, metadata_logfiles);
  124. }
  125. }
  126. if (NULL == records_log->buf) {
  127. buf_size = ALIGN_BYTES_CEILING(size);
  128. ret = posix_memalign((void *)&records_log->buf, RRDFILE_ALIGNMENT, buf_size);
  129. if (unlikely(ret)) {
  130. fatal("posix_memalign:%s", strerror(ret));
  131. }
  132. buf_pos = records_log->buf_pos = 0;
  133. records_log->buf_size = buf_size;
  134. }
  135. records_log->buf_pos += size;
  136. return records_log->buf + buf_pos;
  137. }
  138. void metadata_logfile_list_insert(struct metadata_logfile_list *metadata_logfiles, struct metadata_logfile *metalogfile)
  139. {
  140. if (likely(NULL != metadata_logfiles->last)) {
  141. metadata_logfiles->last->next = metalogfile;
  142. }
  143. if (unlikely(NULL == metadata_logfiles->first)) {
  144. metadata_logfiles->first = metalogfile;
  145. }
  146. metadata_logfiles->last = metalogfile;
  147. }
  148. void metadata_logfile_list_delete(struct metadata_logfile_list *metadata_logfiles, struct metadata_logfile *metalogfile)
  149. {
  150. struct metadata_logfile *next;
  151. next = metalogfile->next;
  152. fatal_assert((NULL != next) && (metadata_logfiles->first == metalogfile) &&
  153. (metadata_logfiles->last != metalogfile));
  154. metadata_logfiles->first = next;
  155. }
  156. void generate_metadata_logfile_path(struct metadata_logfile *metalogfile, char *str, size_t maxlen)
  157. {
  158. (void) snprintf(str, maxlen, "%s/" METALOG_PREFIX METALOG_FILE_NUMBER_PRINT_TMPL METALOG_EXTENSION,
  159. metalogfile->ctx->rrdeng_ctx->dbfiles_path, metalogfile->starting_fileno, metalogfile->fileno);
  160. }
  161. void metadata_logfile_init(struct metadata_logfile *metalogfile, struct metalog_instance *ctx, unsigned starting_fileno,
  162. unsigned fileno)
  163. {
  164. metalogfile->starting_fileno = starting_fileno;
  165. metalogfile->fileno = fileno;
  166. metalogfile->file = (uv_file)0;
  167. metalogfile->pos = 0;
  168. metalogfile->records.first = metalogfile->records.last = NULL;
  169. metalogfile->next = NULL;
  170. metalogfile->ctx = ctx;
  171. }
  172. int rename_metadata_logfile(struct metadata_logfile *metalogfile, unsigned new_starting_fileno, unsigned new_fileno)
  173. {
  174. struct metalog_instance *ctx = metalogfile->ctx;
  175. uv_fs_t req;
  176. int ret;
  177. char oldpath[RRDENG_PATH_MAX], newpath[RRDENG_PATH_MAX];
  178. unsigned backup_starting_fileno, backup_fileno;
  179. backup_starting_fileno = metalogfile->starting_fileno;
  180. backup_fileno = metalogfile->fileno;
  181. generate_metadata_logfile_path(metalogfile, oldpath, sizeof(oldpath));
  182. metalogfile->starting_fileno = new_starting_fileno;
  183. metalogfile->fileno = new_fileno;
  184. generate_metadata_logfile_path(metalogfile, newpath, sizeof(newpath));
  185. info("Renaming metadata log file \"%s\" to \"%s\".", oldpath, newpath);
  186. ret = uv_fs_rename(NULL, &req, oldpath, newpath, NULL);
  187. if (ret < 0) {
  188. error("uv_fs_rename(%s): %s", oldpath, uv_strerror(ret));
  189. ++ctx->stats.fs_errors; /* this is racy, may miss some errors */
  190. rrd_stat_atomic_add(&global_fs_errors, 1);
  191. /* restore previous values */
  192. metalogfile->starting_fileno = backup_starting_fileno;
  193. metalogfile->fileno = backup_fileno;
  194. }
  195. uv_fs_req_cleanup(&req);
  196. return ret;
  197. }
  198. int close_metadata_logfile(struct metadata_logfile *metalogfile)
  199. {
  200. struct metalog_instance *ctx = metalogfile->ctx;
  201. uv_fs_t req;
  202. int ret;
  203. char path[RRDENG_PATH_MAX];
  204. generate_metadata_logfile_path(metalogfile, path, sizeof(path));
  205. ret = uv_fs_close(NULL, &req, metalogfile->file, NULL);
  206. if (ret < 0) {
  207. error("uv_fs_close(%s): %s", path, uv_strerror(ret));
  208. ++ctx->stats.fs_errors;
  209. rrd_stat_atomic_add(&global_fs_errors, 1);
  210. }
  211. uv_fs_req_cleanup(&req);
  212. return ret;
  213. }
  214. int unlink_metadata_logfile(struct metadata_logfile *metalogfile)
  215. {
  216. struct metalog_instance *ctx = metalogfile->ctx;
  217. uv_fs_t req;
  218. int ret;
  219. char path[RRDENG_PATH_MAX];
  220. generate_metadata_logfile_path(metalogfile, path, sizeof(path));
  221. ret = uv_fs_unlink(NULL, &req, path, NULL);
  222. if (ret < 0) {
  223. error("uv_fs_fsunlink(%s): %s", path, uv_strerror(ret));
  224. ++ctx->stats.fs_errors;
  225. rrd_stat_atomic_add(&global_fs_errors, 1);
  226. }
  227. uv_fs_req_cleanup(&req);
  228. return ret;
  229. }
  230. int destroy_metadata_logfile(struct metadata_logfile *metalogfile)
  231. {
  232. struct metalog_instance *ctx = metalogfile->ctx;
  233. uv_fs_t req;
  234. int ret;
  235. char path[RRDENG_PATH_MAX];
  236. generate_metadata_logfile_path(metalogfile, path, sizeof(path));
  237. ret = uv_fs_ftruncate(NULL, &req, metalogfile->file, 0, NULL);
  238. if (ret < 0) {
  239. error("uv_fs_ftruncate(%s): %s", path, uv_strerror(ret));
  240. ++ctx->stats.fs_errors;
  241. rrd_stat_atomic_add(&global_fs_errors, 1);
  242. }
  243. uv_fs_req_cleanup(&req);
  244. ret = uv_fs_close(NULL, &req, metalogfile->file, NULL);
  245. if (ret < 0) {
  246. error("uv_fs_close(%s): %s", path, uv_strerror(ret));
  247. ++ctx->stats.fs_errors;
  248. rrd_stat_atomic_add(&global_fs_errors, 1);
  249. }
  250. uv_fs_req_cleanup(&req);
  251. ret = uv_fs_unlink(NULL, &req, path, NULL);
  252. if (ret < 0) {
  253. error("uv_fs_fsunlink(%s): %s", path, uv_strerror(ret));
  254. ++ctx->stats.fs_errors;
  255. rrd_stat_atomic_add(&global_fs_errors, 1);
  256. }
  257. uv_fs_req_cleanup(&req);
  258. // ++ctx->stats.metadata_logfile_deletions;
  259. return ret;
  260. }
  261. int create_metadata_logfile(struct metadata_logfile *metalogfile)
  262. {
  263. struct metalog_instance *ctx = metalogfile->ctx;
  264. uv_fs_t req;
  265. uv_file file;
  266. int ret, fd;
  267. struct rrdeng_metalog_sb *superblock;
  268. uv_buf_t iov;
  269. char path[RRDENG_PATH_MAX];
  270. generate_metadata_logfile_path(metalogfile, path, sizeof(path));
  271. fd = open_file_buffered_io(path, O_CREAT | O_RDWR | O_TRUNC, &file);
  272. if (fd < 0) {
  273. ++ctx->stats.fs_errors;
  274. rrd_stat_atomic_add(&global_fs_errors, 1);
  275. return fd;
  276. }
  277. metalogfile->file = file;
  278. // ++ctx->stats.metadata_logfile_creations;
  279. ret = posix_memalign((void *)&superblock, RRDFILE_ALIGNMENT, sizeof(*superblock));
  280. if (unlikely(ret)) {
  281. fatal("posix_memalign:%s", strerror(ret));
  282. }
  283. (void) strncpy(superblock->magic_number, RRDENG_METALOG_MAGIC, RRDENG_MAGIC_SZ);
  284. superblock->version = RRDENG_METALOG_VER;
  285. iov = uv_buf_init((void *)superblock, sizeof(*superblock));
  286. ret = uv_fs_write(NULL, &req, file, &iov, 1, 0, NULL);
  287. if (ret < 0) {
  288. fatal_assert(req.result < 0);
  289. error("uv_fs_write: %s", uv_strerror(ret));
  290. ++ctx->stats.io_errors;
  291. rrd_stat_atomic_add(&global_io_errors, 1);
  292. }
  293. uv_fs_req_cleanup(&req);
  294. free(superblock);
  295. if (ret < 0) {
  296. destroy_metadata_logfile(metalogfile);
  297. return ret;
  298. }
  299. metalogfile->pos = sizeof(*superblock);
  300. ctx->stats.io_write_bytes += sizeof(*superblock);
  301. ++ctx->stats.io_write_requests;
  302. return 0;
  303. }
  304. static int check_metadata_logfile_superblock(uv_file file)
  305. {
  306. int ret;
  307. struct rrdeng_metalog_sb *superblock;
  308. uv_buf_t iov;
  309. uv_fs_t req;
  310. ret = posix_memalign((void *)&superblock, RRDFILE_ALIGNMENT, sizeof(*superblock));
  311. if (unlikely(ret)) {
  312. fatal("posix_memalign:%s", strerror(ret));
  313. }
  314. iov = uv_buf_init((void *)superblock, sizeof(*superblock));
  315. ret = uv_fs_read(NULL, &req, file, &iov, 1, 0, NULL);
  316. if (ret < 0) {
  317. error("uv_fs_read: %s", uv_strerror(ret));
  318. uv_fs_req_cleanup(&req);
  319. goto error;
  320. }
  321. fatal_assert(req.result >= 0);
  322. uv_fs_req_cleanup(&req);
  323. if (strncmp(superblock->magic_number, RRDENG_METALOG_MAGIC, RRDENG_MAGIC_SZ)) {
  324. error("File has invalid superblock.");
  325. ret = UV_EINVAL;
  326. } else {
  327. ret = 0;
  328. }
  329. if (superblock->version > RRDENG_METALOG_VER) {
  330. error("File has unknown version %"PRIu16". Compatibility is not guaranteed.", superblock->version);
  331. }
  332. error:
  333. free(superblock);
  334. return ret;
  335. }
  336. void replay_record(struct metadata_logfile *metalogfile, struct rrdeng_metalog_record_header *header, void *payload)
  337. {
  338. struct metalog_instance *ctx = metalogfile->ctx;
  339. char *line, *nextline, *record_end;
  340. int ret;
  341. debug(D_METADATALOG, "RECORD contents: %.*s", (int)header->payload_length, (char *)payload);
  342. record_end = (char *)payload + header->payload_length - 1;
  343. *record_end = '\0';
  344. for (line = payload ; line ; line = nextline) {
  345. nextline = strchr(line, '\n');
  346. if (nextline) {
  347. *nextline++ = '\0';
  348. }
  349. ret = parser_action(ctx->metalog_parser_object->parser, line);
  350. debug(D_METADATALOG, "parser_action ret:%d", ret);
  351. if (ret)
  352. return; /* skip record due to error */
  353. };
  354. }
  355. /* This function only works with buffered I/O */
  356. static inline int metalogfile_read(struct metadata_logfile *metalogfile, void *buf, size_t len, uint64_t offset)
  357. {
  358. struct metalog_instance *ctx;
  359. uv_file file;
  360. uv_buf_t iov;
  361. uv_fs_t req;
  362. int ret;
  363. ctx = metalogfile->ctx;
  364. file = metalogfile->file;
  365. iov = uv_buf_init(buf, len);
  366. ret = uv_fs_read(NULL, &req, file, &iov, 1, offset, NULL);
  367. if (unlikely(ret < 0 && ret != req.result)) {
  368. fatal("uv_fs_read: %s", uv_strerror(ret));
  369. }
  370. if (req.result < 0) {
  371. ++ctx->stats.io_errors;
  372. rrd_stat_atomic_add(&global_io_errors, 1);
  373. error("%s: uv_fs_read - %s - record at offset %"PRIu64"(%u) in metadata logfile %u-%u.", __func__,
  374. uv_strerror((int)req.result), offset, (unsigned)len, metalogfile->starting_fileno, metalogfile->fileno);
  375. }
  376. uv_fs_req_cleanup(&req);
  377. ctx->stats.io_read_bytes += len;
  378. ++ctx->stats.io_read_requests;
  379. return ret;
  380. }
  381. /* Return 0 on success */
  382. static int metadata_record_integrity_check(void *record)
  383. {
  384. int ret;
  385. uint32_t data_size;
  386. struct rrdeng_metalog_record_header *header;
  387. struct rrdeng_metalog_record_trailer *trailer;
  388. uLong crc;
  389. header = record;
  390. data_size = header->header_length + header->payload_length;
  391. trailer = record + data_size;
  392. crc = crc32(0L, Z_NULL, 0);
  393. crc = crc32(crc, record, data_size);
  394. ret = crc32cmp(trailer->checksum, crc);
  395. return ret;
  396. }
  397. #define MAX_READ_BYTES (RRDENG_BLOCK_SIZE * 32) /* no record should be over 128KiB in this version */
  398. /*
  399. * Iterates metadata log file records and creates database objects (host/chart/dimension)
  400. */
  401. static void iterate_records(struct metadata_logfile *metalogfile)
  402. {
  403. uint32_t file_size, pos, bytes_remaining, record_size;
  404. void *buf;
  405. struct rrdeng_metalog_record_header *header;
  406. struct metalog_instance *ctx = metalogfile->ctx;
  407. struct metalog_pluginsd_state *state = ctx->metalog_parser_object->private;
  408. const size_t min_header_size = offsetof(struct rrdeng_metalog_record_header, header_length) +
  409. sizeof(header->header_length);
  410. file_size = metalogfile->pos;
  411. state->metalogfile = metalogfile;
  412. buf = mallocz(MAX_READ_BYTES);
  413. for (pos = sizeof(struct rrdeng_metalog_sb) ; pos < file_size ; pos += record_size) {
  414. bytes_remaining = file_size - pos;
  415. if (bytes_remaining < min_header_size) {
  416. error("%s: unexpected end of file in metadata logfile %u-%u.", __func__, metalogfile->starting_fileno,
  417. metalogfile->fileno);
  418. break;
  419. }
  420. if (metalogfile_read(metalogfile, buf, min_header_size, pos) < 0)
  421. break;
  422. header = (struct rrdeng_metalog_record_header *)buf;
  423. if (METALOG_STORE_PADDING == header->type) {
  424. info("%s: Skipping padding in metadata logfile %u-%u.", __func__, metalogfile->starting_fileno,
  425. metalogfile->fileno);
  426. record_size = ALIGN_BYTES_FLOOR(pos + RRDENG_BLOCK_SIZE) - pos;
  427. continue;
  428. }
  429. if (metalogfile_read(metalogfile, buf + min_header_size, sizeof(*header) - min_header_size,
  430. pos + min_header_size) < 0)
  431. break;
  432. record_size = header->header_length + header->payload_length + sizeof(struct rrdeng_metalog_record_trailer);
  433. if (header->header_length < min_header_size || record_size > bytes_remaining) {
  434. error("%s: Corrupted record in metadata logfile %u-%u.", __func__, metalogfile->starting_fileno,
  435. metalogfile->fileno);
  436. break;
  437. }
  438. if (record_size > MAX_READ_BYTES) {
  439. error("%s: Record is too long (%u bytes) in metadata logfile %u-%u.", __func__, record_size,
  440. metalogfile->starting_fileno, metalogfile->fileno);
  441. continue;
  442. }
  443. if (metalogfile_read(metalogfile, buf + sizeof(*header), record_size - sizeof(*header),
  444. pos + sizeof(*header)) < 0)
  445. break;
  446. if (metadata_record_integrity_check(buf)) {
  447. error("%s: Record at offset %"PRIu32" was read from disk. CRC32 check: FAILED", __func__, pos);
  448. continue;
  449. }
  450. debug(D_METADATALOG, "%s: Record at offset %"PRIu32" was read from disk. CRC32 check: SUCCEEDED", __func__,
  451. pos);
  452. replay_record(metalogfile, header, buf + header->header_length);
  453. }
  454. freez(buf);
  455. }
  456. int load_metadata_logfile(struct metalog_instance *ctx, struct metadata_logfile *metalogfile)
  457. {
  458. uv_fs_t req;
  459. uv_file file;
  460. int ret, fd, error;
  461. uint64_t file_size;
  462. char path[RRDENG_PATH_MAX];
  463. generate_metadata_logfile_path(metalogfile, path, sizeof(path));
  464. fd = open_file_buffered_io(path, O_RDWR, &file);
  465. if (fd < 0) {
  466. ++ctx->stats.fs_errors;
  467. rrd_stat_atomic_add(&global_fs_errors, 1);
  468. return fd;
  469. }
  470. info("Loading metadata log \"%s\".", path);
  471. ret = check_file_properties(file, &file_size, sizeof(struct rrdeng_metalog_sb));
  472. if (ret)
  473. goto error;
  474. ret = check_metadata_logfile_superblock(file);
  475. if (ret)
  476. goto error;
  477. ctx->stats.io_read_bytes += sizeof(struct rrdeng_jf_sb);
  478. ++ctx->stats.io_read_requests;
  479. metalogfile->file = file;
  480. metalogfile->pos = file_size;
  481. iterate_records(metalogfile);
  482. info("Metadata log \"%s\" loaded (size:%"PRIu64").", path, file_size);
  483. return 0;
  484. error:
  485. error = ret;
  486. ret = uv_fs_close(NULL, &req, file, NULL);
  487. if (ret < 0) {
  488. error("uv_fs_close(%s): %s", path, uv_strerror(ret));
  489. ++ctx->stats.fs_errors;
  490. rrd_stat_atomic_add(&global_fs_errors, 1);
  491. }
  492. uv_fs_req_cleanup(&req);
  493. return error;
  494. }
  495. void init_metadata_record_log(struct metadata_record_commit_log *records_log)
  496. {
  497. records_log->buf = NULL;
  498. records_log->buf_pos = 0;
  499. records_log->record_id = 1;
  500. }
  501. static int scan_metalog_files_cmp(const void *a, const void *b)
  502. {
  503. struct metadata_logfile *file1, *file2;
  504. char path1[RRDENG_PATH_MAX], path2[RRDENG_PATH_MAX];
  505. file1 = *(struct metadata_logfile **)a;
  506. file2 = *(struct metadata_logfile **)b;
  507. generate_metadata_logfile_path(file1, path1, sizeof(path1));
  508. generate_metadata_logfile_path(file2, path2, sizeof(path2));
  509. return strcmp(path1, path2);
  510. }
  511. /* Returns number of metadata logfiles that were loaded or < 0 on error */
  512. static int scan_metalog_files(struct metalog_instance *ctx)
  513. {
  514. int ret;
  515. unsigned starting_no, no, matched_files, i, failed_to_load;
  516. static uv_fs_t req;
  517. uv_dirent_t dent;
  518. struct metadata_logfile **metalogfiles, *metalogfile;
  519. char *dbfiles_path = ctx->rrdeng_ctx->dbfiles_path;
  520. ret = uv_fs_scandir(NULL, &req, dbfiles_path, 0, NULL);
  521. if (ret < 0) {
  522. fatal_assert(req.result < 0);
  523. uv_fs_req_cleanup(&req);
  524. error("uv_fs_scandir(%s): %s", dbfiles_path, uv_strerror(ret));
  525. ++ctx->stats.fs_errors;
  526. rrd_stat_atomic_add(&global_fs_errors, 1);
  527. return ret;
  528. }
  529. info("Found %d files in path %s", ret, dbfiles_path);
  530. metalogfiles = callocz(MIN(ret, MAX_DATAFILES), sizeof(*metalogfiles));
  531. for (matched_files = 0 ; UV_EOF != uv_fs_scandir_next(&req, &dent) && matched_files < MAX_DATAFILES ; ) {
  532. info("Scanning file \"%s/%s\"", dbfiles_path, dent.name);
  533. ret = sscanf(dent.name, METALOG_PREFIX METALOG_FILE_NUMBER_SCAN_TMPL METALOG_EXTENSION, &starting_no, &no);
  534. if (2 == ret) {
  535. info("Matched file \"%s/%s\"", dbfiles_path, dent.name);
  536. metalogfile = mallocz(sizeof(*metalogfile));
  537. metadata_logfile_init(metalogfile, ctx, starting_no, no);
  538. metalogfiles[matched_files++] = metalogfile;
  539. }
  540. }
  541. uv_fs_req_cleanup(&req);
  542. if (0 == matched_files) {
  543. freez(metalogfiles);
  544. return 0;
  545. }
  546. if (matched_files == MAX_DATAFILES) {
  547. error("Warning: hit maximum database engine file limit of %d files", MAX_DATAFILES);
  548. }
  549. qsort(metalogfiles, matched_files, sizeof(*metalogfiles), scan_metalog_files_cmp);
  550. ret = compaction_failure_recovery(ctx, metalogfiles, &matched_files);
  551. if (ret) { /* If the files are corrupted fail */
  552. for (i = 0 ; i < matched_files ; ++i) {
  553. freez(metalogfiles[i]);
  554. }
  555. freez(metalogfiles);
  556. return UV_EINVAL;
  557. }
  558. ctx->last_fileno = metalogfiles[matched_files - 1]->fileno;
  559. struct plugind cd = {
  560. .enabled = 1,
  561. .update_every = 0,
  562. .pid = 0,
  563. .serial_failures = 0,
  564. .successful_collections = 0,
  565. .obsolete = 0,
  566. .started_t = INVALID_TIME,
  567. .next = NULL,
  568. .version = 0,
  569. };
  570. struct metalog_pluginsd_state metalog_parser_state;
  571. metalog_pluginsd_state_init(&metalog_parser_state, ctx);
  572. PARSER_USER_OBJECT metalog_parser_object;
  573. metalog_parser_object.enabled = cd.enabled;
  574. metalog_parser_object.host = ctx->rrdeng_ctx->host;
  575. metalog_parser_object.cd = &cd;
  576. metalog_parser_object.trust_durations = 0;
  577. metalog_parser_object.private = &metalog_parser_state;
  578. PARSER *parser = parser_init(metalog_parser_object.host, &metalog_parser_object, NULL, PARSER_INPUT_SPLIT);
  579. if (unlikely(!parser)) {
  580. error("Failed to initialize metadata log parser.");
  581. failed_to_load = matched_files;
  582. goto after_failed_to_parse;
  583. }
  584. parser_add_keyword(parser, PLUGINSD_KEYWORD_HOST, metalog_pluginsd_host);
  585. parser_add_keyword(parser, PLUGINSD_KEYWORD_GUID, pluginsd_guid);
  586. parser_add_keyword(parser, PLUGINSD_KEYWORD_CONTEXT, pluginsd_context);
  587. parser_add_keyword(parser, PLUGINSD_KEYWORD_TOMBSTONE, pluginsd_tombstone);
  588. parser->plugins_action->dimension_action = &metalog_pluginsd_dimension_action;
  589. parser->plugins_action->chart_action = &metalog_pluginsd_chart_action;
  590. parser->plugins_action->guid_action = &metalog_pluginsd_guid_action;
  591. parser->plugins_action->context_action = &metalog_pluginsd_context_action;
  592. parser->plugins_action->tombstone_action = &metalog_pluginsd_tombstone_action;
  593. parser->plugins_action->host_action = &metalog_pluginsd_host_action;
  594. metalog_parser_object.parser = parser;
  595. ctx->metalog_parser_object = &metalog_parser_object;
  596. for (failed_to_load = 0, i = 0 ; i < matched_files ; ++i) {
  597. metalogfile = metalogfiles[i];
  598. ret = load_metadata_logfile(ctx, metalogfile);
  599. if (0 != ret) {
  600. freez(metalogfile);
  601. ++failed_to_load;
  602. break;
  603. }
  604. metadata_logfile_list_insert(&ctx->metadata_logfiles, metalogfile);
  605. rrd_atomic_fetch_add(&ctx->disk_space, metalogfile->pos);
  606. }
  607. debug(D_METADATALOG, "PARSER ended");
  608. parser_destroy(parser);
  609. size_t count = metalog_parser_object.count;
  610. debug(D_METADATALOG, "Parsing count=%u", (unsigned)count);
  611. after_failed_to_parse:
  612. freez(metalogfiles);
  613. if (failed_to_load) {
  614. error("%u metadata log files failed to load.", failed_to_load);
  615. finalize_metalog_files(ctx);
  616. return UV_EIO;
  617. }
  618. return matched_files;
  619. }
  620. /* Creates a metadata log file */
  621. int add_new_metadata_logfile(struct metalog_instance *ctx, struct metadata_logfile_list *logfile_list,
  622. unsigned starting_fileno, unsigned fileno)
  623. {
  624. struct metadata_logfile *metalogfile;
  625. int ret;
  626. char path[RRDENG_PATH_MAX];
  627. info("Creating new metadata log file in path %s", ctx->rrdeng_ctx->dbfiles_path);
  628. metalogfile = mallocz(sizeof(*metalogfile));
  629. metadata_logfile_init(metalogfile, ctx, starting_fileno, fileno);
  630. ret = create_metadata_logfile(metalogfile);
  631. if (!ret) {
  632. generate_metadata_logfile_path(metalogfile, path, sizeof(path));
  633. info("Created metadata log file \"%s\".", path);
  634. } else {
  635. freez(metalogfile);
  636. return ret;
  637. }
  638. metadata_logfile_list_insert(logfile_list, metalogfile);
  639. rrd_atomic_fetch_add(&ctx->disk_space, metalogfile->pos);
  640. return 0;
  641. }
  642. /* Return 0 on success. */
  643. int init_metalog_files(struct metalog_instance *ctx)
  644. {
  645. int ret;
  646. char *dbfiles_path = ctx->rrdeng_ctx->dbfiles_path;
  647. ret = scan_metalog_files(ctx);
  648. if (ret < 0) {
  649. error("Failed to scan path \"%s\".", dbfiles_path);
  650. return ret;
  651. } else if (0 == ret) {
  652. info("Metadata log files not found, creating in path \"%s\".", dbfiles_path);
  653. ret = add_new_metadata_logfile(ctx, &ctx->metadata_logfiles, 0, 1);
  654. if (ret) {
  655. error("Failed to create metadata log file in path \"%s\".", dbfiles_path);
  656. return ret;
  657. }
  658. ctx->last_fileno = 1;
  659. }
  660. return 0;
  661. }
  662. void finalize_metalog_files(struct metalog_instance *ctx)
  663. {
  664. struct metadata_logfile *metalogfile, *next_metalogfile;
  665. struct metalog_record_block *record_block, *next_record_block;
  666. for (metalogfile = ctx->metadata_logfiles.first ; metalogfile != NULL ; metalogfile = next_metalogfile) {
  667. next_metalogfile = metalogfile->next;
  668. for (record_block = metalogfile->records.first ; record_block != NULL ; record_block = next_record_block) {
  669. next_record_block = record_block->next;
  670. freez(record_block);
  671. }
  672. close_metadata_logfile(metalogfile);
  673. freez(metalogfile);
  674. }
  675. }