from __future__ import division

__author__ = "Nickolas Fotopoulos <nickolas.fotopoulos@ligo.org>"

from bisect import bisect_right
import httplib
import os
import os.path
import shutil
import sys
import operator

from glue.lal import Cache
from glue.segments import segment, segmentlist
from pylal.metaarray import TimeSeries, TimeSeriesList
from pylal.Fr import frgetvect1d

__all__ = ('__author__', 'FrameCache', 'AutoqueryingFrameCache')
 35   


class FrameCache(object):
    """
FrameCache is a transparent interface to LSC data. The user provides a LAL-
formatted cache file and the returned FrameCache object allows repeated
queries for channels and times, even across frame files. It also supports
smart, lazy local caching. Limitation: it only works for one-dimensional
time-series data.

Constructor:
    FrameCache(cache_entries=None, scratchdir=None, verbose=False)

Inputs:
    cache_entries is a list of glue.lal.CacheEntry objects or a
    glue.lal.Cache.  Data will be retrieved from the frame files described
    within.

    scratchdir determines where to locally cache frames.  If None, no
    caching is performed.

Example:
>>> from glue import lal
>>> from pylal import frutils
>>> c = lal.Cache.fromfile(open("test.cache"))
>>> d = frutils.FrameCache(c, scratchdir="/tmp", verbose=True)
>>> data = d.fetch("H1:LSC-STRAIN", 861417967, 861417969)
Copying /Users/nvf/temp/H-H1_RDS_C03_L2-861417967-128.gwf -->
          /tmp/H-H1_RDS_C03_L2-861417967-128.gwf.
>>> print(data)
[  1.68448009e-16   1.69713183e-16   1.71046196e-16 ...,   1.80974629e-16
   1.80911765e-16   1.80804879e-16] {'dt': 6.103515625e-05, 'segments': [segment(861417967, 861417969)], 'comments': [], 'name': 'H1:LSC-STRAIN'}
>>> exit()
Removing /tmp/H-H1_RDS_C03_L2-861417967-128.gwf.

"""

    def __init__(self, cache_entries=None, scratchdir=None, verbose=False):
        """ Initializes interface to frame data.  See .__class__.__doc__"""

        # simple settings
        self._verbose = verbose
        self._scratchdir = scratchdir

        # bookkeeping for the remote (source) frame files
        self._remotefiles = []                # file paths
        self._remotesegs = segmentlist()      # their segments, kept sorted
        self._remotecoverage = segmentlist()  # coalesced union of segments

        # bookkeeping for locally cached copies; without a scratchdir the
        # "cached" structures simply alias the remote ones
        if scratchdir is not None:
            self._cachedfiles = []
            self._cachedsegs = segmentlist()
            self._cachecoverage = segmentlist()
        else:
            self._cachedfiles = self._remotefiles
            self._cachedsegs = self._remotesegs
            self._cachecoverage = self._remotecoverage

        if cache_entries is not None:
            self.add_cache(cache_entries)

    def add_cache(self, cache_entries):
        """
        Add information from some cache entries.
        """
        # insert each new file and its segment, keeping the remote lists
        # sorted by segment
        for entry in cache_entries:
            # skip files we already know about
            if entry.path in self._remotefiles:
                continue
            newseg, newfile = entry.segment, entry.path
            insert_idx = bisect_right(self._remotesegs, newseg)
            self._remotesegs.insert(insert_idx, newseg)
            self._remotefiles.insert(insert_idx, newfile)
            self._remotecoverage |= segmentlist([newseg])

        self._remotecoverage.coalesce()

    def __del__(self):
        """
        Clear cache in local scratch when this object is cleaned up.
        """
        if self._scratchdir is None:
            return
        # zip() snapshots the lists, so it is safe for _unfetch to mutate them
        for f, s in zip(self._cachedfiles, self._cachedsegs):
            self._unfetch(f, s)
        return

    def fetch(self, channel, start, end):
        """
        Retrieve data, caching file locations and the files themselves.
        """
        seg = segment(start, end)

        if not self._query(channel, start, end):
            raise ValueError("%s not found in cache" % repr(segmentlist([seg]) - self._remotecoverage))

        # copy any needed frame files into the local scratch space, keeping
        # the cached lists sorted by segment
        if seg not in self._cachecoverage:
            for f, s in zip(self._remotefiles, self._remotesegs):
                if seg.intersects(s) and s not in self._cachecoverage:
                    dest = os.path.join(self._scratchdir, os.path.split(f)[-1])
                    if self._verbose:
                        print "Copying %s -->\n          %s." % (f, dest)
                    shutil.copy(f, dest)
                    ind = bisect_right(self._cachedsegs, s)
                    self._cachedfiles.insert(ind, dest)
                    self._cachedsegs.insert(ind, s)
                    self._cachecoverage |= segmentlist([s])
            assert seg in self._cachecoverage

        # read the data out of the locally available frame files
        return self._fetch(channel, start, end)

    def _query(self, channel, start, end):
        "Do we know where the frame file is?"
        return segment(start, end) in self._remotecoverage

    def _fetch(self, channel, start, end, comments=[]):
        """
        Internal method to actually retrieve and return data as TimeSeries,
        assuming that the local file cache is all set.  Does not check
        boundaries.
        """
        toreturn = TimeSeriesList([])

        if start == end:
            return toreturn

        # find the cached file whose segment contains the start time
        try:
            index = self._cachedsegs.find(start)
        except ValueError:
            print >>sys.stderr, "Couldn't find any frame files to cover",\
                str(start), "to", str(end), "among:"
            print >>sys.stderr, str(self._cachedfiles)
            return toreturn

        # read one TimeSeries per frame file until the requested span is
        # covered, then stitch them together below
        now = start
        while now < end:
            dur = min(end, self._cachedsegs[index][1]) - now
            data, GPS_start, t_low, dt, x_unit, y_unit = \
                frgetvect1d(self._cachedfiles[index], channel, now, dur, 0)
            meta = {"name": channel, "dt": dt,
                "segments": [segment(now, now+dur)], "comments": comments}
            toreturn.append(TimeSeries(data, meta))
            now += dur
            index += 1

        if len(toreturn) == 0:
            # should be unreachable: the loop above appends at least one
            # TimeSeries whenever start < end
            print >>sys.stderr, "Couldn't find all frame files needed to cover",\
                str(start), "to", str(end), "among:"
            print >>sys.stderr, str(self._cachedfiles)

        toreturn = toreturn.merge_list()
        toreturn.metadata.segments.coalesce()

        return toreturn

    def unfetch(self, start, end):
        """
        Remove files from local scratch space based on a (start, end) pair.
        Silently ignores non-existent times.  A file is removed only if its
        end time lies between start and end; this bias avoids cache misses
        for future fetches when frames are processed in chronological order.
        """
        if self._scratchdir is None:
            return

        for f, s in zip(self._cachedfiles, self._cachedsegs):
            if start < s[1] <= end:
                self._unfetch(f, s)
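
    # A minimal sketch of the intended fetch/unfetch pairing for
    # chronological processing (names like analysis_segments and cache are
    # illustrative, not part of this module):
    #
    #   for seg_start, seg_end in analysis_segments:
    #       data = cache.fetch("H1:LSC-STRAIN", seg_start, seg_end)
    #       # ... process data ...
    #       cache.unfetch(seg_start, seg_end)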

    def _unfetch(self, filename, seg):
        """
        Internal method to actually remove a file from cache.
        """
        if self._scratchdir is None:
            return
        if filename not in self._cachedfiles:
            print >>sys.stderr, \
                "Cache inconsistency: Delete request for file not in cache."
            return
        if self._verbose: print "Removing %s." % filename
        os.remove(filename)
        self._cachedfiles.remove(filename)
        self._cachedsegs.remove(seg)
        self._cachecoverage -= segmentlist([seg])
        return


def validateProxy(path):
    """
    Test that the proxy certificate at path is RFC 3820 compliant and that
    it is valid for at least the next 15 minutes.
    """
    try:
        import M2Crypto
    except ImportError, e:
        print >> sys.stderr, """
validateProxy requires the M2Crypto module.

On CentOS 5 and other RHEL-based platforms
this package is available from the EPEL
repository by doing

yum install m2crypto

For Debian Lenny this package is available
by doing

apt-get install python-m2crypto

Mac OS X users can find this package in MacPorts.

%s
""" % e
        raise

    # load the proxy certificate from disk
    try:
        proxy = M2Crypto.X509.load_cert(path)
    except Exception, e:
        msg = "Unable to load proxy from path %s : %s" % (path, e)
        raise RuntimeError(msg)

    # an RFC 3820 proxy carries the proxyCertInfo extension
    try:
        proxy.get_ext("proxyCertInfo")
    except LookupError:
        rfc_proxy_msg = """\
Could not find a RFC 3820 compliant proxy credential.
Please run 'grid-proxy-init -rfc' and try again.
"""
        raise RuntimeError(rfc_proxy_msg)

    # work out how many seconds of validity remain
    import time, calendar
    try:
        expireASN1 = proxy.get_not_after().__str__()
        expireGMT  = time.strptime(expireASN1, "%b %d %H:%M:%S %Y %Z")
        expireUTC  = calendar.timegm(expireGMT)
        now = int(time.time())
        secondsLeft = expireUTC - now
    except Exception, e:
        # if the expiration time cannot be parsed, optimistically assume
        # an hour of validity remains
        secondsLeft = 3600

    if secondsLeft <= 0:
        msg = """\
Your proxy certificate is expired.

Please generate a new proxy certificate and
try again.
"""
        raise RuntimeError(msg)

    if secondsLeft < (60 * 15):
        msg = """\
Your proxy certificate expires in less than
15 minutes.

Please generate a new proxy certificate and
try again.
"""
        raise RuntimeError(msg)

    # the credential is RFC 3820 compliant and valid long enough
    return True
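
# A minimal usage sketch (assuming a Globus proxy at the default location
# that findCredential checks below); validateProxy either returns True or
# raises RuntimeError with a human-readable explanation:
#
#   try:
#       validateProxy("/tmp/x509up_u%d" % os.getuid())
#   except RuntimeError, e:
#       print >> sys.stderr, e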

def findCredential():
    """
    Follow the usual path that GSI libraries would follow to find a valid
    proxy credential, but also allow an end entity certificate to be used
    along with an unencrypted private key if they are pointed to by
    X509_USER_CERT and X509_USER_KEY, since we expect this will be the
    output of the eventual ligo-login wrapper around kinit and
    myproxy-login.
    """
    rfc_proxy_msg = """\
Could not find a RFC 3820 compliant proxy credential.
Please run 'grid-proxy-init -rfc' and try again.
"""

    # use X509_USER_PROXY from the environment if set
    if os.environ.has_key('X509_USER_PROXY'):
        filePath = os.environ['X509_USER_PROXY']
        if validateProxy(filePath):
            return filePath, filePath
        else:
            raise RuntimeError(rfc_proxy_msg)

    # use X509_USER_CERT and X509_USER_KEY if both are set
    if os.environ.has_key('X509_USER_CERT'):
        if os.environ.has_key('X509_USER_KEY'):
            certFile = os.environ['X509_USER_CERT']
            keyFile = os.environ['X509_USER_KEY']
            return certFile, keyFile

    # fall back to the default Globus proxy location
    uid = os.getuid()
    path = "/tmp/x509up_u%d" % uid

    if os.access(path, os.R_OK):
        if validateProxy(path):
            return path, path
        else:
            raise RuntimeError(rfc_proxy_msg)

    # no credential found
    raise RuntimeError(rfc_proxy_msg)
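
# Typical use, sketched: the returned (cert, key) pair feeds an
# authenticated HTTPS connection, as query_LDR does below (the hostname is
# illustrative):
#
#   certFile, keyFile = findCredential()
#   conn = httplib.HTTPSConnection("ldr.ligo.caltech.edu",
#                                  key_file=keyFile, cert_file=certFile)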

def query_LDR(server, port, site, frameType, gpsStart, gpsEnd, urlType=None, noproxy=False):
    """
    Return a list of URLs to frames covering the requested time, as returned
    by the LDR server.
    """
    try:
        import cjson
    except ImportError, e:
        print >> sys.stderr, """
frutils requires the cjson module.

On CentOS 5 and other RHEL-based platforms
this package is available from the EPEL
repository by doing

yum install python-cjson

For Debian Lenny this package is available by doing

apt-get install python-cjson

Mac OS X users can find this package in MacPorts.

%s
""" % e
        raise

    url = "/LDR/services/data/v1/gwf/%s/%s/%s,%s" % (site, frameType, gpsStart, gpsEnd)

    # if a URL type was specified, append it to the path
    if urlType:
        url += "/%s" % urlType

    # request JSON output
    url += ".json"

    # open a connection: plain HTTP on port 80 or when noproxy is set,
    # otherwise HTTPS authenticated with a GSI credential
    if noproxy or port == 80:
        h = httplib.HTTPConnection(server, port)
    else:
        certFile, keyFile = findCredential()
        h = httplib.HTTPSConnection(server, key_file=keyFile, cert_file=certFile)

    # query the server
    try:
        h.request("GET", url)
        response = h.getresponse()
    except Exception, e:
        msg = "Unable to query server %s: %s\n\nPerhaps you need a valid proxy credential?\n" % (server, e)
        raise RuntimeError(msg)

    # check the HTTP status
    if response.status != 200:
        msg = "Server returned code %d: %s" % (response.status, response.reason)
        body = response.read()
        msg += body
        raise RuntimeError(msg)

    # read and decode the JSON body
    body = response.read()

    return cjson.decode(body)
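
# Sketch of the request this issues (values are illustrative, taken from the
# AutoqueryingFrameCache example below): asking site "H" for frame type
# H1_RDS_C03_L2 with urlType="file" performs a GET for
#   /LDR/services/data/v1/gwf/H/H1_RDS_C03_L2/861417967,861417969/file.json
# and returns the decoded JSON list of frame-file URLs.
#
#   urls = query_LDR("ldr.ligo.caltech.edu", None, "H", "H1_RDS_C03_L2",
#                    861417967, 861417969, urlType="file")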


class AutoqueryingFrameCache(FrameCache):
    """
This subclass of FrameCache queries the LDR server (the same service behind
ligo_data_find) automatically, so no LAL-cache files are required.
Limitation: you'll need one instance per frame type.

Constructor:
    AutoqueryingFrameCache(frametype, hostPortString=None, scratchdir=None,
        verbose=False)

Inputs:
    frametype is the type of GWF frame you seek (e.g. RDS_R_L1).
    hostPortString is the name of the LDR server and, optionally, the port,
        separated by a colon (e.g. ldr.ligo.caltech.edu)
    scratchdir determines where to locally cache frames. If None, no
        caching is performed.

Example:
>>> from pylal import frutils
>>> d = frutils.AutoqueryingFrameCache(frametype="H1_RDS_C03_L2", scratchdir="/tmp", verbose=True)
>>> data = d.fetch("H1:LSC-STRAIN", 861417967, 861417969)
Copying /Users/nvf/temp/H-H1_RDS_C03_L2-861417967-128.gwf -->
          /tmp/H-H1_RDS_C03_L2-861417967-128.gwf.
>>> print(data)
[  1.68448009e-16   1.69713183e-16   1.71046196e-16 ...,   1.80974629e-16
   1.80911765e-16   1.80804879e-16] {'dt': 6.103515625e-05, 'segments': [segment(861417967, 861417969)], 'comments': [], 'name': 'H1:LSC-STRAIN'}
>>> exit()
Removing /tmp/H-H1_RDS_C03_L2-861417967-128.gwf.

Using AutoqueryingFrameCache outside of LDG clusters, using Caltech as a
gateway:
 * Just the first time you do this procedure: "sudo mkdir /data && sudo chown
   albert.einstein /data" (replace albert.einstein with your local username;
   /data may be different for different clusters)
 * Set the LIGO_DATAFIND_SERVER environment variable to ldr.ligo.caltech.edu
   (or the LDR server of the LDG cluster nearest you)
 * Use "sshfs -o ssh_command=gsissh
   albert.einstein@ldas-pcdev1.ligo.caltech.edu:/data /data" (replace
   albert.einstein with your cluster username)
 * Use "umount /data" when you're done. Unmounting cleanly will help prevent
   headaches the next time you want to set this up.
    """
    def __init__(self, frametype, hostPortString=None, scratchdir=None,
        verbose=False):
        FrameCache.__init__(self, None, scratchdir, verbose)

        if not frametype:
            raise ValueError("frametype required")
        self.frametype = frametype

        # determine the LDR server, falling back to LIGO_DATAFIND_SERVER
        if hostPortString is None:
            if os.environ.has_key('LIGO_DATAFIND_SERVER'):
                hostPortString = os.environ['LIGO_DATAFIND_SERVER']
            else:
                raise ValueError("no way to determine LIGO_DATAFIND_SERVER")
        if hostPortString.find(':') < 0:
            # no port specified
            self.host = hostPortString
            self.port = None
        else:
            # server and port specified
            self.host, portString = hostPortString.split(':')
            self.port = int(portString)

    def _query(self, channel, start, end):
        "Do we know where the frame file is?"
        if segment(start, end) in self._remotecoverage:
            return True
        # ask the LDR server for frame URLs; the site is the first letter of
        # the channel name (e.g. "H" for H1:LSC-STRAIN)
        urls = query_LDR(self.host, self.port, channel[0], self.frametype, start, end, urlType="file")
        if urls:
            new = Cache.from_urls(urls, coltype=int)
            new.sort(key=operator.attrgetter("segment"))
            self.add_cache(new)
        return segment(start, end) in self._remotecoverage