1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 """The client library for the LIGO Data Replicator (LDR) service.
19
20 The DataFind service allows users to query for the location of
21 Gravitational-Wave Frame (GWF) files containing data from the current
22 LIGO and Virgo gravitational-wave detectors.
23
24 This module defines the L{GWDataFindHTTPConnection} and
25 L{GWDataFindHTTPSConnection} class objects, for connecting to an LDR
26 server in open and authenticated access modes respectively.
27 The authenticated L{GWDataFindHTTPSConnection} connection requires users
28 have a valid X509 certificate that is registered with the server in
29 question.
30
31 A new connection can be opened as follows:
32
33 >>> from glue.datafind import GWDataFindHTTPConnection
34 >>> connection = GWDataFindHTTPConnection(host, port)
35
36 and similar for the HTTPS version.
37
38 Users on the LIGO Data Grid (LDG) can connect without giving the name of
39 the relevant host, so long as the C{LIGO_DATAFIND_SERVER} environment
40 variable is defined:
41
42 >>> connection = GWDataFindHTTPConnection()
43
44 Queries for frames can be made using the L{find_frame_urls<GWDataFindHTTPConnection.find_frame_urls>} method of the
45 relevant connection:
46
47 >>> cache = connection.find_frame_urls('L', 'L1_R', 1093564816, 1093651216)
48
By default, the returned L{Cache<glue.lal.Cache>} object includes both C{gsiftp} and local C{file} versions of each frame, but the C{urltype} keyword argument can be given to return only one of those:
50
51 >>> cache = connection.find_frame_urls('L', 'L1_R', 1093564816, 1093651216, urltype='file')
52
53 See the documentation for each connection method for more detailed examples.
54 """
55
56 from __future__ import division
57
58 import os
59 import sys
60 import time
61 import calendar
62 import six.moves.http_client
63 import re
64 import unittest
65
66 from OpenSSL import crypto
67
68 try:
69 from cjson import decode
70 except ImportError:
71 from json import loads as decode
72
73 from glue import (lal, git_version, segments)
74
# Environment variable holding the default datafind server, given as
# "host" or "host:port".
_server_env = "LIGO_DATAFIND_SERVER"
# Path prefix shared by every LDR REST endpoint queried by this module.
_url_prefix = "/LDR/services/data/v1"

__author__ = "Duncan Macleod <duncan.macleod@ligo.org>"
__credits__ = "Scott Koranda <scott.koranda@ligo.org>"
__version__, __date__ = git_version.id, git_version.date
82
83
class GWDataFindHTTPConnection(six.moves.http_client.HTTPConnection):
    """Connection to LIGO data replicator service using HTTP.

    @param host: the name of the server with which to connect
    @param port: the port on which to connect
    @param **kwargs: other keyword arguments accepted by
                     L{httplib.HTTPConnection}

    @type  host: L{str}
    @type  port: L{int}
    """
    # Class passed as ``coltype`` when building cache entries from server
    # responses; it is looked up via ``self`` so subclasses may override it.
    LIGOTimeGPSType = lal.LIGOTimeGPS

    def __init__(self, host=None, **kwargs):
        """Connect to the LDR host using HTTP. Default host is
        defined by the %s environment variable.
        """
        if not host:
            # A port passed explicitly as a keyword wins over the one
            # parsed from the environment.
            host, port = find_server()
            kwargs.setdefault("port", port)
        six.moves.http_client.HTTPConnection.__init__(self, host, **kwargs)
    # Docstrings are stripped (None) under ``python -OO``; interpolating
    # into None would raise TypeError at import time.
    if __init__.__doc__:
        __init__.__doc__ %= _server_env

    @staticmethod
    def _check_mode(argname, mode):
        """Raise L{ValueError} unless C{mode} is 'warn', 'error' or 'ignore'.

        @param argname: name of the caller's argument, used in the message
        @param mode: the value supplied by the caller
        """
        if mode not in ("warn", "error", "ignore"):
            raise ValueError("%s must be 'warn', 'error', or 'ignore'."
                             % argname)

    @staticmethod
    def _handle_missing(mode, message):
        """Report a missing-data condition according to C{mode}.

        'warn' writes C{message} to stderr, 'error' raises
        L{RuntimeError}, and 'ignore' does nothing.
        """
        if mode == "warn":
            sys.stderr.write("%s\n" % message)
        elif mode == "error":
            raise RuntimeError(message)

    def _requestresponse(self, method, url):
        """Internal method to perform request and verify reponse.

        @param method: name of the method to use (e.g. 'GET')
        @param url   : remote URL to query

        @type  method: L{str}
        @type  url   : L{str}

        @returns: the HTTP response object for a status-200 reply; its
            body has not been read yet

        @raises RuntimeError: if query is unsuccessful
        """
        try:
            self.request(method, url)
            response = self.getresponse()
        except Exception as e:
            raise RuntimeError("Unable to query server %s: %s\n\n"
                               "Perhaps you need a valid proxy credential?\n"
                               % (self.host, e))
        if response.status != 200:
            raise RuntimeError("Server returned code %d: %s%s"
                               % (response.status, response.reason,
                                  response.read()))
        return response

    def ping(self):
        """Ping the LDR host to test for life

        @raises RuntimeError: when ping fails
        @returns: 0 if ping was successful
        """
        url = "%s/gwf/%s/%s/%s,%s" % (_url_prefix, 'H', 'R', '1', '2')
        response = self._requestresponse("HEAD", url)
        # Drain the (empty) HEAD response so the connection can be reused:
        # http.client refuses a new response while a previous one is unread.
        response.read()
        return 0

    def find_observatories(self, match=None):
        """Query the LDR host for observatories. Use match to
        restrict returned observatories to those matching the
        regular expression.

        Example:

        >>> connection.find_observatories()
        ['AGHLT', 'G', 'GHLTV', 'GHLV', 'GHT', 'H', 'HL', 'HLT',
         'L', 'T', 'V', 'Z']
        >>> connection.find_observatories("H")
        ['H', 'HL', 'HLT']

        @type  match: L{str}
        @param match:
            name to match return observatories against

        @returns: sorted L{list} of unique observatory prefixes
        """
        url = "%s/gwf.json" % _url_prefix
        response = self._requestresponse("GET", url)
        sitelist = sorted(set(decode(response.read())))
        if match:
            regmatch = re.compile(match)
            sitelist = [site for site in sitelist if regmatch.search(site)]
        return sitelist

    def find_types(self, site=None, match=None):
        """Query the LDR host for frame types. Use site to restrict
        query to given observatory prefix, and use match to restrict
        returned types to those matching the regular expression.

        Example:

        >>> connection.find_types("L", "RDS")
        ['L1_RDS_C01_LX',
         'L1_RDS_C02_LX',
         'L1_RDS_C03_L2',
         'L1_RDS_R_L1',
         'L1_RDS_R_L3',
         'L1_RDS_R_L4',
         'PEM_RDS_A6',
         'RDS_R_L1',
         'RDS_R_L2',
         'RDS_R_L3',
         'TESTPEM_RDS_A6']

        @param site: single-character name of site to match (only the
            first character is used)
        @param match: type-name to match against

        @type  site: L{str}
        @type  match: L{str}

        @returns: sorted L{list} of unique frame types
        """
        if site:
            url = "%s/gwf/%s.json" % (_url_prefix, site[0])
        else:
            url = "%s/gwf/all.json" % _url_prefix
        response = self._requestresponse("GET", url)
        typelist = sorted(set(decode(response.read())))
        if match:
            regmatch = re.compile(match)
            typelist = [name for name in typelist if regmatch.search(name)]
        return typelist

    def find_times(self, site, frametype, gpsstart=None, gpsend=None):
        """Query the LDR for times for which frames are available

        Use gpsstart and gpsend to restrict the returned times to
        this semiopen interval; both must be given for the restriction
        to apply.

        @returns: L{segmentlist<glue.segments.segmentlist>}

        @param site:
            single-character name of site to match
        @param frametype:
            name of frametype to match
        @param gpsstart:
            integer GPS start time of query
        @param gpsend:
            integer GPS end time of query

        @type  site: L{str}
        @type  frametype: L{str}
        @type  gpsstart: L{int}
        @type  gpsend: L{int}
        """
        # Compare against None explicitly: GPS time 0 is a valid bound.
        if gpsstart is not None and gpsend is not None:
            url = ("%s/gwf/%s/%s/segments/%d,%d.json"
                   % (_url_prefix, site, frametype, gpsstart, gpsend))
        else:
            url = ("%s/gwf/%s/%s/segments.json"
                   % (_url_prefix, site, frametype))

        response = self._requestresponse("GET", url)
        segmentlist = decode(response.read())
        return segments.segmentlist(map(segments.segment, segmentlist))

    def find_frame(self, framefile, urltype=None, on_missing="warn"):
        """Query the LDR host for a single framefile

        @returns: L{Cache<glue.lal.Cache>}

        @param framefile:
            name (or path) of the frame file to find; only its basename,
            of the form C{SITE-TYPE-START-DURATION...}, is used
        @param urltype:
            file scheme to search for (e.g. 'file')
        @param on_missing:
            what to do when the requested frame isn't found, one of:
                - C{'warn'} (default): print a warning,
                - C{'error'}: raise an L{RuntimeError}, or
                - C{'ignore'}: do nothing

        @type  framefile: L{str}
        @type  urltype: L{str}
        @type  on_missing: L{str}

        @raises RuntimeError: if given framefile is malformed
        @raises RuntimeError: if no frames are found and C{on_missing='error'}
        """
        self._check_mode("on_missing", on_missing)
        framefile = os.path.basename(framefile)

        try:
            site, frametype, _, _ = framefile.split("-")
        except ValueError as exc:
            raise RuntimeError("Error parsing filename %s: %s"
                               % (framefile, exc))
        url = ("%s/gwf/%s/%s/%s.json"
               % (_url_prefix, site, frametype, framefile))
        response = self._requestresponse("GET", url)
        urllist = decode(response.read())
        if not urllist:
            self._handle_missing(on_missing, "No files found!")

        entries = [lal.CacheEntry.from_T050017(x, coltype=self.LIGOTimeGPSType)
                   for x in urllist]
        return lal.Cache(entry for entry in entries
                         if not urltype or entry.scheme == urltype)

    def find_latest(self, site, frametype, urltype=None, on_missing="warn"):
        """Query for the most recent framefile of a given type.

        @param site:
            single-character name of site to match
        @param frametype:
            name of frametype to match
        @param urltype:
            file scheme to search for (e.g. 'file')
        @param on_missing:
            what to do when the requested frame isn't found, one of:
                - C{'warn'} (default): print a warning,
                - C{'error'}: raise an L{RuntimeError}, or
                - C{'ignore'}: do nothing

        @type  site: L{str}
        @type  frametype: L{str}
        @type  urltype: L{str}
        @type  on_missing: L{str}

        @returns: L{Cache<glue.lal.Cache>} built from the server's reply
            (normally one L{entry<glue.lal.CacheEntry>})

        @raises RuntimeError: if no frames are found and C{on_missing='error'}
        """
        self._check_mode("on_missing", on_missing)
        url = "%s/gwf/%s/%s/latest" % (_url_prefix, site, frametype)
        if urltype:
            url += "/%s" % urltype
        url += ".json"

        response = self._requestresponse("GET", url)
        urllist = decode(response.read())
        if not urllist:
            self._handle_missing(on_missing, "No files found!")
        return lal.Cache([lal.CacheEntry.from_T050017(
            x, coltype=self.LIGOTimeGPSType) for x in urllist])

    def find_frame_urls(self, site, frametype, gpsstart, gpsend,
                        match=None, urltype=None, on_gaps="warn"):
        """Find the framefiles for the given type in the [start, end) interval

        @param site:
            single-character name of site to match
        @param frametype:
            name of frametype to match
        @param gpsstart:
            integer GPS start time of query
        @param gpsend:
            integer GPS end time of query
        @param match:
            regular expression to match against (URL-encoded before
            being sent to the server)
        @param urltype:
            file scheme to search for (e.g. 'file')
        @param on_gaps:
            what to do when the returned frames do not cover the whole
            [gpsstart, gpsend) interval, one of:
                - C{'warn'} (default): print a warning,
                - C{'error'}: raise an L{RuntimeError}, or
                - C{'ignore'}: do nothing

        @type  site: L{str}
        @type  frametype: L{str}
        @type  gpsstart: L{int}
        @type  gpsend: L{int}
        @type  match: L{str}
        @type  urltype: L{str}
        @type  on_gaps: L{str}

        @returns: L{Cache<glue.lal.Cache>}

        @raises RuntimeError: if gaps are found and C{on_gaps='error'}
        """
        try:
            from urllib.parse import quote
        except ImportError:  # Python 2
            from urllib import quote

        self._check_mode("on_gaps", on_gaps)
        url = ("%s/gwf/%s/%s/%s,%s"
               % (_url_prefix, site, frametype, gpsstart, gpsend))
        if urltype:
            url += "/%s" % urltype
        url += ".json"
        if match:
            # Regexes routinely contain '&', '+', '#' or spaces, which would
            # otherwise corrupt the query string.
            url += "?match=%s" % quote(match)

        response = self._requestresponse("GET", url)
        urllist = decode(response.read())
        out = lal.Cache([lal.CacheEntry.from_T050017(
            x, coltype=self.LIGOTimeGPSType) for x in urllist])

        if on_gaps == "ignore":
            return out

        span = segments.segment(gpsstart, gpsend)
        seglist = segments.segmentlist(e.segment for e in out).coalesce()
        if span in seglist:
            return out
        missing = (segments.segmentlist([span]) - seglist).coalesce()
        self._handle_missing(on_gaps, "Missing segments: \n%s"
                             % "\n".join(map(str, missing)))
        return out
398
class GWDataFindHTTPSConnection(six.moves.http_client.HTTPSConnection,
                                GWDataFindHTTPConnection):
    """Secured connection to LIGO data replicator service using HTTPS.

    The query methods come from L{GWDataFindHTTPConnection}; only the
    transport (and hence the constructor) differs.
    """
    def __init__(self, host=None, **kwargs):
        """Connect to the LDR host using HTTPS.

        Default host is defined by the %s environment variable.
        """
        if not host:
            # An explicitly passed port keyword takes precedence.
            host, port = find_server()
            kwargs.setdefault("port", port)
        six.moves.http_client.HTTPSConnection.__init__(self, host, **kwargs)
    # Docstrings are None under ``python -OO``; interpolating into None
    # would raise TypeError when the class body executes.
    if __init__.__doc__:
        __init__.__doc__ %= _server_env
412
413
def validate_proxy(path, min_remaining=0):
    """Validate the users X509 proxy certificate

    Loads the PEM certificate at C{path} and checks that:
      - it carries the RFC 3820 C{proxyCertInfo} extension, or, failing
        that, its subject common name does not start with C{'proxy'}
        (which would mark a legacy, non-RFC 3820 proxy);
      - it does not expire within the next C{min_remaining} seconds (with
        the default of 0 it merely must not have expired yet).

    @param path: path of the PEM-encoded proxy/certificate file
    @param min_remaining: minimum remaining lifetime required, in seconds;
        e.g. pass C{900} to require validity for the next 15 minutes

    @type path: L{str}
    @type min_remaining: L{int}

    @returns: L{True} if the certificate validates
    @raises IOError: if the file cannot be read
    @raises RuntimeError: if the certificate cannot be validated
    """
    try:
        # PEM is ASCII; reading bytes avoids locale-dependent decode errors.
        with open(path, 'rb') as f:
            cert = crypto.load_certificate(crypto.FILETYPE_PEM, f.read())
    except IOError as e:
        # Keep the original exception type, just make the message clearer.
        e.args = ('Failed to load proxy certificate: %s' % str(e),)
        raise

    rfc3820 = False
    for i in range(cert.get_extension_count()):
        name = cert.get_extension(i).get_short_name()
        if isinstance(name, bytes):  # pyOpenSSL returns bytes on Python 3
            name = name.decode('utf-8')
        if name == 'proxyCertInfo':
            rfc3820 = True
            break

    if not rfc3820:
        # A subject may lack a CN entirely, in which case CN is None.
        common_name = cert.get_subject().CN or ''
        if common_name.startswith('proxy'):
            raise RuntimeError('Could not find a valid proxy credential')

    expiry = cert.get_notAfter()
    if isinstance(expiry, bytes):
        expiry = expiry.decode('utf-8')
    expiryu = calendar.timegm(time.strptime(expiry, "%Y%m%d%H%M%SZ"))
    now = time.time()
    if expiryu < now:
        raise RuntimeError('Required proxy credential has expired')
    if expiryu < now + min_remaining:
        raise RuntimeError('Required proxy credential expires in less than '
                           '%d seconds' % min_remaining)

    return True
454
def find_credential():
    """Locate the users X509 certificate and key files

    The following sources are tried in order:
      1. C{X509_USER_PROXY}: a proxy file that must pass
         L{validate_proxy}; it is returned as both certificate and key;
      2. C{X509_USER_CERT} and C{X509_USER_KEY}: returned as-is, without
         validation;
      3. the standard proxy location C{/tmp/x509up_u<uid>} (POSIX only),
         validated as in step 1.

    @returns: C{(certfile, keyfile)} tuple of paths

    @raises RuntimeError: if the proxy found via either method cannot
        be validated
    @raises RuntimeError: if the cert and key files cannot be located
    """
    rfc_proxy_msg = ("Could not find a RFC 3820 compliant proxy credential. "
                     "Please run 'grid-proxy-init -rfc' and try again.")

    if 'X509_USER_PROXY' in os.environ:
        proxy = os.environ['X509_USER_PROXY']
        # validate_proxy raises on failure; the check is defensive only.
        if not validate_proxy(proxy):
            raise RuntimeError(rfc_proxy_msg)
        return proxy, proxy

    if 'X509_USER_CERT' in os.environ and 'X509_USER_KEY' in os.environ:
        return os.environ['X509_USER_CERT'], os.environ['X509_USER_KEY']

    # os.getuid only exists on POSIX platforms.
    getuid = getattr(os, 'getuid', None)
    if getuid is not None:
        proxy = "/tmp/x509up_u%d" % getuid()
        if os.access(proxy, os.R_OK):
            if not validate_proxy(proxy):
                raise RuntimeError(rfc_proxy_msg)
            return proxy, proxy

    raise RuntimeError(rfc_proxy_msg)
497
def find_server():
    """Find the default server host from the environment

    This method uses the C{LIGO_DATAFIND_SERVER} variable, holding either
    C{host} or C{host:port}, to construct a C{(host, port)} tuple.

    @returns: C{(host, port)}: the L{str} host name and L{int} port number,
        or C{None} for the port if none (or an empty one) is given

    @raises RuntimeError: if the C{LIGO_DATAFIND_SERVER} environment variable
        is not set (or is empty)
    @raises ValueError: if the port part is not an integer
    """
    server = os.environ.get(_server_env, '').strip()
    if not server:
        raise RuntimeError("Environment variable %s is not set" % _server_env)
    host, _, port = server.partition(':')
    # "host:" means "no explicit port", i.e. let the connection use its default
    return host, (int(port) if port else None)
520
521
523 """Small suite of test functions.
524
525 Probably won't work if you're not on an LDAS
526 machine...
527 """
531
535
540
545
550
555
560
if __name__ == "__main__":
    # Run the module's unittest-based test suite when executed as a script.
    unittest.main()
563