如何在具有500万行和50万组的Dask数据帧上加速groupby().sum()?

How to speed up groupby().sum() on a dask dataframe with 5 millions of rows and 500 thousands of groups?(如何在具有500万行和50万组的Dask数据帧上加速groupby().sum()?)

本文介绍了如何在具有500万行和50万组的Dask数据帧上加速groupby().sum()?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个包含

的数据帧
  • 500万行。
  • 唯一元素数为500.000的列group_id
  • 名为var1var2等的数千个其他列。var1var2、.仅包含0和1。

我想按group_id分组,然后将它们相加。为了有更好的性能,我使用Dask。但是,此简单聚合的速度仍然较慢。

The time spent on a dataframe with 10 columns is 6.285385847091675 seconds
The time spent on a dataframe with 100 columns is 64.9060411453247 seconds
The time spent on a dataframe with 200 columns is 150.6109869480133 seconds
The time spent on a dataframe with 300 columns is 235.77087807655334 seconds

我的实际数据集最多包含30.000列。我已经阅读了@Divakar关于使用numpy的答案(1和2)。但是,前一个线程是关于计数的,而后者是关于列求和的。

您能否详细说明加速此聚合的一些方法?

import numpy as np
import pandas as pd
import os, time
from multiprocessing import dummy
import dask.dataframe as dd

core = os.cpu_count()
P = dummy.Pool(processes = core)

n_docs = 500000
n_rows = n_docs * 10
data = {}

def create_col(i):
    name = 'var' + str(i)
    data[name] = np.random.randint(0, 2, n_rows)

n_cols = 300
P.map(create_col, range(1, n_cols + 1))
df = pd.DataFrame(data, dtype = 'int8')
df.insert(0, 'group_id', np.random.randint(1, n_docs + 1, n_rows))
df = dd.from_pandas(df, npartitions = 3 * core) 

start = time.time()
df.groupby('group_id').sum().compute()
end = time.time()
print('The time spent on a dataframe with {} columns is'.format(n_cols), end - start, 'seconds')

推荐答案

(我误解了原始答案中的OP,因此全部清除)。

我通过以下方式获得改进:

  • 切换到Numpy
  • 对组和数据使用相同的数据类型(np.int32)
  • 以并行模式使用Numba"
import numba as nb
@nb.njit('int32[:, :](int32[:, :], int_)', parallel=True)
def count_groups2(group_and_data, n_groups):
    n_cols = group_and_data.shape[1] - 1
    counts = np.zeros((n_groups, n_cols), dtype=np.int32)
    for idx in nb.prange(len(group_and_data)):
        row = group_and_data[idx]
        counts[row[0]] += row[1:]
    return counts

df = pd.DataFrame(data, dtype='int32')
group_id = np.random.randint(1, n_docs + 1, n_rows, dtype=np.int32)
df.insert(0, 'group_id', group_id)

# switching to numpy (line below) is costly
# it would be faster to work with numpy alone (no pandas)
group_and_data = df.values
count_groups2(group_and_data)
op_method(df)

    72         1    1439807.0 1439807.0      7.0      group_and_data = df.values
    73         1    1341527.0 1341527.0      6.5      count_groups2(group_and_data, n_groups=500_000)
    74         1   12043334.0 12043334.0     58.5      op_method(df)

这篇关于如何在具有500万行和50万组的Dask数据帧上加速groupby().sum()?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持编程学习网!

本文标题为:如何在具有500万行和50万组的Dask数据帧上加速groupby().sum()?

基础教程推荐