Package glue :: Package segmentdb :: Module segmentdb_utils
[hide private]
[frames | no frames]

Source Code for Module glue.segmentdb.segmentdb_utils

  1  #!/usr/bin/env python 
  2  # Id$ 
  3  # 
  4  # Copyright (C) 2009  Larne Pekowsky 
  5  # 
  6  # This program is free software; you can redistribute it and/or modify it 
  7  # under the terms of the GNU General Public License as published by the 
  8  # Free Software Foundation; either version 3 of the License, or (at your 
  9  # option) any later version. 
 10  # 
 11  # This program is distributed in the hope that it will be useful, but 
 12  # WITHOUT ANY WARRANTY; without even the implied warranty of 
 13  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General 
 14  # Public License for more details. 
 15  # 
 16  # You should have received a copy of the GNU General Public License along 
 17  # with this program; if not, write to the Free Software Foundation, Inc., 
 18  # 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA. 
 19   
 20  import sys 
 21  import os 
 22  import re 
 23   
 24  from glue.segments import segment, segmentlist 
 25  from glue.ligolw import lsctables 
 26  from glue.ligolw import table 
 27  from glue.segmentdb import query_engine 
 28  from glue.ligolw import types as ligolwtypes 
 29  from six.moves import filter 
 30  from six.moves import map 
 31  from six.moves import range 
 32   
 33   
 34   
 35  # 
 36  # ============================================================================= 
 37  # 
 38  #                     Routines to set up backends 
 39  # 
 40  # ============================================================================= 
 41  # 
 42   
 43   
def get_all_files_in_range(dirname, starttime, endtime, pad=64):
    """Returns all files in dirname and all its subdirectories whose
    names indicate that they contain segments in the range starttime
    to endtime.

    dirname may also name a single file; it is returned as a one-element
    list when its name looks like *-<digits>-<digits>.xml, otherwise an
    empty list is returned.

    Directory entries whose names end in -NNNN or -NNNNN are only
    descended into when that number lies between starttime // 100000 and
    endtime // 100000.  An XML file is selected when the second-to-last
    '-'-separated field of its name lies within
    [starttime - pad, endtime + pad].
    """
    # Maybe the user just wants one file...
    if os.path.isfile(dirname):
        if re.match(r'.*-[0-9]*-[0-9]*\.xml$', dirname):
            return [dirname]
        return []

    ret = []

    # Floor division: with true division (Python 3) the start bucket becomes
    # fractional, e.g. 9312.345, and the directory "-9312" that holds the
    # start time would fail the ">=" test below.
    first_four_start = starttime // 100000
    first_four_end = endtime // 100000

    # Screen for files starting with . and ending with .xml.*
    # i.e. those leftover by rsync.  A new list is built instead of removing
    # entries from the list being iterated, which skips the entry that
    # follows every removal.
    file_list = sorted(
        filename for filename in os.listdir(dirname)
        if re.match(r"\..*\.xml\..*$", filename) is None
    )

    for filename in file_list:
        path = os.path.join(dirname, filename)

        if re.match(r'.*-[0-9]{5}$', filename):
            dirtime = int(filename[-5:])
            if first_four_start <= dirtime <= first_four_end:
                ret += get_all_files_in_range(path, starttime, endtime, pad=pad)
        elif re.match(r'.*-[0-9]{4}$', filename):
            dirtime = int(filename[-4:])
            if first_four_start <= dirtime <= first_four_end:
                ret += get_all_files_in_range(path, starttime, endtime, pad=pad)
        elif re.match(r'.*-[0-9]*-[0-9]*\.xml$', filename):
            file_time = int(filename.split('-')[-2])
            if (starttime - pad) <= file_time <= (endtime + pad):
                ret.append(path)
        elif os.path.isfile(path):
            # Non .xml file, don't recurse: the scan of this directory stops
            # here and whatever has been collected so far is returned.
            return ret
        else:
            # Keep recursing, we may be looking at directories of
            # ifos, each of which has directories with times
            ret += get_all_files_in_range(path, starttime, endtime, pad=pad)

    return ret
93 94 95
def setup_database(database_location):
    """Open a client connection to the segment database at database_location.

    database_location must have the form PROTOCOL://HOST or
    PROTOCOL://HOST:PORT, where PROTOCOL is one of http, https, ldbd or
    ldbdi (case-insensitive).  Without an explicit port the defaults are
    80, 443, 30015 and 30016 respectively.

    Returns the LDBDWClient.LDBDClient (http/https) or LDBDClient.LDBDClient
    (ldbd/ldbdi) instance.

    A missing or unsupported protocol, an unparsable port, port 30020, or a
    failure to construct the client is reported on stderr and terminates
    the process via sys.exit(1).
    """
    supported_protocols = ("http", "https", "ldbd", "ldbdi")
    default_ports = {'http': 80, 'https': 443, 'ldbd': 30015, 'ldbdi': 30016}

    # 1. Determine protocol
    separator = database_location.find('://')

    # When no protocol is given:
    if separator == -1:
        msg = "Error: Please specify protocol in your --segment-url argument in the format PROTOCOL://HOST"
        msg += "\nFor example: --segment-url https://segdb.ligo.caltech.edu"
        msg += "\nSupported protocols include: http, https, ldbd, ldbdi"
        msg += "\nRun with --help for usage"
        sys.stderr.write('%s\n' % msg)
        sys.exit(1)

    # When wrong protocol is given:
    protocol = database_location[:separator].lower()
    if protocol not in supported_protocols:
        msg = "Error: protocol %s not supported" % protocol
        msg += "\nPlease specify correct protocol in your --segment-url argument in the format PROTOCOL://HOST"
        msg += "\nSupported protocols include: http, https, ldbd, ldbdi"
        msg += "\nRun with --help for usage"
        sys.stderr.write('%s\n' % msg)
        sys.exit(1)

    # 2. Determine host and port
    host_and_port = database_location[(len(protocol) + 3):]
    if host_and_port.find(':') < 0:
        # if no port number given, set default port respectively:
        host = host_and_port
        port = default_ports[protocol]
    else:
        # A non-numeric port, or more than one ':', is a user error like the
        # ones above; report it the same way instead of with a traceback.
        try:
            host, port_string = host_and_port.split(':')
            port = int(port_string)
        except ValueError:
            msg = "Error: could not determine host and port from %s" % host_and_port
            msg += "\nPlease specify your --segment-url argument in the format PROTOCOL://HOST[:PORT]"
            msg += "\nRun with --help for usage"
            sys.stderr.write('%s\n' % msg)
            sys.exit(1)

    if port == 30020:
        sys.stderr.write("Error: PORT 30020 no longer provide segment database service\n")
        sys.exit(1)

    # 3. Set up connection to LDBD(W)Server
    client = None
    identity = "/DC=org/DC=doegrids/OU=Services/CN=ldbd/"

    if protocol.startswith('http'):
        from glue import LDBDWClient
        # Only the secure variant passes a server identity.
        if protocol == "https":
            identity += host
        else:
            identity = None
        try:
            client = LDBDWClient.LDBDClient(host, port, protocol, identity)
        except Exception as e:
            sys.stderr.write("Unable to connect to LDBD Server at %s://%s:%d \n" % (protocol, host, port) + str(e))
            sys.exit(1)

    elif protocol.startswith('ldbd'):
        from glue import LDBDClient
        # gsiserverutils is only loaded for the "ldbd" protocol; keep an
        # explicit None for "ldbdi" so the error path can test for it.
        gsiserverutils = None
        if protocol == "ldbd":
            identity += host
            from glue import gsiserverutils
        else:
            identity = None
        try:
            client = LDBDClient.LDBDClient(host, port, identity)
        except Exception as e:
            sys.stderr.write("Unable to connect to LDBD Server at %s://%s:%d\n" % (protocol, host, port) + str(e))
            if gsiserverutils is not None and gsiserverutils.checkCredentials():
                sys.stderr.write("Got the following error : \n" + str(e))
                sys.stderr.write("Run with --help for usage\n")
            sys.exit(1)

    else:
        # Not reachable after the protocol check above; kept as a safeguard.
        raise ValueError("invalid url for segment database")

    return client
180 181 182 # 183 # ============================================================================= 184 # 185 # Routines to find segment information in databases/XML docs 186 # 187 # ============================================================================= 188 # 189 190 191
def query_segments(engine, table, segdefs):
    """Query one table for the segments belonging to a list of segdefs.

    engine  -- object whose query(sql) method returns the result rows
    table   -- name of the table to query; it is interpolated into the SQL
    segdefs -- list in which each segdef contains:
               ifo, name, version, start_time, end_time, start_pad, end_pad

    Returns a list with one coalesced segmentlist per segdef, in the order
    of segdefs.  Each row's times are padded by start_pad/end_pad and then
    truncated to [start_time, end_time].  For an empty segdefs a list
    holding a single empty segmentlist is returned.

    NOTE(review): all values are formatted directly into the SQL text, so
    they must come from a trusted source.
    """
    # The trivial case: if there's nothing to do, return no time
    if len(segdefs) == 0:
        return [segmentlist([])]

    #
    # For the sake of efficiency we query the database for all the segdefs at once
    # This constructs a clause that looks for one
    #
    def make_clause(table, segdef):
        ifo, name, version, start_time, end_time, start_pad, end_pad = segdef

        sql = " (segment_definer.ifos = '%s' " % ifo
        sql += "AND segment_definer.name = '%s' " % name
        sql += "AND segment_definer.version = %s " % version
        sql += "AND NOT (%d > %s.end_time OR %s.start_time > %d)) " % (start_time, table, table, end_time)

        return sql

    clauses = [make_clause(table, segdef) for segdef in segdefs]

    sql = 'SELECT segment_definer.ifos, segment_definer.name, segment_definer.version, '
    sql += ' %s.start_time, %s.end_time ' % (table, table)
    sql += ' FROM segment_definer, %s ' % table
    sql += ' WHERE %s.segment_def_id = segment_definer.segment_def_id AND ' % table

    if engine.__class__ == query_engine.LdbdQueryEngine:
        sql += " %s.segment_def_cdb = segment_definer.creator_db AND " % table

    sql += '( ' + ' OR '.join(clauses) + ' )'

    rows = engine.query(sql)

    #
    # The result of a query will be rows of the form
    #    ifo, name, version, start_time, end_time
    #
    # We want to associate each returned row with the segdef it belongs to so that
    # we can apply the correct padding.
    #
    # If segdefs were uniquely specified by (ifo, name, version) this would
    # be easy, but it may happen that we're looking for the same segment definer
    # at multiple disjoint times.  In particular this can happen if the user
    # didn't specify a version number; in that case we might have version 2
    # of some flag defined over multiple disjoint segment_definers.
    #
    results = []

    for segdef in segdefs:
        ifo, name, version, start_time, end_time, start_pad, end_pad = segdef

        search_span = segment(start_time, end_time)
        search_span_list = segmentlist([search_span])

        padded = []
        for row in rows:
            # Name, ifo and version must match the current segdef ...
            if not (row[0].strip() == ifo and row[1] == name and int(row[2]) == int(version)):
                continue

            # ... and the padded segment must overlap the range of the segdef.
            # The end is padded with end_pad (it used to be padded with
            # start_pad here), so this test and the truncation below look at
            # the same segment.
            padded_row = segment(row[3] + start_pad, row[4] + end_pad)
            if not search_span.intersects(padded_row):
                continue

            # Segments may extend beyond the time of interest, chop off the
            # excess.  No coalesce needed as a list with a single segment is
            # already coalesced.
            padded.extend(segmentlist([padded_row]) & search_span_list)

        # The segments will not necessarily be disjoint, if the padding crosses
        # gaps.  They are also not guaranteed to be in order, since there's no
        # ORDER BY in the query.  So the list needs to be coalesced before
        # arithmetic can be done with it.  Every segment is already within the
        # search span, so no further intersection is needed.
        results.append(segmentlist(padded).coalesce())

    return results
def expand_version_number(engine, segdef):
    """Expand a segdef whose version is '*' into explicit versions.

    segdef contains: ifo, name, version, start_time, end_time, start_pad,
    end_pad.  If version is anything other than '*' the segdef is returned
    unchanged as a one-element list.

    Otherwise the highest version of (ifo, name) in segment_definer is
    looked up and, counting down from it, segment_summary is queried for
    the parts of [start_time, end_time] not yet claimed by a higher
    version.  Returns a list of tuples
    (ifo, name, version, start, end, 0, 0), one per summary segment found.
    An empty list is returned when the maximum version cannot be read.
    """
    ifo, name, version, start_time, end_time, start_pad, end_pad = segdef

    if version != '*':
        return [segdef]

    # Start looking at the full interval
    intervals = segmentlist([segment(start_time, end_time)])

    # Find the maximum version number
    sql = "SELECT max(version) FROM segment_definer "
    sql += "WHERE segment_definer.ifos = '%s' " % ifo
    sql += "AND segment_definer.name = '%s' " % name

    rows = engine.query(sql)
    try:
        # An empty row or an empty/zero value falls back to version 1.
        version = int(rows[0][0] or 1) if len(rows[0]) else 1
    except (IndexError, TypeError, ValueError):
        # Nothing usable came back.  Comparing None with 0 in the loop below
        # would raise TypeError on Python 3, so stop here.
        return []

    results = []

    # Stop early once every part of the interval has been assigned.
    while version > 0 and intervals:
        covered = segmentlist([])

        for interval in intervals:
            segs = query_segments(engine, 'segment_summary', [(ifo, name, version, interval[0], interval[1], 0, 0)])

            for seg in segs[0]:
                results.append((ifo, name, version, seg[0], seg[1], 0, 0))

            covered.extend(segs[0])

        # Remove what this version covers only after the loop: subtracting
        # from intervals while iterating over it may change the list under
        # the iterator.
        intervals.coalesce()
        intervals -= covered.coalesce()

        version -= 1

    return results
320 321 322 323
def find_segments(doc, key, use_segment_table = True):
    """Return the coalesced segments in doc selected by key.

    key has the form ifo, ifo:name or ifo:name:version; missing pieces and
    '*' match any name or version.  The ifo piece is compared literally.

    Segments are read from the segment table when use_segment_table is
    true, otherwise from the segment_summary table.  Only start_time and
    end_time of each entry are used.
    """
    key_pieces = key.split(':')
    while len(key_pieces) < 3:
        key_pieces.append('*')
    ifo, name, version = key_pieces[:3]

    def matches(seg_def):
        return (str(seg_def.ifos) == ifo
                and (str(seg_def.name) == name or name == '*')
                and (str(seg_def.version) == version or version == '*'))

    # Find all segment definers matching the criteria.  A set keeps the
    # membership test below constant-time per entry.
    seg_def_table = lsctables.SegmentDefTable.get_table(doc)
    seg_def_ids = set(str(x.segment_def_id) for x in seg_def_table if matches(x))

    # Find all segments belonging to those definers
    if use_segment_table:
        seg_table = lsctables.SegmentTable.get_table(doc)
    else:
        seg_table = lsctables.SegmentSumTable.get_table(doc)

    # Combine into a segmentlist
    ret = segmentlist([segment(x.start_time, x.end_time)
                       for x in seg_table
                       if str(x.segment_def_id) in seg_def_ids])

    ret.coalesce()

    return ret
350 351 # 352 # ============================================================================= 353 # 354 # General utilities 355 # 356 # ============================================================================= 357 #
def ensure_segment_table(connection):
    """Make sure the database behind connection has a 'segment' entry.

    sqlite_master is checked for an entry named 'segment'.  If there is
    none, a warning is written to stderr and a segment table is created
    with one column per entry of the lsctables segment class's
    validcolumns.
    """
    cursor = connection.cursor()
    row = cursor.execute("SELECT count(*) FROM sqlite_master WHERE name='segment'").fetchone()

    if row[0] != 0:
        return

    sys.stderr.write("WARNING: None of the loaded files contain a segment table\n")

    segment_class = lsctables.TableByName['segment']
    column_defs = []
    for column in segment_class.validcolumns:
        sqlite_type = ligolwtypes.ToSQLiteType[segment_class.validcolumns[column]]
        column_defs.append("%s %s" % (column, sqlite_type))

    statement = "CREATE TABLE IF NOT EXISTS segment (" + ", ".join(column_defs) + ")"

    connection.cursor().execute(statement)
370 371 372 373 # ============================================================================= 374 # 375 # Routines to write data to XML documents 376 # 377 # ============================================================================= 378 # 379
def add_to_segment_definer(xmldoc, proc_id, ifo, name, version, comment=''):
    """Append a segment_definer row to xmldoc and return its id.

    The document's segment_definer table is used if it can be found;
    otherwise a new one is created and appended to the document's first
    child node.  The new row gets the table's next id together with the
    given proc_id, ifo, name, version and comment.
    """
    try:
        seg_def_table = lsctables.SegmentDefTable.get_table(xmldoc)
    except ValueError:
        # get_table() signals "no such table" with ValueError; catching only
        # that lets unrelated errors and KeyboardInterrupt propagate.
        seg_def_table = lsctables.New(lsctables.SegmentDefTable, columns = ["process_id", "segment_def_id", "ifos", "name", "version", "comment"])
        xmldoc.childNodes[0].appendChild(seg_def_table)

    seg_def_id = seg_def_table.get_next_id()

    segment_definer = lsctables.SegmentDef()
    segment_definer.process_id = proc_id
    segment_definer.segment_def_id = seg_def_id
    segment_definer.ifos = ifo
    segment_definer.name = name
    segment_definer.version = version
    segment_definer.comment = comment

    seg_def_table.append(segment_definer)

    return seg_def_id
399 400 401
def add_to_segment(xmldoc, proc_id, seg_def_id, sgmtlist):
    """Append one segment row to xmldoc for every entry of sgmtlist.

    The document's segment table is used if it can be found; otherwise a
    new one is created and appended to the document's first child node.
    For each entry, element 0 becomes start_time and element 1 becomes
    end_time; both nanosecond columns are set to 0.
    """
    try:
        segtable = lsctables.SegmentTable.get_table(xmldoc)
    except ValueError:
        # get_table() signals "no such table" with ValueError; catching only
        # that lets unrelated errors and KeyboardInterrupt propagate.
        segtable = lsctables.New(lsctables.SegmentTable, columns = ["process_id", "segment_def_id", "segment_id", "start_time", "start_time_ns", "end_time", "end_time_ns"])
        xmldoc.childNodes[0].appendChild(segtable)

    for seg in sgmtlist:
        # Named "row" so the module-level segment class is not shadowed.
        row = lsctables.Segment()
        row.process_id = proc_id
        row.segment_def_id = seg_def_id
        row.segment_id = segtable.get_next_id()
        row.start_time = seg[0]
        row.start_time_ns = 0
        row.end_time = seg[1]
        row.end_time_ns = 0

        segtable.append(row)
420 421
def add_to_segment_summary(xmldoc, proc_id, seg_def_id, sgmtlist, comment=''):
    """Append one segment_summary row to xmldoc for every entry of sgmtlist.

    The document's segment_summary table is used if it can be found;
    otherwise a new one is created and appended to the document's first
    child node.  For each entry, element 0 becomes start_time and element 1
    becomes end_time; both nanosecond columns are set to 0 and comment is
    stored on every row.
    """
    try:
        seg_sum_table = lsctables.SegmentSumTable.get_table(xmldoc)
    except ValueError:
        # get_table() signals "no such table" with ValueError; catching only
        # that lets unrelated errors and KeyboardInterrupt propagate.
        seg_sum_table = lsctables.New(lsctables.SegmentSumTable, columns = ["process_id", "segment_def_id", "segment_sum_id", "start_time", "start_time_ns", "end_time", "end_time_ns", "comment"])
        xmldoc.childNodes[0].appendChild(seg_sum_table)

    for seg in sgmtlist:
        segment_sum = lsctables.SegmentSum()
        segment_sum.process_id = proc_id
        segment_sum.segment_def_id = seg_def_id
        segment_sum.segment_sum_id = seg_sum_table.get_next_id()
        segment_sum.start_time = seg[0]
        segment_sum.start_time_ns = 0
        segment_sum.end_time = seg[1]
        segment_sum.end_time_ns = 0
        segment_sum.comment = comment

        seg_sum_table.append(segment_sum)
441 442
def add_segment_info(doc, proc_id, segdefs, segments, segment_summaries):
    """Write definer, summary and (optionally) segment rows for each segdef.

    segdefs, segment_summaries and segments are parallel sequences: entry i
    of each belongs to the same segment definer.  Each segdef contains
    ifo, name, version, start_time, end_time, start_pad, end_pad, of which
    only the first three are used here.  Segment rows are written only
    when segments is truthy.
    """
    for index, segdef in enumerate(segdefs):
        ifo, name, version, start_time, end_time, start_pad, end_pad = segdef

        definer_id = add_to_segment_definer(doc, proc_id, ifo, name, version)
        add_to_segment_summary(doc, proc_id, definer_id, segment_summaries[index])

        if not segments:
            continue

        add_to_segment(doc, proc_id, definer_id, segments[index])
454 455 # 456 # ============================================================================= 457 # 458 # Routines that should be obsolete 459 # 460 # ============================================================================= 461 # 462
def build_segment_list(engine, gps_start_time, gps_end_time, ifo, segment_name, version = None, start_pad = 0, end_pad = 0):
    """Obtains a list of segments for the given ifo, name and version between the
    specified times.  If a version is given the request is straightforward and is
    passed on to build_segment_list_one.  Otherwise the latest version found
    in segment_definer is used (more complex processing is not yet
    implemented); version 1 is assumed when none can be found.

    Returns whatever build_segment_list_one returns."""
    if version is not None:
        return build_segment_list_one(engine, gps_start_time, gps_end_time, ifo, segment_name, version, start_pad, end_pad)

    # This needs more sophisticated logic, for the moment just return the latest
    # available version
    sql = "SELECT max(version) FROM segment_definer "
    sql += "WHERE segment_definer.ifos = '%s' " % ifo
    sql += "AND segment_definer.name = '%s' " % segment_name

    rows = engine.query(sql)

    # Guard against an empty result as well as an empty or NULL first row,
    # instead of failing with IndexError on rows[0].
    latest = rows[0][0] if rows and len(rows[0]) else None
    version = latest or 1

    return build_segment_list_one(engine, gps_start_time, gps_end_time, ifo, segment_name, version, start_pad, end_pad)
481 482
def build_segment_list_one(engine, gps_start_time, gps_end_time, ifo, segment_name, version = None, start_pad = 0, end_pad = 0):
    """Builds a list of segments satisfying the given criteria.

    Two queries are run through engine: one against segment_summary and
    one against segment, the latter with start_pad/end_pad added to the
    returned times.  Every returned interval is clipped to
    [gps_start_time, gps_end_time] and merged into a segmentlist.

    Returns the tuple (sum_result, seg_result).  The engine is closed
    before returning.
    """
    seg_result = segmentlist([])
    sum_result = segmentlist([])

    # Is there any way to get segment and segment summary in one query?
    # Maybe some sort of outer join where we keep track of which segment
    # summaries we've already seen.
    sql = "SELECT segment_summary.start_time, segment_summary.end_time "
    sql += "FROM segment_definer, segment_summary "
    sql += "WHERE segment_summary.segment_def_id = segment_definer.segment_def_id "
    sql += "AND segment_definer.ifos = '%s' " % ifo
    if engine.__class__ == query_engine.LdbdQueryEngine:
        sql += "AND segment_summary.segment_def_cdb = segment_definer.creator_db "
    sql += "AND segment_definer.name = '%s' " % segment_name
    sql += "AND segment_definer.version = %s " % version
    sql += "AND NOT (%s > segment_summary.end_time OR segment_summary.start_time > %s)" % (gps_start_time, gps_end_time)

    rows = engine.query(sql)

    for sum_start_time, sum_end_time in rows:
        # max()/min() rather than "cond and a or b": that idiom yields the
        # unclipped value whenever the GPS bound itself is 0 (falsy).
        sum_start_time = max(sum_start_time, gps_start_time)
        sum_end_time = min(sum_end_time, gps_end_time)

        sum_result |= segmentlist([segment(sum_start_time, sum_end_time)])

    # We can't use queries parameterized with ? since the ldbd protocol doesn't support it...
    sql = "SELECT segment.start_time + %d, segment.end_time + %d " % (start_pad, end_pad)
    sql += "FROM segment, segment_definer "
    sql += "WHERE segment.segment_def_id = segment_definer.segment_def_id "

    if engine.__class__ == query_engine.LdbdQueryEngine:
        sql += "AND segment.segment_def_cdb = segment_definer.creator_db "
    sql += "AND segment_definer.ifos = '%s' " % ifo
    sql += "AND segment_definer.name = '%s' " % segment_name
    sql += "AND segment_definer.version = %s " % version
    sql += "AND NOT (%s > segment.end_time OR segment.start_time > %s)" % (gps_start_time, gps_end_time)

    rows = engine.query(sql)

    for seg_start_time, seg_end_time in rows:
        seg_start_time = max(seg_start_time, gps_start_time)
        seg_end_time = min(seg_end_time, gps_end_time)

        seg_result |= segmentlist([segment(seg_start_time, seg_end_time)])

    engine.close()

    return sum_result, seg_result
532 533 534
def run_query_segments(doc, proc_id, engine, gps_start_time, gps_end_time, included_segments_string, excluded_segments_string = None, write_segments = True, start_pad = 0, end_pad = 0):
    """Runs a segment query.  This was originally part of ligolw_query_segments, but now is also
    used by ligolw_segments_from_cats.

    included_segments_string and excluded_segments_string are comma-separated
    lists of ifo:name or ifo:name:version ids.  The union of the included
    segments minus the union of the excluded segments is returned as a
    coalesced segmentlist.  A segment_definer and segment_summary entry is
    added to doc for every included id.

    The write_segments option is provided so callers can coalesce segments obtained over
    several invocations (as segments_from_cats does).  When it is true a
    'result' definer covering [gps_start_time, gps_end_time] is added to
    doc and the resulting segments are written under it.
    """
    included_ids = split_segment_ids(included_segments_string.split(','))

    if write_segments:
        # A dict is used as an ordered collection of the distinct ifos.
        all_ifos = {}
        for included in included_ids:
            all_ifos[included[0]] = True

        new_seg_def_id = add_to_segment_definer(doc, proc_id, ''.join(list(all_ifos.keys())), 'result', 0)
        add_to_segment_summary(doc, proc_id, new_seg_def_id, [[gps_start_time, gps_end_time]])

    result = segmentlist([])

    for ifo, segment_name, version in included_ids:
        sum_segments, seg_segments = build_segment_list(engine, gps_start_time, gps_end_time, ifo, segment_name, version, start_pad, end_pad)
        seg_def_id = add_to_segment_definer(doc, proc_id, ifo, segment_name, version)

        add_to_segment_summary(doc, proc_id, seg_def_id, sum_segments)

        # and accumulate segments
        result |= seg_segments

    # Excluded segments are not required; they are queried without padding
    if excluded_segments_string:
        excluded_segments = segmentlist([])

        for ifo, segment_name, version in split_segment_ids(excluded_segments_string.split(',')):
            sum_segments, seg_segments = build_segment_list(engine, gps_start_time, gps_end_time, ifo, segment_name, version)
            excluded_segments |= seg_segments

        result = result - excluded_segments

    result.coalesce()

    # Add the segments
    if write_segments:
        add_to_segment(doc, proc_id, new_seg_def_id, result)

    return result
581 582 583
def split_segment_ids(segment_ids):
    """Split strings of the form ifo:name and ifo:name:version.

    Returns a list with one [ifo, name, version] list per input string.
    version is None when it is missing or given as '*', otherwise it is
    converted to int."""
    parsed = []

    for segment_id in segment_ids:
        fields = segment_id.split(':')

        if len(fields) == 2:
            fields.append(None)
        elif fields[2] == '*':
            fields[2] = None
        else:
            fields[2] = int(fields[2])

        parsed.append(fields)

    return parsed