Source code for dammit.handler

# Copyright (C) 2015-2018 Camille Scott
# All rights reserved.
# This software may be modified and distributed under the terms
# of the BSD license.  See the LICENSE file for details.

from collections import OrderedDict
from os import mkdir, path

from doit.cmd_base import TaskLoader
from doit.doit_cmd import DoitMain
from doit.dependency import Dependency, SqliteDB

from dammit.profile import StartProfiler
from dammit.utils import Move
from dammit import ui

[docs]class TaskHandler(TaskLoader): def __init__(self, directory, logger, files=None, profile=False, db=None, n_threads=1, **doit_config_kwds): '''Stores tasks and the files they operate on, along with doit config and other metadata. This is the core of the pipelines: it passes its tasks along to doit for execution, and can check task and pipeline completion status. Args: directory (str): The directory in which to run the tasks. Will be created it it doesn't exist. logger (logging.Logger): Logger to record to. files (dict): Files used by the handler. Starts empty if omitted. profile (bool): If True, profile task execution. db (str): Name of the doit database. **doit_config_kwds: Keyword arguments passed to doit. Attributes: files (dict): Files used by the tasks. directory (str): Working directory for execution. tasks (OrderedDict): The tasks to execute. dep_file (str): Path of the doit database. doit_config (dict): The doit configuration given to the task runner. doit_dep_mgr (doit.dependency.Dependency): Doit object to track task status. profile (bool): Whether to run the profiler on tasks. logger (logging.Logger): Logger to use. ''' super(TaskHandler, self).__init__() if files is None: self.files = {} elif type(files) is not dict: raise TypeError('files must be of type dict') else: self.files = files self.tasks = OrderedDict() = directory try: mkdir(directory) except OSError: pass if db is None: dep_file = path.join(, 'doit.db') else: dep_file = path.join(, '{0}.doit.db'.format(db)) self.dep_file = dep_file logger.debug('Dependency Database File: {0}'.format(dep_file)) self.doit_config = dict(dep_file=self.dep_file, reporter=ui.GithubMarkdownReporter, **doit_config_kwds) self.doit_dep_mgr = Dependency(SqliteDB, dep_file) self.n_threads = n_threads self.profile = profile self.logger = logger
[docs] def register_task(self, name, task, files=None): '''Register a new task and its files with the handler. It may seem redundant or confusing to give the tasks a name different than their internal doit name. I do this because doit tasks need to have names as unique as possible, so that they can be reused in different projects. A particular TaskHandler instance is only used for one pipeline run, and allowing different names makes it easier to reference tasks from elsewhere. Args: name (str): Name of the task. Does not have to correspond to doit's internal task name. task (:obj:): Either a dictionary or Task object. files (dict): Dictionary of files used. ''' if files is None: files = {} if type(files) is not dict: raise TypeError('files must be of type dict') self.tasks[name] = task self.files.update(files) self.logger.debug('registered task {0}: {1}\n' ' with files {2}'.format(name, task, files))
[docs] def clear_tasks(self): '''Empty the task dictionary.''' self.logger.debug('Clearing {0} tasks'.format(len(self.tasks))) self.tasks = {}
[docs] def get_status(self, task, move=False): '''Get the up-to-date status of a single task. Args: task (str): The task name to look up. move (bool): If True, move to the handler's directory before checking. Whether this is necessary depends mostly on whether the task uses relative or absolute paths. Returns: str: The string represenation of the status. Either "run" or "uptodate". ''' if type(task) is str: try: task = self.tasks[task] except KeyError: self.logger.error('Task not found:{0}'.format(task)) raise self.logger.debug('Getting status for task {0}'.format( if move: with Move( status = self.doit_dep_mgr.get_status(task, self.tasks.values(), get_log=True) else: status = self.doit_dep_mgr.get_status(task, self.tasks.values(), get_log=True) self.logger.debug('Task {0} had status {1}'.format(task, status.status)) try: self.logger.debug('Task {0} had reasons {1}'.format(task, status.reasons)) except AttributeError: pass return status.status
[docs] def print_statuses(self, uptodate_msg='All tasks up-to-date!', outofdate_msg='Some tasks out of date!'): '''Print the up-to-date status of all tasks. Args: uptodate_msg (str): The message to print if all tasks are up to date. Returns: tuple: A bool (True if all up to date) and a dictionary of statuses. ''' uptodate, statuses = self.check_uptodate() if uptodate: print(ui.paragraph(uptodate_msg)) else: print(ui.paragraph(outofdate_msg)) uptodate_list = [t for t,s in statuses.items() if s is True] outofdate_list = [t for t,s in statuses.items() if s is False] if uptodate_list: print('\nUp-to-date tasks:') print(ui.listing(uptodate_list)) if outofdate_list: print('\nOut-of-date tasks:') print(ui.listing(outofdate_list)) return uptodate, statuses
[docs] def check_uptodate(self): '''Check if all tasks are up-to-date, ie if the pipeline is complete. Note that this moves to the handler's directory to lessen issues with relative versus absolute paths. Returns: bool: True if all are up to date. ''' with Move( statuses = {} outofdate = False for task_name, task in self.tasks.items(): status = self.get_status(task) statuses[task_name] = status == 'up-to-date' return all(statuses.values()), statuses
[docs] def load_tasks(self, cmd, opt_values, pos_args): '''Internal to doit -- triggered by the TaskLoader.''' self.logger.debug('loading {0} tasks'.format(len(self.tasks))) return self.tasks.values(), self.doit_config
[docs] def run(self, doit_args=None, verbose=True): '''Run the pipeline. Movees to the directory, loads the tasks into doit, and executes that tasks that are not up-to-date. Args: doit_args (list): Args that would be passed to the doit shell command. By default, just run. verbose (bool): If True, print UI stuff. Returns: int: Exit status of the doit command. ''' if verbose: print(ui.header('Run Tasks', level=4)) if doit_args is None: doit_args = ['run'] if self.n_threads > 1: doit_args.extend(['-n', str(self.n_threads)]) runner = DoitMain(self) with Move( if self.profile is True: profile_fn = path.join(, 'profile.csv') with StartProfiler(filename=profile_fn): return else: return