Distributed Sampling on Dask Bags

Contributing to open-source projects is a rewarding experience. Even though the contribution regards a small amount of code, being aware that those few lines will be used by many users, many times, in the future, gives a really satisfying feeling.

The workflow was simple, I went on the issue tracker of the project, I selected a new feature to implement which I knew was not impossible to implement, then I manifested my intentions and I started working on it.

The challenging part of this work was to prove its mathematical correctness.
I had the initial idea since the beginning, but writing its proof helped me out to improve the algorithm and to fix its corner cases.

First I will introduce Dask and the concept of distributed data structure, then I will briefly describe the problem of sampling and my proposed solution, including a small proof of correctness and some words on its computational complexity. I will conclude with a statistical test to show that one generated result is not unbalanced.

Dask

My first contribution was about a specific distributed data structure called: Bag. In Math terms a Bag is a Multiset, which can be thought of as a python list where the order of its elements is not meaningful; in other words, data are inserted following an order which is not necessarily the same when they are read.

Distributed Data Structure

Sampling

Interface

import randomseq = [1, 2, 3, 4, 5, 6]s = random.choices(seq, k=2)print(s)>> [2, 6]

Here’s the code using the newly implemented method:

import dask.bag as db
from dask.bag import random
seq = db.from_sequence([1, 2, 3, 4, 5, 6], npartitions=3)s = random.choices(seq, k=2)print(list(s.compute()))>> [2, 6]

Two steps algorithm

By using this approach, elements belonging to different partitions are selected with different probability, since partitions do not necessarily have the same size.
To counterbalance this unfairness, when the sampled elements are gathered on the reduce, a weighted sampling is performed.

Weights used on the reduce are computed taking into account specific properties of the partition where each element comes from, like the number of elements of the partition or the real number of elements selected during the first sampling. Those properties are cleverly passed from the maps to reduce alongside the sampled elements.

After this initial description of the algorithm, now I will outline the math behind it.

Informal Proof

Let’s first assert the equivalence between simple sampling and weighted sampling with weights all equal to 1/N. This result will not be proven here.

Since the procedure selects k random elements, on each selection and for each element j in the Bag: the probability to pick that element is 1/N.
Let’s rewrite the algorithm by associating to each element a random variable X with a uniform distribution between 0 and 1. The element is selected by the algorithm if X is smaller than the associated weight 1/N. In other words, an element is selected with probability equal to its weight.
In formulae:

Since selections are made with replacement and probabilities of selection over k iterations are independent: the joint probability is the product of the probability of the single selections:

By leveraging the properties of the specific partition where the i-th element comes from, let’s rewrite it as the product of two ratios:

Here, k i-th is the actual number of elements extracted from the i-th partition, N i-th is the number of elements of that partition.
By substituting we obtain:

Now, let’s define two random variables F and S for each element in the Bag. The former is associated with the map phase and the latter with the reduce. Their distribution is uniform between 0 and 1:

Where j is the associated element in the Bag.
Now, by using the same procedure as before, let’s define two weighted sampling procedures based on those just defined random variables:

Where i is the partition associated with the element j.
By doing so, we obtain two weighted sampling procedures, where not all weights are equal to 1/N, but instead, each weight is specifically associated with an element and the partition where it belongs to.
Now, since we want to obtain the previous decomposition, the first sampling procedure must be repeatedly k i-th times and the second k times. Is important to stress out that k i-th and k can be different since the number of elements in each partition (N i-th) can be smaller than k. In those cases, the used k i-th is forced to be N i-th.
In formulae:

By substituting, we obtain:

So, a simple sampling can be split into two distributed weighted sampling procedures and, by wisely choose the parameters of those procedures, is possible to keep unaltered the probability to select each element in the population.

Computational Costs

Statistical Test

To test the correctness of the sampling procedure let’s compare it to a dice toss. In other words, the probability to sample one element is a random variable with a multinomial distribution with a parameter vector made by N elements all with value 1/N.

The idea here is to execute the experiment many times and count the occurrences of each element of the sequence. By doing so, we will expect all single occurrences to be close to their expected value, which is T/N, where T is the number of executions. The point is to leverage a statistical test that shows if the real performance of the experiment lays too far from the expected value.
The test sequence is made by 6 integers and two slightly unbalanced partitions: one partition is made by 4 elements and the other by 2.
The experiment involved 150 random selections of only one element.
Here’s the code:

import dask.bag as db
numbers = range(6)
buckets = {i: 0 for i in numbers}
a = db.from_sequence(numbers, partition_size=4)
for i in range(150):
s = a.sample(k=1).compute()
for e in s:
buckets[e] += 1
print(buckets)

The expected value is 150/6=25.
We obtained the following occurrences of the six elements of the sequence:
25,26,33,22,17,27.

For this experiment I used R and XNomial package:

install.packages('XNomial')
require(XNomial)
obs = c(25,26,33,22,17,27)
n = length(obs)
expr = rep(1/n, n)
xmulti(obs, expr, statName = "LLR", histobins = T, histobounds = c(0, 0), showCurve = T)

Here’s a plot for the test log-likelihood ratio, which, under the null-hypothesis, asymptotically converges to a chi-squared distribution with 5 degrees of freedom:

P value  (LLR)  =  0.3335
P value (Prob) = 0.3339
P value (Chisq) = 0.3428

Obtaining as test outcome a p-value of 0.33.

By using a significance level α of 0.05, we cannot reject the null hypothesis that the distribution is multinomial with parameter vector 1/N.

More information about the theory behind this test is here, and the R package here.

Conclusions

To keep track of the entire workflow here’s the link to the Github initial issue and my pull request.

--

--

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store