144 lines
3.0 KiB
Go
144 lines
3.0 KiB
Go
// Copyright 2014 Google Inc. All Rights Reserved.
|
||
//
|
||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||
// you may not use this file except in compliance with the License.
|
||
// You may obtain a copy of the License at
|
||
//
|
||
// http://www.apache.org/licenses/LICENSE-2.0
|
||
//
|
||
// Unless required by applicable law or agreed to in writing, software
|
||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||
// See the License for the specific language governing permissions and
|
||
// limitations under the License.
|
||
|
||
package sampling
|
||
|
||
import (
|
||
"container/heap"
|
||
"math"
|
||
"math/rand"
|
||
"sync"
|
||
)
|
||
|
||
type esSampleItem struct {
|
||
data interface{}
|
||
key float64
|
||
}
|
||
|
||
type esSampleHeap []esSampleItem
|
||
|
||
func (self esSampleHeap) Len() int {
|
||
return len(self)
|
||
}
|
||
|
||
func (self esSampleHeap) Less(i, j int) bool {
|
||
return self[i].key < self[j].key
|
||
}
|
||
|
||
func (self esSampleHeap) Swap(i, j int) {
|
||
self[i], self[j] = self[j], self[i]
|
||
}
|
||
|
||
func (self *esSampleHeap) Push(x interface{}) {
|
||
item := x.(esSampleItem)
|
||
*self = append(*self, item)
|
||
}
|
||
|
||
func (self *esSampleHeap) Pop() interface{} {
|
||
old := *self
|
||
item := old[len(old)-1]
|
||
*self = old[:len(old)-1]
|
||
return item
|
||
}
|
||
|
||
type esSampler struct {
|
||
weight func(interface{}) float64
|
||
samples *esSampleHeap
|
||
maxSize int
|
||
lock sync.RWMutex
|
||
}
|
||
|
||
func (self *esSampler) Update(d interface{}) {
|
||
self.lock.Lock()
|
||
defer self.lock.Unlock()
|
||
|
||
u := rand.Float64()
|
||
key := math.Pow(u, 1.0/self.weight(d))
|
||
|
||
if self.samples.Len() < self.maxSize {
|
||
heap.Push(self.samples, esSampleItem{
|
||
data: d,
|
||
key: key,
|
||
})
|
||
return
|
||
}
|
||
|
||
s := *(self.samples)
|
||
min := s[0]
|
||
|
||
// The key of the new item is larger than a key in existing item.
|
||
// Add this new item.
|
||
if key > min.key {
|
||
heap.Pop(self.samples)
|
||
heap.Push(self.samples, esSampleItem{
|
||
data: d,
|
||
key: key,
|
||
})
|
||
}
|
||
}
|
||
|
||
func (self *esSampler) Len() int {
|
||
self.lock.RLock()
|
||
defer self.lock.RUnlock()
|
||
return len(*self.samples)
|
||
}
|
||
|
||
func (self *esSampler) Reset() {
|
||
self.lock.Lock()
|
||
defer self.lock.Unlock()
|
||
self.samples = &esSampleHeap{}
|
||
heap.Init(self.samples)
|
||
}
|
||
|
||
func (self *esSampler) Map(f func(interface{})) {
|
||
self.lock.RLock()
|
||
defer self.lock.RUnlock()
|
||
|
||
for _, d := range *self.samples {
|
||
f(d.data)
|
||
}
|
||
}
|
||
|
||
func (self *esSampler) Filter(filter func(d interface{}) bool) {
|
||
self.lock.Lock()
|
||
defer self.lock.Unlock()
|
||
|
||
rmlist := make([]int, 0, len(*self.samples))
|
||
for i, d := range *self.samples {
|
||
if filter(d.data) {
|
||
rmlist = append(rmlist, i)
|
||
}
|
||
}
|
||
|
||
for _, i := range rmlist {
|
||
heap.Remove(self.samples, i)
|
||
}
|
||
}
|
||
|
||
// ES sampling algorithm described in
|
||
//
|
||
// Pavlos S. Efraimidis and Paul G. Spirakis. Weighted random sampling with a
|
||
// reservoir. Information Processing Letters, 97(5):181 – 185, 2006.
|
||
//
|
||
// http://dl.acm.org/citation.cfm?id=1138834
|
||
func NewESSampler(size int, weight func(interface{}) float64) Sampler {
|
||
s := &esSampleHeap{}
|
||
heap.Init(s)
|
||
return &esSampler{
|
||
maxSize: size,
|
||
samples: s,
|
||
weight: weight,
|
||
}
|
||
}
|