worker_utilization.c 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210
  1. #include "worker_utilization.h"
  // Markers stored in worker->last_action to record the most recent
  // state transition the worker thread reported.
  enum {
      WORKER_IDLE = 'I',  // the worker last reported being idle
      WORKER_BUSY = 'B'   // the worker last reported starting a job
  };
  4. struct worker_job_type {
  5. char name[WORKER_UTILIZATION_MAX_JOB_NAME_LENGTH + 1];
  6. // statistics controlled variables
  7. size_t statistics_last_jobs_started;
  8. usec_t statistics_last_busy_time;
  9. // worker controlled variables
  10. volatile size_t worker_jobs_started;
  11. volatile usec_t worker_busy_time;
  12. };
  13. struct worker {
  14. pid_t pid;
  15. const char *tag;
  16. const char *workname;
  17. uint32_t workname_hash;
  18. // statistics controlled variables
  19. volatile usec_t statistics_last_checkpoint;
  20. size_t statistics_last_jobs_started;
  21. usec_t statistics_last_busy_time;
  22. // the worker controlled variables
  23. volatile size_t job_id;
  24. volatile size_t jobs_started;
  25. volatile usec_t busy_time;
  26. volatile usec_t last_action_timestamp;
  27. volatile char last_action;
  28. struct worker_job_type per_job_type[WORKER_UTILIZATION_MAX_JOB_TYPES];
  29. struct worker *next;
  30. };
  31. static netdata_mutex_t base_lock = NETDATA_MUTEX_INITIALIZER;
  32. static struct worker *base = NULL;
  33. static __thread struct worker *worker = NULL;
  34. void worker_register(const char *workname) {
  35. if(unlikely(worker)) return;
  36. worker = callocz(1, sizeof(struct worker));
  37. worker->pid = gettid();
  38. worker->tag = strdupz(netdata_thread_tag());
  39. worker->workname = strdupz(workname);
  40. worker->workname_hash = simple_hash(worker->workname);
  41. usec_t now = now_realtime_usec();
  42. worker->statistics_last_checkpoint = now;
  43. worker->last_action_timestamp = now;
  44. worker->last_action = WORKER_IDLE;
  45. netdata_mutex_lock(&base_lock);
  46. worker->next = base;
  47. base = worker;
  48. netdata_mutex_unlock(&base_lock);
  49. }
  50. void worker_register_job_name(size_t job_id, const char *name) {
  51. if(unlikely(!worker)) return;
  52. if(unlikely(job_id >= WORKER_UTILIZATION_MAX_JOB_TYPES)) {
  53. error("WORKER_UTILIZATION: job_id %zu is too big. Max is %zu", job_id, (size_t)(WORKER_UTILIZATION_MAX_JOB_TYPES - 1));
  54. return;
  55. }
  56. if (*worker->per_job_type[job_id].name) {
  57. error("WORKER_UTILIZATION: duplicate job registration: worker '%s' job id %zu is '%s', ignoring '%s'", worker->workname, job_id, worker->per_job_type[job_id].name, name);
  58. return;
  59. }
  60. strncpy(worker->per_job_type[job_id].name, name, WORKER_UTILIZATION_MAX_JOB_NAME_LENGTH);
  61. }
  62. void worker_unregister(void) {
  63. if(unlikely(!worker)) return;
  64. netdata_mutex_lock(&base_lock);
  65. if(base == worker)
  66. base = worker->next;
  67. else {
  68. struct worker *p;
  69. for(p = base; p && p->next && p->next != worker ;p = p->next);
  70. if(p && p->next == worker)
  71. p->next = worker->next;
  72. }
  73. netdata_mutex_unlock(&base_lock);
  74. freez((void *)worker->tag);
  75. freez((void *)worker->workname);
  76. freez(worker);
  77. worker = NULL;
  78. }
  79. static inline void worker_is_idle_with_time(usec_t now) {
  80. usec_t delta = now - worker->last_action_timestamp;
  81. worker->busy_time += delta;
  82. worker->per_job_type[worker->job_id].worker_busy_time += delta;
  83. // the worker was busy
  84. // set it to idle before we set the timestamp
  85. worker->last_action = WORKER_IDLE;
  86. if(likely(worker->last_action_timestamp < now))
  87. worker->last_action_timestamp = now;
  88. }
  89. void worker_is_idle(void) {
  90. if(unlikely(!worker)) return;
  91. if(unlikely(worker->last_action != WORKER_BUSY)) return;
  92. worker_is_idle_with_time(now_realtime_usec());
  93. }
  94. void worker_is_busy(size_t job_id) {
  95. if(unlikely(!worker)) return;
  96. if(unlikely(job_id >= WORKER_UTILIZATION_MAX_JOB_TYPES))
  97. job_id = 0;
  98. usec_t now = now_realtime_usec();
  99. if(worker->last_action == WORKER_BUSY)
  100. worker_is_idle_with_time(now);
  101. // the worker was idle
  102. // set the timestamp and then set it to busy
  103. worker->job_id = job_id;
  104. worker->per_job_type[job_id].worker_jobs_started++;
  105. worker->jobs_started++;
  106. worker->last_action_timestamp = now;
  107. worker->last_action = WORKER_BUSY;
  108. }
  109. // statistics interface
  110. void workers_foreach(const char *workname, void (*callback)(void *data, pid_t pid, const char *thread_tag, size_t utilization_usec, size_t duration_usec, size_t jobs_started, size_t is_running, const char **job_types_names, size_t *job_types_jobs_started, usec_t *job_types_busy_time), void *data) {
  111. netdata_mutex_lock(&base_lock);
  112. uint32_t hash = simple_hash(workname);
  113. usec_t busy_time, delta;
  114. size_t i, jobs_started, jobs_running;
  115. struct worker *p;
  116. for(p = base; p ; p = p->next) {
  117. if(hash != p->workname_hash || strcmp(workname, p->workname)) continue;
  118. usec_t now = now_realtime_usec();
  119. // find per job type statistics
  120. const char *per_job_type_name[WORKER_UTILIZATION_MAX_JOB_TYPES];
  121. size_t per_job_type_jobs_started[WORKER_UTILIZATION_MAX_JOB_TYPES];
  122. usec_t per_job_type_busy_time[WORKER_UTILIZATION_MAX_JOB_TYPES];
  123. for(i = 0; i < WORKER_UTILIZATION_MAX_JOB_TYPES ;i++) {
  124. per_job_type_name[i] = p->per_job_type[i].name;
  125. size_t tmp_jobs_started = p->per_job_type[i].worker_jobs_started;
  126. per_job_type_jobs_started[i] = tmp_jobs_started - p->per_job_type[i].statistics_last_jobs_started;
  127. p->per_job_type[i].statistics_last_jobs_started = tmp_jobs_started;
  128. usec_t tmp_busy_time = p->per_job_type[i].worker_busy_time;
  129. per_job_type_busy_time[i] = tmp_busy_time - p->per_job_type[i].statistics_last_busy_time;
  130. p->per_job_type[i].statistics_last_busy_time = tmp_busy_time;
  131. }
  132. // get a copy of the worker variables
  133. size_t worker_job_id = p->job_id;
  134. usec_t worker_busy_time = p->busy_time;
  135. size_t worker_jobs_started = p->jobs_started;
  136. char worker_last_action = p->last_action;
  137. usec_t worker_last_action_timestamp = p->last_action_timestamp;
  138. delta = now - p->statistics_last_checkpoint;
  139. p->statistics_last_checkpoint = now;
  140. // this is the only variable both the worker thread and the statistics thread are writing
  141. // we set this only when the worker is busy, so that the worker will not
  142. // accumulate all the busy time, but only the time after the point we collected statistics
  143. if(worker_last_action == WORKER_BUSY && p->last_action_timestamp == worker_last_action_timestamp && p->last_action == WORKER_BUSY)
  144. p->last_action_timestamp = now;
  145. // calculate delta busy time
  146. busy_time = worker_busy_time - p->statistics_last_busy_time;
  147. p->statistics_last_busy_time = worker_busy_time;
  148. // calculate delta jobs done
  149. jobs_started = worker_jobs_started - p->statistics_last_jobs_started;
  150. p->statistics_last_jobs_started = worker_jobs_started;
  151. jobs_running = 0;
  152. if(worker_last_action == WORKER_BUSY) {
  153. // the worker is still busy with something
  154. // let's add that busy time to the reported one
  155. usec_t dt = now - worker_last_action_timestamp;
  156. busy_time += dt;
  157. per_job_type_busy_time[worker_job_id] += dt;
  158. jobs_running = 1;
  159. }
  160. callback(data, p->pid, p->tag, busy_time, delta, jobs_started, jobs_running, per_job_type_name, per_job_type_jobs_started, per_job_type_busy_time);
  161. }
  162. netdata_mutex_unlock(&base_lock);
  163. }