Skip to content

Subscriptions

Subscriptions Mixin.

Subscriptions

Bases: ClientBase

Handles Subscriptions operations.

Source code in src/eventstore_grpc/client/subscriptions.py
class Subscriptions(ClientBase):
    """Handles Subscriptions operations.

    Every public method delegates to a ``subscriptions.SubscriptionsManager``
    that is created lazily, on first use, from ``self.channel``.
    """

    def _initialize_subscriptions_manager(self) -> None:
        """Create the subscriptions manager on first use.

        Does nothing when ``self._subscriptions_manager`` is already set.
        Otherwise builds the manager from ``self.channel`` and tries to
        register :meth:`kill` as the handler for SIGINT and SIGTERM.
        """
        if getattr(self, "_subscriptions_manager", None) is not None:
            return
        self._subscriptions_manager = subscriptions.SubscriptionsManager(
            self.channel
        )
        try:
            signal.signal(signal.SIGINT, self.kill)
            signal.signal(signal.SIGTERM, self.kill)
        except ValueError:
            # signal.signal() raises ValueError when called from any thread
            # other than the main one. The manager itself is usable without
            # the handlers, so degrade to "no automatic shutdown hook"
            # instead of failing the caller's first subscribe call.
            log.warning(
                "Signal handlers not installed (not running in the main "
                "thread); call unsubscribe_all() explicitly on shutdown."
            )

    def kill(self, signum, frame):
        """Signal handler: log a shutdown message and drop all subscriptions.

        Args:
            signum: Signal number supplied by the ``signal`` machinery; unused.
            frame: Current stack frame supplied by the ``signal`` machinery;
                unused.
        """
        # NOTE(review): this handler replaces the previous SIGINT/SIGTERM
        # handlers and neither exits nor chains to them, so terminating the
        # process after the signal is left to the rest of the application.
        log.info("\033[38;5;120mGracefully shutting down...\033[0m")
        self.unsubscribe_all()

    def subscribe_to_stream(
        self,
        stream: str,
        from_revision: Union[str, int] = constants.START,
        resolve_link_to_s: bool = False,
        handler: Optional[Callable] = None,
        **kwargs,
    ):
        """Subscribe to a single stream through the subscriptions manager.

        Args:
            stream: Name of the stream to subscribe to.
            from_revision: Where the subscription starts; defaults to
                ``constants.START``.
            resolve_link_to_s: Forwarded unchanged to the manager
                (presumably toggles link-event resolution -- confirm
                against ``SubscriptionsManager``).
            handler: Optional callable forwarded to the manager.
            **kwargs: Extra keyword arguments forwarded to the manager.

        Returns:
            Whatever the manager's ``subscribe_to_stream`` returns; the
            result is treated as the subscription id.
        """
        self._initialize_subscriptions_manager()
        subscription_id = self._subscriptions_manager.subscribe_to_stream(
            stream=stream,
            from_revision=from_revision,
            resolve_link_to_s=resolve_link_to_s,
            handler=handler,
            **kwargs,
        )
        return subscription_id

    def subscribe_to_all(
        self,
        from_position: Union[str, int] = constants.START,
        resolve_link_to_s: bool = False,
        filters: Optional[Dict] = None,
        handler: Optional[Callable] = None,
        **kwargs,
    ):
        """Subscribe to the all-events feed through the subscriptions manager.

        Args:
            from_position: Where the subscription starts; defaults to
                ``constants.START``.
            resolve_link_to_s: Forwarded unchanged to the manager
                (presumably toggles link-event resolution -- confirm
                against ``SubscriptionsManager``).
            filters: Optional filter specification forwarded to the manager.
            handler: Optional callable forwarded to the manager.
            **kwargs: Extra keyword arguments forwarded to the manager.

        Returns:
            Whatever the manager's ``subscribe_to_all`` returns; the result
            is treated as the subscription id.
        """
        self._initialize_subscriptions_manager()
        subscription_id = self._subscriptions_manager.subscribe_to_all(
            from_position=from_position,
            resolve_link_to_s=resolve_link_to_s,
            filters=filters,
            handler=handler,
            **kwargs,
        )
        return subscription_id

    def subscribe_persistent(
        self,
        stream: str,
        group_name: str,
        buffer_size: int = 10,
        handler: Optional[Callable] = None,
        **kwargs,
    ):
        """Join a persistent subscription through the subscriptions manager.

        Args:
            stream: Name of the stream the persistent subscription targets.
            group_name: Name of the persistent subscription group.
            buffer_size: Forwarded unchanged to the manager; defaults to 10.
            handler: Optional callable forwarded to the manager.
            **kwargs: Extra keyword arguments forwarded to the manager.

        Returns:
            Whatever the manager's ``subscribe_persistent`` returns; the
            result is treated as the subscription id.
        """
        self._initialize_subscriptions_manager()
        subscription_id = self._subscriptions_manager.subscribe_persistent(
            stream=stream,
            group_name=group_name,
            buffer_size=buffer_size,
            handler=handler,
            **kwargs,
        )
        return subscription_id

    def unsubscribe(self, subscription_id: str):
        """Cancel one subscription.

        Args:
            subscription_id: Identifier of the subscription to cancel.

        Returns:
            Whatever the manager's ``unsubscribe`` returns.
        """
        self._initialize_subscriptions_manager()
        return self._subscriptions_manager.unsubscribe(subscription_id)

    def unsubscribe_all(self) -> None:
        """Cancel every subscription currently known to the manager."""
        self._initialize_subscriptions_manager()
        # Snapshot the ids first: unsubscribe() may remove entries from the
        # collection backing ``subscription_ids``, and mutating a collection
        # while iterating it skips items or raises RuntimeError.
        pending_ids = list(self._subscriptions_manager.subscription_ids)
        for subscription_id in pending_ids:
            self.unsubscribe(subscription_id)