1   
  2   
  3   
  4   
  5   
  6   
  7   
  8   
  9   
 10   
 11   
 12   
 13   
 14   
 15   
 16   
 17   
 18  import sys 
 19  import os 
 20  import time 
 21  import socket 
 22  import pwd 
 23   
 24  try: 
 25    import DB2 
 26  except: 
 27    pass 
 28   
 29  try: 
 30    from glue import gpstime 
 31    from glue import segments 
 32  except ImportError: 
 33    raise ImportError("Error, unable to import modules from glue. Check that glue is correctly installed and in your PYTHONPATH.") 
 34   
 35   
 36  __author__ = "Ping Wei <piwei@physics.syr.edu>" 
 37  from glue import git_version 
 38  __date__ = git_version.date 
 39  __version__ = git_version.id 
 40  __src__ = "$Source$" 
 41   
 42   
 44    ret = 0             
 45   
 46    try: 
 47      st = int(start_time) 
 48      et = int(end_time) 
 49      db = str(database.strip()) 
 50       
 51       
 52       
 53       
 54      dbconn = DB2.connect(dsn=db, uid='', pwd='', autoCommit=True) 
 55      curs = dbconn.cursor() 
 56   
 57       
 58      sql = "select hex(GENERATE_UNIQUE()) from sysibm.sysdummy1" 
 59      curs.execute(sql) 
 60      hex_procid = curs.fetchone()[0] 
 61      process_id = 'x' + '\'' + hex_procid + '\'' 
 62   
 63       
 64      sql = "SELECT DEFAULT FROM SYSCAT.COLUMNS WHERE " 
 65      sql += "TABNAME = 'PROCESS' AND COLNAME = 'CREATOR_DB'" 
 66      curs.execute(sql) 
 67      creator_db = int(curs.fetchone()[0]) 
 68   
 69     
 70       
 71      program = os.path.abspath(sys.argv[0]) 
 72      node = socket.gethostname() 
 73      username = pwd.getpwuid(os.getuid()).pw_name 
 74      unix_procid = os.getpid() 
 75      proc_start_time = gpstime.GpsSecondsFromPyUTC(time.time()) 
 76      end_time = None 
 77      jobid = 0 
 78      domain = 'coalesce_local' 
 79   
 80       
 81      sql = "INSERT INTO process " 
 82      sql += "(program, is_online, node, username, unix_procid, start_time, jobid, domain, process_id, creator_db) " 
 83      sql += "VALUES ('%s', 0, '%s', '%s', %d, %d, %d, '%s',%s, %d)" % (program, node, username, unix_procid, proc_start_time, jobid, domain, process_id, creator_db) 
 84      curs.execute(sql) 
 85   
 86       
 87      sql = "SELECT BLOB(process_id) from process where hex(process_id)='%s' " % hex_procid 
 88      curs.execute(sql) 
 89      blob_procid = curs.fetchone()[0] 
 90   
 91   
 92       
 93       
 94       
 95       
 96       
 97       
 98       
 99       
100       
101       
102       
103   
104   
105       
106      sql  = "SELECT distinct(hex(segment_summary.segment_def_id)) FROM segment_summary, segment_definer, process " 
107      sql += "WHERE segment_summary.segment_def_id=segment_definer.segment_def_id " 
108      sql += "AND segment_summary.segment_def_cdb=segment_definer.creator_db " 
109      sql += "AND segment_summary.process_id=process.process_id " 
110      sql += "AND segment_summary.creator_db=process.creator_db " 
111       
112       
113      sql += "AND segment_summary.start_time <=%d " % et 
114      sql += "AND segment_summary.end_time >= %d " % st 
115      curs.execute(sql) 
116      def_ids = curs.fetchall() 
117      if not def_ids: 
118        data_existence = 0 
119      else: 
120        data_existence = 1 
121   
122       
123      for d in def_ids: 
124         
125        sql = "SELECT BLOB(segment_def_id), ifos, name, version, creator_db "  
126        sql += "FROM segment_definer "  
127        sql += "WHERE hex(segment_def_id) = '%s' " % d[0] 
128   
129        curs.execute(sql) 
130        result = curs.fetchone() 
131        blob_defid = result[0] 
132        ifos = result[1].strip()  
133        name = result[2] 
134        ver = result[3] 
135        def_cdb = result[4] 
136   
137         
138         
139        try: 
140          curs.execute("drop view seg_view") 
141        except: 
142          pass 
143        sql = "CREATE view seg_view (st,et,seg_id) AS " 
144        sql += "SELECT start_time,end_time, segment_id from segment " 
145        sql += "WHERE hex(segment_def_id) = '%s' " % d[0] 
146        sql += "AND segment.start_time <=%d " % et 
147        sql += "AND segment.end_time >= %d " % st 
148        sys.stdout.write("Selecting segments to coalesce for %s version:%d %s ... \n" % (ifos,ver, name)) 
149        curs.execute(sql) 
150   
151        curs.execute("SELECT st,et from seg_view") 
152        seg_bf_cos = curs.fetchall()    
153   
154         
155        try: 
156          curs.execute("drop view sum_view") 
157        except: 
158          pass 
159        sql = "CREATE view sum_view (st,et,sum_id) AS " 
160        sql += "SELECT start_time,end_time, segment_sum_id from segment_summary " 
161        sql += "WHERE hex(segment_def_id) = '%s' " % d[0] 
162        sql += "AND segment_summary.start_time <=%d " % et 
163        sql += "AND segment_summary.end_time >= %d " % st 
164        curs.execute(sql) 
165   
166        curs.execute("SELECT st,et from sum_view") 
167        sum_bf_cos = curs.fetchall()    
168   
169         
170        sys.stdout.write("Coalescing segments ... \n") 
171        segs = segments.segmentlist([])  
172        sums = segments.segmentlist([])  
173        for bf in seg_bf_cos: 
174          seg = segments.segment(int(bf[0]), int(bf[1])) 
175          segs.append(seg)  
176        for bf in sum_bf_cos: 
177          sum = segments.segment(int(bf[0]), int(bf[1])) 
178          sums.append(sum)  
179   
180        segs.coalesce() 
181        sums.coalesce() 
182   
183   
184         
185         
186        insert_list = [] 
187        for s in segs: 
188           
189          curs.execute("VALUES BLOB(GENERATE_UNIQUE())") 
190          prim_id = curs.fetchone()[0] 
191           
192          insert_list.append((prim_id, creator_db, s[0], s[1], blob_defid, def_cdb, blob_procid)) 
193   
194        sql = "INSERT INTO segment " 
195        sql += "(segment_id, creator_db, start_time, end_time, segment_def_id, segment_def_cdb, process_id) " 
196        sql += "VALUES (?,?,?,?,?,?,?) " 
197        sys.stdout.write("Inserting coalesced segments back in ... \n") 
198        curs.executemany(sql, insert_list) 
199   
200         
201        insert_list = [] 
202        for s in sums: 
203           
204          curs.execute("VALUES BLOB(GENERATE_UNIQUE())") 
205          prim_id = curs.fetchone()[0] 
206           
207          insert_list.append((prim_id, creator_db, s[0], s[1], blob_defid, def_cdb, blob_procid)) 
208        sql = "INSERT INTO segment_summary " 
209        sql += "(segment_sum_id, creator_db, start_time, end_time, segment_def_id, segment_def_cdb, process_id) " 
210        sql += "VALUES (?,?,?,?,?,?,?) " 
211        curs.executemany(sql, insert_list) 
212   
213         
214        sys.stdout.write("Deleting un-coaleseced segments ... \n\n") 
215        sql = "DELETE FROM segment " 
216        sql += "WHERE segment_id in (select seg_id from seg_view) " 
217        sql += "AND process_id != %s " % process_id 
218        curs.execute(sql) 
219   
220        sql = "DELETE FROM segment_summary " 
221        sql += "WHERE segment_sum_id in (select sum_id from sum_view) " 
222        sql += "AND process_id != %s " % process_id 
223        curs.execute(sql) 
224   
225       
226      sql = "update process set end_time=%d where hex(process_id)='%s' " % (gpstime.GpsSecondsFromPyUTC(time.time()),hex_procid) 
227      curs.execute(sql) 
228     
229      try:   
230        curs.execute("drop view seg_view") 
231        curs.execute("drop view sum_view") 
232      except: 
233        pass 
234      curs.close() 
235   
236    except Exception as e: 
237      ret = str(e) 
238      sys.stdout.write("%s\n" % ret) 
239   
240    return ret,data_existence 
 241