1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34 """
35 This script wraps workflow tasks for execution on local or remote
36 hosts. It is responsible for adding log decorations to task's stderr
37 output (which is diverted to a file), and writing task state transition
38 and error information to the wrapper's stderr, which becomes the
39 task's 'signal' file from pyflow's perspective. The signal file is
40 used to determine task exit status, total runtime, and queue->run
41 state transition when pyflow is run in SGE mode.
42 """
43
44 import datetime
45 import os
46 import subprocess
47 import sys
48 import time
49
50
# Script name substituted into the usage text that main() prints.
scriptName = 'pyflowTaskWrapper.py'
52
53
def getTracebackStr() :
    """
    Return the formatted traceback of the exception currently being handled.

    Meant to be called from inside an ``except`` clause; the text is produced
    by ``traceback.format_exc()``.
    """
    import traceback
    return traceback.format_exc()
57
58
def getExceptionMsg() :
    """
    Return an "[ERROR] Unhandled Exception" banner line followed by the
    traceback (from getTracebackStr()) of the exception currently being handled.
    """
    return ("[ERROR] Unhandled Exception in pyflowTaskWrapper\n" + getTracebackStr())
61
def timeStampToTimeStr(ts) :
    """
    converts timeStamp (time.time()) output to timeStr

    The result is an ISO-8601 string in UTC without a timezone suffix,
    e.g. timeStampToTimeStr(0) -> '1970-01-01T00:00:00'.
    """
    tzmod = getattr(datetime, "timezone", None)
    if tzmod is None :
        # Python 2 has no datetime.timezone; utcfromtimestamp is the only option there.
        return datetime.datetime.utcfromtimestamp(ts).isoformat()
    # Equivalent to utcfromtimestamp(ts), which is deprecated as of Python 3.12.
    return datetime.datetime.fromtimestamp(ts, tzmod.utc).replace(tzinfo=None).isoformat()


def timeStrNow() :
    """
    Return the current time as a timeStampToTimeStr() string.
    """
    return timeStampToTimeStr(time.time())
68
def hardFlush(ofp) :
    """
    Flush ofp's Python-level buffer and, unless ofp is a terminal, ask the OS
    to commit the data with os.fsync().

    Descriptors that do not support synchronization (pipes, sockets, ...) are
    tolerated: the data has still been flushed to the OS. Any other OSError
    from fsync is re-raised.
    """
    ofp.flush()
    if ofp.isatty() : return
    try :
        os.fsync(ofp.fileno())
    except OSError as e :
        import errno
        # fsync(2) reports EINVAL/EROFS (ENOTSUP on some platforms) for special
        # files that cannot be synced; everything else is a real I/O error.
        unsupported = (errno.EINVAL, errno.EROFS, getattr(errno, "ENOTSUP", errno.EINVAL))
        if e.errno not in unsupported : raise
def isWindows() :
    """
    Return True when platform.system() reports a Windows host.
    """
    import platform
    return "Windows" in platform.system()
class SimpleFifo(object) :
    """
    Store up to last N objects, not thread safe.
    Note extraction does not follow any traditional fifo interface
    """

    def __init__(self, size) :
        """
        size -- maximum number of objects retained (anything int() accepts)

        Raises ValueError if size is not positive.
        """
        from collections import deque

        self._size = int(size)
        # Explicit check instead of assert so validation survives `python -O`.
        if self._size <= 0 :
            raise ValueError("SimpleFifo size must be positive, got: %i" % (self._size))
        # deque(maxlen=N) discards its oldest entry when appending at capacity,
        # which is exactly the keep-the-last-N behavior wanted here.
        self._data = deque(maxlen=self._size)
        # Total number of add() calls, including entries since evicted.
        self._counter = 0


    def count(self) :
        """
        Get the total number of adds for this fifo (not just the retained ones)
        """
        return self._counter


    def add(self, obj) :
        """
        add obj to fifo, and return obj for convenience
        (so add can be used as a pass-through filter function)
        """
        self._data.append(obj)
        self._counter += 1
        return obj


    def get(self) :
        """
        return a new list of the retained contents, oldest first
        """
        return list(self._data)
127
128
class StringBling(object) :
    """
    Writes log messages with a "[time] [host] [runid] [task] " prefix on every
    line, so that output lines can be attributed to their source.
    """

    def __init__(self, runid, taskStr) :
        """
        runid   -- run identifier placed in every line prefix
        taskStr -- task label placed in the prefix of lines copied by
                   transfer(); wrapperLog() uses "pyflowTaskWrapper:" + taskStr
        """
        def getHostName() :
            import socket
            return socket.getfqdn()

        self.runid = runid
        self.taskStr = taskStr
        # Looked up once here rather than for every message.
        self.hostname = getHostName()

    def _writeMsg(self, ofp, msg, taskStr, writeFilter=lambda x: x) :
        """
        log a possibly multi-line message with decoration:

        Each line of msg gets the prefix "[<timeStrNow()>] [<hostname>]
        [<runid>] [<taskStr>] " and a trailing newline, is passed through
        writeFilter, and writeFilter's return value is written to ofp.
        ofp is hard-flushed once all lines are written.
        """
        prefix = "[%s] [%s] [%s] [%s] " % (timeStrNow(), self.hostname, self.runid, taskStr)
        # Drop one trailing newline so it does not produce an extra empty line.
        # endswith() is used because msg[-1] raised IndexError for msg == "".
        if msg.endswith("\n") : msg = msg[:-1]
        for line in msg.split("\n") :
            ofp.write(writeFilter(prefix + line + "\n"))
        hardFlush(ofp)

    def transfer(self, inos, outos, writeFilter=lambda x: x):
        """
        This function is used to decorate the stderr stream from the launched task itself

        Reads inos line by line until EOF and writes each line to outos with
        this task's decoration, passing it through writeFilter.
        """
        while True:
            line = inos.readline()
            if not line: break
            if isinstance(line, bytes) and not isinstance(line, str) :
                # Python 3 subprocess pipes yield bytes, which cannot be joined
                # to the str prefix; decode, replacing undecodable bytes rather
                # than aborting the copy. Under Python 2 bytes is str, so this
                # branch never runs there.
                line = line.decode("utf-8", "replace")
            self._writeMsg(outos, line, self.taskStr, writeFilter)

    def wrapperLog(self, log_os, msg) :
        """
        Used by the wrapper to decorate each msg line with a prefix. The decoration
        is similar to that for the task's own stderr, but we prefix the task with
        'pyflowTaskWrapper' to differentiate the source.
        """
        self._writeMsg(log_os, msg, "pyflowTaskWrapper:" + self.taskStr)
172
173
def getParams(paramsFile) :
    """
    Load task parameters from a pickled dict and expose them as attributes.

    paramsFile -- path of a file containing a pickled dict (parameter name -> value)

    Returns an object carrying one attribute per dict key.

    NOTE(review): unpickling can execute arbitrary code, so this file must only
    ever come from a trusted producer (presumably the pyflow process that
    launched this wrapper -- confirm), never from untrusted input.
    """
    import pickle

    # Binary mode is required by pickle on Python 3 (and harmless on 2);
    # 'with' closes the handle even if unpickling fails.
    with open(paramsFile, "rb") as fp :
        paramhash = pickle.load(fp)

    class Params : pass
    params = Params()
    for (k, v) in paramhash.items() : setattr(params, k, v)
    return params
183
184
185
def main():
    """
    Run one workflow task described by a parameter pickle file.

    Command line: runid taskid parameter_pickle_file

    - "[wrapperSignal] ..." records (wrapperStart, taskStart, taskExitCode,
      then taskElapsedSec on success or taskStderrTail on failure) are written
      to this process's stderr, the task's signal file from pyflow's view.
    - The task's stdout is appended to params.outFile.
    - The task's stderr (decorated line by line) and wrapper messages are
      appended to params.errFile.

    Exits with the task's exit code; exits 1 on KeyboardInterrupt, 1 or 2 on
    usage errors; any other exception is logged to params.errFile and re-raised.
    """

    usage = """

Usage: %s runid taskid parameter_pickle_file

The parameter pickle file contains all of the task parameters required by the wrapper

""" % (scriptName)

    def badUsage(msg=None) :
        """
        Print usage (then msg, if given) to stderr and exit with status 1 when
        msg is given, 2 otherwise.
        """
        sys.stderr.write(usage)
        if msg is not None :
            sys.stderr.write(msg)
            exitval = 1
        else:
            exitval = 2
        hardFlush(sys.stderr)
        sys.exit(exitval)

    def checkExpectArgCount(expectArgCount) :
        """
        Exit via badUsage() unless sys.argv has exactly expectArgCount entries.
        """
        if len(sys.argv) == expectArgCount : return
        badUsage("Incorrect argument count, expected: %i observed: %i\n" % (expectArgCount, len(sys.argv)))

    # runid/taskStr are picked up before the full argument check so that even a
    # bad invocation emits a decorated wrapperStart record.
    runid = "unknown"
    taskStr = "unknown"

    if len(sys.argv) > 2 :
        runid = sys.argv[1]
        taskStr = sys.argv[2]

    bling = StringBling(runid, taskStr)

    # The wrapper's own stderr is the task's signal file from pyflow's perspective.
    pffp = sys.stderr
    bling.wrapperLog(pffp, "[wrapperSignal] wrapperStart")

    checkExpectArgCount(4)

    picklefile = sys.argv[3]

    # Poll for the parameter file before giving up. NOTE(review): presumably
    # this covers shared-filesystem visibility lag between hosts -- confirm.
    retryDelaySec = 30
    maxTrials = 3
    for _ in range(maxTrials) :
        if os.path.exists(picklefile) : break
        time.sleep(retryDelaySec)

    # The pickle file is the third positional argument (after runid and taskid).
    if not os.path.exists(picklefile) :
        badUsage("Parameter pickle file does not exist: " + picklefile + "\n")

    if not os.path.isfile(picklefile) :
        badUsage("Parameter pickle file is not a file: " + picklefile + "\n")

    # Loading is retried as well, re-raising the final failure (presumably in
    # case the file is read before it is completely written). Catch Exception,
    # not everything, so Ctrl-C is not turned into another retry sleep.
    params = None
    for t in range(maxTrials) :
        try :
            params = getParams(picklefile)
        except Exception :
            if (t+1) == maxTrials :
                raise
            time.sleep(retryDelaySec)
            continue
        break

    if params.cmd is None :
        badUsage("Invalid TaskWrapper input: task command set to NONE\n")

    # Both an empty string and the literal string "None" mean "no cwd".
    if params.cwd == "" or params.cwd == "None" :
        params.cwd = None

    # Keeps the last 20 decorated stderr lines for the failure report.
    fifo = SimpleFifo(20)

    isWin = isWindows()

    # Off Windows, shell commands run under bash with --noprofile and
    # "-o pipefail" (a pipeline fails if any stage fails). On Windows,
    # params.cmd is passed to Popen unchanged with shell=True.
    if (not isWin) and params.isShellCmd :
        shell = ["/bin/bash", "--noprofile", "-o", "pipefail"]
        fullcmd = shell + ["-c", params.cmd]
    else :
        fullcmd = params.cmd

    isShell = isWin
    retval = 1

    # 'with' closes both log files on every exit path, including sys.exit()
    # and re-raised exceptions.
    with open(params.outFile, "a") as toutFp, open(params.errFile, "a") as terrFp :
        try:
            startTime = time.time()
            bling.wrapperLog(pffp, "[wrapperSignal] taskStart")

            # stdout goes straight to the output file; stderr is piped so each
            # line can be decorated (and remembered by fifo) in transfer().
            proc = subprocess.Popen(fullcmd, stdout=toutFp, stderr=subprocess.PIPE, shell=isShell, bufsize=1, cwd=params.cwd, env=params.env)
            bling.transfer(proc.stderr, terrFp, fifo.add)
            retval = proc.wait()

            elapsed = (time.time() - startTime)

            # Signal-file record of the exit code ...
            bling.wrapperLog(pffp, "[wrapperSignal] taskExitCode %i" % (retval))

            # ... and a human-readable copy in the task's error log.
            msg = "Task: '%s' exit code: '%i'" % (taskStr, retval)
            bling.wrapperLog(terrFp, msg)

            if retval == 0 :
                bling.wrapperLog(pffp, "[wrapperSignal] taskElapsedSec %i" % (int(elapsed)))

                msg = "Task: '%s' complete." % (taskStr)
                msg += " elapsedSec: %i" % (int(elapsed))
                msg += " elapsedCoreSec: %i" % (int(elapsed * params.nCores))
                msg += "\n"
                bling.wrapperLog(terrFp, msg)
            else :
                # Copy the tail of the task's stderr into the signal file. The
                # count in the taskStderrTail record includes the header line
                # written right after it.
                tailMsg = fifo.get()
                bling.wrapperLog(pffp, "[wrapperSignal] taskStderrTail %i" % (1 + len(tailMsg)))
                pffp.write("Last %i stderr lines from task (of %i total lines):\n" % (len(tailMsg), fifo.count()))
                for line in tailMsg :
                    pffp.write(line)
                hardFlush(pffp)

        except KeyboardInterrupt:
            msg = "[ERROR] Keyboard Interupt, shutting down task."
            bling.wrapperLog(terrFp, msg)
            sys.exit(1)
        except Exception:
            # Top-level boundary: record the traceback in the task log, then re-raise.
            msg = getExceptionMsg()
            bling.wrapperLog(terrFp, msg)
            raise

    sys.exit(retval)
334
335
336
# Run the wrapper only when executed as a script, never on import.
if "__main__" == __name__ :
    main()
339