  // SPDX-License-Identifier: GPL-3.0-or-later
  #include "rrdengine.h"
  3. void df_extent_insert(struct extent_info *extent)
  4. {
  5. struct rrdengine_datafile *datafile = extent->datafile;
  6. if (likely(NULL != datafile->extents.last)) {
  7. datafile->extents.last->next = extent;
  8. }
  9. if (unlikely(NULL == datafile->extents.first)) {
  10. datafile->extents.first = extent;
  11. }
  12. datafile->extents.last = extent;
  13. }
  14. void datafile_list_insert(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile)
  15. {
  16. if (likely(NULL != ctx->datafiles.last)) {
  17. ctx->datafiles.last->next = datafile;
  18. }
  19. if (unlikely(NULL == ctx->datafiles.first)) {
  20. ctx->datafiles.first = datafile;
  21. }
  22. ctx->datafiles.last = datafile;
  23. }
  24. void datafile_list_delete(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile)
  25. {
  26. struct rrdengine_datafile *next;
  27. next = datafile->next;
  28. fatal_assert((NULL != next) && (ctx->datafiles.first == datafile) && (ctx->datafiles.last != datafile));
  29. ctx->datafiles.first = next;
  30. }
  31. static void datafile_init(struct rrdengine_datafile *datafile, struct rrdengine_instance *ctx,
  32. unsigned tier, unsigned fileno)
  33. {
  34. fatal_assert(tier == 1);
  35. datafile->tier = tier;
  36. datafile->fileno = fileno;
  37. datafile->file = (uv_file)0;
  38. datafile->pos = 0;
  39. datafile->extents.first = datafile->extents.last = NULL; /* will be populated by journalfile */
  40. datafile->journalfile = NULL;
  41. datafile->next = NULL;
  42. datafile->ctx = ctx;
  43. }
  44. void generate_datafilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen)
  45. {
  46. (void) snprintfz(str, maxlen, "%s/" DATAFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL DATAFILE_EXTENSION,
  47. datafile->ctx->dbfiles_path, datafile->tier, datafile->fileno);
  48. }
  49. int close_data_file(struct rrdengine_datafile *datafile)
  50. {
  51. struct rrdengine_instance *ctx = datafile->ctx;
  52. uv_fs_t req;
  53. int ret;
  54. char path[RRDENG_PATH_MAX];
  55. generate_datafilepath(datafile, path, sizeof(path));
  56. ret = uv_fs_close(NULL, &req, datafile->file, NULL);
  57. if (ret < 0) {
  58. error("uv_fs_close(%s): %s", path, uv_strerror(ret));
  59. ++ctx->stats.fs_errors;
  60. rrd_stat_atomic_add(&global_fs_errors, 1);
  61. }
  62. uv_fs_req_cleanup(&req);
  63. return ret;
  64. }
  65. int unlink_data_file(struct rrdengine_datafile *datafile)
  66. {
  67. struct rrdengine_instance *ctx = datafile->ctx;
  68. uv_fs_t req;
  69. int ret;
  70. char path[RRDENG_PATH_MAX];
  71. generate_datafilepath(datafile, path, sizeof(path));
  72. ret = uv_fs_unlink(NULL, &req, path, NULL);
  73. if (ret < 0) {
  74. error("uv_fs_fsunlink(%s): %s", path, uv_strerror(ret));
  75. ++ctx->stats.fs_errors;
  76. rrd_stat_atomic_add(&global_fs_errors, 1);
  77. }
  78. uv_fs_req_cleanup(&req);
  79. ++ctx->stats.datafile_deletions;
  80. return ret;
  81. }
  82. int destroy_data_file(struct rrdengine_datafile *datafile)
  83. {
  84. struct rrdengine_instance *ctx = datafile->ctx;
  85. uv_fs_t req;
  86. int ret;
  87. char path[RRDENG_PATH_MAX];
  88. generate_datafilepath(datafile, path, sizeof(path));
  89. ret = uv_fs_ftruncate(NULL, &req, datafile->file, 0, NULL);
  90. if (ret < 0) {
  91. error("uv_fs_ftruncate(%s): %s", path, uv_strerror(ret));
  92. ++ctx->stats.fs_errors;
  93. rrd_stat_atomic_add(&global_fs_errors, 1);
  94. }
  95. uv_fs_req_cleanup(&req);
  96. ret = uv_fs_close(NULL, &req, datafile->file, NULL);
  97. if (ret < 0) {
  98. error("uv_fs_close(%s): %s", path, uv_strerror(ret));
  99. ++ctx->stats.fs_errors;
  100. rrd_stat_atomic_add(&global_fs_errors, 1);
  101. }
  102. uv_fs_req_cleanup(&req);
  103. ret = uv_fs_unlink(NULL, &req, path, NULL);
  104. if (ret < 0) {
  105. error("uv_fs_fsunlink(%s): %s", path, uv_strerror(ret));
  106. ++ctx->stats.fs_errors;
  107. rrd_stat_atomic_add(&global_fs_errors, 1);
  108. }
  109. uv_fs_req_cleanup(&req);
  110. ++ctx->stats.datafile_deletions;
  111. return ret;
  112. }
  113. int create_data_file(struct rrdengine_datafile *datafile)
  114. {
  115. struct rrdengine_instance *ctx = datafile->ctx;
  116. uv_fs_t req;
  117. uv_file file;
  118. int ret, fd;
  119. struct rrdeng_df_sb *superblock;
  120. uv_buf_t iov;
  121. char path[RRDENG_PATH_MAX];
  122. generate_datafilepath(datafile, path, sizeof(path));
  123. fd = open_file_direct_io(path, O_CREAT | O_RDWR | O_TRUNC, &file);
  124. if (fd < 0) {
  125. ++ctx->stats.fs_errors;
  126. rrd_stat_atomic_add(&global_fs_errors, 1);
  127. return fd;
  128. }
  129. datafile->file = file;
  130. ++ctx->stats.datafile_creations;
  131. ret = posix_memalign((void *)&superblock, RRDFILE_ALIGNMENT, sizeof(*superblock));
  132. if (unlikely(ret)) {
  133. fatal("posix_memalign:%s", strerror(ret));
  134. }
  135. memset(superblock, 0, sizeof(*superblock));
  136. (void) strncpy(superblock->magic_number, RRDENG_DF_MAGIC, RRDENG_MAGIC_SZ);
  137. (void) strncpy(superblock->version, RRDENG_DF_VER, RRDENG_VER_SZ);
  138. superblock->tier = 1;
  139. iov = uv_buf_init((void *)superblock, sizeof(*superblock));
  140. ret = uv_fs_write(NULL, &req, file, &iov, 1, 0, NULL);
  141. if (ret < 0) {
  142. fatal_assert(req.result < 0);
  143. error("uv_fs_write: %s", uv_strerror(ret));
  144. ++ctx->stats.io_errors;
  145. rrd_stat_atomic_add(&global_io_errors, 1);
  146. }
  147. uv_fs_req_cleanup(&req);
  148. free(superblock);
  149. if (ret < 0) {
  150. destroy_data_file(datafile);
  151. return ret;
  152. }
  153. datafile->pos = sizeof(*superblock);
  154. ctx->stats.io_write_bytes += sizeof(*superblock);
  155. ++ctx->stats.io_write_requests;
  156. return 0;
  157. }
  158. static int check_data_file_superblock(uv_file file)
  159. {
  160. int ret;
  161. struct rrdeng_df_sb *superblock;
  162. uv_buf_t iov;
  163. uv_fs_t req;
  164. ret = posix_memalign((void *)&superblock, RRDFILE_ALIGNMENT, sizeof(*superblock));
  165. if (unlikely(ret)) {
  166. fatal("posix_memalign:%s", strerror(ret));
  167. }
  168. iov = uv_buf_init((void *)superblock, sizeof(*superblock));
  169. ret = uv_fs_read(NULL, &req, file, &iov, 1, 0, NULL);
  170. if (ret < 0) {
  171. error("uv_fs_read: %s", uv_strerror(ret));
  172. uv_fs_req_cleanup(&req);
  173. goto error;
  174. }
  175. fatal_assert(req.result >= 0);
  176. uv_fs_req_cleanup(&req);
  177. if (strncmp(superblock->magic_number, RRDENG_DF_MAGIC, RRDENG_MAGIC_SZ) ||
  178. strncmp(superblock->version, RRDENG_DF_VER, RRDENG_VER_SZ) ||
  179. superblock->tier != 1) {
  180. error("File has invalid superblock.");
  181. ret = UV_EINVAL;
  182. } else {
  183. ret = 0;
  184. }
  185. error:
  186. free(superblock);
  187. return ret;
  188. }
  189. static int load_data_file(struct rrdengine_datafile *datafile)
  190. {
  191. struct rrdengine_instance *ctx = datafile->ctx;
  192. uv_fs_t req;
  193. uv_file file;
  194. int ret, fd, error;
  195. uint64_t file_size;
  196. char path[RRDENG_PATH_MAX];
  197. generate_datafilepath(datafile, path, sizeof(path));
  198. fd = open_file_direct_io(path, O_RDWR, &file);
  199. if (fd < 0) {
  200. ++ctx->stats.fs_errors;
  201. rrd_stat_atomic_add(&global_fs_errors, 1);
  202. return fd;
  203. }
  204. info("Initializing data file \"%s\".", path);
  205. ret = check_file_properties(file, &file_size, sizeof(struct rrdeng_df_sb));
  206. if (ret)
  207. goto error;
  208. file_size = ALIGN_BYTES_CEILING(file_size);
  209. ret = check_data_file_superblock(file);
  210. if (ret)
  211. goto error;
  212. ctx->stats.io_read_bytes += sizeof(struct rrdeng_df_sb);
  213. ++ctx->stats.io_read_requests;
  214. datafile->file = file;
  215. datafile->pos = file_size;
  216. info("Data file \"%s\" initialized (size:%"PRIu64").", path, file_size);
  217. return 0;
  218. error:
  219. error = ret;
  220. ret = uv_fs_close(NULL, &req, file, NULL);
  221. if (ret < 0) {
  222. error("uv_fs_close(%s): %s", path, uv_strerror(ret));
  223. ++ctx->stats.fs_errors;
  224. rrd_stat_atomic_add(&global_fs_errors, 1);
  225. }
  226. uv_fs_req_cleanup(&req);
  227. return error;
  228. }
  229. static int scan_data_files_cmp(const void *a, const void *b)
  230. {
  231. struct rrdengine_datafile *file1, *file2;
  232. char path1[RRDENG_PATH_MAX], path2[RRDENG_PATH_MAX];
  233. file1 = *(struct rrdengine_datafile **)a;
  234. file2 = *(struct rrdengine_datafile **)b;
  235. generate_datafilepath(file1, path1, sizeof(path1));
  236. generate_datafilepath(file2, path2, sizeof(path2));
  237. return strcmp(path1, path2);
  238. }
  239. /* Returns number of datafiles that were loaded or < 0 on error */
  240. static int scan_data_files(struct rrdengine_instance *ctx)
  241. {
  242. int ret;
  243. unsigned tier, no, matched_files, i,failed_to_load;
  244. static uv_fs_t req;
  245. uv_dirent_t dent;
  246. struct rrdengine_datafile **datafiles, *datafile;
  247. struct rrdengine_journalfile *journalfile;
  248. ret = uv_fs_scandir(NULL, &req, ctx->dbfiles_path, 0, NULL);
  249. if (ret < 0) {
  250. fatal_assert(req.result < 0);
  251. uv_fs_req_cleanup(&req);
  252. error("uv_fs_scandir(%s): %s", ctx->dbfiles_path, uv_strerror(ret));
  253. ++ctx->stats.fs_errors;
  254. rrd_stat_atomic_add(&global_fs_errors, 1);
  255. return ret;
  256. }
  257. info("Found %d files in path %s", ret, ctx->dbfiles_path);
  258. datafiles = callocz(MIN(ret, MAX_DATAFILES), sizeof(*datafiles));
  259. for (matched_files = 0 ; UV_EOF != uv_fs_scandir_next(&req, &dent) && matched_files < MAX_DATAFILES ; ) {
  260. info("Scanning file \"%s/%s\"", ctx->dbfiles_path, dent.name);
  261. ret = sscanf(dent.name, DATAFILE_PREFIX RRDENG_FILE_NUMBER_SCAN_TMPL DATAFILE_EXTENSION, &tier, &no);
  262. if (2 == ret) {
  263. info("Matched file \"%s/%s\"", ctx->dbfiles_path, dent.name);
  264. datafile = mallocz(sizeof(*datafile));
  265. datafile_init(datafile, ctx, tier, no);
  266. datafiles[matched_files++] = datafile;
  267. }
  268. }
  269. uv_fs_req_cleanup(&req);
  270. if (0 == matched_files) {
  271. freez(datafiles);
  272. return 0;
  273. }
  274. if (matched_files == MAX_DATAFILES) {
  275. error("Warning: hit maximum database engine file limit of %d files", MAX_DATAFILES);
  276. }
  277. qsort(datafiles, matched_files, sizeof(*datafiles), scan_data_files_cmp);
  278. /* TODO: change this when tiering is implemented */
  279. ctx->last_fileno = datafiles[matched_files - 1]->fileno;
  280. for (failed_to_load = 0, i = 0 ; i < matched_files ; ++i) {
  281. uint8_t must_delete_pair = 0;
  282. datafile = datafiles[i];
  283. ret = load_data_file(datafile);
  284. if (0 != ret) {
  285. must_delete_pair = 1;
  286. }
  287. journalfile = mallocz(sizeof(*journalfile));
  288. datafile->journalfile = journalfile;
  289. journalfile_init(journalfile, datafile);
  290. ret = load_journal_file(ctx, journalfile, datafile);
  291. if (0 != ret) {
  292. if (!must_delete_pair) /* If datafile is still open close it */
  293. close_data_file(datafile);
  294. must_delete_pair = 1;
  295. }
  296. if (must_delete_pair) {
  297. char path[RRDENG_PATH_MAX];
  298. error("Deleting invalid data and journal file pair.");
  299. ret = unlink_journal_file(journalfile);
  300. if (!ret) {
  301. generate_journalfilepath(datafile, path, sizeof(path));
  302. info("Deleted journal file \"%s\".", path);
  303. }
  304. ret = unlink_data_file(datafile);
  305. if (!ret) {
  306. generate_datafilepath(datafile, path, sizeof(path));
  307. info("Deleted data file \"%s\".", path);
  308. }
  309. freez(journalfile);
  310. freez(datafile);
  311. ++failed_to_load;
  312. continue;
  313. }
  314. datafile_list_insert(ctx, datafile);
  315. ctx->disk_space += datafile->pos + journalfile->pos;
  316. }
  317. matched_files -= failed_to_load;
  318. freez(datafiles);
  319. return matched_files;
  320. }
  321. /* Creates a datafile and a journalfile pair */
  322. int create_new_datafile_pair(struct rrdengine_instance *ctx, unsigned tier, unsigned fileno)
  323. {
  324. struct rrdengine_datafile *datafile;
  325. struct rrdengine_journalfile *journalfile;
  326. int ret;
  327. char path[RRDENG_PATH_MAX];
  328. info("Creating new data and journal files in path %s", ctx->dbfiles_path);
  329. datafile = mallocz(sizeof(*datafile));
  330. datafile_init(datafile, ctx, tier, fileno);
  331. ret = create_data_file(datafile);
  332. if (!ret) {
  333. generate_datafilepath(datafile, path, sizeof(path));
  334. info("Created data file \"%s\".", path);
  335. } else {
  336. goto error_after_datafile;
  337. }
  338. journalfile = mallocz(sizeof(*journalfile));
  339. datafile->journalfile = journalfile;
  340. journalfile_init(journalfile, datafile);
  341. ret = create_journal_file(journalfile, datafile);
  342. if (!ret) {
  343. generate_journalfilepath(datafile, path, sizeof(path));
  344. info("Created journal file \"%s\".", path);
  345. } else {
  346. goto error_after_journalfile;
  347. }
  348. datafile_list_insert(ctx, datafile);
  349. ctx->disk_space += datafile->pos + journalfile->pos;
  350. return 0;
  351. error_after_journalfile:
  352. destroy_data_file(datafile);
  353. freez(journalfile);
  354. error_after_datafile:
  355. freez(datafile);
  356. return ret;
  357. }
  358. /* Page cache must already be initialized.
  359. * Return 0 on success.
  360. */
  361. int init_data_files(struct rrdengine_instance *ctx)
  362. {
  363. int ret;
  364. ret = scan_data_files(ctx);
  365. if (ret < 0) {
  366. error("Failed to scan path \"%s\".", ctx->dbfiles_path);
  367. return ret;
  368. } else if (0 == ret) {
  369. info("Data files not found, creating in path \"%s\".", ctx->dbfiles_path);
  370. ret = create_new_datafile_pair(ctx, 1, 1);
  371. if (ret) {
  372. error("Failed to create data and journal files in path \"%s\".", ctx->dbfiles_path);
  373. return ret;
  374. }
  375. ctx->last_fileno = 1;
  376. }
  377. return 0;
  378. }
  379. void finalize_data_files(struct rrdengine_instance *ctx)
  380. {
  381. struct rrdengine_datafile *datafile, *next_datafile;
  382. struct rrdengine_journalfile *journalfile;
  383. struct extent_info *extent, *next_extent;
  384. size_t extents_number = 0;
  385. size_t extents_bytes = 0;
  386. size_t page_compressed_sizes = 0;
  387. size_t files_number = 0;
  388. size_t files_bytes = 0;
  389. for (datafile = ctx->datafiles.first ; datafile != NULL ; datafile = next_datafile) {
  390. journalfile = datafile->journalfile;
  391. next_datafile = datafile->next;
  392. for (extent = datafile->extents.first ; extent != NULL ; extent = next_extent) {
  393. extents_number++;
  394. extents_bytes += sizeof(*extent) + sizeof(struct rrdeng_page_descr *) * extent->number_of_pages;
  395. page_compressed_sizes += extent->size;
  396. next_extent = extent->next;
  397. freez(extent);
  398. }
  399. close_journal_file(journalfile, datafile);
  400. close_data_file(datafile);
  401. files_number++;
  402. files_bytes += sizeof(*journalfile) + sizeof(*datafile);
  403. freez(journalfile);
  404. freez(datafile);
  405. }
  406. if(!files_number) files_number = 1;
  407. if(!extents_number) extents_number = 1;
  408. info("DBENGINE STATISTICS ON DATAFILES:"
  409. " Files %zu, structures %zu bytes, %0.2f bytes per file."
  410. " Extents %zu, structures %zu bytes, %0.2f bytes per extent."
  411. " Compressed size of all pages: %zu bytes."
  412. , files_number, files_bytes, (double)files_bytes/files_number
  413. , extents_number, extents_bytes, (double)extents_bytes/extents_number
  414. , page_compressed_sizes
  415. );
  416. }