taskworker-test 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296
  1. #!/usr/bin/env python
  2. import csv
  3. import os
  4. import random
  5. import sys
  6. import time
  7. from collections import defaultdict
  8. from subprocess import list2cmdline
  9. import click
  10. from honcho.manager import Manager
  11. from sentry.runner import configure
  12. from sentry.runner.formatting import get_honcho_printer
# Call sentry.runner.configure() at import time, before the click command below runs.
# NOTE(review): presumably this has to happen before sentry.taskdemo can be imported,
# which would explain the deferred import inside main() - confirm.
configure()
  14. @click.command("taskworker-test")
  15. @click.option("--number", "-n", help="Number of messages to append", default=10_000)
  16. @click.option("--verbose", "-v", help="Enable verbose output", default=False, is_flag=True)
  17. @click.option("--seed", "-s", help="pseudo random number generator seed", default=None)
  18. @click.option("--consumers", help="Number of consumer processes", default=1)
  19. @click.option("--workers", help="Number of worker processes", default=1)
  20. @click.option("--mode", help="The worker/consumer mode to use", default="pull")
  21. @click.option("--storage", help="The storage mode to use", default="postgres")
  22. @click.option("--report-only", help="Run report output only", is_flag=True, default=False)
  23. def main(
  24. number: int,
  25. verbose: bool,
  26. consumers: int,
  27. workers: int,
  28. seed: float | None,
  29. mode: str,
  30. report_only: bool,
  31. storage: str,
  32. ):
  33. from sentry.taskdemo import variable_time
  34. if report_only:
  35. return print_results("./taskworker.log", workers)
  36. if verbose:
  37. click.echo(f"Adding {number} task messages")
  38. if not seed:
  39. seed = random.random()
  40. # Fill the topic up with a number of tasks
  41. start = time.monotonic()
  42. for i in range(number):
  43. variable_time.delay(wait=random.random(), taskno=i)
  44. end = time.monotonic()
  45. click.echo(f"Appending {number} tasks took {(end-start)}s")
  46. cwd = os.getcwd()
  47. os.unlink("./taskworker.log")
  48. # Use honcho to control the worker and consumer proceses.
  49. honcho_printer = get_honcho_printer(prefix=True, pretty=False)
  50. manager = Manager(honcho_printer)
  51. processes = []
  52. if mode == "pull":
  53. processes.append(
  54. {
  55. "name": "grpc",
  56. "cmd": ["sentry", "run", "kafka-task-grpc-pull", "--storage", storage],
  57. },
  58. )
  59. for i in range(consumers):
  60. processes.append(
  61. {
  62. "name": f"consumer-{i}",
  63. "cmd": [
  64. "sentry",
  65. "run",
  66. "consumer",
  67. "taskworker",
  68. "--consumer-group",
  69. "taskworker-pull",
  70. "--log-level",
  71. "warning",
  72. "--",
  73. "--storage",
  74. storage,
  75. ],
  76. }
  77. )
  78. for i in range(workers):
  79. processes.append(
  80. {
  81. "name": f"worker-{i}",
  82. "cmd": [
  83. "sentry",
  84. "run",
  85. "taskworker-pull",
  86. "--namespace",
  87. "demos",
  88. ],
  89. },
  90. )
  91. elif mode == "push":
  92. worker_ports = [50051 + i for i in range(workers)]
  93. worker_addrs = ",".join([f"127.0.0.1:{port}" for port in worker_ports])
  94. processes.append(
  95. {
  96. "name": "grpc",
  97. "cmd": [
  98. "sentry",
  99. "run",
  100. "kafka-task-grpc-push",
  101. "--worker-addrs",
  102. worker_addrs,
  103. "--storage",
  104. storage,
  105. ],
  106. },
  107. )
  108. processes.append(
  109. {
  110. "name": "consumer",
  111. "cmd": [
  112. "sentry",
  113. "run",
  114. "consumer",
  115. "taskworker",
  116. "--consumer-group",
  117. "taskworker-pull",
  118. "--log-level",
  119. "warning",
  120. "--",
  121. "--storage",
  122. storage,
  123. ],
  124. }
  125. )
  126. for port in worker_ports:
  127. processes.append(
  128. {
  129. "name": f"worker-{port}",
  130. "cmd": [
  131. "sentry",
  132. "run",
  133. "taskworker-push",
  134. "--namespace",
  135. "demos",
  136. "--port",
  137. str(port),
  138. ],
  139. },
  140. )
  141. elif mode == "reply":
  142. worker_ports = [50051 + i for i in range(workers)]
  143. worker_addrs = ",".join([f"127.0.0.1:{port}" for port in worker_ports])
  144. processes.append(
  145. {
  146. "name": "consumer",
  147. "cmd": [
  148. "sentry",
  149. "run",
  150. "consumer",
  151. "taskworker-reply",
  152. "--consumer-group",
  153. "taskworker-pull",
  154. "--log-level",
  155. "warning",
  156. "--",
  157. "--storage",
  158. storage,
  159. "--worker-addrs",
  160. worker_addrs,
  161. ],
  162. }
  163. )
  164. for port in worker_ports:
  165. processes.append(
  166. {
  167. "name": f"worker-{port}",
  168. "cmd": [
  169. "sentry",
  170. "run",
  171. "taskworker-reply",
  172. "--namespace",
  173. "demos",
  174. "--port",
  175. str(port),
  176. ],
  177. },
  178. )
  179. else:
  180. raise RuntimeError(f"Unexpected mode of {mode}. Use `push`, `pull`, or `reply` instead.")
  181. for process in processes:
  182. manager.add_process(process["name"], list2cmdline(process["cmd"]), cwd=cwd)
  183. # Lets go!
  184. manager.loop()
  185. print_results("./taskworker.log", workers)
  186. sys.exit(manager.returncode)
  187. def print_results(log_file: str, worker_count: int) -> None:
  188. click.echo("")
  189. click.echo("== Test run complete ==")
  190. click.echo("")
  191. fieldnames = ["event", "worker_id", "task_add_time", "execution_time", "latency", "task_id"]
  192. latency_times = []
  193. execution_times = []
  194. task_ids = set()
  195. duplicates = []
  196. with open(log_file) as logs:
  197. results = csv.DictReader(logs, fieldnames=fieldnames)
  198. for row in results:
  199. latency_times.append(float(row["latency"].strip()))
  200. execution_times.append(float(row["execution_time"].strip()))
  201. row_id = row["task_id"].strip()
  202. if row_id in task_ids:
  203. duplicates.append(row_id)
  204. task_ids.add(row_id)
  205. # We append tasks and then start applications. The first
  206. # message always has long latency as application startup
  207. # and kafka take some time to get going.
  208. first_latency = latency_times[0]
  209. min_latency = min(latency_times)
  210. max_latency = max(latency_times)
  211. avg_latency = sum(latency_times) / len(latency_times)
  212. processing_time = execution_times[-1] - execution_times[0]
  213. task_throughput = len(latency_times) / processing_time
  214. # Remove the startup overhead to get relative latency.
  215. adj_min_latency = min_latency - first_latency
  216. adj_max_latency = max_latency - first_latency
  217. adj_avg_latency = avg_latency - first_latency
  218. # Bucket latency and count totals in each bucket
  219. latency_spread = adj_max_latency - adj_min_latency
  220. bucket_count = 20
  221. bucket_width = latency_spread / bucket_count
  222. buckets = defaultdict(int)
  223. for value in latency_times:
  224. adjusted = max(value - first_latency, 0)
  225. bucket = int(adjusted / bucket_width)
  226. buckets[bucket] += 1
  227. click.echo("")
  228. click.echo("## Run summary")
  229. click.echo("")
  230. click.echo(f"Task count: {len(latency_times)}")
  231. click.echo(f"Processing time: {processing_time:.4f}")
  232. click.echo(f"Throughput: {task_throughput:.4f}")
  233. if duplicates:
  234. click.echo("")
  235. click.echo("Duplicate executions found:")
  236. for dupe in duplicates:
  237. click.echo(f"- {dupe}")
  238. click.echo("")
  239. click.echo("")
  240. click.echo("## Task latency")
  241. click.echo("")
  242. click.echo(f"First task Latency: {first_latency:.5f}")
  243. click.echo(
  244. f"Raw Min / Max / Avg latency: {min_latency:.5f} / {max_latency:.5f} / {avg_latency:.5f}"
  245. )
  246. click.echo(
  247. f"Adjusted Min / Max / Avg latency: {adj_min_latency:.5f} / {adj_max_latency:.5f} / {adj_avg_latency:.5f}"
  248. )
  249. click.echo("")
  250. click.echo("## Latency histogram")
  251. click.echo("")
  252. bars = []
  253. for key, count in buckets.items():
  254. bucket_upper = key * bucket_width
  255. # Limit to 50 to prevent wrapping in output
  256. bar = "█" * min(count, 50)
  257. bar += f" {count} "
  258. bars.append((bucket_upper, bar))
  259. for bucket_upper, bar in sorted(bars, key=lambda x: x[0]):
  260. click.echo(f"{bucket_upper:.5f} {bar}")
# Invoke the click command only when this file is executed as a script.
if __name__ == "__main__":
    main()