Skip to content

Create

Create persistent subscription.

create_persistent_subscription(stub, group_name, stream=None, resolve_link_to_s=False, from_revision=START, commit_position=None, prepare_position=None, extra_statistics=False, message_timeout_ms=30000, checkpoint_after_ms=2000, max_retry_count=10, min_checkpoint_count=10, max_checkpoint_count=1000, max_subscriber_count=0, live_buffer_size=500, history_buffer_size=500, read_batch_size=20, strategy=ROUND_ROBIN, filter_options=None, **kwargs)

Creates a persistent subscription.

Source code in src/eventstore_grpc/persistent/create.py
def create_persistent_subscription(
    stub: persistent_pb2_grpc.PersistentSubscriptionsStub,
    group_name: str | None,
    stream: str | None = None,
    resolve_link_to_s: bool = False,
    from_revision: Union[int, str] = START,
    commit_position: Optional[int] = None,
    prepare_position: Optional[int] = None,
    extra_statistics: bool = False,
    message_timeout_ms: int = 30000,
    checkpoint_after_ms: int = 2000,
    max_retry_count: int = 10,
    min_checkpoint_count: int = 10,
    max_checkpoint_count: int = 1000,
    max_subscriber_count: int = 0,
    live_buffer_size: int = 500,
    history_buffer_size: int = 500,
    read_batch_size: int = 20,
    strategy: str = ROUND_ROBIN,
    filter_options: Optional[persistent_pb2.CreateReq.AllOptions.FilterOptions] = None,
    **kwargs,
) -> persistent_pb2.CreateResp:
    """Build a ``CreateReq`` for a persistent subscription and send it via ``stub``.

    Args:
        stub: Persistent-subscriptions gRPC stub whose ``Create`` method is called.
        group_name: Group name placed on the request options.
        stream: Target stream name. When ``None`` the ``all`` options are
            populated instead of the ``stream`` options (the ``$all`` case).
        resolve_link_to_s, extra_statistics, max_retry_count,
        min_checkpoint_count, max_checkpoint_count, max_subscriber_count,
        live_buffer_size, read_batch_size, history_buffer_size,
        message_timeout_ms, checkpoint_after_ms, strategy: Forwarded unchanged
            to ``_build_settings`` to produce the subscription settings.
        from_revision: Forwarded to whichever options builder is used.
        commit_position: Only used when ``stream`` is ``None``.
        prepare_position: Only used when ``stream`` is ``None``.
        filter_options: Only used when ``stream`` is ``None``.
        **kwargs: Passed straight through to ``stub.Create``.

    Returns:
        Whatever ``stub.Create`` returns for the assembled request.
    """
    if history_buffer_size is None:
        # TODO: talk with EventStoreDB about this behavior?
        # An explicit None is replaced by 500 (and a warning is logged) rather
        # than being sent on to the server.
        log.warning(
            f"If you don't send `history_buffer_size` EventStoreDB will hang indefinitely: forcing it to its default value -> 500"
        )
        history_buffer_size = 500

    create_options = persistent_pb2.CreateReq.Options(
        settings=_build_settings(
            resolve_link_to_s=resolve_link_to_s,
            extra_statistics=extra_statistics,
            max_retry_count=max_retry_count,
            min_checkpoint_count=min_checkpoint_count,
            max_checkpoint_count=max_checkpoint_count,
            max_subscriber_count=max_subscriber_count,
            live_buffer_size=live_buffer_size,
            read_batch_size=read_batch_size,
            history_buffer_size=history_buffer_size,
            message_timeout_ms=message_timeout_ms,
            checkpoint_after_ms=checkpoint_after_ms,
            strategy=strategy,
        ),
        group_name=group_name,
    )

    if stream is not None:
        # A concrete stream was named: fill in the stream-specific options.
        stream_options = _build_options_stream(
            stream=stream, from_revision=from_revision
        )
        create_options.stream.CopyFrom(stream_options)
    else:
        # No stream given: subscribe to $all, which also carries the
        # position and filter arguments.
        all_options = _build_options_all(
            commit_position=commit_position,
            prepare_position=prepare_position,
            from_revision=from_revision,
            filter_options=filter_options,
        )
        create_options.all.CopyFrom(all_options)

    return stub.Create(persistent_pb2.CreateReq(options=create_options), **kwargs)