Source code for dammit.parallel

# Copyright (C) 2015-2018 Camille Scott
# All rights reserved.
#
# This software may be modified and distributed under the terms
# of the BSD license.  See the LICENSE file for details.

import os
import subprocess

from dammit.utils import which, doit_task
from dammit.tasks.utils import InstallationError


[docs]def check_parallel(logger=None): parallel = which('parallel') if parallel is None: raise InstallationError('parallel not found.') else: try: version_string = subprocess.check_output(['parallel', '--version']) except subprocess.CalledProcessError as e: raise InstallationError('Error checking parallel '\ 'version: [{0}] {1}'.format(e.returncode, e.output)) except OSError as e: raise InstallationError('Error checking parallel version: '\ '[{0}] {1}'.format(e.errno, str(e))) else: version = version_string.strip().split()[2] if logger: logger.debug('parallel version:{0}'.format(version)) if int(version) < 20150000: raise InstallationError('parallel version {0} < 20150000, '\ 'please update'.format(version)) if logger: logger.debug('parallel:' + parallel) return parallel
[docs]def parallel_fasta(input_filename, output_filename, command, n_jobs, sshloginfile=None, check_dep=True, logger=None): '''Given an input FASTA source, target, shell command, and number of jobs, construct a gnu-parallel command to act on the sequences. Args: input_filename (str): The source FASTA. output_filename (str): The target. command (list): The shell command (in subprocess format). n_jobs (int): Number of cores or nodes to split to. sshloginfile (str): Path to file with node addresses. check_dep (bool): If True, check for the gnu-parallel executable. logger (logging.Logger): A logger to use. Returns: str: The constructed shell command. ''' exc = which('parallel') if not check_dep else check_parallel(logger=logger) cmd = ['cat', input_filename, '|', exc, '--round-robin', '--pipe', '-L', 2, '-N', 10000, '--gnu'] if sshloginfile is not None: cmd.extend(['--sshloginfile', sshloginfile, '--workdir $PWD']) else: cmd.extend(['-j', n_jobs]) cmd.extend(['-a', input_filename]) if isinstance(command, list): command = ' '.join(command) cmd.extend([command, '>', output_filename]) return ' '.join(map(str, cmd))