Package glue :: Module dagfile :: Class DAG

Class DAG

object --+
         |
        DAG

Representation of the contents of a Condor DAG file.

BUGS: the semantics of the "+" special character in category names are not understood by this class. For now, it is an error if a node's category is not found verbatim in a MAXJOBS line. The "+" character is a wildcard-like character used in the assignment of MAXJOBS values to job categories in splices; see the Condor documentation for more information.
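
The "verbatim" requirement can be illustrated with a minimal sketch. It uses the categorypat and maxjobspat patterns documented on this page; the helper name unmatched_categories and the category names in the example are hypothetical, and this is not necessarily how the class itself performs the check.

```python
import re

# Patterns copied from the class variables documented on this page.
categorypat = re.compile(r'(?i)^CATEGORY\s+(?P<name>\S+)\s+(?P<category>\S+)')
maxjobspat = re.compile(r'(?i)^MAXJOBS\s+(?P<category>\S+)\s+(?P<value>\S+)')

def unmatched_categories(lines):
    """Return categories used in CATEGORY lines but absent from MAXJOBS lines.

    Hypothetical helper: names are compared verbatim, so a leading "+"
    is treated as an ordinary character.
    """
    used = set()
    declared = set()
    for line in lines:
        m = categorypat.match(line)
        if m:
            used.add(m.group("category"))
            continue
        m = maxjobspat.match(line)
        if m:
            declared.add(m.group("category"))
    return used - declared

# Made-up DAG lines for illustration only.
example = [
    "CATEGORY node1 inspiral",
    "CATEGORY node2 +thinca",
    "MAXJOBS inspiral 20",
    "MAXJOBS thinca 5",
]
missing = unmatched_categories(example)
```

Here missing contains only "+thinca", because "thinca" and "+thinca" are different strings under a verbatim comparison.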

Instance Methods
 
__init__(self)
x.__init__(...) initializes x; see help(type(x)) for signature
 
check_edges(self)
Check all graph edges for validity.
 
dot_source(self, title='DAG', rename=False, colour='black', bgcolour='#a3a3a3', statecolours={'abort': 'red', 'fail': 'red', 'idle': 'yellow', 'run': 'ligh...)
Return a string containing DOT code to generate a visualization of the DAG graph.
 
get_all_child_names(self, names)
Trace the DAG forward from the children of the nodes whose names are given to the leaf nodes, inclusively, and return the set of the names of all nodes visited.
 
get_all_parent_names(self, names)
Trace the DAG backward from the parents of the nodes whose names are given to the head nodes, inclusively, and return the set of the names of all nodes visited.
 
load_rescue(self, f, progress=None)
Parse the file-like object f as a rescue DAG, using the DONE lines therein to set the job states of this DAG.
 
reindex(self)
Rebuild the .nodes index.
 
write(self, f, progress=None, rescue=None)
Write the DAG to the file-like object f.

Inherited from object: __delattr__, __format__, __getattribute__, __hash__, __new__, __reduce__, __reduce_ex__, __repr__, __setattr__, __sizeof__, __str__, __subclasshook__

Class Methods
 
parse(cls, f, progress=None)
Parse the file-like object f as a Condor DAG file.
 
select_nodes_by_name(cls, dag, nodenames)
Construct a new DAG object containing only the nodes whose names are in nodenames.
Class Variables
  abortdagonpat = re.compile(r'(?i)^ABORT-DAG-ON\s+(?P<name>\S+)...
  arcpat = re.compile(r'(?i)^PARENT\s+(?P<parents>.+?)\s+CHILD\s...
  categorypat = re.compile(r'(?i)^CATEGORY\s+(?P<name>\S+)\s+(?P...
  configpat = re.compile(r'(?i)^CONFIG\s+(?P<filename>\S+)')
  datapat = re.compile(r'(?i)^DATA\s+(?P<name>\S+)\s+(?P<filenam...
  donepat = re.compile(r'(?i)^DONE\s+(?P<name>\S+)')
  dotpat = re.compile(r'(?i)^DOT\s+(?P<filename>\S+)(\s+(?P<opti...
  jobpat = re.compile(r'(?i)^JOB\s+(?P<name>\S+)\s+(?P<filename>...
  jobstatepat = re.compile(r'(?i)^JOBSTATE_LOG\s+(?P<filename>\S...
  maxjobspat = re.compile(r'(?i)^MAXJOBS\s+(?P<category>\S+)\s+(...
  nodestatuspat = re.compile(r'(?i)^NODE_STATUS_FILE\s+(?P<filen...
  prioritypat = re.compile(r'(?i)^PRIORITY\s+(?P<name>\S+)\s+(?P...
  retrypat = re.compile(r'(?i)^RETRY\s+(?P<name>\S+)\s+(?P<retri...
  scriptpat = re.compile(r'(?i)^SCRIPT\s+(?P<type>(PRE)|(POST))\...
  splicepat = re.compile(r'(?i)^SPLICE\s+(?P<name>\S+)\s+(?P<fil...
  subdagpat = re.compile(r'(?i)^SUBDAG\s+EXTERNAL\s+(?P<name>\S+...
  varspat = re.compile(r'(?i)^VARS\s+(?P<name>\S+)\s+(?P<vars>.+)')
  varsvaluepat = re.compile(r'(?i)(?P<name>\S+)\s*=\s*"(?P<value...
Properties

Inherited from object: __class__

Method Details

__init__(self)
(Constructor)

x.__init__(...) initializes x; see help(type(x)) for signature

Overrides: object.__init__
(inherited documentation)

check_edges(self)

Check all graph edges for validity. Checks that every child of each node lists that node as a parent, and vice versa, and that all nodes listed in the parent and child sets of all nodes are contained in this DAG. Raises ValueError if a problem is found; otherwise returns None.

Example:

>>> try:
...     dag.check_edges()
... except ValueError as e:
...     print("edges are broken: %s" % str(e))
... else:
...     print("all edges are OK")
...
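
The consistency rules described above can be sketched on a toy graph. The Node class, the nodes dictionary, and the assumption that the parent and child sets hold node objects are all hypothetical stand-ins chosen for illustration; the actual implementation may differ.

```python
class Node(object):
    """Hypothetical stand-in for a node; not the module's JOB class."""
    def __init__(self, name):
        self.name = name
        self.parents = set()    # assumed to hold Node objects
        self.children = set()   # assumed to hold Node objects

def check_edges(nodes):
    """Raise ValueError on the first inconsistent edge, else return None.

    nodes is a dict mapping node name to Node (an assumed layout).
    """
    members = set(nodes.values())
    for node in nodes.values():
        for child in node.children:
            if child not in members:
                raise ValueError("child %s of %s is not in the DAG" % (child.name, node.name))
            if node not in child.parents:
                raise ValueError("%s does not list %s as a parent" % (child.name, node.name))
        for parent in node.parents:
            if parent not in members:
                raise ValueError("parent %s of %s is not in the DAG" % (parent.name, node.name))
            if node not in parent.children:
                raise ValueError("%s does not list %s as a child" % (parent.name, node.name))

a, b = Node("a"), Node("b")
a.children.add(b)
b.parents.add(a)
nodes = {"a": a, "b": b}
result = check_edges(nodes)     # consistent graph: no exception

b.parents.clear()               # break the back-reference from b to a
try:
    check_edges(nodes)
except ValueError as e:
    error = str(e)
```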

dot_source(self, title='DAG', rename=False, colour='black', bgcolour='#a3a3a3', statecolours={'abort': 'red', 'fail': 'red', 'idle': 'yellow', 'run': 'ligh...)

Return a string containing DOT code to generate a visualization of the DAG graph. See http://www.graphviz.org for more information.

title provides a title for the graph. If rename is True, numbers are used for the node names in the graph instead of the names of the nodes. The numbers are assigned to the nodes in alphabetical order by node name. This might be required if the nodes have names that are incompatible with the DOT syntax.

colour and bgcolour set the outline colour of the graph nodes and the background colour for the graph respectively. statecolours is a dictionary mapping node state (see the .state attribute of the JOB class and its derivatives) to a colour. Set statecolours to None to disable state-based colouring of graph nodes.

Example:

>>> print(dag.dot_source(statecolours = None))

BUGS: the JOB class does not implement the ability to retrieve the job state at this time; therefore, it is always necessary to set statecolours to None. This might change in the future.
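
The effect of the rename option can be sketched as follows. This is a simplified illustration, not the method's actual output: the function takes a plain list of (parent, child) name pairs, ignores colours, and numbers nodes starting from 0, all of which are assumptions made for the example.

```python
def dot_source(edges, title="DAG", rename=False):
    """Return DOT text for the graph given as (parent, child) name pairs.

    Simplified sketch: the edge-list input and numbering from 0 are
    assumptions, and no colour attributes are emitted.
    """
    names = sorted(set(name for edge in edges for name in edge))
    if rename:
        # Numbers are assigned in alphabetical order by node name.
        labels = dict((name, str(n)) for n, name in enumerate(names))
    else:
        labels = dict((name, name) for name in names)
    lines = ['digraph "%s" {' % title]
    for name in names:
        lines.append('\t"%s";' % labels[name])
    for parent, child in sorted(edges):
        lines.append('\t"%s" -> "%s";' % (labels[parent], labels[child]))
    lines.append("}")
    return "\n".join(lines)

source = dot_source([("datafind", "triggergen"), ("triggergen", "coinc")], rename=True)
```

With rename enabled, "coinc", "datafind" and "triggergen" become "0", "1" and "2" respectively, so the edges are written as "1" -> "2" and "2" -> "0".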

get_all_child_names(self, names)

Trace the DAG forward from the children of the nodes whose names are given to the leaf nodes, inclusively, and return the set of the names of all nodes visited.

Example:

>>> all_children = dag.get_all_child_names(["triggergen"])
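
The traversal can be sketched on a plain dictionary that maps each node name to the set of its child names. The dictionary layout and node names are hypothetical. The sketch assumes that the starting nodes themselves are not part of the result unless they are reached through another node, which is one reading of "from the children of".

```python
def get_all_child_names(children, names):
    """Return the names of all nodes reachable downstream of the given names.

    children maps node name -> set of child names (an assumed layout).
    """
    visited = set()
    # Start from the children of the named nodes, not the nodes themselves.
    stack = [child for name in names for child in children[name]]
    while stack:
        name = stack.pop()
        if name in visited:
            continue
        visited.add(name)
        stack.extend(children[name])
    return visited

children = {
    "datafind": {"triggergen"},
    "triggergen": {"coinc", "veto"},
    "coinc": {"summary"},
    "veto": {"summary"},
    "summary": set(),
}
all_children = get_all_child_names(children, ["triggergen"])
```

Tracing backward to the head nodes works the same way with a mapping from node name to parent names.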

get_all_parent_names(self, names)

Trace the DAG backward from the parents of the nodes whose names are given to the head nodes, inclusively, and return the set of the names of all nodes visited.

Example:

>>> all_parents = dag.get_all_parent_names(["triggergen"])

load_rescue(self, f, progress=None)

Parse the file-like object f as a rescue DAG, using the DONE lines therein to set the job states of this DAG.

In the past, rescue DAGs were full copies of the original DAG with the word DONE added to the JOB lines of completed jobs. In version 7.7.2 of Condor, the default format of rescue DAGs was changed to a condensed format consisting of only the names of completed jobs and the number of retries remaining for incomplete jobs. Currently Condor still supports the original rescue DAG format, but the user must set the DAGMAN_WRITE_PARTIAL_RESCUE config variable to false to obtain one. This module does not directly support the new format; however, this method allows a new-style rescue DAG to be parsed to set the states of the jobs in a DAG. In effect, this converts a new-style rescue DAG to an old-style rescue DAG, allowing the result to be manipulated as before.

If the progress argument is not None, it should be a callable object. This object will be called periodically and passed the f argument, the current line number, and a boolean indicating whether parsing is complete. The boolean is False until parsing is complete, at which point the callable is invoked one last time with the final line count and the boolean set to True.
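
A minimal sketch of reading DONE lines and driving a progress callback is shown below. It uses the donepat pattern documented on this page. The function name load_done_names, the reporting interval of 100 lines, and the contents of the example rescue file are assumptions made for illustration.

```python
import io
import re

# Pattern copied from the class variables documented on this page.
donepat = re.compile(r'(?i)^DONE\s+(?P<name>\S+)')

def load_done_names(f, progress=None):
    """Return the set of node names marked DONE in the file-like object f.

    Hypothetical helper; the 100-line reporting interval is an assumption.
    """
    done = set()
    n = 0
    for n, line in enumerate(f, 1):
        m = donepat.match(line)
        if m:
            done.add(m.group("name"))
        if progress is not None and not n % 100:
            progress(f, n, False)
    if progress is not None:
        # Final call: total line count, with the boolean set to True.
        progress(f, n, True)
    return done

# Made-up condensed rescue DAG contents.
rescue = io.StringIO(u"# Rescue DAG file\nDONE datafind\nRETRY triggergen 2\nDONE veto\n")
calls = []
done_names = load_done_names(rescue, progress=lambda f, n, done: calls.append((n, done)))
```

The names collected this way could then be used to mark the corresponding nodes of an already-parsed DAG as done.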

parse(cls, f, progress=None)
Class Method

Parse the file-like object f as a Condor DAG file. Return a DAG object. The file object must be iterable, yielding one line of text of the DAG file in each iteration.

If the progress argument is not None, it should be a callable object. This object will be called periodically and passed the f argument, the current line number, and a boolean indicating whether parsing is complete. The boolean is False until parsing is complete, at which point the callable is invoked one last time with the final line count and the boolean set to True.

Example:

>>> def progress(f, n, done):
...     print("reading %s: %d lines\r" % (f.name, n), end="")
...     if done:
...         print()
...
>>> dag = DAG.parse(open("pipeline.dag"), progress = progress)
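
The line-by-line matching that a parser of this kind performs can be sketched with two of the patterns documented on this page, jobpat and arcpat. The function parse_lines and the dictionaries it returns are hypothetical simplifications; the real method builds a DAG object and handles many more line types.

```python
import re

# Patterns copied from the class variables documented on this page.
jobpat = re.compile(r'(?i)^JOB\s+(?P<name>\S+)\s+(?P<filename>\S+)(\s+DIR\s+(?P<directory>\S+))?(\s+(?P<noop>NOOP))?(\s+(?P<done>DONE))?')
arcpat = re.compile(r'(?i)^PARENT\s+(?P<parents>.+?)\s+CHILD\s+(?P<children>.+)')

def parse_lines(lines):
    """Return (jobs, edges) from an iterable of DAG file lines.

    Hypothetical simplification: only JOB and PARENT/CHILD lines are handled.
    """
    jobs = {}
    edges = set()
    for line in lines:
        line = line.strip()
        m = jobpat.match(line)
        if m:
            jobs[m.group("name")] = {
                "filename": m.group("filename"),
                "done": bool(m.group("done")),
            }
            continue
        m = arcpat.match(line)
        if m:
            # Every listed parent is linked to every listed child.
            for parent in m.group("parents").split():
                for child in m.group("children").split():
                    edges.add((parent, child))
    return jobs, edges

jobs, edges = parse_lines([
    "JOB datafind datafind.sub",
    "JOB triggergen triggergen.sub DONE",
    "JOB coinc coinc.sub",
    "PARENT datafind CHILD triggergen coinc",
])
```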

reindex(self)

Rebuild the .nodes index. This is required if the names of nodes are changed.
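
Why a rebuild is needed can be seen in a small sketch. It assumes that the index is a dictionary keyed by node name, which is a guess at the layout of .nodes; the Node class is a hypothetical stand-in.

```python
class Node(object):
    """Hypothetical stand-in for a node object with a name attribute."""
    def __init__(self, name):
        self.name = name

# Assumed layout: an index mapping node name -> node object.
nodes = {"triggergen": Node("triggergen")}

# Renaming the node leaves the index keyed by the old name.
nodes["triggergen"].name = "triggergen_v2"
stale = "triggergen_v2" in nodes

# Rebuilding the index from the node objects restores consistency.
nodes = dict((node.name, node) for node in nodes.values())
```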

select_nodes_by_name(cls, dag, nodenames)
Class Method

Construct a new DAG object containing only the nodes whose names are in nodenames.

Example:

>>> names_to_rerun = set(["triggergen"])
>>> dag = DAG.select_nodes_by_name(dag, names_to_rerun | dag.get_all_parent_names(names_to_rerun))

NOTE: the new DAG object is given references to the node (JOB, DATA, etc.) objects in the original DAG, not copies of them. Therefore, editing the node objects, for example modifying their parent or child sets, will affect both DAGs. To obtain an independent DAG with its own node objects, make a deepcopy of the object that is returned (see the copy module in the Python standard library for more information).

Example:

>>> import copy
>>> dag = copy.deepcopy(DAG.select_nodes_by_name(dag, names_to_rerun | dag.get_all_parent_names(names_to_rerun)))
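
The difference between shared references and a deep copy can be demonstrated without the DAG class. In this sketch the Node class and the plain dictionaries standing in for DAGs are hypothetical.

```python
import copy

class Node(object):
    """Hypothetical stand-in for a node with parent and child sets."""
    def __init__(self, name):
        self.name = name
        self.parents = set()
        self.children = set()

original = {"a": Node("a"), "b": Node("b")}

# Selecting by name reuses the same node objects, as described above.
selected = dict((name, original[name]) for name in ["a"])

# A deep copy has its own, independent node objects.
independent = copy.deepcopy(selected)

selected["a"].children.add("b")
shared_changed = "b" in original["a"].children          # edit is visible in the original
independent_changed = "b" in independent["a"].children  # the deep copy is unaffected
```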

write(self, f, progress=None, rescue=None)

Write the DAG to the file-like object f. The object must provide a .write() method. In the special case that the optional rescue argument is not None (see below), f can be set to None, in which case no DAG file is written (only the rescue DAG is written).

If the progress argument is not None, it should be a callable object. This object will be called periodically and passed the f argument, the current line number, and a boolean indicating whether writing is complete. The boolean is False until writing is complete, at which point the callable is invoked one last time with the final line count and the boolean set to True.

Example:

>>> def progress(f, n, done):
...     print("writing %s: %d lines\r" % (f.name, n), end="")
...     if done:
...         print()
...
>>> dag.write(open("pipeline.dag", "w"), progress = progress)

NOTE: when writing PARENT/CHILD graph edges, this method will silently skip any node names that are not in this DAG's graph. This is a convenience to simplify writing DAGs constructed by the .select_nodes_by_name() class method. If one wishes to check for broken parent/child links before writing the DAG, use the .check_edges() method.
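
The edge-skipping behaviour can be sketched as follows. The function write_edges, the set of node names, and the children mapping are hypothetical simplifications, and the exact layout of the lines the real method writes may differ.

```python
import io

def write_edges(f, nodes, children):
    """Write PARENT/CHILD lines, skipping names that are not in nodes.

    nodes is the set of node names in this DAG; children maps a node name
    to the set of its child names (both are assumed layouts).
    """
    for parent in sorted(nodes):
        kept = sorted(c for c in children.get(parent, ()) if c in nodes)
        if kept:
            f.write("PARENT %s CHILD %s\n" % (parent, " ".join(kept)))

# "coinc" was not selected into this DAG, so its edge is dropped silently.
children = {"datafind": {"triggergen"}, "triggergen": {"coinc"}}
out = io.StringIO()
write_edges(out, {"datafind", "triggergen"}, children)
text = out.getvalue()
```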

If the optional rescue argument is not None, it must be a file-like object providing a .write() method, and the DONE state of jobs will be written to this file instead of the .dag (in the .dag, all jobs will be marked not done).

Example:

>>> dag.write(open("pipeline.dag", "w"), rescue = open("pipeline.dag.rescue001", "w"))

NOTE: it is left as an exercise for the calling code to ensure the name chosen for the rescue file is consistent with the naming convention assumed by condor_dagman when it starts up.
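
How the DONE state moves from the DAG file to the rescue file can be sketched with a simplified writer. The function write_with_rescue and its list-of-tuples input are hypothetical, and the real method writes many more line types than JOB and DONE.

```python
import io

def write_with_rescue(jobs, f, rescue=None):
    """Write JOB lines to f and, if rescue is given, DONE lines to rescue.

    jobs is a list of (name, filename, done) tuples (an assumed layout).
    f may be None when rescue is not None, as described above.
    """
    for name, filename, done in jobs:
        if f is not None:
            line = "JOB %s %s" % (name, filename)
            # DONE stays on the JOB line only when no rescue file is written.
            if done and rescue is None:
                line += " DONE"
            f.write(line + "\n")
        if rescue is not None and done:
            rescue.write("DONE %s\n" % name)

jobs = [("datafind", "datafind.sub", True), ("triggergen", "triggergen.sub", False)]
dagfile, rescuefile = io.StringIO(), io.StringIO()
write_with_rescue(jobs, dagfile, rescuefile)
dag_text = dagfile.getvalue()
rescue_text = rescuefile.getvalue()
```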


Class Variable Details

abortdagonpat

Value:
re.compile(r'(?i)^ABORT-DAG-ON\s+(?P<name>\S+)\s+(?P<exitvalue>\S+)(\s+RETURN\s+(?P<returnvalue>\S+))?')

arcpat

Value:
re.compile(r'(?i)^PARENT\s+(?P<parents>.+?)\s+CHILD\s+(?P<children>.+)')

categorypat

Value:
re.compile(r'(?i)^CATEGORY\s+(?P<name>\S+)\s+(?P<category>\S+)')

datapat

Value:
re.compile(r'(?i)^DATA\s+(?P<name>\S+)\s+(?P<filename>\S+)(\s+DIR\s+(?P<directory>\S+))?(\s+(?P<noop>NOOP))?(\s+(?P<done>DONE))?')

dotpat

Value:
re.compile(r'(?i)^DOT\s+(?P<filename>\S+)(\s+(?P<options>.+))?')

jobpat

Value:
re.compile(r'(?i)^JOB\s+(?P<name>\S+)\s+(?P<filename>\S+)(\s+DIR\s+(?P<directory>\S+))?(\s+(?P<noop>NOOP))?(\s+(?P<done>DONE))?')

jobstatepat

Value:
re.compile(r'(?i)^JOBSTATE_LOG\s+(?P<filename>\S+)')

maxjobspat

Value:
re.compile(r'(?i)^MAXJOBS\s+(?P<category>\S+)\s+(?P<value>\S+)')

nodestatuspat

Value:
re.compile(r'(?i)^NODE_STATUS_FILE\s+(?P<filename>\S+)(\s+(?P<updatetime>\S+))?')

prioritypat

Value:
re.compile(r'(?i)^PRIORITY\s+(?P<name>\S+)\s+(?P<value>\S+)')

retrypat

Value:
re.compile(r'(?i)^RETRY\s+(?P<name>\S+)\s+(?P<retries>\S+)(\s+UNLESS-EXIT\s+(?P<retry_unless_exit_value>\S+))?')

scriptpat

Value:
re.compile(r'(?i)^SCRIPT\s+(?P<type>(PRE)|(POST))\s(?P<name>\S+)\s+(?P<executable>\S+)(\s+(?P<arguments>.+))?')

splicepat

Value:
re.compile(r'(?i)^SPLICE\s+(?P<name>\S+)\s+(?P<filename>\S+)(\s+DIR\s+(?P<directory>\S+))?')

subdagpat

Value:
re.compile(r'(?i)^SUBDAG\s+EXTERNAL\s+(?P<name>\S+)\s+(?P<filename>\S+)(\s+DIR\s+(?P<directory>\S+))?(\s+(?P<noop>NOOP))?(\s+(?P<done>DONE))?')

varsvaluepat

Value:
re.compile(r'(?i)(?P<name>\S+)\s*=\s*"(?P<value>.*?)(?<!\\)"')
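
A short sketch shows how varspat and varsvaluepat might be combined to split a VARS line into name/value pairs. Both patterns are copied from this page; the example line and the way the two patterns are combined are assumptions. The negative lookbehind in varsvaluepat lets a value contain backslash-escaped double quotes.

```python
import re

# Patterns copied from the class variables documented on this page.
varspat = re.compile(r'(?i)^VARS\s+(?P<name>\S+)\s+(?P<vars>.+)')
varsvaluepat = re.compile(r'(?i)(?P<name>\S+)\s*=\s*"(?P<value>.*?)(?<!\\)"')

# Made-up VARS line; the second value contains escaped quotes.
line = r'VARS triggergen ifo="H1" comment="say \"hi\""'

m = varspat.match(line)
node_name = m.group("name")
# Each name="value" pair is extracted in turn; escapes are left in place.
values = dict(
    (v.group("name"), v.group("value"))
    for v in varsvaluepat.finditer(m.group("vars"))
)
```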