Package src :: Module pyflowTaskWrapper
[hide private]
[frames | no frames]

Source Code for Module src.pyflowTaskWrapper

  1  # 
  2  # pyFlow - a lightweight parallel task engine 
  3  # 
  4  # Copyright (c) 2012-2017 Illumina, Inc. 
  5  # All rights reserved. 
  6  # 
  7  # Redistribution and use in source and binary forms, with or without 
  8  # modification, are permitted provided that the following conditions 
  9  # are met: 
 10  # 
 11  # 1. Redistributions of source code must retain the above copyright 
 12  #    notice, this list of conditions and the following disclaimer. 
 13  # 
 14  # 2. Redistributions in binary form must reproduce the above copyright 
 15  #    notice, this list of conditions and the following disclaimer in 
 16  #    the documentation and/or other materials provided with the 
 17  #    distribution. 
 18  # 
 19  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 
 20  # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 
 21  # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 
 22  # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 
 23  # COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
 24  # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 
 25  # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 
 26  # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 
 27  # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 
 28  # LIABILITY, OR TORT INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY 
 29  # WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
 30  # POSSIBILITY OF SUCH DAMAGE. 
 31  # 
 32  # 
 33   
 34  """ 
 35  This script wraps workflow tasks for execution on local or remote 
 36  hosts.  It is responsible for adding log decorations to task's stderr 
 37  output (which is diverted to a file), and writing task state transition 
 38  and error information to the wrapper's stderr, which becomes the 
 39  task's 'signal' file from pyflow's perspective. The signal file is 
 40  used to determine task exit status, total runtime, and queue->run 
 41  state transition when pyflow is run in SGE mode. 
 42  """ 
 43   
 44  import datetime 
 45  import os 
 46  import subprocess 
 47  import sys 
 48  import time 
 49   
 50   
 51  scriptName = "pyflowTaskWrapper.py" 
 52   
 53   
def getTracebackStr():
    """
    Return the formatted traceback of the exception currently being
    handled, as produced by traceback.format_exc()
    """
    # imported locally so the module is only loaded on the error path
    from traceback import format_exc
    return format_exc()
57 58
def getExceptionMsg():
    """
    Build the wrapper's error report for an unhandled exception: a fixed
    '[ERROR]' header line followed by the current traceback text
    """
    header = "[ERROR] Unhandled Exception in pyflowTaskWrapper\n"
    return header + getTracebackStr()
61 62
def timeStampToTimeStr(ts):
    """
    Convert a timeStamp (a value as returned by time.time()) into a
    timeStr: the ISO-8601 rendering of the corresponding UTC time
    """
    utcTime = datetime.datetime.utcfromtimestamp(ts)
    return utcTime.isoformat()
68
def timeStrNow():
    """
    Return the timeStr (see timeStampToTimeStr) for the current time
    """
    now = time.time()
    return timeStampToTimeStr(now)
71
def hardFlush(ofp):
    """
    Flush ofp's buffered data and, unless ofp is a terminal, follow up with
    os.fsync() on its file descriptor

    ofp -- an open file object providing flush(), isatty() and fileno()
    """
    ofp.flush()
    if not ofp.isatty():
        os.fsync(ofp.fileno())
76
def isWindows():
    """
    Return True when platform.system() contains the text "Windows"
    """
    import platform
    return "Windows" in platform.system()
80
class SimpleFifo(object):
    """
    Ring buffer which remembers up to the last N objects added to it.

    Not thread safe. Note extraction does not follow any traditional fifo
    interface: get() returns a copy of the current contents and removes
    nothing.
    """

    def __init__(self, size):
        """
        size -- capacity of the fifo; converted with int() and asserted
                to be greater than zero
        """
        capacity = int(size)
        assert (capacity > 0)
        self._size = capacity
        self._data = [None] * capacity
        self._head = 0     # index of the slot written by the next add()
        self._occup = 0    # number of slots currently holding an object
        self._counter = 0  # total number of add() calls so far

    def count(self):
        """
        Get the total number of adds for this fifo
        """
        return self._counter

    def add(self, obj):
        """
        add obj to fifo (overwriting the oldest entry once the fifo is
        full), and return obj for convenience
        """
        self._data[self._head] = obj
        self._counter += 1
        self._occup = min(self._occup + 1, self._size)
        # advance the write position, wrapping at the end of the storage
        self._head = (self._head + 1) % self._size
        return obj

    def get(self):
        """
        return an array of the fifo contents, ordered oldest to newest
        """
        # The stored objects are the _occup slots ending just before _head.
        # When that window starts before slot 0 the start index is negative,
        # and python's negative list indexing supplies the wrap-around.
        oldest = self._head - self._occup
        return [self._data[slot] for slot in range(oldest, self._head)]
127 128 129
class StringBling(object):
    """
    Writes log messages decorated with a
    '[time] [hostname] [runid] [task] ' prefix on every line.

    Used both for the wrapped task's stderr stream (transfer) and for the
    wrapper's own messages (wrapperLog).
    """

    def __init__(self, runid, taskStr):
        """
        runid   -- run identifier placed in every line prefix
        taskStr -- task label placed in every line prefix
        """
        import socket

        self.runid = runid
        self.taskStr = taskStr
        # resolve the fully-qualified host name once, not on every log line
        self.hostname = socket.getfqdn()

    @staticmethod
    def _splitMsgLines(msg):
        """
        Split a possibly multi-line message into its lines, ignoring a single
        trailing newline. An empty message yields one empty line (the same
        result as a message holding only a newline).
        """
        # endswith() rather than msg[-1] so that an empty message does not
        # raise IndexError
        if msg.endswith("\n"):
            msg = msg[:-1]
        return msg.split("\n")

    def _writeMsg(self, ofp, msg, taskStr, writeFilter=lambda x: x):
        """
        log a possibly multi-line message with decoration:

        ofp         -- output file object; hardFlush()'d after the message
        msg         -- message text, at most one trailing newline is dropped
        taskStr     -- task label used in the prefix for this message
        writeFilter -- called with each decorated line (newline included);
                       whatever it returns is what gets written to ofp
        """
        prefix = "[%s] [%s] [%s] [%s] " % (timeStrNow(), self.hostname, self.runid, taskStr)
        for line in self._splitMsgLines(msg):
            ofp.write(writeFilter(prefix + line + "\n"))
        hardFlush(ofp)

    def transfer(self, inos, outos, writeFilter=lambda x: x):
        """
        This function is used to decorate the stderr stream from the launched
        task itself: every line read from inos is written to outos with the
        task's prefix, until inos reports end-of-file.
        """
        #
        # write line-read loop this way to workaround python bug:
        # http://bugs.python.org/issue3907
        #
        while True:
            line = inos.readline()
            if not line:
                break
            self._writeMsg(outos, line, self.taskStr, writeFilter)

    def wrapperLog(self, log_os, msg):
        """
        Used by the wrapper to decorate each msg line with a prefix. The
        decoration is similar to that for the task's own stderr, but we prefix
        the task with 'pyflowTaskWrapper' to differentiate the source.
        """
        self._writeMsg(log_os, msg, "pyflowTaskWrapper:" + self.taskStr)
172 173 174
def getParams(paramsFile):
    """
    Load the task parameters from a pickle file.

    paramsFile -- path of a file holding a pickled mapping (anything
                  providing .items()) of parameter name to value

    Returns an object carrying one attribute per key of that mapping.
    Errors from open() or pickle.load() propagate to the caller.
    """
    import pickle

    # Python 3 pickle streams are bytes and can only be loaded from a file
    # opened in binary mode. Under Python 2 the original default (text) mode
    # is kept so that reading stays consistent with however the file was
    # written.
    if sys.version_info[0] >= 3:
        mode = "rb"
    else:
        mode = "r"

    # NOTE(review): unpickling can execute arbitrary code; this presumes the
    # parameter file is written by pyflow itself and is trusted -- confirm.
    fp = open(paramsFile, mode)
    try:
        # try/finally (rather than 'with') closes the handle promptly without
        # requiring syntax newer than the rest of this file uses
        paramhash = pickle.load(fp)
    finally:
        fp.close()

    class Params:
        pass

    params = Params()
    for (k, v) in paramhash.items():
        setattr(params, k, v)
    return params
183 184 185
def main():
    """
    Command-line entry point: run one task under the wrapper.

    sys.argv must be [script, runid, taskid, parameter_pickle_file]. The
    pickle file is loaded with getParams() and supplies cmd, cwd, env,
    outFile, errFile, isShellCmd and nCores.

    The task's stdout is appended to params.outFile; its stderr is decorated
    line by line and appended to params.errFile. '[wrapperSignal]' messages
    (wrapperStart, taskStart, taskExitCode, then taskElapsedSec on success or
    taskStderrTail on failure) are written to the wrapper's own stderr.

    Never returns normally: exits with the task's return code, with 1 or 2
    on a usage problem, or with 1 on KeyboardInterrupt. Any other exception
    is logged to params.errFile and re-raised.
    """

    usage = """

Usage: %s runid taskid parameter_pickle_file

The parameter pickle file contains all of the task parameters required by the wrapper

""" % (scriptName)

    def badUsage(msg=None):
        # exit code 1 when a specific problem is reported, 2 otherwise
        sys.stderr.write(usage)
        if msg is not None:
            sys.stderr.write(msg)
            exitval = 1
        else:
            exitval = 2
        hardFlush(sys.stderr)
        sys.exit(exitval)

    def checkExpectArgCount(expectArgCount):
        if len(sys.argv) == expectArgCount:
            return
        badUsage("Incorrect argument count, expected: %i observed: %i\n" % (expectArgCount, len(sys.argv)))

    runid = "unknown"
    taskStr = "unknown"

    # pick up the ids before validating the argument count, so that the
    # start signal below is labeled whenever the ids are available
    if len(sys.argv) > 2:
        runid = sys.argv[1]
        taskStr = sys.argv[2]

    bling = StringBling(runid, taskStr)

    # send a signal for wrapper start as early as possible to help ensure hostname is logged
    pffp = sys.stderr
    bling.wrapperLog(pffp, "[wrapperSignal] wrapperStart")

    checkExpectArgCount(4)

    picklefile = sys.argv[3]

    # try multiple times to read the argument file in case of NFS delay:
    #
    retryDelaySec = 30
    maxTrials = 3
    for _ in range(maxTrials):
        if os.path.exists(picklefile):
            break
        time.sleep(retryDelaySec)

    if not os.path.exists(picklefile):
        badUsage("First argument does not exist: " + picklefile)

    if not os.path.isfile(picklefile):
        badUsage("First argument is not a file: " + picklefile)

    # add another multi-trial loop on the pickle load operation --
    # on some filesystems the file can appear to exist but not
    # be fully instantiated yet:
    #
    for t in range(maxTrials):
        try:
            params = getParams(picklefile)
        except Exception:
            # Only load failures are retried; a bare 'except' here would also
            # trap KeyboardInterrupt/SystemExit and sleep before giving up.
            if (t + 1) == maxTrials:
                raise
            time.sleep(retryDelaySec)
            continue
        break

    if params.cmd is None:
        badUsage("Invalid TaskWrapper input: task command set to NONE")

    if params.cwd == "" or params.cwd == "None":
        params.cwd = None

    toutFp = open(params.outFile, "a")
    terrFp = open(params.errFile, "a")

    # always keep last N lines of task stderr:
    fifo = SimpleFifo(20)

    isWin = isWindows()

    # Present shell as arg list with Popen(shell=False), so that
    # we minimize quoting/escaping issues for 'cmd' itself:
    #
    if (not isWin) and params.isShellCmd:
        # TODO shell selection should be configurable somewhere:
        shell = ["/bin/bash", "--noprofile", "-o", "pipefail"]
        fullcmd = shell + ["-c", params.cmd]
    else:
        fullcmd = params.cmd

    retval = 1

    # on windows the command is handed to Popen with shell=True instead
    isShell = isWin

    try:
        startTime = time.time()
        bling.wrapperLog(pffp, "[wrapperSignal] taskStart")
        # turn off buffering so that stderr is updated correctly and its timestamps
        # are more accurate:
        # TODO: is there a way to do this for stderr only?
        proc = subprocess.Popen(fullcmd, stdout=toutFp, stderr=subprocess.PIPE, shell=isShell, bufsize=1, cwd=params.cwd, env=params.env)
        # decorate the task's stderr into the error log, while the fifo
        # (used as the write filter) keeps the most recent decorated lines
        bling.transfer(proc.stderr, terrFp, fifo.add)
        retval = proc.wait()

        elapsed = (time.time() - startTime)

        # communication back to pyflow:
        bling.wrapperLog(pffp, "[wrapperSignal] taskExitCode %i" % (retval))

        # communication to human-readable log:
        msg = "Task: '%s' exit code: '%i'" % (taskStr, retval)
        bling.wrapperLog(terrFp, msg)

        if retval == 0:
            # communication back to pyflow:
            bling.wrapperLog(pffp, "[wrapperSignal] taskElapsedSec %i" % (int(elapsed)))

            # communication to human-readable log:
            msg = "Task: '%s' complete." % (taskStr)
            msg += " elapsedSec: %i" % (int(elapsed))
            msg += " elapsedCoreSec: %i" % (int(elapsed * params.nCores))
            msg += "\n"
            bling.wrapperLog(terrFp, msg)
        else:
            # communication back to pyflow:
            tailMsg = fifo.get()
            # the announced count covers the summary line written next plus
            # the tail lines themselves
            bling.wrapperLog(pffp, "[wrapperSignal] taskStderrTail %i" % (1 + len(tailMsg)))
            pffp.write("Last %i stderr lines from task (of %i total lines):\n" % (len(tailMsg), fifo.count()))
            for line in tailMsg:
                pffp.write(line)
            hardFlush(pffp)

    except KeyboardInterrupt:
        msg = "[ERROR] Keyboard Interupt, shutting down task."
        bling.wrapperLog(terrFp, msg)
        sys.exit(1)
    except:
        # record the traceback in the task's error log, then let the
        # exception terminate the wrapper
        msg = getExceptionMsg()
        bling.wrapperLog(terrFp, msg)
        raise

    sys.exit(retval)


if __name__ == "__main__":
    main()