List persistent subscriptions.
list_persistent(stub, stream_name=None, list_all=False, **kwargs)
Lists persistent subscriptions.
Source code in src/eventstore_grpc/persistent/list.py
def list_persistent(
    stub: persistent_pb2_grpc.PersistentSubscriptionsStub,
    stream_name: Optional[str] = None,
    list_all: bool = False,
    **kwargs,
) -> persistent_pb2.ListResp:
    """Lists persistent subscriptions.

    Builds a ``ListReq`` and sends it through the stub's ``List`` RPC.

    Args:
        stub: The persistent subscriptions gRPC stub used to send the request.
        stream_name: Name of the stream whose subscriptions should be listed.
            Only used when ``list_all`` is false. When it is ``None`` the
            ``all`` stream option is selected instead (presumably the
            ``$all`` stream -- confirm against the server documentation).
        list_all: When true, every persistent subscription is requested and
            ``stream_name`` is ignored.
        **kwargs: Forwarded unchanged to the stub call.

    Returns:
        The response returned by the ``List`` RPC.
    """
    options = persistent_pb2.ListReq.Options()
    if list_all:
        options.list_all_subscriptions.CopyFrom(shared_pb2.Empty())
    else:
        stream_option = persistent_pb2.ListReq.StreamOption()
        if stream_name is not None:
            # The stream identifier carries the name as bytes, hence encode().
            stream_option.stream.CopyFrom(
                shared_pb2.StreamIdentifier(stream_name=stream_name.encode())
            )
        else:
            stream_option.all.CopyFrom(shared_pb2.Empty())
        # CopyFrom needs the source message; calling it without an argument
        # raised TypeError and left the stream selection out of the request.
        options.list_for_stream.CopyFrom(stream_option)
    request = persistent_pb2.ListReq(options=options)
    # A ListReq must go through the List RPC, not ReplayParked.
    return stub.List(request, **kwargs)