dyn_conf.c 32 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #include "dyn_conf.h"
  3. #define DYN_CONF_PATH_MAX (4096)
  4. #define DYN_CONF_DIR VARLIB_DIR "/etc"
  5. #define DYN_CONF_JOB_SCHEMA "job_schema"
  6. #define DYN_CONF_SCHEMA "schema"
  7. #define DYN_CONF_MODULE_LIST "modules"
  8. #define DYN_CONF_JOB_LIST "jobs"
  9. #define DYN_CONF_CFG_EXT ".cfg"
  10. DICTIONARY *plugins_dict = NULL;
  11. struct deferred_cfg_send {
  12. char *plugin_name;
  13. char *module_name;
  14. char *job_name;
  15. struct deferred_cfg_send *next;
  16. };
  17. bool dyncfg_shutdown = false;
  18. struct deferred_cfg_send *deferred_configs = NULL;
  19. pthread_mutex_t deferred_configs_lock = PTHREAD_MUTEX_INITIALIZER;
  20. pthread_cond_t deferred_configs_cond = PTHREAD_COND_INITIALIZER;
  21. static void deferred_config_free(struct deferred_cfg_send *dcs)
  22. {
  23. freez(dcs->plugin_name);
  24. freez(dcs->module_name);
  25. freez(dcs->job_name);
  26. freez(dcs);
  27. }
  28. static void deferred_config_push_back(const char *plugin_name, const char *module_name, const char *job_name)
  29. {
  30. struct deferred_cfg_send *deferred = callocz(1, sizeof(struct deferred_cfg_send));
  31. deferred->plugin_name = strdupz(plugin_name);
  32. if (module_name != NULL) {
  33. deferred->module_name = strdupz(module_name);
  34. if (job_name != NULL)
  35. deferred->job_name = strdupz(job_name);
  36. }
  37. pthread_mutex_lock(&deferred_configs_lock);
  38. if (dyncfg_shutdown) {
  39. pthread_mutex_unlock(&deferred_configs_lock);
  40. deferred_config_free(deferred);
  41. return;
  42. }
  43. struct deferred_cfg_send *last = deferred_configs;
  44. if (last == NULL)
  45. deferred_configs = deferred;
  46. else {
  47. while (last->next != NULL)
  48. last = last->next;
  49. last->next = deferred;
  50. }
  51. pthread_cond_signal(&deferred_configs_cond);
  52. pthread_mutex_unlock(&deferred_configs_lock);
  53. }
  54. static void deferred_configs_unlock()
  55. {
  56. dyncfg_shutdown = true;
  57. // if we get cancelled in pthread_cond_wait
  58. // we will arrive at cancelled cleanup handler
  59. // with mutex locked we need to unlock it
  60. pthread_mutex_unlock(&deferred_configs_lock);
  61. }
  62. static struct deferred_cfg_send *deferred_config_pop(void *ptr)
  63. {
  64. pthread_mutex_lock(&deferred_configs_lock);
  65. while (deferred_configs == NULL) {
  66. netdata_thread_cleanup_push(deferred_configs_unlock, ptr);
  67. pthread_cond_wait(&deferred_configs_cond, &deferred_configs_lock);
  68. netdata_thread_cleanup_pop(0);
  69. }
  70. struct deferred_cfg_send *deferred = deferred_configs;
  71. deferred_configs = deferred_configs->next;
  72. pthread_mutex_unlock(&deferred_configs_lock);
  73. return deferred;
  74. }
  75. static int _get_list_of_plugins_json_cb(const DICTIONARY_ITEM *item, void *entry, void *data)
  76. {
  77. UNUSED(item);
  78. json_object *obj = (json_object *)data;
  79. struct configurable_plugin *plugin = (struct configurable_plugin *)entry;
  80. json_object *plugin_name = json_object_new_string(plugin->name);
  81. json_object_array_add(obj, plugin_name);
  82. return 0;
  83. }
  84. json_object *get_list_of_plugins_json()
  85. {
  86. json_object *obj = json_object_new_array();
  87. dictionary_walkthrough_read(plugins_dict, _get_list_of_plugins_json_cb, obj);
  88. return obj;
  89. }
  90. static int _get_list_of_modules_json_cb(const DICTIONARY_ITEM *item, void *entry, void *data)
  91. {
  92. UNUSED(item);
  93. json_object *obj = (json_object *)data;
  94. struct module *module = (struct module *)entry;
  95. json_object *json_module = json_object_new_object();
  96. json_object *json_item = json_object_new_string(module->name);
  97. json_object_object_add(json_module, "name", json_item);
  98. const char *module_type;
  99. switch (module->type) {
  100. case MOD_TYPE_SINGLE:
  101. module_type = "single";
  102. break;
  103. case MOD_TYPE_ARRAY:
  104. module_type = "job_array";
  105. break;
  106. default:
  107. module_type = "unknown";
  108. break;
  109. }
  110. json_item = json_object_new_string(module_type);
  111. json_object_object_add(json_module, "type", json_item);
  112. json_object_array_add(obj, json_module);
  113. return 0;
  114. }
  115. json_object *get_list_of_modules_json(struct configurable_plugin *plugin)
  116. {
  117. json_object *obj = json_object_new_array();
  118. pthread_mutex_lock(&plugin->lock);
  119. dictionary_walkthrough_read(plugin->modules, _get_list_of_modules_json_cb, obj);
  120. pthread_mutex_unlock(&plugin->lock);
  121. return obj;
  122. }
  123. const char *job_status2str(enum job_status status)
  124. {
  125. switch (status) {
  126. case JOB_STATUS_UNKNOWN:
  127. return "unknown";
  128. case JOB_STATUS_STOPPED:
  129. return "stopped";
  130. case JOB_STATUS_RUNNING:
  131. return "running";
  132. case JOB_STATUS_ERROR:
  133. return "error";
  134. default:
  135. return "unknown";
  136. }
  137. }
  138. static int _get_list_of_jobs_json_cb(const DICTIONARY_ITEM *item, void *entry, void *data)
  139. {
  140. UNUSED(item);
  141. json_object *obj = (json_object *)data;
  142. struct job *job = (struct job *)entry;
  143. json_object *json_job = json_object_new_object();
  144. json_object *json_item = json_object_new_string(job->name);
  145. json_object_object_add(json_job, "name", json_item);
  146. json_item = json_object_new_string(job_status2str(job->status));
  147. json_object_object_add(json_job, "state", json_item);
  148. int64_t last_state_update_s = job->last_state_update / USEC_PER_SEC;
  149. int64_t last_state_update_us = job->last_state_update % USEC_PER_SEC;
  150. json_item = json_object_new_int64(last_state_update_s);
  151. json_object_object_add(json_job, "last_state_update_s", json_item);
  152. json_item = json_object_new_int64(last_state_update_us);
  153. json_object_object_add(json_job, "last_state_update_us", json_item);
  154. json_object_array_add(obj, json_job);
  155. return 0;
  156. }
  157. json_object *get_list_of_jobs_json(struct module *module)
  158. {
  159. json_object *obj = json_object_new_array();
  160. pthread_mutex_lock(&module->lock);
  161. dictionary_walkthrough_read(module->jobs, _get_list_of_jobs_json_cb, obj);
  162. pthread_mutex_unlock(&module->lock);
  163. return obj;
  164. }
  165. struct job *get_job_by_name(struct module *module, const char *job_name)
  166. {
  167. return dictionary_get(module->jobs, job_name);
  168. }
  169. int remove_job(struct module *module, struct job *job)
  170. {
  171. // as we are going to do unlink here we better make sure we have all to build proper path
  172. if (unlikely(job->name == NULL || module == NULL || module->name == NULL || module->plugin == NULL || module->plugin->name == NULL))
  173. return 0;
  174. enum set_config_result rc = module->delete_job_cb(module->job_config_cb_usr_ctx, module->name, job->name);
  175. if (rc != SET_CONFIG_ACCEPTED) {
  176. error_report("DYNCFG module \"%s\" rejected delete job for \"%s\"", module->name, job->name);
  177. return 0;
  178. }
  179. BUFFER *buffer = buffer_create(DYN_CONF_PATH_MAX, NULL);
  180. buffer_sprintf(buffer, DYN_CONF_DIR "/%s/%s/%s" DYN_CONF_CFG_EXT, module->plugin->name, module->name, job->name);
  181. unlink(buffer_tostring(buffer));
  182. buffer_free(buffer);
  183. return dictionary_del(module->jobs, job->name);
  184. }
  185. struct module *get_module_by_name(struct configurable_plugin *plugin, const char *module_name)
  186. {
  187. return dictionary_get(plugin->modules, module_name);
  188. }
  189. inline struct configurable_plugin *get_plugin_by_name(const char *name)
  190. {
  191. return dictionary_get(plugins_dict, name);
  192. }
  193. static int store_config(const char *module_name, const char *submodule_name, const char *cfg_idx, dyncfg_config_t cfg)
  194. {
  195. BUFFER *filename = buffer_create(DYN_CONF_PATH_MAX, NULL);
  196. buffer_sprintf(filename, DYN_CONF_DIR "/%s", module_name);
  197. if (mkdir(buffer_tostring(filename), 0755) == -1) {
  198. if (errno != EEXIST) {
  199. netdata_log_error("DYNCFG store_config: failed to create module directory %s", buffer_tostring(filename));
  200. buffer_free(filename);
  201. return 1;
  202. }
  203. }
  204. if (submodule_name != NULL) {
  205. buffer_sprintf(filename, "/%s", submodule_name);
  206. if (mkdir(buffer_tostring(filename), 0755) == -1) {
  207. if (errno != EEXIST) {
  208. netdata_log_error("DYNCFG store_config: failed to create submodule directory %s", buffer_tostring(filename));
  209. buffer_free(filename);
  210. return 1;
  211. }
  212. }
  213. }
  214. if (cfg_idx != NULL)
  215. buffer_sprintf(filename, "/%s", cfg_idx);
  216. buffer_strcat(filename, DYN_CONF_CFG_EXT);
  217. error_report("DYNCFG store_config: %s", buffer_tostring(filename));
  218. //write to file
  219. FILE *f = fopen(buffer_tostring(filename), "w");
  220. if (f == NULL) {
  221. error_report("DYNCFG store_config: failed to open %s for writing", buffer_tostring(filename));
  222. buffer_free(filename);
  223. return 1;
  224. }
  225. fwrite(cfg.data, cfg.data_size, 1, f);
  226. fclose(f);
  227. buffer_free(filename);
  228. return 0;
  229. }
  230. dyncfg_config_t load_config(const char *plugin_name, const char *module_name, const char *job_id)
  231. {
  232. BUFFER *filename = buffer_create(DYN_CONF_PATH_MAX, NULL);
  233. buffer_sprintf(filename, DYN_CONF_DIR "/%s", plugin_name);
  234. if (module_name != NULL)
  235. buffer_sprintf(filename, "/%s", module_name);
  236. if (job_id != NULL)
  237. buffer_sprintf(filename, "/%s", job_id);
  238. buffer_strcat(filename, DYN_CONF_CFG_EXT);
  239. dyncfg_config_t config;
  240. long bytes;
  241. config.data = read_by_filename(buffer_tostring(filename), &bytes);
  242. if (config.data == NULL)
  243. error_report("DYNCFG load_config: failed to load config from %s", buffer_tostring(filename));
  244. config.data_size = bytes;
  245. buffer_free(filename);
  246. return config;
  247. }
  248. char *set_plugin_config(struct configurable_plugin *plugin, dyncfg_config_t cfg)
  249. {
  250. enum set_config_result rc = plugin->set_config_cb(plugin->cb_usr_ctx, &cfg);
  251. if (rc != SET_CONFIG_ACCEPTED) {
  252. error_report("DYNCFG plugin \"%s\" rejected config", plugin->name);
  253. return "plugin rejected config";
  254. }
  255. if (store_config(plugin->name, NULL, NULL, cfg)) {
  256. error_report("DYNCFG could not store config for module \"%s\"", plugin->name);
  257. return "could not store config on disk";
  258. }
  259. return NULL;
  260. }
  261. static char *set_module_config(struct module *mod, dyncfg_config_t cfg)
  262. {
  263. struct configurable_plugin *plugin = mod->plugin;
  264. enum set_config_result rc = mod->set_config_cb(mod->config_cb_usr_ctx, mod->name, &cfg);
  265. if (rc != SET_CONFIG_ACCEPTED) {
  266. error_report("DYNCFG module \"%s\" rejected config", plugin->name);
  267. return "module rejected config";
  268. }
  269. if (store_config(plugin->name, mod->name, NULL, cfg)) {
  270. error_report("DYNCFG could not store config for module \"%s\"", mod->name);
  271. return "could not store config on disk";
  272. }
  273. return NULL;
  274. }
  275. struct job *job_new()
  276. {
  277. struct job *job = callocz(1, sizeof(struct job));
  278. job->state = JOB_STATUS_UNKNOWN;
  279. job->last_state_update = now_realtime_usec();
  280. return job;
  281. }
  282. static int set_job_config(struct job *job, dyncfg_config_t cfg)
  283. {
  284. struct module *mod = job->module;
  285. enum set_config_result rt = mod->set_job_config_cb(mod->job_config_cb_usr_ctx, mod->name, job->name, &cfg);
  286. if (rt != SET_CONFIG_ACCEPTED) {
  287. error_report("DYNCFG module \"%s\" rejected config for job \"%s\"", mod->name, job->name);
  288. return 1;
  289. }
  290. if (store_config(mod->plugin->name, mod->name, job->name, cfg)) {
  291. error_report("DYNCFG could not store config for module \"%s\"", mod->name);
  292. return 1;
  293. }
  294. return 0;
  295. }
  296. struct job *add_job(struct module *mod, const char *job_id, dyncfg_config_t cfg)
  297. {
  298. struct job *job = job_new();
  299. job->name = strdupz(job_id);
  300. job->module = mod;
  301. if (set_job_config(job, cfg)) {
  302. freez(job->name);
  303. freez(job);
  304. return NULL;
  305. }
  306. dictionary_set(mod->jobs, job->name, job, sizeof(job));
  307. return job;
  308. }
  309. void module_del_cb(const DICTIONARY_ITEM *item, void *value, void *data)
  310. {
  311. UNUSED(item);
  312. UNUSED(data);
  313. struct module *mod = (struct module *)value;
  314. dictionary_destroy(mod->jobs);
  315. freez(mod->name);
  316. freez(mod);
  317. }
  318. const DICTIONARY_ITEM *register_plugin(struct configurable_plugin *plugin)
  319. {
  320. if (get_plugin_by_name(plugin->name) != NULL) {
  321. error_report("DYNCFG plugin \"%s\" already registered", plugin->name);
  322. return NULL;
  323. }
  324. if (plugin->set_config_cb == NULL) {
  325. error_report("DYNCFG plugin \"%s\" has no set_config_cb", plugin->name);
  326. return NULL;
  327. }
  328. pthread_mutex_init(&plugin->lock, NULL);
  329. plugin->modules = dictionary_create(DICT_OPTION_VALUE_LINK_DONT_CLONE);
  330. dictionary_register_delete_callback(plugin->modules, module_del_cb, NULL);
  331. deferred_config_push_back(plugin->name, NULL, NULL);
  332. dictionary_set(plugins_dict, plugin->name, plugin, sizeof(plugin));
  333. // the plugin keeps the pointer to the dictionary item, so we need to acquire it
  334. return dictionary_get_and_acquire_item(plugins_dict, plugin->name);
  335. }
  336. void unregister_plugin(const DICTIONARY_ITEM *plugin)
  337. {
  338. struct configurable_plugin *plug = dictionary_acquired_item_value(plugin);
  339. dictionary_acquired_item_release(plugins_dict, plugin);
  340. dictionary_del(plugins_dict, plug->name);
  341. }
  342. void job_del_cb(const DICTIONARY_ITEM *item, void *value, void *data)
  343. {
  344. UNUSED(item);
  345. UNUSED(data);
  346. struct job *job = (struct job *)value;
  347. freez(job->reason);
  348. freez(job->name);
  349. freez(job);
  350. }
  351. int register_module(struct configurable_plugin *plugin, struct module *module)
  352. {
  353. if (get_module_by_name(plugin, module->name) != NULL) {
  354. error_report("DYNCFG module \"%s\" already registered", module->name);
  355. return 1;
  356. }
  357. pthread_mutex_init(&module->lock, NULL);
  358. deferred_config_push_back(plugin->name, module->name, NULL);
  359. module->plugin = plugin;
  360. if (module->type == MOD_TYPE_ARRAY) {
  361. module->jobs = dictionary_create(DICT_OPTION_VALUE_LINK_DONT_CLONE);
  362. dictionary_register_delete_callback(module->jobs, job_del_cb, NULL);
  363. // load all jobs from disk
  364. BUFFER *path = buffer_create(DYN_CONF_PATH_MAX, NULL);
  365. buffer_sprintf(path, "%s/%s/%s", DYN_CONF_DIR, plugin->name, module->name);
  366. DIR *dir = opendir(buffer_tostring(path));
  367. if (dir != NULL) {
  368. struct dirent *ent;
  369. while ((ent = readdir(dir)) != NULL) {
  370. if (ent->d_name[0] == '.')
  371. continue;
  372. if (ent->d_type != DT_REG)
  373. continue;
  374. size_t len = strnlen(ent->d_name, NAME_MAX);
  375. if (len <= strlen(DYN_CONF_CFG_EXT))
  376. continue;
  377. if (strcmp(ent->d_name + len - strlen(DYN_CONF_CFG_EXT), DYN_CONF_CFG_EXT) != 0)
  378. continue;
  379. ent->d_name[len - strlen(DYN_CONF_CFG_EXT)] = '\0';
  380. struct job *job = job_new();
  381. job->name = strdupz(ent->d_name);
  382. job->module = module;
  383. dictionary_set(module->jobs, job->name, job, sizeof(job));
  384. deferred_config_push_back(plugin->name, module->name, job->name);
  385. }
  386. closedir(dir);
  387. }
  388. buffer_free(path);
  389. }
  390. dictionary_set(plugin->modules, module->name, module, sizeof(module));
  391. return 0;
  392. }
  393. void freez_dyncfg(void *ptr) {
  394. freez(ptr);
  395. }
  396. static void handle_dyncfg_root(struct uni_http_response *resp, int method)
  397. {
  398. if (method != HTTP_METHOD_GET) {
  399. resp->content = "method not allowed";
  400. resp->content_length = strlen(resp->content);
  401. resp->status = HTTP_RESP_METHOD_NOT_ALLOWED;
  402. return;
  403. }
  404. json_object *obj = get_list_of_plugins_json();
  405. json_object *wrapper = json_object_new_object();
  406. json_object_object_add(wrapper, "configurable_plugins", obj);
  407. resp->content = strdupz(json_object_to_json_string_ext(wrapper, JSON_C_TO_STRING_PRETTY));
  408. json_object_put(wrapper);
  409. resp->status = HTTP_RESP_OK;
  410. resp->content_type = CT_APPLICATION_JSON;
  411. resp->content_free = freez_dyncfg;
  412. resp->content_length = strlen(resp->content);
  413. }
  414. static void handle_plugin_root(struct uni_http_response *resp, int method, struct configurable_plugin *plugin, void *post_payload, size_t post_payload_size)
  415. {
  416. switch(method) {
  417. case HTTP_METHOD_GET:
  418. {
  419. dyncfg_config_t cfg = plugin->get_config_cb(plugin->cb_usr_ctx);
  420. resp->content = mallocz(cfg.data_size);
  421. memcpy(resp->content, cfg.data, cfg.data_size);
  422. resp->status = HTTP_RESP_OK;
  423. resp->content_free = freez_dyncfg;
  424. resp->content_length = cfg.data_size;
  425. return;
  426. }
  427. case HTTP_METHOD_PUT:
  428. {
  429. char *response;
  430. if (post_payload == NULL) {
  431. resp->content = "no payload";
  432. resp->content_length = strlen(resp->content);
  433. resp->status = HTTP_RESP_BAD_REQUEST;
  434. return;
  435. }
  436. dyncfg_config_t cont = {
  437. .data = post_payload,
  438. .data_size = post_payload_size
  439. };
  440. response = set_plugin_config(plugin, cont);
  441. if (response == NULL) {
  442. resp->status = HTTP_RESP_OK;
  443. resp->content = "OK";
  444. resp->content_length = strlen(resp->content);
  445. } else {
  446. resp->status = HTTP_RESP_BAD_REQUEST;
  447. resp->content = response;
  448. resp->content_length = strlen(resp->content);
  449. }
  450. return;
  451. }
  452. default:
  453. resp->content = "method not allowed";
  454. resp->content_length = strlen(resp->content);
  455. resp->status = HTTP_RESP_METHOD_NOT_ALLOWED;
  456. return;
  457. }
  458. }
  459. void handle_module_root(struct uni_http_response *resp, int method, struct configurable_plugin *plugin, const char *module, void *post_payload, size_t post_payload_size)
  460. {
  461. if (strncmp(module, DYN_CONF_SCHEMA, strlen(DYN_CONF_SCHEMA)) == 0) {
  462. dyncfg_config_t cfg = plugin->get_config_schema_cb(plugin->cb_usr_ctx);
  463. resp->content = mallocz(cfg.data_size);
  464. memcpy(resp->content, cfg.data, cfg.data_size);
  465. resp->status = HTTP_RESP_OK;
  466. resp->content_free = freez_dyncfg;
  467. resp->content_length = cfg.data_size;
  468. return;
  469. }
  470. if (strncmp(module, DYN_CONF_MODULE_LIST, strlen(DYN_CONF_MODULE_LIST)) == 0) {
  471. if (method != HTTP_METHOD_GET) {
  472. resp->content = "method not allowed (only GET)";
  473. resp->content_length = strlen(resp->content);
  474. resp->status = HTTP_RESP_METHOD_NOT_ALLOWED;
  475. return;
  476. }
  477. json_object *obj = get_list_of_modules_json(plugin);
  478. json_object *wrapper = json_object_new_object();
  479. json_object_object_add(wrapper, "modules", obj);
  480. resp->content = strdupz(json_object_to_json_string_ext(wrapper, JSON_C_TO_STRING_PRETTY));
  481. json_object_put(wrapper);
  482. resp->status = HTTP_RESP_OK;
  483. resp->content_type = CT_APPLICATION_JSON;
  484. resp->content_free = freez_dyncfg;
  485. resp->content_length = strlen(resp->content);
  486. return;
  487. }
  488. struct module *mod = get_module_by_name(plugin, module);
  489. if (mod == NULL) {
  490. resp->content = "module not found";
  491. resp->content_length = strlen(resp->content);
  492. resp->status = HTTP_RESP_NOT_FOUND;
  493. return;
  494. }
  495. if (method == HTTP_METHOD_GET) {
  496. dyncfg_config_t cfg = mod->get_config_cb(mod->config_cb_usr_ctx, mod->name);
  497. resp->content = mallocz(cfg.data_size);
  498. memcpy(resp->content, cfg.data, cfg.data_size);
  499. resp->status = HTTP_RESP_OK;
  500. resp->content_free = freez_dyncfg;
  501. resp->content_length = cfg.data_size;
  502. return;
  503. } else if (method == HTTP_METHOD_PUT) {
  504. char *response;
  505. if (post_payload == NULL) {
  506. resp->content = "no payload";
  507. resp->content_length = strlen(resp->content);
  508. resp->status = HTTP_RESP_BAD_REQUEST;
  509. return;
  510. }
  511. dyncfg_config_t cont = {
  512. .data = post_payload,
  513. .data_size = post_payload_size
  514. };
  515. response = set_module_config(mod, cont);
  516. if (response == NULL) {
  517. resp->status = HTTP_RESP_OK;
  518. resp->content = "OK";
  519. resp->content_length = strlen(resp->content);
  520. } else {
  521. resp->status = HTTP_RESP_BAD_REQUEST;
  522. resp->content = response;
  523. resp->content_length = strlen(resp->content);
  524. }
  525. return;
  526. }
  527. resp->content = "method not allowed";
  528. resp->content_length = strlen(resp->content);
  529. resp->status = HTTP_RESP_METHOD_NOT_ALLOWED;
  530. }
  531. static inline void _handle_job_root(struct uni_http_response *resp, int method, struct module *mod, const char *job_id, void *post_payload, size_t post_payload_size, struct job *job)
  532. {
  533. if (method == HTTP_METHOD_POST) {
  534. if (job != NULL) {
  535. resp->content = "can't POST, job already exists (use PUT to update?)";
  536. resp->content_length = strlen(resp->content);
  537. resp->status = HTTP_RESP_BAD_REQUEST;
  538. return;
  539. }
  540. if (post_payload == NULL) {
  541. resp->content = "no payload";
  542. resp->content_length = strlen(resp->content);
  543. resp->status = HTTP_RESP_BAD_REQUEST;
  544. return;
  545. }
  546. dyncfg_config_t cont = {
  547. .data = post_payload,
  548. .data_size = post_payload_size
  549. };
  550. job = add_job(mod, job_id, cont);
  551. if (job == NULL) {
  552. resp->content = "failed to add job";
  553. resp->content_length = strlen(resp->content);
  554. resp->status = HTTP_RESP_INTERNAL_SERVER_ERROR;
  555. return;
  556. }
  557. resp->status = HTTP_RESP_OK;
  558. resp->content = "OK";
  559. resp->content_length = strlen(resp->content);
  560. return;
  561. }
  562. if (job == NULL) {
  563. resp->content = "job not found";
  564. resp->content_length = strlen(resp->content);
  565. resp->status = HTTP_RESP_NOT_FOUND;
  566. return;
  567. }
  568. switch (method) {
  569. case HTTP_METHOD_GET:
  570. {
  571. dyncfg_config_t cfg = mod->get_job_config_cb(mod->job_config_cb_usr_ctx, mod->name, job->name);
  572. resp->content = mallocz(cfg.data_size);
  573. memcpy(resp->content, cfg.data, cfg.data_size);
  574. resp->status = HTTP_RESP_OK;
  575. resp->content_free = freez_dyncfg;
  576. resp->content_length = cfg.data_size;
  577. return;
  578. }
  579. case HTTP_METHOD_PUT:
  580. {
  581. if (post_payload == NULL) {
  582. resp->content = "missing payload";
  583. resp->content_length = strlen(resp->content);
  584. resp->status = HTTP_RESP_BAD_REQUEST;
  585. return;
  586. }
  587. dyncfg_config_t cont = {
  588. .data = post_payload,
  589. .data_size = post_payload_size
  590. };
  591. if(set_job_config(job, cont)) {
  592. resp->status = HTTP_RESP_BAD_REQUEST;
  593. resp->content = "failed to set job config";
  594. resp->content_length = strlen(resp->content);
  595. return;
  596. }
  597. resp->status = HTTP_RESP_OK;
  598. resp->content = "OK";
  599. resp->content_length = strlen(resp->content);
  600. return;
  601. }
  602. case HTTP_METHOD_DELETE:
  603. {
  604. if (!remove_job(mod, job)) {
  605. resp->content = "failed to remove job";
  606. resp->content_length = strlen(resp->content);
  607. resp->status = HTTP_RESP_INTERNAL_SERVER_ERROR;
  608. return;
  609. }
  610. resp->status = HTTP_RESP_OK;
  611. resp->content = "OK";
  612. resp->content_length = strlen(resp->content);
  613. return;
  614. }
  615. default:
  616. resp->content = "method not allowed (only GET, PUT, DELETE)";
  617. resp->content_length = strlen(resp->content);
  618. resp->status = HTTP_RESP_METHOD_NOT_ALLOWED;
  619. return;
  620. }
  621. }
  622. void handle_job_root(struct uni_http_response *resp, int method, struct module *mod, const char *job_id, void *post_payload, size_t post_payload_size)
  623. {
  624. if (strncmp(job_id, DYN_CONF_SCHEMA, strlen(DYN_CONF_SCHEMA)) == 0) {
  625. dyncfg_config_t cfg = mod->get_config_schema_cb(mod->config_cb_usr_ctx, mod->name);
  626. resp->content = mallocz(cfg.data_size);
  627. memcpy(resp->content, cfg.data, cfg.data_size);
  628. resp->status = HTTP_RESP_OK;
  629. resp->content_free = freez_dyncfg;
  630. resp->content_length = cfg.data_size;
  631. return;
  632. }
  633. if (strncmp(job_id, DYN_CONF_JOB_SCHEMA, strlen(DYN_CONF_JOB_SCHEMA)) == 0) {
  634. dyncfg_config_t cfg = mod->get_job_config_schema_cb(mod->job_config_cb_usr_ctx, mod->name);
  635. resp->content = mallocz(cfg.data_size);
  636. memcpy(resp->content, cfg.data, cfg.data_size);
  637. resp->status = HTTP_RESP_OK;
  638. resp->content_free = freez_dyncfg;
  639. resp->content_length = cfg.data_size;
  640. return;
  641. }
  642. if (strncmp(job_id, DYN_CONF_JOB_LIST, strlen(DYN_CONF_JOB_LIST)) == 0) {
  643. if (mod->type != MOD_TYPE_ARRAY) {
  644. resp->content = "module type is not job_array (can't get the list of jobs)";
  645. resp->content_length = strlen(resp->content);
  646. resp->status = HTTP_RESP_NOT_FOUND;
  647. return;
  648. }
  649. if (method != HTTP_METHOD_GET) {
  650. resp->content = "method not allowed (only GET)";
  651. resp->content_length = strlen(resp->content);
  652. resp->status = HTTP_RESP_METHOD_NOT_ALLOWED;
  653. return;
  654. }
  655. json_object *obj = get_list_of_jobs_json(mod);
  656. json_object *wrapper = json_object_new_object();
  657. json_object_object_add(wrapper, "jobs", obj);
  658. resp->content = strdupz(json_object_to_json_string_ext(wrapper, JSON_C_TO_STRING_PRETTY));
  659. json_object_put(wrapper);
  660. resp->status = HTTP_RESP_OK;
  661. resp->content_type = CT_APPLICATION_JSON;
  662. resp->content_free = freez_dyncfg;
  663. resp->content_length = strlen(resp->content);
  664. return;
  665. }
  666. const DICTIONARY_ITEM *job_item = dictionary_get_and_acquire_item(mod->jobs, job_id);
  667. struct job *job = dictionary_acquired_item_value(job_item);
  668. _handle_job_root(resp, method, mod, job_id, post_payload, post_payload_size, job);
  669. dictionary_acquired_item_release(mod->jobs, job_item);
  670. }
  671. struct uni_http_response dyn_conf_process_http_request(int method, const char *plugin, const char *module, const char *job_id, void *post_payload, size_t post_payload_size)
  672. {
  673. struct uni_http_response resp = {
  674. .status = HTTP_RESP_INTERNAL_SERVER_ERROR,
  675. .content_type = CT_TEXT_PLAIN,
  676. .content = HTTP_RESP_INTERNAL_SERVER_ERROR_STR,
  677. .content_free = NULL,
  678. .content_length = 0
  679. };
  680. if (plugin == NULL) {
  681. handle_dyncfg_root(&resp, method);
  682. return resp;
  683. }
  684. const DICTIONARY_ITEM *plugin_item = dictionary_get_and_acquire_item(plugins_dict, plugin);
  685. if (plugin_item == NULL) {
  686. resp.content = "plugin not found";
  687. resp.content_length = strlen(resp.content);
  688. resp.status = HTTP_RESP_NOT_FOUND;
  689. return resp;
  690. }
  691. struct configurable_plugin *plug = dictionary_acquired_item_value(plugin_item);
  692. if (module == NULL) {
  693. handle_plugin_root(&resp, method, plug, post_payload, post_payload_size);
  694. goto EXIT_PLUGIN;
  695. }
  696. if (job_id == NULL) {
  697. handle_module_root(&resp, method, plug, module, post_payload, post_payload_size);
  698. goto EXIT_PLUGIN;
  699. }
  700. // for modules we do not do get_and_acquire as modules are never removed (only together with the plugin)
  701. struct module *mod = get_module_by_name(plug, module);
  702. if (mod == NULL) {
  703. resp.content = "module not found";
  704. resp.content_length = strlen(resp.content);
  705. resp.status = HTTP_RESP_NOT_FOUND;
  706. goto EXIT_PLUGIN;
  707. }
  708. if (mod->type != MOD_TYPE_ARRAY) {
  709. resp.content = "module is not array";
  710. resp.content_length = strlen(resp.content);
  711. resp.status = HTTP_RESP_NOT_FOUND;
  712. goto EXIT_PLUGIN;
  713. }
  714. handle_job_root(&resp, method, mod, job_id, post_payload, post_payload_size);
  715. EXIT_PLUGIN:
  716. dictionary_acquired_item_release(plugins_dict, plugin_item);
  717. return resp;
  718. }
  719. void plugin_del_cb(const DICTIONARY_ITEM *item, void *value, void *data)
  720. {
  721. UNUSED(item);
  722. UNUSED(data);
  723. struct configurable_plugin *plugin = (struct configurable_plugin *)value;
  724. dictionary_destroy(plugin->modules);
  725. freez(plugin->name);
  726. freez(plugin);
  727. }
  728. void report_job_status(struct configurable_plugin *plugin, const char *module_name, const char *job_name, enum job_status status, int status_code, char *reason)
  729. {
  730. const DICTIONARY_ITEM *item = dictionary_get_and_acquire_item(plugins_dict, plugin->name);
  731. if (item == NULL) {
  732. netdata_log_error("plugin %s not found", plugin->name);
  733. return;
  734. }
  735. struct configurable_plugin *plug = dictionary_acquired_item_value(item);
  736. struct module *mod = get_module_by_name(plug, module_name);
  737. if (mod == NULL) {
  738. netdata_log_error("module %s not found", module_name);
  739. goto EXIT_PLUGIN;
  740. }
  741. if (mod->type != MOD_TYPE_ARRAY) {
  742. netdata_log_error("module %s is not array", module_name);
  743. goto EXIT_PLUGIN;
  744. }
  745. const DICTIONARY_ITEM *job_item = dictionary_get_and_acquire_item(mod->jobs, job_name);
  746. if (job_item == NULL) {
  747. netdata_log_error("job %s not found", job_name);
  748. goto EXIT_PLUGIN;
  749. }
  750. struct job *job = dictionary_acquired_item_value(job_item);
  751. job->status = status;
  752. job->state = status_code;
  753. if (job->reason != NULL) {
  754. freez(job->reason);
  755. }
  756. job->reason = reason;
  757. job->last_state_update = now_realtime_usec();
  758. dictionary_acquired_item_release(mod->jobs, job_item);
  759. EXIT_PLUGIN:
  760. dictionary_acquired_item_release(plugins_dict, item);
  761. }
  762. int dyn_conf_init(void)
  763. {
  764. if (mkdir(DYN_CONF_DIR, 0755) == -1) {
  765. if (errno != EEXIST) {
  766. netdata_log_error("failed to create directory for dynamic configuration");
  767. return 1;
  768. }
  769. }
  770. plugins_dict = dictionary_create(DICT_OPTION_VALUE_LINK_DONT_CLONE);
  771. dictionary_register_delete_callback(plugins_dict, plugin_del_cb, NULL);
  772. return 0;
  773. }
  774. static void dyncfg_cleanup(void *ptr) {
  775. struct netdata_static_thread *static_thread = (struct netdata_static_thread *) ptr;
  776. static_thread->enabled = NETDATA_MAIN_THREAD_EXITING;
  777. netdata_log_info("cleaning up...");
  778. pthread_mutex_lock(&deferred_configs_lock);
  779. dyncfg_shutdown = true;
  780. while (deferred_configs != NULL) {
  781. struct deferred_cfg_send *dcs = deferred_configs;
  782. deferred_configs = dcs->next;
  783. deferred_config_free(dcs);
  784. }
  785. pthread_mutex_unlock(&deferred_configs_lock);
  786. static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
  787. }
  788. void *dyncfg_main(void *ptr)
  789. {
  790. netdata_thread_cleanup_push(dyncfg_cleanup, ptr);
  791. while (!netdata_exit) {
  792. struct deferred_cfg_send *dcs = deferred_config_pop(ptr);
  793. const DICTIONARY_ITEM *plugin_item = dictionary_get_and_acquire_item(plugins_dict, dcs->plugin_name);
  794. if (plugin_item == NULL) {
  795. error_report("DYNCFG, plugin %s not found", dcs->plugin_name);
  796. deferred_config_free(dcs);
  797. continue;
  798. }
  799. struct configurable_plugin *plugin = dictionary_acquired_item_value(plugin_item);
  800. if (dcs->module_name == NULL) {
  801. dyncfg_config_t cfg = load_config(dcs->plugin_name, NULL, NULL);
  802. if (cfg.data != NULL) {
  803. plugin->set_config_cb(plugin->cb_usr_ctx, &cfg);
  804. freez(cfg.data);
  805. }
  806. } else if (dcs->job_name == NULL) {
  807. dyncfg_config_t cfg = load_config(dcs->plugin_name, dcs->module_name, NULL);
  808. if (cfg.data != NULL) {
  809. struct module *mod = get_module_by_name(plugin, dcs->module_name);
  810. mod->set_config_cb(mod->config_cb_usr_ctx, mod->name, &cfg);
  811. freez(cfg.data);
  812. }
  813. } else {
  814. dyncfg_config_t cfg = load_config(dcs->plugin_name, dcs->module_name, dcs->job_name);
  815. if (cfg.data != NULL) {
  816. struct module *mod = get_module_by_name(plugin, dcs->module_name);
  817. mod->set_job_config_cb(mod->job_config_cb_usr_ctx, mod->name, dcs->job_name, &cfg);
  818. freez(cfg.data);
  819. }
  820. }
  821. deferred_config_free(dcs);
  822. dictionary_acquired_item_release(plugins_dict, plugin_item);
  823. }
  824. netdata_thread_cleanup_pop(1);
  825. return NULL;
  826. }