datafile.c 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #include "rrdengine.h"
  3. void df_extent_insert(struct extent_info *extent)
  4. {
  5. struct rrdengine_datafile *datafile = extent->datafile;
  6. if (likely(NULL != datafile->extents.last)) {
  7. datafile->extents.last->next = extent;
  8. }
  9. if (unlikely(NULL == datafile->extents.first)) {
  10. datafile->extents.first = extent;
  11. }
  12. datafile->extents.last = extent;
  13. }
  14. void datafile_list_insert(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile)
  15. {
  16. if (likely(NULL != ctx->datafiles.last)) {
  17. ctx->datafiles.last->next = datafile;
  18. }
  19. if (unlikely(NULL == ctx->datafiles.first)) {
  20. ctx->datafiles.first = datafile;
  21. }
  22. ctx->datafiles.last = datafile;
  23. }
  24. void datafile_list_delete(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile)
  25. {
  26. struct rrdengine_datafile *next;
  27. next = datafile->next;
  28. fatal_assert((NULL != next) && (ctx->datafiles.first == datafile) && (ctx->datafiles.last != datafile));
  29. ctx->datafiles.first = next;
  30. }
  31. static void datafile_init(struct rrdengine_datafile *datafile, struct rrdengine_instance *ctx,
  32. unsigned tier, unsigned fileno)
  33. {
  34. fatal_assert(tier == 1);
  35. datafile->tier = tier;
  36. datafile->fileno = fileno;
  37. datafile->file = (uv_file)0;
  38. datafile->pos = 0;
  39. datafile->extents.first = datafile->extents.last = NULL; /* will be populated by journalfile */
  40. datafile->journalfile = NULL;
  41. datafile->next = NULL;
  42. datafile->ctx = ctx;
  43. }
  44. void generate_datafilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen)
  45. {
  46. (void) snprintfz(str, maxlen, "%s/" DATAFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL DATAFILE_EXTENSION,
  47. datafile->ctx->dbfiles_path, datafile->tier, datafile->fileno);
  48. }
  49. int close_data_file(struct rrdengine_datafile *datafile)
  50. {
  51. struct rrdengine_instance *ctx = datafile->ctx;
  52. uv_fs_t req;
  53. int ret;
  54. char path[RRDENG_PATH_MAX];
  55. generate_datafilepath(datafile, path, sizeof(path));
  56. ret = uv_fs_close(NULL, &req, datafile->file, NULL);
  57. if (ret < 0) {
  58. error("uv_fs_close(%s): %s", path, uv_strerror(ret));
  59. ++ctx->stats.fs_errors;
  60. rrd_stat_atomic_add(&global_fs_errors, 1);
  61. }
  62. uv_fs_req_cleanup(&req);
  63. return ret;
  64. }
  65. int unlink_data_file(struct rrdengine_datafile *datafile)
  66. {
  67. struct rrdengine_instance *ctx = datafile->ctx;
  68. uv_fs_t req;
  69. int ret;
  70. char path[RRDENG_PATH_MAX];
  71. generate_datafilepath(datafile, path, sizeof(path));
  72. ret = uv_fs_unlink(NULL, &req, path, NULL);
  73. if (ret < 0) {
  74. error("uv_fs_fsunlink(%s): %s", path, uv_strerror(ret));
  75. ++ctx->stats.fs_errors;
  76. rrd_stat_atomic_add(&global_fs_errors, 1);
  77. }
  78. uv_fs_req_cleanup(&req);
  79. ++ctx->stats.datafile_deletions;
  80. return ret;
  81. }
  82. int destroy_data_file(struct rrdengine_datafile *datafile)
  83. {
  84. struct rrdengine_instance *ctx = datafile->ctx;
  85. uv_fs_t req;
  86. int ret;
  87. char path[RRDENG_PATH_MAX];
  88. generate_datafilepath(datafile, path, sizeof(path));
  89. ret = uv_fs_ftruncate(NULL, &req, datafile->file, 0, NULL);
  90. if (ret < 0) {
  91. error("uv_fs_ftruncate(%s): %s", path, uv_strerror(ret));
  92. ++ctx->stats.fs_errors;
  93. rrd_stat_atomic_add(&global_fs_errors, 1);
  94. }
  95. uv_fs_req_cleanup(&req);
  96. ret = uv_fs_close(NULL, &req, datafile->file, NULL);
  97. if (ret < 0) {
  98. error("uv_fs_close(%s): %s", path, uv_strerror(ret));
  99. ++ctx->stats.fs_errors;
  100. rrd_stat_atomic_add(&global_fs_errors, 1);
  101. }
  102. uv_fs_req_cleanup(&req);
  103. ret = uv_fs_unlink(NULL, &req, path, NULL);
  104. if (ret < 0) {
  105. error("uv_fs_fsunlink(%s): %s", path, uv_strerror(ret));
  106. ++ctx->stats.fs_errors;
  107. rrd_stat_atomic_add(&global_fs_errors, 1);
  108. }
  109. uv_fs_req_cleanup(&req);
  110. ++ctx->stats.datafile_deletions;
  111. return ret;
  112. }
  113. int create_data_file(struct rrdengine_datafile *datafile)
  114. {
  115. struct rrdengine_instance *ctx = datafile->ctx;
  116. uv_fs_t req;
  117. uv_file file;
  118. int ret, fd;
  119. struct rrdeng_df_sb *superblock;
  120. uv_buf_t iov;
  121. char path[RRDENG_PATH_MAX];
  122. generate_datafilepath(datafile, path, sizeof(path));
  123. fd = open_file_direct_io(path, O_CREAT | O_RDWR | O_TRUNC, &file);
  124. if (fd < 0) {
  125. ++ctx->stats.fs_errors;
  126. rrd_stat_atomic_add(&global_fs_errors, 1);
  127. return fd;
  128. }
  129. datafile->file = file;
  130. ++ctx->stats.datafile_creations;
  131. ret = posix_memalign((void *)&superblock, RRDFILE_ALIGNMENT, sizeof(*superblock));
  132. if (unlikely(ret)) {
  133. fatal("posix_memalign:%s", strerror(ret));
  134. }
  135. (void) strncpy(superblock->magic_number, RRDENG_DF_MAGIC, RRDENG_MAGIC_SZ);
  136. (void) strncpy(superblock->version, RRDENG_DF_VER, RRDENG_VER_SZ);
  137. superblock->tier = 1;
  138. iov = uv_buf_init((void *)superblock, sizeof(*superblock));
  139. ret = uv_fs_write(NULL, &req, file, &iov, 1, 0, NULL);
  140. if (ret < 0) {
  141. fatal_assert(req.result < 0);
  142. error("uv_fs_write: %s", uv_strerror(ret));
  143. ++ctx->stats.io_errors;
  144. rrd_stat_atomic_add(&global_io_errors, 1);
  145. }
  146. uv_fs_req_cleanup(&req);
  147. free(superblock);
  148. if (ret < 0) {
  149. destroy_data_file(datafile);
  150. return ret;
  151. }
  152. datafile->pos = sizeof(*superblock);
  153. ctx->stats.io_write_bytes += sizeof(*superblock);
  154. ++ctx->stats.io_write_requests;
  155. return 0;
  156. }
  157. static int check_data_file_superblock(uv_file file)
  158. {
  159. int ret;
  160. struct rrdeng_df_sb *superblock;
  161. uv_buf_t iov;
  162. uv_fs_t req;
  163. ret = posix_memalign((void *)&superblock, RRDFILE_ALIGNMENT, sizeof(*superblock));
  164. if (unlikely(ret)) {
  165. fatal("posix_memalign:%s", strerror(ret));
  166. }
  167. iov = uv_buf_init((void *)superblock, sizeof(*superblock));
  168. ret = uv_fs_read(NULL, &req, file, &iov, 1, 0, NULL);
  169. if (ret < 0) {
  170. error("uv_fs_read: %s", uv_strerror(ret));
  171. uv_fs_req_cleanup(&req);
  172. goto error;
  173. }
  174. fatal_assert(req.result >= 0);
  175. uv_fs_req_cleanup(&req);
  176. if (strncmp(superblock->magic_number, RRDENG_DF_MAGIC, RRDENG_MAGIC_SZ) ||
  177. strncmp(superblock->version, RRDENG_DF_VER, RRDENG_VER_SZ) ||
  178. superblock->tier != 1) {
  179. error("File has invalid superblock.");
  180. ret = UV_EINVAL;
  181. } else {
  182. ret = 0;
  183. }
  184. error:
  185. free(superblock);
  186. return ret;
  187. }
  188. static int load_data_file(struct rrdengine_datafile *datafile)
  189. {
  190. struct rrdengine_instance *ctx = datafile->ctx;
  191. uv_fs_t req;
  192. uv_file file;
  193. int ret, fd, error;
  194. uint64_t file_size;
  195. char path[RRDENG_PATH_MAX];
  196. generate_datafilepath(datafile, path, sizeof(path));
  197. fd = open_file_direct_io(path, O_RDWR, &file);
  198. if (fd < 0) {
  199. ++ctx->stats.fs_errors;
  200. rrd_stat_atomic_add(&global_fs_errors, 1);
  201. return fd;
  202. }
  203. info("Initializing data file \"%s\".", path);
  204. ret = check_file_properties(file, &file_size, sizeof(struct rrdeng_df_sb));
  205. if (ret)
  206. goto error;
  207. file_size = ALIGN_BYTES_CEILING(file_size);
  208. ret = check_data_file_superblock(file);
  209. if (ret)
  210. goto error;
  211. ctx->stats.io_read_bytes += sizeof(struct rrdeng_df_sb);
  212. ++ctx->stats.io_read_requests;
  213. datafile->file = file;
  214. datafile->pos = file_size;
  215. info("Data file \"%s\" initialized (size:%"PRIu64").", path, file_size);
  216. return 0;
  217. error:
  218. error = ret;
  219. ret = uv_fs_close(NULL, &req, file, NULL);
  220. if (ret < 0) {
  221. error("uv_fs_close(%s): %s", path, uv_strerror(ret));
  222. ++ctx->stats.fs_errors;
  223. rrd_stat_atomic_add(&global_fs_errors, 1);
  224. }
  225. uv_fs_req_cleanup(&req);
  226. return error;
  227. }
  228. static int scan_data_files_cmp(const void *a, const void *b)
  229. {
  230. struct rrdengine_datafile *file1, *file2;
  231. char path1[RRDENG_PATH_MAX], path2[RRDENG_PATH_MAX];
  232. file1 = *(struct rrdengine_datafile **)a;
  233. file2 = *(struct rrdengine_datafile **)b;
  234. generate_datafilepath(file1, path1, sizeof(path1));
  235. generate_datafilepath(file2, path2, sizeof(path2));
  236. return strcmp(path1, path2);
  237. }
  238. /* Returns number of datafiles that were loaded or < 0 on error */
  239. static int scan_data_files(struct rrdengine_instance *ctx)
  240. {
  241. int ret;
  242. unsigned tier, no, matched_files, i,failed_to_load;
  243. static uv_fs_t req;
  244. uv_dirent_t dent;
  245. struct rrdengine_datafile **datafiles, *datafile;
  246. struct rrdengine_journalfile *journalfile;
  247. ret = uv_fs_scandir(NULL, &req, ctx->dbfiles_path, 0, NULL);
  248. if (ret < 0) {
  249. fatal_assert(req.result < 0);
  250. uv_fs_req_cleanup(&req);
  251. error("uv_fs_scandir(%s): %s", ctx->dbfiles_path, uv_strerror(ret));
  252. ++ctx->stats.fs_errors;
  253. rrd_stat_atomic_add(&global_fs_errors, 1);
  254. return ret;
  255. }
  256. info("Found %d files in path %s", ret, ctx->dbfiles_path);
  257. datafiles = callocz(MIN(ret, MAX_DATAFILES), sizeof(*datafiles));
  258. for (matched_files = 0 ; UV_EOF != uv_fs_scandir_next(&req, &dent) && matched_files < MAX_DATAFILES ; ) {
  259. info("Scanning file \"%s/%s\"", ctx->dbfiles_path, dent.name);
  260. ret = sscanf(dent.name, DATAFILE_PREFIX RRDENG_FILE_NUMBER_SCAN_TMPL DATAFILE_EXTENSION, &tier, &no);
  261. if (2 == ret) {
  262. info("Matched file \"%s/%s\"", ctx->dbfiles_path, dent.name);
  263. datafile = mallocz(sizeof(*datafile));
  264. datafile_init(datafile, ctx, tier, no);
  265. datafiles[matched_files++] = datafile;
  266. }
  267. }
  268. uv_fs_req_cleanup(&req);
  269. if (0 == matched_files) {
  270. freez(datafiles);
  271. return 0;
  272. }
  273. if (matched_files == MAX_DATAFILES) {
  274. error("Warning: hit maximum database engine file limit of %d files", MAX_DATAFILES);
  275. }
  276. qsort(datafiles, matched_files, sizeof(*datafiles), scan_data_files_cmp);
  277. /* TODO: change this when tiering is implemented */
  278. ctx->last_fileno = datafiles[matched_files - 1]->fileno;
  279. for (failed_to_load = 0, i = 0 ; i < matched_files ; ++i) {
  280. uint8_t must_delete_pair = 0;
  281. datafile = datafiles[i];
  282. ret = load_data_file(datafile);
  283. if (0 != ret) {
  284. must_delete_pair = 1;
  285. }
  286. journalfile = mallocz(sizeof(*journalfile));
  287. datafile->journalfile = journalfile;
  288. journalfile_init(journalfile, datafile);
  289. ret = load_journal_file(ctx, journalfile, datafile);
  290. if (0 != ret) {
  291. if (!must_delete_pair) /* If datafile is still open close it */
  292. close_data_file(datafile);
  293. must_delete_pair = 1;
  294. }
  295. if (must_delete_pair) {
  296. char path[RRDENG_PATH_MAX];
  297. error("Deleting invalid data and journal file pair.");
  298. ret = unlink_journal_file(journalfile);
  299. if (!ret) {
  300. generate_journalfilepath(datafile, path, sizeof(path));
  301. info("Deleted journal file \"%s\".", path);
  302. }
  303. ret = unlink_data_file(datafile);
  304. if (!ret) {
  305. generate_datafilepath(datafile, path, sizeof(path));
  306. info("Deleted data file \"%s\".", path);
  307. }
  308. freez(journalfile);
  309. freez(datafile);
  310. ++failed_to_load;
  311. continue;
  312. }
  313. datafile_list_insert(ctx, datafile);
  314. ctx->disk_space += datafile->pos + journalfile->pos;
  315. }
  316. matched_files -= failed_to_load;
  317. freez(datafiles);
  318. return matched_files;
  319. }
  320. /* Creates a datafile and a journalfile pair */
  321. int create_new_datafile_pair(struct rrdengine_instance *ctx, unsigned tier, unsigned fileno)
  322. {
  323. struct rrdengine_datafile *datafile;
  324. struct rrdengine_journalfile *journalfile;
  325. int ret;
  326. char path[RRDENG_PATH_MAX];
  327. info("Creating new data and journal files in path %s", ctx->dbfiles_path);
  328. datafile = mallocz(sizeof(*datafile));
  329. datafile_init(datafile, ctx, tier, fileno);
  330. ret = create_data_file(datafile);
  331. if (!ret) {
  332. generate_datafilepath(datafile, path, sizeof(path));
  333. info("Created data file \"%s\".", path);
  334. } else {
  335. goto error_after_datafile;
  336. }
  337. journalfile = mallocz(sizeof(*journalfile));
  338. datafile->journalfile = journalfile;
  339. journalfile_init(journalfile, datafile);
  340. ret = create_journal_file(journalfile, datafile);
  341. if (!ret) {
  342. generate_journalfilepath(datafile, path, sizeof(path));
  343. info("Created journal file \"%s\".", path);
  344. } else {
  345. goto error_after_journalfile;
  346. }
  347. datafile_list_insert(ctx, datafile);
  348. ctx->disk_space += datafile->pos + journalfile->pos;
  349. return 0;
  350. error_after_journalfile:
  351. destroy_data_file(datafile);
  352. freez(journalfile);
  353. error_after_datafile:
  354. freez(datafile);
  355. return ret;
  356. }
  357. /* Page cache must already be initialized.
  358. * Return 0 on success.
  359. */
  360. int init_data_files(struct rrdengine_instance *ctx)
  361. {
  362. int ret;
  363. ret = scan_data_files(ctx);
  364. if (ret < 0) {
  365. error("Failed to scan path \"%s\".", ctx->dbfiles_path);
  366. return ret;
  367. } else if (0 == ret) {
  368. info("Data files not found, creating in path \"%s\".", ctx->dbfiles_path);
  369. ret = create_new_datafile_pair(ctx, 1, 1);
  370. if (ret) {
  371. error("Failed to create data and journal files in path \"%s\".", ctx->dbfiles_path);
  372. return ret;
  373. }
  374. ctx->last_fileno = 1;
  375. }
  376. return 0;
  377. }
  378. void finalize_data_files(struct rrdengine_instance *ctx)
  379. {
  380. struct rrdengine_datafile *datafile, *next_datafile;
  381. struct rrdengine_journalfile *journalfile;
  382. struct extent_info *extent, *next_extent;
  383. for (datafile = ctx->datafiles.first ; datafile != NULL ; datafile = next_datafile) {
  384. journalfile = datafile->journalfile;
  385. next_datafile = datafile->next;
  386. for (extent = datafile->extents.first ; extent != NULL ; extent = next_extent) {
  387. next_extent = extent->next;
  388. freez(extent);
  389. }
  390. close_journal_file(journalfile, datafile);
  391. close_data_file(datafile);
  392. freez(journalfile);
  393. freez(datafile);
  394. }
  395. }