1   
  2   
  3   
  4   
  5   
  6   
  7   
  8   
  9   
 10   
 11   
 12   
 13   
 14   
 15   
 16   
 17   
 18   
 19   
 20   
 21   
 22   
 23   
 24   
 25   
 26  """ This module provides thin wrappers around Pegasus.DAX3 functionality that 
 27  provides additional abstraction and argument handling. 
 28  """ 
 29  import Pegasus.DAX3 as dax 
 30   
 32      """ Container of common methods for setting pegasus profile information 
 33      on Executables and nodes. This class expects to be inherited from 
 34      and for an add_profile method to be implemented. 
 35      """ 
 37          """ Set the amount of memory that is required in megabytes 
 38          """ 
 39          self.add_profile('condor', 'request_memory', '%sM' % size) 
  40            
 42          """ Set the amount of storage required in megabytes 
 43          """ 
 44          self.add_profile('condor', 'request_disk', '%sM' % size) 
  45           
 48           
 51           
 54           
 57           
  60   
 62      """ The workflow representation of an Executable  
 63      """ 
 64      id = 0 
 65 -    def __init__(self, name, namespace=None, os='linux',  
 66                         arch='x86_64', installed=True, version=None): 
  77           
 78 -    def add_pfn(self, url, site='local'): 
  79          self._dax_executable.PFN(url, site) 
 80          self.pfns[site] = url 
  81           
 83          return self.pfns[site] 
  84           
 86          dax.addExecutable(self._dax_executable) 
  87           
 89          """ Add profile information to this executable 
 90          """ 
 91          entry = dax.Profile(namespace, key, value) 
 92          self._dax_executable.addProfile(entry)   
   93    
 94 -class Node(ProfileShortcuts):     
  96          self.in_workflow = False    
 97          self.executable=executable             
 98          self._inputs = [] 
 99          self._outputs = []         
100          self._dax_node = dax.Job(name=executable.logical_name, 
101                                   version = executable.version,  
102                                   namespace=executable.namespace) 
103          self._args = [] 
104          self._options = [] 
 105           
107          """ Add an argument 
108          """ 
109          if not isinstance(arg, File): 
110              arg = str(arg) 
111           
112          self._args += [arg] 
 113           
114 -    def add_opt(self, opt, value=None): 
 115          """ Add a option 
116          """ 
117          if value: 
118              if not isinstance(value, File): 
119                  value = str(value) 
120              self._options += [opt, value] 
121          else: 
122              self._options += [opt] 
 123       
124       
130         
132          """ Add as destination of output data 
133          """ 
134          self._outputs += [out] 
135          out.node = self 
136          out._set_as_output_of(self) 
 137           
138       
144           
150           
152          """ Add an option that determines a list of outputs 
153          """ 
154          self.add_opt(opt) 
155          for out in outputs: 
156              self.add_opt(out) 
157              self._add_output(out) 
 158               
166           
172           
178           
180          """ Add an option and return a new file handle 
181          """ 
182          fil = File(name) 
183          self.add_output_opt(opt, fil) 
184          return fil 
 185   
186       
188          """ Add profile information to this node at the DAX level 
189          """ 
190          entry = dax.Profile(namespace, key, value) 
191          self._dax_node.addProfile(entry) 
 192           
194          args = self._args + self._options 
195          self._dax_node.addArguments(*args) 
 196           
198      """  
199      """ 
200 -    def __init__(self, name='my_workflow'): 
 201          self.name = name 
202          self._adag = dax.ADAG(name) 
203           
204          self._inputs = [] 
205          self._outputs = [] 
206          self._executables = [] 
 207           
209          """ Add a node to this workflow 
210           
211          This function adds nodes to the workflow. It also determines 
212          parent/child relations from the DataStorage inputs to this job.  
213           
214          Parameters 
215          ---------- 
216          node : Node 
217              A node that should be executed as part of this workflow. 
218          """ 
219          node._finalize() 
220          node.in_workflow = True 
221          self._adag.addJob(node._dax_node) 
222    
223           
224           
225          for inp in node._inputs: 
226              if inp.node is not None and inp.node.in_workflow: 
227                  parent = inp.node._dax_node 
228                  child = node._dax_node 
229                  dep = dax.Dependency(parent=parent, child=child) 
230                  self._adag.addDependency(dep)     
231                               
232              elif inp.node is not None and not inp.node.in_workflow: 
233                  raise ValueError('Parents of this node must be added to the ' 
234                                   'workflow first.')    
235                                     
236              elif inp.node is None and inp.workflow_input is False: 
237                  self._inputs += [inp] 
238                  inp.workflow_input = True 
239               
240           
241          self._outputs += node._outputs   
242               
243           
244          if not node.executable.in_workflow: 
245              node.executable.in_workflow = True 
246              self._executables += [node.executable] 
247           
248          return self 
 249           
250      __iadd__ = add_node 
251         
252 -    def save(self, filename): 
 253          """ Write this workflow to DAX file  
254          """ 
255          f = open(filename, "w") 
256          self._adag.writeXML(f) 
  257   
259      """ A workflow representation of a place to store and read data from.  
260       
261      The abstract representation of a place to store and read data from. This  
262      can include files, databases, or remote connections. This object is 
263      used as a handle to pass between functions, and is used as a way to logically 
264      represent the order of operations on the physical data.  
265      """ 
267          self.name = name       
268          self.node = None 
269          self.workflow_input = False 
 270           
273           
276           
 279           
280 -class File(DataStorage, dax.File): 
 281      """ The workflow representation of a physical file 
282       
283      An object that represents a file from the perspective of setting up a  
284      workflow. The file may or may not exist at the time of workflow generation. 
285      If it does, this is represented by containing a physical file name (PFN). 
286      A storage path is also available to indicate the desired final  
287      destination of this file. 
288      """ 
293   
296           
301          node._dax_node.uses(self, link=dax.Link.OUTPUT, register=False,  
302                                                          transfer=True)                                                        
 304          return '%s %s pool="%s"' % (self.name, self.storage_path, 'local')  
 305           
 308       
311