前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >python︱大规模数据存储与读取、并行计算:Dask库简述

python︱大规模数据存储与读取、并行计算:Dask库简述

作者头像
悟乙己
发布于 2018-01-02 08:50:07
发布于 2018-01-02 08:50:07
6.5K00
代码可运行
举报
文章被收录于专栏:素质云笔记素质云笔记
运行总次数:0
代码可运行

数据结构与pandas非常相似,比较容易理解。

github:https://github.com/dask

dask的内容很多,挑一些我比较看好的内容着重点一下。 .

一、数据读取与存储

先来看看dask能读入哪些内容:

1、csv

dask并不能读入excel,这个注意

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
# pandas
import pandas as pd                    
df = pd.read_csv('2015-01-01.csv')      
df.groupby(df.user_id).value.mean()     

#dask
 import dask.dataframe as dd
 df = dd.read_csv('2015-*-*.csv')
 df.groupby(df.user_id).value.mean().compute()

非常相似,除了.compute() .

2、Dask Array读取hdf5

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
import numpy as np                       import dask.array as da
f = h5py.File('myfile.hdf5')             f = h5py.File('myfile.hdf5')
x = np.array(f['/small-data'])           x = da.from_array(f['/big-data'],
                                                           chunks=(1000, 1000))
x - x.mean(axis=1)                       x - x.mean(axis=1).compute()

左是Pandas,右边是dask .

3、Dask Bag

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
import dask.bag as db
b = db.read_text('2015-*-*.json.gz').map(json.loads)
b.pluck('name').frequencies().topk(10, lambda pair: pair[1]).compute()

读取大规模json文件,几亿都很easy

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
>>> b = db.read_text('myfile.txt')
>>> b = db.read_text(['myfile.1.txt', 'myfile.2.txt', ...])
>>> b = db.read_text('myfile.*.txt')

读取txt

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
>>> import dask.bag as db
>>> b = db.from_sequence([{'name': 'Alice',   'balance': 100},
...                       {'name': 'Bob',     'balance': 200},
...                       {'name': 'Charlie', 'balance': 300}],
...                      npartitions=2)
>>> df = b.to_dataframe()

变为dataframe格式的内容 .

4、Dask Delayed 并行计算

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
from dask import delayed
L = []
for fn in filenames:                  # Use for loops to build up computation
    data = delayed(load)(fn)          # Delay execution of function
    L.append(delayed(process)(data))  # Build connections between variables

result = delayed(summarize)(L)
result.compute()

.

5、concurrent.futures自定义任务

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
from dask.distributed import Client
client = Client('scheduler:port')

futures = []
for fn in filenames:
    future = client.submit(load, fn)
    futures.append(future)

summary = client.submit(summarize, futures)
summary.result()

.

二、Delayed 并行计算模块

一个先行例子,本来的案例:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
def inc(x):
    return x + 1

def double(x):
    return x + 2

def add(x, y):
    return x + y

data = [1, 2, 3, 4, 5]

output = []
for x in data:
    a = inc(x)
    b = double(x)
    c = add(a, b)
    output.append(c)

total = sum(output)

再来看看用delay加速的:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
from dask import delayed

output = []
for x in data:
    a = delayed(inc)(x)
    b = delayed(double)(x)
    c = delayed(add)(a, b)
    output.append(c)

total = delayed(sum)(output)

还可以将计算流程可视化:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
total.visualize()  # see image to the right

.

三、和SKLearn结合的并行算法

广义回归GLM:https://github.com/dask/dask-glm tensorflow深度学习库:Dask-Tensorflow

以XGBoost为例,官方:https://github.com/dask/dask-xgboost 来看一个案例code .

1、加载数据

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
import dask.dataframe as dd

# Subset of the columns to use
cols = ['Year', 'Month', 'DayOfWeek', 'Distance',
        'DepDelay', 'CRSDepTime', 'UniqueCarrier', 'Origin', 'Dest']

# Create the dataframe
df = dd.read_csv('s3://dask-data/airline-data/20*.csv', usecols=cols,
                  storage_options={'anon': True})

df = df.sample(frac=0.2) # we blow out ram otherwise

is_delayed = (df.DepDelay.fillna(16) > 15)

df['CRSDepTime'] = df['CRSDepTime'].clip(upper=2399)
del df['DepDelay']

df, is_delayed = persist(df, is_delayed)
progress(df, is_delayed)

2、One hot encode编码

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
df2 = dd.get_dummies(df.categorize()).persist()

.

3、准备训练集和测试集 + 训练

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
data_train, data_test = df2.random_split([0.9, 0.1], 
                                         random_state=1234)
labels_train, labels_test = is_delayed.random_split([0.9, 0.1], 
                                                    random_state=1234)

训练

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
import dask_xgboost as dxgb

params = {'objective': 'binary:logistic', 'nround': 1000, 
          'max_depth': 16, 'eta': 0.01, 'subsample': 0.5, 
          'min_child_weight': 1}

bst = dxgb.train(client, params, data_train, labels_train)
bst

.

4、预测

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
# Use normal XGBoost model with normal Pandas
import xgboost as xgb
dtest = xgb.DMatrix(data_test.head())
bst.predict(dtest)
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
predictions = dxgb.predict(client, bst, data_test).persist()
predictions.head()

.

5、模型评估

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
from sklearn.metrics import roc_auc_score, roc_curve
print(roc_auc_score(labels_test.compute(), 
                    predictions.compute()))
import matplotlib.pyplot as plt
%matplotlib inline

fpr, tpr, _ = roc_curve(labels_test.compute(), predictions.compute())
# Taken from http://scikit-learn.org/stable/auto_examples/model_selection/plot_roc.html#sphx-glr-auto-examples-model-selection-plot-roc-py
plt.figure(figsize=(8, 8))
lw = 2
plt.plot(fpr, tpr, color='darkorange', lw=lw, label='ROC curve')
plt.plot([0, 1], [0, 1], color='navy', lw=lw, linestyle='--')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Receiver operating characteristic example')
plt.legend(loc="lower right")
plt.show()

.

四、计算流程可视化部分——Dask.array

来源:https://gist.github.com/mrocklin/b61f795004ec0a70e43de350e453e97e

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
import numpy as np
import dask.array as da
x = da.ones(15, chunks=(5,))
x.visualize('dask.svg')
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
(x + 1).sum().visualize('dask.svg')

来一个二维模块的:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
x = da.ones((15, 15), chunks=(5, 5))
x.visualize('dask.svg')
(x.dot(x.T + 1) - x.mean(axis=0)).std().visualize('dask.svg')
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
CompletableFuture:异步编程没那么难
我们在日常开发中经常用多线程优化性能,其实不过就是将串行操作变成并行操作。如果仔细观察,你还会发现在串行转换成并行的过程中,一定会涉及到异步化,例如下面的示例代码,现在是串行的,为了提升性能,我们得把它们并行化,那具体实施起来该怎么做呢?
码农架构
2020/10/26
7470
CompletableFuture:异步编程没那么难
并发编程系列-CompletableFuture
利用多线程来提升性能,实质上是将顺序执行的操作转化为并行执行。仔细观察后,你还会发现在顺序转并行的过程中,一定会牵扯到异步化。举个例子,现在下面这段示例代码是按顺序执行的,为了优化性能,我们需要将其改为并行执行。那具体的实施方法是什么呢?
架构狂人
2023/08/16
2040
并发编程系列-CompletableFuture
阅读 Flink 源码前必会的知识 - Java 8 异步编程 CompletableFuture 全解析
通常来说,程序都是顺序执行,同一时刻只会发生一件事情。如果一个函数依赖于另一个函数的结果,它只能等待那个函数结束才能继续执行,从用户角度来说,整个程序才算执行完毕。
kk大数据
2021/03/13
1.1K0
手把手教学妹CompletableFuture异步化,性能关系直接起飞!
由于 JDK1.5 Futrure 的 get 方法获取任务结果必须阻塞等待,Google 看不下去了,开发了 Guava 库
JavaEdge
2021/04/19
1.3K0
手把手教学妹CompletableFuture异步化,性能关系直接起飞!
CompletableFuture使用详解
在上一篇文章《CompletionService使用与源码分析》中,已经介绍过了Future的局限性,它没法直接对多个任务进行链式、组合等处理,需要借助并发工具类才能完成,实现逻辑比较复杂。
全栈程序员站长
2022/09/07
9440
CompletableFuture使用详解
Future和Callable学习
我们知道使用多线程时,最初的Thread到线程池,此时对于线程的使用,提供了其使用的复用率。而实现多线程的三种方式:继承Thread;实现Runnable接口,重写run方法;实现Callable接口,同时重写call方法,同时通过Future获取执行的返回值。也就是说callable执行任务,而Future拿到执行的结果。Future具有阻塞性在于其get()方法具有阻塞性,而isDone()是不具有阻塞性的。
路行的亚洲
2020/07/16
5050
聊聊异步编程的 7 种实现方式
当用户创建一笔电商交易订单时,要经历的业务逻辑流程还是很长的,每一步都要耗费一定的时间,那么整体的RT就会比较长。
微观技术
2022/09/28
6260
聊聊异步编程的 7 种实现方式
CompletableFuture 异步多线程,那叫一个优雅
虽然 Future 以及相关使用方法提供了异步执行任务的能力,但是对于结果的获取却是很不方便,我们必须使用Future.get()的方式阻塞调用线程,或者使用轮询方式判断 Future.isDone 任务是否结束,再获取结果。
程序员大彬
2023/03/01
1.7K0
CompletableFuture 异步多线程,那叫一个优雅
大厂常用工具类Completablefuture的面试题都有哪些?这次我都替你收集好了。
最近在复习自己的简历,发现Completablefuture这个工具类怎么突然这么爱考了,牛客上的面经基本都在问这个工具类。因此今天我们就来专门总结一下Completablefuture常见的面试题。
程序员牛肉
2025/03/17
2920
大厂常用工具类Completablefuture的面试题都有哪些?这次我都替你收集好了。
有了Future为什么还要CompletableFuture?
cheese
2024/02/06
2510
有了Future为什么还要CompletableFuture?
不会用Java Future,我怀疑你泡茶没我快, 又是超长图文!!
现陆续将Demo代码和技术文章整理在一起 Github实践精选 ,方便大家阅读查看,本文同样收录在此,觉得不错,还请Star
用户4172423
2020/07/14
5740
理解Java8里面CompletableFuture异步编程
其中第三个特性,就是今天我们想要聊的话题,正是因为CompletableFuture的出现,才使得使用Java进行异步编程提供了可能。
我是攻城师
2018/11/23
16.7K0
CompletableFuture 使用详解
没有指定Executor的方法会使用ForkJoinPool.commonPool() 作为它的线程池执行异步代码。如果指定线程池,则使用指定的线程池运行。以下所有的方法都类同。
java404
2018/10/10
4.1K1
CompletableFuture详解
因为CompletableFuture实现了Future接口所以先看一下Future
用户10136162
2022/11/15
1.1K0
CompletableFuture详解
异步编程利器:CompletableFuture详解
最近刚好使用CompeletableFuture优化了项目中的代码,所以跟大家一起学习CompletableFuture。
捡田螺的小男孩
2021/06/15
7K0
异步编程利器:CompletableFuture详解
一网打尽:异步神器 CompletableFuture 万字详解!
CompletableFuture是jdk8的新特性。CompletableFuture实现了CompletionStage接口和Future接口,前者是对后者的一个扩展,增加了异步会点、流式处理、多个Future组合处理的能力,使Java在处理多任务的协同工作时更加顺畅便利。
搜云库技术团队
2023/09/18
2.7K0
一网打尽:异步神器 CompletableFuture 万字详解!
【并发编程】异步编程CompletableFuture实战
在JDK8之前,我们使用的Java多线程变成,主要是 Thread+Runnable 来完成,但是这种方式有个弊端就是没有返回值。如果想要返回值怎么办呢,大多数人就会想到 Callable + Thread 的方式来获取到返回值。
互联网小阿祥
2023/05/28
1.1K0
【并发编程】异步编程CompletableFuture实战
CompletableFuture:supplyAsync与runAsync
CompletableFuture是Java 8中引入的一个类,用于简化异步编程和并发操作。它提供了一种方便的方式来处理异步任务的结果,以及将多个异步任务组合在一起执行。CompletableFuture支持链式操作,使得异步编程更加直观和灵活。
不惑
2023/11/14
1.3K0
聊聊Java中CompletableFuture的使用
CompletableFuture是java8引入的一个异步类,它最大的优势是可以在创建的对象中传入一个回调对象,在任务结束后(done或throw exception),自动调用回调对象的回调方法,而不用让主线程阻塞。
jinjunzhu
2020/08/20
9090
Java8异步利器CompletableFuture的骚操作
这篇关于CompletableFuture的文章在前一个月就写了一部分,后面没有时间去写,今天周末,所以就抽时间把它写完,因为CompletableFuture中的函数确实很多,也没必要一个一个的去写完,只是抽出大致的函数来说,因为CompletableFuture很像ES6中的Promise()函数,所以我们在学习的时候可以带着Promise()的思想去学习,异步编程不但能够提升我们的相应速度,也能使我们的代码更加简洁,但是我们是在用异步编程的时候也要充分考虑业务和方法是否合适异步操作,不然将会带来一些问题。
小四的技术之旅
2022/07/26
1.7K0
Java8异步利器CompletableFuture的骚操作
推荐阅读
相关推荐
CompletableFuture:异步编程没那么难
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验