Source code for daisy.convenience

from .cl_monitor import CLMonitor
from .server import Server
from .tcp import IOLooper
from multiprocessing.pool import ThreadPool
from multiprocessing import Event

[docs]def run_blockwise(tasks): '''Schedule and run the given tasks. Args: list_of_tasks: The tasks to schedule over. Return: bool: `True` if all blocks in the given `tasks` were successfully run, else `False` ''' task_ids = set() all_tasks = [] while len(tasks) > 0: task, tasks = tasks[0], tasks[1:] if task.task_id not in task_ids: task_ids.add(task.task_id) all_tasks.append(task) tasks.extend(task.upstream_tasks) tasks = all_tasks stop_event = Event() IOLooper.clear() pool = ThreadPool(processes=1) result = pool.apply_async(_run_blockwise, args=(tasks, stop_event)) try: return result.get() except KeyboardInterrupt: stop_event.set() return result.get()
def _run_blockwise(tasks, stop_event): server = Server(stop_event=stop_event) cl_monitor = CLMonitor(server) # noqa return server.run_blockwise(tasks)