1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35 """
36 pyflow -- a lightweight parallel task engine
37 """
38
39 __author__ = 'Christopher Saunders'
40
41
42 import copy
43 import datetime
44 import os
45 import re
46 import shutil
47 import subprocess
48 import sys
49 import threading
50 import time
51 import traceback
52
53 from pyflowConfig import siteConfig
54
55
56 moduleDir = os.path.abspath(os.path.dirname(__file__))
57
58
59
60
# Enforce the supported interpreter range: this module is python 2 only,
# and requires at least python 2.4.
pyver = sys.version_info
if pyver[0] != 2 or (pyver[0] == 2 and pyver[1] < 4) :
    raise Exception("pyflow module has only been tested for python versions [2.4,3.0)")


# python 2.7.2 is specifically excluded: interpreter errors in that point
# release can hang or crash a pyflow workflow.
if pyver[0] == 2 and pyver[1] == 7 and pyver[2] == 2 :
    raise Exception("Python interpreter errors in python 2.7.2 may cause a pyflow workflow hang or crash. Please use a different python version.")
72
73
74
75
76
77
78
79
# Disable the subprocess module's internal _cleanup() pass.
# NOTE(review): presumably this prevents the subprocess module from reaping
# child processes behind the back of pyflow's task-runner threads, which call
# Popen.wait() themselves -- confirm the original motivation (likely a py2.4/2.5
# subprocess threading bug workaround).
subprocess._cleanup = lambda: None
81
82
83
84
85
86
87
# Lower the per-thread stack size (where supported) to reduce total memory
# consumption when a very large number of task threads are running; 256Kb
# here versus a typical linux default of 8Mb.
#
# Bug fix: the original passed the function object ``threading.stack_size``
# into min() instead of a byte count; python2's arbitrary mixed-type ordering
# made min() return 262144 by accident (and it is a TypeError on python3).
# The intended 256Kb constant is now passed directly.
try:
    threading.stack_size(256 * 1024)
except AttributeError:
    # stack_size is unavailable on this platform/interpreter (e.g. python < 2.5)
    pass
96 """
97 Control total memory usage in non-local run modes by
98 limiting the number of simultaneous subprocess calls
99
100 Note that in practice this only controls the total number
101 of qsub/qstat calls in SGE mode
102 """
103 maxSubprocess = 2
104 subprocessControl = threading.Semaphore(maxSubprocess)
105
109 python_version = sys.version_info
110 return ".".join([str(i) for i in python_version])
111
112 pythonVersion = getPythonVersion()
119
120 pyflowAutoVersion = None
121
122
123 if pyflowAutoVersion is not None : return pyflowAutoVersion
124
125
126 try :
127 proc = subprocess.Popen(["git", "describe"], stdout=subprocess.PIPE, stderr=open(os.devnull, "w"), cwd=moduleDir, shell=False)
128 (stdout, _stderr) = proc.communicate()
129 retval = proc.wait()
130 stdoutList = stdout.split("\n")[:-1]
131 if (retval == 0) and (len(stdoutList) == 1) : return stdoutList[0]
132 except OSError:
133
134 pass
135
136 return "unknown"
137
138
139 __version__ = getPyflowVersion()
146 import platform
147 return (platform.system().find("Windows") > -1)
148
151
155
160 """
161 dst is only overwritten in a single atomic operation on *nix
162 on windows, we can't have atomic rename, but we can recreate the behavior otherwise
163 """
164 if isWindows() :
165 if os.path.exists(dst) :
166 os.remove(dst)
167
168 maxTrials=5
169 for trial in range(maxTrials) :
170 try :
171 os.rename(src,dst)
172 return
173 except OSError :
174 if (trial+1) >= maxTrials : raise
175 time.sleep(5)
176
180 """
181 clear bash functions out of the env
182
183 without this change the shellshock security update causes pyflow SGE jobs to
184 fail with the behavior of current (201512) versions of SGE qsub
185 """
186
187 ekeys = os.environ.keys()
188 for key in ekeys :
189 if key.endswith("()") :
190 del os.environ[key]
191
197 """
198 make directory if it doesn't already exist, raise exception if
199 something else is in the way:
200 """
201 if os.path.exists(d):
202 if not os.path.isdir(d) :
203 raise Exception("Can't create directory: %s" % (d))
204 else :
205 os.makedirs(d)
206
214 """
215 converts time.time() output to timenow() string
216 """
217 return datetime.datetime.utcfromtimestamp(ts).isoformat()
218
221
223 import calendar
224 d = datetime.datetime(*map(int, re.split(r'[^\d]', ts)[:-1]))
225 return calendar.timegm(d.timetuple())
226
230 return isinstance(x, (int, long))
231
233 return isinstance(x, basestring)
234
237 return (getattr(x, '__iter__', False) != False)
238
241 """
242 Convert input into a list, whether it's already iterable or
243 not. Make an exception for individual strings to be returned
244 as a list of one string, instead of being chopped into letters
245 Also, convert None type to empty list:
246 """
247
248 if x is None : return []
249 if (isString(x) or (not isIterable(x))) : return [x]
250 return list(x)
251
255 """
256 convert user input into a set, handling the pathological case
257 that you have been handed a single string, and you don't want
258 a set of letters:
259 """
260 return set(lister(x))
261
265 """
266 A simple logging enum
267 """
268 INFO = 1
269 WARNING = 2
270 ERROR = 3
271
272 @classmethod
274 if logState == cls.INFO : return "INFO"
275 if logState == cls.WARNING : return "WARNING"
276 if logState == cls.ERROR : return "ERROR"
277
278 raise Exception("Unknown log state: " + str(logState))
279
284
296
297
298
def log(ofpList, msgList, linePrefix=None):
    """
    General logging function.

    @param ofpList: A container of file objects to write to

    @param msgList: A container of (or a single) multi-line log message
                    string. Final newlines are not required

    @param linePrefix: A prefix to add before every line. This will come
                       *after* the log function's own '[time] [hostname]'
                       prefix.

    @return: Returns a boolean tuple of size ofpList indicating the success of
             writing to each file object
    """
    messages = lister(msgList)
    streams = setzer(ofpList)
    isOk = [True for _ in streams]
    for message in messages :
        # drop a single trailing newline so the split below does not yield an
        # empty final line:
        if (len(message) > 0) and (message[-1] == "\n") : message = message[:-1]
        prefix = "[%s] [%s]" % (timeStrNow(), siteConfig.getHostName())
        if linePrefix is not None :
            prefix += " " + linePrefix

        for streamIndex, stream in enumerate(streams):
            # once a stream has failed it is skipped for all later messages:
            if not isOk[streamIndex] : continue
            try :
                # stamp every line of a multi-line message with the prefix:
                for line in message.split("\n") :
                    stream.write("%s %s\n" % (prefix, line))
                hardFlush(stream)
            except IOError:
                isOk[streamIndex] = False
    return isOk
334
338 return threading.currentThread().getName()
339
341 return (getThreadName == "MainThread")
342
345 """
346 fakes a filehandle for library functions which write to a stream,
347 and captures output in a string
348 """
351
352 - def write(self, string) :
354
357
360 return traceback.format_exc()
361
364
365 msg = ("Unhandled Exception in %s\n" % (getThreadName())) + getTracebackStr()
366 if msg[-1] == "\n" : msg = msg[:-1]
367 return msg.split("\n")
368
371 return " ".join(sys.argv)
372
376 """
377 convert string or list of strings into a single string message
378 """
379 msg = ""
380 isFirst=True
381 for chunk in lister(msgList) :
382 if isFirst :
383 isFirst = False
384 else :
385 msg += "\n"
386 if ((len(chunk)>0) and (chunk[-1] == '\n')) :
387 chunk = chunk[:-1]
388 msg += chunk
389
390 return msg
391
392
393
394 emailRegex = re.compile(r"(?:^|\s)[-a-z0-9_.]+@(?:[-a-z0-9]+\.)+[a-z]{2,6}(?:\s|$)", re.IGNORECASE)
398
401 """
402 return true if a local smtp server is available
403 """
404 import smtplib
405 try :
406 s = smtplib.SMTP('localhost')
407 except :
408 return False
409 return True
410
411
def sendEmail(mailTo, mailFrom, subject, msgList) :
    """
    Send a plain-text email through the local smtp server.

    @param mailTo: a single recipient address or a container of addresses
    @param mailFrom: the sender address
    @param subject: the message subject line
    @param msgList: a single string or a container of strings forming the body
    """
    import smtplib
    # the capitalized module path is the py2.4-compatible MIMEText location:
    from email.MIMEText import MIMEText

    recipients = setzer(mailTo)

    email = MIMEText(msgListToMsg(msgList))
    email["Subject"] = subject
    email["From"] = mailFrom
    email["To"] = ", ".join(recipients)

    server = smtplib.SMTP('localhost')
    server.sendmail(mailFrom, list(recipients), email.as_string())
    server.quit()
430
434
437 """
438 convert argument of unknown type to a bool:
439 """
440 class FalseStrings :
441 val = ("", "0", "false", "f", "no", "n", "off")
442
443 if isinstance(x, basestring) :
444 return (x.lower() not in FalseStrings.val)
445 return bool(x)
446
449 """
450 This function hashes objects values -- the hash will be the
451 same for two objects containing the same methods and data, so
452 it corresponds to 'A==B' and *not* 'A is B'.
453 """
454 import pickle
455 import hashlib
456 hashlib.md5(pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL)).hexdigest()
457
458
459 namespaceSep = "+"
463 """
464 join two strings with a separator only if a exists
465 """
466 if a == "" : return b
467 elif b == "" : return a
468 return a + namespaceSep + b
469
472 """
473 provide a consistent naming scheme to users for embedded workflows
474 """
475 if namespace == "" :
476 return "master workflow"
477 else :
478 return "sub-workflow '%s'" % (namespace)
479
483 """
484 Convenience object to setup exponentially increasing wait/polling times
485 """
    def __init__(self, startSec, factor, maxSec, event = None) :
        """
        Set up an exponentially increasing wait/poll sequence.

        @param startSec: initial wait duration in seconds (must be > 0)
        @param factor: multiplier applied to the wait after each cycle (> 1)
        @param maxSec: ceiling on the wait duration (>= startSec)
        @param event: optional threading event allowing the wait cycle to be
                      interrupted; when provided, waits block on the event
                      instead of sleeping
        """
        assert (startSec > 0.)
        assert (factor > 1.)
        assert (maxSec >= startSec)
        self.startSec = startSec
        self.factor = factor
        self.maxSec = maxSec
        self.event = event
        # current wait duration; grows by 'factor' each cycle up to maxSec:
        self.sec = self.startSec
        # becomes True once 'sec' has saturated at maxSec:
        self.isMax = False
500
502 self.sec = self.startSec
503
505 if self.event is None :
506 time.sleep(self.sec)
507 else :
508 self.event.wait(self.sec)
509 if self.isMax : return
510 self.sec = min(self.sec * self.factor, self.maxSec)
511 self.isMax = (self.sec == self.maxSec)
512 assert self.sec <= self.maxSec
513
517 """
518 method decorator acquires/releases object's lock
519 """
520
521 def wrapped(self, *args, **kw):
522 if not hasattr(self,"lock") :
523 self.lock = threading.RLock()
524
525 self.lock.acquire()
526 try:
527 return f(self, *args, **kw)
528 finally:
529 self.lock.release()
530 return wrapped
531
535 """
536 generic struct with named argument constructor
537 """
539 self.__dict__.update(kwds)
540
544 """
545 adapted from haridsv @ stackoverflow:
546 """
547
548 athreads = threading.enumerate()
549 tnames = [(th.getName()) for th in athreads]
550
551 frames = None
552 try:
553 frames = sys._current_frames()
554 except AttributeError:
555
556 pass
557
558 id2name = {}
559 try:
560 id2name = dict([(th.ident, th.getName()) for th in athreads])
561 except AttributeError :
562
563 pass
564
565 if (frames is None) or (len(tnames) > 50) :
566 dumpfp.write("ActiveThreadCount: %i\n" % (len(tnames)))
567 dumpfp.write("KnownActiveThreadNames:\n")
568 for name in tnames : dumpfp.write(" %s\n" % (name))
569 dumpfp.write("\n")
570 return
571
572 dumpfp.write("ActiveThreadCount: %i\n" % (len(frames)))
573 dumpfp.write("KnownActiveThreadNames:\n")
574 for name in tnames : dumpfp.write(" %s\n" % (name))
575 dumpfp.write("\n")
576
577 for tid, stack in frames.items():
578 dumpfp.write("Thread: %d %s\n" % (tid, id2name.get(tid, "NAME_UNKNOWN")))
579 for filename, lineno, name, line in traceback.extract_stack(stack):
580 dumpfp.write('File: "%s", line %d, in %s\n' % (filename, lineno, name))
581 if line is not None:
582 dumpfp.write(" %s\n" % (line.strip()))
583 dumpfp.write("\n")
584 dumpfp.write("\n")
585
597 return "#taskLabel\ttaskNamespace\trunState\terrorCode\trunStateUpdateTime\n"
598
601 class Constants :
602 nStateCols = 5
603
604 for line in open(stateFile) :
605 if len(line) and line[0] == "#" : continue
606 line = line.strip()
607 w = line.split("\t")
608 if len(w) != Constants.nStateCols :
609 raise Exception("Unexpected format in taskStateFile: '%s' line: '%s'" % (stateFile, line))
610 yield [x.strip() for x in w]
611
614 return "#%s\n" % ("\t".join(("taskLabel", "taskNamespace", "taskType", "nCores", "memMb", "priority", "isForceLocal", "dependencies", "cwd", "command")))
615
618 class Constants :
619 nInfoCols = 10
620
621 for line in open(infoFile) :
622 if len(line) and line[0] == "#" : continue
623 line = line.lstrip()
624 w = line.split("\t", (Constants.nInfoCols - 1))
625 if len(w) != Constants.nInfoCols :
626 raise Exception("Unexpected format in taskInfoFile: '%s' line: '%s'" % (infoFile, line))
627 yield [x.strip() for x in w]
628
631
632 s = s.strip()
633 if s == "" : return []
634 return set([d.strip() for d in s.split(",")])
635
639
640 validRunstates = ("complete", "running", "queued", "waiting", "error")
641
645 """
646 A static container of configuration data for dot graph output
647 """
648
649 runstateDotColor = {"waiting" : "grey",
650 "running" : "green",
651 "queued" : "yellow",
652 "error" : "red",
653 "complete" : "blue" }
654
655 runstateDotStyle = {"waiting" : "dashed",
656 "running" : None,
657 "queued" : None,
658 "error" : "bold",
659 "complete" : None }
660
661 @staticmethod
669
670 @staticmethod
672 attrib = ""
673 if nodeType == "workflow" :
674 attrib += " shape=rect style=rounded"
675 return attrib
676
677 @staticmethod
679 string = '{ rank = source; Legend [shape=none, margin=0, label=<\n'
680 string += '<TABLE BORDER="0" CELLBORDER="1" CELLSPACING="0" CELLPADDING="4">\n'
681 string += '<TR><TD COLSPAN="2">Legend</TD></TR>\n'
682 for state in TaskNodeConstants.validRunstates :
683 color = DotConfig.runstateDotColor[state]
684 string += '<TR> <TD>%s</TD> <TD BGCOLOR="%s"></TD> </TR>\n' % (state, color)
685 string += '</TABLE>>];}\n'
686 return string
687
688
689
def writeDotGraph(taskInfoFile, taskStateFile, workflowClassName) :
    """
    Write the current task graph state to stdout in graphviz dot format.

    @param taskInfoFile: path to the static task info file (format per
                         taskInfoParser)
    @param taskStateFile: path to the dynamic task state file (format per
                          taskStateParser)
    @param workflowClassName: name used for the dot digraph and as the label
                              of the master-workflow cluster
    """

    addOrder = []      # task ids in file order
    taskInfo = {}      # (namespace, label) -> Bunch(ptype, parentLabels[, runState])
    headNodes = set()  # tasks with no dependencies
    tailNodes = set()  # tasks no other task depends on

    # parse the static task info: graph structure and task types
    for (label, namespace, ptype, _nCores, _memMb, _priority, _isForceLocal, depStr, _cwdStr, _command) in taskInfoParser(taskInfoFile) :
        tid = (namespace, label)
        addOrder.append(tid)
        taskInfo[tid] = Bunch(ptype=ptype,
                              parentLabels=getTaskInfoDepSet(depStr))
        if len(taskInfo[tid].parentLabels) == 0 : headNodes.add(tid)
        tailNodes.add(tid)
        # any task named as a dependency cannot be a tail node:
        for plabel in taskInfo[tid].parentLabels :
            ptid = (namespace, plabel)
            if ptid in tailNodes : tailNodes.remove(ptid)

    # parse the dynamic run state and attach it to each task record:
    for (label, namespace, runState, _errorCode, _time) in taskStateParser(taskStateFile) :
        tid = (namespace, label)
        taskInfo[tid].runState = runState

    dotFp = sys.stdout
    dotFp.write("// Task graph from pyflow object '%s'\n" % (workflowClassName))
    dotFp.write("// Process command: '%s'\n" % (cmdline()))
    dotFp.write("// Process working dir: '%s'\n" % (os.getcwd()))
    dotFp.write("// Graph capture time: %s\n" % (timeStrNow()))
    dotFp.write("\n")
    dotFp.write("digraph %s {\n" % (workflowClassName + "Graph"))
    dotFp.write("\tcompound=true;\nrankdir=LR;\nnode[fontsize=10];\n")
    labelToSym = {}      # (namespace, label) -> dot node symbol ("n0", "n1", ...)
    namespaceGraph = {}  # namespace -> accumulated dot statements for its cluster
    # emit one node per task, colored/styled by run state and shaped by type:
    for (i, (namespace, label)) in enumerate(addOrder) :
        tid = (namespace, label)
        if namespace not in namespaceGraph :
            namespaceGraph[namespace] = ""
        sym = "n%i" % i
        labelToSym[tid] = sym
        attrib1 = DotConfig.getRunstateDotAttrib(taskInfo[tid].runState)
        attrib2 = DotConfig.getTypeDotAttrib(taskInfo[tid].ptype)
        namespaceGraph[namespace] += "\t\t%s [label=\"%s\"%s%s];\n" % (sym, label, attrib1, attrib2)

    # emit dependency edges (parent -> child) within each namespace:
    for (namespace, label) in addOrder :
        tid = (namespace, label)
        sym = labelToSym[tid]
        for plabel in taskInfo[tid].parentLabels :
            ptid = (namespace, plabel)
            namespaceGraph[namespace] += ("\t\t%s -> %s;\n" % (labelToSym[ptid], sym))

    # wrap each namespace in its own cluster subgraph, bracketed by synthetic
    # begin/end diamonds connected to the head and tail tasks:
    for (i, ns) in enumerate(namespaceGraph.keys()) :
        isNs = ((ns is not None) and (ns != ""))
        dotFp.write("\tsubgraph cluster_sg%i {\n" % (i))
        if isNs :
            dotFp.write("\t\tlabel = \"%s\";\n" % (ns))
        else :
            dotFp.write("\t\tlabel = \"%s\";\n" % (workflowClassName))
        dotFp.write(namespaceGraph[ns])
        dotFp.write("\t\tbegin%i [label=\"begin\" shape=diamond];\n" % (i))
        dotFp.write("\t\tend%i [label=\"end\" shape=diamond];\n" % (i))
        for (namespace, label) in headNodes :
            if namespace != ns : continue
            sym = labelToSym[(namespace, label)]
            dotFp.write("\t\tbegin%i -> %s;\n" % (i, sym))
        for (namespace, label) in tailNodes :
            if namespace != ns : continue
            sym = labelToSym[(namespace, label)]
            dotFp.write("\t\t%s -> end%i;\n" % (sym, i))
        dotFp.write("\t}\n")
        # NOTE(review): labelToSym is keyed by (namespace, label) tuples while
        # 'ns' is a bare namespace string, so this membership test appears to
        # never be true -- confirm intent (it looks like it should link a
        # sub-workflow's task node to its cluster's begin node).
        if ns in labelToSym :
            dotFp.write("\t%s -> begin%i [style=dotted];\n" % (labelToSym[ns], i))


    # legend, closing brace, and an explicit flush since dotFp is stdout:
    dotFp.write(DotConfig.getDotLegend())
    dotFp.write("}\n")
    hardFlush(dotFp)
770
771
772
773 -def writeDotScript(taskDotScriptFile,
774 taskInfoFileName, taskStateFileName,
775 workflowClassName) :
776 """
777 write dot task graph creation script
778 """
779 import inspect
780
781 dsfp = os.fdopen(os.open(taskDotScriptFile, os.O_WRONLY | os.O_CREAT, 0755), 'w')
782
783 dsfp.write("""#!/usr/bin/env python
784 #
785 # This is a script to create a dot graph from pyflow state files.
786 # Usage: $script >| task_graph.dot
787 #
788 # Note that script assumes the default pyflow state files are in the script directory.
789 #
790 # This file was autogenerated by process: '%s'
791 # ...from working directory: '%s'
792 #
793
794 import datetime,os,sys,time
795
796 scriptDir=os.path.abspath(os.path.dirname(__file__))
797 """ % (os.getcwd(), cmdline()))
798
799 for dobj in (timeStampToTimeStr, timeStrNow, cmdline, Bunch, LogGlobals, hardFlush, TaskNodeConstants, DotConfig, taskStateParser, taskInfoParser, getTaskInfoDepSet, writeDotGraph) :
800 dsfp.write("\n\n")
801 dsfp.write(inspect.getsource(dobj))
802
803 dsfp.write("""
804
805 if __name__ == '__main__' :
806 writeDotGraph(os.path.join(scriptDir,'%s'),os.path.join(scriptDir,'%s'),'%s')
807
808 """ % (taskInfoFileName, taskStateFileName, workflowClassName))
809
810
811
812
813
814
815
816
817
818
class Command(object) :
    """
    Commands can be presented as strings or argument lists (or none)
    """

    def __init__(self, cmd, cwd, env=None) :
        """
        @param cmd: the task command: None/"" (no-op), a shell command string,
                    or an argument list of strings/integers
        @param cwd: optional working directory for the command; stored as an
                    absolute path, or "" when not provided
        @param env: optional environment mapping for the command, stored as given

        @raise Exception: on an argument-list element that is neither string
                          nor integer, on an unrecognized cmd type, or when cwd
                          names an existing non-directory
        """
        # classify the command; self.type records which form was supplied
        # ("none", "str" or "list"):
        if ((cmd is None) or
            (cmd == "") or
            (isIterable(cmd) and len(cmd) == 0)) :
            self.cmd = None
            self.type = "none"
        elif isString(cmd) :
            self.cmd = Command.cleanStr(cmd)
            self.type = "str"
        elif isIterable(cmd) :
            # argument-list form: each element must be a string or integer;
            # cleanStr converts integers to strings and strips whitespace:
            self.cmd = []
            for i, s in enumerate(cmd):
                if not (isString(s) or isInt(s)):
                    raise Exception("Argument: '%s' from position %i in argument list command is not a string or integer. Full command: '%s'" %
                                    (str(s), (i + 1), " ".join([str(s) for s in cmd])))
                self.cmd.append(Command.cleanStr(s))
            self.type = "list"
        else :
            raise Exception("Invalid task command: '%s'" % (str(cmd)))

        # missing cwd normalizes to "":
        self.cwd = ""
        if cwd is not None and cwd != "" :
            self.cwd = os.path.abspath(cwd)
            # only an existing non-directory is rejected here; a path that does
            # not exist yet is allowed:
            if os.path.exists(self.cwd) and not os.path.isdir(self.cwd) :
                raise Exception("Cwd argument is not a directory: '%s', provided for command '%s'" % (cwd, str(cmd)))

        self.env = env
854
856 if self.cmd is None : return ""
857 if self.type == "str" : return self.cmd
858 return " ".join(self.cmd)
859
860 @staticmethod
862 if isInt(s) : s = str(s)
863 if "\n" in s : raise Exception("Task command/argument contains newline characters: '%s'" % (s))
864 return s.strip()
865
869 """
870 Thread class with a stop() method. The thread itself has to check
871 regularly for the stopped() condition.
872
873 Note that this is a very new thread base class for pyflow, and most
874 threads do not (yet) check their stopped status.
875
876 """
877
878 _stopAll = threading.Event()
879
881 threading.Thread.__init__(self, *args, **kw)
882 self._stop = threading.Event()
883
885 "thread specific stop method, may be overridden to add async thread-specific kill behavior"
886 self._stop.set()
887
888 @staticmethod
890 "quick global stop signal for threads that happen to poll stopped() very soon after event"
891 StoppableThread._stopAll.set()
892
895
904
908 """
909 Stores default values associated with each runmode: local,sge,...
910 """
    def __init__(self, defaultCores, defaultMemMbPerCore, defaultIsRetry) :
        """
        @param defaultCores: default core count for tasks in this run mode
        @param defaultMemMbPerCore: default memory (MB) per core in this run mode
        @param defaultIsRetry: whether task retry is enabled by default in
                               this run mode
        """
        self.defaultCores = defaultCores
        self.defaultMemMbPerCore = defaultMemMbPerCore
        self.defaultIsRetry = defaultIsRetry
915
926
930 """
931 parameters pertaining to task retry behavior
932 """
933 allowed_modes = [ "nonlocal" , "all" ]
934
    def __init__(self, run_mode, retry_max, wait, window, retry_mode) :
        """
        @param run_mode: workflow run mode; with retry_mode "nonlocal", retry
                         is disabled for run modes whose defaultIsRetry is False
        @param retry_max: maximum number of task retries
        @param wait: seconds to wait before each retry
        @param window: retries are only attempted within this many seconds of
                       the task's first start (0 disables the window limit)
        @param retry_mode: one of allowed_modes ("nonlocal" or "all")

        @raise Exception: if retry_mode is not an accepted retry mode
        """
        if retry_mode not in self.allowed_modes :
            raise Exception("Invalid retry mode parameter '%s'. Accepted retry modes are {%s}." \
                            % (retry_mode, ",".join(self.allowed_modes)))

        self._retry_max = retry_max
        self.wait = wait
        self.window = window
        self._retry_mode = retry_mode
        self._run_mode = run_mode

        # compute the effective public 'max' from the private settings, then
        # check the public parameters:
        self._finalize()
        self.validate()
948
949
951 """
952 decide whether to turn retry off based on retry and run modes:
953 """
954 if (self._retry_mode == "nonlocal") and \
955 (not RunMode.data[self._run_mode].defaultIsRetry) :
956 self.max = 0
957 else :
958 self.max = int(self._retry_max)
959
960
962 """
963 check that the public parameters are valid
964 """
965 def nonNegParamCheck(val, valLabel) :
966 if val < 0 : raise Exception("Parameter %s must be non-negative" % valLabel)
967
968 nonNegParamCheck(self.max, "retryMax")
969 nonNegParamCheck(self.wait, "retryWait")
970 nonNegParamCheck(self.window, "retryWindow")
971
972
973 - def getTaskCopy(self,retry_max, wait, window, retry_mode):
974 """
975 return a deepcopy of the class customized for each individual task for
976 any retry parameters which are not None
977 """
978 taskself = copy.deepcopy(self)
979
980 if retry_max is not None:
981 taskself._retry_max = retry_max
982 if wait is not None:
983 taskself.wait = wait
984 if window is not None:
985 taskself.window = window
986 if retry_mode is not None :
987 taskself._retry_mode = retry_mode
988
989 taskself._finalize()
990 taskself.validate()
991 return taskself
992
995 """
996 simple object allowing remote task threads to communicate their
997 status back to the TaskManager
998 """
1000 self.isFinishedEvent = isFinishedEvent
1001 self.isComplete = threading.Event()
1002 self.errorCode = 0
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015 self.errorMessage = ""
1016
1017
1018 self.isSpecificationComplete = threading.Event()
1019
1022 """
1023 Each individual command-task or sub workflow task
1024 is run on its own thread using a class inherited from
1025 BaseTaskRunner
1026 """
1027
    def __init__(self, runStatus, taskStr, sharedFlowLog, setRunstate) :
        """
        @param runStatus: shared status object used to report completion and
                          errors back to the task manager
        @param taskStr: task identification string; used in the thread name
                        and in flowLog line prefixes
        @param sharedFlowLog: shared logging callable wrapped by flowLog()
        @param setRunstate: callable used to record run-state changes; may be
                            None, in which case state updates are a no-op
        """
        StoppableThread.__init__(self)
        # daemon thread: do not block interpreter exit on a running task
        self.setDaemon(True)
        self.taskStr = taskStr
        self.setName("TaskRunner-Thread-%s" % (taskStr))
        self.runStatus = runStatus
        self._sharedFlowLog = sharedFlowLog
        self.lock = threading.RLock()

        # kept private so setRunstate() can guard against a None callable:
        self._setRunstate = setRunstate

        # record the starting run state for this task:
        self.setInitialRunstate()
1044
1046 """
1047 BaseTaskRunner's run() method ensures that we can
1048 capture exceptions which might occur in this thread.
1049 Do not override this method -- instead define the core
1050 logic for the task run operation in '_run()'
1051
1052 Note that for sub-workflow tasks we're interpreting raw
1053 client python code on this thread, so exceptions are
1054 *very likely* here -- this is not a corner case.
1055 """
1056 retval = 1
1057 retmsg = ""
1058 try:
1059 (retval, retmsg) = self._run()
1060 except WorkflowRunner._AbortWorkflowException :
1061
1062
1063 pass
1064 except:
1065 retmsg = getExceptionMsg()
1066 self.runStatus.errorCode = retval
1067 self.runStatus.errorMessage = retmsg
1068
1069 self.runStatus.isComplete.set()
1070
1071
1072 self.runStatus.isFinishedEvent.set()
1073 return retval
1074
1076 if self._setRunstate is None : return
1077 self._setRunstate(*args, **kw)
1078
1081
    def flowLog(self, msg, logState) :
        """
        Forward a log message to the shared workflow log, prefixed with this
        task runner's identity.

        @param msg: log message (string or container of strings)
        @param logState: LogState severity value
        """
        linePrefixOut = "[TaskRunner:%s]" % (self.taskStr)
        self._sharedFlowLog(msg, linePrefix=linePrefixOut, logState=logState)
1085
1088
1091
1094
1098 """
1099 Manages a sub-workflow task
1100 """
1101
1102 - def __init__(self, runStatus, taskStr, workflow, sharedFlowLog, setRunstate) :
1105
1118
1121 """
1122 Parent to local and SGE TaskRunner specializations for command tasks
1123 """
1124
1125 taskWrapper = os.path.join(moduleDir, "pyflowTaskWrapper.py")
1126
    def __init__(self, runStatus, runid, taskStr, cmd, nCores, memMb, retry, isDryRun,
                 outFile, errFile, tmpDir, schedulerArgList,
                 sharedFlowLog, setRunstate) :
        """
        @param runid: workflow run id, passed through to the task wrapper
        @param cmd: Command object describing the task command
        @param nCores: core count requested for the task
        @param memMb: memory (MB) requested for the task
        @param retry: retry-parameter object governing the task retry loop
        @param isDryRun: when true the command is logged but not executed
        @param outFile: stdout file
        @param errFile: stderr file
        @param tmpDir: location to write files containing output from
                       the task wrapper script (and not the wrapped task)
        @param schedulerArgList: extra scheduler (e.g. qsub) argument list

        @raise Exception: if the pyflowTaskWrapper.py script is not found
                          next to this module
        """
        BaseTaskRunner.__init__(self, runStatus, taskStr, sharedFlowLog, setRunstate)

        self.cmd = cmd
        self.nCores = nCores
        self.memMb = memMb
        self.retry = retry
        self.isDryRun = isDryRun
        self.outFile = outFile
        self.errFile = errFile
        self.tmpDir = tmpDir
        self.schedulerArgList = schedulerArgList
        self.runid = runid
        self.taskStr = taskStr
        # fail fast if the wrapper script shipped with this module is missing:
        if not os.path.isfile(self.taskWrapper) :
            raise Exception("Can't find task wrapper script: %s" % self.taskWrapper)
1152
1154 import pickle
1155
1156 ensureDir(self.tmpDir)
1157 self.wrapFile = os.path.join(self.tmpDir, "pyflowTaskWrapper.signal.txt")
1158
1159
1160 taskInfo = { 'nCores' : self.nCores,
1161 'outFile' : self.outFile, 'errFile' : self.errFile,
1162 'cwd' : self.cmd.cwd, 'env' : self.cmd.env,
1163 'cmd' : self.cmd.cmd, 'isShellCmd' : (self.cmd.type == "str") }
1164
1165 argFile = os.path.join(self.tmpDir, "taskWrapperParameters.pickle")
1166 pickle.dump(taskInfo, open(argFile, "w"))
1167
1168 self.wrapperCmd = [self.taskWrapper, self.runid, self.taskStr, argFile]
1169
1170
1172 """
1173 Outer loop of _run() handles task retry behavior:
1174 """
1175
1176
1177 self.initFileSystemItems()
1178
1179 startTime = time.time()
1180 retries = 0
1181 retInfo = Bunch(retval=1, taskExitMsg="", isAllowRetry=False)
1182
1183 while not self.stopped() :
1184 if retries :
1185 self.infoLog("Retrying task: '%s'. Total prior task failures: %i" % (self.taskStr, retries))
1186
1187 if self.isDryRun :
1188 self.infoLog("Dryrunning task: '%s' task arg list: [%s]" % (self.taskStr, ",".join(['"%s"' % (s) for s in self.getFullCmd()])))
1189 retInfo.retval = 0
1190 else :
1191 self.runOnce(retInfo)
1192
1193 if retInfo.retval == 0 : break
1194 if retries >= self.retry.max : break
1195 elapsed = (time.time() - startTime)
1196 if (self.retry.window > 0) and \
1197 (elapsed >= self.retry.window) : break
1198 if self.stopped() : break
1199 if not retInfo.isAllowRetry : break
1200 retries += 1
1201 self.warningLog("Task: '%s' failed but qualifies for retry. Total task failures (including this one): %i. Task command: '%s'" % (self.taskStr, retries, str(self.cmd)))
1202 retInfo = Bunch(retval=1, taskExitMsg="", isAllowRetry=False)
1203 time.sleep(self.retry.wait)
1204
1205 return (retInfo.retval, retInfo.taskExitMsg)
1206
1207
1209 """
1210 Attempt to extract exit message from a failed command task, do not complain in
1211 case of any errors in task signal file for this case.
1212 """
1213 msgSize = None
1214 wrapFp = open(self.wrapFile)
1215 for line in wrapFp:
1216 w = line.strip().split()
1217 if (len(w) < 6) or (w[4] != "[wrapperSignal]") :
1218 break
1219 if w[5] == "taskStderrTail" :
1220 if (len(w) == 7) : msgSize = int(w[6])
1221 break
1222
1223 taskExitMsg = ""
1224 if msgSize is not None :
1225 i = 0
1226 for line in wrapFp:
1227 if i >= msgSize : break
1228 taskExitMsg += line
1229 i += 1
1230 wrapFp.close()
1231 return taskExitMsg
1232
1233
1235 """
1236 When the task is theoretically done, go and read the task wrapper to
1237 see the actual task exit code. This is required because:
1238
1239 1) On SGE or similar: We have no other way to get the exit code
1240
1241 2) On all systems, we can distinguish between a conventional task error
1242 and other problems, such as (a) linux OOM killer (b) exception in the
1243 task wrapper itself (c) filesystem failures.
1244 """
1245
1246 def checkWrapFileExit(result) :
1247 """
1248 return isError=True on error in file format only, missing or incomplete file
1249 is not considered an error and the function should not return an error for this
1250 case.
1251 """
1252
1253 if not os.path.isfile(self.wrapFile) : return
1254
1255 for line in open(self.wrapFile) :
1256
1257 if len(line) == 0 or line[-1] != '\n' : return
1258
1259 w = line.strip().split()
1260
1261 if len(w) < 6 :
1262 result.isError = True
1263 return
1264 if (w[4] != "[wrapperSignal]") :
1265 result.isError = True
1266 return
1267 if w[5] == "taskExitCode" :
1268 if (len(w) == 7) :
1269 result.taskExitCode = int(w[6])
1270 return
1271
1272 retryCount = 8
1273 retryDelaySec = 30
1274
1275 wrapResult = Bunch(taskExitCode=None, isError=False)
1276
1277 totalDelaySec = 0
1278 for trialIndex in range(retryCount) :
1279
1280
1281
1282 if trialIndex > 1 :
1283 msg = "No complete signal file found after %i seconds, retrying after delay. Signal file path: '%s'" % (totalDelaySec,self.wrapFile)
1284 self.flowLog(msg, logState=LogState.WARNING)
1285
1286 if trialIndex != 0 :
1287 time.sleep(retryDelaySec)
1288 totalDelaySec += retryDelaySec
1289
1290 checkWrapFileExit(wrapResult)
1291 if wrapResult.isError : break
1292 if wrapResult.taskExitCode is not None : break
1293
1294 return wrapResult
1295
1296
1298 if os.path.isfile(self.wrapFile) :
1299 stderrList = open(self.wrapFile).readlines()
1300 taskExitMsg = ["Anomalous task wrapper stderr output. Wrapper signal file: '%s'" % (self.wrapFile),
1301 "Logging %i line(s) of task wrapper log output below:" % (len(stderrList))]
1302 linePrefix = "[taskWrapper-stderr]"
1303 taskExitMsg.extend([linePrefix + " " + line for line in stderrList])
1304 else :
1305 taskExitMsg = ["Anomalous task wrapper condition: Wrapper signal file is missing: '%s'" % (self.wrapFile)]
1306
1307 return taskExitMsg
1308
1312
1314 return [sys.executable] + self.wrapperCmd
1315
1317
1318
1319 wrapFp = open(self.wrapFile, "w")
1320 proc = subprocess.Popen(self.getFullCmd(), stdout=wrapFp, stderr=subprocess.STDOUT, shell=False, bufsize=1)
1321 self.infoLog("Task initiated on local node")
1322 retInfo.retval = proc.wait()
1323 wrapFp.close()
1324
1325 wrapResult = self.getWrapFileResult()
1326
1327 if (wrapResult.taskExitCode is None) or (wrapResult.taskExitCode != retInfo.retval):
1328 retInfo.taskExitMsg = self.getWrapperErrorMsg()
1329 retInfo.retval = 1
1330 return retInfo
1331 elif retInfo.retval != 0 :
1332 retInfo.taskExitMsg = self.getExitMsg()
1333
1334 retInfo.isAllowRetry = True
1335
1336
1337 return retInfo
1338
1339
1340
1341 -class QCaller(threading.Thread) :
1342 """
1343 Calls to both qsub and qstat go through this run() method so that we
1344 can time them out:
1345 """
1346
1348 threading.Thread.__init__(self)
1349 self.setDaemon(True)
1350 self.setName("QCaller-Timeout-Thread")
1351 self.lock = threading.RLock()
1352 self.cmd = cmd
1353 self.infoLog = infoLog
1354 self.results = Bunch(isComplete=False, retval=1, outList=[])
1355 self.proc = None
1356 self.is_kill_attempt = False
1357
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374 GlobalSync.subprocessControl.acquire()
1375 try :
1376 tmp_proc = subprocess.Popen(self.cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=False)
1377 self.lock.acquire()
1378 try:
1379 self.proc = tmp_proc
1380
1381 if self.is_kill_attempt: self.killProc()
1382 finally:
1383 self.lock.release()
1384
1385 if self.is_kill_attempt: return
1386
1387 for line in self.proc.stdout :
1388 self.results.outList.append(line)
1389 self.results.retval = self.proc.wait()
1390 finally:
1391 GlobalSync.subprocessControl.release()
1392 self.results.isComplete = True
1393
1394 @lockMethod
1396 import signal
1397
1398 self.is_kill_attempt = True
1399
1400 if self.proc is None : return
1401
1402 try:
1403 os.kill(self.proc.pid , signal.SIGTERM)
1404 self.infoLog("Sent SIGTERM to sge command process id: %i" % (self.proc.pid))
1405 except OSError :
1406
1407 pass
1408
1412
1414
1415
1416 qsubCmd = ["qsub",
1417 "-V",
1418 "-cwd",
1419 "-S", sys.executable,
1420 "-o", self.wrapFile,
1421 "-e", self.wrapFile]
1422
1423 qsubCmd.extend(self.schedulerArgList)
1424 qsubCmd.extend(siteConfig.qsubResourceArg(self.nCores, self.memMb))
1425 qsubCmd.extend(self.wrapperCmd)
1426
1427 return tuple(qsubCmd)
1428
1429
1432
1433
1434 @lockMethod
1436 """
1437 if stopped here, this is the case where a ctrl-c was entered while the qsub
1438 command was being submitted, so we must kill the job here:
1439 """
1440 self.jobId = jobId
1441 if self.stopped(): self._killJob()
1442
1443
1445
1446 def qcallWithTimeouts(cmd, maxQcallAttempt=1) :
1447 maxQcallWait = 180
1448 qcall = None
1449 for i in range(maxQcallAttempt) :
1450 qcall = QCaller(cmd,self.infoLog)
1451 qcall.start()
1452 qcall.join(maxQcallWait)
1453 if not qcall.isAlive() : break
1454 self.infoLog("Trial %i of sge command has timed out. Killing process for cmd '%s'" % ((i + 1), cmd))
1455 qcall.killProc()
1456 self.infoLog("Finished attempting to kill sge command")
1457
1458 return qcall.results
1459
1460
1461
1462 if os.path.isfile(self.wrapFile): os.remove(self.wrapFile)
1463
1464
1465 qsubFile = os.path.join(os.path.dirname(self.wrapFile), "qsub.args.txt")
1466 if os.path.isfile(qsubFile): os.remove(qsubFile)
1467 qsubfp = open(qsubFile, "w")
1468 for arg in self.getFullCmd() :
1469 qsubfp.write(arg + "\n")
1470 qsubfp.close()
1471
1472 results = qcallWithTimeouts(self.getFullCmd())
1473
1474 isQsubError = False
1475 self.jobId = None
1476 if len(results.outList) != 1 :
1477 isQsubError = True
1478 else :
1479 w = results.outList[0].split()
1480 if (len(w) > 3) and (w[0] == "Your") and (w[1] == "job") :
1481 self.setNewJobId(int(w[2]))
1482 else :
1483 isQsubError = True
1484
1485 if not results.isComplete :
1486 self._killJob()
1487 retInfo.taskExitMsg = ["Job submission failure -- qsub command timed-out"]
1488 return retInfo
1489
1490 if isQsubError or (self.jobId is None):
1491 retInfo.taskExitMsg = ["Unexpected qsub output. Logging %i line(s) of qsub output below:" % (len(results.outList)) ]
1492 retInfo.taskExitMsg.extend([ "[qsub-out] " + line for line in results.outList ])
1493 return retInfo
1494
1495 if results.retval != 0 :
1496 retInfo.retval = results.retval
1497 retInfo.taskExitMsg = ["Job submission failure -- qsub returned exit code: %i" % (retInfo.retval)]
1498 return retInfo
1499
1500
1501 self.infoLog("Task submitted to sge queue with job_number: %i" % (self.jobId))
1502
1503
1504
1505
1506 queueStatus = Bunch(isQueued=True, runStartTimeStamp=None)
1507
1508 def checkWrapFileRunStart(result) :
1509 """
1510 check wrapper file for a line indicating that it has transitioned from queued to
1511 running state. Allow for NFS delay or incomplete file
1512 """
1513 if not os.path.isfile(self.wrapFile) : return
1514 for line in open(self.wrapFile) :
1515 w = line.strip().split()
1516 if (len(w) < 6) or (w[4] != "[wrapperSignal]") :
1517
1518
1519 return
1520 if w[5] == "taskStart" :
1521 result.runStartTimeStamp = timeStrToTimeStamp(w[0].strip('[]'))
1522 result.isQueued = False
1523 return
1524
1525
1526
1527 ewaiter = ExpWaiter(5, 1.7, 60)
1528
1529 pollCmd = ("/bin/bash", "--noprofile", "-o", "pipefail", "-c", "qstat -j %i | awk '/^error reason/'" % (self.jobId))
1530 while not self.stopped():
1531 results = qcallWithTimeouts(pollCmd, 6)
1532 isQstatError = False
1533 if results.retval != 0:
1534 if ((len(results.outList) == 2) and
1535 (results.outList[0].strip() == "Following jobs do not exist:") and
1536 (int(results.outList[1]) == self.jobId)) :
1537 break
1538 else :
1539 isQstatError = True
1540 else :
1541 if (len(results.outList) != 0) :
1542 isQstatError = True
1543
1544 if isQstatError :
1545 if not results.isComplete :
1546 retInfo.taskExitMsg = ["The qstat command for sge job_number %i has timed out for all attempted retries" % (self.jobId)]
1547 self._killJob()
1548 else :
1549 retInfo.taskExitMsg = ["Unexpected qstat output or task has entered sge error state. Sge job_number: %i" % (self.jobId)]
1550 retInfo.taskExitMsg.extend(["Logging %i line(s) of qstat output below:" % (len(results.outList)) ])
1551 retInfo.taskExitMsg.extend([ "[qstat-out] " + line for line in results.outList ])
1552
1553 return retInfo
1554
1555
1556 if queueStatus.isQueued :
1557 checkWrapFileRunStart(queueStatus)
1558 if not queueStatus.isQueued :
1559 self.setRunstate("running", queueStatus.runStartTimeStamp)
1560
1561 ewaiter.wait()
1562
1563 if self.stopped() :
1564
1565 return retInfo
1566
1567 lastJobId = self.jobId
1568
1569
1570
1571
1572 self.jobId = None
1573
1574 wrapResult = self.getWrapFileResult()
1575
1576 if wrapResult.taskExitCode is None :
1577 retInfo.taskExitMsg = ["Sge job_number: '%s'" % (lastJobId)]
1578 retInfo.taskExitMsg.extend(self.getWrapperErrorMsg())
1579 retInfo.retval = 1
1580 return retInfo
1581 elif wrapResult.taskExitCode != 0 :
1582 retInfo.taskExitMsg = self.getExitMsg()
1583
1584 retInfo.retval = wrapResult.taskExitCode
1585 retInfo.isAllowRetry = True
1586
1587
1588 return retInfo
1589
1590
1591 @lockMethod
1593 """
1594 (possibly) asynchronous job kill
1595 """
1596 try : isKilled = self.isKilled
1597 except AttributeError: isKilled = False
1598 if isKilled: return
1599
1600 try : jobId = self.jobId
1601 except AttributeError: jobId = None
1602 if jobId is None: return
1603 killCmd = ["qdel", "%i" % (int(jobId))]
1604
1605
1606 subprocess.Popen(killCmd, shell=False)
1607 self.isKilled = True
1608
1609
1610 @lockMethod
1612 """
overload thread stop function to qdel any running tasks.
1615 """
1616 CommandTaskRunner.stop(self)
1617 self._killJob()
1618
1622 """
1623 This class runs on a separate thread and is
1624 responsible for updating the state and info task
1625 files
1626 """
1627
1629 StoppableThread.__init__(self)
1630
1631 self.writeFunc = writeFunc
1632
1633 self.setDaemon(True)
1634 self.setName("TaskFileWriter-Thread")
1635
1636 self.isWrite = threading.Event()
1637
1643
1646
1648 if self.isWrite.isSet() :
1649 self.isWrite.clear()
1650 self.writeFunc()
1651
1655 """
1656 This class runs on a separate thread from workflowRunner,
1657 launching jobs based on the current state of the TaskDAG
1658 """
1659
1661 """
1662 @param cdata: data from WorkflowRunner instance which will be
1663 constant during the lifetime of the TaskManager,
1664 should be safe to lookup w/o locking
1665 @param tdag: task graph
1666 """
1667 StoppableThread.__init__(self)
1668
1669 self._cdata = cdata
1670 self.tdag = tdag
1671
1672 self.setDaemon(True)
1673 self.setName("TaskManager-Thread")
1674
1675
1676 self.lock = threading.RLock()
1677
1678 self.freeCores = self._cdata.param.nCores
1679 self.freeMemMb = self._cdata.param.memMb
1680 self.runningTasks = {}
1681
1682
1683
1684 self.taskMutexState = {}
1685
1686
1687
1708
1709
1711 """
1712 assist launch of a command-task
1713 """
1714
1715
1716 payload = task.payload
1717 param = self._cdata.param
1718
1719 if payload.cmd.cmd is None :
1720
1721 raise Exception("Attempting to launch checkpoint task: %s" % (task.fullLabel()))
1722
1723 isForcedLocal = ((param.mode != "local") and (payload.isForceLocal))
1724
1725
1726 if not isForcedLocal :
1727 if self.freeCores != "unlimited" :
1728 if (self.freeCores < payload.nCores) :
1729 raise Exception("Not enough free cores to launch task")
1730 self.freeCores -= payload.nCores
1731
1732 if self.freeMemMb != "unlimited" :
1733 if (self.freeMemMb < payload.memMb) :
1734 raise Exception("Not enough free memory to launch task")
1735 self.freeMemMb -= payload.memMb
1736
1737 if payload.mutex is not None :
1738 self.taskMutexState[payload.mutex] = True
1739
1740 TaskRunner = None
1741 if param.mode == "local" or payload.isForceLocal or payload.isCmdMakePath :
1742 TaskRunner = LocalTaskRunner
1743 elif param.mode == "sge" :
1744 TaskRunner = SGETaskRunner
1745 else :
1746 raise Exception("Can't support mode: '%s'" % (param.mode))
1747
1748
1749
1750
1751 taskRetry = payload.retry
1752
1753 if payload.isCmdMakePath :
1754 taskRetry = copy.deepcopy(payload.retry)
1755 taskRetry.window = 0
1756
1757 if param.mode == "local" or payload.isForceLocal :
1758 launchCmdList = ["make", "-j", str(payload.nCores)]
1759 elif param.mode == "sge" :
1760 launchCmdList = siteConfig.getSgeMakePrefix(payload.nCores, payload.memMb, param.schedulerArgList)
1761 else :
1762 raise Exception("Can't support mode: '%s'" % (param.mode))
1763
1764 launchCmdList.extend(["-C", payload.cmd.cmd])
1765 payload.launchCmd = Command(launchCmdList, payload.cmd.cwd, payload.cmd.env)
1766
1767
1768
1769
1770
1771
1772
1773 tmpDirId1 = "%03i" % ((int(task.id) / 1000))
1774 tmpDirId2 = "%03i" % ((int(task.id) % 1000))
1775 taskRunnerTmpDir = os.path.join(self._cdata.wrapperLogDir, tmpDirId1, tmpDirId2)
1776
1777 return TaskRunner(task.runStatus, self._cdata.getRunid(),
1778 task.fullLabel(), payload.launchCmd,
1779 payload.nCores, payload.memMb,
1780 taskRetry, param.isDryRun,
1781 self._cdata.taskStdoutFile,
1782 self._cdata.taskStderrFile,
1783 taskRunnerTmpDir,
1784 param.schedulerArgList,
1785 self._cdata.flowLog,
1786 task.setRunstate)
1787
1788
1795
1796
1798 """
1799 launch a specific task
1800 """
1801
1802 if task.payload.type() == "command" :
1803 trun = self._getCommandTaskRunner(task)
1804 elif task.payload.type() == "workflow" :
1805 trun = self._getWorkflowTaskRunner(task)
1806 else :
1807 assert 0
1808
1809 self._infoLog("Launching %s: '%s' from %s" % (task.payload.desc(), task.fullLabel(), namespaceLabel(task.namespace)))
1810 trun.start()
1811 self.runningTasks[task] = trun
1812
1813
1814 @lockMethod
1816 """
1817 determine what tasks, if any, can be started
1818
1819 Note that the lock is here to protect self.runningTasks
1820 """
1821
1822
1823 (ready, completed) = self.tdag.getReadyTasks()
1824 for node in completed:
1825 if self.stopped() : return
1826 self._infoLog("Completed %s: '%s' launched from %s" % (node.payload.desc(), node.fullLabel(), namespaceLabel(node.namespace)))
1827
1828
1829
1830 ready_workflows = [r for r in ready if r.payload.type() == "workflow"]
1831 for task in ready_workflows :
1832 if self.stopped() : return
1833 self._launchTask(task)
1834
1835
1836
1837 if (not self._cdata.isTaskSubmissionActive()) : return
1838
1839 isNonLocal = (self._cdata.param.mode != "local")
1840
1841
1842 ready_commands = [r for r in ready if r.payload.type() == "command"]
1843 ready_commands.sort(key=lambda t: (t.payload.priority, t.payload.nCores), reverse=True)
1844 for task in ready_commands :
1845 if self.stopped() : return
1846
1847
1848
1849 isForcedLocal = (isNonLocal and task.payload.isForceLocal)
1850 if not isForcedLocal :
1851 if ((self.freeCores != "unlimited") and (task.payload.nCores > self.freeCores)) : continue
1852 if ((self.freeMemMb != "unlimited") and (task.payload.memMb > self.freeMemMb)) : continue
1853
1854
1855 if ((task.payload.mutex is not None) and
1856 (task.payload.mutex in self.taskMutexState) and
1857 (self.taskMutexState[task.payload.mutex])) : continue
1858
1859 self._launchTask(task)
1860
1861
1863 """
1864 Given a running task which is already shown to be finished running, remove it from the running set, and
1865 recover allocated resources.
1866 """
1867 assert(task in self.runningTasks)
1868
1869
1870 param = self._cdata.param
1871
1872
1873 if task.payload.type() == "command" :
1874 isForcedLocal = ((param.mode != "local") and (task.payload.isForceLocal))
1875 if not isForcedLocal :
1876 if self.freeCores != "unlimited" :
1877 self.freeCores += task.payload.nCores
1878 if self.freeMemMb != "unlimited" :
1879 self.freeMemMb += task.payload.memMb
1880
1881 if task.payload.mutex is not None :
1882 self.taskMutexState[task.payload.mutex] = False
1883
1884 del self.runningTasks[task]
1885
1886
1887
1888 @lockMethod
1890 """
1891 Check the set of running tasks to see if they've completed and update
1892 Node status accordingly:
1893 """
1894 notrunning = set()
1895 for task in self.runningTasks.keys() :
1896 if self.stopped() : break
1897 trun = self.runningTasks[task]
1898 if not task.runStatus.isComplete.isSet() :
1899 if trun.isAlive() : continue
1900
1901 task.errorstate = 1
1902 task.errorMessage = "Thread: '%s', has stopped without a traceable cause" % (trun.getName())
1903 else :
1904 task.errorstate = task.runStatus.errorCode
1905 task.errorMessage = task.runStatus.errorMessage
1906
1907 if task.errorstate == 0 :
1908 task.setRunstate("complete")
1909 else:
1910 task.setRunstate("error")
1911
1912 notrunning.add(task)
1913
1914 if not task.isError() :
1915 self._infoLog("Completed %s: '%s' launched from %s" % (task.payload.desc(), task.fullLabel(), namespaceLabel(task.namespace)))
1916 else:
1917 msg = task.getTaskErrorMsg()
1918
1919 if self._cdata.isTaskSubmissionActive() :
1920
1921
1922
1923
1924 msg.extend(["Shutting down task submission. Waiting for remaining tasks to complete."])
1925
1926 self._errorLog(msg)
1927 if self._cdata.isTaskSubmissionActive() :
1928 self._cdata.emailNotification(msg, self._flowLog)
1929
1930
1931
1932
1933
1934 self._cdata.setTaskError(task)
1935
1936
1937 for task in notrunning :
1938 self._removeTaskFromRunningSet(task)
1939
1940 @lockMethod
1942 """
1943 Cancel a task and all of its children, without labeling the canceled tasks as errors
1944
1945 A canceled task will be stopped if it is running, or unqueued if it is waiting, and will be put into the
1946 waiting/ignored state unless it has already completed.
1947 """
1948
1949
1950 for child in task.children :
1951 self.cancelTaskTree(child)
1952
1953 self._infoLog("Canceling %s '%s' from %s" % (task.payload.desc(), task.fullLabel(), namespaceLabel(task.namespace)))
1954
1955
1956 if task in self.runningTasks :
1957 taskRunner = self.runningTasks[task]
1958 taskRunner.stop()
1959 self._removeTaskFromRunningSet(task)
1960
1961
1962 if not task.isDone() :
1963 task.runstate = "waiting"
1964 task.isIgnoreThis = True
1965
1966
1967
1968 @lockMethod
1973
1974
1975 @lockMethod
1977 for trun in self.runningTasks.values() :
1978 if trun.isAlive(): return False
1979 return True
1980
1981
1983
1984
1985
1986 if self.stopped() :
1987 while True :
1988 if self._areTasksDead() : return True
1989 time.sleep(1)
1990
1991
1992 if (not self._cdata.isTaskSubmissionActive()) :
1993 return (len(self.runningTasks) == 0)
1994 else :
1995 if self.tdag.isRunComplete() :
1996 if (len(self.runningTasks) != 0) :
1997 raise Exception("Inconsistent TaskManager state: workflow appears complete but there are still running tasks")
1998 return True
1999 elif self.tdag.isRunExhausted() :
2000 return True
2001 else :
2002 return False
2003
2004
2006 linePrefixOut = "[TaskManager]"
2007
2008 self._cdata.flowLog(msg, linePrefix=linePrefixOut, logState=logState)
2009
2010
2013
2016
2017
2018
2019
2020
2021
2022
class CmdPayload(object) :
    """
    Task payload for a command task: the command itself plus its
    scheduling constraints (cores, memory, priority, mutex, retry).
    """

    def __init__(self, fullLabel, cmd, nCores, memMb, priority,
                 isForceLocal, isCmdMakePath=False, isTaskStable=True,
                 mutex=None, retry=None) :
        self.cmd = cmd
        self.nCores = nCores
        self.memMb = memMb
        self.priority = priority
        self.isForceLocal = isForceLocal
        self.isCmdMakePath = isCmdMakePath
        self.isTaskStable = isTaskStable
        self.mutex = mutex
        self.retry = retry

        # launchCmd is the command as actually submitted; it starts out
        # identical to cmd and may be replaced later (eg. for make path tasks)
        self.launchCmd = cmd

        # null (checkpoint) tasks must not reserve any scheduler resources:
        isNullTask = (cmd.cmd is None)
        if isNullTask and ((nCores != 0) or (memMb != 0)) :
            raise Exception("Null tasks should not have resource requirements. task: '%s'" % (fullLabel))
2042
2045
2047 return "command task"
2048
2054
2057
2063
2065 return "sub-workflow task"
2066
2070 """
2071 Represents an individual task in the task graph
2072 """
2073
2074 - def __init__(self, lock, init_id, namespace, label, payload, isContinued, isFinishedEvent, isWriteTaskStatus) :
2075 self.lock = lock
2076 self.id = init_id
2077 self.namespace = namespace
2078 self.label = label
2079 self.payload = payload
2080 self.isContinued = isContinued
2081 self.isWriteTaskStatus = isWriteTaskStatus
2082
2083
2084 self.isIgnoreThis = False
2085
2086
2087 self.isIgnoreChildren = False
2088
2089
2090
2091 self.isAutoCompleted = False
2092
2093
2094 self.isReset = False
2095
2096 self.parents = set()
2097 self.children = set()
2098 self.runstateUpdateTimeStamp = time.time()
2099 if self.isContinued:
2100 self.runstate = "complete"
2101 else:
2102 self.runstate = "waiting"
2103 self.errorstate = 0
2104
2105
2106 self.errorMessage = ""
2107
2108
2109 self.runStatus = RunningTaskStatus(isFinishedEvent)
2110
2112 msg = "TASK id: %s state: %s error: %i" % (self.fullLabel(), self.runstate, self.errorstate)
2113 return msg
2114
2117
2118 @lockMethod
2120 "task has gone as far as it can"
2121 return ((self.runstate == "error") or (self.runstate == "complete"))
2122
2123 @lockMethod
2125 "true if an error occurred in this node"
2126 return ((self.errorstate != 0) or (self.runstate == "error"))
2127
2128 @lockMethod
2130 "task completed without error"
2131 return ((self.errorstate == 0) and (self.runstate == "complete"))
2132
2133 @lockMethod
2135 "task is ready to be run"
2136 retval = ((self.runstate == "waiting") and (self.errorstate == 0) and (not self.isIgnoreThis))
2137 if retval :
2138 for p in self.parents :
2139 if p.isIgnoreThis : continue
2140 if not p.isComplete() :
2141 retval = False
2142 break
2143 return retval
2144
2145
2147 "recursive helper function for isDead()"
2148
2149
2150 if self in searched : return False
2151 searched.add(self)
2152
2153 if self.isError() : return True
2154 if self.isComplete() : return False
2155 for p in self.parents :
2156 if p._isDeadWalker(searched) : return True
2157 return False
2158
2159 @lockMethod
2161 """
2162 If true, there's no longer a point to waiting for this task,
2163 because it either has an error or there is an error in an
2164 upstream dependency
2165 """
2166
2167
2168
2169 searched = set()
2170 return self._isDeadWalker(searched)
2171
2172 @lockMethod
2173 - def setRunstate(self, runstate, updateTimeStamp=None) :
2174 """
2175 updateTimeStamp is only supplied in the case where the state
2176 transition time is interestingly different than the function
2177 call time. This can happen with the state update comes from
2178 a polling function with a long poll interval.
2179 """
2180 if runstate not in TaskNodeConstants.validRunstates :
2181 raise Exception("Can't set TaskNode runstate to %s" % (runstate))
2182
2183 if updateTimeStamp is None :
2184 self.runstateUpdateTimeStamp = time.time()
2185 else :
2186 self.runstateUpdateTimeStamp = updateTimeStamp
2187 self.runstate = runstate
2188 self.isWriteTaskStatus.set()
2189
2190 @lockMethod
2192 """
2193 generate consistent task error message from task state
2194 """
2195
2196 if not self.isError() : return []
2197
2198 msg = "Failed to complete %s: '%s' launched from %s" % (self.payload.desc(), self.fullLabel(), namespaceLabel(self.namespace))
2199 if self.payload.type() == "command" :
2200 msg += ", error code: %s, command: '%s'" % (str(self.errorstate), str(self.payload.launchCmd))
2201 elif self.payload.type() == "workflow" :
2202 msg += ", failed sub-workflow classname: '%s'" % (self.payload.name())
2203 else :
2204 assert 0
2205
2206 msg = lister(msg)
2207
2208 if self.errorMessage != "" :
2209 msg2 = ["Error Message:"]
2210 msg2.extend(lister(self.errorMessage))
2211 linePrefix = "[%s] " % (self.fullLabel())
2212 for i in range(len(msg2)) :
2213 msg2[i] = linePrefix + msg2[i]
2214 msg.extend(msg2)
2215
2216 return msg
2217
2221 """
2222 Holds all tasks and their dependencies.
2223
2224 Also responsible for task state persistence/continue across
2225 interrupted runs. Object is accessed by both the workflow and
2226 taskrunner threads, so it needs to be thread-safe.
2227 """
2228
2229 - def __init__(self, isContinue, isForceContinue, isDryRun,
2230 taskInfoFile, taskStateFile, workflowClassName,
2231 startFromTasks, ignoreTasksAfter, resetTasks,
2232 flowLog) :
2233 """
2234 No other object gets to access the taskStateFile, file locks
2235 are not required (but thread locks are)
2236 """
2237 self.isContinue = isContinue
2238 self.isForceContinue = isForceContinue
2239 self.isDryRun = isDryRun
2240 self.taskInfoFile = taskInfoFile
2241 self.taskStateFile = taskStateFile
2242 self.workflowClassName = workflowClassName
2243 self.startFromTasks = startFromTasks
2244 self.ignoreTasksAfter = ignoreTasksAfter
2245 self.resetTasks = resetTasks
2246 self.flowLog = flowLog
2247
2248
2249 self.taskId = 0
2250
2251
2252
2253 self.lastTaskIdWritten = 0
2254
2255
2256
2257
2258 self.addOrder = []
2259 self.labelMap = {}
2260 self.headNodes = set()
2261 self.tailNodes = set()
2262 self.lock = threading.RLock()
2263
2264
2265
2266
2267
2268 self.isFinishedEvent = threading.Event()
2269
2270 self.isWriteTaskInfo = None
2271 self.isWriteTaskStatus = None
2272
2273 @lockMethod
2275 return ((namespace, label) in self.labelMap)
2276
2277 @lockMethod
2278 - def getTask(self, namespace, label) :
2279 if (namespace, label) in self.labelMap :
2280 return self.labelMap[(namespace, label)]
2281 return None
2282
2283 @lockMethod
2285 "all tasks with no parents"
2286 return list(self.headNodes)
2287
2288 @lockMethod
2290 "all tasks with no (runnable) children"
2291 return list(self.tailNodes)
2292
2293 @lockMethod
2295 "get all nodes in this namespace"
2296 retval = []
2297 for (taskNamespace, taskLabel) in self.addOrder :
2298 if namespace != taskNamespace : continue
2299 node=self.labelMap[(taskNamespace, taskLabel)]
2300 if node.isIgnoreThis : continue
2301 retval.append(node)
2302 return retval
2303
2305
2306
2307 if node in searched : return True
2308 searched.add(node)
2309
2310 if not node.isIgnoreThis :
2311 if not node.isDone() :
2312 return False
2313 if node.isComplete() :
2314 for c in node.children :
2315 if not self._isRunExhaustedNode(c, searched) :
2316 return False
2317 return True
2318
2319 @lockMethod
2321 """
2322 Returns true if the run is as complete as possible due to errors
2323 """
2324
2325
2326
2327 searched = set()
2328 for node in self.getHeadNodes() :
2329 if not self._isRunExhaustedNode(node,searched) :
2330 return False
2331 return True
2332
2333
2334 @lockMethod
2336 "returns true if run is complete and error free"
2337 for node in self.labelMap.values():
2338 if node.isIgnoreThis : continue
2339 if not node.isComplete() :
2340 return False
2341 return True
2342
2343
2345 "helper function for getReadyTasks"
2346
2347 if node.isIgnoreThis : return
2348
2349 if node in searched : return
2350 searched.add(node)
2351
2352 if node.isReady() :
2353 ready.add(node)
2354 else:
2355 if not node.isComplete() :
2356 for c in node.parents :
2357 self._getReadyTasksFromNode(c, ready, searched)
2358
2359
2360 @lockMethod
2362 """
2363 Go through DAG from the tail nodes and find all tasks which
2364 have all prerequisites completed:
2365 """
2366
2367 completed = self.markCheckPointsComplete()
2368 ready = set()
2369
2370
2371 searched = set()
2372 for node in self.getTailNodes() :
2373 self._getReadyTasksFromNode(node, ready, searched)
2374 return (list(ready), list(completed))
2375
2376
2378 "helper function for markCheckPointsComplete"
2379
2380 if node.isIgnoreThis : return
2381
2382 if node in searched : return
2383 searched.add(node)
2384
2385 if node.isComplete() : return
2386
2387 for c in node.parents :
2388 self._markCheckPointsCompleteFromNode(c, completed, searched)
2389
2390 if (node.payload.type() == "command") and (node.payload.cmd.cmd is None) and (node.isReady()) :
2391 node.setRunstate("complete")
2392 completed.add(node)
2393
2394
2395 @lockMethod
2397 """
2398 traverse from tail nodes up, marking any checkpoint tasks
2399 (task.cmd=None) jobs that are ready as complete, return set
2400 of newly completed tasks:
2401 """
2402 completed = set()
2403
2404
2405 searched = set()
2406 for node in self.getTailNodes() :
2407 self._markCheckPointsCompleteFromNode(node, completed, searched)
2408 return completed
2409
2410
2411 @lockMethod
2412 - def addTask(self, namespace, label, payload, dependencies, isContinued=False) :
2413 """
2414 add new task to the DAG
2415
2416 isContinued indicates the task is being read from state history during a continuation run
2417 """
2418
2419
2420 fullLabel = namespaceJoin(namespace, label)
2421
2422
2423
2424 if not isContinued and self.isTaskPresent(namespace, label):
2425 if self.isContinue and self.labelMap[(namespace, label)].isContinued:
2426
2427 task = self.labelMap[(namespace, label)]
2428 parentLabels = set([p.label for p in task.parents])
2429 excPrefix = "Task: '%s' does not match previous definition defined in '%s'." % (fullLabel, self.taskInfoFile)
2430 if task.payload.type() != payload.type() :
2431 msg = excPrefix + " New/old payload type: '%s'/'%s'" % (payload.type(), task.payload.type())
2432 raise Exception(msg)
2433 if payload.isTaskStable :
2434 if (payload.type() == "command") and (str(task.payload.cmd) != str(payload.cmd)) :
2435 msg = excPrefix + " New/old command: '%s'/'%s'" % (str(payload.cmd), str(task.payload.cmd))
2436 if self.isForceContinue : self.flowLog(msg,logState=LogState.WARNING)
2437 else : raise Exception(msg)
2438 if (parentLabels != set(dependencies)) :
2439 msg = excPrefix + " New/old dependencies: '%s'/'%s'" % (",".join(dependencies), ",".join(parentLabels))
2440 if self.isForceContinue : self.flowLog(msg,logState=LogState.WARNING)
2441 else : raise Exception(msg)
2442 if payload.type() == "command" :
2443 task.payload.cmd = payload.cmd
2444 task.payload.isCmdMakePath = payload.isCmdMakePath
2445 task.isContinued = False
2446 return
2447 else:
2448 raise Exception("Task: '%s' is already in TaskDAG" % (fullLabel))
2449
2450 task = TaskNode(self.lock, self.taskId, namespace, label, payload, isContinued, self.isFinishedEvent, self.isWriteTaskStatus)
2451
2452 self.taskId += 1
2453
2454 self.addOrder.append((namespace, label))
2455 self.labelMap[(namespace, label)] = task
2456
2457 for d in dependencies :
2458 parent = self.getTask(namespace, d)
2459 if parent is task :
2460 raise Exception("Task: '%s' cannot specify its own task label as a dependency" % (fullLabel))
2461 if parent is None :
2462 raise Exception("Dependency: '%s' for task: '%s' does not exist in TaskDAG" % (namespaceJoin(namespace, d), fullLabel))
2463 task.parents.add(parent)
2464 parent.children.add(task)
2465
2466
2467 if isContinued :
2468 isReset=False
2469 if label in self.resetTasks :
2470 isReset=True
2471 else :
2472 for p in task.parents :
2473 if p.isReset :
2474 isReset = True
2475 break
2476 if isReset :
2477 task.setRunstate("waiting")
2478 task.isReset=True
2479
2480 if not isContinued:
2481 self.isWriteTaskInfo.set()
2482 self.isWriteTaskStatus.set()
2483
2484
2485 if label in self.ignoreTasksAfter :
2486 task.isIgnoreChildren = True
2487
2488
2489 for p in task.parents :
2490 if p.isIgnoreChildren :
2491 task.isIgnoreThis = True
2492 task.isIgnoreChildren = True
2493 break
2494
2495
2496 if len(task.parents) == 0 :
2497 self.headNodes.add(task)
2498
2499
2500 if (self.startFromTasks and
2501 (label not in self.startFromTasks)) :
2502 task.isAutoCompleted = True
2503 for p in task.parents :
2504 if not p.isAutoCompleted :
2505 task.isAutoCompleted = False
2506 break
2507
2508
2509 if task.isAutoCompleted and (len(task.parents) == 0) and (namespace != ""):
2510 wval=namespace.rsplit(namespaceSep,1)
2511 if len(wval) == 2 :
2512 (workflowNamespace,workflowLabel)=wval
2513 else :
2514 workflowNamespace=""
2515 workflowLabel=wval[0]
2516 workflowParent = self.labelMap[(workflowNamespace, workflowLabel)]
2517 if not workflowParent.isAutoCompleted :
2518 task.isAutoCompleted = False
2519
2520 if task.isAutoCompleted :
2521 task.setRunstate("complete")
2522
2523
2524 if not task.isIgnoreThis :
2525 self.tailNodes.add(task)
2526 for p in task.parents :
2527 if p in self.tailNodes :
2528 self.tailNodes.remove(p)
2529
2530
2531 if task.isDone() :
2532 for p in task.parents :
2533 if p.isIgnoreThis : continue
2534 if p.isComplete() : continue
2535 raise Exception("Task: '%s' has invalid continuation state. Task dependencies are incomplete")
2536
2537
2538 @lockMethod
2540 """
2541 (atomic on *nix) update of the runstate and errorstate for all tasks
2542 """
2543
2544 if self.isDryRun : return
2545
2546 tmpFile = self.taskStateFile + ".update.incomplete"
2547 tmpFp = open(tmpFile, "w")
2548 tmpFp.write(taskStateHeader())
2549 for (namespace, label) in self.addOrder :
2550 node = self.labelMap[(namespace, label)]
2551 runstateUpdateTimeStr = timeStampToTimeStr(node.runstateUpdateTimeStamp)
2552 tmpFp.write("%s\t%s\t%s\t%i\t%s\n" % (label, namespace, node.runstate, node.errorstate, runstateUpdateTimeStr))
2553 tmpFp.close()
2554
2555 forceRename(tmpFile, self.taskStateFile)
2556
2557
2558 @lockMethod
2560 """
2561 Enumerate status of command tasks (but look at sub-workflows to determine if specification is complete)
2562 """
2563
2564 val = Bunch(waiting=0, queued=0, running=0, complete=0, error=0, isAllSpecComplete=True,
2565 longestQueueSec=0, longestRunSec=0, longestQueueName="", longestRunName="")
2566
2567 currentSec = time.time()
2568 for (namespace, label) in self.addOrder :
2569 node = self.labelMap[(namespace, label)]
2570
2571 if node.payload.type() == "workflow" :
2572 if not node.runStatus.isSpecificationComplete.isSet() :
2573 val.isAllSpecComplete = False
2574
2575
2576 continue
2577
2578 taskTime = int(currentSec - node.runstateUpdateTimeStamp)
2579
2580 if node.runstate == "waiting" :
2581 val.waiting += 1
2582 elif node.runstate == "queued" :
2583 val.queued += 1
2584 if val.longestQueueSec < taskTime :
2585 val.longestQueueSec = taskTime
2586 val.longestQueueName = node.fullLabel()
2587 elif node.runstate == "running" :
2588 val.running += 1
2589 if val.longestRunSec < taskTime :
2590 val.longestRunSec = taskTime
2591 val.longestRunName = node.fullLabel()
2592 elif node.runstate == "complete" :
2593 val.complete += 1
2594 elif node.runstate == "error" :
2595 val.error += 1
2596
2597 return val
2598
2599
2600 @lockMethod
2602 """
2603 appends a description of all new tasks to the taskInfo file
2604 """
2605
2606 def getTaskLineFromTask(task) :
2607 """
2608 translate a task into its single-line summary format in the taskInfo file
2609 """
2610 depstring = ""
2611 if len(task.parents) :
2612 depstring = ",".join([p.label for p in task.parents])
2613
2614 cmdstring = ""
2615 nCores = "0"
2616 memMb = "0"
2617 priority = "0"
2618 isForceLocal = "0"
2619 payload = task.payload
2620 cwdstring = ""
2621 if payload.type() == "command" :
2622 cmdstring = str(payload.cmd)
2623 nCores = str(payload.nCores)
2624 memMb = str(payload.memMb)
2625 priority = str(payload.priority)
2626 isForceLocal = boolToStr(payload.isForceLocal)
2627 cwdstring = payload.cmd.cwd
2628 elif payload.type() == "workflow" :
2629 cmdstring = payload.name()
2630 else :
2631 assert 0
2632 return "\t".join((task.label, task.namespace, payload.type(),
2633 nCores, memMb, priority,
2634 isForceLocal, depstring, cwdstring, cmdstring))
2635
2636 assert (self.lastTaskIdWritten <= self.taskId)
2637
2638 if self.lastTaskIdWritten == self.taskId : return
2639
2640 newTaskLines = []
2641 while self.lastTaskIdWritten < self.taskId :
2642 task = self.labelMap[self.addOrder[self.lastTaskIdWritten]]
2643 newTaskLines.append(getTaskLineFromTask(task))
2644 self.lastTaskIdWritten += 1
2645
2646 fp = open(self.taskInfoFile, "a")
2647 for taskLine in newTaskLines :
2648 fp.write(taskLine + "\n")
2649 fp.close()
2650
2661 Exception.__init__(self)
2662 self.msg = msg
2663
2667 """
2668 All data used by the WorkflowRunner which will be constant over
2669 the lifetime of a TaskManager instance. All of the information in
2670 this class will be accessed by both threads without locking.
2671 """
2672
2674 self.lock = threading.RLock()
2675 self.pid = os.getpid()
2676 self.runcount = 0
2677 self.cwd = os.path.abspath(os.getcwd())
2678
2679 self.markFile = None
2680
2681
2682
2683
2684 self.flowLogFp = None
2685
2686 self.warningLogFp = None
2687 self.errorLogFp = None
2688
2689 self.resetRun()
2690
2691
2692 self.isHangUp = threading.Event()
2693 self._isStderrAlive = True
2694
2695
2696 @staticmethod
# NOTE(review): the def header (orig line 2697, presumably `def _validateFixParam(param):`)
# was lost in extraction; indentation is also lost, so block nesting is inferred.
2698 """
2699 validate and refine raw run() parameters for use by workflow
2700 """
2701
# Normalize container-ish arguments to canonical set/list form.
2702 param.mailTo = setzer(param.mailTo)
2703 param.schedulerArgList = lister(param.schedulerArgList)
2704 if param.successMsg is not None :
2705 if not isString(param.successMsg) :
2706 raise Exception("successMsg argument to WorkflowRunner.run() is not a string")
2707
2708
# Bundle the retry-related settings into a single RetryParam object.
2709 param.retry=RetryParam(param.mode,
2710 param.retryMax,
2711 param.retryWait,
2712 param.retryWindow,
2713 param.retryMode)
2714
2715
# NOTE(review): param.mode is used to index RunMode.data here, but mode
# membership is only validated further below (orig line 2748) -- an invalid
# mode would raise KeyError before the friendly error message. TODO confirm
# against the full file and consider validating mode first.
2716 if param.nCores is None :
2717 param.nCores = RunMode.data[param.mode].defaultCores
2718
2719
# memMb is only meaningful for local mode; schedulers manage node memory.
2720 if param.mode != "local" :
2721 param.memMb = "unlimited"
2722
2723 if param.mode == "sge" :
2724 if siteConfig.maxSGEJobs != "unlimited" :
2725 if ((param.nCores == "unlimited") or
2726 (int(param.nCores) > int(siteConfig.maxSGEJobs))) :
2727 param.nCores = int(siteConfig.maxSGEJobs)
2728
2729 if param.nCores != "unlimited" :
2730 param.nCores = int(param.nCores)
2731 if param.nCores < 1 :
2732 raise Exception("Invalid run mode nCores argument: %s. Value must be 'unlimited' or an integer no less than 1" % (param.nCores))
2733
# Derive a default memory limit from per-core site defaults when unspecified.
2734 if param.memMb is None :
2735 if param.nCores == "unlimited" :
2736 param.memMb = "unlimited"
2737 mpc = RunMode.data[param.mode].defaultMemMbPerCore
2738 if mpc == "unlimited" :
2739 param.memMb = "unlimited"
2740 else :
2741 param.memMb = mpc * param.nCores
2742 elif param.memMb != "unlimited" :
2743 param.memMb = int(param.memMb)
2744 if param.memMb < 1 :
2745 raise Exception("Invalid run mode memMb argument: %s. Value must be 'unlimited' or an integer no less than 1" % (param.memMb))
2746
2747
2748 if param.mode not in RunMode.data.keys() :
2749 raise Exception("Invalid mode argument '%s'. Accepted modes are {%s}." \
2750 % (param.mode, ",".join(RunMode.data.keys())))
2751
2752 if param.mode == "sge" :
2753
# Fail early if the SGE client tools are not on PATH.
2754 def checkSgeProg(prog) :
2755 proc = subprocess.Popen(("which", prog), stdout=open(os.devnull, "w"), shell=False)
2756 retval = proc.wait()
2757 if retval != 0 : raise Exception("Run mode is sge, but no %s in path" % (prog))
2758 checkSgeProg("qsub")
2759 checkSgeProg("qstat")
2760
2761
# 'Auto' continue means: continue only if a previous state directory exists.
2762 stateDir = os.path.join(param.dataDir, "state")
2763 if param.isContinue == "Auto" :
2764 param.isContinue = os.path.exists(stateDir)
2765
2766 if param.isContinue :
2767 if not os.path.exists(stateDir) :
2768 raise Exception("Cannot continue run without providing a pyflow dataDir containing previous state.: '%s'" % (stateDir))
2769
2770 for email in param.mailTo :
2771 if not verifyEmailAddy(email):
2772 raise Exception("Invalid email address: '%s'" % (email))
2773
2774
2775
# NOTE(review): the def header for this method (presumably `_setCustomLogs`,
# called at orig lines 2869/2947) was lost in extraction.
# Lazily open the optional warning/error replica log files exactly once.
2777 if (self.warningLogFp is None) and (self.param.warningLogFile is not None) :
2778 self.warningLogFp = open(self.param.warningLogFile,"w")
2779
2780 if (self.errorLogFp is None) and (self.param.errorLogFile is not None) :
2781 self.errorLogFp = open(self.param.errorLogFile,"w")
2782
2783
2784
# NOTE(review): the def header (orig line 2785, presumably `def setupNewRun(self, param):`)
# was lost in extraction; indentation is also lost, so nesting is inferred.
# Initialize all per-run state: data directory layout, log files, the
# active-process mark file, and per-run bookkeeping.
2786 self.param = param
2787
2788
2789
# All run data lives under {dataDirRoot}/pyflow.data/
2790 self.param.dataDir = os.path.abspath(self.param.dataDir)
2791 self.param.dataDir = os.path.join(self.param.dataDir, "pyflow.data")
2792 logDir = os.path.join(self.param.dataDir, "logs")
2793 ensureDir(logDir)
2794 self.flowLogFile = os.path.join(logDir, "pyflow_log.txt")
2795 self.flowLogFp = open(self.flowLogFile, "a")
2796
2797
2798 self._validateFixParam(self.param)
2799
2800
2801 self.taskErrors = set()
2802 self.isTaskManagerException = False
2803
2804
2805 ensureDir(self.param.dataDir)
2806
2807
# The mark file signals that a pyflow process owns this data directory; its
# presence at startup means another (or a crashed) run may be using it.
2808 self.markFile = os.path.join(self.param.dataDir, "active_pyflow_process.txt")
2809 if os.path.exists(self.markFile) :
2810
2811
# Drop the log handle and quiet flag so the error reaches the user directly.
2812 self.flowLogFp = None
2813 self.param.isQuiet = False
2814 msg = [ "Can't initialize pyflow run because the data directory appears to be in use by another process.",
2815 "\tData directory: '%s'" % (self.param.dataDir),
2816 "\tIt is possible that a previous process was abruptly interrupted and did not clean up properly. To determine if this is",
2817 "\tthe case, please refer to the file '%s'" % (self.markFile),
2818 "\tIf this file refers to a non-running process, delete the file and relaunch pyflow,",
2819 "\totherwise, specify a new data directory. At the API-level this can be done with the dataDirRoot option." ]
# markFile is cleared so resetRun() does not delete the other process's file.
2820 self.markFile = None
2821 raise DataDirException(msg)
2822 else :
2823 mfp = open(self.markFile, "w")
2824 msg = """
2825 This file provides details of the pyflow instance currently using this data directory.
2826 During normal pyflow run termination (due to job completion, error, SIGINT, etc...),
2827 this file should be deleted. If this file is present it should mean either:
2828 (1) the data directory is still in use by a running workflow
2829 (2) a sudden job failure occurred that prevented normal run termination
2830
2831 The associated pyflow job details are as follows:
2832 """
2833 mfp.write(msg + "\n")
2834 for line in self.getInfoMsg() :
2835 mfp.write(line + "\n")
2836 mfp.write("\n")
2837 mfp.close()
2838
2839 stateDir = os.path.join(self.param.dataDir, "state")
2840 ensureDir(stateDir)
2841
2842
2843 self.runcount += 1
2844
2845
2846 self.wrapperLogDir = os.path.join(logDir, "tmp", "taskWrapperLogs")
2847 ensureDir(self.wrapperLogDir)
2848 stackDumpLogDir = os.path.join(logDir, "tmp", "stackDumpLog")
2849 ensureDir(stackDumpLogDir)
2850
2851
2852 taskStateFileName = "pyflow_tasks_runstate.txt"
2853 taskInfoFileName = "pyflow_tasks_info.txt"
2854
2855 self.taskStdoutFile = os.path.join(logDir, "pyflow_tasks_stdout_log.txt")
2856 self.taskStderrFile = os.path.join(logDir, "pyflow_tasks_stderr_log.txt")
2857 self.taskStateFile = os.path.join(stateDir, taskStateFileName)
2858 self.taskInfoFile = os.path.join(stateDir, taskInfoFileName)
2859 self.taskDotScriptFile = os.path.join(stateDir, "make_pyflow_task_graph.py")
2860
2861 self.stackDumpLogFile = os.path.join(stackDumpLogDir, "pyflow_stack_dump.txt")
2862
2863
# Fresh (non-continued) runs start a new task info file with its header row.
2864 if not self.param.isContinue:
2865 fp = open(self.taskInfoFile, "w")
2866 fp.write(taskInfoHeader())
2867 fp.close()
2868
2869 self._setCustomLogs()
2870
2871
2872
2873
2874
# Writing the graph helper script is best-effort: failure is only a warning.
2875 try :
2876 writeDotScript(self.taskDotScriptFile, taskInfoFileName, taskStateFileName, self.param.workflowClassName)
2877 except OSError:
2878 msg = ["Failed to write task graph visualization script to %s" % (self.taskDotScriptFile)]
2879 self.flowLog(msg,logState=LogState.WARNING)
2880
2881
# NOTE(review): the def header (orig line 2882, presumably `def resetRun(self):`)
# was lost in extraction.
2883 """
2884 Clean up anything associated with the current run.

2886 Right now this just makes sure we don't log to the previous run's log file,
and removes the active-process mark file.
2887 """
2888 self.flowLogFile = None
2889 self.param = None
2890 if self.flowLogFp is not None :
2891 self.flowLogFp.close()
2892 self.flowLogFp = None
2893
2894 if self.warningLogFp is not None :
2895 self.warningLogFp.close()
2896 self.warningLogFp = None
2897
2898 if self.errorLogFp is not None :
2899 self.errorLogFp.close()
2900 self.errorLogFp = None
2901
# Remove the mark file so the data directory is released for future runs.
2902 if self.markFile is not None :
2903 if os.path.exists(self.markFile) : os.unlink(self.markFile)
2904
2905 self.markFile = None
2906
# NOTE(review): several one-line method bodies follow; their def headers
# (orig lines 2907, 2911, 2915, 2918, 2926 -- presumably getRunid,
# setTaskError, isTaskError, isTaskSubmissionActive, setTaskManagerException)
# were lost in extraction.
# Run id combines pid and per-process run count, e.g. "1234_1".
2908 return "%s_%s" % (self.pid, self.runcount)
2909
2910 @lockMethod
2912 self.taskErrors.add(task)
2913
2914 @lockMethod
2916 return (len(self.taskErrors) != 0)
2917
2919 """
2920 wait() pollers need to know if task submission has been
2921 shut down to implement sane behavior.
2922 """
2923 return (not self.isTaskError())
2924
2925 @lockMethod
2927 self.isTaskManagerException = True
2928
2929 @lockMethod
# NOTE(review): the def header (orig line 2930, presumably
# `def flowLog(self, msg, linePrefix=None, logState=...):`) was lost in
# extraction. Writes msg to stderr and every configured log file.
2931 linePrefixOut = "[%s]" % (self.getRunid())
2932 if linePrefix is not None :
2933 linePrefixOut += " " + linePrefix
2934
# Tag warning/error lines so they are greppable in the combined log.
2935 if (logState == LogState.ERROR) or (logState == LogState.WARNING) :
2936 linePrefixOut += " [" + LogState.toString(logState) + "]"
2937
2938 ofpList = []
# stderr is skipped when the run is quiet or a hangup killed the terminal.
2939 isAddStderr = (self._isStderrAlive and ((self.flowLogFp is None) or (self.param is None) or (not self.param.isQuiet)))
2940 if isAddStderr:
2941 ofpList.append(sys.stderr)
2942 if self.flowLogFp is not None :
2943 ofpList.append(self.flowLogFp)
2944
2945
# Best-effort: opening the custom warning/error logs must not block logging.
2946 try :
2947 self._setCustomLogs()
2948 except :
2949 pass
2950
2951 if (self.warningLogFp is not None) and (logState == LogState.WARNING) :
2952 ofpList.append(self.warningLogFp)
2953 if (self.errorLogFp is not None) and (logState == LogState.ERROR) :
2954 ofpList.append(self.errorLogFp)
2955
2956 if len(ofpList) == 0 : return
2957 retval = log(ofpList, msg, linePrefixOut)
2958
2959
# If the stderr write failed after a hangup, stop writing to stderr for good.
2960 if isAddStderr and (not retval[0]) :
2961 if self.isHangUp.isSet() :
2962 self._isStderrAlive = False
2963
2964
# NOTE(review): the def header (orig line 2965, presumably `def getInfoMsg(self):`)
# was lost in extraction.
2966 """
2967 return a string array with general stats about this run
2968 """
2969
2970 msg = [ "%s\t%s" % ("pyFlowClientWorkflowClass:", self.param.workflowClassName),
2971 "%s\t%s" % ("pyFlowVersion:", __version__),
2972 "%s\t%s" % ("pythonVersion:", pythonVersion),
2973 "%s\t%s" % ("Runid:", self.getRunid()),
2974 "%s\t%s UTC" % ("RunStartTime:", self.param.logRunStartTime),
2975 "%s\t%s UTC" % ("NotificationTime:", timeStrNow()),
2976 "%s\t%s" % ("HostName:", siteConfig.getHostName()),
2977 "%s\t%s" % ("WorkingDir:", self.cwd),
2978 "%s\t%s" % ("DataDir:", self.param.dataDir),
2979 "%s\t'%s'" % ("ProcessCmdLine:", cmdline()) ]
2980 return msg
2981
2982
2984
2985
2986
2987
2988
2989
2990
2991
# NOTE(review): the def header (presumably
# `def emailNotification(self, msgList, emailErrorLog=None):`) was lost in
# extraction. Sends msgList plus run info to all configured addresses;
# delivery failures are reported through the emailErrorLog callback.
2992 if self.param is None : return
2993 if len(self.param.mailTo) == 0 : return
2994
2995 if not isLocalSmtp() :
2996 if emailErrorLog :
2997 msg = ["email notification failed, no local smtp server"]
2998 emailErrorLog(msg,logState=LogState.WARNING)
2999 return
3000
3001 mailTo = sorted(list(self.param.mailTo))
3002 subject = "pyflow notification from %s run: %s" % (self.param.workflowClassName, self.getRunid())
3003 msg = msgListToMsg(msgList)
3004 fullMsgList = ["Message:",
3005 '"""',
3006 msg,
3007 '"""']
3008 fullMsgList.extend(self.getInfoMsg())
3009
3010 import smtplib
3011 try:
3012 sendEmail(mailTo, siteConfig.mailFrom, subject, fullMsgList)
3013 except smtplib.SMTPException :
# Without a logger callback the SMTP failure propagates to the caller.
3014 if emailErrorLog is None : raise
3015 msg = ["email notification failed"]
3016 eMsg = lister(getExceptionMsg())
3017 msg.extend(eMsg)
3018 emailErrorLog(msg,logState=LogState.WARNING)
3019
# NOTE(review): the `class WorkflowRunner...` header (orig line ~3022) was
# lost in extraction; this is the class docstring and first class attribute.
3023 """
3024 This object is designed to be inherited by a class in
3025 client code. This inheriting class can override the
3026 L{workflow()<WorkflowRunner.workflow>} method to define the
3027 tasks that need to be run and their dependencies.
3028
3029 The inheriting class defining a workflow can be executed in
3030 client code by calling the WorkflowRunner.run() method.
3031 This method provides various run options such as whether
3032 to run locally or on sge.
3033 """
3034
3035
3036 _maxWorkflowRecursion = 30
3037 """
3038 This limit protects against a runaway forkbomb in case a
3039 workflow task recursively adds itself w/o termination:
3040 """
3041
3042
3043 - def run(self,
3044 mode="local",
3045 dataDirRoot=".",
3046 isContinue=False,
3047 isForceContinue=False,
3048 nCores=None,
3049 memMb=None,
3050 isDryRun=False,
3051 retryMax=2,
3052 retryWait=90,
3053 retryWindow=360,
3054 retryMode="nonlocal",
3055 mailTo=None,
3056 updateInterval=60,
3057 schedulerArgList=None,
3058 isQuiet=False,
3059 warningLogFile=None,
3060 errorLogFile=None,
3061 successMsg=None,
3062 startFromTasks=None,
3063 ignoreTasksAfter=None,
3064 resetTasks=None) :
3065 """
3066 Call this method to execute the workflow() method overridden
3067 in a child class and specify the resources available for the
3068 workflow to run.
3069
3070 Task retry behavior: Retry attempts will be made per the
3071 arguments below for distributed workflow runs (eg. sge run
3072 mode). Note this means that retries will be attempted for
3073 tasks with an 'isForceLocal' setting during distributed runs.
3074
3075 Task error behavior: When a task error occurs the task
3076 manager stops submitting new tasks and allows all currently
3077 running tasks to complete. Note that in this case 'task error'
3078 means that the task could not be completed after exhausting
3079 attempted retries.
3080
3081 Workflow exception behavior: Any exceptions thrown from the
3082 python code of classes derived from WorkflowRunner will be
3083 logged and trigger notification (e.g. email). The exception
3084 will not come down to the client's stack. In sub-workflows the
3085 exception is handled exactly like a task error (ie. task
3086 submission is shut-down and remaining tasks are allowed to
3087 complete). An exception in the master workflow will lead to
3088 workflow termination without waiting for currently running
3089 tasks to finish.
3090
3091 @return: 0 if all tasks completed successfully and 1 otherwise
3092
3093 @param mode: Workflow run mode. Current options are (local|sge)
3094
3095 @param dataDirRoot: All workflow data is written to
3096 {dataDirRoot}/pyflow.data/ These include
3097 workflow/task logs, persistent task state data,
3098 and summary run info. Two workflows cannot
3099 simultaneously use the same dataDir.
3100
3101 @param isContinue: If True, continue workflow from a previous
3102 incomplete run based on the workflow data
3103 files. You must use the same dataDirRoot as a
3104 previous run for this to work. Set to 'Auto' to
3105 have the run continue only if the previous
3106 dataDir exists. (default: False)
3107
3108 @param isForceContinue: Only used if isContinue is not False. Normally
3109 when isContinue is run, the commands of
3110 completed tasks are checked to ensure they
3111 match. When isForceContinue is true,
3112 failing this check is reduced from an error
3113 to a warning
3114
3115 @param nCores: Total number of cores available, or 'unlimited', sge
3116 is currently configured for a maximum job count of
3117 %s, any value higher than this in sge mode will be
3118 reduced to the maximum. (default: 1 for local mode,
3119 %s for sge mode)
3120
3121 @param memMb: Total memory available (in megabytes), or 'unlimited',
3122 Note that this value will be ignored in non-local modes
3123 (such as sge), because in this case total memory available
3124 is expected to be known by the scheduler for each node in its
3125 cluster. (default: %i*nCores for local mode, 'unlimited'
3126 for sge mode)
3127
3128 @param isDryRun: List the commands to be executed without running
3129 them. Note that recursive and dynamic workflows
3130 will potentially have to account for the fact that
3131 expected files will be missing -- here 'recursive
3132 workflow' refers to any workflow which uses the
3133 addWorkflowTask() method, and 'dynamic workflow'
3134 refers to any workflow which uses the
3135 waitForTasks() method. These types of workflows
3136 can query this status with the isDryRun() to make
3137 accommodations. (default: False)
3138
3139 @param retryMax: Maximum number of task retries
3140
3141 @param retryWait: Delay (in seconds) before resubmitting task
3142
3143 @param retryWindow: Maximum time (in seconds) after the first task
3144 submission in which retries are allowed. A value of
3145 zero or less puts no limit on the time when retries
3146 will be attempted. Retries are always allowed (up to
3147 retryMax times), for failed make jobs.
3148
3149 @param retryMode: Modes are 'nonlocal' and 'all'. For 'nonlocal'
3150 retries are not attempted in local run mode. For 'all'
3151 retries are attempted for any run mode. The default mode
3152 is 'nonlocal'.
3153
3154 @param mailTo: An email address or container of email addresses. Notification
3155 will be sent to each email address when
3156 either (1) the run successfully completes (2) the
3157 first task error occurs or (3) an unhandled
3158 exception is raised. The intention is to send one
3159 status message per run() indicating either success
3160 or the reason for failure. This should occur for all
3161 cases except a host hardware/power failure. Note
3162 that mail comes from '%s' (configurable),
3163 which may be classified as junk-mail by your system.
3164
3165 @param updateInterval: How often (in minutes) should pyflow log a
3166 status update message summarizing the run
3167 status. Set this to zero or less to turn
3168 the update off.
3169
3170 @param schedulerArgList: A list of arguments can be specified to be
3171 passed on to an external scheduler when non-local
3172 modes are used (e.g. in sge mode you could pass
3173 schedulerArgList=['-q','work.q'] to put the whole
3174 pyflow job into the sge work.q queue)
3175
3176 @param isQuiet: Don't write any logging output to stderr (but still write
3177 log to pyflow_log.txt)
3178
3179 @param warningLogFile: Replicate all warning messages to the specified file. Warning
3180 messages will still appear in the standard logs, this
3181 file will contain a subset of the log messages pertaining to
3182 warnings only.
3183
3184 @param errorLogFile: Replicate all error messages to the specified file. Error
3185 messages will still appear in the standard logs, this
3186 file will contain a subset of the log messages pertaining to
3187 errors only. It should be empty for a successful run.
3188
3189 @param successMsg: Provide a string containing a custom message which
3190 will be prepended to pyflow's standard success
3191 notification. This message will appear in the log
3192 and any configured notifications (e.g. email). The
3193 message may contain linebreaks.
3194
3195 @param startFromTasks: A task label or container of task labels. Any tasks which
3196 are not in this set or descendants of this set will be marked as
3197 completed.
3198 @type startFromTasks: A single string, or set, tuple or list of strings
3199
3200 @param ignoreTasksAfter: A task label or container of task labels. All descendants
3201 of these task labels will be ignored.
3202 @type ignoreTasksAfter: A single string, or set, tuple or list of strings
3203
3204 @param resetTasks: A task label or container of task labels. These tasks and all
3205 of their descendants will be reset to the "waiting" state to be re-run.
3206 Note this option will only affect a workflow which has been continued
3207 from a previous run. This will not override any nodes altered by the
3208 startFromTasks setting in the case that both options are used together.
3209 @type resetTasks: A single string, or set, tuple or list of strings
3210 """
3211
3212
3213
# Implementation section of run() (docstring above). Installs SIGTERM/SIGHUP
# handlers, builds the parameter bundle, dispatches to _runWorkflow(), and
# guarantees state cleanup and handler restoration in the finally clause.
# NOTE(review): indentation was lost in extraction; nesting is inferred.
3214 inHandlers = Bunch(isSet=False)
3215
3216 class SigTermException(Exception) : pass
3217
3218 def sigtermHandler(_signum, _frame) :
3219 raise SigTermException
3220
3221 def sighupHandler(_signum, _frame) :
3222 self._warningLog("pyflow recieved hangup signal. pyflow will continue, but this signal may still interrupt running tasks.")
3223
# Record the hangup so logging can stop writing to a dead stderr.
3224 self._cdata().isHangUp.set()
3225
3226 def set_pyflow_sig_handlers() :
3227 import signal
# Save the pre-existing handlers once so they can be restored afterwards.
3228 if not inHandlers.isSet :
3229 inHandlers.sigterm = signal.getsignal(signal.SIGTERM)
3230 if not isWindows() :
3231 inHandlers.sighup = signal.getsignal(signal.SIGHUP)
3232 inHandlers.isSet = True
3233 try:
3234 signal.signal(signal.SIGTERM, sigtermHandler)
3235 if not isWindows() :
3236 signal.signal(signal.SIGHUP, sighupHandler)
# signal.signal raises ValueError when called off the main thread; that is
# only tolerated for non-main threads (custom handling is then disabled).
3237 except ValueError:
3238 if isMainThread() :
3239 raise
3240 else :
3241 self._warningLog("pyflow has not been initialized on main thread, all custom signal handling disabled")
3242
3243
3244 def unset_pyflow_sig_handlers() :
3245 import signal
3246 if not inHandlers.isSet : return
3247 try :
3248 signal.signal(signal.SIGTERM, inHandlers.sigterm)
3249 if not isWindows() :
3250 signal.signal(signal.SIGHUP, inHandlers.sighup)
3251 except ValueError:
3252 if isMainThread() :
3253 raise
3254 else:
3255 pass
3256
3257
3258
# Default to failure; only _runWorkflow() success changes this.
3259 retval = 1
3260 try:
3261 set_pyflow_sig_handlers()
3262
3263 def exceptionMessaging(prefixMsg=None) :
3264 msg = lister(prefixMsg)
3265 eMsg = lister(getExceptionMsg())
3266 msg.extend(eMsg)
3267 self._notify(msg,logState=LogState.ERROR)
3268
3269 try:
3270 self.runStartTimeStamp = time.time()
3271 self.updateInterval = int(updateInterval)
# Pack all validated-later run arguments into a single parameter bundle.
3272
3273 param = Bunch(mode=mode,
3274 dataDir=dataDirRoot,
3275 isContinue=isContinue,
3276 isForceContinue=isForceContinue,
3277 nCores=nCores,
3278 memMb=memMb,
3279 isDryRun=isDryRun,
3280 retryMax=retryMax,
3281 retryWait=retryWait,
3282 retryWindow=retryWindow,
3283 retryMode=retryMode,
3284 mailTo=mailTo,
3285 logRunStartTime=timeStampToTimeStr(self.runStartTimeStamp),
3286 workflowClassName=self._whoami(),
3287 schedulerArgList=schedulerArgList,
3288 isQuiet=isQuiet,
3289 warningLogFile=warningLogFile,
3290 errorLogFile=errorLogFile,
3291 successMsg=successMsg,
3292 startFromTasks=setzer(startFromTasks),
3293 ignoreTasksAfter=setzer(ignoreTasksAfter),
3294 resetTasks=setzer(resetTasks))
3295 retval = self._runWorkflow(param)
3296
3297 except SigTermException:
3298 msg = "Received termination signal, shutting down running tasks..."
3299 self._killWorkflow(msg)
3300 except KeyboardInterrupt:
3301 msg = "Keyboard Interrupt, shutting down running tasks..."
3302 self._killWorkflow(msg)
# Python 2 except syntax -- intentional, this module targets python [2.4,3.0).
3303 except DataDirException, e:
3304 self._notify(e.msg,logState=LogState.ERROR)
3305 except:
3306 exceptionMessaging()
3307 raise
3308
3309 finally:
3310
# Always release the data directory and restore the original signal handlers.
3311 self._cdata().resetRun()
3312 unset_pyflow_sig_handlers()
3313
3314 return retval
3315
3316
3317
# Interpolate site-configured values into the %s/%i placeholders of the
# run() docstring so generated API docs show the actual local defaults.
3318 run.__doc__ = run.__doc__ % (siteConfig.maxSGEJobs,
3319 RunMode.data["sge"].defaultCores,
3320 siteConfig.defaultHostMemMbPerCore,
3321 siteConfig.mailFrom)
3322
3323
3324
3325
3326
def addTask(self, label, command=None, cwd=None, env=None, nCores=1,
            memMb=siteConfig.defaultTaskMemMb,
            dependencies=None, priority=0,
            isForceLocal=False, isCommandMakePath=False, isTaskStable=True,
            mutex=None,
            retryMax=None, retryWait=None, retryWindow=None, retryMode=None) :
    """
    Add a task to the workflow, including resource requirements and
    dependencies. Dependency tasks must already exist in the workflow.

    @return: The 'label' argument, unmodified.

    @param label: Unique non-empty task name matching /[A-Za-z0-9_-]+/.
    @param command: Shell string, argument list, or None. None creates a
                    'checkpoint' task which runs nothing but labels the
                    completion of its dependencies.
    @param cwd: Working directory for command execution.
    @param env: Environment variable map for the task (None inherits the
                pyflow client process environment).
    @param nCores: Number of cpu threads required.
    @param memMb: Memory required, in megabytes.
    @param dependencies: Task label, or container of labels, of tasks which
                         must complete first.
    @param priority: Launch-order priority in [-100,100]; higher runs first
                     among simultaneously eligible tasks.
    @param isForceLocal: Run this task locally even in distributed run modes.
    @param isCommandMakePath: Treat command as a path holding a makefile and
                              run it via make/qmake.
    @param isTaskStable: Set False when the command/dependencies may change
                         across an interrupted-and-continued run.
    @param mutex: Optional mutex id; tasks sharing an id never run at once.
    @param retryMax: Per-task override of the workflow retryMax.
    @param retryWait: Per-task override of the workflow retryWait.
    @param retryWindow: Per-task override of the workflow retryWindow.
    @param retryMode: Per-task override of the workflow retryMode
                      ('nonlocal' or 'all').
    """

    self._requireInWorkflow()

    # normalize boolean-like arguments up front
    isForceLocal = argToBool(isForceLocal)
    isCommandMakePath = argToBool(isCommandMakePath)

    # coerce numeric arguments and bound the priority value
    nCores = int(nCores)
    memMb = int(memMb)
    priority = int(priority)
    if not (-100 <= priority <= 100) :
        raise Exception("priority must be an integer in the range [-100,100]")

    WorkflowRunner._checkTaskLabel(label)

    qualifiedLabel = namespaceJoin(self._getNamespace(), label)

    taskCmd = Command(command, cwd, env)

    if taskCmd.cmd is None :
        # checkpoint tasks run nothing, so they consume no resources
        nCores = 0
        memMb = 0
    else :
        if nCores <= 0 :
            raise Exception("Unexpected core requirement for task: '%s' nCores: %i" % (qualifiedLabel, nCores))
        if memMb <= 0:
            raise Exception("Unexpected memory requirement for task: '%s' memory: %i (megabytes)" % (qualifiedLabel, memMb))

    # a single task may never demand more than the whole run was given
    runParam = self._cdata().param
    if (runParam.nCores != "unlimited") and (nCores > runParam.nCores) :
        raise Exception("Task core requirement exceeds full available resources")

    if (runParam.memMb != "unlimited") and (memMb > runParam.memMb) :
        raise Exception("Task memory requirement exceeds full available resources")

    # makefile tasks must supply their path as a single string
    if isCommandMakePath :
        if taskCmd.type != "str" :
            raise Exception("isCommandMakePath is set, but no path is provided in task: '%s'" % (qualifiedLabel))
        taskCmd.cmd = os.path.abspath(taskCmd.cmd)

    # mutex ids obey the same naming rules as task labels
    if mutex is not None :
        WorkflowRunner._checkTaskLabel(mutex)

    retrySettings = runParam.retry.getTaskCopy(retryMax, retryWait, retryWindow, retryMode)

    # hand the fully-validated task payload to the task manager
    payload = CmdPayload(qualifiedLabel, taskCmd, nCores, memMb, priority, isForceLocal,
                         isCommandMakePath, isTaskStable, mutex, retrySettings)
    self._addTaskCore(self._getNamespace(), label, payload, dependencies)
    return label
3514
3515
3516
3517
def addWorkflowTask(self, label, workflowRunnerInstance, dependencies=None) :
    """
    Add another WorkflowRunner instance as a task of this workflow.

    The added workflow's workflow() method runs once the given dependencies
    complete; its own task calls (addTask, etc.) are routed into this
    enclosing workflow and bound by the enclosing run's parameters. The
    task is complete when the sub-workflow's workflow() method and every
    task it started have finished. All of the sub-workflow's tasks are
    namespaced (recursively) under this task's label.

    The submitted instance is deep copied before being altered in any way.

    @return: The 'label' argument, unmodified.

    @param label: Unique non-empty task name matching /[A-Za-z0-9_-]+/.
    @param workflowRunnerInstance: A L{WorkflowRunner} instance.
    @param dependencies: Task label, or container of labels, of tasks which
                         must complete first.
    @type dependencies: A single string, or set, tuple or list of strings
    """

    self._requireInWorkflow()

    WorkflowRunner._checkTaskLabel(label)

    import inspect

    # operate on a deep copy so the caller's instance is never mutated
    wflowClone = copy.deepcopy(workflowRunnerInstance)

    # Rebind methods of the clone to this (parent) workflow so the
    # sub-workflow's calls flow into the enclosing run:
    # ...every public method except the core task-definition API:
    skipPublic = ("workflow", "addTask", "addWorkflowTask", "waitForTasks")
    for (methodName, _unused) in inspect.getmembers(WorkflowRunner, predicate=inspect.ismethod) :
        if methodName.startswith("_") : continue
        if methodName in skipPublic : continue
        setattr(wflowClone, methodName, getattr(self, methodName))

    # ...plus a fixed set of private plumbing methods:
    for methodName in ("_cdata", "_addTaskCore", "_waitForTasksCore", "_isTaskCompleteCore", "_setRunning", "_getRunning") :
        setattr(wflowClone, methodName, getattr(self, methodName))

    # the clone must never be run() on its own
    wflowClone.run = None

    # nest the clone's namespace under the current namespace plus this label
    wflowClone._appendNamespace(self._getNamespaceList())
    wflowClone._appendNamespace(label)

    payload = WorkflowPayload(wflowClone)
    self._addTaskCore(self._getNamespace(), label, payload, dependencies)
    return label
3590
3591
# NOTE(review): the def header (orig line 3592, presumably
# `def waitForTasks(self, labels=None):`) was lost in extraction.
3593 """
3594 Wait for a list of tasks to complete.

3596 @return: In case of an error in a task being waited for, or in
3597 one of these task's dependencies, the function returns 1.
3598 Else return 0.

3600 @param labels: Container of task labels to wait for. If an empty container is
3601 given or no list is provided then wait for all
3602 outstanding tasks to complete.
3603 @type labels: A single string, or set, tuple or list of strings
3604 """

3606 self._requireInWorkflow()

3608 return self._waitForTasksCore(self._getNamespace(), labels)
3609
3610
# NOTE(review): the def header (orig line 3611, presumably
# `def isTaskComplete(self, taskLabel):`) was lost in extraction.
3612 """
3613 Query if a specific task is in the workflow and completed without error.

3615 This can assist workflows with providing
3616 stable interrupt/resume behavior.

3618 @param taskLabel: A task string

3620 @return: Completion status of task
3621 """

3623 result = self._isTaskCompleteCore(self._getNamespace(), taskLabel)


# result is (isDone, isError); "complete" means done and error-free.
3626 return (result[0] and not result[1])
def isTaskDone(self, taskLabel) :
    """
    Query if a specific task is in the workflow and is done, with or without error.

    This can assist workflows with providing stable interrupt/resume behavior.

    @param taskLabel: A task string

    @return: A boolean tuple specifying (task is done, task finished with error)
    """
    currentNamespace = self._getNamespace()
    return self._isTaskCompleteCore(currentNamespace, taskLabel)
3641
3643 """
3644 Cancel the given task and all of its dependencies. Canceling means that any running jobs will be stopped and
3645 any waiting job will be unqueued. Canceled tasks will not be treated as errors. Canceled tasks that are not
3646 already complete will be put into the waiting/ignored state.
3647 """
3648 self._cancelTaskTreeCore(self._getNamespace(), taskLabel)
3649
3651 """
3652 Get the current run mode
3653
3654 This can be used to access the current run mode from
3655 within the workflow function. Although the runmode should
3656 be transparent to client code, this is occasionally needed
3657 to hack workarounds.
3658
3659 @return: Current run mode
3660 """
3661
3662 self._requireInWorkflow()
3663
3664 return self._cdata().param.mode
3665
def getNCores(self) :
    """
    Get the current run core limit.

    This function can be used to access the current run's core limit
    from within the workflow function. This can be useful to e.g. limit
    the number of cores requested by a single task.

    @return: Total cores available to this workflow run
    @rtype: Integer value or 'unlimited'
    """
    self._requireInWorkflow()

    runParam = self._cdata().param
    return runParam.nCores
3682
def limitNCores(self, nCores) :
    """
    Takes a task nCores argument and reduces it to the maximum value
    allowed for the current run.

    @param nCores: Proposed core requirement

    @return: Min(nCores,Total cores available to this workflow run)
    """
    self._requireInWorkflow()

    requested = int(nCores)
    available = self._cdata().param.nCores
    # an unlimited run never clips the request:
    if available == "unlimited" :
        return requested
    return min(requested, available)
3700
def getMemMb(self) :
    """
    Get the current run's total memory limit (in megabytes).

    @return: Memory limit in megabytes
    @rtype: Integer value or 'unlimited'
    """
    self._requireInWorkflow()

    runParam = self._cdata().param
    return runParam.memMb
3713
def limitMemMb(self, memMb) :
    """
    Takes a task memMb argument and reduces it to the maximum value
    allowed for the current run.

    @param memMb: Proposed task memory requirement in megabytes

    @return: Min(memMb,Total memory available to this workflow run)
    """
    self._requireInWorkflow()

    requested = int(memMb)
    available = self._cdata().param.memMb
    # an unlimited run never clips the request:
    if available == "unlimited" :
        return requested
    return min(requested, available)
3731
def isDryRun(self) :
    """
    Get isDryRun flag value.

    When the dryrun flag is set, no commands are actually run. Querying
    this flag allows dynamic workflows to correct for dry run behaviors,
    such as tasks which do not produce expected files.

    @return: DryRun status flag
    """
    self._requireInWorkflow()

    runParam = self._cdata().param
    return runParam.isDryRun
3747
3748
3749 @staticmethod
3751 """
3752 Get the default core limit for run mode (local,sge,..)
3753
3754 @param mode: run mode, as specified in L{the run() method<WorkflowRunner.run>}
3755
3756 @return: Default maximum number of cores for mode
3757
3758 @rtype: Either 'unlimited', or a string
3759 representation of the integer limit
3760 """
3761
3762 return str(RunMode.data[mode].defaultCores)
3763
3764
3766 """
3767 Send a message to the WorkflowRunner's log.
3768
3769 @param msg: Log message
3770 @type msg: A string or an array of strings. String arrays will be separated by newlines in the log.
3771 @param logState: Message severity, defaults to INFO.
3772 @type logState: A value in pyflow.LogState.{INFO,WARNING,ERROR}
3773 """
3774
3775 self._requireInWorkflow()
3776
3777 linePrefixOut = "[%s]" % (self._cdata().param.workflowClassName)
3778 self._cdata().flowLog(msg, linePrefix=linePrefixOut, logState=logState)
3779
3780
3781
3782
3784 """
3785 Workflow definition defined in child class
3786
3787 This method should be overridden in the class derived from
3788 L{WorkflowRunner} to specify the actual workflow logic. Client
3789 code should not call this method directly.
3790 """
3791 pass
3792
3793
3794
3795
3796
3797
3800
3801
3803 linePrefixOut = "[WorkflowRunner]"
3804 self._cdata().flowLog(msg, linePrefix=linePrefixOut, logState=logState)
3805
3808
3811
3814
3816
3817 return self.__class__.__name__
3818
3819
3821 try:
3822 return self._namespaceList
3823 except AttributeError:
3824 self._namespaceList = []
3825 return self._namespaceList
3826
3829
3838
3839
3840
3841
3842 _allStop = threading.Event()
3843
3844 @staticmethod
3848
3849 @staticmethod
3854
3855 - def _addTaskCore(self, namespace, label, payload, dependencies) :
3872
3873
3875
3876
3877
3878
3879
3880 def updateStatusFromTask(task, status) :
3881 if not task.isDone() :
3882 status.isAllTaskDone = False
3883 elif not task.isComplete() :
3884 status.retval = 1
3885 if status.retval == 0 and (not self._cdata().isTaskSubmissionActive()) :
3886 status.retval = 1
3887 if status.retval == 0 and task.isDead() :
3888 status.retval = 1
3889
3890
3891 if len(labels) == 0 :
3892 if namespace == "" :
3893 if self._tdag.isRunExhausted() or (not self._tman.isAlive()) :
3894 if not self._tdag.isRunComplete() :
3895 status.retval = 1
3896 else:
3897 status.isAllTaskDone = False
3898 else :
3899 for task in self._tdag.getAllNodes(namespace) :
3900 updateStatusFromTask(task, status)
3901 else :
3902 for l in labels :
3903 if not self._tdag.isTaskPresent(namespace, l) :
3904 raise Exception("Task: '%s' is not in taskDAG" % (namespaceJoin(namespace, l)))
3905 task = self._tdag.getTask(namespace, l)
3906 updateStatusFromTask(task, status)
3907
3908
3910 labels = setzer(labels)
3911 if isVerbose :
3912 msg = "Pausing %s until completion of" % (namespaceLabel(namespace))
3913 if len(labels) == 0 :
3914 self._infoLog(msg + " its current tasks")
3915 else:
3916 self._infoLog(msg + " task(s): %s" % (",".join([namespaceJoin(namespace, l) for l in labels])))
3917
class WaitStatus:
    """Aggregate wait result: all-tasks-done flag plus an error return code."""
    def __init__(self) :
        # optimistically assume everything is done and clean until a scan
        # proves otherwise:
        self.isAllTaskDone = True
        self.retval = 0
3922
3923 ewaiter = ExpWaiter(1, 1.7, 15)
3924 while True :
3925 if self._isWorkflowStopped() :
3926 raise WorkflowRunner._AbortWorkflowException
3927 status = WaitStatus()
3928 self._getWaitStatus(namespace, labels, status)
3929 if status.isAllTaskDone or (status.retval != 0) : break
3930 ewaiter.wait()
3931
3932 if isVerbose :
3933 msg = "Resuming %s" % (namespaceLabel(namespace))
3934 self._infoLog(msg)
3935 return status.retval
3936
3937
3939 """
3940 @return: A boolean tuple specifying (task is done, task finished with error)
3941 """
3942
3943 if not self._tdag.isTaskPresent(namespace, taskLabel) :
3944 return (False, False)
3945 task = self._tdag.getTask(namespace, taskLabel)
3946 return ( task.isDone(), task.isError() )
3947
3953
@staticmethod
def _checkTaskLabel(label) :
    """Raise if label is not a nonempty string of [A-Za-z0-9_-] characters."""
    if not isinstance(label, basestring) :
        raise Exception ("Task label is not a string")
    if label == "" :
        raise Exception ("Task label is empty")
    # restrict labels to filename/scheduler-safe characters:
    if re.match(r"^[A-Za-z0-9_-]+$", label) is None :
        raise Exception ("Task label is invalid due to disallowed characters. Label: '%s'" % (label))
3963
3964
3966
3967
3968 if (self._tman is not None) and (self._tman.isAlive()) : return
3969 if not self._cdata().isTaskManagerException :
3970 self._tman = TaskManager(self._cdata(), self._tdag)
3971 self._tman.start()
3972
3973
3974 - def _notify(self, msg, logState) :
3979
3980
3985
3986
3988
3989
3990
3991 if (self._tman is None) or (not self._tman.isAlive()) : return
3992 StoppableThread.stopAll()
3993 self._stopAllWorkflows()
3994 self._tman.stop()
3995 for _ in range(timeoutSec) :
3996 time.sleep(1)
3997 if not self._tman.isAlive() :
3998 self._infoLog("Task shutdown complete")
3999 return
4000 self._infoLog("Task shutdown timed out")
4001
4002
4004
4005
4006
4007
4008 try:
4009 return self._constantData
4010 except AttributeError:
4011 self._constantData = WorkflowRunnerThreadSharedData()
4012 return self._constantData
4013
4014
4015
4016
4017
4018
4019
4021 while True :
4022 time.sleep(self.updateInterval * 60)
4023
4024 status = self._tdag.getTaskStatus()
4025 isSpecComplete = (runStatus.isSpecificationComplete.isSet() and status.isAllSpecComplete)
4026 report = []
4027 report.append("===== " + self._whoami() + " StatusUpdate =====")
4028 report.append("Workflow specification is complete?: %s" % (str(isSpecComplete)))
4029 report.append("Task status (waiting/queued/running/complete/error): %i/%i/%i/%i/%i"
4030 % (status.waiting, status.queued, status.running, status.complete, status.error))
4031 report.append("Longest ongoing queued task time (hrs): %.4f" % (status.longestQueueSec / 3600.))
4032 report.append("Longest ongoing queued task name: '%s'" % (status.longestQueueName))
4033 report.append("Longest ongoing running task time (hrs): %.4f" % (status.longestRunSec / 3600.))
4034 report.append("Longest ongoing running task name: '%s'" % (status.longestRunName))
4035
4036 report = [ "[StatusUpdate] " + line for line in report ]
4037 self._infoLog(report)
4038
4039
4040
4041
4042
4043 stackDumpFp = open(self._cdata().stackDumpLogFile, "a")
4044
4045
4046 linePrefixOut = "[%s] [StackDump]" % (self._cdata().getRunid())
4047 ofpList = [stackDumpFp]
4048 log(ofpList, "Initiating stack dump for all threads", linePrefixOut)
4049
4050 stackDump(stackDumpFp)
4051 hardFlush(stackDumpFp)
4052 stackDumpFp.close()
4053
4054
4056
4057
4058
4059 self._setupWorkflow(param)
4060 self._initMessage()
4061
4062 runStatus = RunningTaskStatus(self._tdag.isFinishedEvent)
4063
4064
4065
4066
4067
4068 if(self.updateInterval > 0) :
4069 hb = threading.Thread(target=WorkflowRunner._runUpdate, args=(self, runStatus))
4070 hb.setDaemon(True)
4071 hb.setName("StatusUpdate-Thread")
4072 hb.start()
4073
4074
4075
4076
4077
4078
4079
4080 trun = WorkflowTaskRunner(runStatus, "masterWorkflow", self, self._cdata().flowLog, None)
4081 trun.start()
4082
4083 ewaiter = ExpWaiter(1, 1.7, 15,runStatus.isComplete)
4084 while True :
4085 if not trun.isAlive() : break
4086 ewaiter.wait()
4087
4088 if not runStatus.isComplete.isSet() :
4089
4090 runStatus.errorCode = 1
4091 runStatus.errorMessage = "Thread: '%s', has stopped without a traceable cause" % (trun.getName())
4092
4093 self._taskInfoWriter.flush()
4094 self._taskStatusWriter.flush()
4095
4096 return self._evalWorkflow(runStatus)
4097
4098
4100 cdata = self._cdata()
4101
4102
4103 cdata.setupNewRun(param)
4104
4105
4106 self._tdag = TaskDAG(cdata.param.isContinue, cdata.param.isForceContinue, cdata.param.isDryRun,
4107 cdata.taskInfoFile, cdata.taskStateFile, cdata.param.workflowClassName,
4108 cdata.param.startFromTasks, cdata.param.ignoreTasksAfter, cdata.param.resetTasks,
4109 self._flowLog)
4110 self._tman = None
4111
def backupFile(inputFile) :
    """
    backup old state files if they exist

    Copies inputFile into a sibling 'backup' directory under a name
    stamped with the current runid; missing files are silently skipped.
    """
    if not os.path.isfile(inputFile) : return
    sourceDir = os.path.dirname(inputFile)
    sourceName = os.path.basename(inputFile)
    backupDir = os.path.join(sourceDir, "backup")
    ensureDir(backupDir)
    backupName = sourceName + ".backup_before_starting_run_%s.txt" % (cdata.getRunid())
    shutil.copyfile(inputFile, os.path.join(backupDir, backupName))
4124
4125 backupFile(cdata.taskStateFile)
4126 backupFile(cdata.taskInfoFile)
4127
4128 if cdata.param.isContinue :
4129 self._setupContinuedWorkflow()
4130
4131 self._taskInfoWriter = TaskFileWriter(self._tdag.writeTaskInfo)
4132 self._taskStatusWriter = TaskFileWriter(self._tdag.writeTaskStatus)
4133
4134 self._tdag.isWriteTaskInfo = self._taskInfoWriter.isWrite
4135 self._tdag.isWriteTaskStatus = self._taskStatusWriter.isWrite
4136
4137 self._taskInfoWriter.start()
4138 self._taskStatusWriter.start()
4139
4140
4141
4143
4144
4145
4146
4147 cdata = self._cdata()
4148 if not os.path.isfile(cdata.taskStateFile) : return set()
4149
4150 tmpFile = cdata.taskStateFile + ".update.incomplete"
4151 tmpfp = open(tmpFile, "w")
4152 tmpfp.write(taskStateHeader())
4153 complete = set()
4154 for words in taskStateParser(cdata.taskStateFile) :
4155 (runState, errorCode) = words[2:4]
4156 if (runState != "complete") or (int(errorCode) != 0) : continue
4157 tmpfp.write("\t".join(words) + "\n")
4158 (label, namespace) = words[0:2]
4159 complete.add(namespaceJoin(namespace, label))
4160
4161 tmpfp.close()
4162 forceRename(tmpFile, cdata.taskStateFile)
4163 return complete
4164
4165
4167
4168
4169
4170
4171 cdata = self._cdata()
4172 if not os.path.isfile(cdata.taskInfoFile) : return
4173
4174 tmpFile = cdata.taskInfoFile + ".update.incomplete"
4175 tmpfp = open(tmpFile, "w")
4176 tmpfp.write(taskInfoHeader())
4177 for words in taskInfoParser(cdata.taskInfoFile) :
4178 (label, namespace, ptype, nCores, memMb, priority, isForceLocal, depStr, cwdStr, command) = words
4179 fullLabel = namespaceJoin(namespace, label)
4180 if fullLabel not in complete : continue
4181 tmpfp.write("\t".join(words) + "\n")
4182 if ptype == "command" :
4183 if command == "" : command = None
4184 payload = CmdPayload(fullLabel, Command(command, cwdStr), int(nCores), int(memMb), int(priority), argToBool(isForceLocal))
4185 elif ptype == "workflow" :
4186 payload = WorkflowPayload(None)
4187 else : assert 0
4188
4189 self._tdag.addTask(namespace, label, payload, getTaskInfoDepSet(depStr), isContinued=True)
4190
4191 tmpfp.close()
4192 forceRename(tmpFile, cdata.taskInfoFile)
4193
4194
4195
4200
4201
4202
4204 param = self._cdata().param
4205 msg = ["Initiating pyFlow run"]
4206 msg.append("pyFlowClientWorkflowClass: %s" % (param.workflowClassName))
4207 msg.append("pyFlowVersion: %s" % (__version__))
4208 msg.append("pythonVersion: %s" % (pythonVersion))
4209 msg.append("WorkingDir: '%s'" % (self._cdata().cwd))
4210 msg.append("ProcessCmdLine: '%s'" % (cmdline()))
4211
4212 parammsg = ["mode: %s" % (param.mode),
4213 "nCores: %s" % (str(param.nCores)),
4214 "memMb: %s" % (str(param.memMb)),
4215 "dataDir: %s" % (str(param.dataDir)),
4216 "isDryRun: %s" % (str(param.isDryRun)),
4217 "isContinue: %s" % (str(param.isContinue)),
4218 "isForceContinue: %s" % (str(param.isForceContinue)),
4219 "mailTo: '%s'" % (",".join(param.mailTo))]
4220 for i in range(len(parammsg)):
4221 parammsg[i] = "[RunParameters] " + parammsg[i]
4222 msg += parammsg
4223 self._infoLog(msg)
4224
4225
4226
4228
4229
4230
4231 if isForceTaskHarvest :
4232 if (self._tman is not None) and (self._tman.isAlive()) :
4233 self._tman.harvestTasks()
4234
4235 if not self._cdata().isTaskError() : return []
4236
4237 msg = ["Worklow terminated due to the following task errors:"]
4238 for task in self._cdata().taskErrors :
4239 msg.extend(task.getTaskErrorMsg())
4240 return msg
4241
4242
4244
4245 isError = False
4246 if self._cdata().isTaskError() :
4247 msg = self._getTaskErrorsSummaryMsg()
4248 self._errorLog(msg)
4249 isError = True
4250
4251 if masterRunStatus.errorCode != 0 :
4252 eMsg = lister(masterRunStatus.errorMessage)
4253 if (len(eMsg) > 1) or (len(eMsg) == 1 and eMsg[0] != "") :
4254 msg = ["Failed to complete master workflow, error code: %s" % (str(masterRunStatus.errorCode))]
4255 msg.append("errorMessage:")
4256 msg.extend(eMsg)
4257 self._notify(msg,logState=LogState.ERROR)
4258 isError = True
4259
4260 if self._cdata().isTaskManagerException :
4261
4262 self._errorLog("Workflow terminated due to unhandled exception in TaskManager")
4263 isError = True
4264
4265 if (not isError) and (not self._tdag.isRunComplete()) :
4266 msg = "Workflow terminated with unknown error condition"
4267 self._notify(msg,logState=LogState.ERROR)
4268 isError = True
4269
4270 if isError: return 1
4271
4272 elapsed = int(time.time() - self.runStartTimeStamp)
4273 msg = []
4274 if self._cdata().param.successMsg is not None :
4275 msg.extend([self._cdata().param.successMsg,""])
4276 msg.extend(["Workflow successfully completed all tasks",
4277 "Elapsed time for full workflow: %s sec" % (elapsed)])
4278 self._notify(msg,logState=LogState.INFO)
4279 return 0
4280
4281
4283 """
4284 check that the calling method is being called as part of a pyflow workflow() method only
4285 """
4286 if not self._getRunning():
4287 raise Exception("Method must be a (call stack) descendant of WorkflowRunner workflow() method (via run() method)")
4288
4289
4291 try :
4292 assert(self._isRunning >= 0)
4293 except AttributeError :
4294 self._isRunning = 0
4295
@lockMethod
def _setRunning(self, isRunning) :
    """Increment (entering run) or decrement (leaving run) the running-refcount."""
    self._initRunning()
    if isRunning :
        self._isRunning += 1
    else :
        self._isRunning -= 1
4303
4304 @lockMethod
4308
4309
4310
if __name__ == "__main__" :
    # Running this module directly just prints the public WorkflowRunner API docs.
    help(WorkflowRunner)
4313