1 """
2 This module contains objects that make it simple for the user to
3 create python scripts that build Condor DAGs to run code on the LSC
4 Data Grid.
5
6 This file is part of the Grid LSC User Environment (GLUE)
7
8 GLUE is free software: you can redistribute it and/or modify it under the
9 terms of the GNU General Public License as published by the Free Software
10 Foundation, either version 3 of the License, or (at your option) any later
11 version.
12
13 This program is distributed in the hope that it will be useful, but WITHOUT
14 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
15 FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
16 details.
17
18 You should have received a copy of the GNU General Public License along with
19 this program. If not, see <http://www.gnu.org/licenses/>.
20 """
21
22 from __future__ import print_function
23 __author__ = 'Duncan Brown <duncan@gravity.phys.uwm.edu>'
24 from glue import git_version
25 __date__ = git_version.date
26 __version__ = git_version.id
27
28 import os
29 import sys
30 import string, re
31 import time
32 import random
33 import math
34 from six.moves import urllib
35 import stat
36 import socket
37 import itertools
38 import glue.segments
39 from hashlib import md5
40
41 try:
42 from cjson import decode
43 except ImportError:
44 from json import loads as decode
45
46 try:
47 import six.moves.http_client
48 except ImportError:
49 pass
50
51 import StringIO
52 import six.moves.configparser
53
54
55
56 PEGASUS_SCRIPT="""#!/bin/bash
57 TMP_EXEC_DIR=%s
58 IHOPE_RUN_DIR=%s
59 UEBER_CONCRETE_DAG=%s
60 usage()
61 {
62 echo "Usage: pegasus_submit_dag [-f] [-h]"
63 echo
64 echo " -f, --force Force re-plan and resubmit of DAX"
65 echo " -h, --help Print this message"
66 echo
67 }
68
69 if [ $# -gt 1 ] ; then
70 usage
71 exit 1
72 fi
73
74 if [ $# -eq 1 ] ; then
75 if [ $1 = "-h" ] || [ $1 = "--help" ] ; then
76 usage
77 exit 0
78 fi
79 if [ $1 = "-f" ] || [ $1 = "--force" ] ; then
80 echo "WARNING: removing any existing workflow files!"
81 pegasus-remove ${TMP_EXEC_DIR}/.
82 echo "Sleeping for 60 seconds to give running DAGs chance to exit..."
83 sleep 60
84 rm -rf ${TMP_EXEC_DIR}
85 mkdir ${TMP_EXEC_DIR}
86 chmod 755 ${TMP_EXEC_DIR}
87 else
88 usage
89 exit 1
90 fi
91 fi
92 if [ -f ${TMP_EXEC_DIR}/${UEBER_CONCRETE_DAG}.lock ] ; then
93 echo
94 echo "ERROR: A dagman lock file already exists which may indicate that your"
95 echo "workflow is already running. Please check the status of your DAX with"
96 echo
97 echo " pegasus-status ${TMP_EXEC_DIR}/."
98 echo
99 echo "If necessary, you can remove the workflow with"
100 echo
101 echo " pegasus-remove ${TMP_EXEC_DIR}/."
102 echo
103 echo "You can also run"
104 echo
105 echo " pegasus_submit_dax -f"
106 echo
107 echo "to force the workflow to re-run. This will remove any existing"
108 echo "workflow log and error files. If these need to be preserved,"
109 echo "you must back them up first before running with -f."
110 echo
111 exit 1
112 fi
113
114 # The theory here is to find the longest living
115 # proxy certificate and copy it into the default
116 # location so that workflows can find it even after
117 # the user has logged out. This is necessary because
118 # Condor and condor_dagman do not yet properly cache
119 # and manage credentials to make them available to all
120 # jobs in the workflow, and other tools make assumptions
121 # about where a proxy, service, or user certificate is located
122 # on the file system and do not properly find valid
123 # existing credentials using the proper GSI search algorithm.
124 #
125 # This is not a great solution because there can be quite
126 # valid reasons to have multiple credentials with different
127 # lifetimes, and it presents a security risk to circumvent
128 # the state and move files around without knowing why the
129 # state is the way it is.
130 #
131 # But to support LIGO users we are doing it at this time
132 # until better tooling is available.
133
134 #
135 # Assumes grid-proxy-info is in PATH
136
137 if ! `/usr/bin/which grid-proxy-info > /dev/null 2>&1` ; then
138 echo "ERROR: cannot find grid-proxy-info in PATH";
139 exit 1
140 fi
141
142 # default location for proxy certificates based on uid
143 x509_default="/tmp/x509up_u`id -u`"
144
145
146
147 # if X509_USER_PROXY is defined and has a lifetime of > 1 hour
148 # compare to any existing default and copy it into place if
149 # and only if its lifetime is greater than the default
150
151 if [ -n "$X509_USER_PROXY" ] ; then
152 echo "X509_USER_PROXY=${X509_USER_PROXY}"
153 if `grid-proxy-info -file ${X509_USER_PROXY} -exists -valid 1:0` ; then
154 nsec=`grid-proxy-info -file ${X509_USER_PROXY} -timeleft`
155 echo "Lifetime of ${X509_USER_PROXY} ${nsec} seconds"
156 if [ -e ${x509_default} ] ; then
157 echo "Proxy exists at default location"
158 if `grid-proxy-info -file ${x509_default} -exists -valid 1:0` ; then
159 nsec=`grid-proxy-info -file ${x509_default} -timeleft`
160 echo "Lifetime of default ${nsec} seconds"
161 env_life=`grid-proxy-info -file ${X509_USER_PROXY} -timeleft`
162 def_life=`grid-proxy-info -file ${x509_default} -timeleft`
163 if [ ${env_life} -gt ${def_life} ] ; then
164 cp ${X509_USER_PROXY} ${x509_default}
165 echo "Lifetime of ${X509_USER_PROXY} > default"
166 echo "Copied ${X509_USER_PROXY} into default location"
167 else
168 echo "Lifetime of default > ${X509_USER_PROXY}"
169 echo "Leaving default in place"
170 fi
171 else
172 echo "Lifetime of default < 1 hour"
173 cp ${X509_USER_PROXY} ${x509_default}
174 echo "Lifetime of ${X509_USER_PROXY} > default"
175 echo "Copied ${X509_USER_PROXY} into default location"
176 fi
177 else
178 echo "No proxy at default location"
179 cp ${X509_USER_PROXY} $x509_default
180 echo "Copied ${X509_USER_PROXY} into default location"
181 fi
182 else
183 echo "Lifetime of ${X509_USER_PROXY} < 1 hour"
184 echo "Ignoring ${X509_USER_PROXY}"
185 echo "Assuming default location for proxy"
186 fi
187 else
188 echo "X509_USER_PROXY not set"
189 echo "Assuming default location for proxy"
190 fi
191
192 # when we get here we can assume that if a valid proxy with lifetime > 1 hour
193 # exists then it is in the default location, so test for it now
194
195 if ! grid-proxy-info -file ${x509_default} -exists -valid 1:0 > /dev/null 2>&1 ; then
197 echo "ERROR: could not find proxy with lifetime > 1 hour"
198 exit 1
199 fi
200
201 # if we get here valid proxy with lifetime > 1 hour was
202 # found so print out details for the record
203 grid-proxy-info -file ${x509_default} -all
204
205 # set X509_USER_PROXY to the default now
206 X509_USER_PROXY=${x509_default}
207 export X509_USER_PROXY
208
209 # set specific condor variables needed by pegasus
210
211 export _CONDOR_DAGMAN_LOG_ON_NFS_IS_ERROR=FALSE
212 export _CONDOR_DAGMAN_COPY_TO_SPOOL=False
213
214 if [ -f ${TMP_EXEC_DIR}/${UEBER_CONCRETE_DAG} ] ; then
215 pegasus-run --conf ${IHOPE_RUN_DIR}/pegasus.properties ${TMP_EXEC_DIR}/.
216 else
217 pegasus-plan --conf ${IHOPE_RUN_DIR}/pegasus.properties \\
218 --dax %s \\
219 --dir ${TMP_EXEC_DIR} \\
220 %s -s %s --nocleanup -f --submit
221
222 ln -sf ${TMP_EXEC_DIR}/${UEBER_CONCRETE_DAG}.dagman.out ${UEBER_CONCRETE_DAG}.dagman.out
223 fi
224 """
225
226 PEGASUS_BASEDIR_SCRIPT="""#!/bin/bash
228
229 TMP_EXEC_DIR=%s
230 UEBER_CONCRETE_DAG=%s
231
232 usage()
233 {
234 echo "Usage: pegasus_basedir [-d]"
235 echo
236 echo "Prints the name of the Pegasus basedir where the condor files can be found"
237 echo
238 echo " -d, --dag Append the name of the concrete DAG to the basedir"
239 echo " -h, --help Print this message"
240 echo
241 }
242
243 if [ $# -gt 1 ] ; then
244 usage
245 exit 1
246 fi
247
248
249 if [ $# -eq 1 ] ; then
250 if [ $1 = "-h" ] || [ $1 = "--help" ] ; then
251 usage
252 exit 0
253 fi
254 if [ $1 = "-d" ] || [ $1 = "--dag" ] ; then
255 echo ${TMP_EXEC_DIR}/${UEBER_CONCRETE_DAG}
256 exit 0
257 else
258 usage
259 exit 1
260 fi
261 fi
262
263 echo ${TMP_EXEC_DIR}
264 exit 0
265 """
266
267 PEGASUS_PROPERTIES= """
268 ###############################################################################
269 # Pegasus properties file generated by pipeline
270 #
271 ###############################################################################
272 # Catalog Properties
273
274 # Specifies which type of replica catalog to use during the planning process
275 # In File mode, Pegasus queries a file based replica catalog.
276 pegasus.catalog.replica=File
277
278 # Sets the location of the site catalog file that contains the description of
279 # the sites available to run jobs.
280 pegasus.catalog.site.file=%s/sites.xml
281
282
283 ###############################################################################
284 # Transfer Configuration Properties
285
286 # If pegasus sees a pool attribute in the replica catalog associated with the
287 # PFN that matches the execution pool, return the PFN as a file URL, so that
288 # the transfer executable uses ln to link the input files.
289 pegasus.transfer.links=true
290
291
292 ###############################################################################
293 # Dagman Profile Namespace
294
295 # sets the maximum number of PRE scripts within the DAG that may be running at
296 # one time (including pegasus-plan, which is run as a pre script).
297 dagman.maxpre=1
298
299 # number of times DAGMan retries the full job cycle from pre-script through
300 # post-script, if failure was detected
301 dagman.retry=3
302
303
304 ###############################################################################
305 # Site Selection Properties
306
307 # Jobs will be assigned in a round robin manner amongst the sites that can
308 # execute them.
309 pegasus.selector.site=RoundRobin
310
311
312 ###############################################################################
313 # Site Directories
314
315 # While creating the submit directory use a timestamp based numbering scheme
316 # instead of the default runxxxx scheme.
317 pegasus.dir.useTimestamp=true
318
319
320 ###############################################################################
321 # Directory Properties
322
323 # Use directory labels for sub-workflows (needed for ihope)
324 pegasus.dir.submit.subwf.labelbased=true
325
326
327 """
328
329
331 """
332 Return True if t is in the S2 playground, False otherwise
333 t = GPS time to test if playground
334 """
335 return ((t - 729273613) % 6370) < 600
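# Worked example for the playground test above (the function definition line is
# elided in this source view; in glue.pipeline it is s2play(t)). S2 playground
# data comes in 600 second stretches every 6370 seconds, starting at GPS time
# 729273613, so:
#
#   (729273613 - 729273613) % 6370 = 0    -> 0 < 600,   playground
#   (729274212 - 729273613) % 6370 = 599  -> 599 < 600, playground
#   (729274213 - 729273613) % 6370 = 600  -> not < 600, not playground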
336
337
350
351
353 """Error thrown by Condor Jobs"""
369
370
372 """
373 Generic condor job class. Provides methods to set the options in the
374 condor submit file for a particular executable
375 """
376 - def __init__(self, universe, executable, queue):
377 """
378 @param universe: the condor universe to run the job in.
379 @param executable: the executable to run.
380 @param queue: number of jobs to queue.
381 """
382 self.__universe = universe
383 self.__executable = executable
384 self.__queue = queue
385
386
387 self.__options = {}
388 self.__short_options = {}
389 self.__arguments = []
390 self.__condor_cmds = {}
391 self.__notification = None
392 self.__log_file = None
393 self.__in_file = None
394 self.__err_file = None
395 self.__out_file = None
396 self.__sub_file_path = None
397 self.__output_files = []
398 self.__input_files = []
399 self.__checkpoint_files = []
400 self.__grid_type = None
401 self.__grid_server = None
402 self.__grid_scheduler = None
403 self.__executable_installed = True
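# Illustrative sketch (not part of the module): building a simple vanilla
# universe CondorJob and writing its submit file. The executable, file names
# and option values are placeholders; the setter names follow the methods
# documented below (their def lines are elided in this source view).
#
#   job = CondorJob('vanilla', '/bin/echo', 1)
#   job.set_notification('never')
#   job.set_log_file('echo.log')
#   job.set_stderr_file('echo-$(cluster).err')
#   job.set_stdout_file('echo-$(cluster).out')
#   job.set_sub_file('echo.sub')
#   job.add_opt('verbose', None)          # becomes --verbose
#   job.add_arg('hello world')            # appended after all options
#   job.write_sub_file()                  # writes echo.sub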
404
406 """
407 Return the name of the executable for this job.
408 """
409 return self.__executable
410
412 """
413 Set the name of the executable for this job.
414 """
415 self.__executable = executable
416
418 """
419 Return the condor universe that the job will run in.
420 """
421 return self.__universe
422
424 """
425 Set the condor universe for the job to run in.
426 @param universe: the condor universe to run the job in.
427 """
428 self.__universe = universe
429
431 """
432 Return the grid type of the job.
433 """
434 return self.__grid_type
435
437 """
438 Set the type of grid resource for the job.
439 @param grid_type: type of grid resource.
440 """
441 self.__grid_type = grid_type
442
444 """
445 Return the grid server on which the job will run.
446 """
447 return self.__grid_server
448
450 """
451 Set the grid server on which to run the job.
452 @param grid_server: grid server on which to run.
453 """
454 self.__grid_server = grid_server
455
457 """
458 Return the grid scheduler.
459 """
460 return self.__grid_scheduler
461
463 """
464 Set the grid scheduler.
465 @param grid_scheduler: grid scheduler on which to run.
466 """
467 self.__grid_scheduler = grid_scheduler
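# Illustrative sketch (not part of the module): configuring a grid universe
# job. The server name is hypothetical; write_sub_file() below turns these
# settings into a 'grid_resource = gt2 <server>' line.
#
#   job.set_universe('grid')
#   job.set_grid_type('gt2')
#   job.set_grid_server('ldas-grid.example.edu/jobmanager-condor')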
468
470 """
471 If executable installed is true, then no copying of the executable is
472 done. If it is false, pegasus stages the executable to the remote site.
473 Default is executable is installed (i.e. True).
474 @param installed: True or False
475 """
476 self.__executable_installed = installed
477
479 """
480 return whether or not the executable is installed
481 """
482 return self.__executable_installed
483
485 """
486 Add a Condor command to the submit file (e.g. a ClassAd or environment).
487 @param cmd: Condor command directive.
488 @param value: value for command.
489 """
490 self.__condor_cmds[cmd] = value
491
493 """
494 Return the dictionary of condor keywords to add to the job
495 """
496 return self.__condor_cmds
497
506
508 """
509 Add filename as an output file for this DAG node.
510
511 @param filename: output filename to add
512 """
513 if filename not in self.__output_files:
514 self.__output_files.append(filename)
515
517 """
518 Add filename as a checkpoint file for this DAG job.
519 """
520 if filename not in self.__checkpoint_files:
521 self.__checkpoint_files.append(filename)
522
528
530 """
531 Return list of output files for this DAG node.
532 """
533 return self.__output_files
534
536 """
537 Return a list of checkpoint files for this DAG node
538 """
539 return self.__checkpoint_files
540
542 """
543 Add an argument to the executable. Arguments are appended after any
544 options and their order is guaranteed.
545 @param arg: argument to add.
546 """
547 self.__arguments.append(arg)
548
550 """
551 Add a file argument to the executable. Arguments are appended after any
552 options and their order is guaranteed. Also adds the file name to the
553 list of required input data for this job.
554 @param filename: file to add as argument.
555 """
556 self.__arguments.append(filename)
557 if filename not in self.__input_files:
558 self.__input_files.append(filename)
559
561 """
562 Return the list of arguments that are to be passed to the executable.
563 """
564 return self.__arguments
565
567 """
568 Add a command line option to the executable. The order that the arguments
569 will be appended to the command line is not guaranteed, but they will
570 always be added before any command line arguments. The name of the option
571 is prefixed with double hyphen and the program is expected to parse it
572 with getopt_long().
573 @param opt: command line option to add.
574 @param value: value to pass to the option (None for no argument).
575 """
576 self.__options[opt] = value
577
579 """
580 Returns the value associated with the given command line option.
581 Returns None if the option does not exist in the options list.
582 @param opt: command line option
583 """
584 if opt in self.__options:
585 return self.__options[opt]
586 return None
587
589 """
590 Add a command line option to the executable. The order that the arguments
591 will be appended to the command line is not guaranteed, but they will
592 always be added before any command line arguments. The name of the option
593 is prefixed with double hyphen and the program is expected to parse it
594 with getopt_long().
595 @param opt: command line option to add.
596 @param value: value to pass to the option (None for no argument).
597 """
598 self.__options[opt] = filename
599 if filename not in self.__input_files:
600 self.__input_files.append(filename)
601
603 """
604 Return the dictionary of opts for the job.
605 """
606 return self.__options
607
609 """
610 Add a command line option to the executable. The order that the arguments
611 will be appended to the command line is not guaranteed, but they will
612 always be added before any command line arguments. The name of the option
613 is prefixed with single hyphen and the program is expected to parse it
614 with getopt() or getopt_long() (if a single character option), or
615 getopt_long_only() (if multiple characters). Long and (single-character)
616 short options may be mixed if the executable permits this.
617 @param opt: command line option to add.
618 @param value: value to pass to the option (None for no argument).
619 """
620 self.__short_options[opt] = value
621
623 """
624 Return the dictionary of short options for the job.
625 """
626 return self.__short_options
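# Illustrative sketch (not part of the module) of how options and arguments
# are combined by write_sub_file(): long options first, then short options,
# then positional arguments (names and values here are placeholders), so
#
#   job.add_opt('gps-start-time', '900000000')
#   job.add_short_opt('v', None)
#   job.add_arg('input.xml')
#
# produces an arguments line of the form
#
#   arguments = " --gps-start-time 900000000 -v input.xml "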
627
629 """
630 Parse command line options from a given section in an ini file and
631 pass to the executable.
632 @param cp: ConfigParser object pointing to the ini file.
633 @param section: section of the ini file to add to the options.
634 """
635 for opt in cp.options(section):
636 arg = cp.get(section,opt).strip()
637 self.__options[opt] = arg
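# Illustrative sketch (not part of the module): pulling options from an ini
# file section with the method above (named add_ini_opts in glue.pipeline).
# The file name and section contents are hypothetical.
#
#   ; example.ini
#   ; [datafind]
#   ; server = ldr.example.org
#   ; match = localhost
#
#   cp = six.moves.configparser.ConfigParser()
#   cp.read('example.ini')
#   job.add_ini_opts(cp, 'datafind')   # adds --server ldr.example.org --match localhost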
638
640 """
641 Set the email address to send notification to.
642 @param value: email address or never for no notification.
643 """
644 self.__notification = value
645
647 """
648 Set the Condor log file.
649 @param path: path to log file.
650 """
651 self.__log_file = path
652
654 """
655 Set the file from which Condor directs the stdin of the job.
656 @param path: path to stdin file.
657 """
658 self.__in_file = path
659
661 """
662 Get the file from which Condor directs the stdin of the job.
663 """
664 return self.__in_file
665
667 """
668 Set the file to which Condor directs the stderr of the job.
669 @param path: path to stderr file.
670 """
671 self.__err_file = path
672
674 """
675 Get the file to which Condor directs the stderr of the job.
676 """
677 return self.__err_file
678
680 """
681 Set the file to which Condor directs the stdout of the job.
682 @param path: path to stdout file.
683 """
684 self.__out_file = path
685
687 """
688 Get the file to which Condor directs the stdout of the job.
689 """
690 return self.__out_file
691
693 """
694 Set the name of the file to write the Condor submit file to when
695 write_sub_file() is called.
696 @param path: path to submit file.
697 """
698 self.__sub_file_path = path
699
701 """
702 Get the name of the file which the Condor submit file will be
703 written to when write_sub_file() is called.
704 """
705 return self.__sub_file_path
706
708 """
709 Write a submit file for this Condor job.
710 """
711 if not self.__log_file:
712 raise CondorSubmitError("Log file not specified.")
713 if not self.__err_file:
714 raise CondorSubmitError("Error file not specified.")
715 if not self.__out_file:
716 raise CondorSubmitError("Output file not specified.")
717
718 if not self.__sub_file_path:
719 raise CondorSubmitError('No path for submit file.')
720 try:
721 subfile = open(self.__sub_file_path, 'w')
722 except IOError:
723 raise CondorSubmitError("Cannot open file " + self.__sub_file_path)
724
725 if self.__universe == 'grid':
726 if self.__grid_type is None:
727 raise CondorSubmitError('No grid type specified.')
728 elif self.__grid_type == 'gt2':
730 if self.__grid_server is None:
730 raise CondorSubmitError('No server specified for grid resource.')
731 elif self.__grid_type == 'gt4':
732 if self.__grid_server is None:
733 raise CondorSubmitError('No server specified for grid resource.')
734 if self.__grid_scheduler is None:
735 raise CondorSubmitError('No scheduler specified for grid resource.')
736 else:
737 raise CondorSubmitError('Unsupported grid resource.')
738
739 subfile.write( 'universe = ' + self.__universe + '\n' )
740 subfile.write( 'executable = ' + self.__executable + '\n' )
741
742 if self.__universe == 'grid':
743 if self.__grid_type == 'gt2':
744 subfile.write('grid_resource = %s %s\n' % (self.__grid_type,
745 self.__grid_server))
746 if self.__grid_type == 'gt4':
747 subfile.write('grid_resource = %s %s %s\n' % (self.__grid_type,
748 self.__grid_server, self.__grid_scheduler))
749
750 if self.__universe == 'grid':
751 subfile.write('when_to_transfer_output = ON_EXIT\n')
752 subfile.write('transfer_output_files = $(macrooutput)\n')
753 subfile.write('transfer_input_files = $(macroinput)\n')
754
755 if list(self.__options.keys()) or list(self.__short_options.keys()) or self.__arguments:
756 subfile.write( 'arguments = "' )
757 for c in self.__options.keys():
758 if self.__options[c]:
759 subfile.write( ' --' + c + ' ' + self.__options[c] )
760 else:
761 subfile.write( ' --' + c )
762 for c in self.__short_options.keys():
763 if self.__short_options[c]:
764 subfile.write( ' -' + c + ' ' + self.__short_options[c] )
765 else:
766 subfile.write( ' -' + c )
767 for c in self.__arguments:
768 subfile.write( ' ' + c )
769 subfile.write( ' "\n' )
770
771 for cmd in self.__condor_cmds.keys():
772 subfile.write( str(cmd) + " = " + str(self.__condor_cmds[cmd]) + '\n' )
773
774 subfile.write( 'log = ' + self.__log_file + '\n' )
775 if self.__in_file is not None:
776 subfile.write( 'input = ' + self.__in_file + '\n' )
777 subfile.write( 'error = ' + self.__err_file + '\n' )
778 subfile.write( 'output = ' + self.__out_file + '\n' )
779 if self.__notification:
780 subfile.write( 'notification = ' + self.__notification + '\n' )
781 subfile.write( 'queue ' + str(self.__queue) + '\n' )
782
783 subfile.close()
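# For the sketch job shown earlier, write_sub_file() produces a submit file of
# roughly this form (values depend on the settings actually made):
#
#   universe = vanilla
#   executable = /bin/echo
#   arguments = " --verbose hello world "
#   log = echo.log
#   error = echo-$(cluster).err
#   output = echo-$(cluster).out
#   notification = never
#   queue 1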
784
785
786
788 """
789 A Condor DAG job never notifies the user on completion and can have variable
790 options that are set for a particular node in the DAG. Inherits methods
791 from a CondorJob.
792 """
793 - def __init__(self, universe, executable):
794 """
795 universe = the condor universe to run the job in.
796 executable = the executable to run in the DAG.
797 """
798 CondorJob.__init__(self, universe, executable, 1)
799 CondorJob.set_notification(self, 'never')
800 self.__var_opts = []
801 self.__arg_index = 0
802 self.__var_args = []
803 self.__var_cmds = []
804 self.__grid_site = None
805 self.__bad_macro_chars = re.compile(r'[_-]')
806 self.__dax_mpi_cluster = None
807
809 """
810 Create a condor node from this job. This provides a basic interface to
811 the CondorDAGNode class. Most jobs in a workflow will subclass the
812 CondorDAGNode class and overwrite this to give more details when
813 initializing the node. However, this will work fine for jobs with very simple
814 input/output.
815 """
816 return CondorDAGNode(self)
817
819 """
820 Set the grid site to run on. If not specified,
821 no site hint is given to Pegasus.
822 """
823 self.__grid_site=str(site)
824 if site != 'local':
825 self.set_executable_installed(False)
826
828 """
829 Return the grid site for this node
830 """
831 return self.__grid_site
832
834 """
835 Set the MPI cluster size for this job when generating a DAX
836 """
837 self.__dax_mpi_cluster = size
838
840 """
841 Get the MPI cluster size for this job when generating a DAX
842 """
843 return self.__dax_mpi_cluster
844
846 """
847 Add a variable (or macro) option to the condor job. The option is added
848 to the submit file and a different argument to the option can be set for
849 each node in the DAG.
850 @param opt: name of option to add.
851 """
852 if opt not in self.__var_opts:
853 self.__var_opts.append(opt)
854 macro = self.__bad_macro_chars.sub( r'', opt )
855 if short:
856 self.add_short_opt(opt,'$(macro' + macro + ')')
857 else:
858 self.add_opt(opt,'$(macro' + macro + ')')
859
861 """
862 Add a condor command to the submit file that allows variable (macro)
863 arguments to be passed to the executable.
864 """
865 if command not in self.__var_cmds:
866 self.__var_cmds.append(command)
867 macro = self.__bad_macro_chars.sub( r'', command )
868 self.add_condor_cmd(command, '$(macro' + macro + ')')
869
871 """
872 Add a command to the submit file to allow variable (macro) arguments
873 to be passed to the executable.
874 """
875 try:
876 self.__var_args[arg_index]
877 except IndexError:
878 if arg_index != self.__arg_index:
879 raise CondorDAGJobError("mismatch between job and node var_arg index")
880 if quote:
881 self.__var_args.append("'$(macroargument%s)'" % str(arg_index))
882 else:
883 self.__var_args.append('$(macroargument%s)' % str(arg_index))
884 self.add_arg(self.__var_args[self.__arg_index])
885 self.__arg_index += 1
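# Illustrative sketch (not part of the module): a CondorDAGJob with a variable
# (macro) option. The executable name is hypothetical. Bad macro characters
# ('_' and '-') are stripped, so --gps-start-time maps to $(macrogpsstarttime).
#
#   dagjob = CondorDAGJob('vanilla', 'lalapps_tmpltbank')
#   dagjob.add_var_opt('gps-start-time')    # submit file: --gps-start-time $(macrogpsstarttime)
#
#   node = dagjob.create_node()
#   node.add_var_opt('gps-start-time', 900000000)   # DAG VARS: macrogpsstarttime="900000000"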
886
887
889 """
890 Condor DAGMan job class. Appropriate for setting up DAGs to run within a
891 DAG.
892 """
893 - def __init__(self, dag, dir=None, dax=None):
894 """
895 dag = the name of the condor dag file to run
896 dir = the directory in which the dag file is located
897 """
898 self.__dag = dag
899 self.__dax = dax
900 self.__notification = None
901 self.__dag_directory= dir
902 self.__pegasus_exec_dir = None
903 self.__pfn_cache = []
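# Illustrative sketch (not part of the module): wrapping an existing DAG so it
# can be run as a SUBDAG EXTERNAL inside an uber-DAG. The dag file name and
# directory are hypothetical.
#
#   subdag_job = CondorDAGManJob('inspiral.dag', dir='inspiral')
#   subdag_node = subdag_job.create_node()
#   dag.add_node(subdag_node)
#
# which is written to the uber-DAG as
#
#   SUBDAG EXTERNAL <node name> inspiral.dag DIR inspiral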
904
906 """
907 Create a condor node from this job. This provides a basic interface to
908 the CondorDAGManNode class. Most jobs in a workflow will subclass the
909 CondorDAGManNode class and overwrite this to give more details when
910 initializing the node. However, this will work fine for jobs with very simple
911 input/output.
912 """
913 return CondorDAGManNode(self)
914
916 """
917 Set the directory where the dag will be run
918 @param dir: the name of the directory where the dag will be run
919 """
920 self.__dag_directory = dir
921
923 """
924 Get the directory where the dag will be run
925 """
926 return self.__dag_directory
927
929 """
930 Set the email address to send notification to.
931 @param value: email address or never for no notification.
932 """
933 self.__notification = value
934
936 """
937 Return the name of the dag as the submit file name for the
938 SUBDAG EXTERNAL command in the uber-dag
939 """
940 return self.__dag
941
943 """
944 Do nothing as there is no need for a sub file with the
945 SUBDAG EXTERNAL command in the uber-dag
946 """
947 pass
948
950 """
951 Return the name of any associated dax file
952 """
953 return self.__dax
954
956 """
957 Return the name of any associated dag file
958 """
959 return self.__dag
960
962 """
963 Set the directory in which pegasus will generate all log files
964 """
965 self.__pegasus_exec_dir = dir
966
968 """
969 Return the directory in which pegasus will generate all log files
970 """
971 return self.__pegasus_exec_dir
972
974 """
975 Add a list of (lfn, pfn, pool) tuples to the PFN cache
976 """
977 self.__pfn_cache += pfn_list
978
980 """
981 Return the pfn cache
982 """
983 return self.__pfn_cache
984
985
987 """
988 A CondorDAGNode represents a node in the DAG. It corresponds to a particular
989 condor job (and so a particular submit file). If the job has variable
990 (macro) options, they can be set here so each node executes with the
991 correct options.
992 """
994 """
995 @param job: the CondorJob that this node corresponds to.
996 """
997 if not isinstance(job, CondorDAGJob) and \
998 not isinstance(job,CondorDAGManJob):
999 raise CondorDAGNodeError(
1000 "A DAG node must correspond to a Condor DAG job or Condor DAGMan job")
1001 self.__name = None
1002 self.__job = job
1003 self.__category = None
1004 self.__priority = None
1005 self.__pre_script = None
1006 self.__pre_script_args = []
1007 self.__post_script = None
1008 self.__post_script_args = []
1009 self.__macros = {}
1010 self.__opts = {}
1011 self.__args = []
1012 self.__arg_index = 0
1013 self.__retry = 0
1014 self.__parents = []
1015 self.__bad_macro_chars = re.compile(r'[_-]')
1016 self.__output_files = []
1017 self.__input_files = []
1018 self.__checkpoint_files = []
1019 self.__dax_collapse = None
1020 self.__vds_group = None
1021 if isinstance(job,CondorDAGJob) and job.get_universe()=='standard':
1022 self.__grid_start = 'none'
1023 else:
1024 self.__grid_start = None
1025 self.__pegasus_profile = []
1026
1027
1028 t = str( int( time.time() * 1000 ) )
1029 r = str( int( random.random() * 100000000000000000 ) )
1030 a = str( self.__class__ )
1031 self.__name = md5((t + r + a).encode()).hexdigest()
1032 self.__md5name = self.__name
1033
1036
1038 """
1039 Return the CondorJob that this node is associated with.
1040 """
1041 return self.__job
1042
1044 """
1045 Add a Pegasus profile to this job which will be written to the dax as
1046 <profile namespace="NAMESPACE" key="KEY">VALUE</profile>
1047 This can be used to add classads to particular jobs in the DAX
1048 @param namespace: A valid Pegasus namespace, e.g. condor.
1049 @param key: The name of the attribute.
1050 @param value: The value of the attribute.
1051 """
1052 self.__pegasus_profile.append((str(namespace),str(key),str(value)))
1053
1055 """
1056 Return the pegasus profile dictionary for this node.
1057 """
1058 return self.__pegasus_profile
1059
1061 """
1062 Set the grid starter that pegasus will use. In Pegasus 4.1 the options
1063 are none (the default), kickstart and pegasuslite.
1064 @param gridstart: value for the pegasus.gridstart property
1065 """
1066 self.__grid_start = str(gridstart)
1067
1069 """
1070 Return the grid starter that pegasus will use.
1071 """
1072 return self.__grid_start
1073
1075 """
1076 Sets the name of the pre script that is executed before the DAG node is
1077 run.
1078 @param script: path to script
1079 """
1080 self.__pre_script = script
1081
1083 """
1084 Adds an argument to the pre script that is executed before the DAG node is
1085 run.
1086 """
1087 self.__pre_script_args.append(arg)
1088
1089 - def set_post_script(self,script):
1090 """
1091 Sets the name of the post script that is executed after the DAG node
1092 has run.
1093 @param script: path to script
1094 """
1095 self.__post_script = script
1096
1097 - def get_post_script(self):
1098 """
1099 Returns the name of the post script that is executed after the DAG node
1100 has run.
1102 """
1103 return self.__post_script
1104
1105 - def add_post_script_arg(self,arg):
1106 """
1107 Adds an argument to the post script that is executed after the DAG node
1108 has run.
1109 """
1110 self.__post_script_args.append(arg)
1111
1113 """
1114 Returns an array of arguments to the post script that is executed after
1115 the DAG node has run.
1116 """
1117 return self.__post_script_args
1118
1120 """
1121 Set the name for this node in the DAG.
1122 """
1123 self.__name = str(name)
1124
1126 """
1127 Get the name for this node in the DAG.
1128 """
1129 return self.__name
1130
1132 """
1133 Set the category for this node in the DAG.
1134 """
1135 self.__category = str(category)
1136
1138 """
1139 Get the category for this node in the DAG.
1140 """
1141 return self.__category
1142
1144 """
1145 Set the priority for this node in the DAG.
1146 """
1147 self.__priority = str(priority)
1148
1150 """
1151 Get the priority for this node in the DAG.
1152 """
1153 return self.__priority
1154
1166
1178
1189
1198
1200 """
1201 Return list of output files for this DAG node and its job.
1202 """
1203 output_files = list(self.__output_files)
1204 if isinstance(self.job(), CondorDAGJob):
1205 output_files = output_files + self.job().get_output_files()
1206 return output_files
1207
1209 """
1210 Return a list of checkpoint files for this DAG node and its job.
1211 """
1212 checkpoint_files = list(self.__checkpoint_files)
1213 if isinstance(self.job(), CondorDAGJob):
1214 checkpoint_files = checkpoint_files + self.job().get_checkpoint_files()
1215 return checkpoint_files
1216
1218 """
1219 Set the name of the VDS group key when generating a DAX
1220 @param group: name of the group for this node
1221 """
1222 self.__vds_group = str(group)
1223
1225 """
1226 Returns the VDS group key for this node
1227 """
1228 return self.__vds_group
1229
1231 """
1232 Set the DAX collapse key for this node
1233 """
1234 self.__dax_collapse = str(collapse)
1235
1237 """
1238 Get the DAX collapse key for this node
1239 """
1240 return self.__dax_collapse
1241
1243 """
1244 Add a variable (macro) for this node. This can be different for
1245 each node in the DAG, even if they use the same CondorJob. Within
1246 the CondorJob, the value of the macro can be referenced as
1247 '$(name)' -- for instance, to define a unique output or error file
1248 for each node.
1249 @param name: macro name.
1250 @param value: value of the macro for this node in the DAG
1251 """
1252 macro = self.__bad_macro_chars.sub( r'', name )
1253 self.__opts[macro] = value
1254
1256 """
1257 Add a variable (macro) for storing the input/output files associated
1258 with this node.
1259 @param io: macroinput or macrooutput
1260 @param filename: filename of input/output file
1261 """
1262 io = self.__bad_macro_chars.sub( r'', io )
1263 if io not in self.__opts:
1264 self.__opts[io] = filename
1265 else:
1266 if filename not in self.__opts[io]:
1267 self.__opts[io] += ',%s' % filename
1268
1276
1278 """
1279 Add a variable (macro) for storing the output files associated with
1280 this node.
1281 @param filename: filename of output file
1282 """
1283 self.add_io_macro('macrooutput', filename)
1284
1287
1289 """
1290 Return the opts for this node. Note that this returns only
1291 the options for this instance of the node and not those
1292 associated with the underlying job template.
1293 """
1294 return self.__opts
1295
1297 """
1298 Add a variable (macro) condor command for this node. If the command
1299 specified does not exist in the CondorJob, it is added so the submit file
1300 will be correct.
1301 PLEASE NOTE: As with other add_var commands, the variable must be set for
1302 all nodes that use the CondorJob instance.
1303 @param command: command name
1304 @param value: Value of the command for this node in the DAG/DAX.
1305 """
1306 macro = self.__bad_macro_chars.sub( r'', command )
1307 self.__macros['macro' + macro] = value
1308 self.__job.add_var_condor_cmd(command)
1309
1311 """
1312 Add a variable (macro) option for this node. If the option
1313 specified does not exist in the CondorJob, it is added so the submit
1314 file will be correct when written.
1315 @param opt: option name.
1316 @param value: value of the option for this node in the DAG.
1317 """
1318 macro = self.__bad_macro_chars.sub( r'', opt )
1319 self.__opts['macro' + macro] = value
1320 self.__job.add_var_opt(opt,short)
1321
1322 - def add_file_opt(self,opt,filename,file_is_output_file=False):
1323 """
1324 Add a variable (macro) option for this node. If the option
1325 specified does not exist in the CondorJob, it is added so the submit
1326 file will be correct when written. The value of the option is also
1327 added to the list of input files for the DAX.
1328 @param opt: option name.
1329 @param filename: value of the option for this node in the DAG.
1330 @param file_is_output_file: A boolean if the file will be an output file
1331 instead of an input file. The default is to have it be an input.
1332 """
1333 self.add_var_opt(opt,filename)
1334 if file_is_output_file: self.add_output_file(filename)
1335 else: self.add_input_file(filename)
1336
1338 """
1339 Add a variable (or macro) argument to the condor job. The argument is
1340 added to the submit file and a different value of the argument can be set
1341 for each node in the DAG.
1342 @param arg: name of option to add.
1343 """
1344 self.__args.append(arg)
1345 self.__job.add_var_arg(self.__arg_index,quote=quote)
1346 self.__arg_index += 1
1347
1349 """
1350 Add a variable (or macro) file name argument to the condor job. The
1351 argument is added to the submit file and a different value of the
1352 argument can be set for each node in the DAG. The file name is also
1353 added to the list of input files for the DAX.
1354 @param filename: name of the file to add as an argument.
1355 """
1356 self.add_input_file(filename)
1357 self.add_var_arg(filename)
1358
1360 """
1361 Return the arguments for this node. Note that this returns
1362 only the arguments for this instance of the node and not those
1363 associated with the underlying job template.
1364 """
1365 return self.__args
1366
1368 """
1369 Set the number of times that this node in the DAG should retry.
1370 @param retry: number of times to retry node.
1371 """
1372 self.__retry = retry
1373
1375 """
1376 Return the number of times that this node in the DAG should retry.
1377 @param retry: number of times to retry node.
1378 """
1379 return self.__retry
1380
1382 """
1383 Write the DAG entry for this node's job to the DAG file descriptor.
1384 @param fh: descriptor of open DAG file.
1385 """
1386 if isinstance(self.job(),CondorDAGManJob):
1387
1388 fh.write( ' '.join(
1389 ['SUBDAG EXTERNAL', self.__name, self.__job.get_sub_file()]) )
1390 if self.job().get_dag_directory():
1391 fh.write( ' DIR ' + self.job().get_dag_directory() )
1392 else:
1393
1394 fh.write( 'JOB ' + self.__name + ' ' + self.__job.get_sub_file() )
1395 fh.write( '\n')
1396
1397 fh.write( 'RETRY ' + self.__name + ' ' + str(self.__retry) + '\n' )
1398
1400 """
1401 Write the DAG entry for this node's category to the DAG file descriptor.
1402 @param fh: descriptor of open DAG file.
1403 """
1404 fh.write( 'CATEGORY ' + self.__name + ' ' + self.__category + '\n' )
1405
1407 """
1408 Write the DAG entry for this node's priority to the DAG file descriptor.
1409 @param fh: descriptor of open DAG file.
1410 """
1411 fh.write( 'PRIORITY ' + self.__name + ' ' + self.__priority + '\n' )
1412
1414 """
1415 Write the variable (macro) options and arguments to the DAG file
1416 descriptor.
1417 @param fh: descriptor of open DAG file.
1418 """
1419 if list(self.__macros.keys()) or list(self.__opts.keys()) or self.__args:
1420 fh.write( 'VARS ' + self.__name )
1421 for k in self.__macros.keys():
1422 fh.write( ' ' + str(k) + '="' + str(self.__macros[k]) + '"' )
1423 for k in self.__opts.keys():
1424 fh.write( ' ' + str(k) + '="' + str(self.__opts[k]) + '"' )
1425 if self.__args:
1426 for i in range(self.__arg_index):
1427 fh.write( ' macroargument' + str(i) + '="' + self.__args[i] + '"' )
1428 fh.write( '\n' )
1429
1431 """
1432 Write the parent/child relations for this job to the DAG file descriptor.
1433 @param fh: descriptor of open DAG file.
1434 """
1435 for parent in self.__parents:
1436 fh.write( 'PARENT ' + str(parent) + ' CHILD ' + str(self) + '\n' )
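# Taken together, the write_* methods above emit entries of roughly this form
# for each node in the .dag file (names and values are placeholders):
#
#   JOB NodeA tmpltbank.sub
#   RETRY NodeA 0
#   VARS NodeA macrogpsstarttime="900000000"
#   CATEGORY NodeA tmpltbank          (only if a category was set)
#   PRIORITY NodeA 10                 (only if a priority was set)
#   PARENT NodeParent CHILD NodeA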
1437
1439 """
1440 Write the pre script for the job, if there is one
1441 @param fh: descriptor of open DAG file.
1442 """
1443 if self.__pre_script:
1444 fh.write( 'SCRIPT PRE ' + str(self) + ' ' + self.__pre_script + ' ' +
1445 ' '.join(self.__pre_script_args) + '\n' )
1446
1447 - def write_post_script(self,fh):
1448 """
1449 Write the post script for the job, if there is one
1450 @param fh: descriptor of open DAG file.
1451 """
1452 if self.__post_script:
1453 fh.write( 'SCRIPT POST ' + str(self) + ' ' + self.__post_script + ' ' +
1454 ' '.join(self.__post_script_args) + '\n' )
1455
1465
1467 """
1468 Write as a comment into the DAG file the list of output files
1469 for this DAG node.
1470
1471 @param fh: descriptor of open DAG file.
1472 """
1473 for f in self.__output_files:
1474 fh.write("## Job %s generates output file %s\n" % (self.__name, f))
1475
1477 """
1478 Set the Condor log file to be used by this CondorJob.
1479 @param log: path of Condor log file.
1480 """
1481 self.__job.set_log_file(log)
1482
1484 """
1485 Add a parent to this node. This node will not be executed until the
1486 parent node has run successfully.
1487 @param node: CondorDAGNode to add as a parent.
1488 """
1489 if not isinstance(node, (CondorDAGNode,CondorDAGManNode) ):
1490 raise CondorDAGNodeError("Parent must be a CondorDAGNode or a CondorDAGManNode")
1491 self.__parents.append( node )
1492
1494 """
1495 Return a list of tuples containing the command line arguments
1496 """
1497
1498
1499 pat = re.compile(r'\$\((.+)\)')
1500 argpat = re.compile(r'\d+')
1501
1502
1503 options = self.job().get_opts()
1504 macros = self.get_opts()
1505
1506 cmd_list = []
1507
1508 for k in options:
1509 val = options[k]
1510 m = pat.match(val)
1511 if m:
1512 key = m.group(1)
1513 value = macros[key]
1514
1515 cmd_list.append(("--%s" % k, str(value)))
1516 else:
1517 cmd_list.append(("--%s" % k, str(val)))
1518
1519
1520 options = self.job().get_short_opts()
1521
1522 for k in options:
1523 val = options[k]
1524 m = pat.match(val)
1525 if m:
1526 key = m.group(1)
1527 value = macros[key]
1528
1529 cmd_list.append(("-%s" % k, str(value)))
1530 else:
1531 cmd_list.append(("-%s" % k, str(val)))
1532
1533
1534 args = self.job().get_args()
1535 macros = self.get_args()
1536
1537 for a in args:
1538 m = pat.search(a)
1539 if m:
1540 arg_index = int(argpat.findall(a)[0])
1541 try:
1542 cmd_list.append(("%s" % macros[arg_index], ""))
1543 except IndexError:
1544 cmd_list.append("")
1545 else:
1546 cmd_list.append(("%s" % a, ""))
1547
1548 return cmd_list
1549
1551 """
1552 Return the full command line that will be used when this node
1553 is run by DAGman.
1554 """
1555
1556 cmd = ""
1557 cmd_list = self.get_cmd_tuple_list()
1558 for argument in cmd_list:
1559 cmd += ' '.join(argument) + " "
1560
1561 return cmd
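# Illustrative sketch (not part of the module): for the macro example given
# earlier, where the job defines --gps-start-time $(macrogpsstarttime) and the
# node sets the macro to 900000000,
#
#   node.get_cmd_tuple_list()  ->  [('--gps-start-time', '900000000')]
#   node.get_cmd_line()        ->  '--gps-start-time 900000000 '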
1562
1564 """
1565 The finalize method of a node is called before the node is
1566 finally added to the DAG and can be overridden to do any last
1567 minute clean up (such as setting extra command line arguments)
1568 """
1569 pass
1570
1571
1573 """
1574 Condor DAGMan node class. Appropriate for setting up DAGs to run within a
1575 DAG. Adds the user-tag functionality to condor_dagman processes running in
1576 the DAG. May also be used to extend dagman-node specific functionality.
1577 """
1579 """
1580 @param job: a CondorDAGManJob
1581 """
1582 CondorDAGNode.__init__(self, job)
1583 self.__user_tag = None
1584 self.__maxjobs_categories = []
1585 self.__cluster_jobs = None
1586 self.__static_pfn_cache = None
1587 self.__reduce_dax = False
1588
1590 """
1591 Set the user tag that is passed to the analysis code.
1592 @param user_tag: the user tag to identify the job
1593 """
1594 self.__user_tag = str(usertag)
1595
1597 """
1598 Returns the usertag string
1599 """
1600 return self.__user_tag
1601
1603 """
1604 Add a category to this DAG called categoryName with a maxjobs of maxJobsNum.
1605 Adds the (categoryName, maxJobsNum) tuple to this node's maxjobs_categories list.
1606 """
1607 self.__maxjobs_categories.append((str(categoryName),str(maxJobsNum)))
1608
1610 """
1611 Return an array of tuples containing (categoryName,maxJobsNum)
1612 """
1613 return self.__maxjobs_categories
1614
1616 """
1617 Set the type of job clustering pegasus can use to collapse jobs
1618 @param cluster: clustering type
1619 """
1620 self.__cluster_jobs = str(cluster)
1621
1623 """
1624 Returns the job clustering type string
1625 """
1626 return self.__cluster_jobs
1627
1629 """
1630 Set the flag that tells Pegasus to reduce the DAX based on existing PFNs
1631 @param rd: True or False
1632 """
1633 self.__reduce_dax = rd
1634
1636 """
1637 Return the flag that tells Pegasus to reduce the DAX based on existing PFNs
1638 """
1639 return self.__reduce_dax
1640
1642 """
1643 Use the --cache option to pass a static PFN cache to pegasus-plan
1644 @param file: full path to the cache file
1645 """
1646 self.__static_pfn_cache = str(file)
1647
1649 """
1650 Return the path to a static PFN cache
1651 """
1652 return self.__static_pfn_cache
1653
1654
1656 """
1657 A CondorDAG is a Condor Directed Acyclic Graph that describes a collection
1658 of Condor jobs and the order in which to run them. All Condor jobs in the
1659 DAG must write their Condor logs to the same file.
1660 NOTE: The log file must not be on an NFS mounted system as the Condor jobs
1661 must be able to get an exclusive file lock on the log file.
1662 """
1664 """
1665 @param log: path to log file which must not be on an NFS mounted file system.
1666 @param dax: Set to 1 to create an abstract DAG (a DAX)
1667 """
1668 self.__log_file_path = log
1669 self.__dax = dax
1670 self.__dag_file_path = None
1671 self.__dax_file_path = None
1672 self.__jobs = []
1673 self.__nodes = []
1674 self.__maxjobs_categories = []
1675 self.__integer_node_names = 0
1676 self.__node_count = 0
1677 self.__nodes_finalized = 0
1678 self.__pegasus_worker = None
1679 self.__pfn_cache=[]
1680
1682 """
1683 Return a list containing all the nodes in the DAG
1684 """
1685 return self.__nodes
1686
1688 """
1689 Return a list containing all the jobs in the DAG
1690 """
1691 return self.__jobs
1692
1694 """
1695 Returns true if this DAG is really a DAX
1696 """
1697 return self.__dax
1698
1700 """
1701 Use integer node names for the DAG
1702 """
1703 self.__integer_node_names = 1
1704
1706 """
1707 Set the name of the file into which the DAG is written.
1708 @param path: path to DAG file.
1709 """
1710 self.__dag_file_path = path + '.dag'
1711
1713 """
1714 Return the path to the DAG file.
1715 """
1716 if not self.__log_file_path:
1717 raise CondorDAGError("No path for DAG file")
1718 else:
1719 return self.__dag_file_path
1720
1722 """
1723 Set the name of the file into which the DAX is written.
1724 @param path: path to DAX file.
1725 """
1726 self.__dax_file_path = path + '.dax'
1727
1729 """
1730 Return the path to the DAX file.
1731 """
1732 if not self.__log_file_path:
1733 raise CondorDAGError("No path for DAX file")
1734 else:
1735 return self.__dax_file_path
1736
1738 """
1739 Add a CondorDAGNode to this DAG. The CondorJob that the node uses is
1740 also added to the list of Condor jobs in the DAG so that a list of the
1741 submit files needed by the DAG can be maintained. Each unique CondorJob
1742 will be added once to prevent duplicate submit files being written.
1743 @param node: CondorDAGNode to add to the CondorDAG.
1744 """
1745 if not isinstance(node, CondorDAGNode):
1746 raise CondorDAGError("Nodes must be class CondorDAGNode or subclass")
1747 if not isinstance(node.job(), CondorDAGManJob):
1748 node.set_log_file(self.__log_file_path)
1749 self.__nodes.append(node)
1750 if self.__integer_node_names:
1751 node.set_name(str(self.__node_count))
1752 self.__node_count += 1
1753 if node.job() not in self.__jobs:
1754 self.__jobs.append(node.job())
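# Illustrative sketch (not part of the module): assembling a small two node
# workflow. The log path, executable and file names are placeholders; method
# names follow the glue.pipeline API (several def lines are elided in this
# source view), and write_dag() is the standard method for writing the .dag
# file, not shown in this excerpt.
#
#   dag = CondorDAG('/usr1/albert.einstein/example.log')   # log must not be on NFS
#   dag.set_dag_file('example')
#
#   job = CondorDAGJob('vanilla', 'lalapps_inspiral')
#   job.set_sub_file('inspiral.sub')
#   job.set_stderr_file('inspiral-$(cluster).err')
#   job.set_stdout_file('inspiral-$(cluster).out')
#
#   parent = job.create_node()
#   child = job.create_node()
#   child.add_parent(parent)
#   dag.add_node(parent)
#   dag.add_node(child)
#
#   dag.write_sub_files()
#   dag.write_dag()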
1755
1757 """
1758 Add a category to this DAG called categoryName with a maxjobs of maxJobsNum.
1759 Adds the (categoryName, maxJobsNum) tuple to CondorDAG.__maxjobs_categories.
1760 """
1761 self.__maxjobs_categories.append((str(categoryName),str(maxJobsNum)))
1762
1764 """
1765 Return an array of tuples containing (categoryName,maxJobsNum)
1766 """
1767 return self.__maxjobs_categories
1768
1770 """
1771 Set the path of a pegasus worker package to use for the workflow.
1772 @param path: path to worker package.
1773 """
1774 self.__pegasus_worker = path
1775
1777 """
1778 Return the path to the pegasus worker package.
1779 """
1780 return self.__pegasus_worker
1781
1783 """
1784 Write the DAG entry for this category's maxjobs to the DAG file descriptor.
1785 @param fh: descriptor of open DAG file.
1786 @param category: tuple containing type of jobs to set a maxjobs limit for
1787 and the maximum number of jobs of that type to run at once.
1788 """
1789 fh.write( 'MAXJOBS ' + str(category[0]) + ' ' + str(category[1]) + '\n' )
1790
1792 """
1793 Write all the submit files used by the dag to disk. Each submit file is
1794 written to the file name set in the CondorJob.
1795 """
1796 if not self.__nodes_finalized:
1797 for node in self.__nodes:
1798 node.finalize()
1799 if not self.is_dax():
1800 for job in self.__jobs:
1801 job.write_sub_file()
1802
1804 """
1805 Add a list of (lfn, pfn, pool) tuples to the PFN cache
1806 Note: input looks like ('/path/to/file','file:///path/to/file','local')
1807 """
1808 self.__pfn_cache += pfn_list
1809
1811 """
1812 Return the pfn cache
1813 """
1814 return self.__pfn_cache
1815
1842
1844 """
1845 Write all the nodes in the workflow to the DAX file.
1846 """
1847
1848
1849 using_stampede = False
1850
1851 if not self.__dax_file_path:
1852
1853 return
1854
1855 import Pegasus.DAX3
1856
1857 dax_name = os.path.split(self.__dax_file_path)[-1]
1858 dax_basename = '.'.join(dax_name.split('.')[0:-1])
1859 workflow = Pegasus.DAX3.ADAG( dax_basename )
1860
1861
1862
1863
1864 node_job_object_dict = {}
1865
1866
1867
1868 workflow_executable_dict = {}
1869 workflow_pfn_dict = {}
1870
1871
1872 for pfn_tuple in self.get_pfn_cache():
1873 workflow_pfn_dict[pfn_tuple[0]] = pfn_tuple
1874
1875 if self.get_pegasus_worker():
1876
1877 worker_package = Pegasus.DAX3.Executable(
1878 namespace="pegasus", name="worker",
1879 os="linux", arch="x86_64", installed=False)
1880 worker_package.addPFN(Pegasus.DAX3.PFN(self.get_pegasus_worker(),"local"))
1881 workflow_executable_dict['pegasus-pegasus_worker'] = worker_package
1882
1883
1884 for path in os.environ["PATH"].split(":"):
1885 cluster_path = os.path.join(path,"pegasus-cluster")
1886 if os.path.exists(cluster_path):
1887
1888 seqexec_package = Pegasus.DAX3.Executable(
1889 namespace="pegasus", name="seqexec",
1890 os="linux", arch="x86_64", installed=True)
1891 seqexec_package.addPFN(Pegasus.DAX3.PFN(cluster_path,"local"))
1892 workflow_executable_dict['pegasus-pegasus_seqexec'] = seqexec_package
1893
1894 id = 0
1895 for node in self.__nodes:
1896 if self.is_dax() and isinstance(node, LSCDataFindNode):
1897 pass
1898
1899 elif isinstance(node.job(), CondorDAGManJob):
1900 id += 1
1901 id_tag = "ID%06d" % id
1902 node_name = node._CondorDAGNode__name
1903
1904 if node.job().get_dax() is None:
1905
1906 subdag_name = os.path.split(node.job().get_dag())[-1]
1907 try:
1908 subdag_exec_path = os.path.join(
1909 os.getcwd(),node.job().get_dag_directory())
1910 except AttributeError:
1911 subdag_exec_path = os.getcwd()
1912
1913 subdag = Pegasus.DAX3.DAG(subdag_name,id=id_tag)
1914 subdag.addProfile(Pegasus.DAX3.Profile("dagman","DIR",subdag_exec_path))
1915
1916 subdag_file = Pegasus.DAX3.File(subdag_name)
1917 subdag_file.addPFN(Pegasus.DAX3.PFN(os.path.join(subdag_exec_path,subdag_name),"local"))
1918 workflow.addFile(subdag_file)
1919 workflow.addDAG(subdag)
1920 node_job_object_dict[node_name] = subdag
1921
1922 else:
1923
1924 subdax_name = os.path.split(node.job().get_dax())[-1]
1925 dax_subdir = node.job().get_dag_directory()
1926 if dax_subdir:
1927 subdax_path = os.path.join(
1928 os.getcwd(),node.job().get_dag_directory(),subdax_name)
1929 else:
1930 subdax_path = os.path.join(os.getcwd(),subdax_name)
1931 dax_subdir = '.'
1932
1933 subdax = Pegasus.DAX3.DAX(subdax_name,id=id_tag)
1934
1935
1936 for pfn_tuple in node.job().get_pfn_cache():
1937 workflow_pfn_dict[pfn_tuple[0]] = pfn_tuple
1938
1939
1940 pegasus_args = """--dir %s """ % dax_subdir
1941 pegasus_args += """--output-dir %s """ % dax_subdir
1942
1943
1944
1945
1946 for maxjobcat in node.get_maxjobs_categories():
1947 pegasus_args += "-Dpegasus.dagman." + maxjobcat[0] + ".maxjobs=" + maxjobcat[1] + " "
1948
1949 if not self.is_dax():
1950 pegasus_args += "--nocleanup "
1951
1952 if node.get_cluster_jobs():
1953 pegasus_args += "--cluster " + node.get_cluster_jobs() + " "
1954
1955 if node.get_reduce_dax() is False:
1956 pegasus_args += " --force "
1957
1958 if node.get_static_pfn_cache():
1959 pegasus_args += " --cache " + node.get_static_pfn_cache() + " "
1960
1961 pegasus_args += "--output-site local -vvvvvv"
1962 subdax.addArguments(pegasus_args)
1963
1964 subdax_file = Pegasus.DAX3.File(subdax_name)
1965 subdax_file.addPFN(Pegasus.DAX3.PFN(subdax_path,"local"))
1966 workflow.addFile(subdax_file)
1967 workflow.addDAX(subdax)
1968 node_job_object_dict[node_name] = subdax
1969
1970 else:
1971
1972 executable = node.job()._CondorJob__executable
1973 node_name = node._CondorDAGNode__name
1974
1975 id += 1
1976 id_tag = "ID%06d" % id
1977 node_job_object_dict[node_name] = id_tag
1978
1979
1980 executable_namespace = 'ligo-' + str(node.job().__class__.__name__).lower()
1981 executable_base = os.path.basename(executable)
1982
1983 workflow_job = Pegasus.DAX3.Job( namespace=executable_namespace,
1984 name=executable_base, version="1.0", id=id_tag)
1985
1986 cmd_line = node.get_cmd_tuple_list()
1987
1988
1989
1990 input_node_file_dict = {}
1991 for f in node.get_input_files():
1992 input_node_file_dict[f] = 1
1993
1994 for f in input_node_file_dict.keys():
1995 workflow_job.uses(Pegasus.DAX3.File(os.path.basename(f)),link=Pegasus.DAX3.Link.INPUT,register=False,transfer=True)
1996
1997 output_node_file_dict = {}
1998 for f in node.get_output_files():
1999 output_node_file_dict[f] = 1
2000
2001 checkpoint_node_file_dict = {}
2002 for f in node.get_checkpoint_files():
2003 checkpoint_node_file_dict[f] = 1
2004
2005 for f in output_node_file_dict.keys():
2006 workflow_job.uses(Pegasus.DAX3.File(os.path.basename(f)),link=Pegasus.DAX3.Link.OUTPUT,register=False,transfer=True)
2007
2008 for f in checkpoint_node_file_dict.keys():
2009 workflow_job.uses(Pegasus.DAX3.File(os.path.basename(f)),link=Pegasus.DAX3.Link.CHECKPOINT,register=False,transfer=True)
2010
2011 node_file_dict = dict( list(input_node_file_dict.items()) + list(output_node_file_dict.items()) + list(checkpoint_node_file_dict.items()) )
2012
2013 for job_arg in cmd_line:
2014 try:
2015 if job_arg[0] in node_file_dict:
2016 workflow_job.addArguments(Pegasus.DAX3.File(os.path.basename(job_arg[0])))
2017 elif job_arg[1] in node_file_dict:
2018 workflow_job.addArguments(job_arg[0], Pegasus.DAX3.File(os.path.basename(job_arg[1])))
2019 elif len(job_arg[1].split(' ')) != 1:
2020 args = [job_arg[0]]
2021 for arg in job_arg[1].split(' '):
2022 if arg in node_file_dict:
2023 args.append(Pegasus.DAX3.File(os.path.basename(arg)))
2024 else:
2025 args.append(arg)
2026 workflow_job.addArguments(*args)
2027 else:
2028 workflow_job.addArguments(job_arg[0], job_arg[1])
2029 except IndexError:
2030 pass
2031
2032
2033 if node.job().get_grid_site():
2034 this_grid_site = node.job().get_grid_site()
2035 workflow_job.addProfile(Pegasus.DAX3.Profile('hints','execution.site',this_grid_site))
2036 if this_grid_site == 'stampede-dev' or this_grid_site=='stampede':
2037 using_stampede = True
2038
2039
2040 job_executable = Pegasus.DAX3.Executable(
2041 namespace=executable_namespace,
2042 name=executable_base, version="1.0",
2043 os="linux", arch="x86_64",
2044 installed=node.job().get_executable_installed())
2045
2046 executable_path = os.path.join(os.getcwd(),executable)
2047 job_executable.addPFN(Pegasus.DAX3.PFN(executable_path,"local"))
2048
2049 workflow_executable_dict[executable_namespace + executable_base] = job_executable
2050
2051
2052 if node.job().get_dax_mpi_cluster():
2053 workflow_job.addProfile(Pegasus.DAX3.Profile("pegasus","job.aggregator","mpiexec"))
2054 workflow_job.addProfile(Pegasus.DAX3.Profile("pegasus","clusters.size",str(node.job().get_dax_mpi_cluster())))
2055
2056
2057
2058 if node.get_grid_start():
2059 workflow_job.addProfile(Pegasus.DAX3.Profile("pegasus","gridstart",node.get_grid_start()))
2060
2061
2062 if node.get_dax_collapse():
2063 workflow_job.addProfile(Pegasus.DAX3.Profile("pegasus","clusters.size",str(node.get_dax_collapse())))
2064
2065
2066 if node.get_retry():
2067 workflow_job.addProfile(Pegasus.DAX3.Profile("dagman","retry",str(node.get_retry())))
2068
2069
2070 if node.get_post_script():
2071 post_script_base = os.path.basename(node.get_post_script())
2072 post_script_path = os.path.join(os.getcwd(),node.get_post_script())
2073 workflow_job.addProfile(Pegasus.DAX3.Profile("dagman","post",post_script_base))
2074 workflow_job.addProfile(Pegasus.DAX3.Profile("dagman","post.path." + post_script_base,post_script_path))
2075
2076
2077 if node.get_post_script_arg():
2078 workflow_job.addProfile(Pegasus.DAX3.Profile("dagman","post.arguments",' '.join(node.get_post_script_arg())))
2079
2080
2081 if node.get_category():
2082 workflow_job.addProfile(Pegasus.DAX3.Profile("dagman","category",str(node.get_category())))
2083
2084
2085 if node.get_priority():
2086 workflow_job.addProfile(Pegasus.DAX3.Profile("condor","priority",str(node.get_priority())))
2087
2088
2089 if node.get_dax_collapse():
2090
2091 workflow_job.addProfile(Pegasus.DAX3.Profile("condor","universe","vanilla"))
2092 else:
2093 workflow_job.addProfile(Pegasus.DAX3.Profile("condor","universe",node.job().get_universe()))
2094
2095
2096 for ccmd_key, ccmd_val in node.job().get_condor_cmds().items():
2097 workflow_job.addProfile(Pegasus.DAX3.Profile("condor", ccmd_key, ccmd_val))
2098
2099
2100 workflow_job.setStdout(node.job().get_stdout_file())
2101 workflow_job.setStderr(node.job().get_stderr_file())
2102
2103
2104 for p in node.get_pegasus_profile():
2105 workflow_job.addProfile(Pegasus.DAX3.Profile(p[0],p[1],p[2]))
2106
2107
2108 workflow.addJob(workflow_job)
2109 node_job_object_dict[node_name] = workflow_job
2110
2111
2112
2113 for node in self.__nodes:
2114 if self.is_dax() and isinstance(node, LSCDataFindNode):
2115 pass
2116 elif self.is_dax() and ( len(node._CondorDAGNode__parents) == 1 ) and isinstance(node._CondorDAGNode__parents[0], LSCDataFindNode):
2117 pass
2118 else:
2119 child_job_object = node_job_object_dict[str(node)]
2120 if node._CondorDAGNode__parents:
2121 for parent in node._CondorDAGNode__parents:
2122 if self.is_dax() and isinstance(parent, LSCDataFindNode):
2123 pass
2124 else:
2125 parent_job_object = node_job_object_dict[str(parent)]
2126 workflow.addDependency(Pegasus.DAX3.Dependency(parent=parent_job_object, child=child_job_object))
2127
2128
2129 for exec_key in workflow_executable_dict.keys():
2130 workflow.addExecutable(workflow_executable_dict[exec_key])
2131
2132
2133 if using_stampede:
2134 prod_mpiexec = Pegasus.DAX3.Executable(namespace="pegasus",
2135 name="mpiexec", os="linux", arch="x86_64", installed="true")
2136 prod_mpiexec.addPFN(Pegasus.DAX3.PFN("file:///home1/02796/dabrown/bin/mpi-cluster-wrapper-impi.sh","stampede"))
2137 workflow.addExecutable(prod_mpiexec)
2138
2139 dev_mpiexec = Pegasus.DAX3.Executable(namespace="pegasus",
2140 name="mpiexec", os="linux", arch="x86_64", installed="true")
2141 dev_mpiexec.addPFN(Pegasus.DAX3.PFN("file:///home1/02796/dabrown/bin/mpi-cluster-wrapper-impi.sh","stampede-dev"))
2142 workflow.addExecutable(dev_mpiexec)
2143
2144
2145 for pfn_key in workflow_pfn_dict.keys():
2146 f = Pegasus.DAX3.File(workflow_pfn_dict[pfn_key][0])
2147 f.addPFN(Pegasus.DAX3.PFN(workflow_pfn_dict[pfn_key][1],workflow_pfn_dict[pfn_key][2]))
2148 workflow.addFile(f)
2149
2150 f = open(self.__dax_file_path,"w")
2151 workflow.writeXML(f)
2152 f.close()
2153
2163
2164  - def write_script(self):
2165    """
2166 Write the workflow to a script (.sh instead of .dag).
2167
2168 Assuming that parents were added to the DAG before their children,
2169 dependencies should be handled correctly.
2170 """
2171 if not self.__dag_file_path:
2172 raise CondorDAGError("No path for DAG file")
2173 try:
2174 dfp = self.__dag_file_path
2175 outfilename = ".".join(dfp.split(".")[:-1]) + ".sh"
2176 outfile = open(outfilename, "w")
2177 except:
2178 raise CondorDAGError("Cannot open file " + self.__dag_file_path)
2179
2180 for node in self.__nodes:
2181 outfile.write("# Job %s\n" % node.get_name())
2182
2183 if isinstance(node,CondorDAGManNode):
2184 outfile.write("condor_submit_dag %s\n\n" % (node.job().get_dag()))
2185 else:
2186 outfile.write("%s %s\n\n" % (node.job().get_executable(),
2187 node.get_cmd_line()))
2188 outfile.close()
2189
2190 os.chmod(outfilename, os.stat(outfilename)[0] | stat.S_IEXEC)
2191
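# Illustrative sketch (not part of the module): how the script writer above is
# typically used. The DAG container class name (CondorDAG) and the helper calls
# around write_script() are assumptions for this example only.
#
#   dag = CondorDAG(log='example.log')   # hypothetical DAG built elsewhere
#   # ... CondorDAGNode objects are added to the DAG here ...
#   dag.set_dag_file('example')          # hypothetical; yields example.dag
#   dag.write_script()                   # writes example.sh and marks it executable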
2192 - def prepare_dax(self,grid_site=None,tmp_exec_dir='.',peg_frame_cache=None):
2193 """
2194 Sets up a pegasus script for the given dag
2195 """
2196 dag=self
2197 log_path=self.__log_file_path
2198
2199
2200 peg_fh = open("pegasus_submit_dax", "w")
2201 pegprop_fh = open("pegasus.properties", "w")
2202 sitefile = open( 'sites.xml', 'w' )
2203
2204
2205 pegprop_fh.write(PEGASUS_PROPERTIES % (os.getcwd()))
2206
2207
2208 dirs_entry='--relative-dir .'
2209 if grid_site:
2210 exec_site=grid_site
2211 exec_ssite_list = exec_site.split(',')
2212 for site in exec_ssite_list:
2213
2214 if site == 'nikhef':
2215 dirs_entry += ' --staging-site nikhef=nikhef-srm'
2216 else:
2217 dirs_entry += ' --staging-site %s=%s' % (site,site)
2218 if site == 'nikhef' or site == 'bologna':
2219 pegprop_fh.write(
2220 """
2221 ###############################################################################
2222 # Data Staging Configuration
2223
2224 # Pegasus will be setup to execute jobs on an execution site without relying
2225 # on a shared filesystem between the head node and the worker nodes. If this
2226 # is set, specify staging site ( using --staging-site option to pegasus-plan)
2227 # to indicate the site to use as a central storage location for a workflow.
2228 pegasus.data.configuration=nonsharedfs
2229
2230
2231 """)
2232 else:
2233 exec_site='local'
2234
2235
2236 peg_fh.write(PEGASUS_SCRIPT % ( tmp_exec_dir, os.getcwd(),
2237 dag.get_dax_file().replace('.dax','') + '-0.dag',
2238 dag.get_dax_file(), dirs_entry, exec_site ))
2239 peg_fh.close()
2240 os.chmod("pegasus_submit_dax",0o755)
2241
2242
2243
2244 if peg_frame_cache:
2245 pegprop_fh.write("pegasus.catalog.replica.file=%s\n" % (os.path.join(os.getcwd(),os.path.basename(peg_frame_cache))))
2246 pegprop_fh.close()
2247
2248
2249 basedir_fh = open("pegasus_basedir", "w")
2250 basedir_fh.write(PEGASUS_BASEDIR_SCRIPT % ( tmp_exec_dir, dag.get_dax_file().replace('.dax','') + '-0.dag' ))
2251 basedir_fh.close()
2252 os.chmod("pegasus_basedir",0o755)
2253
2254
2255 pwd = os.getcwd()
2256 try:
2257 hostname = socket.gethostbyaddr(socket.gethostname())[0]
2258 except:
2259 hostname = 'localhost'
2260
2261  sitefile.write("""\
2262 <?xml version="1.0" encoding="UTF-8"?>
2263 <sitecatalog xmlns="http://pegasus.isi.edu/schema/sitecatalog" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
2264 xsi:schemaLocation="http://pegasus.isi.edu/schema/sitecatalog http://pegasus.isi.edu/schema/sc-4.0.xsd" version="4.0">
2265 <site handle="local" arch="x86_64" os="LINUX">
2266 <grid type="gt2" contact="%s/jobmanager-fork" scheduler="Fork" jobtype="auxillary" total-nodes="50"/>
2267 <grid type="gt2" contact="%s/jobmanager-condor" scheduler="Condor" jobtype="compute" total-nodes="50"/>
2268 <directory path="%s" type="shared-scratch" free-size="null" total-size="null">
2269 <file-server operation="all" url="file://%s">
2270 </file-server>
2271 </directory>
2272 <directory path="%s" type="shared-storage" free-size="null" total-size="null">
2273 <file-server operation="all" url="file://%s">
2274 </file-server>
2275 </directory>
2276 <replica-catalog type="LRC" url="rlsn://smarty.isi.edu">
2277 </replica-catalog>
2278 """ % (hostname,hostname,pwd,pwd,pwd,pwd))
2279
2280 try:
2281 sitefile.write(""" <profile namespace="env" key="GLOBUS_LOCATION">%s</profile>\n""" % os.environ['GLOBUS_LOCATION'])
2282 except:
2283 pass
2284 try:
2285 sitefile.write(""" <profile namespace="env" key="LD_LIBRARY_PATH">%s</profile>\n""" % os.environ['LD_LIBRARY_PATH'])
2286 except:
2287 pass
2288 try:
2289 sitefile.write(""" <profile namespace="env" key="PYTHONPATH">%s</profile>\n""" % os.environ['PYTHONPATH'])
2290 except:
2291 pass
2292 try:
2293 sitefile.write(""" <profile namespace="env" key="PEGASUS_HOME">%s</profile>\n""" % os.environ['PEGASUS_HOME'])
2294 except:
2295 pass
2296 try:
2297 sitefile.write(""" <profile namespace="env" key="LIGO_DATAFIND_SERVER">%s</profile>\n""" % os.environ['LIGO_DATAFIND_SERVER'])
2298 except:
2299 pass
2300 try:
2301 sitefile.write(""" <profile namespace="env" key="S6_SEGMENT_SERVER">%s</profile>\n""" % os.environ['S6_SEGMENT_SERVER'])
2302 except:
2303 pass
2304
2305 sitefile.write("""\
2306 <profile namespace="env" key="JAVA_HEAPMAX">4096</profile>
2307 <profile namespace="pegasus" key="style">condor</profile>
2308 <profile namespace="condor" key="getenv">True</profile>
2309 <profile namespace="condor" key="should_transfer_files">YES</profile>
2310 <profile namespace="condor" key="when_to_transfer_output">ON_EXIT_OR_EVICT</profile>
2311 </site>
2312 """)
2313
2314 sitefile.write("""\
2315 <!-- Bologna cluster -->
2316 <site handle="bologna" arch="x86_64" os="LINUX">
2317 <grid type="cream" contact="https://ce01-lcg.cr.cnaf.infn.it:8443/ce-cream/services/CREAM2" scheduler="LSF" jobtype="compute" />
2318 <grid type="cream" contact="https://ce01-lcg.cr.cnaf.infn.it:8443/ce-cream/services/CREAM2" scheduler="LSF" jobtype="auxillary" />
2319 <directory type="shared-scratch" path="/storage/gpfs_virgo4/virgo4/%s/">
2320 <file-server operation="all" url="srm://storm-fe-archive.cr.cnaf.infn.it:8444/srm/managerv2?SFN=/virgo4/%s/"/>
2321 </directory>
2322 <profile namespace="pegasus" key="style">cream</profile>
2323 <profile namespace="globus" key="queue">virgo</profile>
2324 </site>
2325 """ % (os.path.basename(tmp_exec_dir),os.path.basename(tmp_exec_dir)))
2326
2327 sitefile.write("""\
2328 <!-- Nikhef Big Grid -->
2329 <site handle="nikhef" arch="x86_64" os="LINUX">
2330 <grid type="cream" contact="https://klomp.nikhef.nl:8443/ce-cream/services/CREAM2" scheduler="PBS" jobtype="compute" />
2331 <grid type="cream" contact="https://klomp.nikhef.nl:8443/ce-cream/services/CREAM2" scheduler="PBS" jobtype="auxillary" />
2332 <profile namespace="pegasus" key="style">cream</profile>
2333 <profile namespace="globus" key="queue">medium</profile>
2334 </site>
2335 <!-- Nikhef Stage in Site -->
2336 <site handle="nikhef-srm" arch="x86_64" os="LINUX">
2337 <directory type="shared-scratch" path="/%s/">
2338 <file-server operation="all" url="srm://tbn18.nikhef.nl:8446/srm/managerv2?SFN=/dpm/nikhef.nl/home/virgo/%s/" />
2339 </directory>
2340 </site>
2341 """ % (os.path.basename(tmp_exec_dir),os.path.basename(tmp_exec_dir)))
2342
2343    try:
      import subprocess  # imported locally in case subprocess is not already available at module scope
2344      stampede_home = subprocess.check_output(
2345 ['gsissh','-o','BatchMode=yes','-p','2222','stampede.tacc.xsede.org','pwd'])
2346 stampede_home = stampede_home.split('/')
2347 stampede_magic_number = stampede_home[2]
2348 stampede_username = stampede_home[3]
2349 shared_scratch = "/work/%s/%s/ihope-workflow/%s" % (
2350 stampede_magic_number,stampede_username,os.path.basename(tmp_exec_dir))
2351
2352 sitefile.write("""\
2353 <!-- XSEDE Stampede Cluster at TACC Development Queue -->
2354 <site handle="stampede-dev" arch="x86_64" os="LINUX">
2355 <grid type="gt5" contact="login5.stampede.tacc.utexas.edu/jobmanager-fork" scheduler="Fork" jobtype="auxillary"/>
2356 <grid type="gt5" contact="login5.stampede.tacc.utexas.edu/jobmanager-slurm" scheduler="unknown" jobtype="compute"/>
2357 <directory type="shared-scratch" path="%s">
2358 <file-server operation="all" url="gsiftp://gridftp.stampede.tacc.xsede.org%s"/>
2359 </directory>
2360 <profile namespace="env" key="PEGASUS_HOME">/usr</profile>
2361 <profile namespace="globus" key="queue">development</profile>
2362 <profile namespace="globus" key="maxwalltime">180</profile>
2363 <profile namespace="globus" key="host_count">1</profile>
2364 <profile namespace="globus" key="count">16</profile>
2365 <profile namespace="globus" key="jobtype">single</profile>
2366 <profile namespace="globus" key="project">TG-PHY140012</profile>
2367 </site>
2368 """ % (shared_scratch,shared_scratch))
2369
2370 sitefile.write("""\
2371 <!-- XSEDE Stampede Cluster at TACC Development Queue -->
2372 <site handle="stampede" arch="x86_64" os="LINUX">
2373 <grid type="gt5" contact="login5.stampede.tacc.utexas.edu/jobmanager-fork" scheduler="Fork" jobtype="auxillary"/>
2374 <grid type="gt5" contact="login5.stampede.tacc.utexas.edu/jobmanager-slurm" scheduler="unknown" jobtype="compute"/>
2375 <directory type="shared-scratch" path="%s">
2376 <file-server operation="all" url="gsiftp://gridftp.stampede.tacc.xsede.org%s"/>
2377 </directory>
2378 <profile namespace="env" key="PEGASUS_HOME">/usr</profile>
2379 <profile namespace="globus" key="queue">development</profile>
2380 <profile namespace="globus" key="maxwalltime">540</profile>
2381 <profile namespace="globus" key="host_count">32</profile>
2382 <profile namespace="globus" key="count">512</profile>
2383 <profile namespace="globus" key="jobtype">single</profile>
2384 <profile namespace="globus" key="project">TG-PHY140012</profile>
2385 </site>
2386 """ % (shared_scratch,shared_scratch))
2387
2388 except:
2389 sitefile.write("""\
2390 <!-- XSEDE Stampede Cluster disabled as gsissh to TACC failed-->
2391 """)
2392
2393 sitefile.write("""\
2394 </sitecatalog>""")
2395 sitefile.close()
2396
2397
2398 print()
2399 print("Created a workflow file which can be submitted by executing")
2400 print("""
2401
2402 ./pegasus_submit_dax
2403
2404 in the analysis directory on a condor submit machine.
2405
2406 From the analysis directory on the condor submit machine, you can run the
2407 command
2408
2409 pegasus-status --long -t -i `./pegasus_basedir`
2410
2411 to check the status of your workflow. Once the workflow has finished you
2412 can run the command
2413
2414 pegasus-analyzer -t -i `./pegasus_basedir`
2415
2416 to debug any failed jobs.
2417 """)
2418
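# Illustrative sketch (not part of the module): prepare_dax() is normally called
# on a fully built DAG after the DAX itself has been written. The grid site name
# and scratch directory below are examples only.
#
#   dag.prepare_dax(grid_site='nikhef',
#                   tmp_exec_dir='/usr1/albert.einstein/ihope_workflow')
#   # this writes pegasus_submit_dax, pegasus_basedir, pegasus.properties and
#   # sites.xml into the current working directory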
2419
2421 """
2422 Describes a generic analysis job that filters LIGO data as configured by
2423 an ini file.
2424 """
2426 """
2427    @param cp: ConfigParser object that contains the configuration for this job.
    @param dax: boolean flag indicating whether this job is part of a DAX workflow.
2428    """
2429 self.__cp = cp
2430 self.__dax = dax
2431 try:
2432      self.__channel = self.__cp.get('input','channel').strip()
2433 except:
2434 self.__channel = None
2435
2437 """
2438 Returns true if this job should behave as a DAX
2439 """
2440 return self.__dax
2441
2443 """
2444    Get the configuration variable in a particular section of this job's ini
2445 file.
2446 @param sec: ini file section.
2447 @param opt: option from section sec.
2448 """
2449    return self.__cp.get(sec,opt).strip()
2450
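# Illustrative sketch: get_config() simply strips the value returned by the
# underlying ConfigParser. The 'data' section and 'pad-data' option below are
# examples only.
#
#   pad = int(job.get_config('data', 'pad-data'))
#   cal_path = job.get_config('calibration', 'path')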
2452 """
2453 Set the name of the channel that this job is filtering. This will
2454 overwrite the value obtained at initialization.
2455 """
2456 self.__channel = channel
2457
2459 """
2460 Returns the name of the channel that this job is filtering. Note that
2461 channel is defined to be IFO independent, so this may be LSC-AS_Q or
2462 IOO-MC_F. The IFO is set on a per node basis, not a per job basis.
2463 """
2464 return self.__channel
2465
2466
2468 """
2469 Contains the methods that allow an object to be built to analyse LIGO
2470 data in a Condor DAG.
2471 """
2473 self.__start = 0
2474 self.__end = 0
2475 self.__data_start = 0
2476 self.__pad_data = 0
2477 self.__data_end = 0
2478 self.__trig_start = 0
2479 self.__trig_end = 0
2480 self.__ifo = None
2481 self.__ifo_tag = None
2482 self.__input = None
2483 self.__output = None
2484 self.__calibration = None
2485 self.__calibration_cache = None
2486 self.__LHO2k = re.compile(r'H2')
2487 self.__user_tag = self.job().get_opts().get("user-tag", None)
2488
2489 - def set_start(self,time,pass_to_command_line=True):
2490 """
2491 Set the GPS start time of the analysis node by setting a --gps-start-time
2492 option to the node when it is executed.
2493 @param time: GPS start time of job.
2494 @bool pass_to_command_line: add gps-start-time as variable option.
2495 """
2496 if pass_to_command_line:
2497 self.add_var_opt('gps-start-time',time)
2498 self.__start = time
2499 self.__data_start = time
2500
2501
2502
2504 """
2505 Get the GPS start time of the node.
2506 """
2507 return self.__start
2508
2509 - def set_end(self,time,pass_to_command_line=True):
2510 """
2511 Set the GPS end time of the analysis node by setting a --gps-end-time
2512 option to the node when it is executed.
2513 @param time: GPS end time of job.
2514 @bool pass_to_command_line: add gps-end-time as variable option.
2515 """
2516 if pass_to_command_line:
2517 self.add_var_opt('gps-end-time',time)
2518 self.__end = time
2519 self.__data_end = time
2520
2522 """
2523 Get the GPS end time of the node.
2524 """
2525 return self.__end
2526
2528 """
2529 Set the GPS start time of the data needed by this analysis node.
2530 @param time: GPS start time of job.
2531 """
2532 self.__data_start = time
2533
2535 """
2536 Get the GPS start time of the data needed by this node.
2537 """
2538 return self.__data_start
2539
2541 """
2542    Set the number of seconds of padding for the data needed by this node.
2543    @param pad: number of seconds of padding.
2544 """
2545 self.__pad_data = pad
2546
2548 """
2549    Get the number of seconds of data padding for this node.
2550 """
2551 return self.__pad_data
2552
2554 """
2555 Set the GPS end time of the data needed by this analysis node.
2556 @param time: GPS end time of job.
2557 """
2558 self.__data_end = time
2559
2561 """
2562 Get the GPS end time of the data needed by this node.
2563 """
2564 return self.__data_end
2565
2567 """
2568 Set the trig start time of the analysis node by setting a
2569 --trig-start-time option to the node when it is executed.
2570 @param time: trig start time of job.
2571 @bool pass_to_command_line: add trig-start-time as a variable option.
2572 """
2573 if pass_to_command_line:
2574 self.add_var_opt('trig-start-time',time)
2575 self.__trig_start = time
2576
2578 """
2579 Get the trig start time of the node.
2580 """
2581 return self.__trig_start
2582
2584 """
2585 Set the trig end time of the analysis node by setting a --trig-end-time
2586 option to the node when it is executed.
2587 @param time: trig end time of job.
2588 @bool pass_to_command_line: add trig-end-time as a variable option.
2589 """
2590 if pass_to_command_line:
2591 self.add_var_opt('trig-end-time',time)
2592 self.__trig_end = time
2593
2595 """
2596 Get the trig end time of the node.
2597 """
2598 return self.__trig_end
2599
2610
2616
2617 - def set_output(self,filename,pass_to_command_line=True):
2618 """
2619 Add an output to the node by adding a --output option.
2620 @param filename: option argument to pass as output.
2621 @bool pass_to_command_line: add output as a variable option.
2622 """
2623 self.__output = filename
2624 if pass_to_command_line:
2625 self.add_var_opt('output', filename)
2626 self.add_output_file(filename)
2627
2629 """
2630 Get the file that will be passed as output.
2631 """
2632 return self.__output
2633
2635 """
2636 Set the ifo name to analyze. If the channel name for the job is defined,
2637 then the name of the ifo is prepended to the channel name obtained
2638 from the job configuration file and passed with a --channel-name option.
2639 @param ifo: two letter ifo code (e.g. L1, H1 or H2).
2640 """
2641 self.__ifo = ifo
2642 if self.job().channel():
2643 self.add_var_opt('channel-name', ifo + ':' + self.job().channel())
2644
2646 """
2647 Returns the two letter IFO code for this node.
2648 """
2649 return self.__ifo
2650
2651 - def set_ifo_tag(self,ifo_tag,pass_to_command_line=True):
2652 """
2653 Set the ifo tag that is passed to the analysis code.
2654 @param ifo_tag: a string to identify one or more IFOs
2655 @bool pass_to_command_line: add ifo-tag as a variable option.
2656 """
2657 self.__ifo_tag = ifo_tag
2658 if pass_to_command_line:
2659 self.add_var_opt('ifo-tag', ifo_tag)
2660
2662 """
2663 Returns the IFO tag string
2664 """
2665 return self.__ifo_tag
2666
2668 """
2669 Set the user tag that is passed to the analysis code.
2670    @param usertag: the user tag to identify the job
2671 @bool pass_to_command_line: add user-tag as a variable option.
2672 """
2673 self.__user_tag = usertag
2674 if pass_to_command_line:
2675 self.add_var_opt('user-tag', usertag)
2676
2678 """
2679 Returns the usertag string
2680 """
2681 return self.__user_tag
2682
2712
2714 """
2715 Determine the path to the correct calibration cache file to use.
2716 """
2717 if self.__ifo and self.__start > 0:
2718 cal_path = self.job().get_config('calibration','path')
2719
2720
2721 if ( self.__LHO2k.match(self.__ifo) and
2722 (self.__start >= 729273613) and (self.__start <= 734367613) ):
2723 if self.__start < int(
2724 self.job().get_config('calibration','H2-cal-epoch-boundary')):
2725 cal_file = self.job().get_config('calibration','H2-1')
2726 else:
2727 cal_file = self.job().get_config('calibration','H2-2')
2728 else:
2729
2730 cal_file = self.job().get_config('calibration',self.__ifo)
2731
2732 cal = os.path.join(cal_path,cal_file)
2733 self.__calibration_cache = cal
2734 else:
2735 msg = "IFO and start-time must be set first"
2736 raise CondorDAGNodeError(msg)
2737
2739 """
2740 Set the path to the calibration cache file for the given IFO.
2741 During S2 the Hanford 2km IFO had two calibration epochs, so
2742 if the start time is during S2, we use the correct cache file.
2743 """
2744
2745
2746 self.calibration_cache_path()
2747
2748 if self.job().is_dax():
2749
2750 self.add_var_opt('glob-calibration-data','')
2751 cache_filename=self.get_calibration()
2752 pat = re.compile(r'(file://.*)')
2753 f = open(cache_filename, 'r')
2754 lines = f.readlines()
2755
2756
2757 for line in lines:
2758 m = pat.search(line)
2759 if not m:
2760 raise IOError
2761 url = m.group(1)
2762
2763 path = urllib.parse.urlparse(url)[2]
2764 calibration_lfn = os.path.basename(path)
2765 self.add_input_file(calibration_lfn)
2766 else:
2767
2768 self.add_var_opt('calibration-cache', self.__calibration_cache)
2769 self.__calibration = self.__calibration_cache
2770 self.add_input_file(self.__calibration)
2771
2773 """
2774 Return the calibration cache file to be used by the
2775 DAG.
2776 """
2777 return self.__calibration_cache
2778
2779
2780
2782 """
2783 An AnalysisChunk is the unit of data that a node works with, usually some
2784 subset of a ScienceSegment.
2785 """
2786 - def __init__(self, start, end, trig_start = 0, trig_end = 0):
2787 """
2788 @param start: GPS start time of the chunk.
2789 @param end: GPS end time of the chunk.
2790 @param trig_start: GPS time at which to start generating triggers
2791 @param trig_end: GPS time at which to stop generating triggers
2792 """
2793 self.__start = start
2794 self.__end = end
2795 self.__length = end - start
2796 self.__trig_start = trig_start
2797 self.__trig_end = trig_end
2798
2800 if self.__trig_start and self.__trig_end:
2801 return '<AnalysisChunk: start %d, end %d, trig_start %d, trig_end %d>' % (
2802 self.__start, self.__end, self.__trig_start, self.__trig_end)
2803 elif self.__trig_start and not self.__trig_end:
2804 return '<AnalysisChunk: start %d, end %d, trig_start %d>' % (
2805 self.__start, self.__end, self.__trig_start)
2806 elif not self.__trig_start and self.__trig_end:
2807 return '<AnalysisChunk: start %d, end %d, trig_end %d>' % (
2808 self.__start, self.__end, self.__trig_end)
2809 else:
2810 return '<AnalysisChunk: start %d, end %d>' % (self.__start, self.__end)
2811
2813 """
2814 Returns the length of data for which this AnalysisChunk will produce
2815 triggers (in seconds).
2816 """
2817 if self.__trig_start and self.__trig_end:
2818 x = self.__trig_end - self.__trig_start
2819 elif self.__trig_start and not self.__trig_end:
2820 x = self.__end - self.__trig_start
2821 elif not self.__trig_start and self.__trig_end:
2822 x = self.__trig_end - self.__start
2823 else:
2824 x = self.__end - self.__start
2825
2826 if x < 0:
2827      raise SegmentError(repr(self) + ' has negative length')
2828 else:
2829 return x
2830
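# Worked example (illustrative): how the trigger length above is computed. The
# accessor name dur() is assumed from the docstring; the GPS times are arbitrary.
#
#   chunk = AnalysisChunk(1000000000, 1000002048, trig_start=1000000512)
#   # trig_start is set and trig_end is not, so the trigger length is
#   # end - trig_start = 1000002048 - 1000000512 = 1536 seconds
#   chunk.dur()   # -> 1536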
2832 """
2833 Returns the GPS start time of the chunk.
2834 """
2835 return self.__start
2836
2838 """
2839 Returns the GPS end time of the chunk.
2840 """
2841 return self.__end
2842
2844 """
2845 Returns the length (duration) of the chunk in seconds.
2846 """
2847 return self.__length
2848
2850 """
2851 Return the first GPS time at which triggers for this chunk should be
2852 generated.
2853 """
2854 return self.__trig_start
2855
2857 """
2858 Return the last GPS time at which triggers for this chunk should be
2859 generated.
2860 """
2861 return self.__trig_end
2862
2864 """
2865 Set the first GPS time at which triggers for this chunk should be
2866 generated.
2867 """
2868 self.__trig_start = start
2869
2871 """
2872 Set the last GPS time at which triggers for this chunk should be
2873 generated.
2874 """
2875 self.__trig_end = end
2876
2877
2878
2880 """
2881 A ScienceSegment is a period of time where the experimenters determine
2882  that the interferometer is in a state where the data is suitable for
2883  scientific analysis. A science segment can have a list of AnalysisChunks
2884  associated with it that break the segment up into (possibly overlapping)
2885 smaller time intervals for analysis.
2886 """
2888 """
2889 @param segment: a tuple containing the (segment id, gps start time, gps end
2890 time, duration) of the segment.
2891 """
2892 self.__id = segment[0]
2893 self.__start = segment[1]
2894 self.__end = segment[2]
2895 self.__dur = segment[3]
2896 self.__chunks = []
2897 self.__unused = self.dur()
2898 self.__ifo = None
2899 self.__df_node = None
2900
2902 """
2903 Allows iteration over and direct access to the AnalysisChunks contained
2904 in this ScienceSegment.
2905 """
2906 if i < 0: raise IndexError("list index out of range")
2907 return self.__chunks[i]
2908
2910 """
2911 Returns the number of AnalysisChunks contained in this ScienceSegment.
2912 """
2913 return len(self.__chunks)
2914
2916 return '<ScienceSegment: id %d, start %d, end %d, dur %d, unused %d>' % (
2917 self.id(),self.start(),self.end(),self.dur(),self.__unused)
2918
2920 """
2921 ScienceSegments are compared by the GPS start time of the segment.
2922 """
2923 return cmp(self.start(),other.start())
2924
2925 - def make_chunks(self,length=0,overlap=0,play=0,sl=0,excl_play=0,pad_data=0):
2926 """
2927 Divides the science segment into chunks of length seconds overlapped by
2928 overlap seconds. If the play option is set, only chunks that contain S2
2929 playground data are generated. If the user has a more complicated way
2930    of generating chunks, this method should be overridden in a subclass.
2931 Any data at the end of the ScienceSegment that is too short to contain a
2932 chunk is ignored. The length of this unused data is stored and can be
2933 retrieved with the unused() method.
2934 @param length: length of chunk in seconds.
2935 @param overlap: overlap between chunks in seconds.
2936 @param play: 1 : only generate chunks that overlap with S2 playground data.
2937 2 : as play = 1 plus compute trig start and end times to
2938 coincide with the start/end of the playground
2939 @param sl: slide by sl seconds before determining playground data.
2940 @param excl_play: exclude the first excl_play second from the start and end
2941 of the chunk when computing if the chunk overlaps with playground.
2942 @param pad_data: exclude the first and last pad_data seconds of the segment
2943 when generating chunks
2944 """
2945 time_left = self.dur() - (2 * pad_data)
2946 start = self.start() + pad_data
2947 increment = length - overlap
2948 while time_left >= length:
2949 end = start + length
2950 if (not play) or (play and (((end-sl-excl_play-729273613) % 6370) <
2951 (600+length-2*excl_play))):
2952 if (play == 2):
2953
2954 play_start = 729273613 + 6370 * \
2955 math.floor((end-sl-excl_play-729273613) / 6370)
2956 play_end = play_start + 600
2957 trig_start = 0
2958 trig_end = 0
2959 if ( (play_end - 6370) > start ):
2960 print("Two playground segments in this chunk:", end=' ')
2961 print(" Code to handle this case has not been implemented")
2962 sys.exit(1)
2963 else:
2964 if play_start > start:
2965 trig_start = int(play_start)
2966 if play_end < end:
2967 trig_end = int(play_end)
2968 self.__chunks.append(AnalysisChunk(start,end,trig_start,trig_end))
2969 else:
2970 self.__chunks.append(AnalysisChunk(start,end))
2971 start += increment
2972 time_left -= increment
2973 self.__unused = time_left - overlap
2974
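# Worked example (illustrative): chunking a 6000 s ScienceSegment with
# length=2048 and overlap=128, i.e. an increment of 1920 s between chunk starts.
# The GPS times are arbitrary.
#
#   seg = ScienceSegment((1, 1000000000, 1000006000, 6000))
#   seg.make_chunks(length=2048, overlap=128)
#   # chunks: [1000000000, 1000002048], [1000001920, 1000003968],
#   #         [1000003840, 1000005888]; len(seg) == 3 and seg.unused() == 112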
2975 - def add_chunk(self,start,end,trig_start=0,trig_end=0):
2976 """
2977 Add an AnalysisChunk to the list associated with this ScienceSegment.
2978 @param start: GPS start time of chunk.
2979 @param end: GPS end time of chunk.
2980    @param trig_start: GPS start time for triggers from chunk
    @param trig_end: GPS end time for triggers from chunk
2981 """
2982 self.__chunks.append(AnalysisChunk(start,end,trig_start,trig_end))
2983
2985 """
2986 Returns the length of data in the science segment not used to make chunks.
2987 """
2988 return self.__unused
2989
2991 """
2992 Set the length of data in the science segment not used to make chunks.
2993 """
2994 self.__unused = unused
2995
2997 """
2998 Returns the ID of this ScienceSegment.
2999 """
3000 return self.__id
3001
3003 """
3004 Returns the GPS start time of this ScienceSegment.
3005 """
3006 return self.__start
3007
3009 """
3010 Returns the GPS end time of this ScienceSegment.
3011 """
3012 return self.__end
3013
3015 """
3016 Override the GPS start time (and set the duration) of this ScienceSegment.
3017 @param t: new GPS start time.
3018 """
3019 self.__dur += self.__start - t
3020 self.__start = t
3021
3023 """
3024 Override the GPS end time (and set the duration) of this ScienceSegment.
3025 @param t: new GPS end time.
3026 """
3027 self.__dur -= self.__end - t
3028 self.__end = t
3029
3031 """
3032 Returns the length (duration) in seconds of this ScienceSegment.
3033 """
3034 return self.__dur
3035
3037 """
3038 Set the DataFind node associated with this ScienceSegment to df_node.
3039 @param df_node: the DataFind node for this ScienceSegment.
3040 """
3041 self.__df_node = df_node
3042
3044 """
3045 Returns the DataFind node for this ScienceSegment.
3046 """
3047 return self.__df_node
3048
3049
3051 """
3052 An object that can contain all the science data used in an analysis. Can
3053 contain multiple ScienceSegments and has a method to generate these from
3054  a text file produced by the LIGOtools segwizard program.
3055 """
3057 self.__sci_segs = []
3058 self.__filename = None
3059
3061 """
3062 Allows direct access to or iteration over the ScienceSegments associated
3063 with the ScienceData.
3064 """
3065 return self.__sci_segs[i]
3066
3068 return '<ScienceData: file %s>' % self.__filename
3069
3071 """
3072 Returns the number of ScienceSegments associated with the ScienceData.
3073 """
3074 return len(self.__sci_segs)
3075
3076 - def read(self,filename,min_length,slide_sec=0,buffer=0):
3077 """
3078 Parse the science segments from the segwizard output contained in file.
3079 @param filename: input text file containing a list of science segments generated by
3080 segwizard.
3081 @param min_length: only append science segments that are longer than min_length.
3082 @param slide_sec: Slide each ScienceSegment by::
3083
3084 delta > 0:
3085 [s,e] -> [s+delta,e].
3086 delta < 0:
3087      [s,e] -> [s,e+delta].
3088
3089 @param buffer: shrink the ScienceSegment::
3090
3091 [s,e] -> [s+buffer,e-buffer]
3092 """
3093 self.__filename = filename
3094 octothorpe = re.compile(r'\A#')
3095 for line in open(filename):
3096 if not octothorpe.match(line) and int(line.split()[3]) >= min_length:
3097 (id,st,en,du) = list(map(int,line.split()))
3098
3099
3100 if slide_sec > 0:
3101 st += slide_sec
3102 elif slide_sec < 0:
3103 en += slide_sec
3104 du -= abs(slide_sec)
3105
3106
3107 if buffer > 0:
3108 st += buffer
3109 en -= buffer
3110 du -= 2*abs(buffer)
3111
3112 x = ScienceSegment(tuple([id,st,en,du]))
3113 self.__sci_segs.append(x)
3114
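# Illustrative sketch: reading segwizard output into a ScienceData object. The
# file name is an example; each non-comment line must hold "id start end duration".
#
#   science = ScienceData()
#   science.read('h1_science_segments.txt', min_length=2048)
#   science.make_chunks(2048, overlap=128, pad_data=8)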
3118
3120 """
3121 Parse the science segments from a tama list of locked segments contained in
3122 file.
3123 @param filename: input text file containing a list of tama segments.
3124 """
3125 self.__filename = filename
3126 for line in open(filename):
3127 columns = line.split()
3128 id = int(columns[0])
3129 start = int(math.ceil(float(columns[3])))
3130 end = int(math.floor(float(columns[4])))
3131 dur = end - start
3132
3133 x = ScienceSegment(tuple([id, start, end, dur]))
3134 self.__sci_segs.append(x)
3135
3136
3137 - def make_chunks(self,length,overlap=0,play=0,sl=0,excl_play=0,pad_data=0):
3138 """
3139 Divide each ScienceSegment contained in this object into AnalysisChunks.
3140 @param length: length of chunk in seconds.
3141 @param overlap: overlap between segments.
3142 @param play: if true, only generate chunks that overlap with S2 playground
3143 data.
3144 @param sl: slide by sl seconds before determining playground data.
3145 @param excl_play: exclude the first excl_play second from the start and end
3146    of the chunk when computing if the chunk overlaps with playground.
    @param pad_data: exclude the first and last pad_data seconds of the segment
    when generating chunks
3147 """
3148 for seg in self.__sci_segs:
3149 seg.make_chunks(length,overlap,play,sl,excl_play,pad_data)
3150
3153 """
3154 Create an extra chunk that uses up the unused data in the science segment.
3155 @param length: length of chunk in seconds.
3156 @param trig_overlap: length of time start generating triggers before the
3157 start of the unused data.
3158 @param play:
3159 - 1 : only generate chunks that overlap with S2 playground data.
3160 - 2 : as 1 plus compute trig start and end times to coincide
3161 with the start/end of the playground
3162 @param min_length: the unused data must be greater than min_length to make a
3163 chunk.
3164 @param sl: slide by sl seconds before determining playground data.
3165 @param excl_play: exclude the first excl_play second from the start and end
3166 of the chunk when computing if the chunk overlaps with playground.
3167 @param pad_data: exclude the first and last pad_data seconds of the segment
3168 when generating chunks
3169
3170 """
3171 for seg in self.__sci_segs:
3172
3173 if seg.unused() > min_length:
3174 end = seg.end() - pad_data
3175 start = end - length
3176 if (not play) or (play and (((end-sl-excl_play-729273613)%6370) <
3177 (600+length-2*excl_play))):
3178 trig_start = end - seg.unused() - trig_overlap
3179 if (play == 2):
3180
3181 play_start = 729273613 + 6370 * \
3182 math.floor((end-sl-excl_play-729273613) / 6370)
3183 play_end = play_start + 600
3184 trig_end = 0
3185 if ( (play_end - 6370) > start ):
3186 print("Two playground segments in this chunk")
3187 print(" Code to handle this case has not been implemented")
3188 sys.exit(1)
3189 else:
3190 if play_start > trig_start:
3191 trig_start = int(play_start)
3192 if (play_end < end):
3193 trig_end = int(play_end)
3194 if (trig_end == 0) or (trig_end > trig_start):
3195 seg.add_chunk(start, end, trig_start, trig_end)
3196 else:
3197 seg.add_chunk(start, end, trig_start)
3198 seg.set_unused(0)
3199
3202 """
3203 Create a chunk that uses up the unused data in the science segment
3204 @param min_length: the unused data must be greater than min_length to make a
3205 chunk.
3206 @param overlap: overlap between chunks in seconds.
3207 @param play: if true, only generate chunks that overlap with S2 playground data.
3208 @param sl: slide by sl seconds before determining playground data.
3209 @param excl_play: exclude the first excl_play second from the start and end
3210 of the chunk when computing if the chunk overlaps with playground.
3211 """
3212 for seg in self.__sci_segs:
3213 if seg.unused() > min_length:
3214 start = seg.end() - seg.unused() - overlap
3215 end = seg.end()
3216        length = end - start
3217 if (not play) or (play and (((end-sl-excl_play-729273613)%6370) <
3218 (600+length-2*excl_play))):
3219 seg.add_chunk(start, end, start)
3220 seg.set_unused(0)
3221
3223 """
3224    Splits ScienceSegments up into chunks of a given maximum length.
3225 The length of the last two chunks are chosen so that the data
3226 utilisation is optimised.
3227 @param min_length: minimum chunk length.
3228 @param max_length: maximum chunk length.
3229 @param pad_data: exclude the first and last pad_data seconds of the
3230 segment when generating chunks
3231 """
3232 for seg in self.__sci_segs:
3233
3234 seg_start = seg.start() + pad_data
3235 seg_end = seg.end() - pad_data
3236
3237 if seg.unused() > max_length:
3238
3239        N = (seg_end - seg_start) // max_length  # integer division so range(N-1) below is valid on Python 3
3240
3241
3242 for i in range(N-1):
3243 start = seg_start + (i * max_length)
3244 stop = start + max_length
3245 seg.add_chunk(start, stop)
3246
3247
3248 start = seg_start + ((N-1) * max_length)
3249        middle = (start + seg_end) // 2
3250 seg.add_chunk(start, middle)
3251 seg.add_chunk(middle, seg_end)
3252 seg.set_unused(0)
3253 elif seg.unused() > min_length:
3254
3255 seg.add_chunk(seg_start, seg_end)
3256 else:
3257
3258 seg.set_unused(0)
3259
3261 """
3262 Replaces the ScienceSegments contained in this instance of ScienceData
3263 with the intersection of those in the instance other. Returns the number
3264 of segments in the intersection.
3265 @param other: ScienceData to use to generate the intersection
3266 """
3267
3268
3269 length1 = len(self)
3270 length2 = len(other)
3271
3272
3273 ostart = -1
3274 outlist = []
3275 iseg2 = -1
3276 start2 = -1
3277 stop2 = -1
3278
3279 for seg1 in self:
3280 start1 = seg1.start()
3281 stop1 = seg1.end()
3282 id = seg1.id()
3283
3284
3285 while start2 < stop1:
3286 if stop2 > start1:
3287
3288
3289
3290 if start1 < start2:
3291 ostart = start2
3292 else:
3293 ostart = start1
3294 if stop1 > stop2:
3295 ostop = stop2
3296 else:
3297 ostop = stop1
3298
3299 x = ScienceSegment(tuple([id, ostart, ostop, ostop-ostart]))
3300 outlist.append(x)
3301
3302 if stop2 > stop1:
3303 break
3304
3305
3306 iseg2 += 1
3307 if iseg2 < len(other):
3308 seg2 = other[iseg2]
3309 start2 = seg2.start()
3310 stop2 = seg2.end()
3311 else:
3312
3313 start2 = 2000000000
3314 stop2 = 2000000000
3315
3316
3317 self.__sci_segs = outlist
3318 return len(self)
3319
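# Illustrative sketch: intersection() and coalesce() modify the instance in
# place and return the new number of segments. The file names are examples only.
#
#   h1 = ScienceData(); h1.read('h1_segs.txt', 0)
#   l1 = ScienceData(); l1.read('l1_segs.txt', 0)
#   n_coinc = h1.intersection(l1)   # h1 now holds the H1/L1 coincident segments
#   h1.coalesce()                   # merge any adjacent segments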
3320
3321
3322 - def union(self, other):
3323 """
3324 Replaces the ScienceSegments contained in this instance of ScienceData
3325 with the union of those in the instance other. Returns the number of
3326 ScienceSegments in the union.
3327 @param other: ScienceData to use to generate the intersection
3328 """
3329
3330
3331 length1 = len(self)
3332 length2 = len(other)
3333
3334
3335 ostart = -1
3336 seglist = []
3337
3338 i1 = -1
3339 i2 = -1
3340 start1 = -1
3341 start2 = -1
3342 id = -1
3343
3344 while 1:
3345
3346 if start1 == -1:
3347 i1 += 1
3348 if i1 < length1:
3349 start1 = self[i1].start()
3350 stop1 = self[i1].end()
3351 id = self[i1].id()
3352 elif i2 == length2:
3353 break
3354
3355
3356 if start2 == -1:
3357 i2 += 1
3358 if i2 < length2:
3359 start2 = other[i2].start()
3360 stop2 = other[i2].end()
3361 elif i1 == length1:
3362 break
3363
3364
3365 if start1 > -1 and ( start2 == -1 or start1 <= start2):
3366 ustart = start1
3367 ustop = stop1
3368
3369 start1 = -1
3370 elif start2 > -1:
3371 ustart = start2
3372 ustop = stop2
3373
3374 start2 = -1
3375 else:
3376 break
3377
3378
3379
3380 if ostart == -1:
3381 ostart = ustart
3382 ostop = ustop
3383 elif ustart <= ostop:
3384 if ustop > ostop:
3385
3386 ostop = ustop
3387 else:
3388
3389 pass
3390 else:
3391
3392
3393 x = ScienceSegment(tuple([id,ostart,ostop,ostop-ostart]))
3394 seglist.append(x)
3395 ostart = ustart
3396 ostop = ustop
3397
3398
3399 if ostart != -1:
3400 x = ScienceSegment(tuple([id,ostart,ostop,ostop-ostart]))
3401 seglist.append(x)
3402
3403 self.__sci_segs = seglist
3404 return len(self)
3405
3406
3408 """
3409 Coalesces any adjacent ScienceSegments. Returns the number of
3410 ScienceSegments in the coalesced list.
3411 """
3412
3413
3414 if len(self) == 0:
3415 return 0
3416
3417
3418 self.__sci_segs.sort()
3419
3420
3421 outlist = []
3422 ostop = -1
3423
3424 for seg in self:
3425 start = seg.start()
3426 stop = seg.end()
3427 id = seg.id()
3428 if start > ostop:
3429
3430 if ostop >= 0:
3431 x = ScienceSegment(tuple([id,ostart,ostop,ostop-ostart]))
3432 outlist.append(x)
3433 ostart = start
3434 ostop = stop
3435 elif stop > ostop:
3436
3437 ostop = stop
3438
3439
3440 if ostop >= 0:
3441 x = ScienceSegment(tuple([id,ostart,ostop,ostop-ostart]))
3442 outlist.append(x)
3443
3444 self.__sci_segs = outlist
3445 return len(self)
3446
3447
3449 """
3450 Inverts the ScienceSegments in the class (i.e. set NOT). Returns the
3451 number of ScienceSegments after inversion.
3452 """
3453
3454
3455 if len(self) == 0:
3456
3457 self.__sci_segs = ScienceSegment(tuple([0,0,1999999999,1999999999]))
3458
3459
3460 outlist = []
3461 ostart = 0
3462 for seg in self:
3463 start = seg.start()
3464 stop = seg.end()
3465 if start < 0 or stop < start or start < ostart:
3466 raise SegmentError("Invalid list")
3467 if start > 0:
3468 x = ScienceSegment(tuple([0,ostart,start,start-ostart]))
3469 outlist.append(x)
3470 ostart = stop
3471
3472 if ostart < 1999999999:
3473 x = ScienceSegment(tuple([0,ostart,1999999999,1999999999-ostart]))
3474 outlist.append(x)
3475
3476 self.__sci_segs = outlist
3477 return len(self)
3478
3479
3481 """
3482 Keep only times in ScienceSegments which are in the playground
3483 """
3484
3485 length = len(self)
3486
3487
3488 ostart = -1
3489 outlist = []
3490 begin_s2 = 729273613
3491 play_space = 6370
3492 play_len = 600
3493
3494 for seg in self:
3495 start = seg.start()
3496 stop = seg.end()
3497 id = seg.id()
3498
3499
3500 play_start = begin_s2+play_space*( 1 +
3501 int((start - begin_s2 - play_len)/play_space) )
3502
3503 while play_start < stop:
3504 if play_start > start:
3505 ostart = play_start
3506 else:
3507 ostart = start
3508
3509
3510 play_stop = play_start + play_len
3511
3512 if play_stop < stop:
3513 ostop = play_stop
3514 else:
3515 ostop = stop
3516
3517 x = ScienceSegment(tuple([id, ostart, ostop, ostop-ostart]))
3518 outlist.append(x)
3519
3520
3521 play_start = play_start + play_space
3522
3523
3524 self.__sci_segs = outlist
3525 return len(self)
3526
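# Worked example (illustrative): the S2 playground used above starts at GPS
# 729273613 and repeats every 6370 s with a duration of 600 s, so a GPS time t
# lies in the playground when (t - 729273613) % 6370 < 600.
#
#   t = 729400000
#   (t - 729273613) % 6370   # -> 5357, which is >= 600, so t is not playground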
3527
3529 """
3530    Intersection routine for three inputs. Built out of the intersection
3531    and coalesce routines.
3532 """
3533 self.intersection(second)
3534 self.intersection(third)
3535 self.coalesce()
3536 return len(self)
3537
3547
3549 """
3550    Split each segment in the list into subsegments of at most dt seconds.
3551 """
3552 outlist=[]
3553 for seg in self:
3554 start = seg.start()
3555 stop = seg.end()
3556 id = seg.id()
3557
3558 while start < stop:
3559 tmpstop = start + dt
3560 if tmpstop > stop:
3561 tmpstop = stop
3562 elif tmpstop + dt > stop:
3563 tmpstop = int( (start + stop)/2 )
3564 x = ScienceSegment(tuple([id,start,tmpstop,tmpstop-start]))
3565 outlist.append(x)
3566 start = tmpstop
3567
3568
3569 self.__sci_segs = outlist
3570 return len(self)
3571
3572
3573
3576
3577 self.__path = path
3578
3579
3580
3581 self.cache = {'gwf': None, 'sft' : None, 'xml' : None}
3582
3583
3584
3585 for type in self.cache.keys():
3586 self.cache[type] = {}
3587
3588 - def group(self, lst, n):
3589 """
3590 Group an iterable into an n-tuples iterable. Incomplete
3591 tuples are discarded
3592 """
3593    return zip(*[itertools.islice(lst, i, None, n) for i in range(n)])  # zip works on Python 2 and 3; itertools.izip is Python 2 only
3594
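# Illustrative example: group() pairs up the flat list of segment boundaries
# parsed below, discarding any incomplete trailing tuple.
#
#   list(cache.group([924600000, 924646720, 924646784, 924647472, 924647712], 2))
#   # -> [(924600000, 924646720), (924646784, 924647472)]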
3595 - def parse(self,type_regex=None):
3596 """
3597 Each line of the frame cache file is like the following:
3598
3599 /frames/E13/LHO/frames/hoftMon_H1/H-H1_DMT_C00_L2-9246,H,H1_DMT_C00_L2,1,16 1240664820 6231 {924600000 924646720 924646784 924647472 924647712 924700000}
3600
3601 The description is as follows:
3602
3603 1.1) Directory path of files
3604 1.2) Site
3605 1.3) Type
3606 1.4) Number of frames in the files (assumed to be 1)
3607 1.5) Duration of the frame files.
3608
3609 2) UNIX timestamp for directory modification time.
3610
3611    3) Number of files that match the above pattern in the directory.
3612
3613 4) List of time range or segments [start, stop)
3614
3615 We store the cache for each site and frameType combination
3616 as a dictionary where the keys are (directory, duration)
3617 tuples and the values are segment lists.
3618
3619 Since the cache file is already coalesced we do not
3620 have to call the coalesce method on the segment lists.
3621 """
3622 path = self.__path
3623 cache = self.cache
3624 if type_regex:
3625 type_filter = re.compile(type_regex)
3626 else:
3627 type_filter = None
3628
3629 f = open(path, 'r')
3630
3631
3632 gwfDict = {}
3633
3634
3635 for line in f:
3636
3637 if type_filter and type_filter.search(line) is None:
3638 continue
3639
3640
3641 header, modTime, fileCount, times = line.strip().split(' ', 3)
3642 dir, site, frameType, frameCount, duration = header.split(',')
3643 duration = int(duration)
3644
3645
3646
3647
3648
3649 times = [ int(s) for s in times[1:-1].split(' ') ]
3650
3651
3652 segments = [ glue.segments.segment(a) for a in self.group(times, 2) ]
3653
3654
3655 if site not in gwfDict:
3656 gwfDict[site] = {}
3657
3658
3659 if frameType not in gwfDict[site]:
3660 gwfDict[site][frameType] = {}
3661
3662
3663 key = (dir, duration)
3664 if key in gwfDict[site][frameType]:
3665 msg = "The combination %s is not unique in the frame cache file" \
3666 % str(key)
3667 raise RuntimeError(msg)
3668
3669 gwfDict[site][frameType][key] = glue.segments.segmentlist(segments)
3670 f.close()
3671
3672 cache['gwf'] = gwfDict
3673
3674 - def get_lfns(self, site, frameType, gpsStart, gpsEnd):
3675    """
    Return the sorted list of frame file LFNs for the given site and frame type
    that overlap the GPS interval [gpsStart, gpsEnd).
3676    """
3677
3678 cache = self.cache
3679
3680
3681 if site not in cache['gwf']:
3682 return []
3683
3684
3685 if frameType not in cache['gwf'][site]:
3686 return []
3687
3688
3689 search = glue.segments.segment(gpsStart, gpsEnd)
3690
3691
3692 searchlist = glue.segments.segmentlist([search])
3693
3694
3695 lfnDict = {}
3696
3697 for key,seglist in cache['gwf'][site][frameType].items():
3698 dir, dur = key
3699
3700
3701 overlap = seglist.intersects(searchlist)
3702
3703 if not overlap: continue
3704
3705
3706
3707
3708 for s in seglist:
3709 if s.intersects(search):
3710 t1, t2 = s
3711 times = range(t1, t2, dur)
3712
3713
3714 for t in times:
3715 if search.intersects(glue.segments.segment(t, t + dur)):
3716 lfn = "%s-%s-%d-%d.gwf" % (site, frameType, t, dur)
3717 lfnDict[lfn] = None
3718
3719
3720 lfns = list(lfnDict.keys())
3721 lfns.sort()
3722
3723 return lfns
3724
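# Illustrative sketch: building a cache from an lsync cache dump and querying
# the frame LFNs that overlap a GPS interval. The path, site, frame type and
# GPS times below are examples only.
#
#   cache = LsyncCache('/var/cache/lsync/frame_cache_dump')
#   cache.parse(type_regex='H1_HOFT_C00')
#   lfns = cache.get_lfns('H', 'H1_HOFT_C00', 968654552, 968658652)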
3725
3727 """
3728 An LSCdataFind job used to locate data. The static options are
3729 read from the section [datafind] in the ini file. The stdout from
3730 LSCdataFind contains the paths to the frame files and is directed to a file
3731 in the cache directory named by site and GPS start and end times. The stderr
3732  is directed to the logs directory. The job always runs in the local
3733 universe. The path to the executable is determined from the ini file.
3734 """
3735 - def __init__(self,cache_dir,log_dir,config_file,dax=0,lsync_cache_file=None,lsync_type_regex=None):
3736 """
3737 @param cache_dir: the directory to write the output lal cache files to.
3738 @param log_dir: the directory to write the stderr file to.
3739 @param config_file: ConfigParser object containing the path to the LSCdataFind
3740 executable in the [condor] section and a [datafind] section from which
3741 the LSCdataFind options are read.
3742 """
3743 self.__executable = config_file.get('condor','datafind')
3744 self.__universe = 'local'
3745 CondorDAGJob.__init__(self,self.__universe,self.__executable)
3746 AnalysisJob.__init__(self,config_file)
3747 self.__cache_dir = cache_dir
3748 self.__dax = dax
3749 self.__config_file = config_file
3750 self.__lsync_cache = None
3751 self.add_condor_cmd('accounting_group',config_file.get('condor','accounting_group'))
3752 if lsync_cache_file:
3753 self.__lsync_cache = LsyncCache(lsync_cache_file)
3754 self.__lsync_cache.parse(lsync_type_regex)
3755
3756
3757 for o in self.__config_file.options('datafind'):
3758      opt = o.strip()
3759      if opt[:4] != "type":
3760        arg = self.__config_file.get('datafind',opt).strip()
3761 self.add_opt(opt,arg)
3762
3763 if self.__dax:
3764
3765 self.add_opt('names-only','')
3766 else:
3767
3768 self.add_opt('lal-cache','')
3769 self.add_opt('url-type','file')
3770
3771 self.add_condor_cmd('getenv','True')
3772
3773 self.set_stderr_file(os.path.join(log_dir, 'datafind-$(macroobservatory)-$(macrotype)-$(macrogpsstarttime)-$(macrogpsendtime)-$(cluster)-$(process).err'))
3774 self.set_stdout_file(os.path.join(log_dir, 'datafind-$(macroobservatory)-$(macrotype)-$(macrogpsstarttime)-$(macrogpsendtime)-$(cluster)-$(process).out'))
3775 self.set_sub_file('datafind.sub')
3776
3778 """
3779    returns the directory that the cache files are written to.
3780 """
3781 return self.__cache_dir
3782
3784 """
3785 returns the dax flag
3786 """
3787 return self.__dax
3788
3790 """
3791 return the configuration file object
3792 """
3793 return self.__config_file
3794
3796 return self.__lsync_cache
3797
3798
3800 """
3801 A DataFindNode runs an instance of LSCdataFind in a Condor DAG.
3802 """
3804 """
3805    @param job: A CondorDAGJob that can run an instance of LSCdataFind.
3806 """
3807 CondorDAGNode.__init__(self,job)
3808 AnalysisNode.__init__(self)
3809 self.__start = 0
3810 self.__end = 0
3811 self.__observatory = None
3812 self.__output = None
3813 self.__job = job
3814 self.__dax = job.is_dax()
3815 self.__lfn_list = None
3816
3817
3818 try:
3819 self.set_type(self.job().get_config_file().get('datafind','type'))
3820 except:
3821 self.__type = None
3822
3824 """
3825    Private method to set the file to write the cache to. Automatically set
3826    once the observatory, type, start and end times have been set.
3827 """
3828 if self.__start and self.__end and self.__observatory and self.__type:
3829 self.__output = os.path.join(self.__job.get_cache_dir(), self.__observatory + '-' + self.__type +'_CACHE' + '-' + str(self.__start) + '-' + str(self.__end - self.__start) + '.lcf')
3830 self.set_output(self.__output)
3831
3833 """
3834 Set the start time of the datafind query.
3835    @param time: GPS start time of query.
    @param pad: number of seconds to subtract from the query start time.
3836 """
3837 if pad:
3838 self.add_var_opt('gps-start-time', int(time)-int(pad))
3839 else:
3840 self.add_var_opt('gps-start-time', int(time))
3841 self.__start = time
3842 self.__set_output()
3843
3845 """
3846 Return the start time of the datafind query
3847 """
3848 return self.__start
3849
3851 """
3852 Set the end time of the datafind query.
3853 @param time: GPS end time of query.
3854 """
3855 self.add_var_opt('gps-end-time', time)
3856 self.__end = time
3857 self.__set_output()
3858
3860 """
3861    Return the end time of the datafind query
3862 """
3863 return self.__end
3864
3866 """
3867 Set the IFO to retrieve data for. Since the data from both Hanford
3868 interferometers is stored in the same frame file, this takes the first
3869 letter of the IFO (e.g. L or H) and passes it to the --observatory option
3870 of LSCdataFind.
3871 @param obs: IFO to obtain data for.
3872 """
3873 self.add_var_opt('observatory',obs)
3874 self.__observatory = str(obs)
3875 self.__set_output()
3876
3878 """
3879    Return the observatory of the datafind query
3880 """
3881 return self.__observatory
3882
3884 """
3885 sets the frame type that we are querying
3886 """
3887 self.add_var_opt('type',str(type))
3888 self.__type = str(type)
3889 self.__set_output()
3890
3892 """
3893 gets the frame type that we are querying
3894 """
3895 return self.__type
3896
3898 return self.__output
3899
3901 """
3902    Return the output file, i.e. the file containing the frame cache data,
3903    or the list of LFNs when the job is running as part of a DAX.
3904 """
3905 if self.__dax:
3906
3907
3908 if self.__lfn_list is None:
3909
3910 if self.job().lsync_cache():
3911
3912 if self.__lfn_list is None:
3913 self.__lfn_list = self.job().lsync_cache().get_lfns(
3914 self.get_observatory(), self.get_type(),
3915 self.get_start(), self.get_end())
3916
3917 else:
3918
3919 try:
3920 server = os.environ['LIGO_DATAFIND_SERVER']
3921 except KeyError:
3922 raise RuntimeError(
3923 "Environment variable LIGO_DATAFIND_SERVER is not set")
3924
3925 try:
3926 h = six.moves.http_client.HTTPConnection(server)
3927 except:
3928
3929
3930 cert = None
3931 key = None
3932 try:
3933 proxy = os.environ['X509_USER_PROXY']
3934 cert = proxy
3935 key = proxy
3936 except:
3937 try:
3938 cert = os.environ['X509_USER_CERT']
3939 key = os.environ['X509_USER_KEY']
3940 except:
3941 uid = os.getuid()
3942 proxy_path = "/tmp/x509up_u%d" % uid
3943                if os.access(proxy_path, os.R_OK):
3944 cert = proxy_path
3945 key = proxy_path
3946
3947 h = six.moves.http_client.HTTPSConnection(server, key_file = key, cert_file = cert)
3948
3949
3950 url = "/LDR/services/data/v1/gwf/%s/%s/%s,%s.json" % (
3951 self.get_observatory(), self.get_type(),
3952 str(self.get_start()), str(self.get_end()))
3953
3954
3955 h.request("GET", url)
3956 response = h.getresponse()
3957
3958 if response.status != 200:
3959 msg = "Server returned code %d: %s" % (response.status, response.reason)
3960 body = response.read()
3961 msg += body
3962 raise RuntimeError(msg)
3963
3964
3965 body = response.read()
3966
3967
3968 urlList = decode(body)
3969 lfnDict = {}
3970 for url in urlList:
3971 path = urllib.parse.urlparse(url)[2]
3972 lfn = os.path.split(path)[1]
3973 lfnDict[lfn] = 1
3974
3975 self.__lfn_list = list(lfnDict.keys())
3976 self.__lfn_list.sort()
3977
3978 return self.__lfn_list
3979 else:
3980 return self.__output
3981
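# Illustrative sketch: the job class name LSCDataFindJob and the setter names
# set_observatory(), set_type(), set_start() and set_end() are assumed here from
# the getters used above; the config object and directory names are examples.
#
#   df_job = LSCDataFindJob('cache', 'logs', cp)
#   df_node = LSCDataFindNode(df_job)
#   df_node.set_observatory('H')
#   df_node.set_type('H1_HOFT_C00')
#   df_node.set_start(968654552)
#   df_node.set_end(968658652)
#   # get_output() now names cache/H-H1_HOFT_C00_CACHE-968654552-4100.lcf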
3982
3984 """
3985 A ligolw_add job can be used to concatenate several ligo lw files
3986 """
3987 - def __init__(self,log_dir,cp,dax=False):
3988 """
3989 cp = ConfigParser object from which options are read.
3990 """
3991 self.__executable = cp.get('condor','ligolw_add')
3992 self.__universe = 'vanilla'
3993 CondorDAGJob.__init__(self,self.__universe,self.__executable)
3994 AnalysisJob.__init__(self,cp,dax)
3995 self.add_ini_opts(cp, "ligolw_add")
3996
3997 self.add_condor_cmd('getenv','True')
3998 self.add_condor_cmd('accounting_group',cp.get('condor','accounting_group'))
3999
4000 self.set_stdout_file(os.path.join( log_dir, 'ligolw_add-$(cluster)-$(process).out') )
4001 self.set_stderr_file(os.path.join( log_dir, 'ligolw_add-$(cluster)-$(process).err') )
4002 self.set_sub_file('ligolw_add.sub')
4003
4004
4006 """
4007 Runs an instance of ligolw_add in a Condor DAG.
4008 """
4015
4016
4018 """
4019 A ligolw_cut job can be used to remove parts of a ligo lw file
4020 """
4021 - def __init__(self,log_dir,cp,dax=False):
4022 """
4023 cp = ConfigParser object from which options are read.
4024 """
4025 self.__executable = cp.get('condor','ligolw_cut')
4026 self.__universe = 'vanilla'
4027 CondorDAGJob.__init__(self,self.__universe,self.__executable)
4028 AnalysisJob.__init__(self,cp,dax)
4029
4030 self.add_condor_cmd('getenv','True')
4031
4032 self.set_stdout_file(os.path.join( log_dir, 'ligolw_cut-$(cluster)-$(process).out') )
4033 self.set_stderr_file(os.path.join( log_dir, 'ligolw_cut-$(cluster)-$(process).err') )
4034 self.set_sub_file('ligolw_cut.sub')
4035
4036
4038 """
4039 Runs an instance of ligolw_cut in a Condor DAG.
4040 """
4047
4048
4049 -class LDBDCJob(CondorDAGJob, AnalysisJob):
4050 """
4051 A ldbdc job can be used to insert data or fetch data from the database.
4052 """
4053 - def __init__(self,log_dir,cp,dax=False):
4054 """
4055 cp = ConfigParser object from which options are read.
4056 """
4057 self.__executable = cp.get('condor','ldbdc')
4058 self.__universe = 'local'
4059 CondorDAGJob.__init__(self,self.__universe,self.__executable)
4060 AnalysisJob.__init__(self,cp,dax)
4061
4062 self.add_condor_cmd('getenv','True')
4063
4064 self.set_stdout_file(os.path.join( log_dir, 'ldbdc-$(cluster)-$(process).out') )
4065 self.set_stderr_file(os.path.join( log_dir, 'ldbdc-$(cluster)-$(process).err') )
4066 self.set_sub_file('ldbdc.sub')
4067
4068
4069 -class LDBDCNode(CondorDAGNode, AnalysisNode):
4070 """
4071 Runs an instance of ldbdc in a Condor DAG.
4072 """
4074 """
4075    @param job: A CondorDAGJob that can run an instance of ldbdc
4076 """
4077 CondorDAGNode.__init__(self,job)
4078 AnalysisNode.__init__(self)
4079 self.__server = None
4080 self.__identity = None
4081 self.__insert = None
4082 self.__pfn = None
4083 self.__query = None
4084
4086 """
4087 Set the server name.
4088 """
4089 self.add_var_opt('server',server)
4090 self.__server = server
4091
4093 """
4094 Get the server name.
4095 """
4096 return self.__server
4097
4099 """
4100 Set the identity name.
4101 """
4102 self.add_var_opt('identity',identity)
4103 self.__identity = identity
4104
4106 """
4107 Get the identity name.
4108 """
4109 return self.__identity
4110
4117
4119 """
4120 Get the insert name.
4121 """
4122 return self.__insert
4123
4125 """
4126 Set the pfn name.
4127 """
4128 self.add_var_opt('pfn',pfn)
4129 self.__pfn = pfn
4130
4132 """
4133 Get the pfn name.
4134 """
4135 return self.__pfn
4136
4138 """
4139 Set the query name.
4140 """
4141 self.add_var_opt('query',query)
4142 self.__query = query
4143
4145 """
4146 Get the query name.
4147 """
4148 return self.__query
4149
4150
4151 -class NoopJob(CondorDAGJob, AnalysisJob):
4152 """
4153 A Noop Job does nothing.
4154 """
4155 - def __init__(self,log_dir,cp,dax=False):
4156 """
4157 cp = ConfigParser object from which options are read.
4158 """
4159 self.__executable = 'true'
4160 self.__universe = 'local'
4161 CondorDAGJob.__init__(self,self.__universe,self.__executable)
4162 AnalysisJob.__init__(self,cp,dax)
4163
4164 self.add_condor_cmd('getenv','True')
4165 self.add_condor_cmd('noop_job','True')
4166 self.add_condor_cmd('accounting_group',cp.get('condor','accounting_group'))
4167
4168 self.set_stdout_file(os.path.join( log_dir, 'noop-$(cluster)-$(process).out') )
4169 self.set_stderr_file(os.path.join( log_dir, 'noop-$(cluster)-$(process).err') )
4170 self.set_sub_file('noop.sub')
4171
4172
4173 -class NoopNode(CondorDAGNode, AnalysisNode):
4174 """
4175  Run a noop job in a Condor DAG.
4176 """
4178 """
4179 @param job: A CondorDAGJob that does nothing.
4180 """
4181 CondorDAGNode.__init__(self,job)
4182 AnalysisNode.__init__(self)
4183 self.__server = None
4184 self.__identity = None
4185 self.__insert = None
4186 self.__pfn = None
4187 self.__query = None
4188
4189
4191 """
4192 A cbc sqlite job adds to CondorDAGJob and AnalysisJob features common to jobs
4193  which read or write to a sqlite database. Of note, the universe is always set to
4194  vanilla regardless of what's in the cp file, the extension is set
4195  to None so that it may be set by individual SqliteNodes, log files do not
4196  have macrogpsstarttime and endtime in them, and getenv is set to True.
4197 """
4198 - def __init__(self, cp, sections, exec_name, dax = False):
4199 """
4200 @cp: a ConfigParser object from which options are read
4201 @sections: list of sections in cp to get added options
4202 @exec_name: the name of the sql executable
4203 """
4204 self.__exec_name = exec_name
4205 executable = cp.get('condor', exec_name)
4206 universe = 'vanilla'
4207 CondorDAGJob.__init__(self, universe, executable)
4208 AnalysisJob.__init__(self, cp, dax)
4209
4210 for sec in sections:
4211 if cp.has_section(sec):
4212 self.add_ini_opts(cp, sec)
4213 else:
4214 sys.stderr.write("warning: config file is missing section [" + sec + "]\n")
4215
4216 self.add_condor_cmd('getenv', 'True')
4217 self.add_condor_cmd('accounting_group',cp.get('condor','accounting_group'))
4218 self.set_stdout_file('logs/' + exec_name + '-$(cluster)-$(process).out')
4219 self.set_stderr_file('logs/' + exec_name + '-$(cluster)-$(process).err')
4220
4222 """
4223 Set the exec_name name
4224 """
4225 self.__exec_name = exec_name
4226
4228 """
4229 Get the exec_name name
4230 """
4231 return self.__exec_name
4232
4233
4235 """
4236 A cbc sqlite node adds to the standard AnalysisNode features common to nodes
4237 which read or write to a sqlite database. Specifically, it adds the set_tmp_space_path
4238 and set_database methods.
4239 """
4248
4250 """
4251 Sets temp-space path. This should be on a local disk.
4252 """
4253 self.add_var_opt('tmp-space', tmp_space)
4254 self.__tmp_space = tmp_space
4255
4257 """
4258 Gets tmp-space path.
4259 """
4260 return self.__tmp_space
4261
4263 """
4264 Sets database option.
4265 """
4266 self.add_file_opt('database', database)
4267 self.__database = database
4268
4270 """
4271 Gets database option.
4272 """
4273 return self.__database
4274
4275
4277 """
4278 A LigolwSqlite job. The static options are read from the
4279 section [ligolw_sqlite] in the ini file.
4280 """
4282 """
4283 @cp: ConfigParser object from which options are read.
4284 """
4285 exec_name = 'ligolw_sqlite'
4286 sections = ['ligolw_sqlite']
4287 SqliteJob.__init__(self, cp, sections, exec_name, dax)
4288
4290 """
4291 Sets the --replace option. This will cause the job
4292 to overwrite existing databases rather than add to them.
4293 """
4294 self.add_opt('replace','')
4295
4296
4298 """
4299 A LigolwSqlite node.
4300 """
4302 """
4303 @job: a LigolwSqliteJob
4304 """
4305 SqliteNode.__init__(self, job)
4306 self.__input_cache = None
4307 self.__xml_output = None
4308 self.__xml_input = None
4309
4316
4322
4328
4330 """
4331 Tell ligolw_sqlite to dump the contents of the database to a file.
4332 """
4333 if self.get_database() is None:
4334 raise ValueError("no database specified")
4335 self.add_file_opt('extract', xml_file)
4336 self.__xml_output = xml_file
4337
4339 """
4340 Override standard get_output to return xml-file if xml-file is specified.
4341 Otherwise, will return database.
4342 """
4343 if self.__xml_output:
4344 return self.__xml_output
4345 elif self.get_database():
4346 return self.get_database()
4347 else:
4348 raise ValueError("no output xml file or database specified")
4349
4351 """
4352 The standard SafeConfigParser no longer supports deepcopy() as of python
4353 2.7 (see http://bugs.python.org/issue16058). This subclass restores that
4354 functionality.
4355 """
4357
4358
4359 config_string = StringIO.StringIO()
4360 self.write(config_string)
4361 config_string.seek(0)
4362 new_config = self.__class__()
4363 new_config.readfp(config_string)
4364 return new_config
4365
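# Illustrative sketch: the subclass above can be copied with the standard copy
# module, which a plain SafeConfigParser cannot. The class name
# DeepCopyableConfigParser is an assumption for this example.
#
#   import copy
#   cp = DeepCopyableConfigParser()
#   cp.read('example.ini')
#   cp_copy = copy.deepcopy(cp)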