SQLite3 is an excellent way to persist data in data science practice, for raw data and intermediate results alike, yet compared with csv/tsv, pickle, and parquet its usefulness is widely underestimated. "Data science" here covers both machine learning data preparation and data analysis.
CSV stores data inefficiently, parses everything as text, and needs extra handling for types (datetimes especially); pickle and parquet travel poorly across tools; databases and data warehouses offer strong typing, ER data models, and fast retrieval, but require a server and client and carry high maintenance costs, lacking the convenience of a plain file. That makes sqlite3 arguably the best choice for a lone data scientist doing data exploration:
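One gap that CSV leaves, datetime round-tripping, the stdlib sqlite3 module can close by itself via `detect_types`. A minimal sketch (note: the default datetime adapters this relies on are deprecated as of Python 3.12, though they still work):

```python
import sqlite3
from datetime import datetime

# Declaring the column type as "timestamp" and passing PARSE_DECLTYPES
# lets sqlite3 return datetime objects instead of plain strings.
db = sqlite3.connect(":memory:", detect_types=sqlite3.PARSE_DECLTYPES)
db.execute("create table events (ts timestamp, label text)")
now = datetime(2023, 5, 1, 12, 30)
db.execute("insert into events values (?, ?)", (now, "demo"))
ts, label = db.execute("select ts, label from events").fetchone()
db.close()
```

With a CSV file you would get `ts` back as a string and have to re-parse it; here it comes back as a `datetime` object.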
```python
from sqlite3 import connect
import csv

DB_PATH = "../data/tweets.sqlite"

with connect(DB_PATH) as db:
    db.execute("""
        create table if not exists tweets (
            tweet_id primary key,
            airline_sentiment,
            airline_sentiment_confidence,
            negativereason,
            negativereason_confidence,
            airline,
            airline_sentiment_gold,
            name,
            negativereason_gold,
            retweet_count,
            text,
            tweet_coord,
            tweet_created,
            tweet_location,
            user_timezone
        )
    """)

with open('../data/tweets.csv', encoding='UTF-8') as csv_file:
    reader = csv.reader(csv_file, delimiter=',')
    _ = next(reader)  # skip the header row
    for row in reader:
        # a fresh connection (and implicit commit) per row: correct, but slow
        with connect(DB_PATH) as db:
            db.execute(
                "insert into tweets values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"
                " on conflict do nothing",
                tuple(row))
```
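Because the loop above reopens a connection and commits for every row, a batched variant with one connection and a single `executemany` is usually far faster. A sketch, using an in-memory database and an inline three-row CSV standing in for `../data/tweets.csv`:

```python
import csv
import io
from sqlite3 import connect

# Tiny inline CSV standing in for ../data/tweets.csv (illustration only);
# the third row repeats tweet_id 1 to show conflict handling.
csv_text = "tweet_id,airline,text\n1,AA,hi\n2,UA,hello\n1,AA,hi\n"
reader = csv.reader(io.StringIO(csv_text))
header = next(reader)
rows = [tuple(r) for r in reader]

# One connection, one executemany: the whole batch commits as a single
# transaction when the `with` block exits.
with connect(":memory:") as db:
    db.execute("create table tweets (tweet_id primary key, airline, text)")
    db.executemany(
        "insert into tweets values (?,?,?) on conflict do nothing", rows)
    n = db.execute("select count(*) from tweets").fetchone()[0]
```

The duplicate `tweet_id` is silently skipped, so only two rows land in the table.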
Establishing a connection is generally the most expensive step, and batch writes are the most efficient. Many open datasets are distributed as JSON, with each record a nested JSON object, as below:
```python
import json
import pandas as pd
from gzip import GzipFile
from sqlite3 import connect

with GzipFile('../data/hn_dump.json.gz', 'r') as fin:
    data = json.loads(fin.read().decode('utf-8'))

DB_PATH = '../sqlite-olt.db'

with connect(DB_PATH) as db:
    db.execute("create table if not exists hn_items_raw(data)")

def make_chunks(lst, n):
    """Yield successive n-sized chunks from lst."""
    for i in range(0, len(lst), n):
        yield lst[i:i + n]

for chunk in make_chunks(data, 1000):
    with connect(DB_PATH) as db:
        db.executemany("insert into hn_items_raw(data) values (?)",
                       [(json.dumps(item),) for item in chunk])
```
In the author's experiments, writing the same 1,000 records on the same PC, the batched approach was far faster than the row-by-row one.
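Such an experiment can be sketched roughly as follows (scaled down to 200 synthetic records; absolute timings will vary by machine and disk, but the per-row version pays for a connection and a commit on every record):

```python
import json
import os
import tempfile
import time
from sqlite3 import connect

# 200 fake JSON records stand in for the real dump (illustration only).
records = [(json.dumps({"id": i}),) for i in range(200)]

def bench(write):
    """Time `write` against a fresh database file."""
    path = os.path.join(tempfile.mkdtemp(), "bench.db")
    with connect(path) as db:
        db.execute("create table t (data)")
    t0 = time.perf_counter()
    write(path)
    return time.perf_counter() - t0

def per_row(path):
    for rec in records:
        with connect(path) as db:   # connection + commit for every record
            db.execute("insert into t(data) values (?)", rec)

def batched(path):
    with connect(path) as db:       # one connection, one transaction
        db.executemany("insert into t(data) values (?)", records)

slow, fast = bench(per_row), bench(batched)
```

On any reasonable setup `fast` comes out well under `slow`, since the batched version performs one fsync-backed commit instead of two hundred.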
```python
with connect(DB_PATH) as db:
    db.execute("drop view if exists hn_items_fields")
    db.execute("""
        create view if not exists hn_items_fields as
        select
            json_extract(data, '$.created_at') as created_at,
            json_extract(data, '$.title') as title,
            json_extract(data, '$.url') as url,
            json_extract(data, '$.author') as author,
            json_extract(data, '$.points') as points,
            json_extract(data, '$.comment_text') as comment_text,
            length(json_extract(data, '$.comment_text')) as comment_text_length,
            json_extract(data, '$.story_text') as story_text,
            json_extract(data, '$.story_id') as story_id,
            json_extract(data, '$.story_title') as story_title,
            json_extract(data, '$.story_url') as story_url,
            json_extract(data, '$.parent_id') as parent_id,
            json_extract(data, '$.relevancy_score') as relevancy_score,
            json_extract(data, '$._tags') as tags
        from hn_items_raw
    """)
```
```python
with connect(DB_PATH) as db:
    hn_items_fields = pd.read_sql('select * from hn_items_fields', db)
hn_items_fields
```
This captures the tidying of hn_items_raw in the view hn_items_fields: the raw JSON stays untouched, while downstream queries see named columns.
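The `tags` column still holds a JSON array; SQLite's `json_each` table-valued function can explode it into one row per tag. A sketch against a tiny in-memory stand-in for hn_items_raw:

```python
import json
from sqlite3 import connect

# Two fake items standing in for the real hn_items_raw rows.
items = [{"title": "A", "_tags": ["story", "author_luu"]},
         {"title": "B", "_tags": ["comment"]}]
with connect(":memory:") as db:
    db.execute("create table hn_items_raw(data)")
    db.executemany("insert into hn_items_raw(data) values (?)",
                   [(json.dumps(it),) for it in items])
    # json_each yields one output row per element of the _tags array.
    rows = db.execute("""
        select json_extract(data, '$.title'), tags.value
        from hn_items_raw, json_each(json_extract(data, '$._tags')) as tags
    """).fetchall()
```

This gives `(title, tag)` pairs, one per tag, without ever materializing a parsed-tags table.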
Filtering on a json_extract result in a WHERE clause is slow, however, because every row's JSON must be parsed; consider building an index on the extracted expression to speed it up.
```python
filter_author_query = """
select json_extract(data, '$.author'), json_extract(data, '$.objectID')
from hn_items_raw
where json_extract(data, '$.author') = 'luu'
"""
```

```python
%%timeit
with connect(DB_PATH) as db:
    luu_df = pd.read_sql(filter_author_query, db)
```
Create an index to speed up the query:
```python
create_author_idx_query = """
create index if not exists idx_author
on hn_items_raw (json_extract(data, '$.author'))
"""
with connect(DB_PATH) as db:
    db.execute(create_author_idx_query)
```

```python
%%timeit
with connect(DB_PATH) as db:
    luu_df = pd.read_sql(filter_author_query, db)
```
With the index in place, the query dropped from 2.49 s to 23 ms.
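EXPLAIN QUERY PLAN can confirm the expression index is actually picked up; the expression in the WHERE clause must match the indexed expression exactly. A sketch against an in-memory stand-in:

```python
import json
from sqlite3 import connect

# In-memory stand-in for hn_items_raw (illustration only).
with connect(":memory:") as db:
    db.execute("create table hn_items_raw(data)")
    db.executemany(
        "insert into hn_items_raw(data) values (?)",
        [(json.dumps({"author": a, "objectID": i}),)
         for i, a in enumerate(["luu", "pg", "luu"])])
    db.execute("""create index if not exists idx_author
                  on hn_items_raw (json_extract(data, '$.author'))""")
    # Ask the planner how it will execute the filter query.
    plan = db.execute("""
        explain query plan
        select json_extract(data, '$.objectID') from hn_items_raw
        where json_extract(data, '$.author') = 'luu'
    """).fetchall()
    detail = " ".join(row[-1] for row in plan)
```

The plan detail should read `SEARCH ... USING INDEX idx_author` rather than a full `SCAN` of the table.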
Beyond the practical tips above, the author also pays particular attention to several other sqlite3 features relevant to deployed applications, mainly on end devices.
Original-content notice: this article was published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.
For infringement concerns, contact cloudcommunity@tencent.com for removal.