在本节中,我们使用 Dask 和 dask.delayed
并行化简单的 for 循环样例代码。通常,这是将函数转换为与 Dask 一起使用所需的唯一函数。
这是使用 dask
并行化现有代码库或构建复杂系统的一种简单方法。这也将有助于我们对后面的部分进行理解。
相关文档
正如我们将在分布式调度器笔记本中看到的,Dask 有多种并行执行代码的方法。我们将通过创建 dask.distributed.Client
来使用分布式调度器。现在,这将为我们提供一些不错的诊断。稍后我们将深入讨论调度器。
from dask.distributed import Client
client = Client(n_workers=4)
首先让我们创建一些玩具函数,inc
和 add
,它们会休眠一段时间来模拟工作。然后我们将正常运行这些函数。
在下一节中,我们将并行化此代码。
from time import sleep
def inc(x):
sleep(1)
return x + 1
def add(x, y):
sleep(1)
return x + y
我们使用 %%time
magic 指令来计时这段普通代码的执行时间,这是 Jupyter Notebook 的一个特殊功能。
%%time
# 这需要三秒钟才能运行,因为我们依次调用每个函数,一个接一个
x = inc(1)
y = inc(2)
z = add(x, y)
Wall time: 3.02 s
dask.delayed
装饰器并行化两个 inc
调用可以并行调用,因为它们完全相互独立。
我们将使用 dask.delayed
函数转换 inc
和 add
函数。当我们通过传递参数调用延迟版本时,与以前完全一样,原始函数实际上还没有被调用 —— 这就是单元执行很快完成的原因。相反,会生成一个延迟对象,它会跟踪要调用的函数和要传递给它的参数。
from dask import delayed
%%time
# 这会立即运行,它所做的只是构建一个图
x = delayed(inc)(1)
y = delayed(inc)(2)
z = delayed(add)(x, y)
Wall time: 1e+03 µs
上述代码立即运行,因为还没有真正发生任何事情。
要获得结果,请调用 compute
。请注意,这比原始代码运行得更快。
%%time
# 实际上使用本地线程池运行我们的计算
z.compute()
Wall time: 2.05 s
5
z
对象是一个惰性 Delayed
对象。这个对象包含我们计算最终结果所需的一切,包括对所有所需函数的引用,以及它们的输入和相互之间的关系。我们可以使用上面的 .compute()
评估结果,或者我们可以使用 .visualize()
可视化此值的任务图。
z
Delayed('add-25aea027-2aa1-4253-9eb7-962a7d804914')
查看 z
的任务图
z.visualize()
请注意,这包括之前的函数名称,以及 inc
函数输出到 add
输入的逻辑流。
为什么我们从 3s 变成了 2s?为什么我们不能并行化到 1s?
如果 inc
和 add
函数不包括 sleep(1)
会发生什么?Dask 还能加速这段代码吗?
不会加速
def inc_v2(x):
return x + 1
def add_v2(x, y):
return x + y
%%time
x = inc_v2(1)
y = inc_v2(2)
z = add_v2(x, y)
z
Wall time: 0 ns
5
x = delayed(inc_v2)(1)
y = delayed(inc_v2)(2)
z = delayed(add_v2)(x, y)
%%time
z.compute()
Wall time: 24 ms
5
如果我们有多个输出或者还想访问 x
或 y
怎么办?
for
循环for
循环是我们想要并行化的最常见的事情之一。在 inc
和 sum
上使用 dask.delayed
并行化以下计算。
串行代码
data = [1, 2, 3, 4, 5, 6, 7, 8]
%%time
# 串行代码
results = []
for x in data:
y = inc(x)
results.append(y)
total = sum(results)
Wall time: 8.05 s
total
44
并行代码
%%time
for x in data:
y = delayed(inc)(x)
results.append(y)
total = delayed(sum)(results)
print("Before computing:", total) # 查看 total 的类型
result = total.compute()
print("After computing:", result) # 计算之后
Before computing: Delayed('sum-492662c6-3934-408a-beea-763b4f421a40')
After computing: 88
Wall time: 1.04 s
与直接使用 sum
函数而不是延迟包装的版本相比,图形可视化与给定的解决方案相比如何?你能解释一下后面的版本吗?您可能会发现以下表达式的结果很有启发性
delayed(inc)(1) + delayed(inc)(2)
z = delayed(inc)(1) + delayed(inc)(2) + delayed(inc)(3)
z.visualize()
z = delayed(sum)(delayed(inc)(1), delayed(inc)(2), delayed(inc)(3))
z.visualize()
通常我们只想延迟一些函数,立即运行其中的几个。当这些函数速度很快时,这尤其有用,并帮助我们确定应该调用哪些其他较慢的函数。这个决定,延迟还是不延迟,通常是我们在使用 dask.delayed
时需要深思熟虑的地方。
在下面的示例中,我们遍历输入列表。如果输入是偶数,那么我们想调用 inc
。如果输入是奇数,那么我们要调用 double
。必须立即(而不是懒惰地)做出调用 inc
或 double
的 is_even
决定,以便我们的图形构建 Python 代码继续进行。
def double(x):
sleep(1)
return 2 * x
def is_even(x):
return not x % 2
data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
串行代码
%%time
results = []
for x in data:
if is_even(x):
y = double(x)
else:
y = inc(x)
results.append(y)
total = sum(results)
print(total)
90
Wall time: 10.1 s
并行版本
%%time
results = []
for x in data:
if is_even(x):
y = delayed(double)(x)
else:
y = delayed(inc)(x)
results.append(y)
total = delayed(sum)(results)
print(total)
Delayed('sum-f5af7db2-ff32-4186-af6c-2106e51a7341')
Wall time: 999 µs
%time total.compute()
Wall time: 2.04 s
90
total.visualize()
我们不能使用延迟的其他控制流示例是什么?
如果我们在上面的例子中延迟了 is_even(x)
的计算会发生什么?
你对延迟 sum()
有什么看法?这个函数既是计算又运行快速。
运行此代码以准备一些数据。
这将下载并提取 1990 年至 2000 年间从纽约出发的航班的一些历史航班数据。数据最初来自此处。
%run prep.py -d flights
import pathlib
sorted(pathlib.Path("data", "nycflights").iterdir())
[WindowsPath('data/nycflights/1990.csv'),
WindowsPath('data/nycflights/1991.csv'),
WindowsPath('data/nycflights/1992.csv'),
WindowsPath('data/nycflights/1993.csv'),
WindowsPath('data/nycflights/1994.csv'),
WindowsPath('data/nycflights/1995.csv'),
WindowsPath('data/nycflights/1996.csv'),
WindowsPath('data/nycflights/1997.csv'),
WindowsPath('data/nycflights/1998.csv'),
WindowsPath('data/nycflights/1999.csv')]
使用 pandas.read_csv
读取一个文件,并计算平均起飞延误
import pandas as pd
df = pd.read_csv(pathlib.Path("data", "nycflights", "1990.csv"))
df.head()
数据模式
df.dtypes
Year int64
Month int64
DayofMonth int64
DayOfWeek int64
DepTime float64
CRSDepTime int64
ArrTime float64
CRSArrTime int64
UniqueCarrier object
FlightNum int64
TailNum float64
ActualElapsedTime float64
CRSElapsedTime int64
AirTime float64
ArrDelay float64
DepDelay float64
Origin object
Dest object
Distance float64
TaxiIn float64
TaxiOut float64
Cancelled int64
Diverted int64
dtype: object
数据中有哪些始发机场
df.Origin.unique()
array(['EWR', 'LGA', 'JFK'], dtype=object)
每个机场平均起飞延误
df.groupby("Origin").DepDelay.mean()
Origin
EWR 9.168411
JFK 11.857274
LGA 8.560045
Name: DepDelay, dtype: float64
上述单元格计算每个机场一年的平均起飞延误。在这里,我们使用顺序 for 循环将其扩展到所有年份。
import pathlib
filenames = sorted(pathlib.Path("data", "nycflights").glob("*.csv"))
filenames
[WindowsPath('data/nycflights/1990.csv'),
WindowsPath('data/nycflights/1991.csv'),
WindowsPath('data/nycflights/1992.csv'),
WindowsPath('data/nycflights/1993.csv'),
WindowsPath('data/nycflights/1994.csv'),
WindowsPath('data/nycflights/1995.csv'),
WindowsPath('data/nycflights/1996.csv'),
WindowsPath('data/nycflights/1997.csv'),
WindowsPath('data/nycflights/1998.csv'),
WindowsPath('data/nycflights/1999.csv')]
%%time
sums = []
counts = []
for fn in filenames:
# 读取文件
df = pd.read_csv(fn)
# 按起飞机场分组
by_origin = df.groupby("Origin")
# 按起飞机场计算所有起飞延误和
total = by_origin.DepDelay.sum()
# 按机场汇总航班数
count = by_origin.DepDelay.count()
# 保存中间结果
sums.append(total)
counts.append(count)
# 组合中间结果得到全部 mean-delay-per-origin
total_delays = sum(sums)
n_flights = sum(counts)
mean = total_delays / n_flights
Wall time: 9.92 s
mean
Origin
EWR 10.295469
JFK 10.351299
LGA 7.431142
Name: DepDelay, dtype: float64
使用 dask.delayed 并行化上面的代码。需要知道一些额外的事情。
x = delayed(np.arange)(10)
y = (x + 1)[::2].sum() # 所有计算都被延迟
.compute()
方法效果很好。当您有多个输出时,您可能需要使用 dask.compute
函数:>>> from dask import compute
>>> x = delayed(np.arange)(10)
>>> y = x ** 2
>>> min_, max_ = compute(y.min(), y.max())
>>> min_, max_
(0, 81)
这样 Dask 就可以共享中间值 (比如 y = x**2
)
因此,您的目标是使用 dask.delayed
并行化上面的代码 (已在下面复制)。您可能还想对一些计算进行可视化,看看您是否正确地进行了计算。
from dask import compute
%%time
sums = []
counts = []
for fn in filenames:
# 读取文件
df = delayed(pd.read_csv)(fn)
# 按起飞机场分组
by_origin = df.groupby("Origin")
# 按起飞机场计算所有起飞延误和
total = by_origin.DepDelay.sum()
# 按机场汇总航班数
count = by_origin.DepDelay.count()
# 保存中间结果
sums.append(total)
counts.append(count)
# 组合中间结果得到全部 mean-delay-per-origin
sums, counts = compute(sums, counts)
total_delays = sum(sums)
n_flights = sum(counts)
mean = total_delays / n_flights
Wall time: 2.55 s
mean
Origin
EWR 10.295469
JFK 10.351299
LGA 7.431142
Name: DepDelay, dtype: float64
你得到了多少加速?这是您期望的加速程度吗?
尝试在何处调用 compute
。当你在 sum
和 counts
上使用时会发生什么?如果你等待并在 mean
上调用会发生什么?
mean
上使用 compute
sums = []
counts = []
for fn in filenames:
# 读取文件
df = delayed(pd.read_csv)(fn)
# 按起飞机场分组
by_origin = df.groupby("Origin")
# 按起飞机场计算所有起飞延误和
total = by_origin.DepDelay.sum()
# 按机场汇总航班数
count = by_origin.DepDelay.count()
# 保存中间结果
sums.append(total)
counts.append(count)
# 组合中间结果得到全部 mean-delay-per-origin
total_delays = delayed(sum)(sums)
n_flights = delayed(sum)(counts)
mean = delayed(lambda a, b: a/b)(total_delays, n_flights)
mean.visualize()
%%time
mean = mean.compute()
Wall time: 1.98 s
尝试延迟 sum
调用。如果 sum
延迟,图形会是什么样子?如果不是,图表会是什么样子?
sum
上使用 compute
sums = []
counts = []
for fn in filenames:
# 读取文件
df = delayed(pd.read_csv)(fn)
# 按起飞机场分组
by_origin = df.groupby("Origin")
# 按起飞机场计算所有起飞延误和
total = by_origin.DepDelay.sum()
# 按机场汇总航班数
count = by_origin.DepDelay.count()
# 保存中间结果
sums.append(total)
counts.append(count)
# 组合中间结果得到全部 mean-delay-per-origin
total_delays = delayed(sum)(sums)
n_flights = delayed(sum)(counts)
from dask import visualize
visualize(total_delays, n_flights)
%%time
total_delays, n_flights = compute(total_delays, n_flights)
mean = total_delays / n_flights
Wall time: 2.12 s
原始版本
sums = []
counts = []
for fn in filenames:
# 读取文件
df = delayed(pd.read_csv)(fn)
# 按起飞机场分组
by_origin = df.groupby("Origin")
# 按起飞机场计算所有起飞延误和
total = by_origin.DepDelay.sum()
# 按机场汇总航班数
count = by_origin.DepDelay.count()
# 保存中间结果
sums.append(total)
counts.append(count)
visualize(sums, counts)
你能想出你想要以一种方式减少另一种方式的任何原因吗?
访问 Delayed documentation。特别是,(delayed screencast 将强化您在此处学到的概念,delayed best practices 文档收集了有关如何使用 dask.delayed
的建议。
在继续下一个练习之前,请确保关闭您的客户端或停止此内核。
client.close()
dask-tutorial
https://github.com/dask/dask-tutorial
Dask 教程
相关文章
题图由 Jan-Christoph Horn 在 Pixabay 上发布。