Scale out your Pandas DataFrame operations using Dask
In Pandas, one can easily apply operations to all the data using the apply method. However, this method runs on a single core and becomes slow when your operations or datasets grow. Is there a way to speed up these operations? And if so, how? Yes, there is! This blog post explains how you can use Dask to harness the power of parallelization and scale out your DataFrame operations.
An example
As an example, consider the following: suppose we generate a collection of numbers. We generate 1,000,000 integers between 0 and 9999. Then, we turn the integers into a Pandas DataFrame, since working with DataFrames is a common step in most data-related projects:
import pandas as pd
import random

if __name__ == '__main__':
    # The amount of numbers to generate
    N = 1000000

    # Now generate 1,000,000 integers between 0 and 9999
    data = [random.randint(0, 9999) for _ in range(N)]

    # And finally, create the DataFrame
    df = pd.DataFrame(data, columns=['number'])
The task is then to filter out all the integers smaller than 1000 (so 0 … 999).
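For reference, the desired result can also be expressed with a plain boolean mask. This is only a minimal sketch of the target outcome (using the df created above), not the approach we benchmark below:

# Boolean mask: True for every number below 1000
mask = df['number'] < 1000

# Keep only the small numbers
small_numbers = df[mask]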
How fast can we process all the numbers?
We can check whether the numbers are small (< 1000) using the following code:
import pandas as pd
import random

def is_small_number(x):
    # Returns True if x is less than 1000, False otherwise
    return x < 1000

if __name__ == '__main__':
    # The amount of numbers to generate
    N = 1000000

    # Now generate 1,000,000 integers between 0 and 9999
    data = [random.randint(0, 9999) for _ in range(N)]

    # And finally, create the DataFrame
    df = pd.DataFrame(data, columns=['number'])

    # Now apply the "is_small_number" operation to the data
    small_number_booleans = df.apply(is_small_number)
Here, the resulting variable contains booleans indicating whether each number is small enough (less than 1000). We can now use timeit to measure the speed of the apply operation. We run the apply call 100 times using the following code:
import pandas as pd
from timeit import timeit
import random

def is_small_number(x):
    # Returns True if x is less than 1000, False otherwise
    return x < 1000

if __name__ == '__main__':
    # The amount of numbers to generate
    N = 1000000

    # Now generate 1,000,000 integers between 0 and 9999
    data = [random.randint(0, 9999) for _ in range(N)]

    # And finally, create the DataFrame
    df = pd.DataFrame(data, columns=['number'])

    # Now we can figure out the speed of the apply method
    print('pandas apply method', timeit(lambda: df.apply(is_small_number), number=100))
This resulted in the following output:
pandas apply method 1.4612023359952082
Not bad: approximately 1.5 seconds to process 1,000,000 numbers 100 times! However, if you are dealing with heavy operations (such as tokenization), this time would be considerably larger and parallelization becomes necessary. But even in this small example, there is room for improvement.
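To give an impression of what such a heavy operation looks like, here is a minimal sketch with a hypothetical expensive_check function (not part of the benchmark above) that does some extra work per element before the comparison:

def expensive_check(x):
    # Hypothetical heavy per-element work before the actual comparison
    total = 0
    for _ in range(100):
        total += (x * x) % 7
    return x < 1000

# Applying a function element-wise like this is where apply really starts to hurt
heavy_booleans = df['number'].apply(expensive_check)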
Parallelization to the Rescue
The main problem with the apply method is that it is executed on a single core. How can we do better? We could “chunk” the work into smaller subtasks and let multiple workers process the different subtasks, so that the problem is solved much faster. Here, Dask comes to the rescue. Dask is built for scaling out your computations: instead of running them on only one machine, Dask can scale out to a whole cluster of machines, and if you have only one machine, it can still scale out from a single thread to multiple threads. First, we need to convert our Pandas DataFrame to a Dask DataFrame. Here, you will lose some flexibility, since the Dask DataFrame does not support all the operations of a Pandas DataFrame. Luckily for us, we can easily convert from a Pandas DataFrame to a Dask DataFrame and back. Consider the following code, in which our Pandas DataFrame is converted to a Dask DataFrame with four partitions:
import pandas as pd
import dask.dataframe as dd
from timeit import timeit
import random

def is_small_number(x):
    # Returns True if x is less than 1000, False otherwise
    return x < 1000

if __name__ == '__main__':
    # The amount of numbers to generate
    N = 1000000

    # Now generate 1,000,000 integers between 0 and 9999
    data = [random.randint(0, 9999) for _ in range(N)]

    # And finally, create the DataFrame
    df = pd.DataFrame(data, columns=['number'])

    # Now convert it to a Dask DataFrame and chunk it into 4 partitions
    df = dd.from_pandas(df, npartitions=4)

    # Now we can figure out the speed of the map_partitions method
    print('dask parallel map partitions method', timeit(lambda: df.map_partitions(is_small_number).compute(), number=100))
This resulted in the following:
dask parallel map partitions method 0.7870494624626411
That is about 0.79 seconds, which means a speed-up of more than 1.8x! A word of caution: the more partitions you use, the more overhead there is for setting up and coordinating the threads, so be careful with the number of partitions. For a small task like this, a low number of partitions works best; for larger tasks, a larger number of partitions may work better. Also note that the interface is similar to the Apache Spark interface: the map-reduce concepts are available in Dask as well, and Dask builds a computation graph in the background. When the compute() method is called, this computation graph is executed. This will be explained in a later post on Dask. For now, the interesting part is that you can speed up your Pandas DataFrame apply calls!
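As a small sketch of this lazy behaviour (reusing the Dask DataFrame df and is_small_number from the code above): calling map_partitions only extends the computation graph, and compute() executes it and returns a regular Pandas DataFrame again:

# Nothing is computed yet: this only adds a step to the computation graph
lazy_result = df.map_partitions(is_small_number)

# compute() executes the graph and converts the result back to a Pandas DataFrame
result = lazy_result.compute()
print(type(result))  # <class 'pandas.core.frame.DataFrame'>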
Conclusions
- You now know how Dask can scale out operations on your Pandas DataFrames.
- In this simple example, we achieved a speed-up of roughly 1.8x. The speed-up can be considerably larger for heavier tasks and larger datasets.
- Parallelization is key to faster computations on large amounts of data.