ebpf_mdflush.c

// SPDX-License-Identifier: GPL-3.0-or-later

#include "ebpf.h"
#include "ebpf_mdflush.h"

struct config mdflush_config = { .first_section = NULL,
    .last_section = NULL,
    .mutex = NETDATA_MUTEX_INITIALIZER,
    .index = { .avl_tree = { .root = NULL, .compar = appconfig_section_compare },
               .rwlock = AVL_LOCK_INITIALIZER } };

#define MDFLUSH_MAP_COUNT 0

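// vector of eBPF maps used by this module; the zeroed entry with a NULL
// name marks the end of the vector for the loader.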
static ebpf_local_maps_t mdflush_maps[] = {
    {
        .name = "tbl_mdflush",
        .internal_input = 1024,
        .user_input = 0,
        .type = NETDATA_EBPF_MAP_STATIC,
        .map_fd = ND_EBPF_MAP_FD_NOT_INITIALIZED
    },
    /* end */
    {
        .name = NULL,
        .internal_input = 0,
        .user_input = 0,
        .type = NETDATA_EBPF_MAP_CONTROLLER,
        .map_fd = ND_EBPF_MAP_FD_NOT_INITIALIZED
    }
};

// store for "published" data from the reader thread, which the collector
// thread writes out to the netdata agent.
static avl_tree_lock mdflush_pub;

// temporary store for mdflush values read from the per-CPU eBPF map.
static mdflush_ebpf_val_t *mdflush_ebpf_vals = NULL;

static struct netdata_static_thread mdflush_threads = {
    .name = "MDFLUSH KERNEL",
    .config_section = NULL,
    .config_name = NULL,
    .env_name = NULL,
    .enabled = 1,
    .thread = NULL,
    .init_routine = NULL,
    .start_routine = NULL
};

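// shutdown state shared between the reader thread and the exit/cleanup
// handlers below.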
static enum ebpf_threads_status ebpf_mdflush_exited = NETDATA_THREAD_EBPF_RUNNING;

/**
 * MDflush exit
 *
 * Cancel thread and exit.
 *
 * @param ptr thread data.
 */
static void mdflush_exit(void *ptr)
{
    ebpf_module_t *em = (ebpf_module_t *)ptr;
    if (!em->enabled) {
        em->enabled = NETDATA_MAIN_THREAD_EXITED;
        return;
    }

    ebpf_mdflush_exited = NETDATA_THREAD_EBPF_STOPPING;
}

/**
 * Cleanup
 *
 * Clean allocated memory.
 *
 * @param ptr thread data.
 */
static void mdflush_cleanup(void *ptr)
{
    ebpf_module_t *em = (ebpf_module_t *)ptr;
    if (ebpf_mdflush_exited != NETDATA_THREAD_EBPF_STOPPED)
        return;

    freez(mdflush_ebpf_vals);
    freez(mdflush_threads.thread);

    mdflush_threads.enabled = NETDATA_MAIN_THREAD_EXITED;
    em->enabled = NETDATA_MAIN_THREAD_EXITED;
}

/**
 * Compare mdflush values.
 *
 * @param a `netdata_mdflush_t *`.
 * @param b `netdata_mdflush_t *`.
 *
 * @return 0 if a==b, 1 if a>b, -1 if a<b.
 */
static int mdflush_val_cmp(void *a, void *b)
{
    netdata_mdflush_t *ptr1 = a;
    netdata_mdflush_t *ptr2 = b;

    if (ptr1->unit > ptr2->unit) {
        return 1;
    }
    else if (ptr1->unit < ptr2->unit) {
        return -1;
    }
    else {
        return 0;
    }
}

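/**
 * Read count map
 *
 * Walk the `tbl_mdflush` hash table and publish the flush count of every
 * MD device seen so far into the `mdflush_pub` AVL tree.
 */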
static void mdflush_read_count_map(void)
{
    int mapfd = mdflush_maps[MDFLUSH_MAP_COUNT].map_fd;
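    // start iterating with a key that cannot exist in the map, so that
    // bpf_map_get_next_key() returns the first key in the table.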
    mdflush_ebpf_key_t curr_key = (uint32_t)-1;
    mdflush_ebpf_key_t key = (uint32_t)-1;
    netdata_mdflush_t search_v;
    netdata_mdflush_t *v = NULL;

    while (bpf_map_get_next_key(mapfd, &curr_key, &key) == 0) {
        curr_key = key;

        // get val for this key.
        int test = bpf_map_lookup_elem(mapfd, &key, mdflush_ebpf_vals);
        if (unlikely(test < 0)) {
            continue;
        }

        // is this record saved yet?
        //
        // if not, make a new one, mark it as unsaved for now, and continue; we
        // will insert it at the end after all of its values are correctly set,
        // so that we can safely publish it to the collector within a single,
        // short locked operation.
        //
        // otherwise simply continue; we will only update the flush count,
        // which can be republished safely without a lock.
        //
        // NOTE: a lock isn't strictly necessary for this initial search, as only
        // this thread does writing, but the AVL is using a read-write lock so
        // there is no congestion.
        bool v_is_new = false;
        search_v.unit = key;
        v = (netdata_mdflush_t *)avl_search_lock(&mdflush_pub, (avl_t *)&search_v);
        if (unlikely(v == NULL)) {
            // the flush count can only be added reliably at a later time.
            // when it's added, only then will we AVL insert.
            v = callocz(1, sizeof(netdata_mdflush_t));
            v->unit = key;
            sprintf(v->disk_name, "md%u", key);
            v->dim_exists = false;

            v_is_new = true;
        }

        // we must add up the count value for this record across all CPUs.
        uint64_t total_cnt = 0;
        int i;
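        // on kernels older than v4.15 the map is not per-CPU, so only the
        // first slot of the vector is populated there.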
        int end = (running_on_kernel < NETDATA_KERNEL_V4_15) ? 1 : ebpf_nprocs;
        for (i = 0; i < end; i++) {
            total_cnt += mdflush_ebpf_vals[i];
        }

        // can now safely publish count for existing records.
        v->cnt = total_cnt;

        // can now safely publish new record.
        if (v_is_new) {
            avl_t *check = avl_insert_lock(&mdflush_pub, (avl_t *)v);
            if (check != (avl_t *)v) {
                error("Internal error, cannot insert into the AVL tree.");
            }
        }
    }
}

/**
 * Read eBPF maps for mdflush.
 */
static void *mdflush_reader(void *ptr)
{
    netdata_thread_cleanup_push(mdflush_cleanup, ptr);

    heartbeat_t hb;
    heartbeat_init(&hb);

    ebpf_module_t *em = (ebpf_module_t *)ptr;
    usec_t step = NETDATA_MDFLUSH_SLEEP_MS * em->update_every;
    while (ebpf_mdflush_exited == NETDATA_THREAD_EBPF_RUNNING) {
        usec_t dt = heartbeat_next(&hb, step);
        UNUSED(dt);
        if (ebpf_mdflush_exited == NETDATA_THREAD_EBPF_STOPPING)
            break;

        mdflush_read_count_map();
    }

    ebpf_mdflush_exited = NETDATA_THREAD_EBPF_STOPPED;

    netdata_thread_cleanup_pop(1);
    return NULL;
}

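/**
 * Create charts
 *
 * Create the md.flush chart; dimensions are added later, as MD devices
 * are discovered.
 *
 * @param update_every number of seconds between chart updates.
 */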
static void mdflush_create_charts(int update_every)
{
    ebpf_create_chart(
        "mdstat",
        "mdstat_flush",
        "MD flushes",
        "flushes",
        "flush (eBPF)",
        "md.flush",
        NETDATA_EBPF_CHART_TYPE_STACKED,
        NETDATA_CHART_PRIO_MDSTAT_FLUSH,
        NULL, NULL, 0, update_every,
        NETDATA_EBPF_MODULE_NAME_MDFLUSH
    );

    fflush(stdout);
}

// callback for avl tree traversal on `mdflush_pub`.
static int mdflush_write_dims(void *entry, void *data)
{
    UNUSED(data);

    netdata_mdflush_t *v = entry;

    // records get dynamically added in, so add the dim if we haven't yet.
    if (!v->dim_exists) {
        ebpf_write_global_dimension(
            v->disk_name, v->disk_name,
            ebpf_algorithms[NETDATA_EBPF_INCREMENTAL_IDX]
        );
        v->dim_exists = true;
    }

    write_chart_dimension(v->disk_name, v->cnt);

    return 1;
}

/**
 * Main loop for this collector.
 */
static void mdflush_collector(ebpf_module_t *em)
{
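    // allocate one slot per CPU; lookups against the per-CPU map fill
    // this vector.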
    mdflush_ebpf_vals = callocz(ebpf_nprocs, sizeof(mdflush_ebpf_val_t));

    avl_init_lock(&mdflush_pub, mdflush_val_cmp);

    // create reader thread.
    mdflush_threads.thread = mallocz(sizeof(netdata_thread_t));
    mdflush_threads.start_routine = mdflush_reader;
    netdata_thread_create(
        mdflush_threads.thread,
        mdflush_threads.name,
        NETDATA_THREAD_OPTION_DEFAULT,
        mdflush_reader,
        em
    );

    // create chart and static dims.
    pthread_mutex_lock(&lock);
    mdflush_create_charts(em->update_every);
    ebpf_update_stats(&plugin_statistics, em);
    pthread_mutex_unlock(&lock);

    // loop and read from published data until the ebpf plugin is closed.
    heartbeat_t hb;
    heartbeat_init(&hb);
    usec_t step = em->update_every * USEC_PER_SEC;
    while (!ebpf_exit_plugin) {
        (void)heartbeat_next(&hb, step);
        if (ebpf_exit_plugin)
            break;

        // write dims now for all hitherto discovered devices.
        pthread_mutex_lock(&lock);
        write_begin_chart("mdstat", "mdstat_flush");
        avl_traverse_lock(&mdflush_pub, mdflush_write_dims, NULL);
        write_end_chart();
        pthread_mutex_unlock(&lock);
    }
}

/**
 * MDflush thread.
 *
 * @param ptr an `ebpf_module_t *`.
 * @return always NULL.
 */
void *ebpf_mdflush_thread(void *ptr)
{
    netdata_thread_cleanup_push(mdflush_exit, ptr);

    ebpf_module_t *em = (ebpf_module_t *)ptr;
    em->maps = mdflush_maps;
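    // the md_flush_request symbol is only present when the md driver is
    // loaded; without it there is nothing to attach the probe to.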
    char *md_flush_request = ebpf_find_symbol("md_flush_request");
    if (!md_flush_request) {
        em->enabled = CONFIG_BOOLEAN_NO;
        error("Cannot monitor MD devices, because the md module is not loaded.");
    }
    freez(md_flush_request);

    if (!em->enabled) {
        goto endmdflush;
    }

    em->probe_links = ebpf_load_program(ebpf_plugin_dir, em, running_on_kernel, isrh, &em->objects);
    if (!em->probe_links) {
        em->enabled = CONFIG_BOOLEAN_NO;
        goto endmdflush;
    }

    mdflush_collector(em);

endmdflush:
    if (!em->enabled)
        ebpf_update_disabled_plugin_stats(em);

    netdata_thread_cleanup_pop(1);
    return NULL;
}