Package src :: Module pyflow :: Class WorkflowRunner

Class WorkflowRunner

This class is designed to be subclassed in client code. The subclass overrides the workflow() method to define the tasks that need to be run and their dependencies.

An instance of the subclass is executed in client code by calling its run() method, inherited from WorkflowRunner. This method provides various run options, such as whether to run locally or on SGE.
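The subclass-and-run pattern described above can be illustrated without pyflow installed. ToyWorkflowRunner below is a hypothetical stand-in that only mimics the documented contract (workflow() is overridden in a child class, addTask() requires dependency tasks to already exist and returns its label, run() returns 0 on success and 1 otherwise). It does not execute any commands and is not pyflow's implementation; real client code would subclass pyflow's WorkflowRunner instead.

```python
class ToyWorkflowRunner:
    """Hypothetical stand-in that mimics the documented WorkflowRunner contract."""

    def __init__(self):
        # label -> (command, set of dependency labels), in insertion order
        self._tasks = {}

    def addTask(self, label, command=None, dependencies=None):
        if label in self._tasks:
            raise ValueError(f"duplicate task label: {label}")
        # Accept a single label or any container of labels.
        if dependencies is None:
            deps = set()
        elif isinstance(dependencies, str):
            deps = {dependencies}
        else:
            deps = set(dependencies)
        # Dependency tasks must already exist in the workflow.
        missing = deps - set(self._tasks)
        if missing:
            raise ValueError(f"unknown dependencies: {sorted(missing)}")
        self._tasks[label] = (command, deps)
        return label  # returned without modification

    def workflow(self):
        raise NotImplementedError("override workflow() in a child class")

    def run(self, mode="local"):
        # The toy only records tasks. Like run(), it returns 0 on success and
        # 1 otherwise, and exceptions from workflow() do not reach the caller.
        try:
            self.workflow()
        except Exception:
            return 1
        return 0


class HelloWorkflow(ToyWorkflowRunner):
    def workflow(self):
        first = self.addTask("make_dir", "mkdir -p out")
        self.addTask("write_file", "echo hello > out/hello.txt", dependencies=first)


wf = HelloWorkflow()
retval = wf.run(mode="local")
```

With pyflow itself, the same workflow() body would live in a subclass of WorkflowRunner, and the integer returned by run() could be passed to sys.exit() as the process exit status.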

Nested Classes
  _AbortWorkflowException
Instance Methods

run(self, mode='local', dataDirRoot='.', isContinue=False, isForceContinue=False, nCores=None, memMb=None, isDryRun=False, retryMax=2, retryWait=90, retryWindow=360, retryMode='nonlocal', mailTo=None, updateInterval=60, schedulerArgList=None, isQuiet=False, warningLogFile=None, errorLogFile=None, successMsg=None, startFromTasks=None, ignoreTasksAfter=None, resetTasks=None)
Call this method to execute the workflow() method overridden in a child class and specify the resources available for the workflow to run.

addTask(self, label, command=None, cwd=None, env=None, nCores=1, memMb=2048, dependencies=None, priority=0, isForceLocal=False, isCommandMakePath=False, isTaskStable=True, mutex=None, retryMax=None, retryWait=None, retryWindow=None, retryMode=None)
Add task to workflow, including resource requirements and specification of dependencies.

addWorkflowTask(self, label, workflowRunnerInstance, dependencies=None)
Add another WorkflowRunner instance as a task to this workflow.

waitForTasks(self, labels=None)
Wait for a list of tasks to complete.

isTaskComplete(self, taskLabel)
Query if a specific task is in the workflow and completed without error.

isTaskDone(self, taskLabel)
Query if a specific task is in the workflow and is done, with or without error.

cancelTaskTree(self, taskLabel)
Cancel the given task and all of its dependencies.

getRunMode(self)
Get the current run mode.

getNCores(self)
Get the current run core limit.
Returns: Integer value or 'unlimited'

limitNCores(self, nCores)
Takes a task nCores argument and reduces it to the maximum value allowed for the current run.

getMemMb(self)
Get the current run's total memory limit (in megabytes).
Returns: Integer value or 'unlimited'

limitMemMb(self, memMb)
Takes a task memMb argument and reduces it to the maximum value allowed for the current run.
 
isDryRun(self)
Get isDryRun flag value.

flowLog(self, msg, logState=1)
Send a message to the WorkflowRunner's log.

workflow(self)
Workflow definition, implemented in the child class.
 
_flowLog(self, msg, logState)

_infoLog(self, msg)

_warningLog(self, msg)

_errorLog(self, msg)

_whoami(self)

_getNamespaceList(self)

_getNamespace(self)

_appendNamespace(self, names)

_addTaskCore(self, namespace, label, payload, dependencies)

_getWaitStatus(self, namespace, labels, status)

_waitForTasksCore(self, namespace, labels=None, isVerbose=True)

_isTaskCompleteCore(self, namespace, taskLabel)
Returns: A boolean tuple specifying (task is done, task finished with error)

_cancelTaskTreeCore(self, namespace, taskLabel)

_startTaskManager(self)

_notify(self, msg, logState)

_killWorkflow(self, errorMsg)

_shutdownAll(self, timeoutSec)

_cdata(self)

_runUpdate(self, runStatus)

_runWorkflow(self, param)

_setupWorkflow(self, param)

_createContinuedStateFile(self)

_createContinuedInfoFile(self, complete)

_setupContinuedWorkflow(self)

_initMessage(self)

_getTaskErrorsSummaryMsg(self, isForceTaskHarvest=False)

_evalWorkflow(self, masterRunStatus)
 
_requireInWorkflow(self)
Check that the calling method is being called only as part of a pyflow workflow() method.

_initRunning(self)

_setRunning(self, *args, **kw)

_getRunning(self, *args, **kw)

Inherited from object: __delattr__, __format__, __getattribute__, __hash__, __init__, __new__, __reduce__, __reduce_ex__, __repr__, __setattr__, __sizeof__, __str__, __subclasshook__

Static Methods

runModeDefaultCores(mode)
Get the default core limit for a run mode (local, sge, ...).
Returns: Either 'unlimited', or a string representation of the integer limit

_stopAllWorkflows()

_isWorkflowStopped()

_checkTaskLabel(label)
Class Variables
  _maxWorkflowRecursion = 30
This limit protects against a runaway fork bomb in case a workflow task recursively adds itself without termination.
  _allStop = <threading._Event object>
Properties

Inherited from object: __class__

Method Details

run(self, mode='local', dataDirRoot='.', isContinue=False, isForceContinue=False, nCores=None, memMb=None, isDryRun=False, retryMax=2, retryWait=90, retryWindow=360, retryMode='nonlocal', mailTo=None, updateInterval=60, schedulerArgList=None, isQuiet=False, warningLogFile=None, errorLogFile=None, successMsg=None, startFromTasks=None, ignoreTasksAfter=None, resetTasks=None)

Call this method to execute the workflow() method overridden in a child class and specify the resources available for the workflow to run.

Task retry behavior: Retry attempts will be made per the arguments below for distributed workflow runs (e.g. sge run mode). Note that this means retries will also be attempted for tasks with the 'isForceLocal' setting during distributed runs.

Task error behavior: When a task error occurs the task manager stops submitting new tasks and allows all currently running tasks to complete. Note that in this case 'task error' means that the task could not be completed after exhausting attempted retries.

Workflow exception behavior: Any exception thrown from the Python code of classes derived from WorkflowRunner will be logged and will trigger notification (e.g. email). The exception will not propagate to the client's call stack. In sub-workflows the exception is handled exactly like a task error (i.e. task submission is shut down and remaining tasks are allowed to complete). An exception in the master workflow will lead to workflow termination without waiting for currently running tasks to finish.

Parameters:
  • mode - Workflow run mode. Current options are (local|sge)
  • dataDirRoot - All workflow data is written to {dataDirRoot}/pyflow.data/. This includes workflow/task logs, persistent task state data, and summary run info. Two workflows cannot simultaneously use the same dataDir.
  • isContinue - If True, continue workflow from a previous incomplete run based on the workflow data files. You must use the same dataDirRoot as a previous run for this to work. Set to 'Auto' to have the run continue only if the previous dataDir exists. (default: False)
  • isForceContinue - Only used if isContinue is not False. Normally, when a run is continued, the commands of completed tasks are checked to ensure they match their previous definitions. When isForceContinue is True, failing this check is reduced from an error to a warning.
  • nCores - Total number of cores available, or 'unlimited'. sge is currently configured for a maximum job count of 128; any higher value in sge mode will be reduced to this maximum. (default: 1 for local mode, 128 for sge mode)
  • memMb - Total memory available (in megabytes), or 'unlimited'. Note that this value is ignored in non-local modes (such as sge), because in this case the total memory available is expected to be known by the scheduler for each node in its cluster. (default: 2048*nCores for local mode, 'unlimited' for sge mode)
  • isDryRun - List the commands to be executed without running them. Note that recursive and dynamic workflows may have to account for the fact that expected files will be missing. Here 'recursive workflow' refers to any workflow which uses the addWorkflowTask() method, and 'dynamic workflow' refers to any workflow which uses the waitForTasks() method. These types of workflows can query this status with isDryRun() to make accommodations. (default: False)
  • retryMax - Maximum number of task retries (default: 2)
  • retryWait - Delay (in seconds) before resubmitting a task (default: 90)
  • retryWindow - Maximum time (in seconds) after the first task submission in which retries are allowed. A value of zero or less puts no limit on the time when retries will be attempted. For failed make jobs, retries are always allowed (up to retryMax times). (default: 360)
  • retryMode - Modes are 'nonlocal' and 'all'. For 'nonlocal', retries are not attempted in local run mode. For 'all', retries are attempted for any run mode. The default mode is 'nonlocal'.
  • mailTo - An email address or container of email addresses. Notification will be sent to each email address when (1) the run successfully completes, (2) the first task error occurs, or (3) an unhandled exception is raised. The intention is to send one status message per run() indicating either success or the reason for failure. This should occur in all cases except a host hardware/power failure. Note that mail comes from 'pyflow-bot@csaunders-ubuntu64' (configurable), which may be classified as junk mail by your system.
  • updateInterval - How often (in minutes) pyflow should log a status update message summarizing the run status. Set this to zero or less to turn the update off.
  • schedulerArgList - A list of arguments to be passed on to an external scheduler when non-local modes are used (e.g. in sge mode you could pass schedulerArgList=['-q','work.q'] to put the whole pyflow job into the sge work.q queue).
  • isQuiet - Don't write any logging output to stderr (but still write log to pyflow_log.txt)
  • warningLogFile - Replicate all warning messages to the specified file. Warning messages will still appear in the standard logs; this file will contain only the subset of log messages pertaining to warnings.
  • errorLogFile - Replicate all error messages to the specified file. Error messages will still appear in the standard logs; this file will contain only the subset of log messages pertaining to errors. It should be empty for a successful run.
  • successMsg - Provide a string containing a custom message which will be prepended to pyflow's standard success notification. This message will appear in the log and any configured notifications (e.g. email). The message may contain linebreaks.
  • startFromTasks (A single string, or set, tuple or list of strings) - A task label or container of task labels. Any tasks which are not in this set or descendants of this set will be marked as completed.
  • ignoreTasksAfter (A single string, or set, tuple or list of strings) - A task label or container of task labels. All descendants of these task labels will be ignored.
  • resetTasks (A single string, or set, tuple or list of strings) - A task label or container of task labels. These tasks and all of their descendants will be reset to the "waiting" state to be re-run. Note this option will only affect a workflow which has been continued from a previous run. This will not override any nodes altered by the startFromTasks setting in the case that both options are used together.
Returns:
0 if all tasks completed successfully and 1 otherwise
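The retry parameters interact in several ways. The function below is a minimal sketch of the documented rules, not pyflow's implementation: should_retry and its argument names are hypothetical, the retryWindow boundary is assumed to be inclusive, and the retryWait delay (how long to sleep before resubmitting) is left to the caller.

```python
def should_retry(retries_done, elapsed_sec, run_mode,
                 retry_max=2, retry_window=360, retry_mode="nonlocal",
                 is_make_job=False):
    """Hypothetical check: may a failed task be resubmitted?

    retries_done: retries already attempted for this task.
    elapsed_sec: seconds since the task was first submitted.
    Defaults mirror run(): retryMax=2, retryWindow=360, retryMode='nonlocal'.
    """
    if retry_mode not in ("nonlocal", "all"):
        raise ValueError(f"unknown retryMode: {retry_mode!r}")
    # 'nonlocal' disables retries when the run itself is in local mode. The run
    # mode decides this, so isForceLocal tasks in an sge run are still retried.
    if retry_mode == "nonlocal" and run_mode == "local":
        return False
    if retries_done >= retry_max:
        return False
    # Make jobs, or a window of zero or less, are not limited by time.
    if is_make_job or retry_window <= 0:
        return True
    # Assumption: the window boundary itself is inclusive.
    return elapsed_sec <= retry_window
```

Per-task retryMax, retryWindow and retryMode values given to addTask() would simply be passed in place of the run-level defaults, since the task values are documented to override the workflow values.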

addTask(self, label, command=None, cwd=None, env=None, nCores=1, memMb=2048, dependencies=None, priority=0, isForceLocal=False, isCommandMakePath=False, isTaskStable=True, mutex=None, retryMax=None, retryWait=None, retryWindow=None, retryMode=None)

Add task to workflow, including resource requirements and specification of dependencies. Dependency tasks must already exist in the workflow.

Parameters:
  • label - A string used to identify each task. The label must be composed of only ASCII letters, digits, underscores and dashes (i.e. /[A-Za-z0-9_-]+/). The label must also be unique within the workflow, and non-empty.
  • command - The task command. Commands can be: (1) a shell string, (2) an iterable container of strings (argument list), or (3) None. In all cases, strings must not contain newline characters. A single string is typically used for commands that require shell features (such as pipes); an argument list can be used for any other command, and is often a useful way to simplify quoting issues or to submit extremely long commands. The default command (None) can be used to create a 'checkpoint', i.e. a task which does not run anything, but provides a label associated with the completion of a set of dependencies.
  • cwd - Specify the current working directory to use for command execution. Note that if the command is submitted as an argument list (as opposed to a shell string), the executable (arg[0]) is searched for before changing the working directory, so you cannot specify the executable relative to the cwd setting. If submitting a shell string command, this restriction does not apply.
  • env - A map of environment variables for this task, for example 'env={"PATH": "/usr/bin"}'. When env is set to None (the default), the environment of the pyflow client process is used.
  • nCores - Number of CPU threads required (default: 1)
  • memMb - Amount of memory required (in megabytes) (default: 2048)
  • dependencies (A single string, or set, tuple or list of strings) - A task label or container of task labels specifying all dependent tasks. Dependent tasks must already exist in the workflow.
  • priority - Among all tasks which are eligible to run at the same time, launch tasks with higher priority first. This value can be set in the range [-100, 100] (default: 0). Note that this will strongly control the order of task launch on a local run, but will only control task submission order to a secondary scheduler (like sge). All jobs with the same priority are already submitted in order from highest to lowest nCores requested, so there is no need to set priorities to replicate this behavior. The task manager can start executing tasks as soon as each addTask() method is called, so lower-priority tasks may be launched first if they are specified first in the workflow.
  • isForceLocal - Force this task to run locally when a distributed task mode is used. This can be used to launch very small jobs outside of the sge queue. Note that 'isForceLocal' jobs launched during a non-local task mode are not subject to resource management, so it is important that these represent small jobs. Tasks which delete, move or touch a small number of files are ideal for this setting.
  • isCommandMakePath - If True, command is assumed to be a path containing a makefile. It will be run using make/qmake according to the run's mode and the task's isForceLocal setting.
  • isTaskStable - If False, indicates that the task command and/or dependencies may change if the run is interrupted and restarted. A command marked as unstable will not be checked to make sure it matches its previous definition during run continuation. Unstable examples: the command contains a date/time, or lists a set of files which are deleted at some point in the workflow.
  • mutex - Provide an optional id associated with a pyflow task mutex. For all tasks with the same mutex id, no more than one will be run at once. Mutex ids must follow the same naming restrictions as task labels, and are global across all recursively invoked workflows. Example use case: this feature has been added as a simpler alternative to file locking, to ensure sequential, but not ordered, access to a file.
  • retryMax - The number of times this task will be retried after failing. If defined, this overrides the workflow retryMax value.
  • retryWait - The number of seconds to wait before relaunching a failed task. If defined, this overrides the workflow retryWait value.
  • retryWindow - The number of seconds after job submission in which retries will be attempted for non-make jobs. A value of zero or less causes retries to be attempted at any time after job submission. If defined, this overrides the workflow retryWindow value.
  • retryMode - Modes are 'nonlocal' and 'all'. For 'nonlocal' retries are not attempted in local run mode. For 'all' retries are attempted for any run mode. If defined, this overrides the workflow retryMode value.
Returns:
The 'label' argument is returned without modification.
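The launch-order rule for priority can be modeled as a sort key. This is a hypothetical sketch: ReadyTask and launch_order are illustrative names, and breaking any remaining ties by the order in which tasks were added is an assumption. It only orders tasks that are eligible at the same moment; as noted in the priority description, a lower-priority task added earlier may already have been launched.

```python
from dataclasses import dataclass


@dataclass
class ReadyTask:
    label: str
    priority: int = 0
    n_cores: int = 1
    order_added: int = 0  # hypothetical tie-breaker: order of addTask() calls


def launch_order(ready):
    """Order tasks that are eligible to run at the same moment."""
    for t in ready:
        if not -100 <= t.priority <= 100:
            raise ValueError(f"{t.label}: priority must be in [-100, 100]")
    # Higher priority first; within a priority, more cores first;
    # remaining ties fall back to the order the tasks were added (assumption).
    ranked = sorted(ready, key=lambda t: (-t.priority, -t.n_cores, t.order_added))
    return [t.label for t in ranked]


ready = [
    ReadyTask("a", priority=0, n_cores=1, order_added=0),
    ReadyTask("b", priority=0, n_cores=8, order_added=1),
    ReadyTask("c", priority=5, n_cores=1, order_added=2),
    ReadyTask("d", priority=0, n_cores=8, order_added=3),
]
print(launch_order(ready))  # ['c', 'b', 'd', 'a']
```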

addWorkflowTask(self, label, workflowRunnerInstance, dependencies=None)

Add another WorkflowRunner instance as a task to this workflow. The added Workflow's workflow() method will be called once the dependencies specified in this call have completed. Once started, all of the submitted workflow's method calls (like addTask) will be placed into the enclosing workflow instance and bound by the run parameters of the enclosing workflow.

This task will be marked complete once the submitted workflow's workflow() method has finished, and any tasks it initiated have completed.

Note that all workflow tasks will have their own tasks namespaced with the workflow task label. This namespace is recursive in the case that you add workflow tasks which add their own workflow tasks, etc.

Note that the submitted workflow instance will be deep copied before being altered in any way.

Parameters:
  • label - A string used to identify each task. The label must be composed of only ASCII letters, digits, underscores and dashes (i.e. /[A-Za-z0-9_-]+/). The label must also be unique within the workflow, and non-empty.
  • workflowRunnerInstance - A WorkflowRunner instance.
  • dependencies (A single string, or set, tuple or list of strings) - A label string or container of labels specifying all dependent tasks. Dependent tasks must already exist in the workflow.
Returns:
The 'label' argument is returned without modification.
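The label rules shared by addTask() and addWorkflowTask() can be checked with a single regular expression. The validator below is a hypothetical sketch of those documented rules, not pyflow's own _checkTaskLabel(); its name and the existing_labels argument are illustrative.

```python
import re

# ASCII letters, digits, underscores and dashes only. An explicit class is used
# because Python's \w would also accept non-ASCII letters.
_LABEL_RE = re.compile(r"[A-Za-z0-9_-]+")


def check_task_label(label, existing_labels):
    """Hypothetical validator for the documented task-label rules."""
    # fullmatch() requires the whole string to match, so an empty label fails
    # and a trailing newline is not silently accepted (as it would be with '$').
    if not isinstance(label, str) or not _LABEL_RE.fullmatch(label):
        raise ValueError(f"invalid task label: {label!r}")
    if label in existing_labels:
        raise ValueError(f"duplicate task label: {label!r}")
    return label
```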

waitForTasks(self, labels=None)

Wait for a list of tasks to complete.

Parameters:
  • labels (A single string, or set, tuple or list of strings) - Container of task labels to wait for. If an empty container is given or no list is provided then wait for all outstanding tasks to complete.
Returns:
If a task being waited for, or one of that task's dependencies, ends in error, the method returns 1; otherwise it returns 0.

isTaskComplete(self, taskLabel)

Query if a specific task is in the workflow and completed without error.

This can assist workflows with providing stable interrupt/resume behavior.

Parameters:
  • taskLabel - A task label string
Returns:
Completion status of task

isTaskDone(self, taskLabel)

Query if a specific task is in the workflow and is done, with or without error.

This can assist workflows with providing stable interrupt/resume behavior.

Parameters:
  • taskLabel - A task label string
Returns:
A boolean tuple specifying (task is done, task finished with error)

cancelTaskTree(self, taskLabel)

Cancel the given task and all of its dependencies. Canceling means that any running jobs will be stopped and any waiting job will be unqueued. Canceled tasks will not be treated as errors. Canceled tasks that are not already complete will be put into the waiting/ignored state.

getRunMode(self)

Get the current run mode.

This can be used to access the current run mode from within the workflow function. Although the run mode should be transparent to client code, this is occasionally needed to hack workarounds.

Returns:
Current run mode

getNCores(self)

Get the current run core limit.

This function can be used to access the current run's core limit from within the workflow function. This can be useful, e.g., to limit the number of cores requested by a single task.

Returns:
Total cores available to this workflow run (an integer value or 'unlimited')

limitNCores(self, nCores)

Takes a task nCores argument and reduces it to the maximum value allowed for the current run.

Parameters:
  • nCores - Proposed core requirement
Returns:
min(nCores, total cores available to this workflow run)
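Inside workflow(), a typical use is to pass self.limitNCores(16) as the nCores argument of addTask(), so that a task never requests more cores than the run provides. The clamping rule itself is small; the helper below is a hypothetical model of it (the name limit_to_run is illustrative), assuming the run limit is either an integer or the string 'unlimited', as getNCores() and getMemMb() are documented to return. The same rule applies to limitMemMb().

```python
def limit_to_run(requested, run_limit):
    """Clamp a per-task request to the run's limit (limitNCores/limitMemMb style)."""
    # The run limit may be reported as 'unlimited', in which case nothing is clamped.
    if run_limit == "unlimited":
        return requested
    return min(requested, run_limit)
```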

getMemMb(self)

Get the current run's total memory limit (in megabytes)

Returns:
Memory limit in megabytes (an integer value or 'unlimited')

limitMemMb(self, memMb)

Takes a task memMb argument and reduces it to the maximum value allowed for the current run.

Parameters:
  • memMb - Proposed task memory requirement in megabytes
Returns:
min(memMb, total memory available to this workflow run)

isDryRun(self)

Get isDryRun flag value.

When the dry-run flag is set, no commands are actually run. Querying this flag allows dynamic workflows to correct for dry-run behaviors, such as tasks which do not produce expected files.

Returns:
DryRun status flag

runModeDefaultCores(mode)
Static Method

Get the default core limit for a run mode (local, sge, ...).

Parameters:
  • mode - Workflow run mode
Returns:
Default maximum number of cores for mode: either 'unlimited', or a string representation of the integer limit
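The per-mode defaults described under run() can be collected in one place. The sketch below is a hypothetical model built only from the nCores and memMb parameter descriptions (default 1 core locally and 128 under sge, sge values capped at 128, memMb defaulting to 2048*nCores locally and ignored under sge); the names DEFAULT_CORES, run_mode_default_cores and resolve_resources are illustrative, and the handling of 'unlimited' is an assumption noted in the comments.

```python
# Hypothetical defaults taken from the run() parameter descriptions.
DEFAULT_CORES = {"local": 1, "sge": 128}
SGE_MAX_CORES = 128
MB_PER_CORE_LOCAL = 2048


def run_mode_default_cores(mode):
    """Mirror runModeDefaultCores: a string form of the default core limit."""
    return str(DEFAULT_CORES[mode])


def resolve_resources(mode, n_cores=None, mem_mb=None):
    """Return the (nCores, memMb) limits a run would use, per the docs."""
    if mode not in DEFAULT_CORES:
        raise ValueError(f"unsupported mode: {mode!r}")
    if n_cores is None:
        n_cores = DEFAULT_CORES[mode]
    if mode == "sge":
        # Values above the sge job-count maximum are reduced to it.
        # Assumption: 'unlimited' is treated the same way.
        if n_cores == "unlimited" or n_cores > SGE_MAX_CORES:
            n_cores = SGE_MAX_CORES
        # memMb is ignored outside local mode; the scheduler tracks node memory.
        return n_cores, "unlimited"
    if mem_mb is None:
        # Assumption: an 'unlimited' core count implies 'unlimited' memory.
        mem_mb = "unlimited" if n_cores == "unlimited" else MB_PER_CORE_LOCAL * n_cores
    return n_cores, mem_mb
```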

flowLog(self, msg, logState=1)

Send a message to the WorkflowRunner's log.

Parameters:
  • msg (A string or an array of strings. String arrays will be separated by newlines in the log.) - Log message
  • logState (A value in pyflow.LogState.{INFO,WARNING,ERROR}) - Message severity, defaults to INFO.

workflow(self)

Workflow definition, implemented in the child class.

This method should be overridden in the class derived from WorkflowRunner to specify the actual workflow logic. Client code should not call this method directly.

_isTaskCompleteCore(self, namespace, taskLabel)

Returns:
A boolean tuple specifying (task is done, task finished with error)

_setRunning(self, *args, **kw)

Decorators:
  • @lockMethod

_getRunning(self, *args, **kw)

Decorators:
  • @lockMethod