Distributed Sampling on Dask Bags

Dask

Dask is a distributed framework for data analysis; if you know Apache Spark, you already have a good mental model of what we're talking about. In short, Dask is an alternative to Apache Spark, written in Python and backed by Anaconda Inc. You can find here an interesting article describing the differences between the two. I won't dwell on my feeling that the Dask project will become the next big thing in the data analysis world.

Distributed Data Structure

These are not the typical data structures studied in algorithms classes of a CS degree. They are distributed, which means their contents do not necessarily reside on the same machine: the data structure is divided into partitions that are kept in memory, possibly on different workers of the same cluster. A key consequence is that any algorithm operating over a distributed data structure must take into account that communication between workers is time-consuming, since it is dominated by network performance.
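
As a quick illustration (a toy example using the default local scheduler, unrelated to the sampling feature itself), here is how a Bag is split into partitions:

import dask.bag as db

# six elements split across three partitions, which Dask can place on different workers
b = db.from_sequence(range(6), npartitions=3)
print(b.npartitions)                       # 3
print(b.map(lambda x: x * 10).compute())   # [0, 10, 20, 30, 40, 50]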

Sampling

The feature I implemented is sampling, more specifically simple random sampling, with or without replacement. I will not cover the definition of sampling or the difference between sampling with and without replacement, since there are plenty of resources on that online.
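
For reference, here is the difference as it appears in the Python standard library (nothing Dask-specific, just the two stdlib functions):

import random

population = [1, 2, 3, 4, 5, 6]
print(random.choices(population, k=3))  # with replacement: the same element may appear more than once
print(random.sample(population, k=3))   # without replacement: all drawn elements are distinct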

Interface

The philosophy of Dask is to minimize the effort needed to adapt code written for non-distributed data structures. In the case of a Bag, the non-distributed Python counterparts are lists or, more generally, sequences.
I implemented a new method in the Bag class, named after Python's random.choices function, and I decided to follow its interface closely.
Here’s an example:

import random

seq = [1, 2, 3, 4, 5, 6]
s = random.choices(seq, k=2)
print(s)
>> [2, 6]

import dask.bag as db
from dask.bag import random
seq = db.from_sequence([1, 2, 3, 4, 5, 6], npartitions=3)
s = random.choices(seq, k=2)
print(list(s.compute()))
>> [2, 6]

Two-Step Algorithm

The proposed solution is implemented with map and reduce, and it performs two sampling steps: one in the map phase and one in the reduce phase. The first sampling is done over all the elements of each single partition, so all elements drawn from the same partition are selected with the same probability. Below is a minimal sketch of the idea.
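
The following is a simplified sketch of the two-step scheme, not the actual Dask code: in this toy version, each partition contributes a fixed-size local sample together with its size, and the reduce step resamples from those candidates with weights proportional to the partition sizes (names such as map_step and reduce_step are purely illustrative).

import random

def map_step(partition, k):
    # draw k candidates (with replacement) from one partition and record its size
    return random.choices(partition, k=k), len(partition)

def reduce_step(partials, k):
    # pick a partition with probability proportional to its size,
    # then pick uniformly among that partition's candidates
    candidates = [sample for sample, _ in partials]
    sizes = [size for _, size in partials]
    chosen_lists = random.choices(candidates, weights=sizes, k=k)
    return [random.choice(lst) for lst in chosen_lists]

partitions = [[1, 2], [3, 4], [5, 6]]
partials = [map_step(p, k=2) for p in partitions]
print(reduce_step(partials, k=2))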

Informal Proof

Here I give an informal argument for why a simple random sample is equivalent to two consecutive weighted sampling procedures.
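
Consider the with-replacement case and assume, as in the sketch above, that each map reports its local sample together with the size n_i of its partition, with N the total number of elements. For each of the k final draws, the reduce step first picks partition i with probability n_i / N, and then picks one of that partition's candidates uniformly; since those candidates were themselves drawn uniformly from the partition, any given element x of partition i ends up selected with probability (n_i / N) * (1 / n_i) = 1 / N, exactly as in a simple random sample over the whole dataset. A similar argument can be made for the without-replacement case, although the bookkeeping is more involved.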

Computational Costs

The proposed solution uses Python's random.choices in both the map and the reduce phases, so its computational complexity is proportional to that of the non-distributed version. Regarding the amount of data passed from the maps to the reduce, and hence over the network, each map transfers k elements to the reduce. Given p partitions, the number of sampled elements transferred over the network is O(k * p). For example, with p = 100 partitions and k = 10, only about 1,000 candidate elements cross the network, regardless of the total size of the dataset. This is acceptable behavior as long as k is small.

Statistical Test

To assess the correctness of the algorithm, I performed a statistical test on a small example.

import dask.bag as db

numbers = range(6)
buckets = {i: 0 for i in numbers}
a = db.from_sequence(numbers, partition_size=4)
# repeatedly draw a single element and count how often each value comes up
for i in range(150):
    s = a.sample(k=1).compute()
    for e in s:
        buckets[e] += 1
print(buckets)
The resulting counts are then checked in R with an exact multinomial test from the XNomial package, under the null hypothesis that every value is equally likely:

install.packages('XNomial')
require(XNomial)
# observed counts of each value over the 150 draws above
obs = c(25, 26, 33, 22, 17, 27)
n = length(obs)
# expected proportions under the uniform (null) hypothesis
expr = rep(1/n, n)
xmulti(obs, expr, statName = "LLR", histobins = T, histobounds = c(0, 0), showCurve = T)
P value (LLR)   = 0.3335
P value (Prob)  = 0.3339
P value (Chisq) = 0.3428

With p-values well above 0.05, the test gives no evidence against the hypothesis that the sampled values are uniformly distributed.

Conclusions

In this article, I described a solution to the problem of distributed sampling.
As a further development, it would be interesting to investigate its use on other types of distributed data structures.
Additionally, while working with Dask's internal API, I spotted possible places for refactoring, especially between the HighLevelGraph and Bag classes, since the division of responsibilities between the two is not always clear.
This is quite common in open source projects, since it is not always easy for the main contributors to plan the right balance between the types of tasks to assign. In other words, it is not simple to steer free contributions across different task types, such as new features, bug fixing, or refactoring.
