Source code for daisy.task

from .client import Client
from inspect import signature


class Task:
    '''Definition of a ``daisy`` task that is to be run in a block-wise
    fashion.

    Args:

        task_id (``string``):

            The unique name of the task.

        total_roi (:class:`daisy.Roi`):

            The region of interest (ROI) of the complete volume to process.

        read_roi (:class:`daisy.Roi`):

            The ROI every block needs to read data from. Will be shifted over
            the ``total_roi`` to cover the whole volume.

        write_roi (:class:`daisy.Roi`):

            The ROI every block writes data to. Will be shifted over the
            ``total_roi`` to cover the whole volume.

        process_function (function):

            A function that will be called as::

                process_function(block)

            with ``block`` being the shifted read and write ROI for each
            location in the volume.

            If ``read_write_conflict`` is ``True``, the callee can assume that
            there are no read/write concurrencies, i.e., at any given point in
            time the ``read_roi`` does not overlap with the ``write_roi`` of
            another process.

            If ``process_function`` takes no arguments, it is used directly as
            the worker function and has to acquire blocks itself (for example,
            through a :class:`daisy.Client`).

        check_function (function, optional):

            A function that will be called as::

                check_function(block)

            This function should return ``True`` if the block was completed.
            This is used internally to avoid processing blocks that are
            already done and to check whether a block was correctly processed.

            If a tuple of two functions is given, the first one will be called
            to check whether the block needs to be run, and if so, the second
            one will be called after the run to check whether it succeeded.

        init_callback_fn (function, optional):

            A function that Daisy will call once when the task is started. It
            will be called as::

                init_callback_fn(context)

            where ``context`` is the ``daisy.Context`` string that can be used
            by the daisy clients to connect to the server.

        read_write_conflict (``bool``, optional):

            Whether the read and write ROIs are conflicting, i.e., accessing
            the same resource. If set to ``False``, all blocks can run in
            parallel at the same time. In this case, providing a ``read_roi``
            is simply a convenience to ensure no out-of-bounds accesses and to
            avoid recomputing it in each block.
        fit (``string``, optional):

            How to handle cases where shifting blocks by the size of
            ``write_roi`` does not tile the ``total_roi``. Possible options
            are:

            "valid": Skip blocks that would lie outside of ``total_roi``. This
            is the default::

                |---------------------------|     total ROI

                |rrrr|wwwwww|rrrr|                block 1
                       |rrrr|wwwwww|rrrr|         block 2
                                                  no further block

            "overhang": Add all blocks that overlap with ``total_roi``, even if
            they leave it. Client code has to take care of safe access beyond
            ``total_roi`` in this case::

                |---------------------------|     total ROI

                |rrrr|wwwwww|rrrr|                block 1
                       |rrrr|wwwwww|rrrr|         block 2
                              |rrrr|wwwwww|rrrr|  block 3 (overhanging)

            "shrink": Like "overhang", but shrink the boundary blocks' read and
            write ROIs such that they are guaranteed to lie within
            ``total_roi``. The shrinking will preserve the context, i.e., the
            difference between the read ROI and write ROI stays the same::

                |---------------------------|     total ROI

                |rrrr|wwwwww|rrrr|                block 1
                       |rrrr|wwwwww|rrrr|         block 2
                              |rrrr|www|rrrr|     block 3 (shrunk)

        num_workers (int, optional):

            The number of parallel processes to run.

        max_retries (int, optional):

            The maximum number of times a block will be retried if it fails
            (due to a failed post-check, an application crash, or a network
            failure).

        timeout (int, optional):

            Time in seconds to wait for a block to be returned from a worker.
            The worker is killed (and the block retried) if this time is
            exceeded.

        upstream_tasks (list of :class:`Task`, optional):

            Tasks that this task depends on. They are returned by
            :meth:`requires`.
    '''

    def __init__(
        self,
        task_id,
        total_roi,
        read_roi,
        write_roi,
        process_function,
        check_function=None,
        init_callback_fn=None,
        read_write_conflict=True,
        num_workers=1,
        max_retries=2,
        fit="valid",
        timeout=None,
        upstream_tasks=None,
    ):
        self.task_id = task_id
        self.total_roi = total_roi
        self.orig_total_roi = total_roi
        self.read_roi = read_roi
        self.write_roi = write_roi

        # Region covered by all write ROIs: the total ROI shrunk by the
        # context (the margin between read and write ROI) on each side.
        self.total_write_roi = self.total_roi.grow(
            -(write_roi.begin - read_roi.begin),
            -(read_roi.end - write_roi.end),
        )

        self.process_function = process_function
        self.check_function = check_function
        self.read_write_conflict = read_write_conflict
        self.fit = fit
        self.num_workers = num_workers
        self.max_retries = max_retries
        self.timeout = timeout

        self.upstream_tasks = []
        if upstream_tasks is not None:
            self.upstream_tasks.extend(upstream_tasks)

        # Fall back to a no-op if no init callback was given.
        if init_callback_fn is not None:
            self.init_callback_fn = init_callback_fn
        else:
            self.init_callback_fn = lambda context: None

        # A process_function without parameters is used directly as the
        # worker function; otherwise each worker runs _process_blocks, which
        # acquires blocks and passes them to process_function.
        if len(signature(process_function).parameters) == 0:
            self.spawn_worker_function = process_function
        else:
            self.spawn_worker_function = lambda: self._process_blocks()

    def _process_blocks(self):
        # Keep acquiring blocks until acquire_block() yields None.
        client = Client()
        while True:
            with client.acquire_block() as block:
                if block is None:
                    break
                self.process_function(block)

    def requires(self):
        # The upstream tasks this task depends on.
        return self.upstream_tasks
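The three fit modes described in the docstring can be illustrated along a single axis. The following is a minimal sketch, not daisy's implementation: it uses plain half-open integer intervals instead of daisy.Roi, the helper name tile_1d is hypothetical, and it assumes that "overhang" and "shrink" keep adding blocks as long as a block's write interval starts inside the total write region (the total ROI minus the context).

```python
from typing import List, Tuple

# Half-open integer interval [begin, end).
Interval = Tuple[int, int]


def tile_1d(
    total: Interval,
    context: Tuple[int, int],
    write_size: int,
    fit: str = "valid",
) -> List[Tuple[Interval, Interval]]:
    """Return a (read, write) interval pair for each block along one axis.

    Hypothetical helper illustrating the ``fit`` modes; not part of daisy.
    """
    if fit not in ("valid", "overhang", "shrink"):
        raise ValueError(f"unknown fit mode: {fit!r}")
    if write_size <= 0:
        raise ValueError("write_size must be positive")

    ctx_begin, ctx_end = context
    # Region the write intervals should cover: total ROI minus the context.
    region_begin, region_end = total[0] + ctx_begin, total[1] - ctx_end

    blocks = []
    start = region_begin
    while start < region_end:
        write = (start, start + write_size)
        read = (write[0] - ctx_begin, write[1] + ctx_end)
        if read[1] > total[1]:  # this block would leave the total ROI
            if fit == "valid":
                break  # skip it; every later block would leave it too
            if fit == "shrink":
                # Clip the write interval and re-derive the read interval,
                # so the context on both sides stays the same.
                write = (write[0], region_end)
                read = (write[0] - ctx_begin, write[1] + ctx_end)
            # "overhang": keep the block unchanged.
        blocks.append((read, write))
        start += write_size
    return blocks


for mode in ("valid", "overhang", "shrink"):
    print(mode, tile_1d((0, 22), (2, 2), 4, mode))
```

With a total ROI of [0, 22), a context of 2 on each side and a write size of 4, "valid" yields four blocks and leaves [18, 20) unwritten, "overhang" adds a fifth block that reads [16, 24), and "shrink" clips that fifth block to read [16, 22) and write [18, 20).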
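The total_write_roi computation in __init__ shrinks total_roi by the context on each side. The sketch below reproduces that arithmetic with a hypothetical Box class standing in for daisy.Roi; it assumes that grow(amount_neg, amount_pos) moves begin down by amount_neg and end up by amount_pos, so negative amounts shrink the ROI.

```python
from dataclasses import dataclass
from typing import Tuple

Vec = Tuple[int, ...]


@dataclass(frozen=True)
class Box:
    """Hypothetical stand-in for daisy.Roi: the half-open box [begin, end)."""

    begin: Vec
    end: Vec

    def grow(self, amount_neg: Vec, amount_pos: Vec) -> "Box":
        # Move begin down by amount_neg and end up by amount_pos;
        # negative amounts shrink the box.
        return Box(
            tuple(b - n for b, n in zip(self.begin, amount_neg)),
            tuple(e + p for e, p in zip(self.end, amount_pos)),
        )


def total_write_box(total: Box, read: Box, write: Box) -> Box:
    # Context: the margin between the read and write box on each side.
    ctx_begin = tuple(w - r for w, r in zip(write.begin, read.begin))
    ctx_end = tuple(r - w for r, w in zip(read.end, write.end))
    # Mirrors total_roi.grow(-(write.begin - read.begin), -(read.end - write.end)).
    return total.grow(tuple(-c for c in ctx_begin), tuple(-c for c in ctx_end))


total = Box((0, 0), (100, 100))
read = Box((0, 0), (20, 20))
write = Box((5, 5), (15, 15))
print(total_write_box(total, read, write))  # Box(begin=(5, 5), end=(95, 95))
```

Because the two context margins are computed separately, an asymmetric context (for example 2 before and 3 after the write box) shrinks each side of the total ROI by a different amount.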
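The choice of spawn_worker_function at the end of __init__ depends only on how many parameters process_function declares. The sketch below mimics that dispatch and the _process_blocks loop with inspect.signature. FakeClient and make_worker are hypothetical names; FakeClient only imitates an acquire_block() context manager that yields a block, or None when no blocks are left, and it omits whatever scheduling and status reporting the real daisy.Client performs.

```python
from contextlib import contextmanager
from inspect import signature


class FakeClient:
    """Hypothetical stand-in for daisy.Client that hands out a fixed list of blocks."""

    def __init__(self, blocks):
        self._pending = list(blocks)

    @contextmanager
    def acquire_block(self):
        # Yield the next pending block, or None once the list is exhausted.
        block = self._pending.pop(0) if self._pending else None
        yield block


def make_worker(process_function, make_client):
    """Mimic the choice of spawn_worker_function in Task.__init__."""
    if len(signature(process_function).parameters) == 0:
        # Zero-argument function: used as-is; it must fetch blocks itself.
        return process_function

    def process_blocks():
        # Like Task._process_blocks: one client per worker, loop until None.
        client = make_client()
        while True:
            with client.acquire_block() as block:
                if block is None:
                    break
                process_function(block)

    return process_blocks


processed = []
worker = make_worker(
    lambda block: processed.append(block),
    lambda: FakeClient(["b0", "b1", "b2"]),
)
worker()
print(processed)  # ['b0', 'b1', 'b2']
```

Creating the client inside the worker function, as Task._process_blocks does with Client(), means each spawned worker gets its own connection rather than sharing one created in the parent.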