Package glue :: Package ligolw :: Package utils :: Module coalesce_db
[hide private]
[frames] | no frames]

Source Code for Module glue.ligolw.utils.coalesce_db

  1  # 
  2  # Copyright (C) 2006  Kipp C. Cannon 
  3  # 
  4  # This program is free software; you can redistribute it and/or modify it 
  5  # under the terms of the GNU General Public License as published by the 
  6  # Free Software Foundation; either version 2 of the License, or (at your 
  7  # option) any later version. 
  8  # 
  9  # This program is distributed in the hope that it will be useful, but 
 10  # WITHOUT ANY WARRANTY; without even the implied warranty of 
 11  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General 
 12  # Public License for more details. 
 13  # 
 14  # You should have received a copy of the GNU General Public License along 
 15  # with this program; if not, write to the Free Software Foundation, Inc., 
 16  # 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA. 
 17   
 18  import sys 
 19  import os 
 20  import time 
 21  import socket 
 22  import pwd 
 23   
 24  try: 
 25    import DB2 
 26  except: 
 27    pass 
 28   
 29  try: 
 30    from glue import gpstime 
 31    from glue import segments 
 32  except ImportError: 
 33    raise ImportError("Error, unable to import modules from glue. Check that glue is correctly installed and in your PYTHONPATH.") 
 34   
 35  #================================================================================ 
 36  __author__ = "Ping Wei <piwei@physics.syr.edu>" 
 37  from glue import git_version 
 38  __date__ = git_version.date 
 39  __version__ = git_version.id 
 40  __src__ = "$Source$" 
 41  #================================================================================ 
 42   
def _drop_view(curs, view_name):
  """Drop the view `view_name`, ignoring any error (e.g. it does not exist)."""
  try:
    curs.execute("drop view %s" % view_name)
  except Exception:
    # best effort: the view is only scratch space for this routine
    pass


def _insert_coalesced(curs, table, id_column, seglist, creator_db, blob_defid, def_cdb, blob_procid):
  """Insert one row per segment of `seglist` into `table`, each with a fresh unique id."""
  rows = []
  for seg in seglist:
    # generate unique id for insertion
    curs.execute("VALUES BLOB(GENERATE_UNIQUE())")
    prim_id = curs.fetchone()[0]
    rows.append((prim_id, creator_db, seg[0], seg[1], blob_defid, def_cdb, blob_procid))

  if not rows:
    # nothing to insert; skip the round trip to the database
    return

  sql = "INSERT INTO %s " % table
  sql += "(%s, creator_db, start_time, end_time, segment_def_id, segment_def_cdb, process_id) " % id_column
  sql += "VALUES (?,?,?,?,?,?,?) "
  curs.executemany(sql, rows)


def coalesce_seg(database, start_time, end_time):
  """
  Coalesce the segments and segment summaries stored in a DB2 segment
  database that overlap the interval [start_time, end_time].

  A new row is registered in the process table, and for every segment
  type that has summary intervals overlapping the requested range the
  overlapping rows of the segment and segment_summary tables are read,
  coalesced, re-inserted under the new process_id, and the original
  (uncoalesced) rows are deleted.

  Parameters:
    database   -- DSN of the database to connect to (surrounding
                  whitespace is stripped)
    start_time -- start of the range; anything accepted by int()
    end_time   -- end of the range; anything accepted by int()

  Returns a tuple (ret, data_existence):
    ret            -- 0 on success, otherwise the error message (str) of
                      the exception that aborted the run; the message is
                      also written to stdout
    data_existence -- 1 if at least one segment type was found in the
                      requested range, 0 otherwise (including when the
                      run failed before the lookup was made)

  No exception derived from Exception is propagated to the caller.
  """
  ret = 0  # assume execution successful
  # Initialised up front so that a failure before the segment type lookup
  # still yields a well-formed return value instead of UnboundLocalError.
  data_existence = 0
  dbconn = None
  curs = None

  try:
    st = int(start_time)
    et = int(end_time)
    db = str(database.strip())

    #-------------------------------------------------------------------
    # Set up environment and get needed values
    #-------------------------------------------------------------------
    # Set up connection to the database
    dbconn = DB2.connect(dsn=db, uid='', pwd='', autoCommit=True)
    curs = dbconn.cursor()

    # create a new process_id
    sql = "select hex(GENERATE_UNIQUE()) from sysibm.sysdummy1"
    curs.execute(sql)
    hex_procid = curs.fetchone()[0]
    # hexadecimal string constant form of the id, for use inside SQL text
    process_id = "x'%s'" % hex_procid

    # determine the local creator_db
    sql = "SELECT DEFAULT FROM SYSCAT.COLUMNS WHERE "
    sql += "TABNAME = 'PROCESS' AND COLNAME = 'CREATOR_DB'"
    curs.execute(sql)
    creator_db = int(curs.fetchone()[0])

    # prepare values for the new row to be inserted into the process table
    program = os.path.abspath(sys.argv[0])
    node = socket.gethostname()
    username = pwd.getpwuid(os.getuid()).pw_name
    unix_procid = os.getpid()
    proc_start_time = gpstime.GpsSecondsFromPyUTC(time.time())
    jobid = 0
    domain = 'coalesce_local'

    # insert new row into process table
    # NOTE(review): values are interpolated into the SQL text; a program
    # path, host name or user name containing a quote would break this
    # statement -- consider parameter markers.
    sql = "INSERT INTO process "
    sql += "(program, is_online, node, username, unix_procid, start_time, jobid, domain, process_id, creator_db) "
    sql += "VALUES ('%s', 0, '%s', '%s', %d, %d, %d, '%s',%s, %d)" % (program, node, username, unix_procid, proc_start_time, jobid, domain, process_id, creator_db)
    curs.execute(sql)

    # get the BLOB process_id for later reference
    sql = "SELECT BLOB(process_id) from process where hex(process_id)='%s' " % hex_procid
    curs.execute(sql)
    blob_procid = curs.fetchone()[0]

    #========================================================================
    #
    # Main
    #
    #========================================================================
    # Algorithm:
    # 1. Find distinct segment types from segment_summary table within start_time, end_time range
    # 2. Find segments and intervals to coalesce
    # 3. Coalesce segments and intervals
    # 4. Insert coalesced segments back in to the database
    # 5. Delete uncoalesced segments and intervals from the database

    # 1. Find distinct segment types matching our criteria from segment_summary within the specified time range
    sql = "SELECT distinct(hex(segment_summary.segment_def_id)) FROM segment_summary, segment_definer, process "
    sql += "WHERE segment_summary.segment_def_id=segment_definer.segment_def_id "
    sql += "AND segment_summary.segment_def_cdb=segment_definer.creator_db "
    sql += "AND segment_summary.process_id=process.process_id "
    sql += "AND segment_summary.creator_db=process.creator_db "
    # Removed next line so that all segments are coalesced: this will be slower up front but faster for queries and the long run
    #sql += "AND ((segment_definer.name like 'DMT-%' and segment_definer.version=1) or (process.ifos='V1' and process.program='SegOnline')) "
    sql += "AND segment_summary.start_time <=%d " % et
    sql += "AND segment_summary.end_time >= %d " % st
    curs.execute(sql)
    def_ids = curs.fetchall()
    data_existence = 1 if def_ids else 0

    # loop in the segment types to fetch, coalesce, insert and delete
    for d in def_ids:
      # get the BLOB segment_def_id for later use
      sql = "SELECT BLOB(segment_def_id), ifos, name, version, creator_db "
      sql += "FROM segment_definer "
      sql += "WHERE hex(segment_def_id) = '%s' " % d[0]

      curs.execute(sql)
      result = curs.fetchone()
      blob_defid = result[0]
      ifos = result[1].strip()
      name = result[2]
      ver = result[3]
      def_cdb = result[4]

      # 2. Find segments and intervals to coalesce
      # get the segment start_time, end_time to coalesce, and according primary key to delete
      _drop_view(curs, "seg_view")
      sql = "CREATE view seg_view (st,et,seg_id) AS "
      sql += "SELECT start_time,end_time, segment_id from segment "
      sql += "WHERE hex(segment_def_id) = '%s' " % d[0]
      sql += "AND segment.start_time <=%d " % et
      sql += "AND segment.end_time >= %d " % st
      sys.stdout.write("Selecting segments to coalesce for %s version:%d %s ... \n" % (ifos,ver, name))
      curs.execute(sql)

      curs.execute("SELECT st,et from seg_view")
      seg_bf_cos = curs.fetchall()  # get the segments to coalesce

      # get the summary start_time, end_time to coalesce, and according primary key to delete
      _drop_view(curs, "sum_view")
      sql = "CREATE view sum_view (st,et,sum_id) AS "
      sql += "SELECT start_time,end_time, segment_sum_id from segment_summary "
      sql += "WHERE hex(segment_def_id) = '%s' " % d[0]
      sql += "AND segment_summary.start_time <=%d " % et
      sql += "AND segment_summary.end_time >= %d " % st
      curs.execute(sql)

      curs.execute("SELECT st,et from sum_view")
      sum_bf_cos = curs.fetchall()  # get the summaries to coalesce

      # 3. Coalesce segments and intervals
      sys.stdout.write("Coalescing segments ... \n")
      segs = segments.segmentlist([segments.segment(int(bf[0]), int(bf[1])) for bf in seg_bf_cos])
      sums = segments.segmentlist([segments.segment(int(bf[0]), int(bf[1])) for bf in sum_bf_cos])
      segs.coalesce()
      sums.coalesce()

      # 4. Insert coalesced segments back in to the database
      sys.stdout.write("Inserting coalesced segments back in ... \n")
      _insert_coalesced(curs, "segment", "segment_id", segs, creator_db, blob_defid, def_cdb, blob_procid)
      _insert_coalesced(curs, "segment_summary", "segment_sum_id", sums, creator_db, blob_defid, def_cdb, blob_procid)

      # 5. Delete uncoalesced segments and intervals from the database
      # The views still list the ids of the pre-coalescing rows; the rows
      # just inserted are protected by the process_id test.
      sys.stdout.write("Deleting un-coaleseced segments ... \n\n")
      sql = "DELETE FROM segment "
      sql += "WHERE segment_id in (select seg_id from seg_view) "
      sql += "AND process_id != %s " % process_id
      curs.execute(sql)

      sql = "DELETE FROM segment_summary "
      sql += "WHERE segment_sum_id in (select sum_id from sum_view) "
      sql += "AND process_id != %s " % process_id
      curs.execute(sql)

    # update end_time in process table
    sql = "update process set end_time=%d where hex(process_id)='%s' " % (gpstime.GpsSecondsFromPyUTC(time.time()),hex_procid)
    curs.execute(sql)

    # remove the scratch views; each is dropped independently so that a
    # failure on one does not leave the other behind
    _drop_view(curs, "seg_view")
    _drop_view(curs, "sum_view")

  except Exception as e:
    ret = str(e)
    sys.stdout.write("%s\n" % ret)

  finally:
    # release the cursor and the connection on every path, best effort
    for handle in (curs, dbconn):
      if handle is not None:
        try:
          handle.close()
        except Exception:
          pass

  return ret, data_existence
241