Reads events from an Event Stream.
The simplest way to read a stream forwards is to supply a stream name, a direction,
and a revision to start from. The revision can be either a stream position or an
unsigned 64-bit integer. The call returns an iterable yielding events from the stream.
There are a number of additional arguments you can provide when reading a stream:
max_count: limits the number of events that are returned.
resolve_link_to_pos: when using projections to create new events you can set
whether the generated events are pointers to existing events. Setting this
value to true tells EventStoreDB to return the event as well as the event
linking to it.
configure_operation_options: a generic settings class whose options can be
applied to any operation executed against EventStoreDB.
user_credentials: the credentials used to read the data. When supplied, they
override the default credentials set on the connection.
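The source listing on this page takes these settings through a `count` argument and an `options` dictionary, and reads the link-resolution flag from the key `resolve_link_to_s`. The sketch below mirrors that option handling in plain Python so it runs without gRPC. `ReadPlan`, `plan_read`, and the values given to `START` and `END` are assumptions made for illustration; they are not part of the library.

```python
import sys
from dataclasses import dataclass
from typing import Optional, Union

# Assumed stand-ins for constants.START and constants.END; the real values are not shown.
START = "start"
END = "end"


@dataclass
class ReadPlan:
    """Hypothetical summary of the request that read_from_stream builds."""

    start_from: Union[str, int, None]
    direction: str
    count: int
    resolve_links: bool


def plan_read(count: Optional[int], options: dict) -> ReadPlan:
    from_revision = options.get("from_revision")
    # Starting from the end defaults to backwards, anything else to forwards.
    default_direction = "backwards" if from_revision == END else "forwards"
    direction = options.get("direction", default_direction).lower()
    return ReadPlan(
        start_from=from_revision,
        direction=direction,
        # A missing or zero count falls back to sys.maxsize.
        count=count or sys.maxsize,
        # The listing reads this flag from the key "resolve_link_to_s".
        resolve_links=options.get("resolve_link_to_s", False),
    )


plan = plan_read(count=10, options={"from_revision": START, "resolve_link_to_s": True})
print(plan)
```

An explicit `direction` entry in `options` takes precedence over the default derived from `from_revision`.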
Reading from a revision.
As well as providing a StreamPosition you can also provide a stream revision
in the form of an unsigned 64-bit integer.
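Python integers are unbounded, so a revision can be checked against the unsigned 64-bit range before it is sent. A minimal sketch, assuming a hypothetical helper `validate_revision` that is not part of the library:

```python
MAX_UINT64 = 2**64 - 1  # largest value an unsigned 64-bit integer can hold


def validate_revision(revision: int) -> int:
    """Return the revision unchanged if it fits in an unsigned 64-bit integer."""
    # bool is a subclass of int, so reject it explicitly.
    if isinstance(revision, bool) or not isinstance(revision, int):
        raise TypeError("revision must be an int")
    if not 0 <= revision <= MAX_UINT64:
        raise ValueError("revision must be between 0 and 2**64 - 1")
    return revision


print(validate_revision(42))
```

Validating up front turns an out-of-range value into a clear `ValueError` at the call site instead of a failure while the request is being built.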
Reading backwards.
As well as being able to read a stream forwards, you can also go backwards. When
reading backwards, the stream position has to be set to the end if you want to
read all the events.
Tip: You can use reading backwards to find the last position in the stream. Just
read backwards one event and get the position.
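The tip can be illustrated with an in-memory list standing in for a stream. This is only a sketch of the idea: `read_backwards` and `last_revision` are hypothetical helpers, and a revision is assumed to be the zero-based index of an event in the list.

```python
from typing import Iterator, List, Optional, Tuple


def read_backwards(events: List[str], count: int) -> Iterator[Tuple[int, str]]:
    """Yield up to `count` (revision, event) pairs, newest first."""
    for revision in range(len(events) - 1, -1, -1):
        if count <= 0:
            return
        yield revision, events[revision]
        count -= 1


def last_revision(events: List[str]) -> Optional[int]:
    """Read a single event backwards and return its revision, or None if empty."""
    newest = next(read_backwards(events, count=1), None)
    return None if newest is None else newest[0]


print(last_revision(["created", "renamed", "archived"]))
```

With the function documented on this page, the equivalent call would pass `count=1` and `options={"from_revision": constants.END}`, since the source listing defaults to reading backwards when starting from the end.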
Checking for stream presence.
Reading a stream returns a ReadStreamResult containing a ReadState. This property
can have the value StreamNotFound or Ok. Check the value of this field before
attempting to iterate the result, because iterating a stream that does not exist
throws an exception.
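The check described above can be sketched as a guard in front of the iteration. The `ReadState` and `ReadStreamResult` classes below are stand-ins written for this example, and `safe_events` is a hypothetical helper; none of them are types from the library, whose function returns an iterator of `ReadResp` messages.

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import List


class ReadState(Enum):
    OK = "Ok"
    STREAM_NOT_FOUND = "StreamNotFound"


@dataclass
class ReadStreamResult:
    """Stand-in result: a read state plus the events that were read."""

    read_state: ReadState
    events: List[str] = field(default_factory=list)


def safe_events(result: ReadStreamResult) -> List[str]:
    """Return the events, or an empty list when the stream was not found."""
    if result.read_state is ReadState.STREAM_NOT_FOUND:
        return []
    return list(result.events)


missing = ReadStreamResult(ReadState.STREAM_NOT_FOUND)
print(safe_events(missing))
```

Returning an empty list is one possible policy; raising a dedicated exception for a missing stream would work equally well if callers need to tell the two cases apart.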
Source code in src/eventstore_grpc/streams/read.py
| def read_from_stream(
    stub: streams_pb2_grpc.StreamsStub,
    stream: str,
    count: int | None,
    options: Dict,  # TODO: use from_revision as a parameter.
    **kwargs,
) -> Iterator[streams_pb2.ReadResp]:
    """Reads events from an Event Stream.

    The simplest way to read a stream forwards is to supply a stream name, a
    direction, and a revision to start from. The revision can be either a stream
    position or an unsigned 64-bit integer. The call returns an iterable yielding
    events from the stream.

    There are a number of additional arguments you can provide when reading a stream:

    * `max_count`: limits the number of events that are returned.
    * `resolve_link_to_pos`: when using projections to create new events you can set
      whether the generated events are pointers to existing events. Setting this
      value to true tells EventStoreDB to return the event as well as the event
      linking to it.
    * `configure_operation_options`: a generic settings class whose options can be
      applied to any operation executed against EventStoreDB.
    * `user_credentials`: the credentials used to read the data. When supplied, they
      override the default credentials set on the connection.

    ### Reading from a revision.

    As well as providing a `StreamPosition` you can also provide a stream revision
    in the form of an unsigned 64-bit integer.

    ### Reading backwards.

    As well as being able to read a stream forwards, you can also go backwards. When
    reading backwards, the stream position has to be set to the end if you want to
    read all the events.

    > Tip: You can use reading backwards to find the last position in the stream. Just
    > read backwards one event and get the position.

    ### Checking for stream presence.

    Reading a stream returns a ReadStreamResult containing a ReadState. This property
    can have the value StreamNotFound or Ok. Check the value of this field before
    attempting to iterate the result, because iterating a stream that does not exist
    throws an exception.
    """
    request = streams_pb2.ReadReq()
    req_options = streams_pb2.ReadReq.Options()

    # Identify the stream; the name is sent as bytes.
    identifier = shared_pb2.StreamIdentifier()
    identifier.stream_name = stream.encode()

    # Select the string variant of the UUID option.
    uuid_option = streams_pb2.ReadReq.Options.UUIDOption()
    uuid_option.string.CopyFrom(shared_pb2.Empty())

    # Choose where to start: the start, the end, or an explicit revision.
    stream_options = streams_pb2.ReadReq.Options.StreamOptions()
    stream_options.stream_identifier.CopyFrom(identifier)
    from_revision = options.get("from_revision")
    if from_revision == constants.START:
        stream_options.start.CopyFrom(shared_pb2.Empty())
    elif from_revision == constants.END:
        stream_options.end.CopyFrom(shared_pb2.Empty())
    elif isinstance(from_revision, int):
        stream_options.revision = from_revision

    req_options.stream.CopyFrom(stream_options)
    req_options.uuid_option.CopyFrom(uuid_option)
    resolve_links = options.get("resolve_link_to_s", False)
    req_options.resolve_links = resolve_links
    # A missing or zero count falls back to sys.maxsize.
    req_options.count = count or sys.maxsize
    req_options.no_filter.CopyFrom(shared_pb2.Empty())

    # Reading from the end defaults to backwards, anything else to forwards.
    default_direction = "backwards" if from_revision == constants.END else "forwards"
    direction = options.get("direction", default_direction)
    if direction.lower() == "forwards":
        req_options.read_direction = streams_pb2.ReadReq.Options.ReadDirection.Forwards
    elif direction.lower() == "backwards":
        req_options.read_direction = streams_pb2.ReadReq.Options.ReadDirection.Backwards

    request.options.CopyFrom(req_options)
    response = stub.Read(request, **kwargs)
    return response
|