读者来信
我之前是 1、先用arcgis 栅格转点 2、给点添加xy坐标 3、给添加xy坐标后的点通过空间连接的方式添加行政区属性 4、最后计算指定行政区的质心 之前的解决办法是用arcgis 完成第一步和第二步,虽然完成的很慢,但是看起来好像没太大问题 但是第三步用arcgis会卡死,后来用geopandas也会卡死,后来了解到dask-geopandas,但是处理了两百万个点左右好像也报错了,不知道是我写的代码有问题还是我对dask的理解有问题,想要请教一下大佬
读者的问题涉及到地理信息系统(GIS)操作的一系列步骤,具体包括将栅格数据转换为点数据、为这些点数据添加XY坐标、通过空间连接给这些点添加行政区属性、以及计算指定行政区的质心。读者在使用ArcGIS软件完成前两步时未遇到明显问题,但在执行第三步时遇到了性能瓶颈,即使用ArcGIS和GeoPandas进行空间连接操作时系统会卡死。为了解决这个问题,读者尝试使用了dask-geopandas来处理约两百万个点的数据,但似乎遇到了错误。
针对这个情况,我们可以从几个方面进行分析和建议:
性能瓶颈分析:
ArcGIS和GeoPandas在处理大量数据时可能会遇到性能问题,特别是在普通硬件上运行时。这是因为这些操作往往需要大量的内存和CPU资源。 空间连接特别是在点数据量很大时,是一个资源密集型的操作,因为它需要对每个点检查其与其他几何对象(如行政区边界)的空间关系。 dask-geopandas的使用:
dask-geopandas旨在解决类似的性能问题,通过并行计算和延迟执行来提高处理大规模地理空间数据的效率。如果在使用dask-geopandas时遇到错误,可能是由于多种原因导致的,包括但不限于代码问题、内存管理、任务调度等。 为了更好地诊断问题,需要检查错误消息的具体内容。这可能会指示是配置问题、资源不足还是代码逻辑错误。 优化建议:
资源分配:确保有足够的计算资源(CPU和内存)来处理数据。对于dask-geopandas,可以通过调整Dask的工作进程数和内存限制来优化性能。 代码审查:仔细检查实现代码,尤其是dask-geopandas的部分,确认是否正确使用了并行计算和数据分区功能。 批处理:如果可能,尝试将数据分成更小的批次进行处理,而不是一次性处理所有点。这可以帮助减少内存压力。 索引和优化:在进行空间连接之前,为行政区数据建立空间索引可以大大提高查询效率。
注意,运行前需要将input的rar文件解压后再运行程序
花了一番功夫解决环境问题,使用以下步骤即可使用dask_geopandas
In [1]:
!pip install dask_geopandas -i https://pypi.mirrors.ustc.edu.cn/simple/
!pip install PyGEOS -i https://pypi.mirrors.ustc.edu.cn/simple/
!pip install --upgrade shapely https://pypi.mirrors.ustc.edu.cn/simple/
!pip install pyogrio -i https://pypi.mirrors.ustc.edu.cn/simpl
将 GeoPandas DataFrame 转换为 Dask-GeoPandas DataFrame 首先,使用 GeoPandas 读取地理数据文件:
python import geopandas df = geopandas.read_file('...') # 使用你的文件路径替换 '...' 然后,将其转换为 Dask-GeoPandas DataFrame:
python import dask_geopandas
ddf = dask_geopandas.from_geopandas(df, npartitions=4) 默认情况下,这会根据行来简单地重新分区数据。但是,你也可以提供空间分区,以利用 GeoDataFrame 的空间结构。
python
ddf = ddf.spatial_shuffle() GeoPandas 的熟悉的空间属性和方法也可用,并且将并行计算:
python
ddf.geometry.area.compute()
ddf.within(polygon) 此外,如果你有一个分布式的 dask.dataframe,你可以将 x-y 点的列传递给 set_geometry 方法来设置几何形状。
python import dask.dataframe as dd import dask_geopandas
ddf = dd.read_csv('...') # 使用你的文件路径替换 '...'
ddf = ddf.set_geometry( dask_geopandas.points_from_xy(ddf, 'longitude', 'latitude') ) 目前支持 Parquet 和 Feather 文件格式的写入(以及读回):
python
ddf.to_parquet("path/to/dir/")
ddf = dask_geopandas.read_parquet("path/to/dir/") 传统的 GIS 文件格式可以读入到分区的 GeoDataFrame 中(需要 pyogrio),但不支持写入:
python
ddf = dask_geopandas.read_file("file.gpkg", npartitions=4) 以上就是如何使用 Dask-GeoPandas 对大型地理空间数据进行高效处理的简单示例。
In [2]:
import geopandas as gpd
import time # 添加时间模块
# 添加dask模块
import dask_geopandas
def process_row():
outwen = '/home/mw/input/dask6250/201105.shp'
bianjie = '/home/mw/input/dask6250/xian/2023xian.shp'
jiabianjie = './'
start_time3 = time.time()
# 读取输入和裁剪边界的 shapefile
target_gdf = gpd.read_file(outwen)
join_gdf = gpd.read_file(bianjie)
# 改成dask方式
target_gdfnew = dask_geopandas.from_geopandas(target_gdf, npartitions=4)
# 重新投影参与连接的边界以匹配目标几何图形的 CRS
join_gdf = join_gdf.to_crs(target_gdf.crs)
# 改成dask方式
join_gdfnew = dask_geopandas.from_geopandas(join_gdf, npartitions=4)
# 使用空间连接找到相交的部分
joined = gpd.sjoin(target_gdfnew, join_gdfnew, how='inner', predicate='intersects')
# 将 'bianjie' 中的属性添加到 'outwen' 中
joined = joined.drop(columns='index_right') # 移除多余的索引列
result = target_gdfnew.merge(joined, how='left', on=target_gdfnew.columns.to_list())
# 保存结果到输出边界
result.to_file(jiabianjie, encoding='utf-8-sig') # 确保使用正确的编码
end_time3 = time.time()
execution_time3 = end_time3 - start_time3
print(f"'{jiabianjie}'已添加边界,开始时间为:{start_time3:.2f},结束时间为:{end_time3:.2f},执行时间为:{execution_time3:.2f}秒")
process_row()
print('finish')
好,运行一段时间爆内存了,应该考虑以下优化策略:
你的代码先用geopandas读取Shapefile,然后转换为dask_geopandas对象。这个过程中,原始数据会完全加载到内存中,这可能是导致内存溢出的原因之一。相反,你应该直接使用dask_geopandas.read_file来避免将整个数据集一次性加载到内存:
python
target_dgdf = dask_geopandas.read_file(outwen, npartitions=4)
join_dgdf = dask_geopandas.read_file(bianjie, npartitions=4)
在数据处理过程中,尽量减少不必要的数据复制。例如,在合并或连接操作之前,仔细考虑是否所有列都需要参与操作。
在使用dask_geopandas进行空间连接时,确保操作是高效的。你的代码尝试使用geopandas.sjoin,但是应该使用dask_geopandas.sjoin。此外,确保在执行空间连接之前,两个数据集已经有了匹配的坐标参考系统(CRS)。这样可以避免在每个分区上重复昂贵的CRS转换操作。
npartitions的选择对性能和内存使用有重大影响。太少的分区可能会导致单个分区过大,而太多的分区则会增加调度开销。你可能需要实验不同的npartitions值来找到最佳平衡。
在保存结果时,如果尝试将整个处理后的数据集写入单个文件,这可能也会导致内存问题。dask_geopandas目前可能不支持直接写入文件格式如Shapefile,因为这通常涉及将数据集合并到单个分区。你可能需要先将数据写入Parquet等格式,或者手动分批写入。
对自己的资源修改npartitions参数
In [1]:
import dask_geopandas as dgd
import time
input_shapefile = '/home/mw/input/dask6250/201105.shp'
boundary_shapefile = '/home/mw/input/dask6250/xian/2023xian.shp'
output_directory = './output/'
def process_row(target_gdf, join_gdf, jiabianjie_pat):
start_time = time.time()
# 直接使用dask-geopandas读取shapefiles
target_dgdf = dgd.read_file(input_shapefile, npartitions=16)
join_dgdf = dgd.read_file(boundary_shapefile, npartitions=16)
# 确保边界shapefile与目标shapefile的CRS一致
join_dgdf = join_dgdf.to_crs(target_dgdf.crs)
# 使用空间连接找到相交的部分
joined = dgd.sjoin(target_dgdf, join_dgdf, how='inner', predicate='intersects')
# 移除多余的索引列
joined = joined.drop(columns='index_right')
joined.compute().to_file(output_directory + 'result.shp', driver='ESRI Shapefile', encoding='utf-8')
# end_time = time.time()
# print(f"已添加边界,开始时间为:{start_time:.2f},结束时间为:{end_time:.2f},执行时间为:{end_time - start_time:.2f}秒")
if __name__ == "__main__":
process_row(input_shapefile,boundary_shapefile,output_directory)
print('finish')
In [ ]:
import dask_geopandas as dgd
import time
import gc # 导入垃圾收集器
input_shapefile = '/home/mw/input/dask6250/201105.shp'
boundary_shapefile = '/home/mw/input/dask6250/xian/2023xian.shp'
output_directory = './'
def process_row(target_gdf, join_gdf, jiabianjie_pat):
start_time = time.time()
# 根据你的硬件配置调整npartitions,减少分区数以减少内存开销
target_dgdf = dgd.read_file(input_shapefile, npartitions=16) # 适当调整npartitions
join_dgdf = dgd.read_file(boundary_shapefile, npartitions=16) # 适当调整npartitions
join_dgdf = join_dgdf.to_crs(target_dgdf.crs)
joined = dgd.sjoin(target_dgdf, join_dgdf, how='inner', predicate='intersects')
joined = joined.drop(columns='index_right')
# 计算结果前启动垃圾收集
gc.collect()
joined.compute().to_file(output_directory + 'result.shp', driver='ESRI Shapefile', encoding='utf-8')
# 手动启动垃圾收集释放内存
gc.collect()
end_time = time.time()
print(f"已添加边界,开始时间为:{start_time:.2f},结束时间为:{end_time:.2f},执行时间为:{end_time - start_time:.2f}秒")
if __name__ == "__main__":
process_row(input_shapefile, boundary_shapefile, output_directory)
print('finish')
/opt/conda/lib/python3.9/site-packages/geopandas/_compat.py:111: UserWarning: The Shapely GEOS version (3.10.0-CAPI-1.16.0) is incompatible with the GEOS version PyGEOS was compiled with (3.10.4-CAPI-1.16.2). Conversions between both will be slow.
warnings.warn(
/opt/conda/lib/python3.9/site-packages/geopandas/io/file.py:362: FutureWarning: pandas.Int64Index is deprecated and will be removed from pandas in a future version. Use pandas.Index with the appropriate dtype instead.
pd.Int64Index,
分批运行与采用gpkg方式存储
In [3]:
import dask_geopandas as dgd
import time
import gc
from dask import delayed, compute # 从dask中导入compute函数
input_shapefile = '/home/mw/input/dask6250/201105.shp'
boundary_shapefile = '/home/mw/input/dask6250/xian/2023xian.shp'
output_directory = './'
@delayed
def process_batch(batch, join_gdf, output_path):
# 将边界数据转换为目标数据的坐标参考系统
join_gdf = join_gdf.to_crs(batch.crs)
# 执行空间连接
joined = dgd.sjoin(batch, join_gdf, how='inner', predicate='intersects')
# 删除不必要的列
joined = joined.drop(columns='index_right')
# 计算并保存结果
joined.compute().to_file(output_path, driver='GPKG', layer='result', encoding='utf-8') # 使用GeoPackage格式
def main():
start_time = time.time()
# 明确指定npartitions
target_dgdf = dgd.read_file(input_shapefile, npartitions=16) # 明确设置npartitions
join_dgdf = dgd.read_file(boundary_shapefile, npartitions=16) # 明确设置npartitions
# 将目标数据集分批处理
batches = target_dgdf.to_delayed()
tasks = [process_batch(batch, join_dgdf, f"{output_directory}result_{i}.gpkg") for i, batch in enumerate(batches)]
# 使用dask的compute函数来执行所有延迟任务
compute(*tasks)
gc.collect() # 手动启动垃圾收集释放内存
end_time = time.time()
print(f"已添加边界,开始时间为:{start_time:.2f},结束时间为:{end_time:.2f},执行时间为:{end_time - start_time:.2f}秒")
if __name__ == "__main__":
main()
print('finish')
/opt/conda/lib/python3.9/site-packages/geopandas/_compat.py:111: UserWarning: The Shapely GEOS version (3.10.0-CAPI-1.16.0) is incompatible with the GEOS version PyGEOS was compiled with (3.10.4-CAPI-1.16.2). Conversions between both will be slow.
warnings.warn(
注意,由于资源限制,以上最终的result并没有运行完全,可以看到project目录下还有一部分gpkg 因为输出文件大于1g的限制,还请有兴趣的在自己的电脑运行,根据相应资源修改参数
另外gpkg可以使用geopandas转为为需要的shp
In [ ]:
import geopandas as gpd
import pandas as pd
# GeoPackage文件列表
gpkg_files = ['path/to/your/first_file.gpkg', 'path/to/your/second_file.gpkg']
# 读取所有GeoPackage文件到GeoDataFrame列表中
gdf_list = [gpd.read_file(gpkg) for gpkg in gpkg_files]
# 合并GeoDataFrame对象
merged_gdf = pd.concat(gdf_list, ignore_index=True)
# 指定输出Shapefile的路径
output_shp_path = 'path/to/your/output_file.shp'
# 将合并后的GeoDataFrame保存为Shapefile
merged_gdf.to_file(output_shp_path, driver='ESRI Shapefile')
print(f"合并后的Shapefile已保存至:{output_shp_path}")
点击链接可查看完整代码与在线运行代码