# send_metrics.py (4.6 KB)
# pylint: skip-file
# flake8: noqa
"""
Script that sends generic metrics messages to sentry locally.

Overview
--------
This script is designed to be used when creating a new use case ID for the
first time for the generic metrics platform.

Usage
-----
python send_metrics.py
    Without any command line argument, the script will send 3 metrics
    (counter/set/distribution) for each use case ID registered in
    src/sentry/sentry_metrics/use_case_id_registry.py.

python send_metrics.py hello world
    The script will treat any arguments supplied as a use case ID, and send
    3 metrics (counter/set/distribution) for each use case ID specified.
"""
import datetime
import itertools
import json
import pprint
import random
import string
import sys

from arroyo.backends.kafka import KafkaPayload, KafkaProducer
from arroyo.types import Topic

from sentry.sentry_metrics.use_case_id_registry import UseCaseID

# Local development Kafka broker; assumes the default sentry devservices setup.
BOOTSTRAP_HOST = "127.0.0.1:9092"
# Ingest topic consumed by the generic metrics consumer.
TOPIC_NAME = "ingest-performance-metrics"
# Producer configuration passed straight to arroyo's KafkaProducer.
conf = {"bootstrap.servers": BOOTSTRAP_HOST}
  30. make_counter_payload = lambda use_case, rand_str: {
  31. "name": f"c:{use_case}/{use_case}@none",
  32. "tags": {
  33. "environment": "production",
  34. "session.status": "init",
  35. f"gen_metric_e2e_{use_case}_counter_k_{rand_str}": f"gen_metric_e2e_{use_case}_counter_v_{rand_str}",
  36. },
  37. "timestamp": int(datetime.datetime.now(tz=datetime.timezone.utc).timestamp()),
  38. "type": "c",
  39. "value": 1,
  40. "org_id": 1,
  41. "retention_days": 90,
  42. "project_id": 3,
  43. }
  44. make_dist_payload = lambda use_case, rand_str: {
  45. "name": f"d:{use_case}/duration@second",
  46. "tags": {
  47. "environment": "production",
  48. "session.status": "healthy",
  49. f"gen_metric_e2e_{use_case}_dist_k_{rand_str}": f"gen_metric_e2e_{use_case}_dist_v_{rand_str}",
  50. },
  51. "timestamp": int(datetime.datetime.now(tz=datetime.timezone.utc).timestamp()),
  52. "type": "d",
  53. "value": [4, 5, 6],
  54. "org_id": 1,
  55. "retention_days": 90,
  56. "project_id": 3,
  57. }
  58. make_set_payload = lambda use_case, rand_str: {
  59. "name": f"s:{use_case}/error@none",
  60. "tags": {
  61. "environment": "production",
  62. "session.status": "errored",
  63. f"gen_metric_e2e_{use_case}_set_k_{rand_str}": f"gen_metric_e2e_{use_case}_set_v_{rand_str}",
  64. },
  65. "timestamp": int(datetime.datetime.now(tz=datetime.timezone.utc).timestamp()),
  66. "type": "s",
  67. "value": [3],
  68. "org_id": 1,
  69. "retention_days": 90,
  70. "project_id": 3,
  71. }
  72. make_psql = (
  73. lambda rand_str: f"""
  74. SELECT string,
  75. organization_id,
  76. date_added,
  77. use_case_id
  78. FROM sentry_perfstringindexer
  79. WHERE string ~ 'gen_metric_e2e_.*{rand_str}';
  80. """
  81. )
  82. make_csql = lambda rand_str: "UNION ALL".join(
  83. [
  84. f"""
  85. SELECT use_case_id,
  86. org_id,
  87. project_id,
  88. metric_id,
  89. timestamp,
  90. tags.key,
  91. tags.raw_value
  92. FROM {table_name}
  93. WHERE arrayExists(v -> match(v, 'gen_metric_e2e_.*{rand_str}'), tags.raw_value)
  94. """
  95. for table_name in [
  96. "generic_metric_counters_raw_local",
  97. "generic_metric_distributions_raw_local",
  98. "generic_metric_sets_raw_local",
  99. ]
  100. ]
  101. )
def produce_msgs(messages):
    """Serialize each message dict to JSON and produce it to the local
    ingest topic, printing a progress line per message.

    Args:
        messages: sequence of generic-metrics payload dicts (as built by the
            make_*_payload helpers above).
    """
    producer = KafkaProducer(conf)
    for i, message in enumerate(messages):
        print(f"Sending message {i + 1} of {len(messages)}:")
        pprint.pprint(message)
        producer.produce(
            Topic(name=TOPIC_NAME),
            # Key-less payload; JSON body encoded as UTF-8 bytes.
            KafkaPayload(key=None, value=json.dumps(message).encode("utf-8"), headers=[]),
        )
        print("Done")
        print()
    # NOTE(review): presumably close() flushes pending deliveries — confirm
    # against arroyo's KafkaProducer; not closed on exception (no try/finally).
    producer.close()
if __name__ == "__main__":
    # Random 8-char marker embedded in tag keys/values so this run's rows can
    # be located afterwards via the verification SQL printed below.
    rand_str = "".join(random.choices(string.ascii_uppercase + string.digits, k=8))
    # No CLI args: every registered use case except SESSIONS (which is not a
    # generic-metrics use case); otherwise treat each arg as a use case ID.
    use_cases = (
        [use_case_id.value for use_case_id in UseCaseID if use_case_id is not UseCaseID.SESSIONS]
        if len(sys.argv) == 1
        else sys.argv[1:]
    )
    # One counter, one distribution and one set message per use case.
    messages = list(
        itertools.chain.from_iterable(
            (
                make_counter_payload(use_case, rand_str),
                make_dist_payload(use_case, rand_str),
                make_set_payload(use_case, rand_str),
            )
            for use_case in use_cases
        )
    )
    # Shuffle so ordering bugs in downstream consumers are more likely to show.
    random.shuffle(messages)
    produce_msgs(messages)
    # 6 strings per use case: MRI name + 2 static tag keys + 2 static tag
    # values would be shared, so the 6 here counts the per-run unique strings;
    # walrus binds the constant so the total below reuses it.
    print(
        f"Use the following SQL to verify postgres, there should be {(strs_per_use_case := 6)} strings for each use cases, {strs_per_use_case * len(use_cases)} in total."
    )
    print(make_psql(rand_str))
    # 3 metrics per use case (counter/distribution/set).
    print(
        f"Use the following SQL to verify clickhouse, there should be {(metrics_per_use_case := 3)} metrics for each use cases, {metrics_per_use_case * len(use_cases)} in total."
    )
    print(make_csql(rand_str))