1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 import sys
19 import os
20 import time
21 import socket
22 import pwd
23
# DB2 is treated as optional at import time so this module can still be
# imported on hosts without the DB2 bindings; code that actually touches the
# DB2 name will fail later, at the point of use.
try:
    import DB2
except ImportError:
    # Only a missing/unloadable module is tolerated here.  Anything else
    # (KeyboardInterrupt, SystemExit, a genuine bug inside the module) is
    # allowed to propagate instead of being silently swallowed.
    pass
28
29 try:
30 from glue import gpstime
31 from glue import segments
32 except ImportError:
33 raise ImportError("Error, unable to import modules from glue. Check that glue is correctly installed and in your PYTHONPATH.")
34
35
# Module metadata.
__author__ = "Ping Wei <piwei@physics.syr.edu>"
# The date and version identifiers are read from glue's git_version module.
from glue import git_version
__date__ = git_version.date
__version__ = git_version.id
# NOTE(review): presumably a version-control keyword placeholder -- confirm.
__src__ = "$Source$"
41
42
44 ret = 0
45
46 try:
47 st = int(start_time)
48 et = int(end_time)
49 db = str(database.strip())
50
51
52
53
54 dbconn = DB2.connect(dsn=db, uid='', pwd='', autoCommit=True)
55 curs = dbconn.cursor()
56
57
58 sql = "select hex(GENERATE_UNIQUE()) from sysibm.sysdummy1"
59 curs.execute(sql)
60 hex_procid = curs.fetchone()[0]
61 process_id = 'x' + '\'' + hex_procid + '\''
62
63
64 sql = "SELECT DEFAULT FROM SYSCAT.COLUMNS WHERE "
65 sql += "TABNAME = 'PROCESS' AND COLNAME = 'CREATOR_DB'"
66 curs.execute(sql)
67 creator_db = int(curs.fetchone()[0])
68
69
70
71 program = os.path.abspath(sys.argv[0])
72 node = socket.gethostname()
73 username = pwd.getpwuid(os.getuid()).pw_name
74 unix_procid = os.getpid()
75 proc_start_time = gpstime.GpsSecondsFromPyUTC(time.time())
76 end_time = None
77 jobid = 0
78 domain = 'coalesce_local'
79
80
81 sql = "INSERT INTO process "
82 sql += "(program, is_online, node, username, unix_procid, start_time, jobid, domain, process_id, creator_db) "
83 sql += "VALUES ('%s', 0, '%s', '%s', %d, %d, %d, '%s',%s, %d)" % (program, node, username, unix_procid, proc_start_time, jobid, domain, process_id, creator_db)
84 curs.execute(sql)
85
86
87 sql = "SELECT BLOB(process_id) from process where hex(process_id)='%s' " % hex_procid
88 curs.execute(sql)
89 blob_procid = curs.fetchone()[0]
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106 sql = "SELECT distinct(hex(segment_summary.segment_def_id)) FROM segment_summary, segment_definer, process "
107 sql += "WHERE segment_summary.segment_def_id=segment_definer.segment_def_id "
108 sql += "AND segment_summary.segment_def_cdb=segment_definer.creator_db "
109 sql += "AND segment_summary.process_id=process.process_id "
110 sql += "AND segment_summary.creator_db=process.creator_db "
111
112
113 sql += "AND segment_summary.start_time <=%d " % et
114 sql += "AND segment_summary.end_time >= %d " % st
115 curs.execute(sql)
116 def_ids = curs.fetchall()
117 if not def_ids:
118 data_existence = 0
119 else:
120 data_existence = 1
121
122
123 for d in def_ids:
124
125 sql = "SELECT BLOB(segment_def_id), ifos, name, version, creator_db "
126 sql += "FROM segment_definer "
127 sql += "WHERE hex(segment_def_id) = '%s' " % d[0]
128
129 curs.execute(sql)
130 result = curs.fetchone()
131 blob_defid = result[0]
132 ifos = result[1].strip()
133 name = result[2]
134 ver = result[3]
135 def_cdb = result[4]
136
137
138
139 try:
140 curs.execute("drop view seg_view")
141 except:
142 pass
143 sql = "CREATE view seg_view (st,et,seg_id) AS "
144 sql += "SELECT start_time,end_time, segment_id from segment "
145 sql += "WHERE hex(segment_def_id) = '%s' " % d[0]
146 sql += "AND segment.start_time <=%d " % et
147 sql += "AND segment.end_time >= %d " % st
148 sys.stdout.write("Selecting segments to coalesce for %s version:%d %s ... \n" % (ifos,ver, name))
149 curs.execute(sql)
150
151 curs.execute("SELECT st,et from seg_view")
152 seg_bf_cos = curs.fetchall()
153
154
155 try:
156 curs.execute("drop view sum_view")
157 except:
158 pass
159 sql = "CREATE view sum_view (st,et,sum_id) AS "
160 sql += "SELECT start_time,end_time, segment_sum_id from segment_summary "
161 sql += "WHERE hex(segment_def_id) = '%s' " % d[0]
162 sql += "AND segment_summary.start_time <=%d " % et
163 sql += "AND segment_summary.end_time >= %d " % st
164 curs.execute(sql)
165
166 curs.execute("SELECT st,et from sum_view")
167 sum_bf_cos = curs.fetchall()
168
169
170 sys.stdout.write("Coalescing segments ... \n")
171 segs = segments.segmentlist([])
172 sums = segments.segmentlist([])
173 for bf in seg_bf_cos:
174 seg = segments.segment(int(bf[0]), int(bf[1]))
175 segs.append(seg)
176 for bf in sum_bf_cos:
177 sum = segments.segment(int(bf[0]), int(bf[1]))
178 sums.append(sum)
179
180 segs.coalesce()
181 sums.coalesce()
182
183
184
185
186 insert_list = []
187 for s in segs:
188
189 curs.execute("VALUES BLOB(GENERATE_UNIQUE())")
190 prim_id = curs.fetchone()[0]
191
192 insert_list.append((prim_id, creator_db, s[0], s[1], blob_defid, def_cdb, blob_procid))
193
194 sql = "INSERT INTO segment "
195 sql += "(segment_id, creator_db, start_time, end_time, segment_def_id, segment_def_cdb, process_id) "
196 sql += "VALUES (?,?,?,?,?,?,?) "
197 sys.stdout.write("Inserting coalesced segments back in ... \n")
198 curs.executemany(sql, insert_list)
199
200
201 insert_list = []
202 for s in sums:
203
204 curs.execute("VALUES BLOB(GENERATE_UNIQUE())")
205 prim_id = curs.fetchone()[0]
206
207 insert_list.append((prim_id, creator_db, s[0], s[1], blob_defid, def_cdb, blob_procid))
208 sql = "INSERT INTO segment_summary "
209 sql += "(segment_sum_id, creator_db, start_time, end_time, segment_def_id, segment_def_cdb, process_id) "
210 sql += "VALUES (?,?,?,?,?,?,?) "
211 curs.executemany(sql, insert_list)
212
213
214 sys.stdout.write("Deleting un-coaleseced segments ... \n\n")
215 sql = "DELETE FROM segment "
216 sql += "WHERE segment_id in (select seg_id from seg_view) "
217 sql += "AND process_id != %s " % process_id
218 curs.execute(sql)
219
220 sql = "DELETE FROM segment_summary "
221 sql += "WHERE segment_sum_id in (select sum_id from sum_view) "
222 sql += "AND process_id != %s " % process_id
223 curs.execute(sql)
224
225
226 sql = "update process set end_time=%d where hex(process_id)='%s' " % (gpstime.GpsSecondsFromPyUTC(time.time()),hex_procid)
227 curs.execute(sql)
228
229 try:
230 curs.execute("drop view seg_view")
231 curs.execute("drop view sum_view")
232 except:
233 pass
234 curs.close()
235
236 except Exception as e:
237 ret = str(e)
238 sys.stdout.write("%s\n" % ret)
239
240 return ret,data_existence
241