-
Notifications
You must be signed in to change notification settings - Fork 232
Add jetstream consumer priority groups (ADR-42) #786
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -46,6 +46,7 @@ class StatusCode(str, Enum): | |||||
| NO_MESSAGES = "404" | ||||||
| REQUEST_TIMEOUT = "408" | ||||||
| CONFLICT = "409" | ||||||
| PIN_ID_MISMATCH = "423" | ||||||
| CONTROL_MESSAGE = "100" | ||||||
|
|
||||||
|
|
||||||
|
|
@@ -491,6 +492,31 @@ class ReplayPolicy(str, Enum): | |||||
| ORIGINAL = "original" | ||||||
|
|
||||||
|
|
||||||
| class PriorityPolicy(str, Enum): | ||||||
| """Priority policy for priority groups. | ||||||
|
|
||||||
| Enables flexible failover and priority management when multiple clients are | ||||||
| pulling from the same consumer | ||||||
|
|
||||||
| Introduced in nats-server 2.12.0. | ||||||
|
|
||||||
| References: | ||||||
| * `Consumers, Pull consumer priority groups <https://docs.nats.io/release-notes/whats_new/whats_new_211#consumers>` | ||||||
| * `Consumers, Prioritized pull consumer policy <https://docs.nats.io/release-notes/whats_new/whats_new_212#consumers>` | ||||||
| """ # noqa: E501 | ||||||
|
|
||||||
| NONE = "" | ||||||
| "default" | ||||||
| PINNED = "pinned_client" | ||||||
| "pins a consumer to a specific client" | ||||||
| OVERFLOW = "overflow" | ||||||
| "allows for restricting when a consumer will receive messages based on the number of pending messages or acks" | ||||||
| PRIORITIZED = "prioritized" | ||||||
| """allows for restricting when a consumer will receive messages based on a priority from 0-9 (0 is highest priority & default) | ||||||
| Introduced in nats-server 2.12.0. | ||||||
| """ | ||||||
|
|
||||||
|
|
||||||
| @dataclass | ||||||
| class ConsumerConfig(Base): | ||||||
| """Consumer configuration. | ||||||
|
|
@@ -543,11 +569,25 @@ class ConsumerConfig(Base): | |||||
| # Introduced in nats-server 2.11.0. | ||||||
| pause_until: Optional[str] = None | ||||||
|
|
||||||
| # Priority policy. | ||||||
| # Introduced in nats-server 2.11.0. | ||||||
| priority_policy: Optional[PriorityPolicy] = None | ||||||
|
|
||||||
| # The duration (seconds) after which the client will be unpinned if no new | ||||||
| # pull requests are sent.Used with PriorityPolicy.PINNED. | ||||||
|
||||||
| # pull requests are sent.Used with PriorityPolicy.PINNED. | |
| # pull requests are sent. Used with PriorityPolicy.PINNED. |
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -546,6 +546,7 @@ async def pull_subscribe( | |||||
| pending_msgs_limit: int = DEFAULT_JS_SUB_PENDING_MSGS_LIMIT, | ||||||
| pending_bytes_limit: int = DEFAULT_JS_SUB_PENDING_BYTES_LIMIT, | ||||||
| inbox_prefix: Optional[bytes] = None, | ||||||
| priority_group: Optional[str] = None, | ||||||
| ) -> JetStreamContext.PullSubscription: | ||||||
| """Create consumer and pull subscription. | ||||||
|
|
||||||
|
|
@@ -580,6 +581,9 @@ async def main(): | |||||
| if stream is None: | ||||||
| stream = await self._jsm.find_stream_name_by_subject(subject) | ||||||
|
|
||||||
| if config and config.priority_groups and priority_group is None: | ||||||
| raise ValueError("nats: priority_group is required when consumer has priority_groups configured") | ||||||
|
|
||||||
| should_create = True | ||||||
| try: | ||||||
| if durable: | ||||||
|
|
@@ -605,6 +609,10 @@ async def main(): | |||||
| consumer_name = self._nc._nuid.next().decode() | ||||||
| config.name = consumer_name | ||||||
|
|
||||||
| # Auto created consumers use the priority group, unless priority_groups is set. | ||||||
| if not config.priority_groups and priority_group: | ||||||
| config.priority_groups = [priority_group] | ||||||
|
|
||||||
| await self._jsm.add_consumer(stream, config=config) | ||||||
|
|
||||||
| return await self.pull_subscribe_bind( | ||||||
|
|
@@ -614,6 +622,7 @@ async def main(): | |||||
| pending_bytes_limit=pending_bytes_limit, | ||||||
| pending_msgs_limit=pending_msgs_limit, | ||||||
| name=consumer_name, | ||||||
| priority_group=priority_group, | ||||||
| ) | ||||||
|
|
||||||
| async def pull_subscribe_bind( | ||||||
|
|
@@ -625,6 +634,7 @@ async def pull_subscribe_bind( | |||||
| pending_bytes_limit: int = DEFAULT_JS_SUB_PENDING_BYTES_LIMIT, | ||||||
| name: Optional[str] = None, | ||||||
| durable: Optional[str] = None, | ||||||
| priority_group: Optional[str] = None, | ||||||
| ) -> JetStreamContext.PullSubscription: | ||||||
| """ | ||||||
| pull_subscribe returns a `PullSubscription` that can be delivered messages | ||||||
|
|
@@ -680,6 +690,7 @@ async def main(): | |||||
| stream=stream, | ||||||
| consumer=consumer_name, | ||||||
| deliver=deliver, | ||||||
| group=priority_group, | ||||||
| ) | ||||||
|
|
||||||
| @classmethod | ||||||
|
|
@@ -703,11 +714,16 @@ def _is_temporary_error(cls, status: Optional[str]) -> bool: | |||||
| status == api.StatusCode.NO_MESSAGES | ||||||
| or status == api.StatusCode.CONFLICT | ||||||
| or status == api.StatusCode.REQUEST_TIMEOUT | ||||||
| or status == api.StatusCode.PIN_ID_MISMATCH | ||||||
| ): | ||||||
| return True | ||||||
| else: | ||||||
| return False | ||||||
|
|
||||||
| @classmethod | ||||||
| def _is_pin_id_mismatch_error(cls, status: Optional[str]) -> bool: | ||||||
| return status == api.StatusCode.PIN_ID_MISMATCH | ||||||
|
|
||||||
| @classmethod | ||||||
| def _is_heartbeat(cls, status: Optional[str]) -> bool: | ||||||
| if status == api.StatusCode.CONTROL_MESSAGE: | ||||||
|
|
@@ -997,6 +1013,7 @@ def __init__( | |||||
| stream: str, | ||||||
| consumer: str, | ||||||
| deliver: bytes, | ||||||
| group: Optional[str] = None, | ||||||
| ) -> None: | ||||||
| # JS/JSM context | ||||||
| self._js = js | ||||||
|
|
@@ -1009,6 +1026,8 @@ def __init__( | |||||
| prefix = self._js._prefix | ||||||
| self._nms = f"{prefix}.CONSUMER.MSG.NEXT.{stream}.{consumer}" | ||||||
| self._deliver = deliver.decode() | ||||||
| self._pin_id: Optional[str] = None | ||||||
| self._group = group | ||||||
|
|
||||||
| @property | ||||||
| def pending_msgs(self) -> int: | ||||||
|
|
@@ -1055,6 +1074,9 @@ async def fetch( | |||||
| batch: int = 1, | ||||||
| timeout: Optional[float] = 5, | ||||||
| heartbeat: Optional[float] = None, | ||||||
| min_pending: Optional[int] = None, | ||||||
| min_ack_pending: Optional[int] = None, | ||||||
| priority: Optional[int] = None, | ||||||
| ) -> List[Msg]: | ||||||
| """ | ||||||
| fetch makes a request to JetStream to be delivered a set of messages. | ||||||
|
|
@@ -1095,17 +1117,26 @@ async def main(): | |||||
|
|
||||||
| expires = int(timeout * 1_000_000_000) - 100_000 if timeout else None | ||||||
| if batch == 1: | ||||||
| msg = await self._fetch_one(expires, timeout, heartbeat) | ||||||
| msg = await self._fetch_one(expires, timeout, heartbeat, min_pending, min_ack_pending, priority) | ||||||
| return [msg] | ||||||
| msgs = await self._fetch_n(batch, expires, timeout, heartbeat) | ||||||
| msgs = await self._fetch_n(batch, expires, timeout, heartbeat, min_pending, min_ack_pending, priority) | ||||||
| return msgs | ||||||
|
|
||||||
| async def _fetch_one( | ||||||
| self, | ||||||
| expires: Optional[int], | ||||||
| timeout: Optional[float], | ||||||
| heartbeat: Optional[float] = None, | ||||||
| min_pending: Optional[int] = None, | ||||||
| min_ack_pending: Optional[int] = None, | ||||||
| priority: Optional[int] = None, | ||||||
| ) -> Msg: | ||||||
| if min_pending is not None and not (min_pending > 0): | ||||||
| raise ValueError("nats: min_pending must be more than 0") | ||||||
| if min_ack_pending is not None and not (min_ack_pending > 0): | ||||||
| raise ValueError("nats: min_ack_pending must be more than 0") | ||||||
| if priority is not None and not (0 <= priority <= 9): | ||||||
| raise ValueError("nats: priority must be 0-9") | ||||||
| queue = self._sub._pending_queue | ||||||
|
|
||||||
| # Check the next message in case there are any. | ||||||
|
|
@@ -1130,7 +1161,17 @@ async def _fetch_one( | |||||
| next_req["expires"] = int(expires) | ||||||
| if heartbeat: | ||||||
| next_req["idle_heartbeat"] = int(heartbeat * 1_000_000_000) # to nanoseconds | ||||||
|
|
||||||
| if self._group: | ||||||
| next_req["group"] = self._group | ||||||
| pin_id = self.pin_id | ||||||
| if pin_id: | ||||||
| next_req["id"] = pin_id | ||||||
| if min_pending: | ||||||
| next_req["min_pending"] = min_pending | ||||||
| if min_ack_pending: | ||||||
| next_req["min_ack_pending"] = min_ack_pending | ||||||
| if priority: | ||||||
|
||||||
| if priority: | |
| if priority is not None: |
Copilot
AI
Nov 27, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Bug: The condition if priority: will not add priority to the request when priority=0, which is the highest priority value. This should be if priority is not None: to match the validation logic and ensure priority=0 is correctly sent to the server.
| if priority: | |
| if priority is not None: |
Copilot
AI
Nov 27, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Bug: The condition if priority: will not add priority to the request when priority=0, which is the highest priority value. This should be if priority is not None: to match the validation logic and ensure priority=0 is correctly sent to the server.
| if priority: | |
| if priority is not None: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Documentation inconsistency: The class docstring states "Introduced in nats-server 2.12.0" but PINNED and OVERFLOW were actually introduced in nats-server 2.11.0 (as mentioned in the references and test skip messages). Only PRIORITIZED was introduced in 2.12.0. Consider updating the class-level docstring to indicate "Introduced in nats-server 2.11.0" and keep the specific note about PRIORITIZED being from 2.12.0.