1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26 """ This module provides thin wrappers around Pegasus.DAX3 functionality that
27 provides additional abstraction and argument handling.
28 """
29 import Pegasus.DAX3 as dax
30
32 """ Container of common methods for setting pegasus profile information
33 on Executables and nodes. This class expects to be inherited from
34 and for an add_profile method to be implemented.
35 """
37 """ Set the amount of memory that is required in megabytes
38 """
39 self.add_profile('condor', 'request_memory', '%sM' % size)
40
42 """ Set the amount of storage required in megabytes
43 """
44 self.add_profile('condor', 'request_disk', '%sM' % size)
45
48
51
54
57
60
62 """ The workflow representation of an Executable
63 """
64 id = 0
65 - def __init__(self, name, namespace=None, os='linux',
66 arch='x86_64', installed=True, version=None):
77
def add_pfn(self, url, site='local'):
    """ Record a physical file name (PFN) for this executable

    Parameters
    ----------
    url :
        Location forwarded to the underlying DAX executable's PFN call.
    site : optional
        Site the location belongs to; also used as the key under which
        the url is remembered in ``self.pfns``. Defaults to 'local'.
    """
    # Hand the location to the DAX object first: the local lookup table
    # is only updated when that call did not raise.
    dax_exe = self._dax_executable
    dax_exe.PFN(url, site)

    known_locations = self.pfns
    known_locations[site] = url
81
83 return self.pfns[site]
84
86 dax.addExecutable(self._dax_executable)
87
89 """ Add profile information to this executable
90 """
91 entry = dax.Profile(namespace, key, value)
92 self._dax_executable.addProfile(entry)
93
94 -class Node(ProfileShortcuts):
96 self.in_workflow = False
97 self.executable=executable
98 self._inputs = []
99 self._outputs = []
100 self._dax_node = dax.Job(name=executable.logical_name,
101 version = executable.version,
102 namespace=executable.namespace)
103 self._args = []
104 self._options = []
105
107 """ Add an argument
108 """
109 if not isinstance(arg, File):
110 arg = str(arg)
111
112 self._args += [arg]
113
def add_opt(self, opt, value=None):
    """ Add an option, with an optional value, to this node

    Parameters
    ----------
    opt :
        The option flag; appended to the option list as given.
    value : optional
        Value that follows the flag. File instances are stored as-is,
        anything else is converted with str(). When None (the default)
        only the flag itself is added.
    """
    if value is None:
        self._options += [opt]
        return

    # Compare against None instead of relying on truthiness, so that
    # legitimate falsy values such as 0, 0.0, False or '' are passed on
    # to the command line rather than being silently dropped.
    if not isinstance(value, File):
        value = str(value)
    self._options += [opt, value]
123
124
130
132 """ Add as destination of output data
133 """
134 self._outputs += [out]
135 out.node = self
136 out._set_as_output_of(self)
137
138
144
150
152 """ Add an option that determines a list of outputs
153 """
154 self.add_opt(opt)
155 for out in outputs:
156 self.add_opt(out)
157 self._add_output(out)
158
166
172
178
180 """ Add an option and return a new file handle
181 """
182 fil = File(name)
183 self.add_output_opt(opt, fil)
184 return fil
185
186
188 """ Add profile information to this node at the DAX level
189 """
190 entry = dax.Profile(namespace, key, value)
191 self._dax_node.addProfile(entry)
192
194 args = self._args + self._options
195 self._dax_node.addArguments(*args)
196
198 """
199 """
def __init__(self, name='my_workflow'):
    """ Create an empty workflow backed by a new Pegasus ADAG

    Parameters
    ----------
    name : optional
        Name stored on this workflow and passed to the underlying
        ADAG. Defaults to 'my_workflow'.
    """
    self.name = name
    self._adag = dax.ADAG(name)

    # Bookkeeping lists for the workflow-level inputs, outputs and
    # executables; all of them start out empty.
    self._inputs, self._outputs, self._executables = [], [], []
207
209 """ Add a node to this workflow
210
211 This function adds nodes to the workflow. It also determines
212 parent/child relations from the DataStorage inputs to this job.
213
214 Parameters
215 ----------
216 node : Node
217 A node that should be executed as part of this workflow.
218 """
219 node._finalize()
220 node.in_workflow = True
221 self._adag.addJob(node._dax_node)
222
223
224
225 for inp in node._inputs:
226 if inp.node is not None and inp.node.in_workflow:
227 parent = inp.node._dax_node
228 child = node._dax_node
229 dep = dax.Dependency(parent=parent, child=child)
230 self._adag.addDependency(dep)
231
232 elif inp.node is not None and not inp.node.in_workflow:
233 raise ValueError('Parents of this node must be added to the '
234 'workflow first.')
235
236 elif inp.node is None and inp.workflow_input is False:
237 self._inputs += [inp]
238 inp.workflow_input = True
239
240
241 self._outputs += node._outputs
242
243
244 if not node.executable.in_workflow:
245 node.executable.in_workflow = True
246 self._executables += [node.executable]
247
248 return self
249
250 __iadd__ = add_node
251
def save(self, filename):
    """ Write this workflow to DAX file

    Parameters
    ----------
    filename :
        Path of the file to write. It is opened in text write mode, so
        any existing content is replaced.
    """
    # The context manager closes (and thereby flushes) the file even if
    # writeXML raises; previously the handle was never closed.
    with open(filename, "w") as dax_file:
        self._adag.writeXML(dax_file)
257
259 """ A workflow representation of a place to store and read data from.
260
261 The abstract representation of a place to store and read data from. This
262 can include files, database, or remote connections. This object is
263 used as a handle to pass between functions, and is used as a way to logically
264 represent the order of operations on the physical data.
265 """
267 self.name = name
268 self.node = None
269 self.workflow_input = False
270
273
276
279
280 -class File(DataStorage, dax.File):
281 """ The workflow representation of a physical file
282
283 An object that represents a file from the perspective of setting up a
284 workflow. The file may or may not exist at the time of workflow generation.
285 If it does, this is represented by containing a physical file name (PFN).
286 A storage path is also available to indicate the desired final
287 destination of this file.
288 """
293
296
301 node._dax_node.uses(self, link=dax.Link.OUTPUT, register=False,
302 transfer=True)
304 return '%s %s pool="%s"' % (self.name, self.storage_path, 'local')
305
308
311