Bases: threading.Thread
A Subscription Object
This object spawns a new thread and runs a custom callback against each of the
elements yielded from the stream iterator.
Attributes:
| Name |
Type |
Description |
_lock |
|
a threading.RLock used to sync.
|
Source code in src/eventstore_grpc/subscriptions/subscription.py
| class Subscription(threading.Thread):
"""A Subscription Object
This object spawns a new thread and runs a custom callback against each of the
elements yielded from the stream iterator.
Attributes:
_lock: a threading.RLock used to sync.
"""
_lock = threading.RLock()
def __init__(
self,
requests_stream: RequestsStream,
stub=None,
manager=None,
name=None,
**kwargs,
):
"""Initializes the subscription."""
threading.Thread.__init__(self, name=name)
self._requests_stream = requests_stream
self._responses_stream = None
self._stub = stub
self._unsubscribed = threading.Event()
self._manager = manager
self.call_options = kwargs
@property
def grpc_request(self):
if not isinstance(self._stub, persistent_pb2_grpc.PersistentSubscriptionsStub):
grpc_request = next(self._requests_stream)
else:
grpc_request = self._requests_stream
return grpc_request
def revoke(self):
log.debug(f"\033[38;5;190mRevoking subscribtion to {self.name}.\033[0m")
log.debug("\033[38;5;190mStopping stream.\033[0m")
self._requests_stream.stop()
self._unsubscribed.set()
log.debug("\033[38;5;190mSet unsubscribed event.\033[0m")
if self._responses_stream is not None:
log.debug("\033[38;5;190mCancelling grpc request.\033[0m")
self._responses_stream.cancel()
log.debug("\033[38;5;190mGRPC request cancelled.\033[0m")
return self
@property
def revoked(self):
return self._unsubscribed.is_set()
@property
def results(self):
return self._requests_stream._results
def run(self, *args, **kwargs):
"""Runs the thread activity."""
log.debug(f"{self.name:^10} activity started.")
req = self.grpc_request
self._responses_stream = self._stub.Read(req, **self.call_options)
try:
for response in self._responses_stream:
if self.revoked:
return self.results
self._requests_stream.update(response)
except grpc.RpcError as err:
if err.code() == grpc.StatusCode.CANCELLED:
log.debug(
f"\033[38;5;196mGRPC request cancelled for {self.name}\033[0m"
)
elif err.code() == grpc.StatusCode.UNKNOWN:
log.debug(
f"\033[38;5;209m[REVOKING SUBSCRIPTION {self.name}]: {err.code()} - {err.details()}\033[0m"
)
log.debug(f"\033[38;5;209m{err}\033[0m")
self.revoke()
if self.name in self._manager._registry:
del self._manager._registry[self.name]
return self.results
else:
raise err # pragma: nocover
return self.results
|
__init__(requests_stream, stub=None, manager=None, name=None, **kwargs)
Initializes the subscription.
Source code in src/eventstore_grpc/subscriptions/subscription.py
| def __init__(
self,
requests_stream: RequestsStream,
stub=None,
manager=None,
name=None,
**kwargs,
):
"""Initializes the subscription."""
threading.Thread.__init__(self, name=name)
self._requests_stream = requests_stream
self._responses_stream = None
self._stub = stub
self._unsubscribed = threading.Event()
self._manager = manager
self.call_options = kwargs
|
run(*args, **kwargs)
Runs the thread activity.
Source code in src/eventstore_grpc/subscriptions/subscription.py
| def run(self, *args, **kwargs):
"""Runs the thread activity."""
log.debug(f"{self.name:^10} activity started.")
req = self.grpc_request
self._responses_stream = self._stub.Read(req, **self.call_options)
try:
for response in self._responses_stream:
if self.revoked:
return self.results
self._requests_stream.update(response)
except grpc.RpcError as err:
if err.code() == grpc.StatusCode.CANCELLED:
log.debug(
f"\033[38;5;196mGRPC request cancelled for {self.name}\033[0m"
)
elif err.code() == grpc.StatusCode.UNKNOWN:
log.debug(
f"\033[38;5;209m[REVOKING SUBSCRIPTION {self.name}]: {err.code()} - {err.details()}\033[0m"
)
log.debug(f"\033[38;5;209m{err}\033[0m")
self.revoke()
if self.name in self._manager._registry:
del self._manager._registry[self.name]
return self.results
else:
raise err # pragma: nocover
return self.results
|