18 """
19 Machinery for reading, editing, and writing Condor DAG files.
20
21 When running DAGs on Condor compute clusters, very often one will wish to
22 re-run a portion of a DAG. This can be done by marking all jobs except the
23 ones to be re-run as "DONE". Unfortunately the Condor software suite lacks
24 an I/O library for reading and writing Condor DAG files, so there is no
25 easy way to edit DAG files except by playing games sed, awk, or once-off
26 Python or Perl scripts. That's where this module comes in. This module
27 will read a DAG file into an in-ram representation that is easily edited,
28 and allow the file to be written to disk again.
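
For example, to mark every job as done except the node "triggergen" and
everything downstream of it (a sketch;  the file and node names are
placeholders):

>>> from glue import dagfile
>>> dag = dagfile.DAG.parse(open("pipeline.dag"))
>>> rerun = set(["triggergen"]) | dag.get_all_child_names(["triggergen"])
>>> for name, node in dag.nodes.items():
...	node.done = name not in rerun
...
>>> dag.write(open("pipeline.dag", "w"))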

Example:

>>> from glue import dagfile
>>> dag = dagfile.DAG.parse(open("pipeline.dag"))
>>> dag.write(open("pipeline.dag", "w"))

Although it is possible to machine-generate an original DAG file using
this module and write it to disk, this module does not provide the tools
required to do any of the other tasks associated with pipeline
construction.  For example, there is no facility here to generate or
manage submit files, data files, or any other files that are associated
with a full pipeline.  Only the DAG file itself is considered here.  For
general pipeline construction see the pipeline module.  The focus of this
module is on editing existing DAG files.

Developers should also consider doing any new pipeline development using
DAX files as the fundamental workflow description, instead of DAGs.  See
http://pegasus.isi.edu for more information.

A DAG file is loaded using the .parse() class method of the DAG class.
This parses the file-like object passed to it and returns an instance of
the DAG class representing the file's contents.  Once loaded, the nodes
in the DAG can all be found in the .nodes dictionary, whose keys are the
node names and whose values are the corresponding node objects.  Among
each node object's attributes are sets .children and .parents containing
references to the child and parent nodes (not their names) for each node.
Note that every node must appear listed as a parent of each of its
children, and vice versa.  The other attributes of a DAG instance contain
information about the DAG itself, for example the CONFIG file or the DOT
file.  All of the data for each node in the DAG, for example the node's
VARS value, its initial working directory, and so on, can be found in the
attributes of the nodes themselves.  A DAG is written to a file using the
.write() method of the DAG object.
"""


import re


__all__ = ["DAG", "JOB", "DATA", "SPLICE", "SUBDAG_EXTERNAL"]
91 """
92 Progress report wrapper. For internal use only.
93 """
95 self.n = 0
96 self.f = f
97 self.callback = callback
98
100 self.n += dn
101 if self.callback is not None and not self.n % 7411:
102 self.callback(self.f, self.n, False)
103 return self
104
106 if self.callback is not None:
107 self.callback(self.f, self.n, True)
108
111 """
112 Object providing a no-op .write() method to fake a file. For
113 internal use only.
114 """
117
118
119
120
121
122
123
124
125
126
127


class JOB(object):
	"""
	Representation of a JOB node in a Condor DAG.  JOB objects have
	the following attributes corresponding to information in the DAG
	file:

	.name
		The name of the node in the DAG.

	.filename
		The name of the submit file for the JOB.

	.directory
		The initial working directory for the JOB.  Set to None
		to omit from DAG (job's working directory will be chosen
		by Condor).

	.done
		Boolean indicating if the JOB is done or not.  See
		DAG.load_rescue() for more information.

	.noop
		Boolean indicating if the JOB is a no-op or not.

	.vars
		A dictionary of the name-->value pairs in the VARS line
		for the JOB.  Leave empty to omit VARS from DAG.

	.retry
		The number of retries for the job.  Set to None to omit
		from DAG.

	.retry_unless_exit_value
		The value of the UNLESS-EXIT suffix of the RETRY line.
		Set to None to omit from DAG.

	.priority
	.category
		The PRIORITY value and CATEGORY name for the node in the
		DAG.  Set to None to omit from the DAG.

	.parents
	.children
		Sets of the parent and child nodes of JOB.  The sets
		contain references to the node objects, not their names.

	.prescript
	.prescriptargs
	.postscript
	.postscriptargs
		The names and lists of arguments of the PRE and POST
		scripts.  Set to None to omit from DAG.

	.abort_dag_on_abortexitvalue
	.abort_dag_on_dagreturnvalue
		The ABORT-DAG-ON abort exit value and DAG return value
		for the JOB.  Set to None to omit from DAG.

	For more information about the function of these parameters,
	refer to the Condor documentation.
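
	Example (a sketch;  the node and submit file names are
	hypothetical):

	>>> import sys
	>>> job = JOB("triggergen", "triggergen.sub")
	>>> job.retry = 3
	>>> job.vars["macroseed"] = "1234"
	>>> job.write(sys.stdout)
	JOB triggergen triggergen.sub
	RETRY triggergen 3
	VARS triggergen macroseed="1234"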
187 """
	keyword = "JOB"

	def __init__(self, name, filename, directory = None, done = False, noop = False):
		# information from the JOB line in the DAG file
		self.name = name
		self.filename = filename
		self.directory = directory
		self.done = done
		self.noop = noop

		# the VARS line in the DAG file
		self.vars = {}

		# the RETRY line in the DAG file
		self.retry = None
		self.retry_unless_exit_value = None

		# the PRIORITY and CATEGORY lines in the DAG file
		self.priority = None
		self.category = None

		# the parent/child relationships from the PARENT/CHILD
		# lines in the DAG file
		self.parents = set()
		self.children = set()

		# the SCRIPT PRE and SCRIPT POST lines in the DAG file
		self.prescript = None
		self.prescriptargs = None
		self.postscript = None
		self.postscriptargs = None

		# the ABORT-DAG-ON line in the DAG file
		self.abort_dag_on_abortexitvalue = None
		self.abort_dag_on_dagreturnvalue = None

	def write(self, f, progress = None):
		"""
		Write the lines describing this node to the file-like
		object f.  The object must provide a .write() method.

		If progress is not None, it will be incremented by 1 for
		every line written.
		"""
		# JOB line
		f.write("%s %s %s" % (self.keyword, self.name, self.filename))
		if self.directory is not None:
			f.write(" DIR \"%s\"" % self.directory)
		if self.noop:
			f.write(" NOOP")
		if self.done:
			f.write(" DONE")
		f.write("\n")
		if progress is not None:
			progress += 1

		# PRIORITY line
		if self.priority:
			f.write("PRIORITY %s %d\n" % (self.name, self.priority))
			if progress is not None:
				progress += 1

		# CATEGORY line
		if self.category is not None:
			f.write("CATEGORY %s %s\n" % (self.name, self.category))
			if progress is not None:
				progress += 1

		# RETRY line
		if self.retry:
			f.write("RETRY %s %d" % (self.name, self.retry))
			if self.retry_unless_exit_value is not None:
				f.write(" UNLESS-EXIT %d" % self.retry_unless_exit_value)
			f.write("\n")
			if progress is not None:
				progress += 1

		# VARS line
		if self.vars:
			f.write("VARS %s" % self.name)
			for name, value in sorted(self.vars.items()):
				# apply escape rules to the value
				f.write(" %s=\"%s\"" % (name, value.replace("\\", "\\\\").replace("\"", "\\\"")))
			f.write("\n")
			if progress is not None:
				progress += 1

		# SCRIPT PRE line
		if self.prescript is not None:
			f.write("SCRIPT PRE %s %s" % (self.name, self.prescript))
			if self.prescriptargs:
				f.write(" %s" % " ".join(self.prescriptargs))
			f.write("\n")
			if progress is not None:
				progress += 1

		# SCRIPT POST line
		if self.postscript is not None:
			f.write("SCRIPT POST %s %s" % (self.name, self.postscript))
			if self.postscriptargs:
				f.write(" %s" % " ".join(self.postscriptargs))
			f.write("\n")
			if progress is not None:
				progress += 1

		# ABORT-DAG-ON line
		if self.abort_dag_on_abortexitvalue is not None:
			f.write("ABORT-DAG-ON %s %d" % (self.name, self.abort_dag_on_abortexitvalue))
			if self.abort_dag_on_dagreturnvalue is not None:
				f.write(" RETURN %d" % self.abort_dag_on_dagreturnvalue)
			f.write("\n")
			if progress is not None:
				progress += 1

	@property
	def state(self):
		"""
		Get the state of the node.  One of 'wait', 'idle',
		'run', 'abort', 'stop', 'success', 'fail'.

		NOTE:  this feature is not implemented at this time.
		"""
		raise NotImplementedError


class DATA(JOB):
	"""
	Representation of a Stork DATA node in a Condor DAG.
	"""
	keyword = "DATA"


class SUBDAG_EXTERNAL(JOB):
	"""
	Representation of a SUBDAG EXTERNAL node in a Condor DAG.
	"""
	keyword = "SUBDAG EXTERNAL"


class SPLICE(JOB):
	"""
	Representation of a SPLICE node in a Condor DAG.
	"""
	# NOTE:  this module treats a SPLICE as a special kind of JOB for
	# parsing purposes, but unlike JOB lines, SPLICE lines do not
	# carry NOOP or DONE keywords (see splicepat below)
	keyword = "SPLICE"


class DAG(object):
	"""
	Representation of the contents of a Condor DAG file.

	BUGS:  the semantics of the "+" special character in category
	names are not understood.  For now, it is an error for a node's
	category to not be found verbatim in a MAXJOBS line.  The "+"
	character is a wildcard-like character used in the assignment of
	MAXJOBS values to job categories in splices;  see the Condor
	documentation for more information.
	"""

	# regular expressions used to parse the lines of a DAG file
	dotpat = re.compile(r'^DOT\s+(?P<filename>\S+)(\s+(?P<options>.+))?', re.IGNORECASE)
	jobpat = re.compile(r'^JOB\s+(?P<name>\S+)\s+(?P<filename>\S+)(\s+DIR\s+(?P<directory>\S+))?(\s+(?P<noop>NOOP))?(\s+(?P<done>DONE))?', re.IGNORECASE)
	datapat = re.compile(r'^DATA\s+(?P<name>\S+)\s+(?P<filename>\S+)(\s+DIR\s+(?P<directory>\S+))?(\s+(?P<noop>NOOP))?(\s+(?P<done>DONE))?', re.IGNORECASE)
	subdagpat = re.compile(r'^SUBDAG\s+EXTERNAL\s+(?P<name>\S+)\s+(?P<filename>\S+)(\s+DIR\s+(?P<directory>\S+))?(\s+(?P<noop>NOOP))?(\s+(?P<done>DONE))?', re.IGNORECASE)
	splicepat = re.compile(r'^SPLICE\s+(?P<name>\S+)\s+(?P<filename>\S+)(\s+DIR\s+(?P<directory>\S+))?', re.IGNORECASE)
	prioritypat = re.compile(r'^PRIORITY\s+(?P<name>\S+)\s+(?P<value>\S+)', re.IGNORECASE)
	categorypat = re.compile(r'^CATEGORY\s+(?P<name>\S+)\s+(?P<category>\S+)', re.IGNORECASE)
	retrypat = re.compile(r'^RETRY\s+(?P<name>\S+)\s+(?P<retries>\S+)(\s+UNLESS-EXIT\s+(?P<retry_unless_exit_value>\S+))?', re.IGNORECASE)
	varspat = re.compile(r'^VARS\s+(?P<name>\S+)\s+(?P<vars>.+)', re.IGNORECASE)
	varsvaluepat = re.compile(r'(?P<name>\S+)\s*=\s*"(?P<value>.*?)(?<!\\)"', re.IGNORECASE)
	scriptpat = re.compile(r'^SCRIPT\s+(?P<type>(PRE)|(POST))\s+(?P<name>\S+)\s+(?P<executable>\S+)(\s+(?P<arguments>.+))?', re.IGNORECASE)
	abortdagonpat = re.compile(r'^ABORT-DAG-ON\s+(?P<name>\S+)\s+(?P<exitvalue>\S+)(\s+RETURN\s+(?P<returnvalue>\S+))?', re.IGNORECASE)
	arcpat = re.compile(r'^PARENT\s+(?P<parents>.+?)\s+CHILD\s+(?P<children>.+)', re.IGNORECASE)
	maxjobspat = re.compile(r'^MAXJOBS\s+(?P<category>\S+)\s+(?P<value>\S+)', re.IGNORECASE)
	configpat = re.compile(r'^CONFIG\s+(?P<filename>\S+)', re.IGNORECASE)
	nodestatuspat = re.compile(r'^NODE_STATUS_FILE\s+(?P<filename>\S+)(\s+(?P<updatetime>\S+))?', re.IGNORECASE)
	jobstatepat = re.compile(r'^JOBSTATE_LOG\s+(?P<filename>\S+)', re.IGNORECASE)
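
	# For illustration, jobpat applied to a typical JOB line (a
	# sketch;  the names are hypothetical):
	#
	# >>> m = DAG.jobpat.search('JOB triggergen triggergen.sub DIR "run1" DONE')
	# >>> m.group("name"), m.group("filename"), m.group("directory"), bool(m.group("done"))
	# ('triggergen', 'triggergen.sub', '"run1"', True)
	#
	# note that the directory is captured with its quotes;  .parse()
	# strips them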

	# regular expression used to parse the lines of a rescue DAG
	donepat = re.compile(r'^DONE\s+(?P<name>\S+)', re.IGNORECASE)

	def __init__(self):
		# node name --> node object mapping
		self.nodes = {}
		# category name --> MAXJOBS value mapping (the value is
		# None if no MAXJOBS line has been seen for the category)
		self.maxjobs = {}
		# CONFIG filename or None
		self.config = None
		# DOT filename or None, and the DOT line's options
		self.dot = None
		self.dotupdate = False
		self.dotoverwrite = True
		self.dotinclude = None
		# NODE_STATUS_FILE filename and update time, or None
		self.node_status_file = None
		self.node_status_file_updatetime = None
		# JOBSTATE_LOG filename or None
		self.jobstate_log = None
414 """
415 Rebuild the .nodes index. This is required if the names of
416 nodes are changed.
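
		Example (a sketch;  the node names are hypothetical):

		>>> dag.nodes["triggergen"].name = "new_name"
		>>> dag.reindex()
		>>> "new_name" in dag.nodes
		True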
417 """
418
419
420
421 nodes = dict((node.name, node) for node in self.nodes.values())
422 if len(nodes) != len(self.nodes):
423 raise ValueError("node names are not unique")
424 self.nodes.clear()
425 self.nodes.update(nodes)
426

	@classmethod
	def parse(cls, f, progress = None):
		"""
		Parse the file-like object f as a Condor DAG file.
		Return a DAG object.  The file object must be iterable,
		yielding one line of text of the DAG file in each
		iteration.

		If the progress argument is not None, it should be a
		callable object.  This object will be called
		periodically and passed the f argument, the current line
		number, and a boolean indicating if parsing is complete.
		The boolean is always False until parsing is complete,
		then the callable will be invoked one last time with the
		final line count and the boolean set to True.

		Example:

		>>> def progress(f, n, done):
		...	print("reading %s: %d lines" % (f.name, n), end = "\\r")
		...	if done:
		...		print()
		...
		>>> dag = DAG.parse(open("pipeline.dag"), progress = progress)
		"""
		progress = progress_wrapper(f, progress)
		self = cls()
		arcs = []
		for n, line in enumerate(f, start = 1):
			# progress report
			progress += 1
			# skip comments and blank lines
			line = line.strip()
			if not line or line.startswith("#"):
				continue
			# JOB line
			m = self.jobpat.search(line)
			if m is not None:
				if m.group("name") in self.nodes:
					raise ValueError("line %d: duplicate JOB %s" % (n, m.group("name")))
				self.nodes[m.group("name")] = JOB(m.group("name"), m.group("filename"), directory = m.group("directory") and m.group("directory").strip("\""), done = bool(m.group("done")), noop = bool(m.group("noop")))
				continue

			# DATA line
			m = self.datapat.search(line)
			if m is not None:
				if m.group("name") in self.nodes:
					raise ValueError("line %d: duplicate DATA %s" % (n, m.group("name")))
				self.nodes[m.group("name")] = DATA(m.group("name"), m.group("filename"), directory = m.group("directory") and m.group("directory").strip("\""), done = bool(m.group("done")), noop = bool(m.group("noop")))
				continue

			# SUBDAG EXTERNAL line
			m = self.subdagpat.search(line)
			if m is not None:
				if m.group("name") in self.nodes:
					raise ValueError("line %d: duplicate SUBDAG EXTERNAL %s" % (n, m.group("name")))
				self.nodes[m.group("name")] = SUBDAG_EXTERNAL(m.group("name"), m.group("filename"), directory = m.group("directory") and m.group("directory").strip("\""), done = bool(m.group("done")), noop = bool(m.group("noop")))
				continue

			# SPLICE line
			m = self.splicepat.search(line)
			if m is not None:
				if m.group("name") in self.nodes:
					raise ValueError("line %d: duplicate SPLICE %s" % (n, m.group("name")))
				self.nodes[m.group("name")] = SPLICE(m.group("name"), m.group("filename"), directory = m.group("directory") and m.group("directory").strip("\""))
				continue

			# VARS line
			m = self.varspat.search(line)
			if m is not None:
				node = self.nodes[m.group("name")]
				for name, value in self.varsvaluepat.findall(m.group("vars")):
					if name in node.vars:
						raise ValueError("line %d: multiple variable %s for %s %s" % (n, name, node.keyword, node.name))
					# apply unescape rules to the value
					node.vars[name] = value.replace("\\\\", "\\").replace("\\\"", "\"")
				continue

			# PARENT ... CHILD ... line
			m = self.arcpat.search(line)
			if m is not None:
				parents = m.group("parents").strip().split()
				children = m.group("children").strip().split()
				arcs.extend((parent, child) for parent in parents for child in children)
				continue

			# RETRY line
			m = self.retrypat.search(line)
			if m is not None:
				node = self.nodes[m.group("name")]
				node.retry = int(m.group("retries"))
				node.retry_unless_exit_value = m.group("retry_unless_exit_value")
				if node.retry_unless_exit_value is not None:
					node.retry_unless_exit_value = int(node.retry_unless_exit_value)
				continue

			# SCRIPT line
			m = self.scriptpat.search(line)
			if m is not None:
				node = self.nodes[m.group("name")]
				if m.group("type").upper() == "PRE":
					if node.prescript is not None:
						raise ValueError("line %d: multiple SCRIPT PRE for %s %s" % (n, node.keyword, node.name))
					node.prescript = m.group("executable")
					if m.group("arguments") is not None:
						node.prescriptargs = m.group("arguments").split()
				elif m.group("type").upper() == "POST":
					if node.postscript is not None:
						raise ValueError("line %d: multiple SCRIPT POST for %s %s" % (n, node.keyword, node.name))
					node.postscript = m.group("executable")
					if m.group("arguments") is not None:
						node.postscriptargs = m.group("arguments").split()
				else:
					# impossible:  scriptpat only matches PRE or POST
					assert False
				continue

			# PRIORITY line
			m = self.prioritypat.search(line)
			if m is not None:
				node = self.nodes[m.group("name")]
				if node.priority is not None:
					raise ValueError("line %d: multiple PRIORITY for %s %s" % (n, node.keyword, node.name))
				node.priority = int(m.group("value"))
				continue

			# CATEGORY line
			m = self.categorypat.search(line)
			if m is not None:
				self.nodes[m.group("name")].category = m.group("category")
				continue

			# ABORT-DAG-ON line
			m = self.abortdagonpat.search(line)
			if m is not None:
				node = self.nodes[m.group("name")]
				if node.abort_dag_on_abortexitvalue is not None:
					raise ValueError("line %d: multiple ABORT-DAG-ON for %s %s" % (n, node.keyword, node.name))
				node.abort_dag_on_abortexitvalue = int(m.group("exitvalue"))
				if m.group("returnvalue") is not None:
					node.abort_dag_on_dagreturnvalue = int(m.group("returnvalue"))
				continue

			# MAXJOBS line
			m = self.maxjobspat.search(line)
			if m is not None:
				if m.group("category") in self.maxjobs:
					raise ValueError("line %d: multiple MAXJOBS for category %s" % (n, m.group("category")))
				self.maxjobs[m.group("category")] = int(m.group("value"))
				continue

			# DOT line
			m = self.dotpat.search(line)
			if m is not None:
				self.dot = m.group("filename")
				options = (m.group("options") or "").split()
				while options:
					option = options.pop(0).upper()
					if option == "UPDATE":
						self.dotupdate = True
					elif option == "DONT-UPDATE":
						self.dotupdate = False
					elif option == "OVERWRITE":
						self.dotoverwrite = True
					elif option == "DONT-OVERWRITE":
						self.dotoverwrite = False
					elif option == "INCLUDE":
						try:
							self.dotinclude = options.pop(0)
						except IndexError:
							raise ValueError("line %d: missing filename for INCLUDE option of DOT" % n)
					else:
						raise ValueError("line %d: unrecognized option %s for DOT" % (n, option))
				continue

			# CONFIG line
			m = self.configpat.search(line)
			if m is not None:
				if self.config is not None:
					raise ValueError("line %d: multiple CONFIG lines in dag file" % n)
				self.config = m.group("filename")
				continue

			# NODE_STATUS_FILE line
			m = self.nodestatuspat.search(line)
			if m is not None:
				if self.node_status_file is not None:
					raise ValueError("line %d: multiple NODE_STATUS_FILE lines in dag file" % n)
				self.node_status_file = m.group("filename")
				if m.group("updatetime") is not None:
					self.node_status_file_updatetime = int(m.group("updatetime"))
				continue

			# JOBSTATE_LOG line
			m = self.jobstatepat.search(line)
			if m is not None:
				# only the first JOBSTATE_LOG line is used;
				# subsequent ones are silently ignored
				if self.jobstate_log is None:
					self.jobstate_log = m.group("filename")
				continue

			# error
			raise ValueError("line %d: invalid line in dag file: %s" % (n, line))

		# parsing is complete;  deleting the progress wrapper
		# triggers the final progress report
		del progress

		# populate the parent and child sets
		for parent, child in arcs:
			self.nodes[parent].children.add(self.nodes[child])
			self.nodes[child].parents.add(self.nodes[parent])

		# make sure all categories are known, even those for which
		# no MAXJOBS line was seen
		for node in self.nodes.values():
			if node.category is not None and node.category not in self.maxjobs:
				self.maxjobs[node.category] = None

		# done
		return self

	@classmethod
	def select_nodes_by_name(cls, dag, nodenames):
		"""
		Construct a new DAG object containing only the nodes
		whose names are in nodenames.

		Example:

		>>> names_to_rerun = set(["triggergen"])
		>>> dag = DAG.select_nodes_by_name(dag, names_to_rerun | dag.get_all_parent_names(names_to_rerun))

		NOTE:  the new DAG object is given references to the
		node (JOB, DATA, etc.) objects in the original DAG, not
		copies of them.  Therefore, editing the node objects,
		for example modifying their parent or child sets, will
		affect both DAGs.  To obtain an independent DAG with its
		own node objects, make a deepcopy of the object that is
		returned (see the copy module in the Python standard
		library for more information).

		Example:

		>>> import copy
		>>> dag = copy.deepcopy(DAG.select_nodes_by_name(dag, names_to_rerun | dag.get_all_parent_names(names_to_rerun)))
		"""
		self = cls()
		self.nodes = dict((name, node) for name, node in dag.nodes.items() if name in nodenames)
		self.maxjobs = dict((category, dag.maxjobs[category]) for category in set(node.category for node in self.nodes.values() if node.category is not None))
		self.config = dag.config
		self.node_status_file = dag.node_status_file
		self.node_status_file_updatetime = dag.node_status_file_updatetime
		self.jobstate_log = dag.jobstate_log
		self.dot = dag.dot
		self.dotupdate = dag.dotupdate
		self.dotoverwrite = dag.dotoverwrite
		self.dotinclude = dag.dotinclude
		return self

	def get_all_parent_names(self, names):
		"""
		Trace the DAG backward from the parents of the nodes
		whose names are given to the head nodes, inclusively,
		and return the set of the names of all nodes visited.

		Example:

		>>> all_parents = dag.get_all_parent_names(["triggergen"])
		"""
		all_parent_names = set()
		nodes_to_scan = set(self.nodes[name] for name in names)
		while nodes_to_scan:
			node = nodes_to_scan.pop()
			nodes_to_scan |= node.parents
			all_parent_names |= set(parent.name for parent in node.parents)
		return all_parent_names

	def get_all_child_names(self, names):
		"""
		Trace the DAG forward from the children of the nodes
		whose names are given to the leaf nodes, inclusively,
		and return the set of the names of all nodes visited.

		Example:

		>>> all_children = dag.get_all_child_names(["triggergen"])
		"""
		all_child_names = set()
		nodes_to_scan = set(self.nodes[name] for name in names)
		while nodes_to_scan:
			node = nodes_to_scan.pop()
			nodes_to_scan |= node.children
			all_child_names |= set(child.name for child in node.children)
		return all_child_names

	def check_edges(self):
		"""
		Check all graph edges for validity.  Checks that each of
		every node's children lists that node as a parent, and
		vice versa, and that all nodes listed in the parent and
		child sets of all nodes are contained in this DAG.
		Raises ValueError if a problem is found, otherwise
		returns None.

		Example:

		>>> try:
		...	dag.check_edges()
		... except ValueError as e:
		...	print("edges are broken: %s" % str(e))
		... else:
		...	print("all edges are OK")
		...
		"""
		nodes = set(self.nodes.values())
		for node in nodes:
			for child in node.children:
				if node not in child.parents:
					raise ValueError("node %s is not a parent of its child %s" % (node.name, child.name))
				if child not in nodes:
					raise ValueError("node %s has child %s that is not in DAG" % (node.name, child.name))
			for parent in node.parents:
				if node not in parent.children:
					raise ValueError("node %s is not a child of its parent %s" % (node.name, parent.name))
				if parent not in nodes:
					raise ValueError("node %s has parent %s that is not in DAG" % (node.name, parent.name))

	def load_rescue(self, f, progress = None):
		"""
		Parse the file-like object f as a rescue DAG, using the
		DONE lines therein to set the job states of this DAG.

		In the past, rescue DAGs were full copies of the
		original DAG with the word DONE added to the JOB lines
		of completed jobs.  In version 7.7.2 of Condor, the
		default format of rescue DAGs was changed to a condensed
		format consisting of only the names of completed jobs
		and the number of retries remaining for incomplete jobs.
		Currently Condor still supports the original rescue DAG
		format, but the user must set the
		DAGMAN_WRITE_PARTIAL_RESCUE config variable to false to
		obtain one.  This module does not directly support the
		new format, however this method allows a new-style
		rescue DAG to be parsed to set the states of the jobs in
		a DAG.  This, in effect, converts a new-style rescue DAG
		to an old-style rescue DAG, allowing the result to be
		manipulated as before.

		If the progress argument is not None, it should be a
		callable object.  This object will be called
		periodically and passed the f argument, the current line
		number, and a boolean indicating if parsing is complete.
		The boolean is always False until parsing is complete,
		then the callable will be invoked one last time with the
		final line count and the boolean set to True.
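
		Example (a sketch;  the rescue file's name is
		hypothetical):

		>>> dag = DAG.parse(open("pipeline.dag"))
		>>> dag.load_rescue(open("pipeline.dag.rescue001"))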
757 """
758
759 for job in self.nodes.values():
760 job.done = False
761
762 progress = progress_wrapper(f, progress)
763 for n, line in enumerate(f):
764
765 n += 1
766
767 progress += 1
768
769 line = line.strip()
770 if not line or line.startswith("#"):
771 continue
772
773 m = self.donepat.search(line)
774 if m is not None:
775 self.nodes[m.group("name")].done = True
776 continue
777
778 m = self.retrypat.search(line)
779 if m is not None:
780 node = self.nodes[m.group("name")]
781 node.retry = int(m.group("retries"))
782 node.retry_unless_exit_value = m.group("retry_unless_exit_value")
783 continue
784
785 raise ValueError("line %d: invalid line in rescue file: %s" % (n, line))
786
787 del progress
788

	def write(self, f, progress = None, rescue = None):
		"""
		Write the DAG to the file-like object f.  The object
		must provide a .write() method.  In the special case
		that the optional rescue argument is not None (see
		below), f can be set to None and no DAG file will be
		written (just the rescue DAG will be written).

		If the progress argument is not None, it should be a
		callable object.  This object will be called
		periodically and passed the f argument, the current line
		number, and a boolean indicating if writing is complete.
		The boolean is always False until writing is complete,
		then the callable will be invoked one last time with the
		final line count and the boolean set to True.

		Example:

		>>> def progress(f, n, done):
		...	print("writing %s: %d lines" % (f.name, n), end = "\\r")
		...	if done:
		...		print()
		...
		>>> dag.write(open("pipeline.dag", "w"), progress = progress)

		NOTE:  when writing PARENT/CHILD graph edges, this
		method will silently skip any node names that are not in
		this DAG's graph.  This is a convenience to simplify
		writing DAGs constructed by the .select_nodes_by_name()
		class method.  If one wishes to check for broken
		parent/child links before writing the DAG, use the
		.check_edges() method.

		If the optional rescue argument is not None, it must be
		a file-like object providing a .write() method, and the
		DONE state of jobs will be written to this file instead
		of the .dag (in the .dag all jobs will be marked not
		done).

		Example:

		>>> dag.write(open("pipeline.dag", "w"), rescue = open("pipeline.dag.rescue001", "w"))

		NOTE:  it is left as an exercise for the calling code to
		ensure the name chosen for the rescue file is consistent
		with the naming convention assumed by condor_dagman when
		it starts up.
		"""
		# set up the progress report wrapper
		progress = progress_wrapper(f, progress)

		# if only a rescue DAG is being written, fake the DAG
		# file with a no-op object
		if f is None and rescue is not None:
			f = nofile()

		# DOT line
		if self.dot is not None:
			f.write("DOT %s" % self.dot)
			if self.dotupdate:
				f.write(" UPDATE")
			if not self.dotoverwrite:
				f.write(" DONT-OVERWRITE")
			if self.dotinclude is not None:
				f.write(" INCLUDE %s" % self.dotinclude)
			f.write("\n")
			progress += 1

		# CONFIG line
		if self.config is not None:
			f.write("CONFIG %s\n" % self.config)
			progress += 1

		# NODE_STATUS_FILE line
		if self.node_status_file is not None:
			f.write("NODE_STATUS_FILE %s" % self.node_status_file)
			if self.node_status_file_updatetime is not None:
				f.write(" %d" % self.node_status_file_updatetime)
			f.write("\n")
			progress += 1

		# JOBSTATE_LOG line
		if self.jobstate_log is not None:
			f.write("JOBSTATE_LOG %s\n" % self.jobstate_log)
			progress += 1

		# MAXJOBS lines.  every node category must have an entry
		# in the .maxjobs dictionary (see the BUGS section of the
		# class documentation)
		if set(node.category for node in self.nodes.values() if node.category is not None) - set(self.maxjobs):
			raise ValueError("no MAXJOBS statement(s) for node category(ies) %s" % ", ".join(sorted(set(node.category for node in self.nodes.values() if node.category is not None) - set(self.maxjobs))))
		for name, value in sorted(self.maxjobs.items()):
			if value is not None:
				f.write("MAXJOBS %s %d\n" % (name, value))
				progress += 1

		# JOB et al. lines.  if a rescue DAG is being written,
		# record each node's done state there and temporarily
		# clear the flag so that the .dag itself marks all jobs
		# as not done
		for name, node in sorted(self.nodes.items()):
			if rescue is not None:
				if node.done:
					rescue.write("DONE %s\n" % node.name)
				done = node.done
				node.done = False
			node.write(f, progress = progress)
			if rescue is not None:
				# restore the done state
				node.done = done

		# PARENT ... CHILD ... lines.  to reduce the number of
		# lines, parents sharing an identical set of children are
		# grouped onto a single line.  child names not found in
		# this DAG are skipped silently
		names = set(self.nodes)
		parents_of = {}
		for name, node in self.nodes.items():
			parents_of.setdefault(frozenset(child.name for child in node.children) & names, set()).add(node.name)
		for children, parents in parents_of.items():
			if children:
				f.write("PARENT %s CHILD %s\n" % (" ".join(sorted(parents)), " ".join(sorted(children))))
				progress += 1

		# writing is complete;  deleting the progress wrapper
		# triggers the final progress report
		del progress

	def dot_source(self, title = "DAG", rename = False, colour = "black", bgcolour = "#a3a3a3", statecolours = {'wait': 'yellow', 'idle': 'yellow', 'run': 'lightblue', 'abort': 'red', 'stop': 'red', 'success': 'green', 'fail': 'red'}):
		"""
		Return a string containing DOT code to generate a
		visualization of the DAG graph.  See
		http://www.graphviz.org for more information.

		title provides a title for the graph.  If rename is
		True, instead of using the names of the nodes for the
		node names in the graph, numbers will be used instead.
		The numbers are assigned to the nodes in alphabetical
		order by node name.  This might be required if the nodes
		have names that are incompatible with the DOT syntax.

		colour and bgcolour set the outline colour of the graph
		nodes and the background colour for the graph
		respectively.  statecolours is a dictionary mapping node
		state (see the .state attribute of the JOB class and its
		derivatives) to a colour.  Set statecolours to None to
		disable state-based colouring of graph nodes.

		Example:

		>>> print(dag.dot_source(statecolours = None))

		BUGS:  the JOB class does not implement the ability to
		retrieve the job state at this time, therefore it is
		always necessary to set statecolours to None.  This
		might change in the future.
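
		Example (a sketch;  writes the DOT code to a
		hypothetical file named "pipeline.dot"):

		>>> open("pipeline.dot", "w").write(dag.dot_source(statecolours = None))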
935 """
936
937
938 if rename:
939 namemap = dict((name, str(n)) for n, name in enumerate(sorted(self.nodes), start = 1))
940 else:
941 namemap = dict((name, name) for name in self.nodes)
942
943
944
945 code = 'digraph "%s" {\nnode [color="%s", href="\\N"];\ngraph [bgcolor="%s"];\n' % (title, colour, bgcolour)
946 for node in self.nodes.values():
947 if statecolours is not None:
948 code += '"%s"[color="%s"];\n' % (namemap[node.name], statecolours[node.state])
949 for child in node.children:
950 code += '"%s" -> "%s";\n' % (namemap[node.name], namemap[child.name])
951 code += '}\n'
952
953
954
955 return code
956