class RequestsStream:
    """Iterator of outgoing requests plus a sink for incoming tasks.

    Iterating the stream yields the requests placed on its internal queue
    (logged as going "to server"). Tasks passed to :meth:`update` (logged as
    coming "from server") are run through the optional ``handler`` and the
    handler's return values are accumulated in ``_results``.

    When ``persistent`` is true, a handled task that has an ``event`` field
    is answered by queueing exactly one reply: an ack request if the handler
    succeeded, a nack request if it raised.
    """

    # Class-level lock, shared by every instance: handlers of all streams run
    # one at a time.
    # NOTE(review): presumably intentional to serialise handlers -- confirm.
    _lock = threading.RLock()

    def __init__(
        self,
        handler: Optional[Callable] = None,
        queue: Optional[Iterable] = None,
        persistent: bool = False,
        timeout: float = 5,
    ):
        """Create the stream.

        Args:
            handler: Callable invoked with each task given to :meth:`update`.
                When ``None``, tasks are only logged.
            queue: Initial requests to be yielded, in iteration order.
            persistent: Whether to queue ack/nack requests for tasks that
                carry an ``event`` field.
            timeout: Seconds to wait for a request before checking whether
                the stream has been stopped.
        """
        self._handle_task = handler
        self._tasks: q.Queue = q.Queue()
        self._queue: q.Queue = q.Queue()
        if queue is not None:
            for request in queue:
                self._queue.put(request)
        self._results: list = []
        self._persistent = persistent
        self._timeout = timeout
        self._stop = threading.Event()

    def __iter__(self):
        """Return the stream itself; it is its own iterator."""
        return self  # pragma: nocover

    def __next__(self):
        """Block until a request is available and return it.

        The stop flag is only consulted when the queue has been empty for
        ``timeout`` seconds, so requests queued before :meth:`stop` are still
        delivered.

        Raises:
            StopIteration: The queue stayed empty for ``timeout`` seconds and
                :meth:`stop` has been called.
        """
        next_element = None
        while next_element is None:
            try:
                next_element = self._queue.get(block=True, timeout=self._timeout)
            except q.Empty:
                # Only an empty queue means "keep waiting"; any other error
                # (e.g. an invalid timeout) must surface instead of spinning.
                if self._stop.is_set():
                    log.info("\033[38;5;196mStopping requests streaming.\033[0m")
                    raise StopIteration from None
                continue
            # Pair every successful get() with task_done(), including the
            # ``None`` placeholders that are skipped by the loop condition.
            self._queue.task_done()
        log.debug("\033[38;5;45m---- to server --->\033[0m")
        log.debug("\033[38;5;45m%s\033[0m", next_element)
        log.debug("\033[38;5;45m<-------------------\033[0m")
        return next_element

    def handle_task(self, task):
        """Log ``task``, run the handler on it and record the result.

        A handler exception is logged and recorded as a ``None`` result; it
        is never propagated to the caller. In persistent mode a task with an
        ``event`` field gets one reply queued: ack on success, nack on
        failure.
        """
        log.debug("\033[38;5;79m<---- from server --\033[0m")
        log.debug("\033[38;5;79m%s\033[0m", task)
        log.debug("\033[38;5;79m------------------->\033[0m")
        if self._handle_task is None:
            return
        with RequestsStream._lock:
            succeeded = True
            try:
                result = self._handle_task(task)
            except Exception as err:
                # Best-effort boundary: one failing task must not break the
                # subscription, so log it and carry on.
                log.error(err)
                result = None
                succeeded = False
            if self._persistent and task.HasField("event"):
                # Previously a failed task was nack'ed and then ack'ed as
                # well; send only the reply that matches the outcome.
                if succeeded:
                    reply = persistent.ack_request(task)
                else:
                    reply = persistent.nack_request(task)
                self._queue.put(reply)
            self._results.append(result)

    def stop(self):
        """Signal the stream to stop once its request queue runs empty."""
        self._stop.set()

    def collect(self, task):
        """Adds a task to the tasks queue."""
        if task is not None:
            self._tasks.put(task)
        return self

    def update(self, task):
        """Handle ``task`` immediately unless the stream has been stopped.

        Returns:
            The stream itself, so calls can be chained.
        """
        if self._stop.is_set():
            return self
        self.handle_task(task)
        return self
collect(task)
Adds a task to the tasks queue.
Source code in src/eventstore_grpc/subscriptions/requests_stream.py