假定有数据个数未知的数据流,要求随机其中的选择k 个数据,且保证每个数据选中的概率相等。
先将前k 个数据取出来放入结果集中,然后从第k+1 个数据到来时开始替换,假设现在为数据流中的第i 个数据,以 \frac{k}{i} 的概率替换掉蓄水池中的某个数据可满足要求。
不失一般性,我们假设数据流共有n个数据。
根据以上描述,蓄水池采样算法实现如下:
# coding:utf-8
import random
def sampling(k, n, reservoir):
"""采样算法"""
for i in range(k, n):
idx = random.randint(0,i)
# 选择替换
if idx < k:
reservoir[idx] = i
def test():
"""单元测试"""
# 蓄水池可以存下的数据个数
k = 10
# 数据流的数据个数
n = 100000
reservoir = [i for i in range(min(k,n))]
sampling(k, n, reservoir)
print(reservoir)
if __name__ == '__main__':
test()
为了验证方法的正确性,我们假设k=1 且n=10 ,即:共有10个数,每次随机选择1个数,并重复100000次采样。代码稍作修改如下:
# coding:utf-8
import random
def sampling(k, n):
"""采样算法"""
# 初始化,第1个数据接进入蓄水池
ret_idx = 0
# 从第2个数据(下标为1)开始,需要执看是否替换
for i in range(k, n):
idx = random.randint(0, i)
# 选择替换
if idx < k:
ret_idx = i
return ret_idx
def test():
"""单元测试"""
# 蓄水池可以存下的数据个数
k = 1
# 数据流的数据个数
n = 10
cnt_dict = {i: 0 for i in range(n)}
# 重复采样10万次,每次从0-9中随机选取一个数
for _ in range(100000):
ret_idx = sampling(k, n)
cnt_dict[ret_idx] += 1
for k, v in cnt_dict.items():
print("{}:{}".format(k,v))
if __name__ == '__main__':
test()
输出:
0:10029
1:9840
2:9946
3:10002
4:10119
5:9843
6:10004
7:10197
8:10001
9:10019
可以看到,每个数的采样的次数均在1万左右,符合预期。
参考资料: