def create_persistent_subscription(
stub: persistent_pb2_grpc.PersistentSubscriptionsStub,
group_name: str | None,
stream: str | None = None,
resolve_link_to_s: bool = False,
from_revision: Union[int, str] = START,
commit_position: Optional[int] = None,
prepare_position: Optional[int] = None,
extra_statistics: bool = False,
message_timeout_ms: int = 30000,
checkpoint_after_ms: int = 2000,
max_retry_count: int = 10,
min_checkpoint_count: int = 10,
max_checkpoint_count: int = 1000,
max_subscriber_count: int = 0,
live_buffer_size: int = 500,
history_buffer_size: int = 500,
read_batch_size: int = 20,
strategy: str = ROUND_ROBIN,
filter_options: Optional[persistent_pb2.CreateReq.AllOptions.FilterOptions] = None,
**kwargs,
) -> persistent_pb2.CreateResp:
"""Creates a persistent subscription."""
if history_buffer_size is None:
# TODO: talk with EventStoreDB about this behavior?
log.warning(
f"If you don't send `history_buffer_size` EventStoreDB will hang indefinitely: forcing it to its default value -> 500"
)
history_buffer_size = 500
settings = _build_settings(
resolve_link_to_s=resolve_link_to_s,
extra_statistics=extra_statistics,
max_retry_count=max_retry_count,
min_checkpoint_count=min_checkpoint_count,
max_checkpoint_count=max_checkpoint_count,
max_subscriber_count=max_subscriber_count,
live_buffer_size=live_buffer_size,
read_batch_size=read_batch_size,
history_buffer_size=history_buffer_size,
message_timeout_ms=message_timeout_ms,
checkpoint_after_ms=checkpoint_after_ms,
strategy=strategy,
)
options = persistent_pb2.CreateReq.Options(settings=settings, group_name=group_name)
if stream is None: # $all
options.all.CopyFrom(
_build_options_all(
commit_position=commit_position,
prepare_position=prepare_position,
from_revision=from_revision,
filter_options=filter_options,
)
)
else:
options.stream.CopyFrom(
_build_options_stream(stream=stream, from_revision=from_revision)
)
request = persistent_pb2.CreateReq(options=options)
response = stub.Create(request, **kwargs)
return response