server_metrics.go 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351
  1. /*
  2. *
  3. * Copyright 2023 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package orca
  19. import (
  20. "sync"
  21. v3orcapb "github.com/cncf/xds/go/xds/data/orca/v3"
  22. )
// ServerMetrics is the data returned from a server to a client to describe the
// current state of the server and/or the cost of a request when used per-call.
//
// For the five scalar fields, -1 is the "unset" sentinel: toLoadReportProto
// leaves the matching proto field untouched and merge does not copy the field
// when it holds -1.
type ServerMetrics struct {
	CPUUtilization float64 // CPU utilization: [0, inf); unset=-1
	MemUtilization float64 // Memory utilization: [0, 1.0]; unset=-1
	AppUtilization float64 // Application utilization: [0, inf); unset=-1
	QPS            float64 // queries per second: [0, inf); unset=-1
	EPS            float64 // errors per second: [0, inf); unset=-1
	// The following maps must never be nil; merge writes into them directly.
	Utilization  map[string]float64 // Custom fields: [0, 1.0]
	RequestCost  map[string]float64 // Custom fields: [0, inf); not sent OOB
	NamedMetrics map[string]float64 // Custom fields: [0, inf); not sent OOB
}
  36. // toLoadReportProto dumps sm as an OrcaLoadReport proto.
  37. func (sm *ServerMetrics) toLoadReportProto() *v3orcapb.OrcaLoadReport {
  38. ret := &v3orcapb.OrcaLoadReport{
  39. Utilization: sm.Utilization,
  40. RequestCost: sm.RequestCost,
  41. NamedMetrics: sm.NamedMetrics,
  42. }
  43. if sm.CPUUtilization != -1 {
  44. ret.CpuUtilization = sm.CPUUtilization
  45. }
  46. if sm.MemUtilization != -1 {
  47. ret.MemUtilization = sm.MemUtilization
  48. }
  49. if sm.AppUtilization != -1 {
  50. ret.ApplicationUtilization = sm.AppUtilization
  51. }
  52. if sm.QPS != -1 {
  53. ret.RpsFractional = sm.QPS
  54. }
  55. if sm.EPS != -1 {
  56. ret.Eps = sm.EPS
  57. }
  58. return ret
  59. }
  60. // merge merges o into sm, overwriting any values present in both.
  61. func (sm *ServerMetrics) merge(o *ServerMetrics) {
  62. mergeMap(sm.Utilization, o.Utilization)
  63. mergeMap(sm.RequestCost, o.RequestCost)
  64. mergeMap(sm.NamedMetrics, o.NamedMetrics)
  65. if o.CPUUtilization != -1 {
  66. sm.CPUUtilization = o.CPUUtilization
  67. }
  68. if o.MemUtilization != -1 {
  69. sm.MemUtilization = o.MemUtilization
  70. }
  71. if o.AppUtilization != -1 {
  72. sm.AppUtilization = o.AppUtilization
  73. }
  74. if o.QPS != -1 {
  75. sm.QPS = o.QPS
  76. }
  77. if o.EPS != -1 {
  78. sm.EPS = o.EPS
  79. }
  80. }
  81. func mergeMap(a, b map[string]float64) {
  82. for k, v := range b {
  83. a[k] = v
  84. }
  85. }
// ServerMetricsRecorder allows for recording and providing out of band server
// metrics.
type ServerMetricsRecorder interface {
	ServerMetricsProvider
	// SetCPUUtilization sets the CPU utilization server metric. Must be
	// greater than or equal to zero.
	SetCPUUtilization(float64)
	// DeleteCPUUtilization deletes the CPU utilization server metric to
	// prevent it from being sent.
	DeleteCPUUtilization()
	// SetMemoryUtilization sets the memory utilization server metric. Must be
	// in the range [0, 1].
	SetMemoryUtilization(float64)
	// DeleteMemoryUtilization deletes the memory utilization server metric to
	// prevent it from being sent.
	DeleteMemoryUtilization()
	// SetApplicationUtilization sets the application utilization server
	// metric. Must be greater than or equal to zero.
	SetApplicationUtilization(float64)
	// DeleteApplicationUtilization deletes the application utilization server
	// metric to prevent it from being sent.
	DeleteApplicationUtilization()
	// SetQPS sets the Queries Per Second server metric. Must be greater than
	// or equal to zero.
	SetQPS(float64)
	// DeleteQPS deletes the Queries Per Second server metric to prevent it
	// from being sent.
	DeleteQPS()
	// SetEPS sets the Errors Per Second server metric. Must be greater than
	// or equal to zero.
	SetEPS(float64)
	// DeleteEPS deletes the Errors Per Second server metric to prevent it from
	// being sent.
	DeleteEPS()
	// SetNamedUtilization sets the named utilization server metric for the
	// name provided. val must be in the range [0, 1].
	SetNamedUtilization(name string, val float64)
	// DeleteNamedUtilization deletes the named utilization server metric for
	// the name provided to prevent it from being sent.
	DeleteNamedUtilization(name string)
}
// serverMetricsRecorder is the mutex-guarded, in-memory implementation that
// NewServerMetricsRecorder returns. Every method takes mu before reading or
// writing state.
type serverMetricsRecorder struct {
	mu    sync.Mutex     // protects state
	state *ServerMetrics // the current metrics
}
  131. // NewServerMetricsRecorder returns an in-memory store for ServerMetrics and
  132. // allows for safe setting and retrieving of ServerMetrics. Also implements
  133. // ServerMetricsProvider for use with NewService.
  134. func NewServerMetricsRecorder() ServerMetricsRecorder {
  135. return newServerMetricsRecorder()
  136. }
  137. func newServerMetricsRecorder() *serverMetricsRecorder {
  138. return &serverMetricsRecorder{
  139. state: &ServerMetrics{
  140. CPUUtilization: -1,
  141. MemUtilization: -1,
  142. AppUtilization: -1,
  143. QPS: -1,
  144. EPS: -1,
  145. Utilization: make(map[string]float64),
  146. RequestCost: make(map[string]float64),
  147. NamedMetrics: make(map[string]float64),
  148. },
  149. }
  150. }
  151. // ServerMetrics returns a copy of the current ServerMetrics.
  152. func (s *serverMetricsRecorder) ServerMetrics() *ServerMetrics {
  153. s.mu.Lock()
  154. defer s.mu.Unlock()
  155. return &ServerMetrics{
  156. CPUUtilization: s.state.CPUUtilization,
  157. MemUtilization: s.state.MemUtilization,
  158. AppUtilization: s.state.AppUtilization,
  159. QPS: s.state.QPS,
  160. EPS: s.state.EPS,
  161. Utilization: copyMap(s.state.Utilization),
  162. RequestCost: copyMap(s.state.RequestCost),
  163. NamedMetrics: copyMap(s.state.NamedMetrics),
  164. }
  165. }
  166. func copyMap(m map[string]float64) map[string]float64 {
  167. ret := make(map[string]float64, len(m))
  168. for k, v := range m {
  169. ret[k] = v
  170. }
  171. return ret
  172. }
  173. // SetCPUUtilization records a measurement for the CPU utilization metric.
  174. func (s *serverMetricsRecorder) SetCPUUtilization(val float64) {
  175. if val < 0 {
  176. if logger.V(2) {
  177. logger.Infof("Ignoring CPU Utilization value out of range: %v", val)
  178. }
  179. return
  180. }
  181. s.mu.Lock()
  182. defer s.mu.Unlock()
  183. s.state.CPUUtilization = val
  184. }
  185. // DeleteCPUUtilization deletes the relevant server metric to prevent it from
  186. // being sent.
  187. func (s *serverMetricsRecorder) DeleteCPUUtilization() {
  188. s.mu.Lock()
  189. defer s.mu.Unlock()
  190. s.state.CPUUtilization = -1
  191. }
  192. // SetMemoryUtilization records a measurement for the memory utilization metric.
  193. func (s *serverMetricsRecorder) SetMemoryUtilization(val float64) {
  194. if val < 0 || val > 1 {
  195. if logger.V(2) {
  196. logger.Infof("Ignoring Memory Utilization value out of range: %v", val)
  197. }
  198. return
  199. }
  200. s.mu.Lock()
  201. defer s.mu.Unlock()
  202. s.state.MemUtilization = val
  203. }
  204. // DeleteMemoryUtilization deletes the relevant server metric to prevent it
  205. // from being sent.
  206. func (s *serverMetricsRecorder) DeleteMemoryUtilization() {
  207. s.mu.Lock()
  208. defer s.mu.Unlock()
  209. s.state.MemUtilization = -1
  210. }
  211. // SetApplicationUtilization records a measurement for a generic utilization
  212. // metric.
  213. func (s *serverMetricsRecorder) SetApplicationUtilization(val float64) {
  214. if val < 0 {
  215. if logger.V(2) {
  216. logger.Infof("Ignoring Application Utilization value out of range: %v", val)
  217. }
  218. return
  219. }
  220. s.mu.Lock()
  221. defer s.mu.Unlock()
  222. s.state.AppUtilization = val
  223. }
  224. // DeleteApplicationUtilization deletes the relevant server metric to prevent
  225. // it from being sent.
  226. func (s *serverMetricsRecorder) DeleteApplicationUtilization() {
  227. s.mu.Lock()
  228. defer s.mu.Unlock()
  229. s.state.AppUtilization = -1
  230. }
  231. // SetQPS records a measurement for the QPS metric.
  232. func (s *serverMetricsRecorder) SetQPS(val float64) {
  233. if val < 0 {
  234. if logger.V(2) {
  235. logger.Infof("Ignoring QPS value out of range: %v", val)
  236. }
  237. return
  238. }
  239. s.mu.Lock()
  240. defer s.mu.Unlock()
  241. s.state.QPS = val
  242. }
  243. // DeleteQPS deletes the relevant server metric to prevent it from being sent.
  244. func (s *serverMetricsRecorder) DeleteQPS() {
  245. s.mu.Lock()
  246. defer s.mu.Unlock()
  247. s.state.QPS = -1
  248. }
  249. // SetEPS records a measurement for the EPS metric.
  250. func (s *serverMetricsRecorder) SetEPS(val float64) {
  251. if val < 0 {
  252. if logger.V(2) {
  253. logger.Infof("Ignoring EPS value out of range: %v", val)
  254. }
  255. return
  256. }
  257. s.mu.Lock()
  258. defer s.mu.Unlock()
  259. s.state.EPS = val
  260. }
  261. // DeleteEPS deletes the relevant server metric to prevent it from being sent.
  262. func (s *serverMetricsRecorder) DeleteEPS() {
  263. s.mu.Lock()
  264. defer s.mu.Unlock()
  265. s.state.EPS = -1
  266. }
  267. // SetNamedUtilization records a measurement for a utilization metric uniquely
  268. // identifiable by name.
  269. func (s *serverMetricsRecorder) SetNamedUtilization(name string, val float64) {
  270. if val < 0 || val > 1 {
  271. if logger.V(2) {
  272. logger.Infof("Ignoring Named Utilization value out of range: %v", val)
  273. }
  274. return
  275. }
  276. s.mu.Lock()
  277. defer s.mu.Unlock()
  278. s.state.Utilization[name] = val
  279. }
  280. // DeleteNamedUtilization deletes any previously recorded measurement for a
  281. // utilization metric uniquely identifiable by name.
  282. func (s *serverMetricsRecorder) DeleteNamedUtilization(name string) {
  283. s.mu.Lock()
  284. defer s.mu.Unlock()
  285. delete(s.state.Utilization, name)
  286. }
  287. // SetRequestCost records a measurement for a utilization metric uniquely
  288. // identifiable by name.
  289. func (s *serverMetricsRecorder) SetRequestCost(name string, val float64) {
  290. s.mu.Lock()
  291. defer s.mu.Unlock()
  292. s.state.RequestCost[name] = val
  293. }
  294. // DeleteRequestCost deletes any previously recorded measurement for a
  295. // utilization metric uniquely identifiable by name.
  296. func (s *serverMetricsRecorder) DeleteRequestCost(name string) {
  297. s.mu.Lock()
  298. defer s.mu.Unlock()
  299. delete(s.state.RequestCost, name)
  300. }
  301. // SetNamedMetric records a measurement for a utilization metric uniquely
  302. // identifiable by name.
  303. func (s *serverMetricsRecorder) SetNamedMetric(name string, val float64) {
  304. s.mu.Lock()
  305. defer s.mu.Unlock()
  306. s.state.NamedMetrics[name] = val
  307. }
  308. // DeleteNamedMetric deletes any previously recorded measurement for a
  309. // utilization metric uniquely identifiable by name.
  310. func (s *serverMetricsRecorder) DeleteNamedMetric(name string) {
  311. s.mu.Lock()
  312. defer s.mu.Unlock()
  313. delete(s.state.NamedMetrics, name)
  314. }