// worker_utilization.c (14 KB)


  1. #include "worker_utilization.h"
  // Values stored in struct worker::last_action: the worker is either idle
  // (not executing a job) or busy (executing the job recorded in job_id).
  #define WORKER_IDLE 'I'
  #define WORKER_BUSY 'B'
  // Per job-type bookkeeping; every worker holds an array of these indexed by job id.
  struct worker_job_type {
      STRING *name;   // set by worker_register_job_custom_metric(); NULL until then (workers are callocz()ed)
      STRING *units;  // units of the job's custom metric, set together with name

      // statistics controlled variables
      // (written by workers_foreach() to remember what it reported last time)
      size_t statistics_last_jobs_started;
      usec_t statistics_last_busy_time;
      NETDATA_DOUBLE statistics_last_custom_value;

      // worker controlled variables
      // (written by the thread that owns the worker)
      volatile size_t worker_jobs_started;  // incremented by worker_is_busy()
      volatile usec_t worker_busy_time;     // accumulated when a busy period of this job ends
      WORKER_METRIC_TYPE type;              // selects how workers_foreach() reports this job type
      NETDATA_DOUBLE custom_value;          // updated by worker_set_metric()
  };
  // One registered worker thread. Allocated by worker_register(), linked into the
  // list of its struct workers_workname, and freed by worker_unregister().
  struct worker {
      pid_t pid;              // gettid() of the registering thread
      const char *tag;        // copy of netdata_thread_tag() taken at registration
      const char *workname;   // copy of the name given to worker_register()

      // statistics controlled variables
      // (written by workers_foreach())
      volatile usec_t statistics_last_checkpoint;   // timestamp of the previous statistics pass
      size_t statistics_last_jobs_started;
      usec_t statistics_last_busy_time;

      // the worker controlled variables
      size_t worker_max_job_id;                     // highest job id registered so far
      volatile size_t job_id;                       // job of the current (or most recent) busy period
      volatile size_t jobs_started;                 // total number of jobs started
      volatile usec_t busy_time;                    // total time of finished busy periods
      volatile usec_t last_action_timestamp;        // when last_action changed; workers_foreach() also moves it forward while busy
      volatile char last_action;                    // WORKER_IDLE or WORKER_BUSY
      struct worker_job_type per_job_type[WORKER_UTILIZATION_MAX_JOB_TYPES];

      // links within the owning struct workers_workname::base list
      struct worker *next;
      struct worker *prev;
  };
  // One entry of the worknames JudyHS index, keyed by worker name. It is created
  // by the first worker_register() of a name and freed when its last worker unregisters.
  struct workers_workname { // this is what we add to JudyHS
      SPINLOCK spinlock;     // protects the base list
      struct worker *base;   // doubly linked list of the workers sharing this name
  };
  // Process-wide state of the worker utilization subsystem.
  static struct workers_globals {
      bool enabled;              // set by workers_utilization_enable(); registration and statistics do nothing until then
      SPINLOCK spinlock;         // protects worknames_JudyHS and memory
      Pvoid_t worknames_JudyHS;  // worker name -> struct workers_workname *
      size_t memory;             // estimated bytes used by registered workers and the index (zero-initialized)
  } workers_globals = { // workers globals, the base of all worknames
      .enabled = false,
      .spinlock = NETDATA_SPINLOCK_INITIALIZER, // a lock for the worknames index
      .worknames_JudyHS = NULL, // the worknames index
  };
  // Set by worker_register() and cleared by worker_unregister(); NULL on threads
  // that are not registered, which makes the worker_* calls no-ops there.
  static __thread struct worker *worker = NULL; // the current thread worker
  51. static inline usec_t worker_now_monotonic_usec(void) {
  52. #ifdef NETDATA_WITHOUT_WORKERS_LATENCY
  53. return 0;
  54. #else
  55. return now_monotonic_usec();
  56. #endif
  57. }
  58. void workers_utilization_enable(void) {
  59. workers_globals.enabled = true;
  60. }
  61. size_t workers_allocated_memory(void) {
  62. if(!workers_globals.enabled)
  63. return 0;
  64. spinlock_lock(&workers_globals.spinlock);
  65. size_t memory = workers_globals.memory;
  66. spinlock_unlock(&workers_globals.spinlock);
  67. return memory;
  68. }
  69. void worker_register(const char *name) {
  70. if(unlikely(worker || !workers_globals.enabled))
  71. return;
  72. worker = callocz(1, sizeof(struct worker));
  73. worker->pid = gettid();
  74. worker->tag = strdupz(netdata_thread_tag());
  75. worker->workname = strdupz(name);
  76. usec_t now = worker_now_monotonic_usec();
  77. worker->statistics_last_checkpoint = now;
  78. worker->last_action_timestamp = now;
  79. worker->last_action = WORKER_IDLE;
  80. size_t name_size = strlen(name) + 1;
  81. spinlock_lock(&workers_globals.spinlock);
  82. workers_globals.memory += sizeof(struct worker) + strlen(worker->tag) + 1 + strlen(worker->workname) + 1;
  83. Pvoid_t *PValue = JudyHSIns(&workers_globals.worknames_JudyHS, (void *)name, name_size, PJE0);
  84. struct workers_workname *workname = *PValue;
  85. if(!workname) {
  86. workname = mallocz(sizeof(struct workers_workname));
  87. spinlock_init(&workname->spinlock);
  88. workname->base = NULL;
  89. *PValue = workname;
  90. workers_globals.memory += sizeof(struct workers_workname) + JUDYHS_INDEX_SIZE_ESTIMATE(name_size);
  91. }
  92. spinlock_lock(&workname->spinlock);
  93. DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(workname->base, worker, prev, next);
  94. spinlock_unlock(&workname->spinlock);
  95. spinlock_unlock(&workers_globals.spinlock);
  96. }
  97. void worker_register_job_custom_metric(size_t job_id, const char *name, const char *units, WORKER_METRIC_TYPE type) {
  98. if(unlikely(!worker)) return;
  99. if(unlikely(job_id >= WORKER_UTILIZATION_MAX_JOB_TYPES)) {
  100. netdata_log_error("WORKER_UTILIZATION: job_id %zu is too big. Max is %zu", job_id, (size_t)(WORKER_UTILIZATION_MAX_JOB_TYPES - 1));
  101. return;
  102. }
  103. if(job_id > worker->worker_max_job_id)
  104. worker->worker_max_job_id = job_id;
  105. if(worker->per_job_type[job_id].name) {
  106. if(strcmp(string2str(worker->per_job_type[job_id].name), name) != 0 || worker->per_job_type[job_id].type != type || strcmp(string2str(worker->per_job_type[job_id].units), units) != 0)
  107. netdata_log_error("WORKER_UTILIZATION: duplicate job registration: worker '%s' job id %zu is '%s', ignoring the later '%s'", worker->workname, job_id, string2str(worker->per_job_type[job_id].name), name);
  108. return;
  109. }
  110. worker->per_job_type[job_id].name = string_strdupz(name);
  111. worker->per_job_type[job_id].units = string_strdupz(units);
  112. worker->per_job_type[job_id].type = type;
  113. }
  114. void worker_register_job_name(size_t job_id, const char *name) {
  115. worker_register_job_custom_metric(job_id, name, "", WORKER_METRIC_IDLE_BUSY);
  116. }
  117. void worker_unregister(void) {
  118. if(unlikely(!worker)) return;
  119. size_t workname_size = strlen(worker->workname) + 1;
  120. spinlock_lock(&workers_globals.spinlock);
  121. Pvoid_t *PValue = JudyHSGet(workers_globals.worknames_JudyHS, (void *)worker->workname, workname_size);
  122. if(PValue) {
  123. struct workers_workname *workname = *PValue;
  124. spinlock_lock(&workname->spinlock);
  125. DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(workname->base, worker, prev, next);
  126. spinlock_unlock(&workname->spinlock);
  127. if(!workname->base) {
  128. JudyHSDel(&workers_globals.worknames_JudyHS, (void *) worker->workname, workname_size, PJE0);
  129. freez(workname);
  130. workers_globals.memory -= sizeof(struct workers_workname) + JUDYHS_INDEX_SIZE_ESTIMATE(workname_size);
  131. }
  132. }
  133. workers_globals.memory -= sizeof(struct worker) + strlen(worker->tag) + 1 + strlen(worker->workname) + 1;
  134. spinlock_unlock(&workers_globals.spinlock);
  135. for(int i = 0; i < WORKER_UTILIZATION_MAX_JOB_TYPES ;i++) {
  136. string_freez(worker->per_job_type[i].name);
  137. string_freez(worker->per_job_type[i].units);
  138. }
  139. freez((void *)worker->tag);
  140. freez((void *)worker->workname);
  141. freez(worker);
  142. worker = NULL;
  143. }
  144. static inline void worker_is_idle_with_time(usec_t now) {
  145. usec_t delta = now - worker->last_action_timestamp;
  146. worker->busy_time += delta;
  147. worker->per_job_type[worker->job_id].worker_busy_time += delta;
  148. // the worker was busy
  149. // set it to idle before we set the timestamp
  150. worker->last_action = WORKER_IDLE;
  151. if(likely(worker->last_action_timestamp < now))
  152. worker->last_action_timestamp = now;
  153. }
  154. void worker_is_idle(void) {
  155. if(unlikely(!worker || worker->last_action != WORKER_BUSY)) return;
  156. worker_is_idle_with_time(worker_now_monotonic_usec());
  157. }
  158. void worker_is_busy(size_t job_id) {
  159. if(unlikely(!worker || job_id >= WORKER_UTILIZATION_MAX_JOB_TYPES))
  160. return;
  161. usec_t now = worker_now_monotonic_usec();
  162. if(worker->last_action == WORKER_BUSY)
  163. worker_is_idle_with_time(now);
  164. // the worker was idle
  165. // set the timestamp and then set it to busy
  166. worker->job_id = job_id;
  167. worker->per_job_type[job_id].worker_jobs_started++;
  168. worker->jobs_started++;
  169. worker->last_action_timestamp = now;
  170. worker->last_action = WORKER_BUSY;
  171. }
  172. void worker_set_metric(size_t job_id, NETDATA_DOUBLE value) {
  173. if(unlikely(!worker)) return;
  174. if(unlikely(job_id >= WORKER_UTILIZATION_MAX_JOB_TYPES))
  175. return;
  176. switch(worker->per_job_type[job_id].type) {
  177. case WORKER_METRIC_INCREMENT:
  178. worker->per_job_type[job_id].custom_value += value;
  179. break;
  180. case WORKER_METRIC_INCREMENTAL_TOTAL:
  181. case WORKER_METRIC_ABSOLUTE:
  182. default:
  183. worker->per_job_type[job_id].custom_value = value;
  184. break;
  185. }
  186. }
  187. // statistics interface
  188. void workers_foreach(const char *name, void (*callback)(
  189. void *data
  190. , pid_t pid
  191. , const char *thread_tag
  192. , size_t max_job_id
  193. , size_t utilization_usec
  194. , size_t duration_usec
  195. , size_t jobs_started, size_t is_running
  196. , STRING **job_types_names
  197. , STRING **job_types_units
  198. , WORKER_METRIC_TYPE *job_metric_types
  199. , size_t *job_types_jobs_started
  200. , usec_t *job_types_busy_time
  201. , NETDATA_DOUBLE *job_custom_values
  202. )
  203. , void *data) {
  204. if(!workers_globals.enabled)
  205. return;
  206. spinlock_lock(&workers_globals.spinlock);
  207. usec_t busy_time, delta;
  208. size_t i, jobs_started, jobs_running;
  209. size_t workname_size = strlen(name) + 1;
  210. struct workers_workname *workname;
  211. Pvoid_t *PValue = JudyHSGet(workers_globals.worknames_JudyHS, (void *)name, workname_size);
  212. if(PValue) {
  213. workname = *PValue;
  214. spinlock_lock(&workname->spinlock);
  215. }
  216. else
  217. workname = NULL;
  218. spinlock_unlock(&workers_globals.spinlock);
  219. if(!workname)
  220. return;
  221. struct worker *p;
  222. DOUBLE_LINKED_LIST_FOREACH_FORWARD(workname->base, p, prev, next) {
  223. usec_t now = worker_now_monotonic_usec();
  224. // find per job type statistics
  225. STRING *per_job_type_name[WORKER_UTILIZATION_MAX_JOB_TYPES];
  226. STRING *per_job_type_units[WORKER_UTILIZATION_MAX_JOB_TYPES];
  227. WORKER_METRIC_TYPE per_job_metric_type[WORKER_UTILIZATION_MAX_JOB_TYPES];
  228. size_t per_job_type_jobs_started[WORKER_UTILIZATION_MAX_JOB_TYPES];
  229. usec_t per_job_type_busy_time[WORKER_UTILIZATION_MAX_JOB_TYPES];
  230. NETDATA_DOUBLE per_job_custom_values[WORKER_UTILIZATION_MAX_JOB_TYPES];
  231. size_t max_job_id = p->worker_max_job_id;
  232. for(i = 0; i <= max_job_id ;i++) {
  233. per_job_type_name[i] = p->per_job_type[i].name;
  234. per_job_type_units[i] = p->per_job_type[i].units;
  235. per_job_metric_type[i] = p->per_job_type[i].type;
  236. switch(p->per_job_type[i].type) {
  237. default:
  238. case WORKER_METRIC_EMPTY: {
  239. per_job_type_jobs_started[i] = 0;
  240. per_job_type_busy_time[i] = 0;
  241. per_job_custom_values[i] = NAN;
  242. break;
  243. }
  244. case WORKER_METRIC_IDLE_BUSY: {
  245. size_t tmp_jobs_started = p->per_job_type[i].worker_jobs_started;
  246. per_job_type_jobs_started[i] = tmp_jobs_started - p->per_job_type[i].statistics_last_jobs_started;
  247. p->per_job_type[i].statistics_last_jobs_started = tmp_jobs_started;
  248. usec_t tmp_busy_time = p->per_job_type[i].worker_busy_time;
  249. per_job_type_busy_time[i] = tmp_busy_time - p->per_job_type[i].statistics_last_busy_time;
  250. p->per_job_type[i].statistics_last_busy_time = tmp_busy_time;
  251. per_job_custom_values[i] = NAN;
  252. break;
  253. }
  254. case WORKER_METRIC_ABSOLUTE: {
  255. per_job_type_jobs_started[i] = 0;
  256. per_job_type_busy_time[i] = 0;
  257. per_job_custom_values[i] = p->per_job_type[i].custom_value;
  258. break;
  259. }
  260. case WORKER_METRIC_INCREMENTAL_TOTAL:
  261. case WORKER_METRIC_INCREMENT: {
  262. per_job_type_jobs_started[i] = 0;
  263. per_job_type_busy_time[i] = 0;
  264. NETDATA_DOUBLE tmp_custom_value = p->per_job_type[i].custom_value;
  265. per_job_custom_values[i] = tmp_custom_value - p->per_job_type[i].statistics_last_custom_value;
  266. p->per_job_type[i].statistics_last_custom_value = tmp_custom_value;
  267. break;
  268. }
  269. }
  270. }
  271. // get a copy of the worker variables
  272. size_t worker_job_id = p->job_id;
  273. usec_t worker_busy_time = p->busy_time;
  274. size_t worker_jobs_started = p->jobs_started;
  275. char worker_last_action = p->last_action;
  276. usec_t worker_last_action_timestamp = p->last_action_timestamp;
  277. delta = now - p->statistics_last_checkpoint;
  278. p->statistics_last_checkpoint = now;
  279. // this is the only variable both the worker thread and the statistics thread are writing
  280. // we set this only when the worker is busy, so that the worker will not
  281. // accumulate all the busy time, but only the time after the point we collected statistics
  282. if(worker_last_action == WORKER_BUSY && p->last_action_timestamp == worker_last_action_timestamp && p->last_action == WORKER_BUSY)
  283. p->last_action_timestamp = now;
  284. // calculate delta busy time
  285. busy_time = worker_busy_time - p->statistics_last_busy_time;
  286. p->statistics_last_busy_time = worker_busy_time;
  287. // calculate delta jobs done
  288. jobs_started = worker_jobs_started - p->statistics_last_jobs_started;
  289. p->statistics_last_jobs_started = worker_jobs_started;
  290. jobs_running = 0;
  291. if(worker_last_action == WORKER_BUSY) {
  292. // the worker is still busy with something
  293. // let's add that busy time to the reported one
  294. usec_t dt = now - worker_last_action_timestamp;
  295. busy_time += dt;
  296. per_job_type_busy_time[worker_job_id] += dt;
  297. jobs_running = 1;
  298. }
  299. callback(data
  300. , p->pid
  301. , p->tag
  302. , max_job_id
  303. , busy_time
  304. , delta
  305. , jobs_started
  306. , jobs_running
  307. , per_job_type_name
  308. , per_job_type_units
  309. , per_job_metric_type
  310. , per_job_type_jobs_started
  311. , per_job_type_busy_time
  312. , per_job_custom_values
  313. );
  314. }
  315. spinlock_unlock(&workname->spinlock);
  316. }