send_metrics.py 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319
  1. # pylint: skip-file
  2. # flake8: noqa
  3. import base64
  4. import datetime
  5. import functools
  6. import itertools
  7. import json
  8. import pprint
  9. import random
  10. import string
  11. import struct
  12. import click
  13. from arroyo.backends.kafka import KafkaPayload, KafkaProducer
  14. from arroyo.types import Topic
  15. from sentry.sentry_metrics.use_case_id_registry import UseCaseID
  16. def make_counter_payload(use_case, org_id, rand_str):
  17. return {
  18. "name": f"c:{use_case}/{use_case}@none",
  19. "tags": {
  20. "environment": "production",
  21. "session.status": "init",
  22. f"metric_e2e_{use_case}_counter_k_{rand_str}": f"metric_e2e_{use_case}_counter_v_{rand_str}",
  23. },
  24. "timestamp": int(datetime.datetime.now(tz=datetime.UTC).timestamp()),
  25. "type": "c",
  26. "value": 1,
  27. "org_id": org_id,
  28. "retention_days": 90,
  29. "project_id": 3,
  30. }
  31. def make_dist_payload(use_case, org_id, rand_str, value_len, b64_encode):
  32. nums = [random.random() for _ in range(value_len)]
  33. print(f"nums: {nums}")
  34. return {
  35. "name": f"d:{use_case}/duration@second",
  36. "tags": {
  37. "environment": "production",
  38. "session.status": "healthy",
  39. f"metric_e2e_{use_case}_dist_k_{rand_str}": f"metric_e2e_{use_case}_dist_v_{rand_str}",
  40. },
  41. "timestamp": int(datetime.datetime.now(tz=datetime.UTC).timestamp()),
  42. "type": "d",
  43. "value": (
  44. {
  45. "format": "base64",
  46. "data": base64.b64encode(struct.pack(f"<{len(nums)}d", *nums))
  47. .replace(b"=", b"")
  48. .decode("ascii"),
  49. }
  50. if b64_encode
  51. else {
  52. "format": "zstd",
  53. "data": "KLUv/QBYrQAAcAAA8D8AQAAAAAAAAAhAAgBgRgCw",
  54. }
  55. ),
  56. "org_id": org_id,
  57. "retention_days": 90,
  58. "project_id": 3,
  59. }
  60. def make_set_payload(use_case, org_id, rand_str, value_len, b64_encode):
  61. INT_WIDTH = 4
  62. nums = [random.randint(0, 2048) for _ in range(value_len)]
  63. return {
  64. "name": f"s:{use_case}/error@none",
  65. "tags": {
  66. "environment": "production",
  67. "session.status": "errored",
  68. f"metric_e2e_{use_case}_set_k_{rand_str}": f"metric_e2e_{use_case}_set_v_{rand_str}",
  69. },
  70. "timestamp": int(datetime.datetime.now(tz=datetime.UTC).timestamp()),
  71. "type": "s",
  72. "value": (
  73. {
  74. "format": "base64",
  75. "data": base64.b64encode(
  76. b"".join([num.to_bytes(INT_WIDTH, byteorder="little") for num in nums])
  77. )
  78. .replace(b"=", b"")
  79. .decode("ascii"),
  80. }
  81. if b64_encode
  82. else {
  83. "format": "array",
  84. "data": nums,
  85. }
  86. ),
  87. "org_id": org_id,
  88. "retention_days": 90,
  89. "project_id": 3,
  90. }
  91. def make_gauge_payload(use_case, org_id, rand_str):
  92. return {
  93. "name": f"s:{use_case}/error@none",
  94. "tags": {
  95. "environment": "production",
  96. "session.status": "errored",
  97. f"metric_e2e_{use_case}_gauge_k_{rand_str}": f"metric_e2e_{use_case}_gauge_v_{rand_str}",
  98. },
  99. "timestamp": int(datetime.datetime.now(tz=datetime.UTC).timestamp()),
  100. "type": "g",
  101. "value": {
  102. "min": 1,
  103. "max": 1,
  104. "sum": 1,
  105. "count": 1,
  106. "last": 1,
  107. },
  108. "org_id": org_id,
  109. "retention_days": 90,
  110. "project_id": 3,
  111. }
  112. make_psql = (
  113. lambda rand_str, is_generic: f"""
  114. SELECT string,
  115. organization_id,
  116. {"use_case_id," if is_generic else ""}
  117. date_added,
  118. last_seen
  119. FROM {"sentry_perfstringindexer" if is_generic else "sentry_stringindexer"}
  120. WHERE string ~ 'metric_e2e_.*{rand_str}';
  121. """
  122. )
  123. make_csql = lambda rand_str, is_generic: "UNION ALL".join(
  124. [
  125. f"""
  126. SELECT use_case_id,
  127. org_id,
  128. project_id,
  129. metric_id,
  130. timestamp,
  131. tags.key,
  132. tags.raw_value
  133. FROM {table_name}
  134. WHERE arrayExists(v -> match(v, 'metric_e2e_.*{rand_str}'), tags.raw_value)
  135. """
  136. for table_name in (
  137. [
  138. "generic_metric_counters_raw_local",
  139. "generic_metric_distributions_raw_local",
  140. "generic_metric_sets_raw_local",
  141. "generic_metric_gauges_raw_local",
  142. ]
  143. if is_generic
  144. else [
  145. "metrics_counters_v2_local",
  146. "metrics_distributions_v2_local",
  147. "metrics_sets_v2_local",
  148. ]
  149. )
  150. ]
  151. )
  152. def produce_msgs(messages, is_generic, host, dryrun, quiet):
  153. conf = {"bootstrap.servers": host}
  154. producer = KafkaProducer(conf)
  155. for i, message in enumerate(messages):
  156. print(f"{i + 1} / {len(messages)}")
  157. if not quiet:
  158. pprint.pprint(message)
  159. if not dryrun:
  160. producer.produce(
  161. Topic(name=("ingest-performance-metrics" if is_generic else "ingest-metrics")),
  162. KafkaPayload(key=None, value=json.dumps(message).encode("utf-8"), headers=[]),
  163. )
  164. print("Done")
  165. print()
  166. producer.close()
  167. @click.command()
  168. @click.option(
  169. "--metric-types", default="cdsg", show_default=True, help="The types of metrics to send"
  170. )
  171. @click.option(
  172. "--use-cases",
  173. multiple=True,
  174. default=[
  175. use_case_id.value for use_case_id in UseCaseID if use_case_id is not UseCaseID.SESSIONS
  176. ],
  177. show_default=True,
  178. help="The use case IDs.",
  179. )
  180. @click.option("--rand-str", default=None, help="The random string prefix for each key value pairs.")
  181. @click.option(
  182. "--host", default="127.0.0.1:9092", show_default=True, help="The host and port for kafka."
  183. )
  184. @click.option(
  185. "--dryrun",
  186. "-d",
  187. is_flag=True,
  188. default=False,
  189. show_default=True,
  190. help="Generate the messages without sending them.",
  191. )
  192. @click.option(
  193. "--quiet",
  194. "-q",
  195. is_flag=True,
  196. default=False,
  197. show_default=True,
  198. help="Disable printing the messages.",
  199. )
  200. @click.option(
  201. "--start-org-id",
  202. default=1,
  203. show_default=True,
  204. help="Specify which org id(s) to start from.",
  205. )
  206. @click.option(
  207. "--end-org-id",
  208. default=1,
  209. show_default=True,
  210. help="Specify which org id(s) to end with.",
  211. )
  212. @click.option(
  213. "--num-bad-msg",
  214. default=0,
  215. show_default=True,
  216. help="Number of additional badly formatted metric messages to send.",
  217. )
  218. @click.option(
  219. "--value-len",
  220. default=8,
  221. show_default=True,
  222. help="Number of elements for metrics (sets and distributions).",
  223. )
  224. @click.option(
  225. "--b64-encode",
  226. default=True,
  227. show_default=True,
  228. help="Encode sets and distribution metrics values in base64",
  229. )
  230. def main(
  231. metric_types,
  232. use_cases,
  233. rand_str,
  234. host,
  235. dryrun,
  236. quiet,
  237. start_org_id,
  238. end_org_id,
  239. num_bad_msg,
  240. value_len,
  241. b64_encode,
  242. ):
  243. if UseCaseID.SESSIONS.value in use_cases and len(use_cases) > 1:
  244. click.secho(
  245. "ERROR: UseCaseID.SESSIONS is in use_cases and there are more than 1 use cases",
  246. blink=True,
  247. bold=True,
  248. )
  249. exit(1)
  250. is_generic = UseCaseID.SESSIONS.value not in use_cases
  251. metric_types = "".join(set(metric_types))
  252. rand_str = rand_str or "".join(random.choices(string.ascii_uppercase + string.digits, k=8))
  253. payload_generators = {
  254. "c": functools.partial(make_counter_payload, rand_str=rand_str),
  255. "d": functools.partial(
  256. make_dist_payload, rand_str=rand_str, value_len=value_len, b64_encode=b64_encode
  257. ),
  258. "s": functools.partial(
  259. make_set_payload, rand_str=rand_str, value_len=value_len, b64_encode=b64_encode
  260. ),
  261. "g": functools.partial(make_gauge_payload, rand_str=rand_str),
  262. }
  263. messages = list(
  264. itertools.chain.from_iterable(
  265. (
  266. payload_generators[metric_type](use_case=use_case, org_id=org_id)
  267. for metric_type in metric_types
  268. )
  269. for use_case in use_cases
  270. for org_id in range(start_org_id, end_org_id + 1)
  271. )
  272. )
  273. messages.extend([{"BAD_VALUE": rand_str, "idx": i} for i in range(num_bad_msg)])
  274. random.shuffle(messages)
  275. produce_msgs(messages, is_generic, host, dryrun, quiet)
  276. strs_per_use_case = 3
  277. print(
  278. f"Use the following SQL to verify postgres, "
  279. f"there should be {strs_per_use_case} strings for each use cases, "
  280. f"{strs_per_use_case * len(use_cases) * (end_org_id - start_org_id + 1)} in total."
  281. )
  282. print(make_psql(rand_str, is_generic))
  283. if is_generic:
  284. print(
  285. f"Use the following SQL to verify clickhouse, "
  286. f"there should be {len(metric_types)} metrics for each use cases, "
  287. f"{len(metric_types) * len(use_cases) * (end_org_id - start_org_id + 1)} in total."
  288. )
  289. print(make_csql(rand_str, is_generic))
# Run the click command only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()