Skip to content

Persistent

Persistents Mixin.

Persistent

Bases: ClientBase

Handles Persistent operations.

Source code in src/eventstore_grpc/client/persistent.py
class Persistent(ClientBase):
    """Handles Persistent operations."""

    def create_persistent_subscription(
        self,
        group_name: str = None,
        stream: Optional[str] = None,
        resolve_link_to_s: bool = False,
        from_revision: Union[int, str] = END,
        commit_position: Optional[int] = None,
        prepare_position: Optional[int] = None,
        extra_statistics: bool = False,
        message_timeout_ms: int = 30000,
        checkpoint_after_ms: int = 2000,
        max_retry_count: int = 10,
        min_checkpoint_count: int = 10,
        max_checkpoint_count: int = 1000,
        max_subscriber_count: int = 0,
        live_buffer_size: int = 500,
        history_buffer_size: int = 500,
        read_batch_size: int = 20,
        strategy: str = ROUND_ROBIN,
        filter_options: Optional[
            persistent_pb2.CreateReq.AllOptions.FilterOptions
        ] = None,
        **kwargs
    ) -> persistent_pb2.CreateResp:
        """Creates a new persistent subscription.

        Args:
            group_name: a group name for the subscription that will be created.
            stream: the name of the stream. A persistent subscription to the `$all`
                    stream will be created if this value is left to `None`.
            resolve_link_to_s: whether or not to resolve events links to actual events.
            from_revision: the subscription will start from the revision specified here.
            extra_statistics: whether to track latency statistics on this subscription.
            message_timeout_ms: the amount of time after which to consider a message as
                                timed out and retried.
            checkpoint_after_ms: the amount of time to try to checkpoint after.
            max_retry_count: the maximum number of retries (due to timeout) before a
                             message is considered to be parked.
            min_checkpoint_count: The minimum number of messages to process before a
                                  checkpoint may be written.
            max_checkpoint_count: The maximum number of messages not checkpointed
                                  before forcing a checkpoint.
            max_subscriber_count: The maximum number of subscribers allowed.
            live_buffer_size: the size of the buffer (in-memory) listening to live
                              messages as they happen before pagin occurs.
            history_buffer_size: The number of events to cache when paging through
                                 history.
            read_batch_size: The number of events read at a time when paging through history.
            strategy: the strategy that will be used to send events to the subscribers
                      of the same group.
            filter_options: an optional FilterOptions instance to use to filter events
                            in the persistent subscription.

        Returns:
            A persistent_pb.CreateResp
        """
        stub = persistent_pb2_grpc.PersistentSubscriptionsStub(self.channel)
        result = persistent.create_persistent_subscription(
            stub=stub,
            stream=stream,
            group_name=group_name,
            resolve_link_to_s=resolve_link_to_s,
            from_revision=from_revision,
            commit_position=commit_position,
            prepare_position=prepare_position,
            extra_statistics=extra_statistics,
            message_timeout_ms=message_timeout_ms,
            checkpoint_after_ms=checkpoint_after_ms,
            max_retry_count=max_retry_count,
            min_checkpoint_count=min_checkpoint_count,
            max_checkpoint_count=max_checkpoint_count,
            max_subscriber_count=max_subscriber_count,
            live_buffer_size=live_buffer_size,
            history_buffer_size=history_buffer_size,
            read_batch_size=read_batch_size,
            strategy=strategy,
            filter_options=filter_options,
            **kwargs,
        )
        return result

    def update_persistent_subscription(
        self,
        group_name: Optional[str] = None,
        stream: Optional[str] = None,
        resolve_link_to_s: Optional[bool] = None,
        from_revision: Optional[Union[int, str]] = None,
        commit_position: Optional[int] = None,
        prepare_position: Optional[int] = None,
        extra_statistics: Optional[bool] = None,
        message_timeout_ms: Optional[int] = None,
        checkpoint_after_ms: Optional[int] = None,
        max_retry_count: Optional[int] = None,
        min_checkpoint_count: Optional[int] = None,
        max_checkpoint_count: Optional[int] = None,
        max_subscriber_count: Optional[int] = None,
        live_buffer_size: Optional[int] = None,
        history_buffer_size: Optional[int] = None,
        read_batch_size: Optional[int] = None,
        strategy: Optional[str] = None,
        **kwargs
    ) -> persistent_pb2.UpdateResp:
        """Updates a persistent subscription."""

        stub = persistent_pb2_grpc.PersistentSubscriptionsStub(self.channel)
        result = persistent.update_persistent_subscription(
            stub=stub,
            group_name=group_name,
            stream=stream,
            resolve_link_to_s=resolve_link_to_s,
            from_revision=from_revision,
            commit_position=commit_position,
            prepare_position=prepare_position,
            extra_statistics=extra_statistics,
            message_timeout_ms=message_timeout_ms,
            checkpoint_after_ms=checkpoint_after_ms,
            max_retry_count=max_retry_count,
            min_checkpoint_count=min_checkpoint_count,
            max_checkpoint_count=max_checkpoint_count,
            max_subscriber_count=max_subscriber_count,
            live_buffer_size=live_buffer_size,
            history_buffer_size=history_buffer_size,
            read_batch_size=read_batch_size,
            named_consumer_strategy=strategy,
            **kwargs,
        )
        return result

    def delete_persistent_subscription(
        self,
        group: str,
        stream: Optional[str] = None,
    ) -> persistent_pb2.DeleteResp:
        stub = persistent_pb2_grpc.PersistentSubscriptionsStub(self.channel)
        result = persistent.delete_persistent_subscription(
            stub=stub, stream=stream, group=group
        )
        return result

    def get_info(
        self,
        group_name: str,
        stream_name: Optional[str] = None,
    ) -> persistent_pb2.GetInfoResp:
        """Get info about a persistent subscription.

        Args:
            group_name: a group name to get info about.
            stream_name: the name of the stream, or None if it's some $all persistent
                         subscription.
        """
        stub = persistent_pb2_grpc.PersistentSubscriptionsStub(self.channel)
        result = persistent.get_info(
            stub=stub, group_name=group_name, stream_name=stream_name
        )
        return result

    def replay_parked(
        self,
        group_name: str,
        stream_name: Optional[str] = None,
        stop_at: Optional[int] = None,
    ) -> persistent_pb2.ReplayParkedResp:
        """Replays parked events.

        Args:
            group_name: the group name.
            stream_name: the name of the stream, or None for $all.
            stop_at: the postition at which to stop.
        """
        stub = persistent_pb2_grpc.PersistentSubscriptionsStub(self.channel)
        result = persistent.replay_parked(
            stub=stub, group_name=group_name, stream_name=stream_name, stop_at=stop_at
        )
        return result

    def list_persistent(
        self,
        stream_name: Optional[str] = None,
        list_all: bool = False,
    ) -> persistent_pb2.ListResp:
        """List persistent subscriptions.

        Args:
            stream_name: the name of the stream.
            list_all: whether to list all the persistent subscriptions available.
        """
        stub = persistent_pb2_grpc.PersistentSubscriptionsStub(self.channel)
        results = persistent.list_persistent(
            stub=stub, stream_name=stream_name, list_all=True
        )
        return results

create_persistent_subscription(group_name=None, stream=None, resolve_link_to_s=False, from_revision=END, commit_position=None, prepare_position=None, extra_statistics=False, message_timeout_ms=30000, checkpoint_after_ms=2000, max_retry_count=10, min_checkpoint_count=10, max_checkpoint_count=1000, max_subscriber_count=0, live_buffer_size=500, history_buffer_size=500, read_batch_size=20, strategy=ROUND_ROBIN, filter_options=None, **kwargs)

Creates a new persistent subscription.

Parameters:

Name Type Description Default
group_name str

a group name for the subscription that will be created.

None
stream Optional[str]

the name of the stream. A persistent subscription to the $all stream will be created if this value is left to None.

None
resolve_link_to_s bool

whether or not to resolve events links to actual events.

False
from_revision Union[int, str]

the subscription will start from the revision specified here.

END
extra_statistics bool

whether to track latency statistics on this subscription.

False
message_timeout_ms int

the amount of time after which to consider a message as timed out and retried.

30000
checkpoint_after_ms int

the amount of time to try to checkpoint after.

2000
max_retry_count int

the maximum number of retries (due to timeout) before a message is considered to be parked.

10
min_checkpoint_count int

The minimum number of messages to process before a checkpoint may be written.

10
max_checkpoint_count int

The maximum number of messages not checkpointed before forcing a checkpoint.

1000
max_subscriber_count int

The maximum number of subscribers allowed.

0
live_buffer_size int

the size of the buffer (in-memory) listening to live messages as they happen before pagin occurs.

500
history_buffer_size int

The number of events to cache when paging through history.

500
read_batch_size int

The number of events read at a time when paging through history.

20
strategy str

the strategy that will be used to send events to the subscribers of the same group.

ROUND_ROBIN
filter_options Optional[persistent_pb2.CreateReq.AllOptions.FilterOptions]

an optional FilterOptions instance to use to filter events in the persistent subscription.

None

Returns:

Type Description
persistent_pb2.CreateResp

A persistent_pb.CreateResp

Source code in src/eventstore_grpc/client/persistent.py
def create_persistent_subscription(
    self,
    group_name: str = None,
    stream: Optional[str] = None,
    resolve_link_to_s: bool = False,
    from_revision: Union[int, str] = END,
    commit_position: Optional[int] = None,
    prepare_position: Optional[int] = None,
    extra_statistics: bool = False,
    message_timeout_ms: int = 30000,
    checkpoint_after_ms: int = 2000,
    max_retry_count: int = 10,
    min_checkpoint_count: int = 10,
    max_checkpoint_count: int = 1000,
    max_subscriber_count: int = 0,
    live_buffer_size: int = 500,
    history_buffer_size: int = 500,
    read_batch_size: int = 20,
    strategy: str = ROUND_ROBIN,
    filter_options: Optional[
        persistent_pb2.CreateReq.AllOptions.FilterOptions
    ] = None,
    **kwargs
) -> persistent_pb2.CreateResp:
    """Creates a new persistent subscription.

    Args:
        group_name: a group name for the subscription that will be created.
        stream: the name of the stream. A persistent subscription to the `$all`
                stream will be created if this value is left to `None`.
        resolve_link_to_s: whether or not to resolve events links to actual events.
        from_revision: the subscription will start from the revision specified here.
        extra_statistics: whether to track latency statistics on this subscription.
        message_timeout_ms: the amount of time after which to consider a message as
                            timed out and retried.
        checkpoint_after_ms: the amount of time to try to checkpoint after.
        max_retry_count: the maximum number of retries (due to timeout) before a
                         message is considered to be parked.
        min_checkpoint_count: The minimum number of messages to process before a
                              checkpoint may be written.
        max_checkpoint_count: The maximum number of messages not checkpointed
                              before forcing a checkpoint.
        max_subscriber_count: The maximum number of subscribers allowed.
        live_buffer_size: the size of the buffer (in-memory) listening to live
                          messages as they happen before pagin occurs.
        history_buffer_size: The number of events to cache when paging through
                             history.
        read_batch_size: The number of events read at a time when paging through history.
        strategy: the strategy that will be used to send events to the subscribers
                  of the same group.
        filter_options: an optional FilterOptions instance to use to filter events
                        in the persistent subscription.

    Returns:
        A persistent_pb.CreateResp
    """
    stub = persistent_pb2_grpc.PersistentSubscriptionsStub(self.channel)
    result = persistent.create_persistent_subscription(
        stub=stub,
        stream=stream,
        group_name=group_name,
        resolve_link_to_s=resolve_link_to_s,
        from_revision=from_revision,
        commit_position=commit_position,
        prepare_position=prepare_position,
        extra_statistics=extra_statistics,
        message_timeout_ms=message_timeout_ms,
        checkpoint_after_ms=checkpoint_after_ms,
        max_retry_count=max_retry_count,
        min_checkpoint_count=min_checkpoint_count,
        max_checkpoint_count=max_checkpoint_count,
        max_subscriber_count=max_subscriber_count,
        live_buffer_size=live_buffer_size,
        history_buffer_size=history_buffer_size,
        read_batch_size=read_batch_size,
        strategy=strategy,
        filter_options=filter_options,
        **kwargs,
    )
    return result

get_info(group_name, stream_name=None)

Get info about a persistent subscription.

Parameters:

Name Type Description Default
group_name str

a group name to get info about.

required
stream_name Optional[str]

the name of the stream, or None if it's some $all persistent subscription.

None
Source code in src/eventstore_grpc/client/persistent.py
def get_info(
    self,
    group_name: str,
    stream_name: Optional[str] = None,
) -> persistent_pb2.GetInfoResp:
    """Get info about a persistent subscription.

    Args:
        group_name: a group name to get info about.
        stream_name: the name of the stream, or None if it's some $all persistent
                     subscription.
    """
    stub = persistent_pb2_grpc.PersistentSubscriptionsStub(self.channel)
    result = persistent.get_info(
        stub=stub, group_name=group_name, stream_name=stream_name
    )
    return result

list_persistent(stream_name=None, list_all=False)

List persistent subscriptions.

Parameters:

Name Type Description Default
stream_name Optional[str]

the name of the stream.

None
list_all bool

whether to list all the persistent subscriptions available.

False
Source code in src/eventstore_grpc/client/persistent.py
def list_persistent(
    self,
    stream_name: Optional[str] = None,
    list_all: bool = False,
) -> persistent_pb2.ListResp:
    """List persistent subscriptions.

    Args:
        stream_name: the name of the stream.
        list_all: whether to list all the persistent subscriptions available.
    """
    stub = persistent_pb2_grpc.PersistentSubscriptionsStub(self.channel)
    results = persistent.list_persistent(
        stub=stub, stream_name=stream_name, list_all=True
    )
    return results

replay_parked(group_name, stream_name=None, stop_at=None)

Replays parked events.

Parameters:

Name Type Description Default
group_name str

the group name.

required
stream_name Optional[str]

the name of the stream, or None for $all.

None
stop_at Optional[int]

the postition at which to stop.

None
Source code in src/eventstore_grpc/client/persistent.py
def replay_parked(
    self,
    group_name: str,
    stream_name: Optional[str] = None,
    stop_at: Optional[int] = None,
) -> persistent_pb2.ReplayParkedResp:
    """Replays parked events.

    Args:
        group_name: the group name.
        stream_name: the name of the stream, or None for $all.
        stop_at: the postition at which to stop.
    """
    stub = persistent_pb2_grpc.PersistentSubscriptionsStub(self.channel)
    result = persistent.replay_parked(
        stub=stub, group_name=group_name, stream_name=stream_name, stop_at=stop_at
    )
    return result

update_persistent_subscription(group_name=None, stream=None, resolve_link_to_s=None, from_revision=None, commit_position=None, prepare_position=None, extra_statistics=None, message_timeout_ms=None, checkpoint_after_ms=None, max_retry_count=None, min_checkpoint_count=None, max_checkpoint_count=None, max_subscriber_count=None, live_buffer_size=None, history_buffer_size=None, read_batch_size=None, strategy=None, **kwargs)

Updates a persistent subscription.

Source code in src/eventstore_grpc/client/persistent.py
def update_persistent_subscription(
    self,
    group_name: Optional[str] = None,
    stream: Optional[str] = None,
    resolve_link_to_s: Optional[bool] = None,
    from_revision: Optional[Union[int, str]] = None,
    commit_position: Optional[int] = None,
    prepare_position: Optional[int] = None,
    extra_statistics: Optional[bool] = None,
    message_timeout_ms: Optional[int] = None,
    checkpoint_after_ms: Optional[int] = None,
    max_retry_count: Optional[int] = None,
    min_checkpoint_count: Optional[int] = None,
    max_checkpoint_count: Optional[int] = None,
    max_subscriber_count: Optional[int] = None,
    live_buffer_size: Optional[int] = None,
    history_buffer_size: Optional[int] = None,
    read_batch_size: Optional[int] = None,
    strategy: Optional[str] = None,
    **kwargs
) -> persistent_pb2.UpdateResp:
    """Updates a persistent subscription."""

    stub = persistent_pb2_grpc.PersistentSubscriptionsStub(self.channel)
    result = persistent.update_persistent_subscription(
        stub=stub,
        group_name=group_name,
        stream=stream,
        resolve_link_to_s=resolve_link_to_s,
        from_revision=from_revision,
        commit_position=commit_position,
        prepare_position=prepare_position,
        extra_statistics=extra_statistics,
        message_timeout_ms=message_timeout_ms,
        checkpoint_after_ms=checkpoint_after_ms,
        max_retry_count=max_retry_count,
        min_checkpoint_count=min_checkpoint_count,
        max_checkpoint_count=max_checkpoint_count,
        max_subscriber_count=max_subscriber_count,
        live_buffer_size=live_buffer_size,
        history_buffer_size=history_buffer_size,
        read_batch_size=read_batch_size,
        named_consumer_strategy=strategy,
        **kwargs,
    )
    return result