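Put the first k items of S into the "reservoir". Then, for each item S[j] (j ≥ k):
  generate a random integer r in the range 0 to j (inclusive);
  if r < k, replace the r-th item of the reservoir with S[j].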
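Why every item ends up in the reservoir with the same probability: a short derivation. It writes the item S[j] above as the i-th item, with 1-based i = j + 1 (an indexing change made only for the math), n for the total number of items and R for the reservoir. Item i > k enters with probability k/i. At each later step m, it is evicted only if item m enters (probability k/m) and picks its slot (probability 1/k), so it survives that step with probability 1 - 1/m. Items i ≤ k start in the reservoir, so only the survival product applies, starting from m = k + 1.

```latex
\Pr[i \in R] = \frac{k}{i}\prod_{m=i+1}^{n}\left(1-\frac{1}{m}\right)
             = \frac{k}{i}\prod_{m=i+1}^{n}\frac{m-1}{m}
             = \frac{k}{i}\cdot\frac{i}{n} = \frac{k}{n}, \qquad i > k,
\qquad
\Pr[i \in R] = \prod_{m=k+1}^{n}\frac{m-1}{m} = \frac{k}{n}, \qquad i \le k.
```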
/* S has items to sample, R will contain the result */
ReservoirSample(S[1..n], R[1..k])
  // fill the reservoir array
  for i = 1 to k
      R[i] := S[i]

  // replace elements with gradually decreasing probability
  for i = k+1 to n
    j := random(1, i)   // important: inclusive range
    if j <= k
        R[j] := S[i]
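A minimal Scala sketch of the 0-based description (Scala because the Spark source later in this post is Scala). The names AlgorithmR and reservoirSample are hypothetical, and the input is assumed to be an IndexedSeq held in memory, unlike the iterator-based Spark version.

```scala
import scala.reflect.ClassTag
import scala.util.Random

object AlgorithmR {
  /** Uniformly samples k items from s (Algorithm R, 0-based indexing). */
  def reservoirSample[T: ClassTag](s: IndexedSeq[T], k: Int, rng: Random): Array[T] = {
    require(k >= 0, "k must be non-negative")
    // Fill the reservoir with the first k items (all of s if it is shorter than k).
    val reservoir = s.take(k).toArray
    // For each later item S[j] (j >= k), draw r uniformly from 0..j inclusive.
    for (j <- k until s.length) {
      val r = rng.nextInt(j + 1) // nextInt(n) returns 0..n-1, so this is 0..j
      if (r < k) reservoir(r) = s(j) // replace the r-th reservoir slot with S[j]
    }
    reservoir
  }
}
```

`nextInt(j + 1)` is what makes the range inclusive of j, matching the "important: inclusive range" remark in the pseudocode; an exclusive range would bias the sample toward later items.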
Implementation overview
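1. Obtain the sample size k for the RDD partition being sampled, and input, an iterator over all keys in the partition.
2. Initialize the sample result set reservoir with the first k keys of the partition.
3. If the partition holds fewer than k items, use all of its data as the sample and stop; otherwise go to step 4.
4. Iterate over the remaining keys in input, counting the items seen so far in l.
5. For the l-th key, draw a random index uniformly from [0, l); if the index is less than k, replace the reservoir entry at that index with the key.
6. Return the sampled keys together with the partition's total item count: (reservoir, l).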
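Step 1 takes k as given. The @param k comment in the source records how the caller computes it; here is a small sketch of that arithmetic. The object SampleSize, the function perPartitionK and its parameter names are hypothetical. partitions is assumed to be the number of output partitions being planned, and inputPartitions stands for rdd.partitions.length.

```scala
object SampleSize {
  /** Per-partition reservoir size k, following the formula in the @param k comment. */
  def perPartitionK(partitions: Int, inputPartitions: Int): Int = {
    // Total sample size: 20 points per output partition, capped at one million.
    val sampleSize = math.min(20.0 * partitions, 1e6)
    // Spread across the input partitions, oversampling by a factor of 3.
    math.ceil(3.0 * sampleSize / inputPartitions).toInt
  }
}
```

For example, 10 output partitions over 4 input partitions gives sampleSize = 200 and k = ceil(600 / 4) = 150 keys per partition.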
Source code
import scala.reflect.ClassTag
import scala.util.Random
// XORShiftRandom comes from Spark's own org.apache.spark.util.random package.

/**
 * Reservoir sampling implementation that also returns the input size.
 *
 * @param input iterator over the keys in one partition of the RDD
 * @param k sample size, computed by the caller as
 *          val sampleSize = math.min(20.0 * partitions, 1e6)
 *          val k = math.ceil(3.0 * sampleSize / rdd.partitions.length).toInt
 * @param seed seed for the random number generator
 * @return (samples, input size)
 */
def reservoirSampleAndCount[T: ClassTag](
    input: Iterator[T],
    k: Int,
    seed: Long = Random.nextLong())
  : (Array[T], Long) = {
  val reservoir = new Array[T](k)
  // Put the first k elements in the reservoir,
  // i.e. the first k keys of the RDD partition.
  var i = 0
  while (i < k && input.hasNext) {
    val item = input.next()
    reservoir(i) = item
    i += 1
  }
  // If we have consumed all the elements, return them. Otherwise do the replacement.
  // (A partition with fewer than k items is returned whole as the sample.)
  if (i < k) {
    // If input size < k, trim the array to return only an array of input size.
    val trimReservoir = new Array[T](i)
    System.arraycopy(reservoir, 0, trimReservoir, 0, i)
    (trimReservoir, i)
  } else {
    // If input size > k, continue the sampling process.
    var l = i.toLong
    val rand = new XORShiftRandom(seed)
    // Iterate over the remaining keys.
    while (input.hasNext) {
      val item = input.next()
      l += 1
      // There are k elements in the reservoir, and the l-th element has been
      // consumed. It should be chosen with probability k/l. The expression
      // below is a random long chosen uniformly from [0,l).
      // Equivalently: with p = rand.nextDouble() uniform in [0, 1), replace when
      // p < k/l, i.e. p * l < k; the truncated p * l then doubles as a uniform
      // index into the reservoir.
      val replacementIndex = (rand.nextDouble() * l).toLong
      if (replacementIndex < k) {
        // Overwrite the reservoir slot at the random index with the current item.
        reservoir(replacementIndex.toInt) = item
      }
    }
    (reservoir, l)
  }
}
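XORShiftRandom is internal to Spark, so trying this logic outside Spark needs a stand-in. Below is a minimal standalone port, assuming scala.util.Random in place of XORShiftRandom; the draws, and hence the samples for a given seed, will therefore differ from Spark's. The object ReservoirPort and method sampleAndCount are hypothetical names.

```scala
import scala.reflect.ClassTag
import scala.util.Random

object ReservoirPort {
  /** Same steps as reservoirSampleAndCount, with scala.util.Random as the generator. */
  def sampleAndCount[T: ClassTag](input: Iterator[T], k: Int, seed: Long): (Array[T], Long) = {
    val reservoir = new Array[T](k)
    var i = 0
    // Put the first k elements in the reservoir.
    while (i < k && input.hasNext) {
      reservoir(i) = input.next()
      i += 1
    }
    if (i < k) {
      // Fewer than k elements: return just those, plus the count.
      (reservoir.take(i), i.toLong)
    } else {
      var l = i.toLong
      val rand = new Random(seed)
      while (input.hasNext) {
        val item = input.next()
        l += 1
        // Uniform index in [0, l); it falls in [0, k) with probability k / l.
        val replacementIndex = (rand.nextDouble() * l).toLong
        if (replacementIndex < k) reservoir(replacementIndex.toInt) = item
      }
      (reservoir, l)
    }
  }
}
```

Sampling 20 keys from `(1 to 1000).iterator` returns 20 distinct keys and the count 1000, while a two-element iterator with k = 5 comes back whole with count 2. Repeating a 2-of-5 sample many times with fresh seeds should select each key close to 2/5 of the time.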