worker_utilization.c 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375
  1. #include "worker_utilization.h"
  // The two states stored in worker->last_action.
  enum {
      WORKER_IDLE = 'I',
      WORKER_BUSY = 'B',
  };
  4. struct worker_job_type {
  5. STRING *name;
  6. STRING *units;
  7. // statistics controlled variables
  8. size_t statistics_last_jobs_started;
  9. usec_t statistics_last_busy_time;
  10. NETDATA_DOUBLE statistics_last_custom_value;
  11. // worker controlled variables
  12. volatile size_t worker_jobs_started;
  13. volatile usec_t worker_busy_time;
  14. WORKER_METRIC_TYPE type;
  15. NETDATA_DOUBLE custom_value;
  16. };
  17. struct worker {
  18. pid_t pid;
  19. const char *tag;
  20. const char *workname;
  21. // statistics controlled variables
  22. volatile usec_t statistics_last_checkpoint;
  23. size_t statistics_last_jobs_started;
  24. usec_t statistics_last_busy_time;
  25. // the worker controlled variables
  26. size_t worker_max_job_id;
  27. volatile size_t job_id;
  28. volatile size_t jobs_started;
  29. volatile usec_t busy_time;
  30. volatile usec_t last_action_timestamp;
  31. volatile char last_action;
  32. struct worker_job_type per_job_type[WORKER_UTILIZATION_MAX_JOB_TYPES];
  33. struct worker *next;
  34. struct worker *prev;
  35. };
  36. struct workers_workname { // this is what we add to JudyHS
  37. SPINLOCK spinlock;
  38. struct worker *base;
  39. };
  40. static struct workers_globals {
  41. SPINLOCK spinlock;
  42. Pvoid_t worknames_JudyHS;
  43. size_t memory;
  44. } workers_globals = { // workers globals, the base of all worknames
  45. .spinlock = NETDATA_SPINLOCK_INITIALIZER, // a lock for the worknames index
  46. .worknames_JudyHS = NULL, // the worknames index
  47. };
  48. static __thread struct worker *worker = NULL; // the current thread worker
  49. size_t workers_allocated_memory(void) {
  50. netdata_spinlock_lock(&workers_globals.spinlock);
  51. size_t memory = workers_globals.memory;
  52. netdata_spinlock_unlock(&workers_globals.spinlock);
  53. return memory;
  54. }
  55. void worker_register(const char *name) {
  56. if(unlikely(worker)) return;
  57. worker = callocz(1, sizeof(struct worker));
  58. worker->pid = gettid();
  59. worker->tag = strdupz(netdata_thread_tag());
  60. worker->workname = strdupz(name);
  61. usec_t now = now_monotonic_usec();
  62. worker->statistics_last_checkpoint = now;
  63. worker->last_action_timestamp = now;
  64. worker->last_action = WORKER_IDLE;
  65. size_t name_size = strlen(name) + 1;
  66. netdata_spinlock_lock(&workers_globals.spinlock);
  67. workers_globals.memory += sizeof(struct worker) + strlen(worker->tag) + 1 + strlen(worker->workname) + 1;
  68. Pvoid_t *PValue = JudyHSIns(&workers_globals.worknames_JudyHS, (void *)name, name_size, PJE0);
  69. struct workers_workname *workname = *PValue;
  70. if(!workname) {
  71. workname = mallocz(sizeof(struct workers_workname));
  72. netdata_spinlock_init(&workname->spinlock);
  73. workname->base = NULL;
  74. *PValue = workname;
  75. workers_globals.memory += sizeof(struct workers_workname) + JUDYHS_INDEX_SIZE_ESTIMATE(name_size);
  76. }
  77. netdata_spinlock_lock(&workname->spinlock);
  78. DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(workname->base, worker, prev, next);
  79. netdata_spinlock_unlock(&workname->spinlock);
  80. netdata_spinlock_unlock(&workers_globals.spinlock);
  81. }
  82. void worker_register_job_custom_metric(size_t job_id, const char *name, const char *units, WORKER_METRIC_TYPE type) {
  83. if(unlikely(!worker)) return;
  84. if(unlikely(job_id >= WORKER_UTILIZATION_MAX_JOB_TYPES)) {
  85. error("WORKER_UTILIZATION: job_id %zu is too big. Max is %zu", job_id, (size_t)(WORKER_UTILIZATION_MAX_JOB_TYPES - 1));
  86. return;
  87. }
  88. if(job_id > worker->worker_max_job_id)
  89. worker->worker_max_job_id = job_id;
  90. if(worker->per_job_type[job_id].name) {
  91. if(strcmp(string2str(worker->per_job_type[job_id].name), name) != 0 || worker->per_job_type[job_id].type != type || strcmp(string2str(worker->per_job_type[job_id].units), units) != 0)
  92. error("WORKER_UTILIZATION: duplicate job registration: worker '%s' job id %zu is '%s', ignoring the later '%s'", worker->workname, job_id, string2str(worker->per_job_type[job_id].name), name);
  93. return;
  94. }
  95. worker->per_job_type[job_id].name = string_strdupz(name);
  96. worker->per_job_type[job_id].units = string_strdupz(units);
  97. worker->per_job_type[job_id].type = type;
  98. }
  99. void worker_register_job_name(size_t job_id, const char *name) {
  100. worker_register_job_custom_metric(job_id, name, "", WORKER_METRIC_IDLE_BUSY);
  101. }
  102. void worker_unregister(void) {
  103. if(unlikely(!worker)) return;
  104. size_t workname_size = strlen(worker->workname) + 1;
  105. netdata_spinlock_lock(&workers_globals.spinlock);
  106. Pvoid_t *PValue = JudyHSGet(workers_globals.worknames_JudyHS, (void *)worker->workname, workname_size);
  107. if(PValue) {
  108. struct workers_workname *workname = *PValue;
  109. netdata_spinlock_lock(&workname->spinlock);
  110. DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(workname->base, worker, prev, next);
  111. netdata_spinlock_unlock(&workname->spinlock);
  112. if(!workname->base) {
  113. JudyHSDel(&workers_globals.worknames_JudyHS, (void *) worker->workname, workname_size, PJE0);
  114. freez(workname);
  115. workers_globals.memory -= sizeof(struct workers_workname) + JUDYHS_INDEX_SIZE_ESTIMATE(workname_size);
  116. }
  117. }
  118. workers_globals.memory -= sizeof(struct worker) + strlen(worker->tag) + 1 + strlen(worker->workname) + 1;
  119. netdata_spinlock_unlock(&workers_globals.spinlock);
  120. for(int i = 0; i < WORKER_UTILIZATION_MAX_JOB_TYPES ;i++) {
  121. string_freez(worker->per_job_type[i].name);
  122. string_freez(worker->per_job_type[i].units);
  123. }
  124. freez((void *)worker->tag);
  125. freez((void *)worker->workname);
  126. freez(worker);
  127. worker = NULL;
  128. }
  129. static inline void worker_is_idle_with_time(usec_t now) {
  130. usec_t delta = now - worker->last_action_timestamp;
  131. worker->busy_time += delta;
  132. worker->per_job_type[worker->job_id].worker_busy_time += delta;
  133. // the worker was busy
  134. // set it to idle before we set the timestamp
  135. worker->last_action = WORKER_IDLE;
  136. if(likely(worker->last_action_timestamp < now))
  137. worker->last_action_timestamp = now;
  138. }
  139. void worker_is_idle(void) {
  140. if(unlikely(!worker || worker->last_action != WORKER_BUSY)) return;
  141. worker_is_idle_with_time(now_monotonic_usec());
  142. }
  143. void worker_is_busy(size_t job_id) {
  144. if(unlikely(!worker || job_id >= WORKER_UTILIZATION_MAX_JOB_TYPES))
  145. return;
  146. usec_t now = now_monotonic_usec();
  147. if(worker->last_action == WORKER_BUSY)
  148. worker_is_idle_with_time(now);
  149. // the worker was idle
  150. // set the timestamp and then set it to busy
  151. worker->job_id = job_id;
  152. worker->per_job_type[job_id].worker_jobs_started++;
  153. worker->jobs_started++;
  154. worker->last_action_timestamp = now;
  155. worker->last_action = WORKER_BUSY;
  156. }
  157. void worker_set_metric(size_t job_id, NETDATA_DOUBLE value) {
  158. if(unlikely(!worker)) return;
  159. if(unlikely(job_id >= WORKER_UTILIZATION_MAX_JOB_TYPES))
  160. return;
  161. switch(worker->per_job_type[job_id].type) {
  162. case WORKER_METRIC_INCREMENT:
  163. worker->per_job_type[job_id].custom_value += value;
  164. break;
  165. case WORKER_METRIC_INCREMENTAL_TOTAL:
  166. case WORKER_METRIC_ABSOLUTE:
  167. default:
  168. worker->per_job_type[job_id].custom_value = value;
  169. break;
  170. }
  171. }
  172. // statistics interface
  173. void workers_foreach(const char *name, void (*callback)(
  174. void *data
  175. , pid_t pid
  176. , const char *thread_tag
  177. , size_t max_job_id
  178. , size_t utilization_usec
  179. , size_t duration_usec
  180. , size_t jobs_started, size_t is_running
  181. , STRING **job_types_names
  182. , STRING **job_types_units
  183. , WORKER_METRIC_TYPE *job_metric_types
  184. , size_t *job_types_jobs_started
  185. , usec_t *job_types_busy_time
  186. , NETDATA_DOUBLE *job_custom_values
  187. )
  188. , void *data) {
  189. netdata_spinlock_lock(&workers_globals.spinlock);
  190. usec_t busy_time, delta;
  191. size_t i, jobs_started, jobs_running;
  192. size_t workname_size = strlen(name) + 1;
  193. struct workers_workname *workname;
  194. Pvoid_t *PValue = JudyHSGet(workers_globals.worknames_JudyHS, (void *)name, workname_size);
  195. if(PValue) {
  196. workname = *PValue;
  197. netdata_spinlock_lock(&workname->spinlock);
  198. }
  199. else
  200. workname = NULL;
  201. netdata_spinlock_unlock(&workers_globals.spinlock);
  202. if(!workname)
  203. return;
  204. struct worker *p;
  205. DOUBLE_LINKED_LIST_FOREACH_FORWARD(workname->base, p, prev, next) {
  206. usec_t now = now_monotonic_usec();
  207. // find per job type statistics
  208. STRING *per_job_type_name[WORKER_UTILIZATION_MAX_JOB_TYPES];
  209. STRING *per_job_type_units[WORKER_UTILIZATION_MAX_JOB_TYPES];
  210. WORKER_METRIC_TYPE per_job_metric_type[WORKER_UTILIZATION_MAX_JOB_TYPES];
  211. size_t per_job_type_jobs_started[WORKER_UTILIZATION_MAX_JOB_TYPES];
  212. usec_t per_job_type_busy_time[WORKER_UTILIZATION_MAX_JOB_TYPES];
  213. NETDATA_DOUBLE per_job_custom_values[WORKER_UTILIZATION_MAX_JOB_TYPES];
  214. size_t max_job_id = p->worker_max_job_id;
  215. for(i = 0; i <= max_job_id ;i++) {
  216. per_job_type_name[i] = p->per_job_type[i].name;
  217. per_job_type_units[i] = p->per_job_type[i].units;
  218. per_job_metric_type[i] = p->per_job_type[i].type;
  219. switch(p->per_job_type[i].type) {
  220. default:
  221. case WORKER_METRIC_EMPTY: {
  222. per_job_type_jobs_started[i] = 0;
  223. per_job_type_busy_time[i] = 0;
  224. per_job_custom_values[i] = NAN;
  225. break;
  226. }
  227. case WORKER_METRIC_IDLE_BUSY: {
  228. size_t tmp_jobs_started = p->per_job_type[i].worker_jobs_started;
  229. per_job_type_jobs_started[i] = tmp_jobs_started - p->per_job_type[i].statistics_last_jobs_started;
  230. p->per_job_type[i].statistics_last_jobs_started = tmp_jobs_started;
  231. usec_t tmp_busy_time = p->per_job_type[i].worker_busy_time;
  232. per_job_type_busy_time[i] = tmp_busy_time - p->per_job_type[i].statistics_last_busy_time;
  233. p->per_job_type[i].statistics_last_busy_time = tmp_busy_time;
  234. per_job_custom_values[i] = NAN;
  235. break;
  236. }
  237. case WORKER_METRIC_ABSOLUTE: {
  238. per_job_type_jobs_started[i] = 0;
  239. per_job_type_busy_time[i] = 0;
  240. per_job_custom_values[i] = p->per_job_type[i].custom_value;
  241. break;
  242. }
  243. case WORKER_METRIC_INCREMENTAL_TOTAL:
  244. case WORKER_METRIC_INCREMENT: {
  245. per_job_type_jobs_started[i] = 0;
  246. per_job_type_busy_time[i] = 0;
  247. NETDATA_DOUBLE tmp_custom_value = p->per_job_type[i].custom_value;
  248. per_job_custom_values[i] = tmp_custom_value - p->per_job_type[i].statistics_last_custom_value;
  249. p->per_job_type[i].statistics_last_custom_value = tmp_custom_value;
  250. break;
  251. }
  252. }
  253. }
  254. // get a copy of the worker variables
  255. size_t worker_job_id = p->job_id;
  256. usec_t worker_busy_time = p->busy_time;
  257. size_t worker_jobs_started = p->jobs_started;
  258. char worker_last_action = p->last_action;
  259. usec_t worker_last_action_timestamp = p->last_action_timestamp;
  260. delta = now - p->statistics_last_checkpoint;
  261. p->statistics_last_checkpoint = now;
  262. // this is the only variable both the worker thread and the statistics thread are writing
  263. // we set this only when the worker is busy, so that the worker will not
  264. // accumulate all the busy time, but only the time after the point we collected statistics
  265. if(worker_last_action == WORKER_BUSY && p->last_action_timestamp == worker_last_action_timestamp && p->last_action == WORKER_BUSY)
  266. p->last_action_timestamp = now;
  267. // calculate delta busy time
  268. busy_time = worker_busy_time - p->statistics_last_busy_time;
  269. p->statistics_last_busy_time = worker_busy_time;
  270. // calculate delta jobs done
  271. jobs_started = worker_jobs_started - p->statistics_last_jobs_started;
  272. p->statistics_last_jobs_started = worker_jobs_started;
  273. jobs_running = 0;
  274. if(worker_last_action == WORKER_BUSY) {
  275. // the worker is still busy with something
  276. // let's add that busy time to the reported one
  277. usec_t dt = now - worker_last_action_timestamp;
  278. busy_time += dt;
  279. per_job_type_busy_time[worker_job_id] += dt;
  280. jobs_running = 1;
  281. }
  282. callback(data
  283. , p->pid
  284. , p->tag
  285. , max_job_id
  286. , busy_time
  287. , delta
  288. , jobs_started
  289. , jobs_running
  290. , per_job_type_name
  291. , per_job_type_units
  292. , per_job_metric_type
  293. , per_job_type_jobs_started
  294. , per_job_type_busy_time
  295. , per_job_custom_values
  296. );
  297. }
  298. netdata_spinlock_unlock(&workname->spinlock);
  299. }