Package src :: Module pyflow
[hide private]
[frames] | no frames]

Source Code for Module src.pyflow

   1  #!/usr/bin/env python 
   2  # 
   3  # pyFlow - a lightweight parallel task engine 
   4  # 
   5  # Copyright (c) 2012-2017 Illumina, Inc. 
   6  # All rights reserved. 
   7  # 
   8  # Redistribution and use in source and binary forms, with or without 
   9  # modification, are permitted provided that the following conditions 
  10  # are met: 
  11  # 
  12  # 1. Redistributions of source code must retain the above copyright 
  13  #    notice, this list of conditions and the following disclaimer. 
  14  # 
  15  # 2. Redistributions in binary form must reproduce the above copyright 
  16  #    notice, this list of conditions and the following disclaimer in 
  17  #    the documentation and/or other materials provided with the 
  18  #    distribution. 
  19  # 
  20  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 
  21  # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 
  22  # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 
  23  # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 
  24  # COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
  25  # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 
  26  # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 
  27  # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 
  28  # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 
  29  # LIABILITY, OR TORT INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY 
  30  # WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
  31  # POSSIBILITY OF SUCH DAMAGE. 
  32  # 
  33  # 
  34   
  35  """ 
  36  pyflow -- a lightweight parallel task engine 
  37  """ 
  38   
  39  __author__ = 'Christopher Saunders' 
  40   
  41   
  42  import copy 
  43  import datetime 
  44  import os 
  45  import re 
  46  import shutil 
  47  import subprocess 
  48  import sys 
  49  import threading 
  50  import time 
  51  import traceback 
  52   
  53  from pyflowConfig import siteConfig 
  54   
  55   
  56  moduleDir = os.path.abspath(os.path.dirname(__file__)) 
  57   
  58   
  59  # minimum python version 
  60  # 
  61  pyver = sys.version_info 
  62  if pyver[0] != 2 or (pyver[0] == 2 and pyver[1] < 4) : 
  63      raise Exception("pyflow module has only been tested for python versions [2.4,3.0)") 
  64   
  65  # problem python versions: 
  66  # 
  67  # Internal interpreter deadlock issue in python 2.7.2: 
  68  # http://bugs.python.org/issue13817 
  69  # ..is so bad that pyflow can partially, but not completely, work around it -- so refuse to run (raise an exception) for this case. 
  70  if pyver[0] == 2 and pyver[1] == 7 and pyver[2] == 2 : 
  71      raise Exception("Python interpreter errors in python 2.7.2 may cause a pyflow workflow hang or crash. Please use a different python version.") 
  72   
  73   
  74  # The line below is a workaround for a python 2.4/2.5 bug in 
  75  # the subprocess module. 
  76  # 
  77  # Bug is described here: http://bugs.python.org/issue1731717 
  78  # Workaround is described here: http://bugs.python.org/issue1236 
  79  # 
  80  subprocess._cleanup = lambda: None 
  81   
  82   
  83  # In python 2.5 or greater, we can lower the per-thread stack size to 
  84  # improve memory consumption when a very large number of jobs are 
  85  # run. Below it is lowered to 256Kb (compare to linux default of 
  86  # 8Mb). 
  87  # 
  88  try: 
  89      threading.stack_size(min(256 * 1024, threading.stack_size)) 
  90  except AttributeError: 
  91      # Assuming this means python version < 2.5 
  92      pass 
class GlobalSync :
    """
    Process-wide throttle on simultaneous subprocess calls, used to
    bound total memory usage in non-local run modes.

    In practice this only limits the number of concurrent qsub/qstat
    calls made in SGE mode.
    """
    # number of subprocess calls allowed to be in flight at once:
    maxSubprocess = 2

    # semaphore shared by all users of this class, sized by maxSubprocess:
    subprocessControl = threading.Semaphore(maxSubprocess)
105
def getPythonVersion() :
    """
    Return every field of sys.version_info joined with '.'
    """
    return ".".join(map(str, sys.version_info))


# version string of the interpreter which imported this module:
pythonVersion = getPythonVersion()
# Get pyflow version number
#

def getPyflowVersion() :
    """
    Return the pyflow version string.

    Release builds return the value macro-ed into pyflowAutoVersion.
    Otherwise 'git describe' is run in the module directory and its
    single line of output is returned.

    @return: the version string, or "unknown" if git cannot be launched,
             exits non-zero, or does not print exactly one line
    """
    # this will be automatically macro-ed in for pyflow releases:
    pyflowAutoVersion = None

    # Get version number in regular release code:
    if pyflowAutoVersion is not None : return pyflowAutoVersion

    # Get version number during dev:
    try :
        # git's stderr is discarded; close the sink once the process is
        # done so the file handle is not leaked:
        devnull = open(os.devnull, "w")
        try :
            proc = subprocess.Popen(["git", "describe"], stdout=subprocess.PIPE, stderr=devnull, cwd=moduleDir, shell=False)
            (stdout, _stderr) = proc.communicate()
            retval = proc.wait()
        finally :
            devnull.close()
        stdoutList = stdout.split("\n")[:-1]
        if (retval == 0) and (len(stdoutList) == 1) : return stdoutList[0]
    except OSError:
        # no git installed
        pass

    return "unknown"
137 138 139 __version__ = getPyflowVersion()
# portability functions:
#

def _isWindows() :
    """
    True if platform.system() contains the string 'Windows'
    """
    import platform
    return ("Windows" in platform.system())


class GlobalConstants :
    # platform test result, evaluated once when the class is defined:
    isWindows=_isWindows()


def isWindows() :
    """
    Return the cached platform test result from GlobalConstants
    """
    return GlobalConstants.isWindows
155
def forceRename(src,dst) :
    """
    Rename src to dst, replacing dst if it exists.

    On *nix dst is only overwritten in the single rename operation.
    On windows an existing dst is removed first, so the replacement is
    not atomic there.

    The rename is attempted up to 5 times with a 5 second pause after
    each OSError; the OSError from the last attempt is re-raised.
    """
    if isWindows() and os.path.exists(dst) :
        os.remove(dst)

    maxTrials=5
    trialCount = 0
    while True :
        trialCount += 1
        try :
            os.rename(src,dst)
            return
        except OSError :
            # out of attempts: let the caller see the failure
            if trialCount >= maxTrials : raise
            time.sleep(5)
176
def cleanEnv() :
    """
    clear bash functions out of the env

    without this change the shellshock security update causes pyflow SGE jobs to
    fail with the behavior of current (201512) versions of SGE qsub

    Every os.environ entry whose name ends in "()" is deleted.
    """

    # iterate over a snapshot of the keys: entries are deleted inside the
    # loop, and a live key view must not be mutated while it is iterated
    for key in list(os.environ.keys()) :
        if key.endswith("()") :
            del os.environ[key]
191
# utility values and functions:
#

def ensureDir(d):
    """
    Create directory d (and any missing parents) unless it is already
    there. Raises Exception if d exists but is not a directory.
    """
    if not os.path.exists(d) :
        os.makedirs(d)
        return

    # something already lives at this path -- it must be a directory:
    if not os.path.isdir(d) :
        raise Exception("Can't create directory: %s" % (d))
206
#
# time functions -- note there's an additional copy in the pyflow wrapper script:
#
# all times in pyflow are utc (never local) and printed to iso8601
#
def timeStampToTimeStr(ts) :
    """
    Convert a time.time() style timestamp into an iso8601 utc string
    (the same format produced by timeStrNow())
    """
    utcTime = datetime.datetime.utcfromtimestamp(ts)
    return utcTime.isoformat()
218
def timeStrNow():
    """
    Return the current time formatted by timeStampToTimeStr()
    """
    now = time.time()
    return timeStampToTimeStr(now)
221
def timeStrToTimeStamp(ts):
    """
    Convert a time string as produced by timeStampToTimeStr() back into
    an integer utc timestamp (any fractional second is discarded).

    @param ts: iso8601 style string, e.g. '2012-01-01T12:00:00.123456'
               or '2012-01-01T12:00:00'

    @return: seconds since the epoch, as returned by calendar.timegm
    """
    import calendar
    # Keep only year..second. isoformat() omits the fractional part when
    # microseconds are zero, so dropping "the last field" (the previous
    # behavior) discarded the seconds for such strings.
    fields = [int(x) for x in re.split(r'[^\d]', ts)[:6]]
    d = datetime.datetime(*fields)
    return calendar.timegm(d.timetuple())
226
def isInt(x) :
    """
    True if x is an instance of int or long
    """
    integerTypes = (int, long)
    return isinstance(x, integerTypes)
231
def isString(x):
    """
    True if x is an instance of basestring
    """
    stringBase = basestring
    return isinstance(x, stringBase)
234
def isIterable(x):
    """
    True if x exposes an '__iter__' attribute (whose value is not False)
    """
    iterAttr = getattr(x, '__iter__', False)
    return (iterAttr != False)
238
def lister(x):
    """
    Normalize the input to a list:
      - None becomes an empty list
      - a single string, or any non-iterable value, becomes a list
        holding just that value (strings are not split into letters)
      - any other iterable is copied into a new list
    """
    if x is None :
        return []
    if isString(x) :
        return [x]
    if not isIterable(x) :
        return [x]
    return list(x)
251
def setzer(x) :
    """
    Normalize user input to a set using the same rules as lister(), so
    that a single string becomes a one-element set rather than a set of
    letters
    """
    items = lister(x)
    return set(items)
261
class LogState :
    """
    Minimal enumeration of log severity levels
    """
    INFO = 1
    WARNING = 2
    ERROR = 3

    @classmethod
    def toString(cls,logState) :
        """
        Return the label for a log state value; raises Exception for
        any value which is not one of the levels defined above
        """
        labelTable = ((cls.INFO, "INFO"),
                      (cls.WARNING, "WARNING"),
                      (cls.ERROR, "ERROR"))
        for (state, label) in labelTable :
            if logState == state : return label

        raise Exception("Unknown log state: " + str(logState))
279
# allow fsync to be globally turned off
class LogGlobals :
    # cleared by hardFlush() the first time os.fsync raises OSError:
    isFsync = True


def hardFlush(ofp):
    """
    Flush ofp and, unless it is a tty or fsync has been globally
    disabled, fsync its file descriptor as well.
    """
    ofp.flush()
    if ofp.isatty() or (not LogGlobals.isFsync) : return

    # fsync call has been reported to consistently fail in some contexts (rsh?)
    # so tolerate OSError and stop trying for the rest of the process:
    try :
        os.fsync(ofp.fileno())
    except OSError:
        LogGlobals.isFsync = False
296
def log(ofpList, msgList, linePrefix=None):
    """
    General logging function.

    @param ofpList: A container of file objects to write to. It is
                    reduced to a set of distinct objects before writing.

    @param msgList: A container of (or a single) multi-line log message
                    string. Final newlines are not required

    @param linePrefix: A prefix to add before every line. This will come
                       *after* the log function's own '[time] [hostname]'
                       prefix.

    @return: A list of booleans, one per distinct file object (in the
             iteration order of the internal set), which is False for
             any object that raised IOError while being written to
    """
    messages = lister(msgList)
    streams = setzer(ofpList)
    retval = [True] * len(streams)
    for msg in messages :
        # strip final trailing newline if it exists:
        if msg.endswith("\n") : msg = msg[:-1]

        prefix = "[%s] [%s]" % (timeStrNow(), siteConfig.getHostName())
        if linePrefix is not None :
            prefix = prefix + " " + linePrefix

        for streamIndex, stream in enumerate(streams):
            # skip io streams which have failed before:
            if not retval[streamIndex] : continue
            try :
                # every line of the message gets its own prefix:
                for line in msg.split("\n") :
                    stream.write("%s %s\n" % (prefix, line))
                hardFlush(stream)
            except IOError:
                retval[streamIndex] = False
    return retval
334
def getThreadName():
    """
    Return the name of the calling thread
    """
    return threading.currentThread().getName()


def isMainThread() :
    """
    True if the calling thread is named "MainThread".

    getThreadName must be *called* here: comparing the function object
    itself to a string (the previous behavior) is always False.
    """
    return (getThreadName() == "MainThread")
342
class StrFileObject(object) :
    """
    Stand-in for a writable file handle: everything passed to write()
    is accumulated in the 'str' attribute, so output from library
    functions which write to a stream can be captured as a string
    """

    def __init__(self) :
        # text captured so far:
        self.str = ""

    def write(self, string) :
        """append string to the captured text"""
        self.str = self.str + string

    def __str__(self) :
        """return all text captured so far"""
        return self.str
357
def getTracebackStr() :
    """
    Return traceback.format_exc() for the exception currently being
    handled
    """
    formatted = traceback.format_exc()
    return formatted
361
def getExceptionMsg() :
    """
    Build a report for the exception currently being handled.

    @return: a list of lines: an 'Unhandled Exception in <thread name>'
             header followed by the formatted traceback
    """
    header = "Unhandled Exception in %s\n" % (getThreadName())
    msg = header + getTracebackStr()

    # drop one trailing newline so that the split does not end on an empty line:
    if msg.endswith("\n") : msg = msg[:-1]
    return msg.split("\n")
368
def cmdline() :
    """
    Return the process argument vector (sys.argv) joined by spaces
    """
    argv = sys.argv
    return " ".join(argv)
372
def msgListToMsg(msgList):
    """
    Merge a string, or a list of strings, into one newline-separated
    message. A single trailing newline on any input string is removed
    before joining.
    """
    chunks = []
    for chunk in lister(msgList) :
        if (len(chunk) > 0) and (chunk[-1] == '\n') :
            chunk = chunk[:-1]
        chunks.append(chunk)

    return "\n".join(chunks)
# pattern for a single address, optionally bounded by whitespace:
emailRegex = re.compile(r"(?:^|\s)[-a-z0-9_.]+@(?:[-a-z0-9]+\.)+[a-z]{2,6}(?:\s|$)", re.IGNORECASE)


def verifyEmailAddy(x) :
    """
    True if emailRegex matches at the start of string x
    """
    if emailRegex.match(x) is None :
        return False
    return True
398
def isLocalSmtp() :
    """
    return true if a local smtp server is available

    Availability is tested by opening (and then closing again) an SMTP
    connection to 'localhost'.
    """
    import smtplib
    try :
        s = smtplib.SMTP('localhost')
    except Exception :
        # any failure to connect is taken to mean "no local server"
        return False

    # this was only a probe -- do not leave the connection open:
    try :
        s.quit()
    except Exception :
        s.close()
    return True
410
def sendEmail(mailTo, mailFrom, subject, msgList) :
    """
    Send a plain text email through the smtp server on localhost.

    @param mailTo: a single recipient address or a container of them

    @param mailFrom: sender address

    @param subject: subject line

    @param msgList: message body as a string or a list of strings
    """
    import smtplib
    # this is the way to import MIMEText in py 2.4:
    from email.MIMEText import MIMEText

    # format message list into a single string:
    msg = msgListToMsg(msgList)

    mailTo = setzer(mailTo)

    msg = MIMEText(msg)
    msg["Subject"] = subject
    msg["From"] = mailFrom
    msg["To"] = ", ".join(mailTo)

    s = smtplib.SMTP('localhost')
    try :
        s.sendmail(mailFrom, list(mailTo), msg.as_string())
        s.quit()
    finally :
        # make sure the connection is released even if sending fails;
        # this is a no-op after a successful quit()
        s.close()
430
def boolToStr(b) :
    """
    Render b as the string form of int(b), e.g. "1" for True and "0"
    for False
    """
    asInt = int(b)
    return str(asInt)
434
def argToBool(x) :
    """
    Interpret an argument of unknown type as a bool.

    Strings are False when their lower-cased value is one of
    "", "0", "false", "f", "no", "n", "off" and True otherwise. Any
    other type is converted with bool().
    """
    falseStrings = ("", "0", "false", "f", "no", "n", "off")

    if not isinstance(x, basestring) :
        return bool(x)
    return (x.lower() not in falseStrings)
def hashObjectValue(obj) :
    """
    This function hashes objects values -- the hash is intended to be
    the same for two objects containing the same methods and data, so
    it corresponds to 'A==B' and *not* 'A is B'.

    @param obj: any picklable object

    @return: hex md5 digest of the object's pickled representation
    """
    import pickle
    import hashlib
    # the digest was previously computed but never returned
    return hashlib.md5(pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL)).hexdigest()
# separator placed between namespace components:
namespaceSep = "+"


def namespaceJoin(a, b) :
    """
    Join two namespace strings with namespaceSep. If either string is
    empty the other one is returned unchanged.
    """
    if a == "" : return b
    if b == "" : return a
    return namespaceSep.join((a, b))
469
def namespaceLabel(namespace) :
    """
    Return the user-facing name for a workflow namespace: the empty
    namespace is the master workflow, anything else is a sub-workflow
    """
    if namespace != "" :
        return "sub-workflow '%s'" % (namespace)
    return "master workflow"
479
class ExpWaiter(object) :
    """
    Convenience object to setup exponentially increasing wait/polling times
    """

    def __init__(self, startSec, factor, maxSec, event = None) :
        """
        optionally allow an event to interrupt wait cycle

        @param startSec: first wait interval in seconds (must be > 0)

        @param factor: multiplier applied to the interval after every
                       wait (must be > 1)

        @param maxSec: upper bound of the interval (must be >= startSec)

        @param event: if given, wait() blocks on event.wait() instead of
                      time.sleep()
        """
        assert (startSec > 0.)
        assert (factor > 1.)
        assert (maxSec >= startSec)
        self.startSec = startSec
        self.factor = factor
        self.maxSec = maxSec
        self.event = event

        self.sec = self.startSec
        self.isMax = False

    def reset(self) :
        """
        restart the wait schedule from startSec
        """
        self.sec = self.startSec
        # isMax must be cleared together with sec, otherwise wait()
        # returns early and the interval never grows again once the
        # maximum was reached before the reset
        self.isMax = False

    def wait(self) :
        """
        wait for the current interval, then scale the interval up for
        the next call (capped at maxSec)
        """
        if self.event is None :
            time.sleep(self.sec)
        else :
            self.event.wait(self.sec)
        if self.isMax : return
        self.sec = min(self.sec * self.factor, self.maxSec)
        self.isMax = (self.sec == self.maxSec)
        assert self.sec <= self.maxSec
513
def lockMethod(f):
    """
    Method decorator: the wrapped method runs while holding the
    object's 'lock' attribute. An RLock is created and stored as
    self.lock on first use if the object does not have one yet.
    """

    def wrapped(self, *args, **kw):
        if not hasattr(self,"lock") :
            self.lock = threading.RLock()

        self.lock.acquire()
        try:
            result = f(self, *args, **kw)
        finally:
            # released whether f returned or raised
            self.lock.release()
        return result

    return wrapped
class Bunch:
    """
    Generic struct: every keyword argument given to the constructor
    becomes an attribute of the instance
    """

    def __init__(self, **kwds):
        vars(self).update(kwds)
540
def stackDump(dumpfp):
    """
    Write a summary of all active threads to dumpfp, followed by each
    thread's current stack when frame information is available and
    there are no more than 50 threads.

    adapted from haridsv @ stackoverflow:
    """

    athreads = threading.enumerate()
    tnames = [th.getName() for th in athreads]

    def writeSummary(threadCount) :
        # thread count followed by the list of known thread names
        dumpfp.write("ActiveThreadCount: %i\n" % (threadCount))
        dumpfp.write("KnownActiveThreadNames:\n")
        for name in tnames : dumpfp.write(" %s\n" % (name))
        dumpfp.write("\n")

    frames = None
    try:
        frames = sys._current_frames()
    except AttributeError:
        # python version < 2.5
        pass

    id2name = {}
    try:
        namePairs = [(th.ident, th.getName()) for th in athreads]
        id2name = dict(namePairs)
    except AttributeError :
        # python version < 2.6
        pass

    # names only, when stacks are unavailable or there are too many threads:
    if (frames is None) or (len(tnames) > 50) :
        writeSummary(len(tnames))
        return

    writeSummary(len(frames))

    for tid, stack in frames.items():
        dumpfp.write("Thread: %d %s\n" % (tid, id2name.get(tid, "NAME_UNKNOWN")))
        for filename, lineno, name, line in traceback.extract_stack(stack):
            dumpfp.write('File: "%s", line %d, in %s\n' % (filename, lineno, name))
            if line is not None:
                dumpfp.write(" %s\n" % (line.strip()))
        dumpfp.write("\n")
    dumpfp.write("\n")
585
#######################################################################
#
# these functions are written out to a utility script which allows users
# to make a dot graph from their current state directory output. We
# keep it in pyflow as working code so that pyflow can call sections of it.
#

def taskStateHeader() :
    """
    Return the '#'-prefixed, tab-separated column header line of the
    task state file
    """
    columns = ("taskLabel", "taskNamespace", "runState", "errorCode", "runStateUpdateTime")
    return "#%s\n" % ("\t".join(columns))
598
def taskStateParser(stateFile) :
    """
    Generator over the records of a task state file.

    Lines starting with '#' are skipped. Every other line must hold
    exactly 5 tab-separated columns, otherwise an Exception is raised.

    @param stateFile: path of the task state file

    @return: yields one list of 5 whitespace-stripped strings per record
    """
    class Constants :
        nStateCols = 5

    # read the file up front so that the handle is closed
    # deterministically, even if the caller abandons the generator:
    sfp = open(stateFile)
    try :
        lines = sfp.readlines()
    finally :
        sfp.close()

    for line in lines :
        if len(line) and line[0] == "#" : continue
        line = line.strip()
        w = line.split("\t")
        if len(w) != Constants.nStateCols :
            raise Exception("Unexpected format in taskStateFile: '%s' line: '%s'" % (stateFile, line))
        yield [x.strip() for x in w]
def taskInfoHeader() :
    """
    Return the '#'-prefixed, tab-separated column header line of the
    task info file
    """
    columns = ("taskLabel", "taskNamespace", "taskType", "nCores", "memMb",
               "priority", "isForceLocal", "dependencies", "cwd", "command")
    header = "\t".join(columns)
    return "#%s\n" % (header)
615
def taskInfoParser(infoFile) :
    """
    Generator over the records of a task info file.

    Lines starting with '#' are skipped. Every other line is split on
    tabs into at most 10 columns -- any further tabs stay inside the
    final column -- and an Exception is raised if fewer are found.

    @param infoFile: path of the task info file

    @return: yields one list of 10 whitespace-stripped strings per record
    """
    class Constants :
        nInfoCols = 10

    # read the file up front so that the handle is closed
    # deterministically, even if the caller abandons the generator:
    ifp = open(infoFile)
    try :
        lines = ifp.readlines()
    finally :
        ifp.close()

    for line in lines :
        if len(line) and line[0] == "#" : continue
        line = line.lstrip()
        w = line.split("\t", (Constants.nInfoCols - 1))
        if len(w) != Constants.nInfoCols :
            raise Exception("Unexpected format in taskInfoFile: '%s' line: '%s'" % (infoFile, line))
        yield [x.strip() for x in w]
def getTaskInfoDepSet(s) :
    """
    Parse the comma-separated dependency field of a task info record.

    @param s: dependency string, extraneous whitespace is tolerated

    @return: set of dependency labels (an empty set for an empty field,
             so that the return type is the same in every case)
    """
    # reconstruct dependencies allowing for extraneous whitespace in the file:
    s = s.strip()
    if s == "" : return set()
    return set([d.strip() for d in s.split(",")])
635
class TaskNodeConstants(object) :
    """
    Constants shared by task node code
    """

    # every run state name a task can be reported in:
    validRunstates = ("complete", "running", "queued", "waiting", "error")
641
class DotConfig(object) :
    """
    Static lookup tables and formatting helpers for dot graph output
    """

    # node color for each run state:
    runstateDotColor = {"waiting" : "grey",
                        "running" : "green",
                        "queued" : "yellow",
                        "error" : "red",
                        "complete" : "blue" }

    # node style for each run state (None: no style attribute is written):
    runstateDotStyle = {"waiting" : "dashed",
                        "running" : None,
                        "queued" : None,
                        "error" : "bold",
                        "complete" : None }

    @staticmethod
    def getRunstateDotAttrib(runstate) :
        """
        Return the dot attribute text (color and, if any, style) for a
        node in the given run state
        """
        color = DotConfig.runstateDotColor[runstate]
        style = DotConfig.runstateDotStyle[runstate]
        parts = []
        if color is not None : parts.append(" color=%s" % (color))
        if style is not None : parts.append(" style=%s" % (style))
        return "".join(parts)

    @staticmethod
    def getTypeDotAttrib(nodeType) :
        """
        Return the dot attribute text for a node type; only "workflow"
        nodes get extra attributes
        """
        if nodeType == "workflow" :
            return " shape=rect style=rounded"
        return ""

    @staticmethod
    def getDotLegend() :
        """
        Return a dot node containing an html table which maps every
        valid run state to its color
        """
        rows = ['{ rank = source; Legend [shape=none, margin=0, label=<\n',
                '<TABLE BORDER="0" CELLBORDER="1" CELLSPACING="0" CELLPADDING="4">\n',
                '<TR><TD COLSPAN="2">Legend</TD></TR>\n']
        for state in TaskNodeConstants.validRunstates :
            color = DotConfig.runstateDotColor[state]
            rows.append('<TR> <TD>%s</TD> <TD BGCOLOR="%s"></TD> </TR>\n' % (state, color))
        rows.append('</TABLE>>];}\n')
        return "".join(rows)
687
def writeDotGraph(taskInfoFile, taskStateFile, workflowClassName) :
    """
    Read the task info and task state files and write the current task
    graph to stdout in dot format.

    Each namespace is drawn as its own cluster with 'begin' and 'end'
    marker nodes; tasks without parents hang off 'begin' and tasks which
    are nobody's parent lead into 'end'.
    """

    taskOrder = []
    taskInfo = {}
    headNodes = set()
    tailNodes = set()

    # read info file:
    for (label, namespace, ptype, _nCores, _memMb, _priority, _isForceLocal, depStr, _cwdStr, _command) in taskInfoParser(taskInfoFile) :
        tid = (namespace, label)
        taskOrder.append(tid)
        node = Bunch(ptype=ptype,
                     parentLabels=getTaskInfoDepSet(depStr))
        taskInfo[tid] = node
        if len(node.parentLabels) == 0 : headNodes.add(tid)
        # a task is a tail until some later task names it as a parent:
        tailNodes.add(tid)
        for plabel in node.parentLabels :
            ptid = (namespace, plabel)
            if ptid in tailNodes : tailNodes.remove(ptid)

    # read state file:
    for (label, namespace, runState, _errorCode, _time) in taskStateParser(taskStateFile) :
        taskInfo[(namespace, label)].runState = runState

    dotFp = sys.stdout
    dotFp.write("// Task graph from pyflow object '%s'\n" % (workflowClassName))
    dotFp.write("// Process command: '%s'\n" % (cmdline()))
    dotFp.write("// Process working dir: '%s'\n" % (os.getcwd()))
    dotFp.write("// Graph capture time: %s\n" % (timeStrNow()))
    dotFp.write("\n")
    dotFp.write("digraph %s {\n" % (workflowClassName + "Graph"))
    dotFp.write("\tcompound=true;\nrankdir=LR;\nnode[fontsize=10];\n")

    # node declarations, accumulated per namespace:
    labelToSym = {}
    namespaceGraph = {}
    for (i, tid) in enumerate(taskOrder) :
        (namespace, label) = tid
        if namespace not in namespaceGraph :
            namespaceGraph[namespace] = ""
        sym = "n%i" % i
        labelToSym[tid] = sym
        node = taskInfo[tid]
        attrib1 = DotConfig.getRunstateDotAttrib(node.runState)
        attrib2 = DotConfig.getTypeDotAttrib(node.ptype)
        namespaceGraph[namespace] += "\t\t%s [label=\"%s\"%s%s];\n" % (sym, label, attrib1, attrib2)

    # dependency edges, parent -> child:
    for tid in taskOrder :
        (namespace, label) = tid
        sym = labelToSym[tid]
        for plabel in taskInfo[tid].parentLabels :
            ptid = (namespace, plabel)
            namespaceGraph[namespace] += ("\t\t%s -> %s;\n" % (labelToSym[ptid], sym))

    # one cluster per namespace:
    for (i, ns) in enumerate(namespaceGraph.keys()) :
        isNs = ((ns is not None) and (ns != ""))
        if isNs :
            clusterLabel = ns
        else :
            clusterLabel = workflowClassName
        dotFp.write("\tsubgraph cluster_sg%i {\n" % (i))
        dotFp.write("\t\tlabel = \"%s\";\n" % (clusterLabel))
        dotFp.write(namespaceGraph[ns])
        dotFp.write("\t\tbegin%i [label=\"begin\" shape=diamond];\n" % (i))
        dotFp.write("\t\tend%i [label=\"end\" shape=diamond];\n" % (i))
        for headTid in headNodes :
            if headTid[0] != ns : continue
            dotFp.write("\t\tbegin%i -> %s;\n" % (i, labelToSym[headTid]))
        for tailTid in tailNodes :
            if tailTid[0] != ns : continue
            dotFp.write("\t\t%s -> end%i;\n" % (labelToSym[tailTid], i))
        dotFp.write("\t}\n")
        # NOTE(review): labelToSym is keyed by (namespace, label) tuples, so
        # a membership test on the bare namespace value looks like it can
        # never succeed -- confirm the intended lookup key
        if ns in labelToSym :
            dotFp.write("\t%s -> begin%i [style=dotted];\n" % (labelToSym[ns], i))
        # an invisible 'end -> parent' edge is deliberately not written here:
        # in LR orientation it would make the graph look messy

    dotFp.write(DotConfig.getDotLegend())
    dotFp.write("}\n")
    hardFlush(dotFp)
770
def writeDotScript(taskDotScriptFile,
                   taskInfoFileName, taskStateFileName,
                   workflowClassName) :
    """
    write dot task graph creation script

    The script is a standalone python file made of a short header, the
    source of every function/class needed by writeDotGraph (copied with
    inspect.getsource) and a main section which calls writeDotGraph on
    the state files found next to the script.

    @param taskDotScriptFile: path of the script to write; it is created
                              with mode 0755, or overwritten if present

    @param taskInfoFileName: task info file name, relative to the script
                             directory

    @param taskStateFileName: task state file name, relative to the
                              script directory

    @param workflowClassName: workflow name used to label the graph
    """
    import inspect

    # rwxr-xr-x, spelled so that it parses on every python version:
    scriptMode = int("755", 8)

    # O_TRUNC: a longer script left over from a previous run must not
    # leave stale trailing content behind
    openFlags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC
    dsfp = os.fdopen(os.open(taskDotScriptFile, openFlags, scriptMode), 'w')
    try :
        # the header fields are (process command line, working directory),
        # in that order:
        dsfp.write("""#!/usr/bin/env python
#
# This is a script to create a dot graph from pyflow state files.
# Usage: $script >| task_graph.dot
#
# Note that script assumes the default pyflow state files are in the script directory.
#
# This file was autogenerated by process: '%s'
# ...from working directory: '%s'
#

import datetime,os,sys,time

scriptDir=os.path.abspath(os.path.dirname(__file__))
""" % (cmdline(), os.getcwd()))

        for dobj in (timeStampToTimeStr, timeStrNow, cmdline, Bunch, LogGlobals, hardFlush, TaskNodeConstants, DotConfig, taskStateParser, taskInfoParser, getTaskInfoDepSet, writeDotGraph) :
            dsfp.write("\n\n")
            dsfp.write(inspect.getsource(dobj))

        dsfp.write("""

if __name__ == '__main__' :
    writeDotGraph(os.path.join(scriptDir,'%s'),os.path.join(scriptDir,'%s'),'%s')

""" % (taskInfoFileName, taskStateFileName, workflowClassName))
    finally :
        dsfp.close()
809
################################################################
#
# workflowRunner Helper Classes:
#
#


class Command(object) :
    """
    Normalized task command. A command may be given as a string, as an
    argument list, or not at all; 'type' records which ("str", "list"
    or "none") and 'cmd' holds the cleaned value.
    """

    def __init__(self, cmd, cwd, env=None) :
        # 1: sanitize/error-check cmd
        isNoCommand = ((cmd is None) or
                       (cmd == "") or
                       (isIterable(cmd) and len(cmd) == 0))
        if isNoCommand :
            self.cmd = None
            self.type = "none"
        elif isString(cmd) :
            self.cmd = Command.cleanStr(cmd)
            self.type = "str"
        elif isIterable(cmd) :
            self.cmd = []
            for argIndex, arg in enumerate(cmd):
                if not (isString(arg) or isInt(arg)):
                    raise Exception("Argument: '%s' from position %i in argument list command is not a string or integer. Full command: '%s'" %
                                    (str(arg), (argIndex + 1), " ".join([str(s) for s in cmd])))
                self.cmd.append(Command.cleanStr(arg))
            self.type = "list"
        else :
            raise Exception("Invalid task command: '%s'" % (str(cmd)))

        # 2: sanitize cwd -- stored as an absolute path, or "" if not given
        self.cwd = ""
        if (cwd is not None) and (cwd != "") :
            self.cwd = os.path.abspath(cwd)
            if os.path.exists(self.cwd) and not os.path.isdir(self.cwd) :
                raise Exception("Cwd argument is not a directory: '%s', provided for command '%s'" % (cwd, str(cmd)))

        # the env object is stored as given (no copy is made here):
        self.env = env

    def __repr__(self) :
        """
        Return the command as one string: "" for no command, otherwise
        the string itself or the space-joined argument list
        """
        if self.cmd is None : return ""
        if self.type == "str" : return self.cmd
        return " ".join(self.cmd)

    @staticmethod
    def cleanStr(s) :
        """
        Convert integer arguments to strings, reject anything containing
        a newline and strip surrounding whitespace
        """
        if isInt(s) : s = str(s)
        if "\n" in s : raise Exception("Task command/argument contains newline characters: '%s'" % (s))
        return s.strip()
865
class StoppableThread(threading.Thread):
    """
    Thread with cooperative stop support: stop() and stopAll() only set
    flags, and the thread's own code has to poll stopped() regularly to
    react to them.

    Note that this is a very new thread base class for pyflow, and most
    threads do not (yet) check their stopped status.

    """

    # stop flag shared by every StoppableThread:
    _stopAll = threading.Event()

    def __init__(self, *args, **kw):
        threading.Thread.__init__(self, *args, **kw)
        # stop flag of this thread only:
        self._stop = threading.Event()

    def stop(self):
        "thread specific stop method, may be overridden to add async thread-specific kill behavior"
        self._stop.set()

    @staticmethod
    def stopAll():
        "quick global stop signal for threads that happen to poll stopped() very soon after event"
        StoppableThread._stopAll.set()

    def stopped(self):
        "True once either the global or this thread's own stop flag is set"
        if StoppableThread._stopAll.isSet() :
            return True
        return self._stop.isSet()
895
def getSGEJobsDefault() :
    """
    Return siteConfig.maxSGEJobs as an int, or the string "unlimited"
    when that setting is None, empty or already "unlimited"
    """
    isUnlimited = ((siteConfig.maxSGEJobs is None) or
                   (siteConfig.maxSGEJobs == "") or
                   (siteConfig.maxSGEJobs == "unlimited"))
    if isUnlimited :
        return "unlimited"
    return int(siteConfig.maxSGEJobs)
904
class ModeInfo(object) :
    """
    Holder for the default settings of one run mode (local, sge, ...)
    """

    def __init__(self, defaultCores, defaultMemMbPerCore, defaultIsRetry) :
        (self.defaultCores,
         self.defaultMemMbPerCore,
         self.defaultIsRetry) = (defaultCores, defaultMemMbPerCore, defaultIsRetry)
915
class RunMode(object):
    """
    Table of the supported run modes and their default settings
    """

    # run mode name -> ModeInfo
    data = dict(local=ModeInfo(defaultCores=1,
                               defaultMemMbPerCore=siteConfig.defaultHostMemMbPerCore,
                               defaultIsRetry=False),
                sge=ModeInfo(defaultCores=getSGEJobsDefault(),
                             defaultMemMbPerCore="unlimited",
                             defaultIsRetry=True))
926
class RetryParam(object) :
    """
    Task retry settings.

    The public values are 'max', 'wait' and 'window'. 'max' is derived
    from the requested retry count together with the retry and run modes.
    """
    allowed_modes = [ "nonlocal" , "all" ]

    def __init__(self, run_mode, retry_max, wait, window, retry_mode) :
        if retry_mode not in self.allowed_modes :
            raise Exception("Invalid retry mode parameter '%s'. Accepted retry modes are {%s}." \
                            % (retry_mode, ",".join(self.allowed_modes)))

        self._run_mode = run_mode
        self._retry_mode = retry_mode
        self._retry_max = retry_max
        self.wait = wait
        self.window = window

        self._finalize()
        self.validate()


    def _finalize(self) :
        """
        Set 'max': retry is switched off (max = 0) when the retry mode is
        "nonlocal" and the run mode does not retry by default, otherwise
        the requested retry count is used
        """
        isRetryOff = ((self._retry_mode == "nonlocal") and
                      (not RunMode.data[self._run_mode].defaultIsRetry))
        if isRetryOff :
            self.max = 0
        else :
            self.max = int(self._retry_max)


    def validate(self):
        """
        Raise Exception if any public parameter is negative
        """
        checks = ((self.max, "retryMax"),
                  (self.wait, "retryWait"),
                  (self.window, "retryWindow"))
        for (val, valLabel) in checks :
            if val < 0 : raise Exception("Parameter %s must be non-negative" % valLabel)


    def getTaskCopy(self,retry_max, wait, window, retry_mode):
        """
        Return a deep copy of this object for a single task, with every
        argument which is not None overriding the copied value. The copy
        is finalized and validated before it is returned.
        """
        taskself = copy.deepcopy(self)

        overrides = (("_retry_max", retry_max),
                     ("wait", wait),
                     ("window", window),
                     ("_retry_mode", retry_mode))
        for (attrName, value) in overrides :
            if value is not None :
                setattr(taskself, attrName, value)

        taskself._finalize()
        taskself.validate()
        return taskself
992
class RunningTaskStatus(object) :
    """
    Status record through which a task thread reports back to the
    TaskManager
    """

    def __init__(self,isFinishedEvent) :
        # event handed in by the creator and kept for the task thread to use:
        self.isFinishedEvent = isFinishedEvent

        # event for the completion of this one task:
        self.isComplete = threading.Event()

        # only used by sub-workflows to indicate that all tasks have been specified
        self.isSpecificationComplete = threading.Event()

        self.errorCode = 0

        # errorMessage is filled in by sub-workflow and command-line tasks.
        #
        # Sub-workflows use it to convey whether they failed (1) because
        # of failures of their own tasks or (2) because of an exception
        # in the sub-workflow code, in which case the exception message
        # and stacktrace are provided.
        #
        # Command tasks use it to report the stderr tail of a failing task.
        #
        self.errorMessage = ""
1019
1020 1021 -class BaseTaskRunner(StoppableThread) :
1022 """ 1023 Each individual command-task or sub workflow task 1024 is run on its own thread using a class inherited from 1025 BaseTaskRunner 1026 """ 1027
1028 - def __init__(self, runStatus, taskStr, sharedFlowLog, setRunstate) :
1029 StoppableThread.__init__(self) 1030 self.setDaemon(True) 1031 self.taskStr = taskStr 1032 self.setName("TaskRunner-Thread-%s" % (taskStr)) 1033 self.runStatus = runStatus 1034 self._sharedFlowLog = sharedFlowLog 1035 self.lock = threading.RLock() 1036 1037 # allows taskRunner to update between queued and running status: 1038 self._setRunstate = setRunstate 1039 1040 # this is moved into the ctor now, so that a race condition that would double-launch a task 1041 # is now not possible (however unlikely it was before): 1042 self.setInitialRunstate()
1043 1044
1045 - def run(self) :
1046 """ 1047 BaseTaskRunner's run() method ensures that we can 1048 capture exceptions which might occur in this thread. 1049 Do not override this method -- instead define the core 1050 logic for the task run operation in '_run()' 1051 1052 Note that for sub-workflow tasks we're interpreting raw 1053 client python code on this thread, so exceptions are 1054 *very likely* here -- this is not a corner case. 1055 """ 1056 retval = 1 1057 retmsg = "" 1058 try: 1059 (retval, retmsg) = self._run() 1060 except WorkflowRunner._AbortWorkflowException : 1061 # This indicates an intended workflow interruption. 1062 # send a retval of 1 but not an error message 1063 pass 1064 except: 1065 retmsg = getExceptionMsg() 1066 self.runStatus.errorCode = retval 1067 self.runStatus.errorMessage = retmsg 1068 # this indicates that this specific task has finished: 1069 self.runStatus.isComplete.set() 1070 # this indicates that *any* task has just finished, so 1071 # taskmanager can stop polling and immediately sweep 1072 self.runStatus.isFinishedEvent.set() 1073 return retval
1074
1075 - def setRunstate(self, *args, **kw) :
1076 if self._setRunstate is None : return 1077 self._setRunstate(*args, **kw)
1078
1079 - def setInitialRunstate(self) :
1080 self.setRunstate("running")
1081
1082 - def flowLog(self, msg, logState) :
1083 linePrefixOut = "[TaskRunner:%s]" % (self.taskStr) 1084 self._sharedFlowLog(msg, linePrefix=linePrefixOut, logState=logState)
1085
def infoLog(self, msg) :
    """
    Log msg through flowLog() at LogState.INFO level.
    """
    level = LogState.INFO
    self.flowLog(msg, logState=level)
1088
def warningLog(self, msg) :
    """
    Log msg through flowLog() at LogState.WARNING level.
    """
    level = LogState.WARNING
    self.flowLog(msg, logState=level)
1091
def errorLog(self, msg) :
    """
    Log msg through flowLog() at LogState.ERROR level.
    """
    level = LogState.ERROR
    self.flowLog(msg, logState=level)
1094
class WorkflowTaskRunner(BaseTaskRunner) :
    """
    Manages a sub-workflow task
    """

    def __init__(self, runStatus, taskStr, workflow, sharedFlowLog, setRunstate) :
        """
        @param workflow: the sub-workflow object whose workflow() method
                         is run on this thread
        (remaining arguments are forwarded to BaseTaskRunner)
        """
        BaseTaskRunner.__init__(self, runStatus, taskStr, sharedFlowLog, setRunstate)
        self.workflow = workflow

    def _run(self) :
        """
        Run the sub-workflow's task specification, then wait for the
        tasks in its namespace.

        @return: (retval, retmsg) where retval is the result of waiting
                 on the namespace's tasks and retmsg is always empty
        """
        wflow = self.workflow
        namespace = wflow._getNamespace()
        label = namespaceLabel(namespace)

        self.infoLog("Starting task specification for %s" % (label))
        wflow._setRunning(True)
        wflow.workflow()
        wflow._setRunning(False)

        # task specification is over, even though the tasks themselves
        # may still be running:
        self.runStatus.isSpecificationComplete.set()
        self.infoLog("Finished task specification for %s, waiting for task completion" % (label))

        waitResult = wflow._waitForTasksCore(namespace, isVerbose=False)
        return (waitResult, "")
1118
class CommandTaskRunner(BaseTaskRunner) :
    """
    Parent to local and SGE TaskRunner specializations for command tasks

    Subclasses provide getFullCmd() and runOnce(retInfo); this class
    provides the retry loop and the parsing of the task wrapper's signal file.
    """

    taskWrapper = os.path.join(moduleDir, "pyflowTaskWrapper.py")

    def __init__(self, runStatus, runid, taskStr, cmd, nCores, memMb, retry, isDryRun,
                 outFile, errFile, tmpDir, schedulerArgList,
                 sharedFlowLog, setRunstate) :
        """
        @param outFile: stdout file
        @param errFile: stderr file
        @param tmpDir: location to write files containing output from
                  the task wrapper script (and not the wrapped task)

        @raise Exception: if the task wrapper script cannot be found
        """
        BaseTaskRunner.__init__(self, runStatus, taskStr, sharedFlowLog, setRunstate)

        self.cmd = cmd
        self.nCores = nCores
        self.memMb = memMb
        self.retry = retry
        self.isDryRun = isDryRun
        self.outFile = outFile
        self.errFile = errFile
        self.tmpDir = tmpDir
        self.schedulerArgList = schedulerArgList
        self.runid = runid
        self.taskStr = taskStr
        if not os.path.isfile(self.taskWrapper) :
            raise Exception("Can't find task wrapper script: %s" % self.taskWrapper)


    def initFileSystemItems(self):
        """
        Create the task tmp directory, write the pickled parameter file read
        by the task wrapper, and set self.wrapFile / self.wrapperCmd.
        """
        import pickle

        ensureDir(self.tmpDir)
        self.wrapFile = os.path.join(self.tmpDir, "pyflowTaskWrapper.signal.txt")

        # setup all the data to be passed to the taskWrapper and put this in argFile:
        taskInfo = { 'nCores' : self.nCores,
                     'outFile' : self.outFile, 'errFile' : self.errFile,
                     'cwd' : self.cmd.cwd, 'env' : self.cmd.env,
                     'cmd' : self.cmd.cmd, 'isShellCmd' : (self.cmd.type == "str") }

        argFile = os.path.join(self.tmpDir, "taskWrapperParameters.pickle")
        # close the file explicitly so that its content is flushed before
        # the wrapper is launched, regardless of interpreter gc behavior:
        argFp = open(argFile, "w")
        try :
            pickle.dump(taskInfo, argFp)
        finally :
            argFp.close()

        self.wrapperCmd = [self.taskWrapper, self.runid, self.taskStr, argFile]


    def _run(self) :
        """
        Outer loop of _run() handles task retry behavior:

        @return: (retval, taskExitMsg) from the final attempt
        """

        # these initialization steps only need to happen once:
        self.initFileSystemItems()

        startTime = time.time()
        retries = 0
        retInfo = Bunch(retval=1, taskExitMsg="", isAllowRetry=False)

        while not self.stopped() :
            if retries :
                self.infoLog("Retrying task: '%s'. Total prior task failures: %i" % (self.taskStr, retries))

            if self.isDryRun :
                self.infoLog("Dryrunning task: '%s' task arg list: [%s]" % (self.taskStr, ",".join(['"%s"' % (s) for s in self.getFullCmd()])))
                retInfo.retval = 0
            else :
                self.runOnce(retInfo)

            # stop on success, or when any retry limit/condition is hit:
            if retInfo.retval == 0 : break
            if retries >= self.retry.max : break
            elapsed = (time.time() - startTime)
            if (self.retry.window > 0) and \
               (elapsed >= self.retry.window) : break
            if self.stopped() : break
            if not retInfo.isAllowRetry : break
            retries += 1
            self.warningLog("Task: '%s' failed but qualifies for retry. Total task failures (including this one): %i. Task command: '%s'" % (self.taskStr, retries, str(self.cmd)))
            retInfo = Bunch(retval=1, taskExitMsg="", isAllowRetry=False)
            time.sleep(self.retry.wait)

        return (retInfo.retval, retInfo.taskExitMsg)


    def getExitMsg(self) :
        """
        Attempt to extract exit message from a failed command task, do not complain in
        case of any errors in task signal file for this case.

        @return: the lines following the 'taskStderrTail' signal line (at most
                 the number of lines announced on that line) joined into one
                 string, or "" if no usable 'taskStderrTail' line is found
        """
        msgSize = None
        tailLines = []
        wrapFp = open(self.wrapFile)
        try :
            for line in wrapFp:
                w = line.strip().split()
                if (len(w) < 6) or (w[4] != "[wrapperSignal]") :
                    break
                if w[5] == "taskStderrTail" :
                    if (len(w) == 7) :
                        try :
                            msgSize = int(w[6])
                        except ValueError :
                            # malformed size field: treat as 'no message'
                            msgSize = None
                    break

            if msgSize is not None :
                for line in wrapFp:
                    if len(tailLines) >= msgSize : break
                    tailLines.append(line)
        finally :
            wrapFp.close()
        return "".join(tailLines)


    def getWrapFileResult(self) :
        """
        When the task is theoretically done, go and read the task wrapper to
        see the actual task exit code. This is required because:

        1) On SGE or similar: We have no other way to get the exit code

        2) On all systems, we can distinguish between a conventional task error
        and other problems, such as (a) linux OOM killer (b) exception in the
        task wrapper itself (c) filesystem failures.

        @return: Bunch with 'taskExitCode' (None if no exit code was found
                 after all retries) and 'isError' (True on a signal file
                 format error)
        """

        def checkWrapFileExit(result) :
            """
            return isError=True on error in file format only, missing or incomplete file
            is not considered an error and the function should not return an error for this
            case.
            """

            if not os.path.isfile(self.wrapFile) : return

            wrapFp = open(self.wrapFile)
            try :
                for line in wrapFp :
                    # an incomplete line indicates that the file is still being written:
                    if len(line) == 0 or line[-1] != '\n' : return

                    w = line.strip().split()

                    if len(w) < 6 :
                        result.isError = True
                        return
                    if (w[4] != "[wrapperSignal]") :
                        result.isError = True
                        return
                    if w[5] == "taskExitCode" :
                        if (len(w) == 7) :
                            try :
                                result.taskExitCode = int(w[6])
                            except ValueError :
                                # a non-integer exit code is a format error:
                                result.isError = True
                        return
            finally :
                wrapFp.close()

        retryCount = 8
        retryDelaySec = 30

        wrapResult = Bunch(taskExitCode=None, isError=False)

        totalDelaySec = 0
        for trialIndex in range(retryCount) :
            # if the problem occurs at 0 seconds don't bother with a warning, but
            # if we've gone through a full retry cycle, then the filesystem delay is
            # getting unusual and should be a warning:
            if trialIndex > 1 :
                msg = "No complete signal file found after %i seconds, retrying after delay. Signal file path: '%s'" % (totalDelaySec,self.wrapFile)
                self.flowLog(msg, logState=LogState.WARNING)

            if trialIndex != 0 :
                time.sleep(retryDelaySec)
                totalDelaySec += retryDelaySec

            checkWrapFileExit(wrapResult)
            if wrapResult.isError : break
            if wrapResult.taskExitCode is not None : break

        return wrapResult


    def getWrapperErrorMsg(self) :
        """
        Build a message describing an anomalous task wrapper result.

        @return: list of message lines; includes the full content of the
                 signal file when that file exists
        """
        if os.path.isfile(self.wrapFile) :
            wrapFp = open(self.wrapFile)
            try :
                stderrList = wrapFp.readlines()
            finally :
                wrapFp.close()
            taskExitMsg = ["Anomalous task wrapper stderr output. Wrapper signal file: '%s'" % (self.wrapFile),
                           "Logging %i line(s) of task wrapper log output below:" % (len(stderrList))]
            linePrefix = "[taskWrapper-stderr]"
            taskExitMsg.extend([linePrefix + " " + line for line in stderrList])
        else :
            taskExitMsg = ["Anomalous task wrapper condition: Wrapper signal file is missing: '%s'" % (self.wrapFile)]

        return taskExitMsg
class LocalTaskRunner(CommandTaskRunner) :
    """
    Runs a command task (via the task wrapper) as a subprocess of this process
    """

    def getFullCmd(self) :
        """
        @return: argument list launching the task wrapper with the current interpreter
        """
        return [sys.executable] + self.wrapperCmd

    def runOnce(self, retInfo) :
        """
        Run the wrapped task once and wait for it to finish.

        @param retInfo: result object updated in place: 'retval',
                        'taskExitMsg' and 'isAllowRetry'
        @return: retInfo
        """
        # wrapper stdout/stderr are both captured in the signal file; make
        # sure the handle is released even if the launch itself fails:
        wrapFp = open(self.wrapFile, "w")
        try :
            proc = subprocess.Popen(self.getFullCmd(), stdout=wrapFp, stderr=subprocess.STDOUT, shell=False, bufsize=1)
            self.infoLog("Task initiated on local node")
            retInfo.retval = proc.wait()
        finally :
            wrapFp.close()

        wrapResult = self.getWrapFileResult()

        if (wrapResult.taskExitCode is None) or (wrapResult.taskExitCode != retInfo.retval):
            # the wrapper's own report is missing or disagrees with the
            # process exit code -- this is not an ordinary task failure:
            retInfo.taskExitMsg = self.getWrapperErrorMsg()
            retInfo.retval = 1
            return retInfo
        elif retInfo.retval != 0 :
            retInfo.taskExitMsg = self.getExitMsg()

        retInfo.isAllowRetry = True

        # success! (taskWrapper, but maybe not for the task...)
        return retInfo
1338
class QCaller(threading.Thread) :
    """
    Calls to both qsub and qstat go through this run() method so that we
    can time them out:

    The caller starts this thread, joins it with a timeout, and may call
    killProc() if the thread is still alive. Output lines, exit code and a
    completion flag are published on self.results.
    """

    def __init__(self, cmd, infoLog) :
        """
        @param cmd: argument sequence passed to subprocess.Popen
        @param infoLog: function taking a message string, used to log the kill attempt
        """
        threading.Thread.__init__(self)
        self.setDaemon(True)
        self.setName("QCaller-Timeout-Thread")
        # NOTE(review): presumably the lock used by the @lockMethod decorator -- confirm
        self.lock = threading.RLock()
        self.cmd = cmd
        self.infoLog = infoLog
        # isComplete stays False if the command is killed or raises:
        self.results = Bunch(isComplete=False, retval=1, outList=[])
        self.proc = None
        self.is_kill_attempt = False

    def run(self) :
        """
        Launch the command, collect its combined stdout/stderr line by line
        and record its exit code in self.results.
        """
        # Note: Moved Popen() call outside of the mutex and
        # stopped using proc.communicate() here after
        # observing python interpreter bug:
        # http://bugs.python.org/issue13817
        #
        # The interpreter deadlock for this issue has been
        # observed to block the Popen() call below when using
        # python 2.7.2:
        #
        # Oct 2014 - also wrapped this call with a semaphore because
        # of the high memory usage associated with each qsub/qstat
        # subprocess. This was causing pyflow jobs to become unstable
        # as they would spontaneously exceed the maximum allowed master
        # process memory.
        #
        GlobalSync.subprocessControl.acquire()
        try :
            tmp_proc = subprocess.Popen(self.cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=False)
            self.lock.acquire()
            try:
                self.proc = tmp_proc
                # handle the case where Popen was taking its good sweet time and a killProc() was sent in the meantime:
                if self.is_kill_attempt: self.killProc()
            finally:
                self.lock.release()

            # a killed command returns here without marking the results complete:
            if self.is_kill_attempt: return

            for line in self.proc.stdout :
                self.results.outList.append(line)
            self.results.retval = self.proc.wait()
        finally:
            GlobalSync.subprocessControl.release()
        self.results.isComplete = True

    @lockMethod
    def killProc(self) :
        """
        Record that a kill was requested and, if the subprocess has already
        been launched, send it SIGTERM.
        """
        import signal

        self.is_kill_attempt = True

        # not launched yet: run() checks is_kill_attempt after Popen() returns
        if self.proc is None : return

        try:
            os.kill(self.proc.pid , signal.SIGTERM)
            self.infoLog("Sent SIGTERM to sge command process id: %i" % (self.proc.pid))
        except OSError :
            # process ended before we could kill it (hopefully rare, but possible race condition artifact)
            pass
1408
class SGETaskRunner(CommandTaskRunner) :
    """
    Runs a command task (via the task wrapper) by submitting it with qsub,
    polling it with qstat, and reading the result from the wrapper signal file
    """

    def getFullCmd(self):
        """
        @return: the full qsub command line as a tuple of arguments
        """
        # qsub options:
        #
        qsubCmd = ["qsub",
                   "-V",  # import environment variables from shell
                   "-cwd",  # use current working directory
                   "-S", sys.executable,  # The taskwrapper script is python
                   "-o", self.wrapFile,
                   "-e", self.wrapFile]

        qsubCmd.extend(self.schedulerArgList)
        qsubCmd.extend(siteConfig.qsubResourceArg(self.nCores, self.memMb))
        qsubCmd.extend(self.wrapperCmd)

        return tuple(qsubCmd)


    def setInitialRunstate(self) :
        """
        SGE tasks start out as "queued" rather than "running"
        """
        self.setRunstate("queued")


    @lockMethod
    def setNewJobId(self, jobId) :
        """
        if stopped here, this is the case where a ctrl-c was entered while the qsub
        command was being submitted, so we must kill the job here:
        """
        self.jobId = jobId
        if self.stopped(): self._killJob()


    def runOnce(self, retInfo) :
        """
        Submit the task once, wait until sge no longer reports the job, and
        then read the task result from the wrapper signal file.

        @param retInfo: result object updated in place: 'retval',
                        'taskExitMsg' and 'isAllowRetry'
        @return: retInfo
        """

        def qcallWithTimeouts(cmd, maxQcallAttempt=1) :
            """
            Run cmd on a QCaller thread, killing and retrying it (up to
            maxQcallAttempt attempts) whenever it exceeds the wait limit.
            Returns the results of the last attempt.
            """
            maxQcallWait = 180
            qcall = None
            for i in range(maxQcallAttempt) :
                qcall = QCaller(cmd,self.infoLog)
                qcall.start()
                qcall.join(maxQcallWait)
                if not qcall.isAlive() : break
                self.infoLog("Trial %i of sge command has timed out. Killing process for cmd '%s'" % ((i + 1), cmd))
                qcall.killProc()
                self.infoLog("Finished attempting to kill sge command")

            return qcall.results

        def isJobMissingReport(outList) :
            """
            True if outList is qstat's report that this job does not exist.
            Unparsable output is simply 'not a match', never an exception.
            """
            if len(outList) != 2 : return False
            if outList[0].strip() != "Following jobs do not exist:" : return False
            try :
                return (int(outList[1]) == self.jobId)
            except ValueError :
                return False

        # 1) call qsub, check for errors and retrieve taskId:
        #
        if os.path.isfile(self.wrapFile): os.remove(self.wrapFile)

        # write extra info, just in case we need it for post-mortem debug:
        qsubFile = os.path.join(os.path.dirname(self.wrapFile), "qsub.args.txt")
        if os.path.isfile(qsubFile): os.remove(qsubFile)
        qsubfp = open(qsubFile, "w")
        try :
            for arg in self.getFullCmd() :
                qsubfp.write(arg + "\n")
        finally :
            qsubfp.close()

        results = qcallWithTimeouts(self.getFullCmd())

        isQsubError = False
        self.jobId = None
        if len(results.outList) != 1 :
            isQsubError = True
        else :
            w = results.outList[0].split()
            newJobId = None
            if (len(w) > 3) and (w[0] == "Your") and (w[1] == "job") :
                try :
                    newJobId = int(w[2])
                except ValueError :
                    # reported below as unexpected qsub output
                    newJobId = None
            if newJobId is None :
                isQsubError = True
            else :
                self.setNewJobId(newJobId)

        if not results.isComplete :
            self._killJob()  # just in case...
            retInfo.taskExitMsg = ["Job submission failure -- qsub command timed-out"]
            return retInfo

        if isQsubError or (self.jobId is None):
            retInfo.taskExitMsg = ["Unexpected qsub output. Logging %i line(s) of qsub output below:" % (len(results.outList)) ]
            retInfo.taskExitMsg.extend([ "[qsub-out] " + line for line in results.outList ])
            return retInfo

        if results.retval != 0 :
            retInfo.retval = results.retval
            retInfo.taskExitMsg = ["Job submission failure -- qsub returned exit code: %i" % (retInfo.retval)]
            return retInfo

        # No qsub errors detected and an sge job_number is acquired -- success!
        self.infoLog("Task submitted to sge queue with job_number: %i" % (self.jobId))


        # 2) poll jobId until sge indicates it's not running or queued:
        #
        queueStatus = Bunch(isQueued=True, runStartTimeStamp=None)

        def checkWrapFileRunStart(result) :
            """
            check wrapper file for a line indicating that it has transitioned from queued to
            running state. Allow for NFS delay or incomplete file
            """
            if not os.path.isfile(self.wrapFile) : return
            wrapFp = open(self.wrapFile)
            try :
                for line in wrapFp :
                    w = line.strip().split()
                    if (len(w) < 6) or (w[4] != "[wrapperSignal]") :
                        # this could be incomplete flush to the signal file, so
                        # don't treat it as error:
                        return
                    if w[5] == "taskStart" :
                        result.runStartTimeStamp = timeStrToTimeStamp(w[0].strip('[]'))
                        result.isQueued = False
                        return
            finally :
                wrapFp.close()


        # exponential polling times -- make small jobs responsive but give sge a break on long runs...
        ewaiter = ExpWaiter(5, 1.7, 60)

        pollCmd = ("/bin/bash", "--noprofile", "-o", "pipefail", "-c", "qstat -j %i | awk '/^error reason/'" % (self.jobId))
        while not self.stopped():
            results = qcallWithTimeouts(pollCmd, 6)
            isQstatError = False
            if results.retval != 0:
                if isJobMissingReport(results.outList) :
                    # sge no longer knows the job: it is neither queued nor running
                    break
                else :
                    isQstatError = True
            else :
                if (len(results.outList) != 0) :
                    isQstatError = True

            if isQstatError :
                if not results.isComplete :
                    retInfo.taskExitMsg = ["The qstat command for sge job_number %i has timed out for all attempted retries" % (self.jobId)]
                    self._killJob()
                else :
                    retInfo.taskExitMsg = ["Unexpected qstat output or task has entered sge error state. Sge job_number: %i" % (self.jobId)]
                    retInfo.taskExitMsg.extend(["Logging %i line(s) of qstat output below:" % (len(results.outList)) ])
                    retInfo.taskExitMsg.extend([ "[qstat-out] " + line for line in results.outList ])
                    # self._killJob() # leave the job there so the user can better diagnose whatever unexpected pattern has occurred
                return retInfo

            # also check to see if job has transitioned from queued to running state:
            if queueStatus.isQueued :
                checkWrapFileRunStart(queueStatus)
                if not queueStatus.isQueued :
                    self.setRunstate("running", queueStatus.runStartTimeStamp)

            ewaiter.wait()

        if self.stopped() :
            # self._killJob() # no need, job should already have been killed at the stop() call...
            return retInfo

        lastJobId = self.jobId

        # if we've correctly communicated with SGE, then its role is done here
        # if a job kill is required for any of the error states above, it needs to be
        # added before this point:
        self.jobId = None

        wrapResult = self.getWrapFileResult()

        if wrapResult.taskExitCode is None :
            retInfo.taskExitMsg = ["Sge job_number: '%s'" % (lastJobId)]
            retInfo.taskExitMsg.extend(self.getWrapperErrorMsg())
            retInfo.retval = 1
            return retInfo
        elif wrapResult.taskExitCode != 0 :
            retInfo.taskExitMsg = self.getExitMsg()

        retInfo.retval = wrapResult.taskExitCode
        retInfo.isAllowRetry = True

        # success! (for sge & taskWrapper, but maybe not for the task...)
        return retInfo


    @lockMethod
    def _killJob(self) :
        """
        (possibly) asynchronous job kill

        Issues a single qdel for the current job id; does nothing if no
        job id is set or if a kill was already issued by this runner.
        """
        try : isKilled = self.isKilled
        except AttributeError: isKilled = False
        if isKilled: return

        try : jobId = self.jobId
        except AttributeError: jobId = None
        if jobId is None: return
        killCmd = ["qdel", "%i" % (int(jobId))]
        # don't wait for or check exit code of kill cmd... just give it one try
        # because we want cleanup to go as quickly as possible
        subprocess.Popen(killCmd, shell=False)
        self.isKilled = True


    @lockMethod
    def stop(self) :
        """
        overload thread stop function to provide a
        qdel any running tasks.
        """
        CommandTaskRunner.stop(self)
        self._killJob()
1618
class TaskFileWriter(StoppableThread) :
    """
    This class runs on a separate thread and is
    responsible for updating the state and info task
    files

    A write is requested by setting the 'isWrite' event; the actual
    write is performed by the function supplied at construction.
    """

    def __init__(self, writeFunc) :
        """
        @param writeFunc: no-argument function which performs the write
        """
        StoppableThread.__init__(self)

        # set by clients to request a write:
        self.isWrite = threading.Event()

        # parameter copy:
        self.writeFunc = writeFunc

        # thread settings:
        self.setDaemon(True)
        self.setName("TaskFileWriter-Thread")

    def run(self) :
        """
        Until stopped: perform any pending write, pause for 5 seconds,
        then block until the next write request.
        """
        while True :
            if self.stopped() : break
            self._writeIfSet()
            time.sleep(5)
            self.isWrite.wait()

    def flush(self):
        """
        Perform any pending write immediately on the calling thread.
        """
        self._writeIfSet()

    def _writeIfSet(self) :
        """
        Clear the write request (if any) and then call the write function.
        """
        if not self.isWrite.isSet() :
            return
        self.isWrite.clear()
        self.writeFunc()
1651
class TaskManager(StoppableThread) :
    """
    This class runs on a separate thread from workflowRunner,
    launching jobs based on the current state of the TaskDAG
    """

    def __init__(self, cdata, tdag) :
        """
        @param cdata: data from WorkflowRunner instance which will be
                 constant during the lifetime of the TaskManager,
                 should be safe to lookup w/o locking
        @param tdag: task graph
        """
        StoppableThread.__init__(self)
        # parameter copy:
        self._cdata = cdata
        self.tdag = tdag
        # thread settings:
        self.setDaemon(True)
        self.setName("TaskManager-Thread")
        # lock is used for function (harvest), which is checked by
        # the WorkflowRunner under (literally) exceptional circumstances only
        self.lock = threading.RLock()
        # rm configuration:
        self.freeCores = self._cdata.param.nCores
        self.freeMemMb = self._cdata.param.memMb
        # maps each running task node to its TaskRunner thread:
        self.runningTasks = {}

        # This is used to track 'pyflow mutexes' -- for each key only a single
        # task can run at once. Key is set to True if mutex is occupied.
        self.taskMutexState = {}



    def run(self) :
        """
        TaskManager runs so long as there are outstanding jobs

        Any exception is logged, sent as an email notification and
        flagged on the shared workflow data rather than propagated.
        """

        try:
            cleanEnv()
            while not self._isTerm() :
                # update status of running jobs
                self.tdag.isFinishedEvent.clear()
                self.harvestTasks()
                # try to launch jobs:
                if self.stopped() : continue
                self._startTasks()
                # sleep until a task finishes, or at most 5 seconds:
                self.tdag.isFinishedEvent.wait(5)
        except:
            msg = getExceptionMsg()
            self._flowLog(msg,logState=LogState.ERROR)
            self._cdata.emailNotification(msg, self._flowLog)
            self._cdata.setTaskManagerException()


    def _getCommandTaskRunner(self, task) :
        """
        assist launch of a command-task

        Reserves the task's cores, memory and mutex, then builds the
        local or SGE runner for it.

        @raise Exception: for a checkpoint (null command) task, insufficient
                          free resources, or an unsupported run mode
        """

        # shortcuts:
        payload = task.payload
        param = self._cdata.param

        if payload.cmd.cmd is None :
            # Note these should have been marked off by the TaskManager already:
            raise Exception("Attempting to launch checkpoint task: %s" % (task.fullLabel()))

        isForcedLocal = ((param.mode != "local") and (payload.isForceLocal))

        # mark task resources as occupied:
        if not isForcedLocal :
            if self.freeCores != "unlimited" :
                if (self.freeCores < payload.nCores) :
                    raise Exception("Not enough free cores to launch task")
                self.freeCores -= payload.nCores

            if self.freeMemMb != "unlimited" :
                if (self.freeMemMb < payload.memMb) :
                    raise Exception("Not enough free memory to launch task")
                self.freeMemMb -= payload.memMb

        if payload.mutex is not None :
            self.taskMutexState[payload.mutex] = True

        TaskRunner = None
        if param.mode == "local" or payload.isForceLocal or payload.isCmdMakePath :
            TaskRunner = LocalTaskRunner
        elif param.mode == "sge" :
            TaskRunner = SGETaskRunner
        else :
            raise Exception("Can't support mode: '%s'" % (param.mode))

        #
        # TODO: find less hacky way to handle make tasks:
        #
        taskRetry = payload.retry

        if payload.isCmdMakePath :
            # work on a private copy so the payload's own retry settings are untouched:
            taskRetry = copy.deepcopy(payload.retry)
            taskRetry.window = 0

            if param.mode == "local" or payload.isForceLocal :
                launchCmdList = ["make", "-j", str(payload.nCores)]
            elif param.mode == "sge" :
                launchCmdList = siteConfig.getSgeMakePrefix(payload.nCores, payload.memMb, param.schedulerArgList)
            else :
                raise Exception("Can't support mode: '%s'" % (param.mode))

            launchCmdList.extend(["-C", payload.cmd.cmd])
            payload.launchCmd = Command(launchCmdList, payload.cmd.cwd, payload.cmd.env)

        #
        # each commandTaskRunner requires a unique tmp dir to write
        # wrapper signals to. TaskRunner will create this directory -- it does not bother to destroy it right now:
        #

        # split the task id into two parts to keep from adding too many files to one directory:
        # (explicit floor division: the bucket index must be an integer)
        tmpDirId1 = "%03i" % ((int(task.id) // 1000))
        tmpDirId2 = "%03i" % ((int(task.id) % 1000))
        taskRunnerTmpDir = os.path.join(self._cdata.wrapperLogDir, tmpDirId1, tmpDirId2)

        return TaskRunner(task.runStatus, self._cdata.getRunid(),
                          task.fullLabel(), payload.launchCmd,
                          payload.nCores, payload.memMb,
                          taskRetry, param.isDryRun,
                          self._cdata.taskStdoutFile,
                          self._cdata.taskStderrFile,
                          taskRunnerTmpDir,
                          param.schedulerArgList,
                          self._cdata.flowLog,
                          task.setRunstate)


    def _getWorkflowTaskRunner(self, task) :
        """
        assist launch of a workflow-task
        """
        return WorkflowTaskRunner(task.runStatus, task.fullLabel(), task.payload.workflow,
                                  self._cdata.flowLog, task.setRunstate)


    def _launchTask(self, task) :
        """
        launch a specific task

        @raise Exception: if the task payload type is not recognized
        """

        payloadType = task.payload.type()
        if payloadType == "command" :
            trun = self._getCommandTaskRunner(task)
        elif payloadType == "workflow" :
            trun = self._getWorkflowTaskRunner(task)
        else :
            # explicit raise: an assert would be stripped under 'python -O'
            raise Exception("Can't launch task with unknown payload type: '%s'" % (payloadType))

        self._infoLog("Launching %s: '%s' from %s" % (task.payload.desc(), task.fullLabel(), namespaceLabel(task.namespace)))
        trun.start()
        self.runningTasks[task] = trun


    @lockMethod
    def _startTasks(self) :
        """
        determine what tasks, if any, can be started

        Note that the lock is here to protect self.runningTasks
        """
        # trace through DAG, completing any empty-command checkpoints
        # found with all dependencies completed:
        (ready, completed) = self.tdag.getReadyTasks()
        for node in completed:
            if self.stopped() : return
            self._infoLog("Completed %s: '%s' launched from %s" % (node.payload.desc(), node.fullLabel(), namespaceLabel(node.namespace)))

        # launch all workflows first, then command tasks as resources
        # allow:
        ready_workflows = [r for r in ready if r.payload.type() == "workflow"]
        for task in ready_workflows :
            if self.stopped() : return
            self._launchTask(task)

        # task submission could be shutdown, eg. in response to a task
        # error:
        if (not self._cdata.isTaskSubmissionActive()) : return

        isNonLocal = (self._cdata.param.mode != "local")

        # start command task launch:
        ready_commands = [r for r in ready if r.payload.type() == "command"]
        ready_commands.sort(key=lambda t: (t.payload.priority, t.payload.nCores), reverse=True)
        for task in ready_commands :
            if self.stopped() : return

            # In a non-local run mode, "isForceLocal" tasks are not subject to
            # global core and memory restrictions:
            isForcedLocal = (isNonLocal and task.payload.isForceLocal)
            if not isForcedLocal :
                if ((self.freeCores != "unlimited") and (task.payload.nCores > self.freeCores)) : continue
                if ((self.freeMemMb != "unlimited") and (task.payload.memMb > self.freeMemMb)) : continue

            # all command tasks must obey separate mutex restrictions:
            if ((task.payload.mutex is not None) and
                self.taskMutexState.get(task.payload.mutex, False)) : continue

            self._launchTask(task)


    def _removeTaskFromRunningSet(self, task) :
        """
        Given a running task which is already shown to be finished running, remove it from the running set, and
        recover allocated resources.
        """
        assert(task in self.runningTasks)

        # shortcut:
        param = self._cdata.param

        # recover core and memory allocations:
        if task.payload.type() == "command" :
            isForcedLocal = ((param.mode != "local") and (task.payload.isForceLocal))
            if not isForcedLocal :
                if self.freeCores != "unlimited" :
                    self.freeCores += task.payload.nCores
                if self.freeMemMb != "unlimited" :
                    self.freeMemMb += task.payload.memMb

            if task.payload.mutex is not None :
                self.taskMutexState[task.payload.mutex] = False

        del self.runningTasks[task]



    @lockMethod
    def harvestTasks(self) :
        """
        Check the set of running tasks to see if they've completed and update
        Node status accordingly:
        """
        notrunning = set()
        # iterate over a snapshot of the running tasks:
        for task in list(self.runningTasks.keys()) :
            if self.stopped() : break
            trun = self.runningTasks[task]
            if not task.runStatus.isComplete.isSet() :
                if trun.isAlive() : continue
                # if not complete and thread is dead then we don't know what happened, very bad!:
                task.errorstate = 1
                task.errorMessage = "Thread: '%s', has stopped without a traceable cause" % (trun.getName())
            else :
                task.errorstate = task.runStatus.errorCode
                task.errorMessage = task.runStatus.errorMessage

            if task.errorstate == 0 :
                task.setRunstate("complete")
            else:
                task.setRunstate("error")

            notrunning.add(task)

            if not task.isError() :
                self._infoLog("Completed %s: '%s' launched from %s" % (task.payload.desc(), task.fullLabel(), namespaceLabel(task.namespace)))
            else:
                msg = task.getTaskErrorMsg()

                if self._cdata.isTaskSubmissionActive() :
                    # if this is the first error in the workflow, then
                    # we elaborate a bit on the workflow's response to
                    # the error. We also send any email-notifications
                    # for the first error only:
                    msg.extend(["Shutting down task submission. Waiting for remaining tasks to complete."])

                self._errorLog(msg)
                if self._cdata.isTaskSubmissionActive() :
                    self._cdata.emailNotification(msg, self._flowLog)

                # Be sure to send notifications *before* setting error
                # bits, because the WorkflowRunner may decide to
                # immediately shutdown all tasks and pyflow threads on
                # the first error:
                self._cdata.setTaskError(task)

        # recover task resources:
        for task in notrunning :
            self._removeTaskFromRunningSet(task)

    @lockMethod
    def cancelTaskTree(self, task) :
        """
        Cancel a task and all of its children, without labeling the canceled tasks as errors

        A canceled task will be stopped if it is running, or unqueued if it is waiting, and will be put into the
        waiting/ignored state unless it has already completed.
        """

        # Recursively cancel child tasks:
        for child in task.children :
            self.cancelTaskTree(child)

        self._infoLog("Canceling %s '%s' from %s" % (task.payload.desc(), task.fullLabel(), namespaceLabel(task.namespace)))

        # Stop the task if it is running:
        if task in self.runningTasks :
            taskRunner = self.runningTasks[task]
            taskRunner.stop()
            self._removeTaskFromRunningSet(task)

        # Reset the task to be ignored unless it is already done:
        if not task.isDone() :
            task.runstate = "waiting"
            task.isIgnoreThis = True



    @lockMethod
    def stop(self) :
        """
        Request a stop of this thread and of every running task runner
        """
        StoppableThread.stop(self)
        for trun in self.runningTasks.values() :
            trun.stop()


    @lockMethod
    def _areTasksDead(self) :
        """
        @return: True if no task runner thread in the running set is alive
        """
        for trun in self.runningTasks.values() :
            if trun.isAlive(): return False
        return True


    def _isTerm(self) :
        """
        @return: True when the TaskManager polling loop should exit

        @raise Exception: if the workflow is complete while tasks are still
                          listed as running
        """
        # check for explicit thread stop request (presumably from the workflowManager):
        # if this happens we exit the polling loop
        #
        if self.stopped() :
            # wait for all task runner threads to finish before exiting:
            while True :
                if self._areTasksDead() : return True
                time.sleep(1)

        # check for "regular" termination conditions:
        if (not self._cdata.isTaskSubmissionActive()) :
            return (len(self.runningTasks) == 0)
        else :
            if self.tdag.isRunComplete() :
                if (len(self.runningTasks) != 0) :
                    raise Exception("Inconsistent TaskManager state: workflow appears complete but there are still running tasks")
                return True
            elif self.tdag.isRunExhausted() :
                return True
            else :
                return False


    def _flowLog(self, msg, logState) :
        """
        Send msg to the shared workflow log with the TaskManager prefix
        """
        linePrefixOut = "[TaskManager]"
        # if linePrefix is not None : linePrefixOut+=" "+linePrefix
        self._cdata.flowLog(msg, linePrefix=linePrefixOut, logState=logState)


    def _infoLog(self, msg) :
        """
        Log msg at LogState.INFO level
        """
        self._flowLog(msg, logState=LogState.INFO)

    def _errorLog(self, msg) :
        """
        Log msg at LogState.ERROR level
        """
        self._flowLog(msg, logState=LogState.ERROR)
2016
# payloads are used to manage the different
# possible actions attributed to task nodes:
#
class CmdPayload(object) :
    """
    Payload of a command task: the command plus its resource and
    scheduling settings
    """

    def __init__(self, fullLabel, cmd, nCores, memMb, priority,
                 isForceLocal, isCmdMakePath=False, isTaskStable=True,
                 mutex=None, retry=None) :
        """
        @param fullLabel: task label, used in the error message only
        @param cmd: command object; a 'cmd' attribute of None marks a null task

        @raise Exception: if a null task requests cores or memory
        """
        # a null task must not ask for any resources:
        isNullTask = (cmd.cmd is None)
        if isNullTask and not ((nCores == 0) and (memMb == 0)) :
            raise Exception("Null tasks should not have resource requirements. task: '%s'" % (fullLabel))

        self.cmd = cmd

        # launch command includes make/qmake wrapper for Make path commands:
        self.launchCmd = cmd

        # resource requirements:
        self.nCores = nCores
        self.memMb = memMb

        # scheduling settings:
        self.priority = priority
        self.isForceLocal = isForceLocal
        self.isCmdMakePath = isCmdMakePath
        self.isTaskStable = isTaskStable
        self.mutex = mutex
        self.retry = retry

    def type(self) :
        """
        @return: the payload type tag, "command"
        """
        return "command"

    def desc(self) :
        """
        @return: payload description used in log messages
        """
        return "command task"
2048
class WorkflowPayload(object):
    """
    Payload for a sub-workflow task: wraps the workflow object to be run.
    """

    def __init__(self, workflow):
        self.isTaskStable = True
        self.workflow = workflow

    def type(self):
        "payload type tag"
        return "workflow"

    def name(self):
        """
        Name reported by the wrapped workflow's _whoami(), or the string
        "None" when there is no workflow object.
        """
        if self.workflow is not None:
            return self.workflow._whoami()
        return "None"

    def desc(self):
        "payload description used in messages"
        return "sub-workflow task"
2066
class TaskNode(object):
    """
    Represents an individual task in the task graph
    """

    def __init__(self, lock, init_id, namespace, label, payload, isContinued, isFinishedEvent, isWriteTaskStatus):
        """
        Create a node which has no parents or children yet.

        A node flagged isContinued starts out in the "complete"
        runstate, any other node starts out "waiting".
        """
        self.lock = lock
        self.id = init_id
        self.namespace = namespace
        self.label = label
        self.payload = payload
        self.isContinued = isContinued
        self.isWriteTaskStatus = isWriteTaskStatus

        # if true, do not execute this task or honor it as a dependency for child tasks
        self.isIgnoreThis = False

        # if true, set the ignore state for all children of this task to true
        self.isIgnoreChildren = False

        # if true, this task and its dependents will be automatically marked as completed (until
        # a startFromTasks node is found)
        self.isAutoCompleted = False

        # task is reset to waiting runstate in a continued run
        self.isReset = False

        self.parents = set()
        self.children = set()
        self.runstateUpdateTimeStamp = time.time()
        self.runstate = "complete" if self.isContinued else "waiting"
        self.errorstate = 0

        # errorMessage is used by sub-workflow tasks, but not by command tasks:
        self.errorMessage = ""

        # This is a link to the live status object updated by TaskRunner:
        self.runStatus = RunningTaskStatus(isFinishedEvent)

    def __str__(self):
        fields = (self.fullLabel(), self.runstate, self.errorstate)
        return "TASK id: %s state: %s error: %i" % fields

    def fullLabel(self):
        "namespace and label joined into a single string"
        return namespaceJoin(self.namespace, self.label)

    @lockMethod
    def isDone(self):
        "task has gone as far as it can"
        return (self.runstate in ("error", "complete"))

    @lockMethod
    def isError(self):
        "true if an error occurred in this node"
        if self.errorstate != 0:
            return True
        return (self.runstate == "error")

    @lockMethod
    def isComplete(self):
        "task completed without error"
        if self.errorstate != 0:
            return False
        return (self.runstate == "complete")

    @lockMethod
    def isReady(self):
        "task is ready to be run"
        if (self.runstate != "waiting") or (self.errorstate != 0) or self.isIgnoreThis:
            return False

        # every parent which is not being ignored must have completed:
        for parent in self.parents:
            if parent.isIgnoreThis:
                continue
            if not parent.isComplete():
                return False
        return True

    def _isDeadWalker(self, searched):
        "recursive helper function for isDead()"

        # the fact that you're still searching means that it must have returned False last time:
        if self in searched:
            return False
        searched.add(self)

        if self.isError():
            return True
        if self.isComplete():
            return False

        # neither failed nor finished: dead only if some ancestor is dead
        for parent in self.parents:
            if parent._isDeadWalker(searched):
                return True
        return False

    @lockMethod
    def isDead(self):
        """
        If true, there's no longer a point to waiting for this task,
        because it either has an error or there is an error in an
        upstream dependency
        """

        # the set of already visited nodes is used to restrict the
        # complexity of this operation on large graphs:
        return self._isDeadWalker(set())

    @lockMethod
    def setRunstate(self, runstate, updateTimeStamp=None):
        """
        updateTimeStamp is only supplied in the case where the state
        transition time is interestingly different than the function
        call time. This can happen when the state update comes from
        a polling function with a long poll interval.

        Raises Exception if runstate is not one of
        TaskNodeConstants.validRunstates.
        """
        if runstate not in TaskNodeConstants.validRunstates:
            raise Exception("Can't set TaskNode runstate to %s" % (runstate))

        if updateTimeStamp is not None:
            self.runstateUpdateTimeStamp = updateTimeStamp
        else:
            self.runstateUpdateTimeStamp = time.time()
        self.runstate = runstate

        # flag that the task status needs to be written out again:
        self.isWriteTaskStatus.set()

    @lockMethod
    def getTaskErrorMsg(self):
        """
        generate consistent task error message from task state

        Returns an empty list when the task is not in error.
        """

        if not self.isError():
            return []

        summary = "Failed to complete %s: '%s' launched from %s" % (self.payload.desc(), self.fullLabel(), namespaceLabel(self.namespace))
        if self.payload.type() == "command":
            summary += ", error code: %s, command: '%s'" % (str(self.errorstate), str(self.payload.launchCmd))
        elif self.payload.type() == "workflow":
            summary += ", failed sub-workflow classname: '%s'" % (self.payload.name())
        else:
            assert 0

        msg = lister(summary)

        if self.errorMessage != "":
            # append the task's own error text, each line tagged with the task label:
            linePrefix = "[%s] " % (self.fullLabel())
            detailLines = ["Error Message:"]
            detailLines.extend(lister(self.errorMessage))
            msg.extend([linePrefix + line for line in detailLines])

        return msg
2217
class TaskDAG(object):
    """
    Holds all tasks and their dependencies.

    Also responsible for task state persistence/continue across
    interrupted runs. Object is accessed by both the workflow and
    taskrunner threads, so it needs to be thread-safe.
    """

    def __init__(self, isContinue, isForceContinue, isDryRun,
                 taskInfoFile, taskStateFile, workflowClassName,
                 startFromTasks, ignoreTasksAfter, resetTasks,
                 flowLog):
        """
        No other object gets to access the taskStateFile, file locks
        are not required (but thread locks are)
        """
        self.isContinue = isContinue
        self.isForceContinue = isForceContinue
        self.isDryRun = isDryRun
        self.taskInfoFile = taskInfoFile
        self.taskStateFile = taskStateFile
        self.workflowClassName = workflowClassName
        self.startFromTasks = startFromTasks
        self.ignoreTasksAfter = ignoreTasksAfter
        self.resetTasks = resetTasks
        self.flowLog = flowLog

        # unique id for each task in each run -- not persistent across continued runs:
        self.taskId = 0

        # as tasks are added, occasionally spool task info to disk, and record the last
        # task index written + 1
        self.lastTaskIdWritten = 0

        # it will be easier for people to read the task status file if
        # the tasks are in approximately the same order as they were
        # added by the workflow:
        self.addOrder = []
        self.labelMap = {}
        self.headNodes = set()
        self.tailNodes = set()
        self.lock = threading.RLock()

        # this event can be used to optionally accelerate the task cycle
        # when running in modes where task can set this event on completion
        # (ie. local mode but not sge), if this isn't set the normal polling
        # cycle applies
        self.isFinishedEvent = threading.Event()

        # addTask() calls .set() on both of these, so they have to be
        # replaced with event-like objects before the first task is added
        # (presumably by the object which owns this TaskDAG):
        self.isWriteTaskInfo = None
        self.isWriteTaskStatus = None

    @lockMethod
    def isTaskPresent(self, namespace, label):
        "true if a task with this namespace and label is already in the DAG"
        return ((namespace, label) in self.labelMap)

    @lockMethod
    def getTask(self, namespace, label):
        "the task with this namespace and label, or None if there is no such task"
        if (namespace, label) in self.labelMap:
            return self.labelMap[(namespace, label)]
        return None

    @lockMethod
    def getHeadNodes(self):
        "all tasks with no parents"
        return list(self.headNodes)

    @lockMethod
    def getTailNodes(self):
        "all tasks with no (runnable) children"
        return list(self.tailNodes)

    @lockMethod
    def getAllNodes(self, namespace=""):
        "get all nodes in this namespace (ignored nodes are left out), in the order they were added"
        retval = []
        for (taskNamespace, taskLabel) in self.addOrder:
            if namespace != taskNamespace:
                continue
            node = self.labelMap[(taskNamespace, taskLabel)]
            if node.isIgnoreThis:
                continue
            retval.append(node)
        return retval

    def _isRunExhaustedNode(self, node, searched):
        "recursive helper function for isRunExhausted()"

        # the fact that you're still searching means that it must have returned true last time:
        if node in searched:
            return True
        searched.add(node)

        if not node.isIgnoreThis:
            if not node.isDone():
                return False
            # only descend below nodes which completed without error,
            # nothing downstream of a failed node can run:
            if node.isComplete():
                for c in node.children:
                    if not self._isRunExhaustedNode(c, searched):
                        return False
        return True

    @lockMethod
    def isRunExhausted(self):
        """
        Returns true if the run is as complete as possible due to errors
        """

        # searched is used to restrict the complexity of this
        # operation on large graphs:
        searched = set()
        for node in self.getHeadNodes():
            if not self._isRunExhaustedNode(node, searched):
                return False
        return True

    @lockMethod
    def isRunComplete(self):
        "returns true if run is complete and error free"
        for node in self.labelMap.values():
            if node.isIgnoreThis:
                continue
            if not node.isComplete():
                return False
        return True

    def _getReadyTasksFromNode(self, node, ready, searched):
        "helper function for getReadyTasks"

        if node.isIgnoreThis:
            return

        if node in searched:
            return
        searched.add(node)

        if node.isReady():
            ready.add(node)
        elif not node.isComplete():
            # this node is still blocked, so look further upstream:
            for c in node.parents:
                self._getReadyTasksFromNode(c, ready, searched)

    @lockMethod
    def getReadyTasks(self):
        """
        Go through DAG from the tail nodes and find all tasks which
        have all prerequisites completed.

        Returns a (ready, completed) pair of lists, where completed
        holds the checkpoint tasks newly marked complete by this call.
        """

        completed = self.markCheckPointsComplete()
        ready = set()
        # searched is used to restrict the complexity of this
        # operation on large graphs:
        searched = set()
        for node in self.getTailNodes():
            self._getReadyTasksFromNode(node, ready, searched)
        return (list(ready), list(completed))

    def _markCheckPointsCompleteFromNode(self, node, completed, searched):
        "helper function for markCheckPointsComplete"

        if node.isIgnoreThis:
            return

        if node in searched:
            return
        searched.add(node)

        if node.isComplete():
            return

        # parents are handled first, so that a chain of checkpoint tasks
        # can be completed in a single pass:
        for c in node.parents:
            self._markCheckPointsCompleteFromNode(c, completed, searched)

        if (node.payload.type() == "command") and (node.payload.cmd.cmd is None) and (node.isReady()):
            node.setRunstate("complete")
            completed.add(node)

    @lockMethod
    def markCheckPointsComplete(self):
        """
        traverse from tail nodes up, marking any checkpoint tasks
        (task.cmd=None) jobs that are ready as complete, return set
        of newly completed tasks:
        """
        completed = set()
        # searched is used to restrict the complexity of this
        # operation on large graphs:
        searched = set()
        for node in self.getTailNodes():
            self._markCheckPointsCompleteFromNode(node, completed, searched)
        return completed

    @lockMethod
    def addTask(self, namespace, label, payload, dependencies, isContinued=False):
        """
        add new task to the DAG

        isContinued indicates the task is being read from state history during a continuation run

        dependencies holds the labels of the parent tasks, which are
        looked up in the same namespace as the new task.

        Raises Exception if the task is already in the DAG, if it does
        not match its definition from the previous run (downgraded to a
        warning for command/dependency mismatches when isForceContinue is
        set), if a dependency is missing or is the task itself, or if a
        task which is already done has an incomplete dependency.
        """
        # internal data structures use these separately, but for logging we
        # create one string:
        fullLabel = namespaceJoin(namespace, label)

        # first check to see if task exists in DAG already, this is not allowed unless
        # we are continuing a previous run, in which case it's allowed once:
        if not isContinued and self.isTaskPresent(namespace, label):
            if self.isContinue and self.labelMap[(namespace, label)].isContinued:
                # confirm that task is a match, flip off the isContinued flag and return:
                task = self.labelMap[(namespace, label)]
                parentLabels = set(p.label for p in task.parents)
                excPrefix = "Task: '%s' does not match previous definition defined in '%s'." % (fullLabel, self.taskInfoFile)
                if task.payload.type() != payload.type():
                    msg = excPrefix + " New/old payload type: '%s'/'%s'" % (payload.type(), task.payload.type())
                    raise Exception(msg)
                if payload.isTaskStable:
                    if (payload.type() == "command") and (str(task.payload.cmd) != str(payload.cmd)):
                        msg = excPrefix + " New/old command: '%s'/'%s'" % (str(payload.cmd), str(task.payload.cmd))
                        if self.isForceContinue:
                            self.flowLog(msg, logState=LogState.WARNING)
                        else:
                            raise Exception(msg)
                if (parentLabels != set(dependencies)):
                    msg = excPrefix + " New/old dependencies: '%s'/'%s'" % (",".join(dependencies), ",".join(parentLabels))
                    if self.isForceContinue:
                        self.flowLog(msg, logState=LogState.WARNING)
                    else:
                        raise Exception(msg)
                if payload.type() == "command":
                    task.payload.cmd = payload.cmd
                    task.payload.isCmdMakePath = payload.isCmdMakePath
                task.isContinued = False
                return
            else:
                raise Exception("Task: '%s' is already in TaskDAG" % (fullLabel))

        task = TaskNode(self.lock, self.taskId, namespace, label, payload, isContinued,
                        self.isFinishedEvent, self.isWriteTaskStatus)

        self.taskId += 1

        self.addOrder.append((namespace, label))
        self.labelMap[(namespace, label)] = task

        for d in dependencies:
            parent = self.getTask(namespace, d)
            if parent is task:
                raise Exception("Task: '%s' cannot specify its own task label as a dependency" % (fullLabel))
            if parent is None:
                raise Exception("Dependency: '%s' for task: '%s' does not exist in TaskDAG" % (namespaceJoin(namespace, d), fullLabel))
            task.parents.add(parent)
            parent.children.add(task)

        if isContinued:
            # a continued task is reset if it was requested explicitly or
            # if any of its parents has been reset:
            isReset = False
            if label in self.resetTasks:
                isReset = True
            else:
                for p in task.parents:
                    if p.isReset:
                        isReset = True
                        break
            if isReset:
                task.setRunstate("waiting")
                task.isReset = True

        if not isContinued:
            self.isWriteTaskInfo.set()
            self.isWriteTaskStatus.set()

        # determine if this is an ignoreTasksAfter node
        if label in self.ignoreTasksAfter:
            task.isIgnoreChildren = True

        # determine if this is an ignoreTasksAfter descendent
        for p in task.parents:
            if p.isIgnoreChildren:
                task.isIgnoreThis = True
                task.isIgnoreChildren = True
                break

        # update headNodes
        if len(task.parents) == 0:
            self.headNodes.add(task)

        # update isAutoCompleted:
        if (self.startFromTasks and
                (label not in self.startFromTasks)):
            task.isAutoCompleted = True
            for p in task.parents:
                if not p.isAutoCompleted:
                    task.isAutoCompleted = False
                    break

            # in case of no-parents, also check sub-workflow node
            if task.isAutoCompleted and (len(task.parents) == 0) and (namespace != ""):
                wval = namespace.rsplit(namespaceSep, 1)
                if len(wval) == 2:
                    (workflowNamespace, workflowLabel) = wval
                else:
                    workflowNamespace = ""
                    workflowLabel = wval[0]
                workflowParent = self.labelMap[(workflowNamespace, workflowLabel)]
                if not workflowParent.isAutoCompleted:
                    task.isAutoCompleted = False

        if task.isAutoCompleted:
            task.setRunstate("complete")

        # update tailNodes:
        if not task.isIgnoreThis:
            self.tailNodes.add(task)
            for p in task.parents:
                if p in self.tailNodes:
                    self.tailNodes.remove(p)

        # check dependency runState consistency:
        if task.isDone():
            for p in task.parents:
                if p.isIgnoreThis:
                    continue
                if p.isComplete():
                    continue
                raise Exception("Task: '%s' has invalid continuation state. Task dependencies are incomplete" % (fullLabel))

    @lockMethod
    def writeTaskStatus(self):
        """
        (atomic on *nix) update of the runstate and errorstate for all tasks

        The new state is written to a temporary file next to
        taskStateFile, which is then renamed over taskStateFile.
        """
        # don't write task status during dry runs:
        if self.isDryRun:
            return

        tmpFile = self.taskStateFile + ".update.incomplete"
        # the with-block makes sure that the handle is closed (and the
        # content flushed) before the rename, even if a write fails:
        with open(tmpFile, "w") as tmpFp:
            tmpFp.write(taskStateHeader())
            for (namespace, label) in self.addOrder:
                node = self.labelMap[(namespace, label)]
                runstateUpdateTimeStr = timeStampToTimeStr(node.runstateUpdateTimeStamp)
                tmpFp.write("%s\t%s\t%s\t%i\t%s\n" % (label, namespace, node.runstate, node.errorstate, runstateUpdateTimeStr))

        forceRename(tmpFile, self.taskStateFile)

    @lockMethod
    def getTaskStatus(self):
        """
        Enumerate status of command tasks (but look at sub-workflows to determine if specification is complete)

        Returns a Bunch holding the number of command tasks in each
        runstate, the isAllSpecComplete flag, and the name and elapsed
        seconds of the task which has been queued/running the longest.
        """

        val = Bunch(waiting=0, queued=0, running=0, complete=0, error=0, isAllSpecComplete=True,
                    longestQueueSec=0, longestRunSec=0, longestQueueName="", longestRunName="")

        currentSec = time.time()
        for (namespace, label) in self.addOrder:
            node = self.labelMap[(namespace, label)]
            # special check just for workflow tasks:
            if node.payload.type() == "workflow":
                if not node.runStatus.isSpecificationComplete.isSet():
                    val.isAllSpecComplete = False

                # the rest of this enumeration is for command tasks only:
                continue

            # whole seconds since the task entered its current runstate:
            taskTime = int(currentSec - node.runstateUpdateTimeStamp)

            if node.runstate == "waiting":
                val.waiting += 1
            elif node.runstate == "queued":
                val.queued += 1
                if val.longestQueueSec < taskTime:
                    val.longestQueueSec = taskTime
                    val.longestQueueName = node.fullLabel()
            elif node.runstate == "running":
                val.running += 1
                if val.longestRunSec < taskTime:
                    val.longestRunSec = taskTime
                    val.longestRunName = node.fullLabel()
            elif node.runstate == "complete":
                val.complete += 1
            elif node.runstate == "error":
                val.error += 1

        return val

    @lockMethod
    def writeTaskInfo(self):
        """
        appends a description of all new tasks to the taskInfo file
        """

        def getTaskLineFromTask(task):
            """
            translate a task into its single-line summary format in the taskInfo file
            """
            depstring = ""
            if len(task.parents):
                depstring = ",".join([p.label for p in task.parents])

            cmdstring = ""
            nCores = "0"
            memMb = "0"
            priority = "0"
            isForceLocal = "0"
            payload = task.payload
            cwdstring = ""
            if payload.type() == "command":
                cmdstring = str(payload.cmd)
                nCores = str(payload.nCores)
                memMb = str(payload.memMb)
                priority = str(payload.priority)
                isForceLocal = boolToStr(payload.isForceLocal)
                cwdstring = payload.cmd.cwd
            elif payload.type() == "workflow":
                cmdstring = payload.name()
            else:
                assert 0
            return "\t".join((task.label, task.namespace, payload.type(),
                              nCores, memMb, priority,
                              isForceLocal, depstring, cwdstring, cmdstring))

        assert (self.lastTaskIdWritten <= self.taskId)

        if self.lastTaskIdWritten == self.taskId:
            return

        # task ids are handed out in the same order as addOrder entries,
        # so lastTaskIdWritten is also an index into addOrder:
        newTaskLines = []
        while self.lastTaskIdWritten < self.taskId:
            task = self.labelMap[self.addOrder[self.lastTaskIdWritten]]
            newTaskLines.append(getTaskLineFromTask(task))
            self.lastTaskIdWritten += 1

        with open(self.taskInfoFile, "a") as fp:
            for taskLine in newTaskLines:
                fp.write(taskLine + "\n")
2650
# workflowRunner:
#


# special exception used for the case where pyflow data dir is already in use:
#
class DataDirException(Exception):
    """
    Raised when the pyflow data directory appears to be in use by
    another process.

    The message is available as the msg attribute, exactly as it was
    passed in (a string or a list of message lines).
    """

    def __init__(self, msg):
        # hand the message to the base class as well, so that str(), repr()
        # and uncaught-exception tracebacks are not empty:
        Exception.__init__(self, msg)
        self.msg = msg
2663
class WorkflowRunnerThreadSharedData(object):
    """
    All data used by the WorkflowRunner which will be constant over
    the lifetime of a TaskManager instance. All of the information in
    this class will be accessed by both threads without locking.
    """

    def __init__(self):
        self.lock = threading.RLock()
        self.pid = os.getpid()
        self.runcount = 0
        self.cwd = os.path.abspath(os.getcwd())

        self.markFile = None

        # we potentially have to log before the logfile is setup (eg
        # an exception is thrown reading run parameters), so provide
        # an explicit notification that there's no log file:
        self.flowLogFp = None

        self.warningLogFp = None
        self.errorLogFp = None

        self.resetRun()

        # two elements required to implement a nohup-like behavior:
        self.isHangUp = threading.Event()
        self._isStderrAlive = True

    @staticmethod
    def _validateFixParam(param):
        """
        validate and refine raw run() parameters for use by workflow

        param is updated in place: mailTo and schedulerArgList are
        normalized, a combined retry object is attached, and nCores,
        memMb and isContinue are resolved to their final values.

        Raises Exception if any setting is found to be invalid.
        """

        param.mailTo = setzer(param.mailTo)
        param.schedulerArgList = lister(param.schedulerArgList)
        if param.successMsg is not None:
            if not isString(param.successMsg):
                raise Exception("successMsg argument to WorkflowRunner.run() is not a string")

        # verify the run mode before anything below uses it to index RunMode.data,
        # otherwise an invalid mode can surface as a bare KeyError:
        if param.mode not in RunMode.data.keys():
            raise Exception("Invalid mode argument '%s'. Accepted modes are {%s}." \
                            % (param.mode, ",".join(RunMode.data.keys())))

        # create combined task retry settings manager:
        param.retry = RetryParam(param.mode,
                                 param.retryMax,
                                 param.retryWait,
                                 param.retryWindow,
                                 param.retryMode)

        # setup resource parameters
        if param.nCores is None:
            param.nCores = RunMode.data[param.mode].defaultCores

        # ignore total available memory settings in non-local modes:
        if param.mode != "local":
            param.memMb = "unlimited"

        if param.mode == "sge":
            if siteConfig.maxSGEJobs != "unlimited":
                if ((param.nCores == "unlimited") or
                        (int(param.nCores) > int(siteConfig.maxSGEJobs))):
                    param.nCores = int(siteConfig.maxSGEJobs)

        if param.nCores != "unlimited":
            param.nCores = int(param.nCores)
            if param.nCores < 1:
                raise Exception("Invalid run mode nCores argument: %s. Value must be 'unlimited' or an integer no less than 1" % (param.nCores))

        if param.memMb is None:
            mpc = RunMode.data[param.mode].defaultMemMbPerCore
            # the per-core default can only be scaled by a numeric core count,
            # "unlimited" on either side means unlimited memory:
            if (param.nCores == "unlimited") or (mpc == "unlimited"):
                param.memMb = "unlimited"
            else:
                param.memMb = mpc * param.nCores
        elif param.memMb != "unlimited":
            param.memMb = int(param.memMb)
            if param.memMb < 1:
                raise Exception("Invalid run mode memMb argument: %s. Value must be 'unlimited' or an integer no less than 1" % (param.memMb))

        if param.mode == "sge":
            # TODO not-portable to windows (but is this a moot point -- all of sge mode is non-portable, no?):
            def checkSgeProg(prog):
                # the output of 'which' is discarded, only its exit code matters:
                with open(os.devnull, "w") as devnull:
                    proc = subprocess.Popen(("which", prog), stdout=devnull, shell=False)
                    retval = proc.wait()
                if retval != 0:
                    raise Exception("Run mode is sge, but no %s in path" % (prog))

            checkSgeProg("qsub")
            checkSgeProg("qstat")

        stateDir = os.path.join(param.dataDir, "state")
        if param.isContinue == "Auto":
            param.isContinue = os.path.exists(stateDir)

        if param.isContinue:
            if not os.path.exists(stateDir):
                raise Exception("Cannot continue run without providing a pyflow dataDir containing previous state.: '%s'" % (stateDir))

        for email in param.mailTo:
            if not verifyEmailAddy(email):
                raise Exception("Invalid email address: '%s'" % (email))

    def _setCustomLogs(self):
        "open the warning and error log files, if they were requested and are not open already"
        if (self.warningLogFp is None) and (self.param.warningLogFile is not None):
            self.warningLogFp = open(self.param.warningLogFile, "w")

        if (self.errorLogFp is None) and (self.param.errorLogFile is not None):
            self.errorLogFp = open(self.param.errorLogFile, "w")

    def setupNewRun(self, param):
        """
        Prepare the pyflow data directory, log files and per-run state
        for a new run described by param.

        Raises DataDirException if the data directory already holds the
        marker file of an active pyflow process, and Exception if
        parameter validation fails.
        """
        self.param = param

        # setup log file-handle first, then run the rest of parameter validation:
        # (hold this file open so that we can still log if pyflow runs out of filehandles)
        self.param.dataDir = os.path.abspath(self.param.dataDir)
        self.param.dataDir = os.path.join(self.param.dataDir, "pyflow.data")
        logDir = os.path.join(self.param.dataDir, "logs")
        ensureDir(logDir)
        self.flowLogFile = os.path.join(logDir, "pyflow_log.txt")
        self.flowLogFp = open(self.flowLogFile, "a")

        # run remaining validation
        self._validateFixParam(self.param)

        # initial per-run data
        self.taskErrors = set()  # this set actually contains every task that failed -- tasks contain all of their own error info
        self.isTaskManagerException = False

        # create data directory if it does not exist
        ensureDir(self.param.dataDir)

        # check whether a process already exists:
        self.markFile = os.path.join(self.param.dataDir, "active_pyflow_process.txt")
        if os.path.exists(self.markFile):
            # Non-conventional logging situation -- another pyflow process is possibly using this same data directory, so we want
            # to log to stderr (even if the user has set isQuiet) and not interfere with the other process's log
            #
            # the handle opened above is closed rather than just dropped:
            self.flowLogFp.close()
            self.flowLogFp = None
            self.param.isQuiet = False
            msg = ["Can't initialize pyflow run because the data directory appears to be in use by another process.",
                   "\tData directory: '%s'" % (self.param.dataDir),
                   "\tIt is possible that a previous process was abruptly interrupted and did not clean up properly. To determine if this is",
                   "\tthe case, please refer to the file '%s'" % (self.markFile),
                   "\tIf this file refers to a non-running process, delete the file and relaunch pyflow,",
                   "\totherwise, specify a new data directory. At the API-level this can be done with the dataDirRoot option."]
            self.markFile = None  # this keeps pyflow from deleting this file, as it normally would on exit
            raise DataDirException(msg)
        else:
            msg = """
This file provides details of the pyflow instance currently using this data directory.
During normal pyflow run termination (due to job completion, error, SIGINT, etc...),
this file should be deleted. If this file is present it should mean either:
(1) the data directory is still in use by a running workflow
(2) a sudden job failure occurred that prevented normal run termination

The associated pyflow job details are as follows:
"""
            with open(self.markFile, "w") as mfp:
                mfp.write(msg + "\n")
                for line in self.getInfoMsg():
                    mfp.write(line + "\n")
                mfp.write("\n")

        stateDir = os.path.join(self.param.dataDir, "state")
        ensureDir(stateDir)

        # setup other instance data:
        self.runcount += 1

        # initialize directories
        self.wrapperLogDir = os.path.join(logDir, "tmp", "taskWrapperLogs")
        ensureDir(self.wrapperLogDir)
        stackDumpLogDir = os.path.join(logDir, "tmp", "stackDumpLog")
        ensureDir(stackDumpLogDir)

        # initialize filenames:
        taskStateFileName = "pyflow_tasks_runstate.txt"
        taskInfoFileName = "pyflow_tasks_info.txt"

        self.taskStdoutFile = os.path.join(logDir, "pyflow_tasks_stdout_log.txt")
        self.taskStderrFile = os.path.join(logDir, "pyflow_tasks_stderr_log.txt")
        self.taskStateFile = os.path.join(stateDir, taskStateFileName)
        self.taskInfoFile = os.path.join(stateDir, taskInfoFileName)
        self.taskDotScriptFile = os.path.join(stateDir, "make_pyflow_task_graph.py")

        self.stackDumpLogFile = os.path.join(stackDumpLogDir, "pyflow_stack_dump.txt")

        # empty file:
        if not self.param.isContinue:
            with open(self.taskInfoFile, "w") as fp:
                fp.write(taskInfoHeader())

        self._setCustomLogs()

        # finally write dot task graph creation script:
        #
        # this could fail because of script permission settings, but it is not critical for
        # workflow completion so we get away with a warning
        try:
            writeDotScript(self.taskDotScriptFile, taskInfoFileName, taskStateFileName, self.param.workflowClassName)
        except OSError:
            msg = ["Failed to write task graph visualization script to %s" % (self.taskDotScriptFile)]
            self.flowLog(msg, logState=LogState.WARNING)

    def resetRun(self):
        """
        Anything that needs to be cleaned up at the end of a run

        Right now this just makes sure we don't log to the previous run's
        log file, and removes the active-process marker file.
        """
        self.flowLogFile = None
        self.param = None
        if self.flowLogFp is not None:
            self.flowLogFp.close()
            self.flowLogFp = None

        if self.warningLogFp is not None:
            self.warningLogFp.close()
            self.warningLogFp = None

        if self.errorLogFp is not None:
            self.errorLogFp.close()
            self.errorLogFp = None

        if self.markFile is not None:
            if os.path.exists(self.markFile):
                os.unlink(self.markFile)

        self.markFile = None

    def getRunid(self):
        "run id string built from the process id and the run count"
        return "%s_%s" % (self.pid, self.runcount)

    @lockMethod
    def setTaskError(self, task):
        "record task as failed for the current run"
        self.taskErrors.add(task)

    @lockMethod
    def isTaskError(self):
        "true if any task has been recorded as failed in the current run"
        return (len(self.taskErrors) != 0)

    def isTaskSubmissionActive(self):
        """
        wait() pollers need to know if task submission has been
        shutdown to implement sane behavior.
        """
        return (not self.isTaskError())

    @lockMethod
    def setTaskManagerException(self):
        "flag that an exception occurred in the task manager"
        self.isTaskManagerException = True

    @lockMethod
    def flowLog(self, msg, linePrefix=None, logState=LogState.INFO):
        """
        Write msg to every log stream which applies to logState.

        stderr is included unless a log file is open and the run is
        quiet, or stderr has been found dead after a hangup. Warnings and
        errors additionally go to their dedicated log files, when these
        are configured.
        """
        linePrefixOut = "[%s]" % (self.getRunid())
        if linePrefix is not None:
            linePrefixOut += " " + linePrefix

        if (logState == LogState.ERROR) or (logState == LogState.WARNING):
            linePrefixOut += " [" + LogState.toString(logState) + "]"

        ofpList = []
        isAddStderr = (self._isStderrAlive and ((self.flowLogFp is None) or (self.param is None) or (not self.param.isQuiet)))
        if isAddStderr:
            ofpList.append(sys.stderr)
        if self.flowLogFp is not None:
            ofpList.append(self.flowLogFp)

        # make a last ditch effort to open the special error logs if these are not available already:
        # (best effort only -- e.g. self.param may not be set up yet -- but
        # interpreter-exit and keyboard-interrupt exceptions are not swallowed)
        try:
            self._setCustomLogs()
        except Exception:
            pass

        if (self.warningLogFp is not None) and (logState == LogState.WARNING):
            ofpList.append(self.warningLogFp)
        if (self.errorLogFp is not None) and (logState == LogState.ERROR):
            ofpList.append(self.errorLogFp)

        if len(ofpList) == 0:
            return
        retval = log(ofpList, msg, linePrefixOut)

        # check if stderr stream failed. If so, turn it off for the remainder of run (assume terminal hup):
        # (stderr is always the first entry of ofpList when it was added)
        if isAddStderr and (not retval[0]):
            if self.isHangUp.isSet():
                self._isStderrAlive = False

    def getInfoMsg(self):
        """
        return a string array with general stats about this run
        """

        msg = ["%s\t%s" % ("pyFlowClientWorkflowClass:", self.param.workflowClassName),
               "%s\t%s" % ("pyFlowVersion:", __version__),
               "%s\t%s" % ("pythonVersion:", pythonVersion),
               "%s\t%s" % ("Runid:", self.getRunid()),
               "%s\t%s UTC" % ("RunStartTime:", self.param.logRunStartTime),
               "%s\t%s UTC" % ("NotificationTime:", timeStrNow()),
               "%s\t%s" % ("HostName:", siteConfig.getHostName()),
               "%s\t%s" % ("WorkingDir:", self.cwd),
               "%s\t%s" % ("DataDir:", self.param.dataDir),
               "%s\t'%s'" % ("ProcessCmdLine:", cmdline())]
        return msg

    def emailNotification(self, msgList, emailErrorLog=None):
        """
        Email msgList, followed by the general run info, to all of the
        run's mailTo addresses.

        Nothing is sent if the run parameters are not set up yet or if
        no addresses are configured.

        If emailErrorLog is specified, then a missing local smtp server
        or an email send exception will be logged through it as a
        warning, otherwise a send exception is re-raised to the caller.
        """
        #
        # email addy might not be setup yet:
        #
        if self.param is None:
            return
        if len(self.param.mailTo) == 0:
            return

        if not isLocalSmtp():
            if emailErrorLog:
                msg = ["email notification failed, no local smtp server"]
                emailErrorLog(msg, logState=LogState.WARNING)
            return

        mailTo = sorted(list(self.param.mailTo))
        subject = "pyflow notification from %s run: %s" % (self.param.workflowClassName, self.getRunid())
        msg = msgListToMsg(msgList)
        fullMsgList = ["Message:",
                       '"""',
                       msg,
                       '"""']
        fullMsgList.extend(self.getInfoMsg())

        import smtplib
        try:
            sendEmail(mailTo, siteConfig.mailFrom, subject, fullMsgList)
        except smtplib.SMTPException:
            if emailErrorLog is None:
                raise
            msg = ["email notification failed"]
            eMsg = lister(getExceptionMsg())
            msg.extend(eMsg)
            emailErrorLog(msg, logState=LogState.WARNING)
3019
3020 3021 3022 -class WorkflowRunner(object) :
3023 """ 3024 This object is designed to be inherited by a class in 3025 client code. This inheriting class can override the 3026 L{workflow()<WorkflowRunner.workflow>} method to define the 3027 tasks that need to be run and their dependencies. 3028 3029 The inheriting class defining a workflow can be executed in 3030 client code by calling the WorkflowRunner.run() method. 3031 This method provides various run options such as whether 3032 to run locally or on sge. 3033 """ 3034 3035 3036 _maxWorkflowRecursion = 30 3037 """ 3038 This limit protects against a runaway forkbomb in case a 3039 workflow task recursively adds itself w/o termination: 3040 """ 3041 3042
def run(self,
        mode="local",
        dataDirRoot=".",
        isContinue=False,
        isForceContinue=False,
        nCores=None,
        memMb=None,
        isDryRun=False,
        retryMax=2,
        retryWait=90,
        retryWindow=360,
        retryMode="nonlocal",
        mailTo=None,
        updateInterval=60,
        schedulerArgList=None,
        isQuiet=False,
        warningLogFile=None,
        errorLogFile=None,
        successMsg=None,
        startFromTasks=None,
        ignoreTasksAfter=None,
        resetTasks=None) :
    """
    Call this method to execute the workflow() method overridden
    in a child class and specify the resources available for the
    workflow to run.

    Task retry behavior: Retry attempts will be made per the
    arguments below for distributed workflow runs (eg. sge run
    mode). Note this means that retries will be attempted for
    tasks with an 'isForceLocal' setting during distributed runs.

    Task error behavior: When a task error occurs the task
    manager stops submitting new tasks and allows all currently
    running tasks to complete. Note that in this case 'task error'
    means that the task could not be completed after exhausting
    attempted retries.

    Workflow exception behavior: Any exceptions thrown from the
    python code of classes derived from WorkflowRunner will be
    logged and trigger notification (e.g. email). The exception
    will not come down to the client's stack. In sub-workflows the
    exception is handled exactly like a task error (ie. task
    submission is shut-down and remaining tasks are allowed to
    complete). An exception in the master workflow will lead to
    workflow termination without waiting for currently running
    tasks to finish.

    @return: 0 if all tasks completed successfully and 1 otherwise

    @param mode: Workflow run mode. Current options are (local|sge)

    @param dataDirRoot: All workflow data is written to
                   {dataDirRoot}/pyflow.data/ These include
                   workflow/task logs, persistent task state data,
                   and summary run info. Two workflows cannot
                   simultaneously use the same dataDir.

    @param isContinue: If True, continue workflow from a previous
                   incomplete run based on the workflow data
                   files. You must use the same dataDirRoot as a
                   previous run for this to work.  Set to 'Auto' to
                   have the run continue only if the previous
                   dataDir exists.  (default: False)

    @param isForceContinue: Only used if isContinue is not False. Normally
                   when isContinue is run, the commands of
                   completed tasks are checked to ensure they
                   match. When isForceContinue is true,
                   failing this check is reduced from an error
                   to a warning

    @param nCores: Total number of cores available, or 'unlimited', sge
                   is currently configured for a maximum job count of
                   %s, any value higher than this in sge mode will be
                   reduced to the maximum. (default: 1 for local mode,
                   %s for sge mode)

    @param memMb: Total memory available (in megabytes), or 'unlimited',
                   Note that this value will be ignored in non-local modes
                   (such as sge), because in this case total memory available
                   is expected to be known by the scheduler for each node in its
                   cluster. (default: %i*nCores for local mode, 'unlimited'
                   for sge mode)

    @param isDryRun: List the commands to be executed without running
                   them. Note that recursive and dynamic workflows
                   will potentially have to account for the fact that
                   expected files will be missing -- here 'recursive
                   workflow' refers to any workflow which uses the
                   addWorkflowTask() method, and 'dynamic workflow'
                   refers to any workflow which uses the
                   waitForTasks() method. These types of workflows
                   can query this status with the isDryRun() to make
                   accommodations. (default: False)

    @param retryMax: Maximum number of task retries

    @param retryWait: Delay (in seconds) before resubmitting task

    @param retryWindow: Maximum time (in seconds) after the first task
                   submission in which retries are allowed. A value of
                   zero or less puts no limit on the time when retries
                   will be attempted. Retries are always allowed (up to
                   retryMax times), for failed make jobs.

    @param retryMode: Modes are 'nonlocal' and 'all'. For 'nonlocal'
                   retries are not attempted in local run mode. For 'all'
                   retries are attempted for any run mode. The default mode
                   is 'nonlocal'.

    @param mailTo: An email address or container of email addresses. Notification
                   will be sent to each email address when
                   either (1) the run successfully completes (2) the
                   first task error occurs or (3) an unhandled
                   exception is raised. The intention is to send one
                   status message per run() indicating either success
                   or the reason for failure. This should occur for all
                   cases except a host hardware/power failure.  Note
                   that mail comes from '%s' (configurable),
                   which may be classified as junk-mail by your system.

    @param updateInterval: How often (in minutes) should pyflow log a
                   status update message summarizing the run
                   status.  Set this to zero or less to turn
                   the update off.

    @param schedulerArgList: A list of arguments can be specified to be
                   passed on to an external scheduler when non-local
                   modes are used (e.g. in sge mode you could pass
                   schedulerArgList=['-q','work.q'] to put the whole
                   pyflow job into the sge work.q queue)

    @param isQuiet: Don't write any logging output to stderr (but still write
                   log to pyflow_log.txt)

    @param warningLogFile: Replicate all warning messages to the specified file. Warning
                   messages will still appear in the standard logs, this
                   file will contain a subset of the log messages pertaining to
                   warnings only.

    @param errorLogFile: Replicate all error messages to the specified file. Error
                   messages will still appear in the standard logs, this
                   file will contain a subset of the log messages pertaining to
                   errors only. It should be empty for a successful run.

    @param successMsg: Provide a string containing a custom message which
                   will be prepended to pyflow's standard success
                   notification. This message will appear in the log
                   and any configured notifications (e.g. email). The
                   message may contain linebreaks.

    @param startFromTasks: A task label or container of task labels. Any tasks which
                   are not in this set or descendants of this set will be marked as
                   completed.
    @type startFromTasks: A single string, or set, tuple or list of strings

    @param ignoreTasksAfter: A task label or container of task labels. All descendants
                   of these task labels will be ignored.
    @type ignoreTasksAfter: A single string, or set, tuple or list of strings

    @param resetTasks: A task label or container of task labels. These tasks and all
                   of their descendants will be reset to the "waiting" state to be re-run.
                   Note this option will only affect a workflow which has been continued
                   from a previous run. This will not override any nodes altered by the
                   startFromTasks setting in the case that both options are used together.
    @type resetTasks: A single string, or set, tuple or list of strings
    """

    # Setup pyflow signal handlers:
    #
    # inHandlers remembers the handlers which were installed before run()
    # was called, so that they can be restored when the run ends:
    inHandlers = Bunch(isSet=False)

    # raised from the SIGTERM handler so that termination is processed by
    # the same try/except block which handles the other run failures:
    class SigTermException(Exception) : pass

    def sigtermHandler(_signum, _frame) :
        raise SigTermException

    def sighupHandler(_signum, _frame) :
        self._warningLog("pyflow recieved hangup signal. pyflow will continue, but this signal may still interrupt running tasks.")
        # tell cdata to turn off any tty writes:
        self._cdata().isHangUp.set()

    def set_pyflow_sig_handlers() :
        import signal
        # record the incoming handlers only once:
        if not inHandlers.isSet :
            inHandlers.sigterm = signal.getsignal(signal.SIGTERM)
            if not isWindows() :
                inHandlers.sighup = signal.getsignal(signal.SIGHUP)
            inHandlers.isSet = True
        try:
            signal.signal(signal.SIGTERM, sigtermHandler)
            if not isWindows() :
                signal.signal(signal.SIGHUP, sighupHandler)
        except ValueError:
            # a ValueError on the main thread is unexpected and is re-raised,
            # off the main thread it only disables the custom signal handling:
            if isMainThread() :
                raise
            else :
                self._warningLog("pyflow has not been initialized on main thread, all custom signal handling disabled")


    def unset_pyflow_sig_handlers() :
        import signal
        # nothing to restore if the incoming handlers were never recorded:
        if not inHandlers.isSet : return
        try :
            signal.signal(signal.SIGTERM, inHandlers.sigterm)
            if not isWindows() :
                signal.signal(signal.SIGHUP, inHandlers.sighup)
        except ValueError:
            # mirrors set_pyflow_sig_handlers: only fatal on the main thread
            if isMainThread() :
                raise
            else:
                pass


    # if return value is somehow not set after this then something bad happened, so init to 1:
    retval = 1
    try:
        set_pyflow_sig_handlers()

        # log and notify (logState ERROR) with the current exception message,
        # optionally preceded by prefixMsg:
        def exceptionMessaging(prefixMsg=None) :
            msg = lister(prefixMsg)
            eMsg = lister(getExceptionMsg())
            msg.extend(eMsg)
            self._notify(msg,logState=LogState.ERROR)

        try:
            self.runStartTimeStamp = time.time()
            self.updateInterval = int(updateInterval)
            # a container to haul all the run() options around in:
            param = Bunch(mode=mode,
                          dataDir=dataDirRoot,
                          isContinue=isContinue,
                          isForceContinue=isForceContinue,
                          nCores=nCores,
                          memMb=memMb,
                          isDryRun=isDryRun,
                          retryMax=retryMax,
                          retryWait=retryWait,
                          retryWindow=retryWindow,
                          retryMode=retryMode,
                          mailTo=mailTo,
                          logRunStartTime=timeStampToTimeStr(self.runStartTimeStamp),
                          workflowClassName=self._whoami(),
                          schedulerArgList=schedulerArgList,
                          isQuiet=isQuiet,
                          warningLogFile=warningLogFile,
                          errorLogFile=errorLogFile,
                          successMsg=successMsg,
                          startFromTasks=setzer(startFromTasks),
                          ignoreTasksAfter=setzer(ignoreTasksAfter),
                          resetTasks=setzer(resetTasks))
            retval = self._runWorkflow(param)

        # the first three handlers below do not re-raise, so run() returns
        # the initial retval of 1 in these cases:
        except SigTermException:
            msg = "Received termination signal, shutting down running tasks..."
            self._killWorkflow(msg)
        except KeyboardInterrupt:
            msg = "Keyboard Interrupt, shutting down running tasks..."
            self._killWorkflow(msg)
        except DataDirException, e:
            self._notify(e.msg,logState=LogState.ERROR)
        except:
            # anything else is reported and then propagated to the caller:
            exceptionMessaging()
            raise

    finally:
        # last set: disconnect the workflow log:
        self._cdata().resetRun()
        unset_pyflow_sig_handlers()

    return retval


# configurable elements of docstring
#
# the four %-placeholders in the run() docstring are filled here in order
# of appearance, so any edit of the docstring must keep them in step with
# this tuple:
run.__doc__ = run.__doc__ % (siteConfig.maxSGEJobs,
                             RunMode.data["sge"].defaultCores,
                             siteConfig.defaultHostMemMbPerCore,
                             siteConfig.mailFrom)



# protected methods which can be called within the workflow method:

def addTask(self, label, command=None, cwd=None, env=None, nCores=1,
            memMb=siteConfig.defaultTaskMemMb,
            dependencies=None, priority=0,
            isForceLocal=False, isCommandMakePath=False, isTaskStable=True,
            mutex=None,
            retryMax=None, retryWait=None, retryWindow=None, retryMode=None):
    """
    Add a task to the workflow, together with its resource
    requirements and dependencies. Every dependency task must
    already exist in the workflow.

    @return: The 'label' argument is returned without modification.


    @param label: A string used to identify each task. The label must
                 be composed of only ascii letters, digits,
                 underscores and dashes (ie. /[A-Za-z0-9_-]+/). The
                 label must also be unique within the workflow, and
                 non-empty.

    @param command: The task command. Commands can be: (1) a shell
                 string (2) an iterable container of strings (argument
                 list) (3) None. In all cases strings must not contain
                 newline characters. A single string is typically used
                 for commands that require shell features (such as
                 pipes), an argument list can be used for any other
                 commands, this is often a useful way to simplify
                 quoting issues or to submit extremely long
                 commands. The default command (None), can be used to
                 create a 'checkpoint', ie. a task which does not run
                 anything, but provides a label associated with the
                 completion of a set of dependencies.

    @param cwd: Specify current working directory to use for
                 command execution. Note that if submitting the
                 command as an argument list (as opposed to a shell
                 string) the executable (arg[0]) is searched for
                 before changing the working directory, so you cannot
                 specify the executable relative to the cwd
                 setting. If submitting a shell string command this
                 restriction does not apply.

    @param env: A map of environment variables for this task, for
                 example 'env={"PATH": "/usr/bin"}'. When env is set
                 to None (the default) the environment of the pyflow
                 client process is used.

    @param nCores: Number of cpu threads required

    @param memMb: Amount of memory required (in megabytes)

    @param dependencies: A task label or container of task labels specifying all dependent
                 tasks. Dependent tasks must already exist in
                 the workflow.
    @type dependencies: A single string, or set, tuple or list of strings


    @param priority: Among all tasks which are eligible to run at
                 the same time, launch tasks with higher priority
                 first. This value can be set from [-100,100]. Note
                 that this will strongly control the order of task
                 launch on a local run, but will only control task
                 submission order to a secondary scheduler (like
                 sge). All jobs with the same priority are already
                 submitted in order from highest to lowest nCores
                 requested, so there is no need to set priorities to
                 replicate this behavior. The taskManager can start
                 executing tasks as soon as each addTask() method is
                 called, so lower-priority tasks may be launched first
                 if they are specified first in the workflow.

    @param isForceLocal: Force this task to run locally when a
                 distributed task mode is used. This can be used to
                 launch very small jobs outside of the sge queue. Note
                 that 'isForceLocal' jobs launched during a non-local
                 task mode are not subject to resource management, so
                 it is important that these represent small
                 jobs. Tasks which delete, move or touch a small
                 number of files are ideal for this setting.

    @param isCommandMakePath: If true, command is assumed to be a
                 path containing a makefile. It will be run using
                 make/qmake according to the run's mode and the task's
                 isForceLocal setting

    @param isTaskStable: If false, indicates that the task command
                 and/or dependencies may change if the run is
                 interrupted and restarted. A command marked as
                 unstable will not be checked to make sure it matches
                 its previous definition during run continuation.
                 Unstable examples: command contains a date/time, or
                 lists a set of files which are deleted at some point
                 in the workflow, etc.

    @param mutex: Provide an optional id associated with a pyflow
                 task mutex. For all tasks with the same mutex id, no more
                 than one will be run at once. Id name must follow task id
                 restrictions. Mutex ids are global across all recursively
                 invoked workflows.
                 Example use case: This feature has been added as a simpler
                 alternative to file locking, to ensure sequential, but not
                 ordered, access to a file.

    @param retryMax: The number of times this task will be retried
                 after failing. If defined, this overrides the workflow
                 retryMax value.

    @param retryWait: The number of seconds to wait before relaunching
                 a failed task. If defined, this overrides the workflow
                 retryWait value.

    @param retryWindow: The number of seconds after job submission in
                 which retries will be attempted for non-make jobs. A value of
                 zero or less causes retries to be attempted anytime after
                 job submission. If defined, this overrides the workflow
                 retryWindow value.

    @param retryMode: Modes are 'nonlocal' and 'all'. For 'nonlocal'
                 retries are not attempted in local run mode. For 'all'
                 retries are attempted for any run mode. If defined, this overrides
                 the workflow retryMode value.
    """

    self._requireInWorkflow()

    #### Canceled plans to add deferred dependencies:
    # # deferredDependencies -- A container of labels specifying dependent
    # #                         tasks which have not yet been added to the
    # #                         workflow. In this case the added task will
    # #                         wait for the dependency to be defined *and*
    # #                         complete. Avoid these in favor of regular
    # #                         dependencies if possible.

    # normalize the boolean flags:
    isForceLocal = argToBool(isForceLocal)
    isCommandMakePath = argToBool(isCommandMakePath)

    # normalize the integer arguments:
    nCores = int(nCores)
    memMb = int(memMb)
    priority = int(priority)
    if not (-100 <= priority <= 100):
        raise Exception("priority must be an integer in the range [-100,100]")

    # the label must be valid before it is used to build the full task name:
    WorkflowRunner._checkTaskLabel(label)

    namespace = self._getNamespace()
    fullLabel = namespaceJoin(namespace, label)

    # verify/sanitize command:
    cmd = Command(command, cwd, env)

    # a task without a command consumes no resources, any other task
    # must request a positive amount of each resource:
    if cmd.cmd is None:
        nCores = 0
        memMb = 0
    elif nCores <= 0:
        raise Exception("Unexpected core requirement for task: '%s' nCores: %i" % (fullLabel, nCores))
    elif memMb <= 0:
        raise Exception("Unexpected memory requirement for task: '%s' memory: %i (megabytes)" % (fullLabel, memMb))

    # a single task can never ask for more than the whole run provides:
    runParam = self._cdata().param
    if (runParam.nCores != "unlimited") and (nCores > runParam.nCores):
        raise Exception("Task core requirement exceeds full available resources")

    if (runParam.memMb != "unlimited") and (memMb > runParam.memMb):
        raise Exception("Task memory requirement exceeds full available resources")

    # a make path command must be provided as a single string, it is
    # stored as an absolute path:
    if isCommandMakePath:
        if cmd.type != "str":
            raise Exception("isCommandMakePath is set, but no path is provided in task: '%s'" % (fullLabel))
        cmd.cmd = os.path.abspath(cmd.cmd)

    # mutex ids follow the same naming rules as task labels:
    if mutex is not None:
        WorkflowRunner._checkTaskLabel(mutex)

    task_retry = runParam.retry.getTaskCopy(retryMax, retryWait, retryWindow, retryMode)

    # private _addTaskCore gets hijacked in recursive workflow submission:
    #
    payload = CmdPayload(fullLabel, cmd, nCores, memMb, priority, isForceLocal,
                         isCommandMakePath, isTaskStable, mutex, task_retry)
    self._addTaskCore(namespace, label, payload, dependencies)
    return label
3514 3515 3516 3517
def addWorkflowTask(self, label, workflowRunnerInstance, dependencies=None):
    """
    Add another WorkflowRunner instance as a task to this
    workflow. The added Workflow's workflow() method will be
    called once the dependencies specified in this call have
    completed. Once started, all of the submitted workflow's
    method calls (like addTask) will be placed into the enclosing
    workflow instance and bound by the run parameters of the
    enclosing workflow.

    This task will be marked complete once the submitted workflow's
    workflow() method has finished, and any tasks it initiated have
    completed.

    Note that all workflow tasks will have their own tasks namespaced
    with the workflow task label. This namespace is recursive in the
    case that you add workflow tasks which add their own workflow
    tasks, etc.

    Note that the submitted workflow instance will be deep copied
    before being altered in any way.

    @return: The 'label' argument is returned without modification.

    @param label: A string used to identify each task. The label must
                 be composed of only ascii letters, digits,
                 underscores and dashes (ie. /[A-Za-z0-9_-]+/). The
                 label must also be unique within the workflow, and
                 non-empty.

    @param workflowRunnerInstance: A L{WorkflowRunner} instance.

    @param dependencies: A label string or container of labels specifying all dependent
                 tasks. Dependent tasks must already exist in
                 the workflow.
    @type dependencies: A single string, or set, tuple or list of strings
    """

    self._requireInWorkflow()

    # sanity check label:
    WorkflowRunner._checkTaskLabel(label)

    import inspect

    # all alterations are made to a private copy, the caller's instance
    # is left untouched:
    hijackedWorkflow = copy.deepcopy(workflowRunnerInstance)

    # every public WorkflowRunner-level method of the copy is redirected to
    # this enclosing workflow, apart from the methods which have to keep
    # working on the copy itself:
    keptOnCopy = ("workflow", "addTask", "addWorkflowTask", "waitForTasks")
    for (methodName, _method) in inspect.getmembers(WorkflowRunner, predicate=inspect.ismethod):
        isPrivate = (methodName[0] == "_")  # private/special methods are skipped
        if isPrivate or (methodName in keptOnCopy):
            continue
        setattr(hijackedWorkflow, methodName, getattr(self, methodName))

    # the private methods which are redirected as well:
    for methodName in ("_cdata", "_addTaskCore", "_waitForTasksCore",
                       "_isTaskCompleteCore", "_setRunning", "_getRunning"):
        setattr(hijackedWorkflow, methodName, getattr(self, methodName))

    # final step: disable the run() function to be extra safe...
    hijackedWorkflow.run = None

    # the namespace of the copy is this workflow's namespace followed by
    # the new task label:
    hijackedWorkflow._appendNamespace(self._getNamespaceList())
    hijackedWorkflow._appendNamespace(label)

    # add workflow task to the task-dag, and launch a new taskrunner thread
    # if one isn't already running:
    self._addTaskCore(self._getNamespace(), label, WorkflowPayload(hijackedWorkflow), dependencies)
    return label
3590 3591
def waitForTasks(self, labels=None):
    """
    Block until the given tasks have completed.

    @return: In case of an error in a task being waited for, or in
             one of these task's dependencies, the function returns 1.
             Else return 0.

    @param labels: Container of task labels to wait for. If an empty container is
                  given or no list is provided then wait for all
                  outstanding tasks to complete.
    @type labels: A single string, or set, tuple or list of strings
    """

    self._requireInWorkflow()

    # the wait applies to tasks in this workflow's own namespace:
    namespace = self._getNamespace()
    return self._waitForTasksCore(namespace, labels)
3609 3610
def isTaskComplete(self, taskLabel):
    """
    Query if a specific task is in the workflow and completed without error.

    This can assist workflows with providing
    stable interrupt/resume behavior.

    @param taskLabel: A task string

    @return: Completion status of task
    """

    status = self._isTaskCompleteCore(self._getNamespace(), taskLabel)

    # Complete = (Done and not Error), the error flag is only consulted
    # for a task which is done:
    isDone = status[0]
    if not isDone:
        return isDone
    return not status[1]
3627
def isTaskDone(self, taskLabel):
    """
    Query if a specific task is in the workflow and is done, with or without error

    This can assist workflows with providing
    stable interrupt/resume behavior.

    @param taskLabel: A task string

    @return: A boolean tuple specifying (task is done, task finished with error)
    """

    namespace = self._getNamespace()
    return self._isTaskCompleteCore(namespace, taskLabel)
3641
def cancelTaskTree(self, taskLabel):
    """
    Cancel the given task and all of its dependencies. Canceling means that any running jobs will be stopped and
    any waiting job will be unqueued. Canceled tasks will not be treated as errors. Canceled tasks that are not
    already complete will be put into the waiting/ignored state.

    @param taskLabel: A task string
    """
    namespace = self._getNamespace()
    self._cancelTaskTreeCore(namespace, taskLabel)
3649
def getRunMode(self):
    """
    Get the current run mode

    This can be used to access the current run mode from
    within the workflow function. Although the runmode should
    be transparent to client code, this is occasionally needed
    to hack workarounds.

    @return: Current run mode
    """

    self._requireInWorkflow()

    runParam = self._cdata().param
    return runParam.mode
3665 3666
def getNCores(self):
    """
    Get the current run core limit

    This function can be used to access the current run's core
    limit from within the workflow function. This can be useful
    to eg. limit the number of cores requested by a single task.

    @return: Total cores available to this workflow run
    @rtype: Integer value or 'unlimited'
    """

    self._requireInWorkflow()

    runParam = self._cdata().param
    return runParam.nCores
3682 3683
def limitNCores(self, nCores):
    """
    Takes a task nCores argument and reduces it to
    the maximum value allowed for the current run.

    @param nCores: Proposed core requirement

    @return: Min(nCores,Total cores available to this workflow run)
    """

    self._requireInWorkflow()

    requested = int(nCores)
    available = self._cdata().param.nCores
    # an unlimited run never reduces the request:
    if available != "unlimited":
        return min(requested, available)
    return requested
3700 3701
def getMemMb(self):
    """
    Get the current run's total memory limit (in megabytes)

    @return: Memory limit in megabytes
    @rtype: Integer value or 'unlimited'
    """

    self._requireInWorkflow()

    runParam = self._cdata().param
    return runParam.memMb
3713 3714
def limitMemMb(self, memMb):
    """
    Takes a task memMb argument and reduces it to
    the maximum value allowed for the current run.

    @param memMb: Proposed task memory requirement in megabytes

    @return: Min(memMb,Total memory available to this workflow run)
    """

    self._requireInWorkflow()

    requested = int(memMb)
    available = self._cdata().param.memMb
    # an unlimited run never reduces the request:
    if available != "unlimited":
        return min(requested, available)
    return requested
3731 3732
def isDryRun(self):
    """
    Get isDryRun flag value.

    When the dryrun flag is set, no commands are actually run. Querying
    this flag allows dynamic workflows to correct for dry run behaviors,
    such as tasks which do not produce expected files.

    @return: DryRun status flag
    """

    self._requireInWorkflow()

    runParam = self._cdata().param
    return runParam.isDryRun


@staticmethod
def runModeDefaultCores(mode):
    """
    Get the default core limit for run mode (local,sge,..)

    @param mode: run mode, as specified in L{the run() method<WorkflowRunner.run>}

    @return: Default maximum number of cores for mode

    @rtype: Either 'unlimited', or a string
            representation of the integer limit
    """

    modeConfig = RunMode.data[mode]
    return str(modeConfig.defaultCores)
3763 3764
def flowLog(self, msg, logState = LogState.INFO):
    """
    Send a message to the WorkflowRunner's log.

    @param msg: Log message
    @type msg: A string or an array of strings. String arrays will be separated by newlines in the log.
    @param logState: Message severity, defaults to INFO.
    @type logState: A value in pyflow.LogState.{INFO,WARNING,ERROR}
    """

    self._requireInWorkflow()

    # client messages are tagged with the name of the workflow class:
    workflowName = self._cdata().param.workflowClassName
    self._cdata().flowLog(msg, linePrefix="[%s]" % (workflowName), logState=logState)
3779 3780 3781 # Protected methods for client derived-class override: 3782
def workflow(self):
    """
    Workflow definition defined in child class

    This method should be overridden in the class derived from
    L{WorkflowRunner} to specify the actual workflow logic. Client
    code should not call this method directly.

    The base class version does nothing.
    """
    return None
3792 3793 3794 # private methods: 3795 3796 # special workflowRunner Exception used to terminate workflow() function 3797 # if a ctrl-c is issued
class _AbortWorkflowException(Exception):
    """
    Special WorkflowRunner exception used to terminate the workflow()
    function if a ctrl-c is issued
    """
3800 3801
def _flowLog(self, msg, logState):
    """
    Write a message from the WorkflowRunner itself to the workflow log.

    @param msg: Log message
    @param logState: Message severity
    """
    # messages from the runner are tagged differently from client messages:
    self._cdata().flowLog(msg, linePrefix="[WorkflowRunner]", logState=logState)
3805
def _infoLog(self, msg):
    """
    Log a WorkflowRunner message with INFO severity.
    """
    severity = LogState.INFO
    self._flowLog(msg, logState=severity)
3808
def _warningLog(self, msg):
    """
    Log a WorkflowRunner message with WARNING severity.
    """
    severity = LogState.WARNING
    self._flowLog(msg, logState=severity)
3811
def _errorLog(self, msg):
    """
    Log a WorkflowRunner message with ERROR severity.
    """
    severity = LogState.ERROR
    self._flowLog(msg, logState=severity)
3814
def _whoami(self):
    """
    Get the class name of this instance, which is the name of the
    *derived* class when called on a client workflow.
    """
    instanceClass = self.__class__
    return instanceClass.__name__
3818 3819
def _getNamespaceList(self):
    """
    Get the list of namespace components of this workflow. The list
    is created empty on first access, and the same list object is
    returned by every call.
    """
    if not hasattr(self, "_namespaceList"):
        self._namespaceList = []
    return self._namespaceList
3826
def _getNamespace(self):
    """
    Get the namespace of this workflow as a single string, built by
    joining the namespace components with namespaceSep.
    """
    components = self._getNamespaceList()
    return namespaceSep.join(components)
3829
def _appendNamespace(self, names):
    """
    Append one or more components to the namespace of this workflow.

    Each name must pass the task label check. An Exception is raised
    if the namespace already holds the maximum allowed number of
    components when a name is about to be added.

    @param names: namespace component(s), converted to a list with lister()
    """
    depthLimit = WorkflowRunner._maxWorkflowRecursion
    for name in lister(names):
        # check against runaway recursion:
        if len(self._getNamespaceList()) >= depthLimit:
            raise Exception("Recursive workflow invocation depth exceeds maximum allowed depth of %i" % (depthLimit))
        WorkflowRunner._checkTaskLabel(name)
        self._getNamespaceList().append(name)


# flag used to request the termination of all task submission:
#
_allStop = threading.Event()

@staticmethod
def _stopAllWorkflows():
    """
    Request that all workflows stop task submission.
    """
    stopFlag = WorkflowRunner._allStop
    stopFlag.set()

@staticmethod
def _isWorkflowStopped():
    """
    Check whether a global signal has been given to stop all workflow
    submission. This should only be true when a ctrl-C or similar
    event has occurred.
    """
    stopFlag = WorkflowRunner._allStop
    return stopFlag.isSet()
3854
def _addTaskCore(self, namespace, label, payload, dependencies):
    """
    Private core routine used to add any kind of task. This is the
    method which gets hijacked in recursive workflow submission.

    Raises _AbortWorkflowException, before anything is logged or
    added, if all workflows have been asked to stop.

    @param namespace: namespace the task is added to
    @param label: task label within the namespace
    @param payload: task payload, payload.desc() is used in the log message
    @param dependencies: dependency label(s), converted with setzer()
    """

    if self._isWorkflowStopped():
        raise WorkflowRunner._AbortWorkflowException

    payloadDesc = payload.desc()
    fullLabel = namespaceJoin(namespace, label)
    self._infoLog("Adding %s '%s' to %s" % (payloadDesc, fullLabel, namespaceLabel(namespace)))

    # add task to the task-dag, and launch a new taskrunner thread
    # if one isn't already running:
    self._tdag.addTask(namespace, label, payload, setzer(dependencies))
    self._startTaskManager()
3872 3873
3874 - def _getWaitStatus(self, namespace, labels, status) :
3875 # update and return two values: 3876 # (1) isAllTaskDone -- are all tasks done (ie. error or complete state 3877 # (2) retval -- this is set to one if any tasks have errors 3878 # 3879 3880 def updateStatusFromTask(task, status) : 3881 if not task.isDone() : 3882 status.isAllTaskDone = False 3883 elif not task.isComplete() : 3884 status.retval = 1 3885 if status.retval == 0 and (not self._cdata().isTaskSubmissionActive()) : 3886 status.retval = 1 3887 if status.retval == 0 and task.isDead() : 3888 status.retval = 1
3889 3890 3891 if len(labels) == 0 : 3892 if namespace == "" : 3893 if self._tdag.isRunExhausted() or (not self._tman.isAlive()) : 3894 if not self._tdag.isRunComplete() : 3895 status.retval = 1 3896 else: 3897 status.isAllTaskDone = False 3898 else : 3899 for task in self._tdag.getAllNodes(namespace) : 3900 updateStatusFromTask(task, status) 3901 else : 3902 for l in labels : 3903 if not self._tdag.isTaskPresent(namespace, l) : 3904 raise Exception("Task: '%s' is not in taskDAG" % (namespaceJoin(namespace, l))) 3905 task = self._tdag.getTask(namespace, l) 3906 updateStatusFromTask(task, status) 3907 3908
def _waitForTasksCore(self, namespace, labels=None, isVerbose=True):
    """
    Block until the requested tasks are all done or one of them reports
    an error, re-checking via self._getWaitStatus between waits.

    @param labels: converted with setzer(); when the result is empty the
                   wait covers the namespace's current tasks
    @param isVerbose: if true, log a "Pausing"/"Resuming" message pair
    @return: 0 if no error was detected, 1 otherwise
    @raise WorkflowRunner._AbortWorkflowException: if a stop of all
           workflows is requested while waiting
    """

    class WaitStatus:
        def __init__(self):
            self.isAllTaskDone = True
            self.retval = 0

    labels = setzer(labels)
    if isVerbose:
        prefix = "Pausing %s until completion of" % (namespaceLabel(namespace))
        if len(labels) == 0:
            suffix = " its current tasks"
        else:
            suffix = " task(s): %s" % (",".join([namespaceJoin(namespace, l) for l in labels]))
        self._infoLog(prefix + suffix)

    # NOTE(review): ExpWaiter is defined elsewhere; presumably the waits
    # grow from 1 up to 15 seconds -- confirm against its definition
    ewaiter = ExpWaiter(1, 1.7, 15)
    while True:
        if self._isWorkflowStopped():
            raise WorkflowRunner._AbortWorkflowException
        status = WaitStatus()
        self._getWaitStatus(namespace, labels, status)
        isStillWaiting = (not status.isAllTaskDone) and (status.retval == 0)
        if not isStillWaiting:
            break
        ewaiter.wait()

    if isVerbose:
        self._infoLog("Resuming %s" % (namespaceLabel(namespace)))
    return status.retval
3938 - def _isTaskCompleteCore(self, namespace, taskLabel) :
3939 """ 3940 @return: A boolean tuple specifying (task is done, task finished with error) 3941 """ 3942 3943 if not self._tdag.isTaskPresent(namespace, taskLabel) : 3944 return (False, False) 3945 task = self._tdag.getTask(namespace, taskLabel) 3946 return ( task.isDone(), task.isError() )
3947
3948 - def _cancelTaskTreeCore(self, namespace, taskLabel) :
3949 if not self._tdag.isTaskPresent(namespace, taskLabel) : 3950 return 3951 task = self._tdag.getTask(namespace, taskLabel) 3952 self._tman.cancelTaskTree(task)
3953 3954 @staticmethod
3955 - def _checkTaskLabel(label) :
3956 # sanity check label: 3957 if not isinstance(label, basestring) : 3958 raise Exception ("Task label is not a string") 3959 if label == "" : 3960 raise Exception ("Task label is empty") 3961 if not re.match("^[A-Za-z0-9_-]+$", label) : 3962 raise Exception ("Task label is invalid due to disallowed characters. Label: '%s'" % (label))
3963 3964
3965 - def _startTaskManager(self) :
3966 # start a new task manager if one isn't already running: 3967 # 3968 if (self._tman is not None) and (self._tman.isAlive()) : return 3969 if not self._cdata().isTaskManagerException : 3970 self._tman = TaskManager(self._cdata(), self._tdag) 3971 self._tman.start()
3972 3973
3974 - def _notify(self, msg, logState) :
3975 # msg is printed to log AND sent to any email or other requested 3976 # notification systems: 3977 self._flowLog(msg,logState) 3978 self._cdata().emailNotification(msg, self._flowLog)
3979 3980
def _killWorkflow(self, errorMsg):
    """
    Send errorMsg as an error notification, attempt to shut down all
    tasks (waiting up to 10 seconds), then exit the process with
    status 1.
    """
    shutdownTimeoutSec = 10
    self._notify(errorMsg, logState=LogState.ERROR)
    self._shutdownAll(timeoutSec=shutdownTimeoutSec)
    sys.exit(1)
3985 3986
3987 - def _shutdownAll(self, timeoutSec) :
3988 # Try to shut down the task manager, all command-tasks, 3989 # and all sub-workflow tasks. 3990 # 3991 if (self._tman is None) or (not self._tman.isAlive()) : return 3992 StoppableThread.stopAll() 3993 self._stopAllWorkflows() 3994 self._tman.stop() 3995 for _ in range(timeoutSec) : 3996 time.sleep(1) 3997 if not self._tman.isAlive() : 3998 self._infoLog("Task shutdown complete") 3999 return 4000 self._infoLog("Task shutdown timed out")
4001 4002
4003 - def _cdata(self) :
4004 # We're doing this convoluted setup only to avoid having a 4005 # ctor for ease of use by the client. See what pyFlow goes 4006 # through for you client code?? 4007 # 4008 try: 4009 return self._constantData 4010 except AttributeError: 4011 self._constantData = WorkflowRunnerThreadSharedData() 4012 return self._constantData
4013 4014 4015 # TODO: Better definition of the status thread shutdown at the end of a pyflow run to 4016 # prevent race conditions -- ie. what happens if the status update is running while 4017 # pyflow is shutting down? Every method called by the status updater should be safety 4018 # checked wrt this issue. 4019 #
def _runUpdate(self, runStatus):
    """
    Body of the status update thread. Loops forever; on each pass it
    sleeps for self.updateInterval minutes, logs a task status report,
    then appends a stack dump of all threads to the stack dump log file.

    @param runStatus: object whose isSpecificationComplete event is read
                      for the report
    """
    while True:
        time.sleep(self.updateInterval * 60)

        status = self._tdag.getTaskStatus()
        isSpecComplete = (runStatus.isSpecificationComplete.isSet() and status.isAllSpecComplete)
        report = []
        report.append("===== " + self._whoami() + " StatusUpdate =====")
        report.append("Workflow specification is complete?: %s" % (str(isSpecComplete)))
        report.append("Task status (waiting/queued/running/complete/error): %i/%i/%i/%i/%i"
                      % (status.waiting, status.queued, status.running, status.complete, status.error))
        report.append("Longest ongoing queued task time (hrs): %.4f" % (status.longestQueueSec / 3600.))
        report.append("Longest ongoing queued task name: '%s'" % (status.longestQueueName))
        report.append("Longest ongoing running task time (hrs): %.4f" % (status.longestRunSec / 3600.))
        report.append("Longest ongoing running task name: '%s'" % (status.longestRunName))

        report = [ "[StatusUpdate] " + line for line in report ]
        self._infoLog(report)

        # Update interval is also an appropriate interval to dump a stack-trace of all active
        # threads. This is a useful post-mortem in the event of a large class of hang/deadlock
        # errors:
        #
        stackDumpFp = open(self._cdata().stackDumpLogFile, "a")
        try:
            # create one fully decorated line in the stack dump file as a prefix to the report:
            linePrefixOut = "[%s] [StackDump]" % (self._cdata().getRunid())
            ofpList = [stackDumpFp]
            log(ofpList, "Initiating stack dump for all threads", linePrefixOut)

            stackDump(stackDumpFp)
            hardFlush(stackDumpFp)
        finally:
            # release the handle even if the dump itself fails, so that a
            # failing dump does not leave the log file open
            stackDumpFp.close()
4053 4054
def _runWorkflow(self, param):
    """
    Primary workflow logic when nothing goes wrong: set up the run, start
    the optional status update thread, run the master workflow on its own
    thread, wait for that thread to end, and evaluate the result.

    @param param: run parameters, passed to self._setupWorkflow
    @return: the result of self._evalWorkflow for the master run status
    """
    self._setupWorkflow(param)
    self._initMessage()

    runStatus = RunningTaskStatus(self._tdag.isFinishedEvent)

    # start status update reporter:
    #
    # TODO: stop this thread at end of run
    #
    if (self.updateInterval > 0):
        statusThread = threading.Thread(target=WorkflowRunner._runUpdate, args=(self, runStatus))
        statusThread.setDaemon(True)
        statusThread.setName("StatusUpdate-Thread")
        statusThread.start()

    # run workflow() function on a separate thread, using exactly
    # the same method we use for sub-workflows:
    #
    # TODO: move the master workflow further into the code path used by sub-workflows,
    # so that we aren't replicating polling and error handling code in this function:
    #
    masterRunner = WorkflowTaskRunner(runStatus, "masterWorkflow", self, self._cdata().flowLog, None)
    masterRunner.start()

    # poll rather than join(), because join() blocks SIGINT
    ewaiter = ExpWaiter(1, 1.7, 15, runStatus.isComplete)
    while masterRunner.isAlive():
        ewaiter.wait()

    if not runStatus.isComplete.isSet():
        # if not complete then we don't know what happened, very bad!:
        runStatus.errorCode = 1
        runStatus.errorMessage = "Thread: '%s', has stopped without a traceable cause" % (masterRunner.getName())

    self._taskInfoWriter.flush()
    self._taskStatusWriter.flush()

    return self._evalWorkflow(runStatus)
4097 4098
def _setupWorkflow(self, param):
    """
    Prepare this instance for a new run: register the user parameters,
    create the task-dag, back up any existing task state/info files,
    reduce those files to completed tasks for a continued run, and start
    the task info/status file writers.

    @param param: run parameters, passed to the shared data's setupNewRun
    """
    cdata = self._cdata()

    # setup instance user parameters:
    cdata.setupNewRun(param)

    # setup other instance data:
    self._tdag = TaskDAG(cdata.param.isContinue, cdata.param.isForceContinue, cdata.param.isDryRun,
                         cdata.taskInfoFile, cdata.taskStateFile, cdata.param.workflowClassName,
                         cdata.param.startFromTasks, cdata.param.ignoreTasksAfter, cdata.param.resetTasks,
                         self._flowLog)
    self._tman = None

    def backupFile(inputFile):
        """
        copy an old state file, if it exists, into a "backup"
        directory next to it
        """
        if not os.path.isfile(inputFile):
            return
        (fileDir, fileName) = os.path.split(inputFile)
        backupDir = os.path.join(fileDir, "backup")
        ensureDir(backupDir)
        backupSuffix = ".backup_before_starting_run_%s.txt" % (cdata.getRunid())
        shutil.copyfile(inputFile, os.path.join(backupDir, fileName + backupSuffix))

    for stateFile in (cdata.taskStateFile, cdata.taskInfoFile):
        backupFile(stateFile)

    if cdata.param.isContinue:
        self._setupContinuedWorkflow()

    self._taskInfoWriter = TaskFileWriter(self._tdag.writeTaskInfo)
    self._taskStatusWriter = TaskFileWriter(self._tdag.writeTaskStatus)

    self._tdag.isWriteTaskInfo = self._taskInfoWriter.isWrite
    self._tdag.isWriteTaskStatus = self._taskStatusWriter.isWrite

    self._taskInfoWriter.start()
    self._taskStatusWriter.start()
4142 - def _createContinuedStateFile(self) :
4143 # 4144 # create continued version of task state file 4145 # 4146 4147 cdata = self._cdata() 4148 if not os.path.isfile(cdata.taskStateFile) : return set() 4149 4150 tmpFile = cdata.taskStateFile + ".update.incomplete" 4151 tmpfp = open(tmpFile, "w") 4152 tmpfp.write(taskStateHeader()) 4153 complete = set() 4154 for words in taskStateParser(cdata.taskStateFile) : 4155 (runState, errorCode) = words[2:4] 4156 if (runState != "complete") or (int(errorCode) != 0) : continue 4157 tmpfp.write("\t".join(words) + "\n") 4158 (label, namespace) = words[0:2] 4159 complete.add(namespaceJoin(namespace, label)) 4160 4161 tmpfp.close() 4162 forceRename(tmpFile, cdata.taskStateFile) 4163 return complete
4164 4165
4166 - def _createContinuedInfoFile(self, complete) :
4167 # 4168 # create continued version of task info file 4169 # 4170 4171 cdata = self._cdata() 4172 if not os.path.isfile(cdata.taskInfoFile) : return 4173 4174 tmpFile = cdata.taskInfoFile + ".update.incomplete" 4175 tmpfp = open(tmpFile, "w") 4176 tmpfp.write(taskInfoHeader()) 4177 for words in taskInfoParser(cdata.taskInfoFile) : 4178 (label, namespace, ptype, nCores, memMb, priority, isForceLocal, depStr, cwdStr, command) = words 4179 fullLabel = namespaceJoin(namespace, label) 4180 if fullLabel not in complete : continue 4181 tmpfp.write("\t".join(words) + "\n") 4182 if ptype == "command" : 4183 if command == "" : command = None 4184 payload = CmdPayload(fullLabel, Command(command, cwdStr), int(nCores), int(memMb), int(priority), argToBool(isForceLocal)) 4185 elif ptype == "workflow" : 4186 payload = WorkflowPayload(None) 4187 else : assert 0 4188 4189 self._tdag.addTask(namespace, label, payload, getTaskInfoDepSet(depStr), isContinued=True) 4190 4191 tmpfp.close() 4192 forceRename(tmpFile, cdata.taskInfoFile)
4193 4194 4195
4196 - def _setupContinuedWorkflow(self) :
4197 # reduce both state files to completed states only. 4198 complete = self._createContinuedStateFile() 4199 self._createContinuedInfoFile(complete)
4200 4201 4202
def _initMessage(self):
    """
    Log the run start banner: workflow class, pyFlow and python versions,
    working directory and command line, followed by the run parameters,
    each of which is prefixed with "[RunParameters] ".
    """
    param = self._cdata().param  # shortcut
    msg = ["Initiating pyFlow run",
           "pyFlowClientWorkflowClass: %s" % (param.workflowClassName),
           "pyFlowVersion: %s" % (__version__),
           "pythonVersion: %s" % (pythonVersion),
           "WorkingDir: '%s'" % (self._cdata().cwd),
           "ProcessCmdLine: '%s'" % (cmdline())]

    runParamLines = ["mode: %s" % (param.mode),
                     "nCores: %s" % (str(param.nCores)),
                     "memMb: %s" % (str(param.memMb)),
                     "dataDir: %s" % (str(param.dataDir)),
                     "isDryRun: %s" % (str(param.isDryRun)),
                     "isContinue: %s" % (str(param.isContinue)),
                     "isForceContinue: %s" % (str(param.isForceContinue)),
                     "mailTo: '%s'" % (",".join(param.mailTo))]
    msg.extend(["[RunParameters] " + line for line in runParamLines])
    self._infoLog(msg)
4224 4225 4226
4227 - def _getTaskErrorsSummaryMsg(self, isForceTaskHarvest=False) :
4228 # isForceHarvest means we try to force an update of the shared 4229 # taskError information in case this thread is ahead of the 4230 # task manager. 4231 if isForceTaskHarvest : 4232 if (self._tman is not None) and (self._tman.isAlive()) : 4233 self._tman.harvestTasks() 4234 4235 if not self._cdata().isTaskError() : return [] 4236 # this case has already been emailed in the TaskManager @ first error occurrence: 4237 msg = ["Worklow terminated due to the following task errors:"] 4238 for task in self._cdata().taskErrors : 4239 msg.extend(task.getTaskErrorMsg()) 4240 return msg
4241 4242
def _evalWorkflow(self, masterRunStatus):
    """
    Evaluate the outcome of a finished run, then log and notify
    accordingly.

    The run counts as failed if a task error was recorded, the master
    workflow reported a non-zero error code, the task manager raised an
    unhandled exception, or the task-dag run is not complete.

    @param masterRunStatus: run status of the master workflow; its
                            errorCode and errorMessage are read
    @return: 0 on success, 1 on any error
    """
    isError = False

    if self._cdata().isTaskError():
        taskErrorMsg = self._getTaskErrorsSummaryMsg()
        self._errorLog(taskErrorMsg)
        isError = True

    if masterRunStatus.errorCode != 0:
        eMsg = lister(masterRunStatus.errorMessage)
        hasErrorText = (len(eMsg) > 1) or (len(eMsg) == 1 and eMsg[0] != "")
        if hasErrorText:
            msg = ["Failed to complete master workflow, error code: %s" % (str(masterRunStatus.errorCode)),
                   "errorMessage:"]
            msg.extend(eMsg)
            self._notify(msg, logState=LogState.ERROR)
        # NOTE(review): nesting level reconstructed -- a non-zero error
        # code is treated as an error even without message text; confirm
        isError = True

    if self._cdata().isTaskManagerException:
        # this case has already been emailed in the TaskManager:
        self._errorLog("Workflow terminated due to unhandled exception in TaskManager")
        isError = True

    if (not isError) and (not self._tdag.isRunComplete()):
        self._notify("Workflow terminated with unknown error condition", logState=LogState.ERROR)
        isError = True

    if isError:
        return 1

    elapsed = int(time.time() - self.runStartTimeStamp)
    msg = []
    if self._cdata().param.successMsg is not None:
        msg.append(self._cdata().param.successMsg)
        msg.append("")
    msg.append("Workflow successfully completed all tasks")
    msg.append("Elapsed time for full workflow: %s sec" % (elapsed))
    self._notify(msg, logState=LogState.INFO)
    return 0
4280 4281
4282 - def _requireInWorkflow(self) :
4283 """ 4284 check that the calling method is being called as part of a pyflow workflow() method only 4285 """ 4286 if not self._getRunning(): 4287 raise Exception("Method must be a (call stack) descendant of WorkflowRunner workflow() method (via run() method)")
4288 4289
4290 - def _initRunning(self):
4291 try : 4292 assert(self._isRunning >= 0) 4293 except AttributeError : 4294 self._isRunning = 0
@lockMethod
def _setRunning(self, isRunning):
    """
    Adjust the self._isRunning counter.

    @param isRunning: if true the counter is incremented by one,
                      otherwise it is decremented by one
    """
    self._initRunning()
    if isRunning:
        step = 1
    else:
        step = -1
    self._isRunning += step
@lockMethod
def _getRunning(self):
    """
    @return: True if the self._isRunning counter is greater than zero
    """
    self._initRunning()
    isActive = (self._isRunning > 0)
    return isActive
# When this module is executed as a script instead of being imported,
# show the help text of the WorkflowRunner class:
if __name__ == "__main__" :
    help(WorkflowRunner)