1   
  2   
  3   
  4   
  5   
  6   
  7   
  8   
  9   
 10   
 11   
 12   
 13   
 14   
 15   
 16   
 17  """ 
 18  Algorithms for low latency clustering of event-like datasets. 
 19  """ 
 20  __author__ = "Leo Singer <leo.singer@ligo.org>" 
 21   
 22   
 23  import array 
 27          """A max heap that retains the order in which samples were added to it. 
 28          The history capacity grows dynamically by expanding to the smallest power of 
 29          two required to hold the contents. 
 30   
 31          This is essentially an ordinary max heap, storing values and their sample 
 32          offsets from the originating stream, coupled with a dynamically sized ring 
 33          buffer storing the heap ranks of each sample. 
 34   
 35          This procedure should take about O(1) time per input sample on average.""" 
 36   
 37           
 38           
 39   
 41                  if value is None: 
 42                          value = lambda x: x 
 43                  self._value = value 
 44                  self._history = array.array('I', (0, 0)) 
 45                  self._offset = 0 
 46                  self._heap = [] 
  47   
 49   
 50                   
 51                  old_index = self._history[(self._offset - len(self._heap)) % len(self._history)] 
 52   
 53                   
 54                  new_index = len(self._heap) - 1 
 55   
 56                   
 57                  if new_index != old_index: 
 58                          self._swap(old_index, new_index) 
 59                          self._sift(old_index) 
 60   
 61                   
 62                  self._heap.pop() 
  63   
 65                  while len(self._heap) > 0 and predicate(self._heap[self._history[(self._offset - len(self._heap)) % len(self._history)]][2]): 
 66                          self._drop_oldest() 
  67   
 69                   
 70                   
 71                  if len(self._heap) == len(self._history): 
 72   
 73                          end_index = self._offset % len(self._history) 
 74                          begin_index = end_index - len(self._heap) 
 75   
 76                          if begin_index >= 0: 
 77                                  old_history = self._history[begin_index:end_index] 
 78                          else: 
 79                                  old_history = self._history[begin_index:] + self._history[:end_index] 
 80   
 81                           
 82                          new_capacity = len(self._history) << 1 
 83                          self._history = array.array('I', (0,) * new_capacity) 
 84   
 85                          end_index = self._offset % len(self._history) 
 86                          begin_index = end_index - len(self._heap) 
 87                          if begin_index >= 0: 
 88                                  self._history[begin_index:end_index] = old_history 
 89                          else: 
 90                                  self._history[begin_index:] = old_history[:-begin_index] 
 91                                  self._history[:end_index] = old_history[-end_index:] 
 92   
 93                   
 94                  k = len(self._heap) 
 95                  self._history[self._offset % len(self._history)] = k 
 96                  self._heap.append( (self._value(x), self._offset, x) ) 
 97                  self._offset += 1 
 98                  self._sift_up(k) 
  99   
100          @property 
102                  """Return the maximum element in the history (the 'root' of the binary 
103                  tree that is the heap).""" 
104                  return self._heap[0][2] 
 105   
106          @property 
108                  end_index = self._offset % len(self._history) 
109                  begin_index = end_index - len(self._heap) 
110   
111                  if begin_index >= 0: 
112                          old_history = self._history[begin_index:end_index] 
113                  else: 
114                          old_history = self._history[begin_index % len(self._history):] + self._history[:end_index] 
115   
116                  return old_history 
 117   
118          @property 
120                  return len(self._heap) 
 121   
123                  return all(self._cmp(i, (i - 1) / 2) == -1 for i in range(1, len(self._heap))) 
 124   
126                  offset_i = self._heap[i][1] % len(self._history) 
127                  offset_j = self._heap[j][1] % len(self._history) 
128                  self._history[offset_i], self._history[offset_j] = j, i 
129                  self._heap[i], self._heap[j] = self._heap[j], self._heap[i] 
 130   
131 -        def _cmp(self, i, j): 
 132                  return cmp(self._heap[i][0], self._heap[j][0]) 
 133   
135                  while i > 0: 
136                          j = (i - 1) / 2 
137                          if self._cmp(i, j) == -1: 
138                                  break 
139                          self._swap(i, j) 
140                          i = j 
 141   
143                  n = len(self._heap) 
144                  while True: 
145                          j = 2 * i + 1 
146                          k = j + 1 
147                          if k < n - 1: 
148                                  if self._cmp(i, j) == -1: 
149                                          if self._cmp(j, k) == -1: 
150                                                  self._swap(i, k) 
151                                                  i = k 
152                                          else: 
153                                                  self._swap(i, j) 
154                                                  i = j 
155                                  elif self._cmp(i, k) == -1: 
156                                          self._swap(i, k) 
157                                          i = k 
158                                  else: 
159                                          break 
160                          elif j < n - 1 and self._cmp(i, j) == -1: 
161                                  self._swap(i, j) 
162                                  i = j 
163                          else: 
164                                  break 
 165   
167                  if i == 0: 
168                          self._sift_down(i) 
169                  else: 
170                          parent = (i - 1) / 2 
171                          if self._heap[i] > self._heap[parent]: 
172                                  self._sift_up(i) 
173                          else: 
174                                  self._sift_down(i) 
  175   
178          """Provides one-dimensional time symmetric clustering.  Every input that is 
179          greater than all samples that come before or after it within a certain window 
180          is reported. 
181   
182          key is a function that, when applied to the items being clustered, yields a 
183          monotonically increasing quantity that is treated as the 'time' of the item. 
184   
185          value is a function that, when applied to the items being clustered, is the 
186          quantity that is maximized -- for example, SNR. 
187   
188          Note: This implementation is tuned for event-like datasets in which sample 
189          times are widely and irregularly spaced.  For dense, regularly sampled data, 
190          there is a conceptually identical but practically much simpler algorithm. 
191   
192          The cluster() and flush() routines both return generators.  Both may be 
193          called multiple times.  History is retained between calls.""" 
194   
195 -        def __init__(self, key, value, window): 
 196                  self._key = key 
197                  self._delta = window 
198                  self._2delta = 2 * window 
199                  self._heap = SlidingMaxHeap(lambda x: value(x[1])) 
200                  self._min_key = None 
 201   
        def flush(self, max_key):
                """Proceed with clustering items in the history under the assumption that
                there are no further items with a key less than max_key.  This method
                is used internally, but the user may call this as well if information
                about the absence of triggers is available independently from the
                triggers.  This may be useful in low latency applications.

                This is a generator: items now known to be local maxima over the
                clustering window are yielded as they are identified."""
                # On the very first call there is no lower bound on keys yet;
                # just record this one.  (The bare return ends the generator, so
                # the first call yields nothing.)
                if self._min_key is None:
                        self._min_key = max_key
                        return

                # Hoist attribute lookups out of the loop below.
                _2delta = self._2delta
                _delta = self._delta
                min_key = self._min_key

                if max_key < min_key:
                        raise ValueError("Input keys are not increasing monotonically.")

                # Keep clustering while the fully-known span of history covers at
                # least one whole window (2 * delta).
                while max_key - min_key >= _2delta:
                        if self._heap.count > 0:
                                # Heap entries are (key, item) pairs; root is the
                                # pair whose value is maximal over the history.
                                root_key, root = self._heap.root
                                if root_key > max_key - _delta:
                                        # Maximum lies within delta of the newest known
                                        # key: later input could still exceed it, so only
                                        # advance the lower bound without yielding.
                                        min_key = root_key - _delta
                                elif root_key < min_key + _delta:
                                        # Maximum lies within delta of the old lower
                                        # bound -- NOTE(review): presumably it may have
                                        # been dominated by an item no longer in the
                                        # history, so it is discarded without yielding.
                                        min_key = root_key
                                else:
                                        # Maximum dominates a full window on both sides:
                                        # report it.
                                        yield root
                                        min_key = root_key
                                # Entries keyed at or before the new lower bound can no
                                # longer matter; drop them from the history.
                                self._heap.drop_while(lambda x: x[0] <= min_key)
                        else:
                                # Empty history: fast-forward the lower bound.
                                min_key = max_key - _delta

                self._min_key = min_key
 234   
236                  key = self._key 
237   
238                  for item in iterable: 
239                          max_key = key(item) 
240                          for clustered_item in self.flush(max_key): 
241                                  yield clustered_item 
242                          self._heap.append((max_key, item)) 
  243   
244   
def clustered(iterable, key, value, window):
        """Convenience wrapper: cluster the items of iterable without the
        caller having to construct and manage a ClusterBuilder instance.

        This is a generator over the clustered items; key, value, and window
        have the same meaning as in ClusterBuilder."""
        builder = ClusterBuilder(key, value, window)
        for result in builder.cluster(iterable):
                yield result
 251