tests.py 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117
  1. from collections.abc import Iterable, Mapping
  2. from typing import Optional
  3. from django.urls import reverse
  4. from model_bakery import baker
  5. from prometheus_client import Metric
  6. from prometheus_client.parser import text_string_to_metric_families
  7. from rest_framework import status
  8. from rest_framework.test import APIClient, APITestCase
  9. from .metrics import clear_metrics_cache, organizations_metric, projects_metric
  10. def get_sample_value(
  11. metric_families: Iterable[Metric],
  12. metric_name: str,
  13. metric_type: str,
  14. labels: Mapping[str, str],
  15. ) -> Optional[float]:
  16. for metric_family in metric_families:
  17. if metric_family.name != metric_name or metric_family.type != metric_type:
  18. continue
  19. for metric in metric_family.samples:
  20. if metric[1] != labels:
  21. continue
  22. return metric.value
  23. return None
  24. def parse_prometheus_text(text: str) -> list[Metric]:
  25. parser = text_string_to_metric_families(text)
  26. return list(parser)
  27. class ObservabilityAPITestCase(APITestCase):
  28. def setUp(self):
  29. self.client = APIClient()
  30. self.user = baker.make("users.user", is_staff=True)
  31. self.client.force_login(self.user)
  32. self.url = reverse("prometheus-django-metrics")
  33. def _get_metrics(self) -> list[Metric]:
  34. resp = self.client.get(self.url)
  35. return parse_prometheus_text(resp.content.decode("utf-8"))
  36. def test_get_metrics_and_cache(self):
  37. clear_metrics_cache()
  38. with self.assertNumQueries(2):
  39. resp = self.client.get(self.url)
  40. self.assertEqual(resp.status_code, status.HTTP_200_OK)
  41. with self.assertNumQueries(1):
  42. resp = self.client.get(self.url)
  43. self.assertEqual(resp.status_code, status.HTTP_200_OK)
  44. def test_org_metric(self):
  45. before_orgs_metric = get_sample_value(
  46. self._get_metrics(),
  47. organizations_metric._name,
  48. organizations_metric._type,
  49. {},
  50. )
  51. # create new org; must invalidate the cache
  52. org = baker.make("organizations_ext.Organization")
  53. metrics = self._get_metrics()
  54. orgs_metric = get_sample_value(
  55. metrics, organizations_metric._name, organizations_metric._type, {}
  56. )
  57. self.assertEqual(orgs_metric, before_orgs_metric + 1)
  58. # delete org and test again
  59. org.delete()
  60. metrics = self._get_metrics()
  61. orgs_metric = get_sample_value(
  62. metrics, organizations_metric._name, organizations_metric._type, {}
  63. )
  64. self.assertEqual(orgs_metric, before_orgs_metric)
  65. def test_project_metric(self):
  66. # create new org
  67. org = baker.make("organizations_ext.Organization")
  68. # no projects yet
  69. metrics = self._get_metrics()
  70. projs_metric = get_sample_value(
  71. metrics,
  72. projects_metric._name,
  73. projects_metric._type,
  74. {"organization": org.slug},
  75. )
  76. self.assertEqual(projs_metric, 0)
  77. # create new project
  78. proj = baker.make("projects.Project", organization=org)
  79. # test
  80. metrics = self._get_metrics()
  81. projs_metric = get_sample_value(
  82. metrics,
  83. projects_metric._name,
  84. projects_metric._type,
  85. {"organization": org.slug},
  86. )
  87. self.assertEqual(projs_metric, 1)
  88. # delete project
  89. proj.force_delete()
  90. # test
  91. metrics = self._get_metrics()
  92. projs_metric = get_sample_value(
  93. metrics,
  94. projects_metric._name,
  95. projects_metric._type,
  96. {"organization": org.slug},
  97. )
  98. self.assertEqual(projs_metric, 0)