test_synchronized.py

import operator
import time
from collections.abc import Callable, Iterator, Mapping
from contextlib import closing, contextmanager
from datetime import datetime
from threading import Event
from typing import TypeVar

import pytest
from arroyo.backends.abstract import Consumer
from arroyo.backends.kafka import KafkaPayload
from arroyo.backends.local.backend import LocalBroker, LocalConsumer
from arroyo.backends.local.storages.memory import MemoryMessageStorage
from arroyo.commit import Commit
from arroyo.types import BrokerValue, Partition, Topic

from sentry.consumers.synchronized import SynchronizedConsumer, commit_codec

T = TypeVar("T")


@contextmanager
def assert_changes(
    callable: Callable[[], object],
    before: object,
    after: object,
    operator: Callable[[object, object], bool] = operator.eq,
) -> Iterator[None]:
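    """
    Context manager asserting that ``callable()`` compares (via ``operator``)
    to ``before`` on entry and to ``after`` on exit.
    """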
    actual = callable()
    assert operator(
        actual, before
    ), f"precondition ({operator}) on {callable} failed: expected: {before!r}, actual: {actual!r}"

    yield

    actual = callable()
    assert operator(
        actual, after
    ), f"postcondition ({operator}) on {callable} failed: expected: {after!r}, actual: {actual!r}"


@contextmanager
def assert_does_not_change(
    callable: Callable[[], object],
    value: object,
    operator: Callable[[object, object], bool] = operator.eq,
) -> Iterator[None]:
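    """
    Context manager asserting that ``callable()`` compares (via ``operator``)
    to ``value`` both on entry and on exit.
    """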
    actual = callable()
    assert operator(
        actual, value
    ), f"precondition ({operator}) on {callable} failed: expected: {value!r}, actual: {actual!r}"

    yield

    actual = callable()
    assert operator(
        actual, value
    ), f"postcondition ({operator}) on {callable} failed: expected: {value!r}, actual: {actual!r}"


def wait_for_consumer(consumer: Consumer[T], message: BrokerValue[T], attempts: int = 10) -> None:
    """Block until the provided consumer has received the provided message."""
    for _ in range(attempts):
        part = consumer.tell().get(message.partition)
        if part is not None and part >= message.next_offset:
            return

        time.sleep(0.1)

    raise Exception(f"{message} was not received by {consumer} within {attempts} attempts")


def test_synchronized_consumer() -> None:
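    """
    Verify that the synchronized consumer only consumes messages up to the
    offsets committed by every followed group on the commit log, pausing the
    underlying consumer while waiting for the remote offsets to advance.
    """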
    broker: LocalBroker[KafkaPayload] = LocalBroker(MemoryMessageStorage())

    topic = Topic("topic")
    commit_log_topic = Topic("commit-log")

    broker.create_topic(topic, partitions=1)
    broker.create_topic(commit_log_topic, partitions=1)

    consumer = broker.get_consumer("consumer")
    producer = broker.get_producer()
    commit_log_consumer = broker.get_consumer("commit-log-consumer")

    messages = [
        producer.produce(topic, KafkaPayload(None, f"{i}".encode(), [])).result(1.0)
        for i in range(6)
    ]

    synchronized_consumer: Consumer[KafkaPayload] = SynchronizedConsumer(
        consumer,
        commit_log_consumer,
        commit_log_topic=commit_log_topic,
        commit_log_groups={"leader-a", "leader-b"},
    )

    with closing(synchronized_consumer):
        synchronized_consumer.subscribe([topic])

        # The consumer should not consume any messages until it receives a
        # commit from both groups that are being followed.
        with assert_changes(consumer.paused, [], [Partition(topic, 0)]), assert_changes(
            consumer.tell, {}, {Partition(topic, 0): messages[0].offset}
        ):
            assert synchronized_consumer.poll(0.0) is None

        wait_for_consumer(
            commit_log_consumer,
            producer.produce(
                commit_log_topic,
                commit_codec.encode(
                    Commit(
                        "leader-a",
                        Partition(topic, 0),
                        messages[0].next_offset,
                        datetime.now().timestamp(),
                        None,
                    ),
                ),
            ).result(),
        )

        # The consumer should remain paused, since it needs both groups to
        # advance before it may continue.
        with assert_does_not_change(consumer.paused, [Partition(topic, 0)]), assert_does_not_change(
            consumer.tell, {Partition(topic, 0): messages[0].offset}
        ):
            assert synchronized_consumer.poll(0.0) is None

        wait_for_consumer(
            commit_log_consumer,
            producer.produce(
                commit_log_topic,
                commit_codec.encode(
                    Commit(
                        "leader-b",
                        Partition(topic, 0),
                        messages[0].next_offset,
                        datetime.now().timestamp(),
                        None,
                    )
                ),
            ).result(),
        )

        # The consumer should be able to resume consuming, since both consumers
        # have processed the first message.
        with assert_changes(consumer.paused, [Partition(topic, 0)], []), assert_changes(
            consumer.tell,
            {Partition(topic, 0): messages[0].offset},
            {Partition(topic, 0): messages[0].next_offset},
        ):
            assert synchronized_consumer.poll(0.0) == messages[0]

        # After consuming the one available message, the consumer should be
        # paused again until the remote offsets advance.
        with assert_changes(consumer.paused, [], [Partition(topic, 0)]), assert_does_not_change(
            consumer.tell, {Partition(topic, 0): messages[1].offset}
        ):
            assert synchronized_consumer.poll(0.0) is None

        # Emulate the unlikely (but possible) scenario of the leader offsets
        # being within a series of compacted (deleted) messages by:
        # 1. moving the remote offsets forward, so that the partition is resumed
        # 2. seeking the consumer beyond the remote offsets
        producer.produce(
            commit_log_topic,
            commit_codec.encode(
                Commit(
                    "leader-a",
                    Partition(topic, 0),
                    messages[3].offset,
                    datetime.now().timestamp(),
                    None,
                )
            ),
        ).result()

        wait_for_consumer(
            commit_log_consumer,
            producer.produce(
                commit_log_topic,
                commit_codec.encode(
                    Commit(
                        "leader-b",
                        Partition(topic, 0),
                        messages[5].offset,
                        datetime.now().timestamp(),
                        None,
                    )
                ),
            ).result(),
        )
        # The consumer should be able to resume consuming, since both groups
        # have now committed offsets beyond the consumer's current position.
        with assert_changes(consumer.paused, [Partition(topic, 0)], []), assert_changes(
            consumer.tell,
            {Partition(topic, 0): messages[1].offset},
            {Partition(topic, 0): messages[1].next_offset},
        ):
            assert synchronized_consumer.poll(0.0) == messages[1]

        # At this point, we manually seek the consumer offset, to emulate messages being skipped.
        with assert_changes(
            consumer.tell,
            {Partition(topic, 0): messages[2].offset},
            {Partition(topic, 0): messages[4].offset},
        ):
            consumer.seek({Partition(topic, 0): messages[4].offset})

        # Since the (effective) remote offset is the offset for message #3 (via
        # ``leader-a``), and the local offset is the offset of message #4, when
        # message #4 is consumed, it should be discarded and the offset should
        # be rolled back to wait for the commit log to advance.
        with assert_changes(consumer.paused, [], [Partition(topic, 0)]), assert_does_not_change(
            consumer.tell, {Partition(topic, 0): messages[4].offset}
        ):
            assert synchronized_consumer.poll(0.0) is None

        wait_for_consumer(
            commit_log_consumer,
            producer.produce(
                commit_log_topic,
                commit_codec.encode(
                    Commit(
                        "leader-a",
                        Partition(topic, 0),
                        messages[5].offset,
                        datetime.now().timestamp(),
                        None,
                    )
                ),
            ).result(),
        )

        # The consumer should be able to resume consuming.
        with assert_changes(consumer.paused, [Partition(topic, 0)], []), assert_changes(
            consumer.tell,
            {Partition(topic, 0): messages[4].offset},
            {Partition(topic, 0): messages[4].next_offset},
        ):
            assert synchronized_consumer.poll(0.0) == messages[4]


def test_synchronized_consumer_pause_resume() -> None:
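    """
    Verify that explicit pause/resume requests from the caller are tracked
    independently of pauses caused by the synchronization offset fence.
    """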
    broker: LocalBroker[KafkaPayload] = LocalBroker(MemoryMessageStorage())

    topic = Topic("topic")
    commit_log_topic = Topic("commit-log")

    broker.create_topic(topic, partitions=1)
    broker.create_topic(commit_log_topic, partitions=1)

    consumer = broker.get_consumer("consumer")
    producer = broker.get_producer()
    commit_log_consumer = broker.get_consumer("commit-log-consumer")

    messages = [
        producer.produce(topic, KafkaPayload(None, f"{i}".encode(), [])).result(1.0)
        for i in range(2)
    ]

    synchronized_consumer: Consumer[KafkaPayload] = SynchronizedConsumer(
        consumer,
        commit_log_consumer,
        commit_log_topic=commit_log_topic,
        commit_log_groups={"leader"},
    )

    with closing(synchronized_consumer):

        def assignment_callback(offsets: Mapping[Partition, int]) -> None:
            synchronized_consumer.pause([Partition(topic, 0)])

        synchronized_consumer.subscribe([topic], on_assign=assignment_callback)

        with assert_changes(
            synchronized_consumer.paused, [], [Partition(topic, 0)]
        ), assert_changes(consumer.paused, [], [Partition(topic, 0)]):
            assert synchronized_consumer.poll(0.0) is None

        # Advancing the commit log offset should not cause the consumer to
        # resume, since it has been explicitly paused.
        wait_for_consumer(
            commit_log_consumer,
            producer.produce(
                commit_log_topic,
                commit_codec.encode(
                    Commit(
                        "leader",
                        Partition(topic, 0),
                        messages[0].next_offset,
                        datetime.now().timestamp(),
                        None,
                    )
                ),
            ).result(),
        )

        with assert_does_not_change(consumer.paused, [Partition(topic, 0)]):
            assert synchronized_consumer.poll(0) is None
        # Resuming the partition does not immediately resume the underlying
        # consumer, but the partition should appear resumed to the caller.
        with assert_changes(
            synchronized_consumer.paused, [Partition(topic, 0)], []
        ), assert_does_not_change(consumer.paused, [Partition(topic, 0)]):
            synchronized_consumer.resume([Partition(topic, 0)])

        # The partition should be resumed on the next poll call, however.
        with assert_changes(consumer.paused, [Partition(topic, 0)], []):
            assert synchronized_consumer.poll(0) == messages[0]

        # Pausing due to hitting the offset fence should not appear as a paused
        # partition to the caller.
        with assert_does_not_change(synchronized_consumer.paused, []), assert_changes(
            consumer.paused, [], [Partition(topic, 0)]
        ):
            assert synchronized_consumer.poll(0) is None

        # Other pause and resume actions should not cause the inner consumer to
        # change its state while up against the fence.
        with assert_changes(
            synchronized_consumer.paused, [], [Partition(topic, 0)]
        ), assert_does_not_change(consumer.paused, [Partition(topic, 0)]):
            synchronized_consumer.pause([Partition(topic, 0)])

        with assert_changes(
            synchronized_consumer.paused, [Partition(topic, 0)], []
        ), assert_does_not_change(consumer.paused, [Partition(topic, 0)]):
            synchronized_consumer.resume([Partition(topic, 0)])


def test_synchronized_consumer_handles_end_of_partition() -> None:
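    """
    Verify that the commit log consumer survives end-of-partition
    notifications and continues to apply later commits.
    """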
    broker: LocalBroker[KafkaPayload] = LocalBroker(MemoryMessageStorage())

    topic = Topic("topic")
    commit_log_topic = Topic("commit-log")

    broker.create_topic(topic, partitions=1)
    broker.create_topic(commit_log_topic, partitions=1)

    consumer = broker.get_consumer("consumer", enable_end_of_partition=True)
    producer = broker.get_producer()
    commit_log_consumer = broker.get_consumer("commit-log-consumer")

    messages = [
        producer.produce(topic, KafkaPayload(None, f"{i}".encode(), [])).result(1.0)
        for i in range(2)
    ]

    synchronized_consumer: Consumer[KafkaPayload] = SynchronizedConsumer(
        consumer,
        commit_log_consumer,
        commit_log_topic=commit_log_topic,
        commit_log_groups={"leader"},
    )

    with closing(synchronized_consumer):
        synchronized_consumer.subscribe([topic])

        wait_for_consumer(
            commit_log_consumer,
            producer.produce(
                commit_log_topic,
                commit_codec.encode(
                    Commit(
                        "leader",
                        Partition(topic, 0),
                        messages[0].next_offset,
                        datetime.now().timestamp(),
                        None,
                    ),
                ),
            ).result(),
        )

        assert synchronized_consumer.poll(0) == messages[0]

        # If the commit log consumer does not handle EOF, it will have crashed
        # here and will never return the next message.
        wait_for_consumer(
            commit_log_consumer,
            producer.produce(
                commit_log_topic,
                commit_codec.encode(
                    Commit(
                        "leader",
                        Partition(topic, 0),
                        messages[1].next_offset,
                        datetime.now().timestamp(),
                        None,
                    ),
                ),
            ).result(),
        )

        assert synchronized_consumer.poll(0) == messages[1]


def test_synchronized_consumer_worker_crash_before_assignment() -> None:
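    """
    Verify that an error raised by the commit log worker thread during
    startup propagates out of the ``SynchronizedConsumer`` constructor.
    """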
    broker: LocalBroker[KafkaPayload] = LocalBroker(MemoryMessageStorage())

    topic = Topic("topic")
    commit_log_topic = Topic("commit-log")

    broker.create_topic(topic, partitions=1)
    broker.create_topic(commit_log_topic, partitions=1)

    poll_called = Event()

    class BrokenConsumerException(Exception):
        pass

    class BrokenConsumer(LocalConsumer[KafkaPayload]):
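        # Crash on the very first poll, i.e. before the worker thread has had
        # a chance to receive a partition assignment.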
        def poll(self, timeout: float | None = None) -> BrokerValue[KafkaPayload] | None:
            try:
                raise BrokenConsumerException()
            finally:
                poll_called.set()

    consumer = broker.get_consumer("consumer")
    commit_log_consumer: Consumer[KafkaPayload] = BrokenConsumer(broker, "commit-log-consumer")

    with pytest.raises(BrokenConsumerException):
        SynchronizedConsumer(
            consumer,
            commit_log_consumer,
            commit_log_topic=commit_log_topic,
            commit_log_groups={"leader"},
        )


def test_synchronized_consumer_worker_crash_after_assignment() -> None:
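    """
    Verify that an error raised by the commit log worker thread after
    assignment surfaces as the cause of the ``RuntimeError`` raised by
    ``poll``, unless the consumer has already been closed.
    """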
    broker: LocalBroker[KafkaPayload] = LocalBroker(MemoryMessageStorage())

    topic = Topic("topic")
    commit_log_topic = Topic("commit-log")

    broker.create_topic(topic, partitions=1)
    broker.create_topic(commit_log_topic, partitions=1)

    poll_called = Event()

    class BrokenConsumerException(Exception):
        pass

    class BrokenConsumer(LocalConsumer[KafkaPayload]):
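        # Behave normally until the worker thread has been assigned partitions
        # (``tell`` returns a non-empty mapping), then crash on the next poll.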
        def poll(self, timeout: float | None = None) -> BrokerValue[KafkaPayload] | None:
            if not self.tell():
                return super().poll(timeout)
            else:
                try:
                    raise BrokenConsumerException()
                finally:
                    poll_called.set()

    consumer: Consumer[KafkaPayload] = broker.get_consumer("consumer")
    commit_log_consumer: Consumer[KafkaPayload] = BrokenConsumer(broker, "commit-log-consumer")

    synchronized_consumer: Consumer[KafkaPayload] = SynchronizedConsumer(
        consumer,
        commit_log_consumer,
        commit_log_topic=commit_log_topic,
        commit_log_groups={"leader"},
    )

    assert poll_called.wait(1.0) is True

    # If the worker thread has exited without a close request, calling ``poll``
    # should raise an error that originated from the worker thread.
    with pytest.raises(RuntimeError) as e:
        synchronized_consumer.poll(0.0)
    assert type(e.value.__cause__) is BrokenConsumerException

    # If a close request has been sent, the normal runtime error due to the
    # closed consumer should be raised instead.
    synchronized_consumer.close()

    with pytest.raises(RuntimeError) as e:
        synchronized_consumer.poll(0.0)
    assert type(e.value.__cause__) is not BrokenConsumerException