# send_metrics.py
# pylint: skip-file
# flake8: noqa
import datetime
import itertools
import json
import pprint
import random
import string
import sys

import click
from arroyo.backends.kafka import KafkaPayload, KafkaProducer
from arroyo.types import Topic

from sentry.sentry_metrics.use_case_id_registry import UseCaseID
  13. make_counter_payload = lambda use_case, org_id, rand_str: {
  14. "name": f"c:{use_case}/{use_case}@none",
  15. "tags": {
  16. "environment": "production",
  17. "session.status": "init",
  18. f"metric_e2e_{use_case}_counter_k_{rand_str}": f"metric_e2e_{use_case}_counter_v_{rand_str}",
  19. },
  20. "timestamp": int(datetime.datetime.now(tz=datetime.UTC).timestamp()),
  21. "type": "c",
  22. "value": 1,
  23. "org_id": org_id,
  24. "retention_days": 90,
  25. "project_id": 3,
  26. }
  27. make_dist_payload = lambda use_case, org_id, rand_str, value_len: {
  28. "name": f"d:{use_case}/duration@second",
  29. "tags": {
  30. "environment": "production",
  31. "session.status": "healthy",
  32. f"metric_e2e_{use_case}_dist_k_{rand_str}": f"metric_e2e_{use_case}_dist_v_{rand_str}",
  33. },
  34. "timestamp": int(datetime.datetime.now(tz=datetime.UTC).timestamp()),
  35. "type": "d",
  36. "value": {"format": "array", "data": [random.random() for _ in range(value_len)]},
  37. "org_id": org_id,
  38. "retention_days": 90,
  39. "project_id": 3,
  40. }
  41. make_set_payload = lambda use_case, org_id, rand_str, value_len: {
  42. "name": f"s:{use_case}/error@none",
  43. "tags": {
  44. "environment": "production",
  45. "session.status": "errored",
  46. f"metric_e2e_{use_case}_set_k_{rand_str}": f"metric_e2e_{use_case}_set_v_{rand_str}",
  47. },
  48. "timestamp": int(datetime.datetime.now(tz=datetime.UTC).timestamp()),
  49. "type": "s",
  50. "value": {"format": "array", "data": [random.randint(0, 2048) for _ in range(value_len)]},
  51. "org_id": org_id,
  52. "retention_days": 90,
  53. "project_id": 3,
  54. }
  55. make_gauge_payload = lambda use_case, org_id, rand_str: {
  56. "name": f"s:{use_case}/error@none",
  57. "tags": {
  58. "environment": "production",
  59. "session.status": "errored",
  60. f"metric_e2e_{use_case}_set_k_{rand_str}": f"metric_e2e_{use_case}_set_v_{rand_str}",
  61. },
  62. "timestamp": int(datetime.datetime.now(tz=datetime.UTC).timestamp()),
  63. "type": "g",
  64. "value": {
  65. "min": 1,
  66. "max": 1,
  67. "sum": 1,
  68. "count": 1,
  69. "last": 1,
  70. },
  71. "org_id": org_id,
  72. "retention_days": 90,
  73. "project_id": 3,
  74. }
  75. make_psql = (
  76. lambda rand_str, is_generic: f"""
  77. SELECT string,
  78. organization_id,
  79. {"use_case_id," if is_generic else ""}
  80. date_added,
  81. last_seen
  82. FROM {"sentry_perfstringindexer" if is_generic else "sentry_stringindexer"}
  83. WHERE string ~ 'metric_e2e_.*{rand_str}';
  84. """
  85. )
  86. make_csql = lambda rand_str, is_generic: "UNION ALL".join(
  87. [
  88. f"""
  89. SELECT use_case_id,
  90. org_id,
  91. project_id,
  92. metric_id,
  93. timestamp,
  94. tags.key,
  95. tags.raw_value
  96. FROM {table_name}
  97. WHERE arrayExists(v -> match(v, 'metric_e2e_.*{rand_str}'), tags.raw_value)
  98. """
  99. for table_name in (
  100. [
  101. "generic_metric_counters_raw_local",
  102. "generic_metric_distributions_raw_local",
  103. "generic_metric_sets_raw_local",
  104. "generic_metric_gauges_raw_local",
  105. ]
  106. if is_generic
  107. else [
  108. "metrics_counters_v2_local",
  109. "metrics_distributions_v2_local",
  110. "metrics_sets_v2_local",
  111. ]
  112. )
  113. ]
  114. )
  115. def produce_msgs(messages, is_generic, host, dryrun, quiet):
  116. conf = {"bootstrap.servers": host}
  117. producer = KafkaProducer(conf)
  118. for i, message in enumerate(messages):
  119. print(f"{i + 1} / {len(messages)}")
  120. if not quiet:
  121. pprint.pprint(message)
  122. if not dryrun:
  123. producer.produce(
  124. Topic(name=("ingest-performance-metrics" if is_generic else "ingest-metrics")),
  125. KafkaPayload(key=None, value=json.dumps(message).encode("utf-8"), headers=[]),
  126. )
  127. print("Done")
  128. print()
  129. producer.close()
  130. @click.command()
  131. @click.option(
  132. "--use-cases",
  133. multiple=True,
  134. default=[
  135. use_case_id.value for use_case_id in UseCaseID if use_case_id is not UseCaseID.SESSIONS
  136. ],
  137. show_default=True,
  138. help="The use case IDs.",
  139. )
  140. @click.option("--rand-str", default=None, help="The random string prefix for each key value pairs.")
  141. @click.option(
  142. "--host", default="127.0.0.1:9092", show_default=True, help="The host and port for kafka."
  143. )
  144. @click.option(
  145. "--dryrun",
  146. "-d",
  147. is_flag=True,
  148. default=False,
  149. show_default=True,
  150. help="Generate the messages without sending them.",
  151. )
  152. @click.option(
  153. "--quiet",
  154. "-q",
  155. is_flag=True,
  156. default=False,
  157. show_default=True,
  158. help="Disable printing the messages.",
  159. )
  160. @click.option(
  161. "--start-org-id",
  162. default=1,
  163. show_default=True,
  164. help="Specify which org id(s) to start from.",
  165. )
  166. @click.option(
  167. "--end-org-id",
  168. default=1,
  169. show_default=True,
  170. help="Specify which org id(s) to end with.",
  171. )
  172. @click.option(
  173. "--num-bad-msg",
  174. default=0,
  175. show_default=True,
  176. help="Number of additional badly formatted metric messages to send.",
  177. )
  178. @click.option(
  179. "--value-len",
  180. default=8,
  181. show_default=True,
  182. help="Number of elements for metrics (sets and distributions).",
  183. )
  184. def main(
  185. use_cases, rand_str, host, dryrun, quiet, start_org_id, end_org_id, num_bad_msg, value_len
  186. ):
  187. if UseCaseID.SESSIONS.value in use_cases and len(use_cases) > 1:
  188. click.secho(
  189. "ERROR: UseCaseID.SESSIONS is in use_cases and there are more than 1 use cases",
  190. blink=True,
  191. bold=True,
  192. )
  193. exit(1)
  194. rand_str = rand_str or "".join(random.choices(string.ascii_uppercase + string.digits, k=8))
  195. is_generic = UseCaseID.SESSIONS.value not in use_cases
  196. messages = list(
  197. itertools.chain.from_iterable(
  198. (
  199. make_counter_payload(use_case, org, rand_str),
  200. make_dist_payload(use_case, org, rand_str, value_len),
  201. make_set_payload(use_case, org, rand_str, value_len),
  202. make_gauge_payload(use_case, org, rand_str),
  203. )
  204. for use_case in use_cases
  205. for org in range(start_org_id, end_org_id + 1)
  206. )
  207. )
  208. messages.extend([{"BAD_VALUE": rand_str, "idx": i} for i in range(num_bad_msg)])
  209. random.shuffle(messages)
  210. produce_msgs(messages, is_generic, host, dryrun, quiet)
  211. metrics_per_use_case = 4
  212. strs_per_use_case = 3
  213. print(
  214. f"Use the following SQL to verify postgres, "
  215. f"there should be {strs_per_use_case} strings for each use cases, "
  216. f"{strs_per_use_case * len(use_cases) * (end_org_id - start_org_id + 1)} in total."
  217. )
  218. print(make_psql(rand_str, is_generic))
  219. if is_generic:
  220. print(
  221. f"Use the following SQL to verify clickhouse, "
  222. f"there should be {metrics_per_use_case} metrics for each use cases, "
  223. f"{metrics_per_use_case * len(use_cases) * (end_org_id - start_org_id + 1)} in total."
  224. )
  225. print(make_csql(rand_str, is_generic))
  226. if __name__ == "__main__":
  227. main()