# Source code for eotransform.streamed_process

from concurrent.futures import ThreadPoolExecutor
from queue import Queue, Empty, Full
from threading import Event
from typing import TypeVar, Iterable, Iterator

from eotransform.protocol.sink import Sink
from eotransform.protocol.transformer import Transformer

# Element type yielded by the input source.
StreamedInT = TypeVar("StreamedInT")
# Element type produced by the process transformer and handed to the sink.
StreamedOutT = TypeVar("StreamedOutT")
# Element type carried through a ReturnPipeStream / ReturnPipeIterator.
ReturnPipeT = TypeVar("ReturnPipeT")

# Shorthand aliases for the three stages wired together by streamed_process.
InSource = Iterable[StreamedInT]
ProcessTransformer = Transformer[StreamedInT, StreamedOutT]
OutSink = Sink[StreamedOutT]


class ReturnPipeIterator(Iterator[ReturnPipeT]):
    """Iterator draining a queue until the ``StopIteration`` sentinel is received.

    Every element taken from the pipe is acknowledged with ``task_done()`` right away, so
    ``pipe.join()`` only waits for elements that have not been read yet.

    :param pipe: Queue fed by a producer, which marks the end of the stream by putting the
        ``StopIteration`` class itself
    :param timeout: Maximum number of seconds to wait for each element
    :raises queue.Empty: If no element arrives within ``timeout`` seconds
    """

    def __init__(self, pipe: Queue, timeout: float):
        self._pipe = pipe
        self._timeout = timeout
        # Set once the sentinel has been read; see __next__.
        self._exhausted = False

    def __next__(self) -> ReturnPipeT:
        # Iterator protocol: an exhausted iterator keeps raising StopIteration. It must not block
        # on, or consume from, the pipe again.
        if self._exhausted:
            raise StopIteration
        elem = self._pipe.get(timeout=self._timeout)
        self._pipe.task_done()
        if elem is StopIteration:
            self._exhausted = True
            raise StopIteration
        return elem
class ReturnPipeStream(Iterable[ReturnPipeT]):
    """Iterable view over a pipe; every ``iter()`` call creates a new ReturnPipeIterator.

    All iterators created from one stream read from the same underlying queue.

    :param pipe: Queue terminated by putting the ``StopIteration`` class
    :param timeout: Maximum number of seconds each iterator waits for an element
    """

    def __init__(self, pipe: Queue, timeout: float):
        self._pipe, self._timeout = pipe, timeout

    def __iter__(self) -> Iterator[ReturnPipeT]:
        iterator = ReturnPipeIterator(self._pipe, self._timeout)
        return iterator
def _producer(source: InSource, pipe: Queue, cancel_event: Event) -> None:
    """Push each element of ``source`` into ``pipe``, then end the stream with ``StopIteration``.

    The cancel flag is checked after each element is obtained from ``source``. Once it is set,
    the function returns without queueing that element and without sending the sentinel.
    """
    for item in source:
        if cancel_event.is_set():
            return
        pipe.put(item)
    if cancel_event.is_set():
        return
    pipe.put(StopIteration)


def _consumer(sink: OutSink, pipe: Queue) -> None:
    """Hand every element arriving on ``pipe`` to ``sink`` until the ``StopIteration`` sentinel.

    Each element is awaited for up to 9999 seconds (about 2.8 hours).
    """
    stream = ReturnPipeStream(pipe, 9999)
    for item in stream:
        sink(item)
def streamed_process(source: InSource, process: ProcessTransformer, dst_sink: OutSink, executor: ThreadPoolExecutor,
                     loading_timeout: float = 120, storing_timeout: float = 120) -> None:
    """
    Interleaves the data loading from source with the data processing, and finally putting it into the sink.
    This can be used to hide IO processes, e.g.:

        thread_0: IN->IN->IN
        thread_1:   ->PR->PR->PR
        thread_2:       ->SK->SK->SK

    Loading runs in one executor task, storing in another, and processing in the calling thread. The pipes between
    the stages hold at most one element each.

    :param source: Input data fed to the process transformation
    :param process: Transformation changing the input data to the output data
    :param dst_sink: Sink receiving the output data
    :param executor: Thread pool executor used to spawn async threads for IO hiding. It should have at least two
        idle workers, because the producer and consumer tasks each occupy one until the stream ends.
    :param loading_timeout: Timeout in seconds for waiting on each loaded element, and for the producer task to
        finish during cleanup (default: 120)
    :param storing_timeout: Timeout in seconds for handing each processed element to the sink task, and for the
        consumer task to finish during cleanup (default: 120)
    :raises queue.Empty: If no loaded element arrives within ``loading_timeout``
    :raises queue.Full: If the sink task does not accept a processed element within ``storing_timeout``
    :raises concurrent.futures.TimeoutError: If the producer or consumer task does not finish in time during cleanup
    Exceptions raised by the source, ``process`` or the sink are propagated as well.
    """
    source_pipe = Queue(maxsize=1)
    sink_pipe = Queue(maxsize=1)
    cancel_production_event = Event()
    producer = executor.submit(_producer, source, source_pipe, cancel_production_event)
    consumer = executor.submit(_consumer, dst_sink, sink_pipe)
    try:
        # There is deliberately no `except Empty` here. A loading timeout must propagate, just as queue.Full does on
        # the storing side. Otherwise the function could return normally after processing only part of the source.
        for x in ReturnPipeStream(source_pipe, loading_timeout):
            sink_pipe.put(process(x), timeout=storing_timeout)
    finally:
        # Make the producer stop at its next element, then let the consumer flush what was already processed.
        cancel_production_event.set()
        try:
            sink_pipe.put(StopIteration, timeout=storing_timeout)
        except Full:
            # The consumer is stuck or has died. consumer.result() below reports it, unless the producer fails first.
            ...
        finally:
            # Free the full source pipe so a producer blocked in put() can proceed and observe the cancel event.
            for _ in range(source_pipe.unfinished_tasks):
                source_pipe.get_nowait()
                source_pipe.task_done()
            producer.result(timeout=loading_timeout)
            consumer.result(timeout=storing_timeout)
            # The producer may have pushed one last element after the drain above; discard it.
            _unsafe_pipe_clear(source_pipe)
            source_pipe.join()
            sink_pipe.join()
def _unsafe_pipe_clear(pipe: Queue):
    """Discard every pending element of ``pipe`` and mark all outstanding tasks as done.

    Afterwards ``pipe`` is empty, ``pipe.join()`` returns immediately, and any thread blocked in
    ``pipe.put()`` on a full pipe is woken up.

    This relies on CPython ``queue.Queue`` internals: ``queue``, ``unfinished_tasks``, and the condition
    variables built on ``mutex``. It remains "unsafe" in one respect. A thread that has already taken an
    element but not yet acknowledged it will get a ``ValueError`` from its later ``task_done()``.
    """
    # all_tasks_done, not_full and not_empty all wrap pipe.mutex, so holding one of them holds the queue lock.
    with pipe.all_tasks_done:
        pipe.queue.clear()
        pipe.unfinished_tasks = 0
        # Clearing the deque directly notifies nobody, so wake join() waiters and producers blocked in put().
        pipe.all_tasks_done.notify_all()
        pipe.not_full.notify_all()