|
@@ -31,6 +31,32 @@ class MissingOrgInRoutingHeader(Exception):
|
|
"""
|
|
"""
|
|
|
|
|
|
|
|
|
|
|
|
+def _validate_slicing_config() -> None:
|
|
|
|
+ """
|
|
|
|
+ Validates the generalized slicing config (not focusing on an individual
|
|
|
|
+ sliceable)
|
|
|
|
+ """
|
|
|
|
+ for (sliceable, assignments) in settings.SENTRY_SLICING_CONFIG.items():
|
|
|
|
+ acc = {}
|
|
|
|
+ for ((assign_lo, assign_hi), _slice_id) in assignments.items():
|
|
|
|
+ for logical_part in range(assign_lo, assign_hi):
|
|
|
|
+ if logical_part in acc:
|
|
|
|
+ raise SlicingConfigurationException(
|
|
|
|
+ f"'{sliceable}' has two assignments to logical partition {logical_part}"
|
|
|
|
+ )
|
|
|
|
+ else:
|
|
|
|
+ acc[logical_part] = _slice_id
|
|
|
|
+
|
|
|
|
+ missing_logical_parts = set(
|
|
|
|
+ range(0, settings.SENTRY_SLICING_LOGICAL_PARTITION_COUNT)
|
|
|
|
+ ) - set(acc.keys())
|
|
|
|
+
|
|
|
|
+ if not len(missing_logical_parts) == 0:
|
|
|
|
+ raise SlicingConfigurationException(
|
|
|
|
+ f"'{sliceable}' is missing logical partition assignments: {missing_logical_parts}"
|
|
|
|
+ )
|
|
|
|
+
|
|
|
|
+
|
|
def _validate_slicing_consumer_config(sliceable: Sliceable) -> None:
|
|
def _validate_slicing_consumer_config(sliceable: Sliceable) -> None:
|
|
"""
|
|
"""
|
|
Validate all the required settings needed for a slicing router.
|
|
Validate all the required settings needed for a slicing router.
|
|
@@ -70,6 +96,7 @@ class SlicingRouter(MessageRouter):
|
|
) -> None:
|
|
) -> None:
|
|
self.__sliceable = sliceable
|
|
self.__sliceable = sliceable
|
|
self.__slice_to_producer: MutableMapping[int, MessageRoute] = {}
|
|
self.__slice_to_producer: MutableMapping[int, MessageRoute] = {}
|
|
|
|
+ _validate_slicing_config()
|
|
_validate_slicing_consumer_config(self.__sliceable)
|
|
_validate_slicing_consumer_config(self.__sliceable)
|
|
|
|
|
|
for (
|
|
for (
|
|
@@ -82,9 +109,11 @@ class SlicingRouter(MessageRouter):
|
|
),
|
|
),
|
|
topic=Topic(configuration["topic"]),
|
|
topic=Topic(configuration["topic"]),
|
|
)
|
|
)
|
|
- assert len(self.__slice_to_producer) == len(
|
|
|
|
- settings.SENTRY_SLICING_CONFIG[sliceable].keys()
|
|
|
|
- )
|
|
|
|
|
|
+ # All logical partitions should be routed to a slice ID that's present in the slice
|
|
|
|
+ # ID to producer message route mapping
|
|
|
|
+ assert set(settings.SENTRY_SLICING_CONFIG[sliceable].values()).issubset(
|
|
|
|
+ self.__slice_to_producer.keys()
|
|
|
|
+ ), f"Unknown slice ID in SENTRY_SLICING_CONFIG for {sliceable}"
|
|
|
|
|
|
def get_all_producers(self) -> Sequence[Producer]:
|
|
def get_all_producers(self) -> Sequence[Producer]:
|
|
return [route.producer for route in self.__slice_to_producer.values()]
|
|
return [route.producer for route in self.__slice_to_producer.values()]
|