vader
vader is a low-latency, high-bandwidth mechanism for transferring data between two processes through shared memory. This BTL can only be used between processes executing on the same node. See: https://www.open-mpi.org/faq/?category=sm

The file byte transfer layer (fbtl) framework provides an abstraction for individual blocking and non-blocking read and write operations. The functionality exposed by the interfaces in this module can be used to implement the corresponding operations in MPI I/O. Note, however, that the interfaces are not a one-to-one mapping of MPI's individual read/write operations, because the fbtl framework avoids derived MPI datatypes; mapping/unrolling an MPI derived datatype into a vector of (file offset, memory address, length) tuples is done in the OMPIO module of the IO framework.

These are the component function prototypes; the function pointers go into the component structure. query() and finalize() are called during fbtl_base_select(): every component is query()'ed, and afterwards all components that were not selected are finalize()'d so that anything they did during query() can be undone. By the same logic, the selected component is also finalize()'d when the communicator is destroyed. In summary, each component has 4 functions:
1. open() - called during MPI_INIT
2. close() - called during MPI_FINALIZE
3. query() - called to select a particular component
4. finalize() - called when the actions taken during query() must be undone

High-performance computing (HPC) is the art and science of using groups of cutting-edge computer systems to perform complex simulations, computations, and data analysis that standard commodity computing systems cannot handle.
HPC systems are characterized by high-speed processing power, high-performance networks, and large memory capacity, and can carry out massive amounts of parallel processing. A supercomputer is a very advanced HPC machine that delivers enormous computing power and speed and is a key building block of high-performance computing systems.
In recent years, HPC has evolved from a tool focused on simulation-based scientific research into one that runs simulations and machine learning (ML) side by side. The expanding scope of HPC systems has gained momentum because combining physics-based simulation with machine learning shortens time-to-science in areas such as climate modeling, drug discovery, protein folding, and computational fluid dynamics (CFD).
One example is Aurora, the exascale HPC system delivered to the US Argonne National Laboratory in 2021.
Software and acceleration packages:
High-performance computing generally uses an MPI library for parallel message passing. The MPI standard covers point-to-point message passing, collective communication, the group and communicator concepts, process topologies, environment management, process creation and management, one-sided communication, extended collective operations, external interfaces, I/O, several miscellaneous topics, and multiple tool interfaces. Language bindings are defined for C and Fortran.
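As a minimal sketch of the point-to-point part of the standard (illustrative only, not code from any project discussed in this article), rank 0 sends a single integer to rank 1:
/* Point-to-point sketch: rank 0 sends one int to rank 1.
 * Build with an MPI compiler wrapper (e.g. mpicc) and run with mpirun -np 2. */
#include <mpi.h>
#include <stdio.h>

int main(int argc, char **argv)
{
    int rank, value = 0;

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    if (rank == 0) {
        value = 42;
        MPI_Send(&value, 1, MPI_INT, 1, /*tag=*/0, MPI_COMM_WORLD);
    } else if (rank == 1) {
        MPI_Recv(&value, 1, MPI_INT, 0, /*tag=*/0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
        printf("rank 1 received %d\n", value);
    }

    MPI_Finalize();
    return 0;
}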
Broadcast:
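As a minimal sketch (illustrative only), MPI_Bcast copies the root rank's buffer to every other rank in the communicator:
/* Broadcast sketch: rank 0's data array ends up on every rank. */
#include <mpi.h>

int main(int argc, char **argv)
{
    int data[4] = {0};
    int rank;

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    if (rank == 0) {                    /* only the root fills the buffer */
        for (int i = 0; i < 4; i++)
            data[i] = i + 1;
    }
    /* after the call, data[] is identical on all ranks */
    MPI_Bcast(data, 4, MPI_INT, /*root=*/0, MPI_COMM_WORLD);

    MPI_Finalize();
    return 0;
}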
Allgather collects N values from each of the K ranks into a buffer of size K * N, and then distributes that result to all ranks.
Allgather:
rank send buf recv buf
---- -------- --------
0 a,b,c MPI_Allgather a,b,c,A,B,C,#,@,%
1 A,B,C ----------------> a,b,c,A,B,C,#,@,%
2 #,@,% a,b,c,A,B,C,#,@,%
// SameResult
rank send buf recv buf
---- -------- --------
0 a MPI_Allgather a,A,#
1 A ----------------> a,A,#
2 # a,A,#
Alltoall (can also be viewed as a global transpose operation acting on blocks of data)
rank send buf recv buf
---- -------- --------
0 a,b,c MPI_Alltoall a,A,#
1 A,B,C ----------------> b,B,@
2 #,@,% c,C,%
(a more elaborate case, with two elements per process)
rank send buf recv buf
---- -------- --------
0 a,b,c,d,e,f MPI_Alltoall a,b,A,B,#,@
1 A,B,C,D,E,F ----------------> c,d,C,D,%,$
2 #,@,%,$,&,* e,f,E,F,&,*
// SameResult
rank send buf recv buf
---- -------- --------
0 a,a,a MPI_Alltoall a,A,#
1 A,A,A ----------------> a,A,#
2 #,#,# a,A,#
At the end of MPI_Allgather, every process has exactly the same data in its receive buffer, and each process contributes one value to the overall array. For example, if every process in a group needs to share some value about its state with everyone else, each process supplies its single value; those values are then sent to everyone, so everyone ends up with a copy of the same structure.
MPI_Alltoall does not send the same value to every other process. Instead of providing a single value to be shared with everyone, each process specifies one value to give to each other process. In other words, with n processes each process must specify n values to share. Then, for each process j, its k-th value is sent to the j-th index of process k's receive buffer. This is useful when every process has a unique message for every other process.
Finally, note that if every process fills its send buffer with the same value, running allgather and alltoall yields identical results; the only difference is that allgather is likely to be more efficient.
MPI_Alltoall works like a combination of MPI_Scatter and MPI_Gather: the send buffer of each process is split as in MPI_Scatter, and then each column of blocks is gathered by the process whose rank matches the column number. MPI_Alltoall can therefore also be viewed as a global transpose acting on blocks of data.
operation send buf size recv buf size
--------- ------------- -------------
MPI_Allgather sendcnt n_procs * sendcnt
MPI_Alltoall n_procs * sendcnt n_procs * sendcnt
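To make the buffer-size table above concrete, here is a small illustrative sketch (not taken from any code referenced in this article; the variable names are invented) that runs both collectives on the same send buffer:
/* Sketch comparing MPI_Allgather and MPI_Alltoall with one shared send buffer.
 * send[] holds n_procs ints: block i is what alltoall delivers to rank i,
 * while allgather replicates the whole buffer on every rank. */
#include <mpi.h>
#include <stdlib.h>

int main(int argc, char **argv)
{
    int rank, n_procs;

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &n_procs);

    int *send      = malloc(n_procs * sizeof(int));            /* n_procs * sendcnt, sendcnt = 1 */
    int *gathered  = malloc(n_procs * n_procs * sizeof(int));  /* every rank's whole send buffer */
    int *exchanged = malloc(n_procs * sizeof(int));            /* one element from each rank     */

    for (int i = 0; i < n_procs; i++)
        send[i] = rank * 100 + i;

    /* Every rank contributes its whole buffer; every rank receives all of them. */
    MPI_Allgather(send, n_procs, MPI_INT, gathered, n_procs, MPI_INT, MPI_COMM_WORLD);

    /* Element i of each rank's buffer goes to rank i: a global transpose of blocks. */
    MPI_Alltoall(send, 1, MPI_INT, exchanged, 1, MPI_INT, MPI_COMM_WORLD);

    /* If every rank filled send[] with one identical value in every slot and both
     * collectives used matching counts, the received data would be the same --
     * the "same result" case described above. */

    free(send); free(gathered); free(exchanged);
    MPI_Finalize();
    return 0;
}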
Reduce:
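Similarly, a minimal sketch of a reduction (illustrative only): MPI_Reduce combines one contribution from each rank with an operation such as MPI_SUM and leaves the result on the root.
/* Reduce sketch: sum one contribution per rank onto the root. */
#include <mpi.h>
#include <stdio.h>

int main(int argc, char **argv)
{
    int rank, local, global_sum = 0;

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    local = rank + 1;                       /* each rank contributes one value */
    MPI_Reduce(&local, &global_sum, 1, MPI_INT, MPI_SUM, /*root=*/0, MPI_COMM_WORLD);

    if (rank == 0)
        printf("sum = %d\n", global_sum);   /* only the root holds the result */

    MPI_Finalize();
    return 0;
}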
https://github.com/pmodels/mpich/blob/main/doc/wiki/developer_guide.md
The top-level functions are the MPI functions, e.g. MPI_Send. You will not find them written out directly in the current code base: they are generated by maint/gen_binding_c.py from src/binding/mpi_standard_api.txt and other meta files. The MPI functions handle argument validation, early return for trivial cases, standard error behavior, and calling the internal implementation routines with any necessary argument conversion. Because these functions contain a lot of boilerplate, they are better suited to script generation.

The PMPI-prefixed function names exist to support the MPI profiling interface. When a user calls an MPI function such as MPI_Send, the symbol may link to a tool or profiling library function that intercepts the call, performs its profiling or analysis, and then calls PMPI_Send. For this reason, all top-level functions are defined under their PMPI_ names, which is why PMPI names frequently show up in backtrace logs. To also work without a tool library (the common case), both the PMPI and MPI symbols are defined. If the compiler supports weak symbols, the MPI name is a weak symbol aliased to the PMPI name; this is how it works on Linux. Without weak-symbol support, the top-level functions are compiled twice, once under the MPI name and once under the PMPI name; this is how it works on macOS. Since this layer is mostly generated, it is also called the binding layer, a term that applies broadly to the Fortran/CXX bindings as well.
For example:
src/mpi/romio/adio/include/mpipr.h
...
#define MPI_Send PMPI_Send
...
src/binding/abi/mpi_abi.h
...
int PMPI_Send(const void *buf, int count, MPI_Datatype datatype, int dest, int tag, MPI_Comm comm);
...
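For illustration only (this is not MPICH code), a profiling library typically exploits this arrangement by defining the MPI_ symbol itself and forwarding to the PMPI_ entry point; the timing logic below is an invented example:
/* Sketch of an MPI profiling (PMPI) wrapper: intercept MPI_Send, time it,
 * then forward to PMPI_Send. Link this object ahead of the MPI library. */
#include <mpi.h>
#include <stdio.h>

int MPI_Send(const void *buf, int count, MPI_Datatype datatype,
             int dest, int tag, MPI_Comm comm)
{
    double t0 = MPI_Wtime();
    int rc = PMPI_Send(buf, count, datatype, dest, tag, comm);   /* the real implementation */
    fprintf(stderr, "MPI_Send took %.6f s\n", MPI_Wtime() - t0);
    return rc;
}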
Elastic Fabric Adapter (EFA) is a network device that you can attach to an Amazon EC2 instance to accelerate artificial intelligence (AI), machine learning (ML), and high-performance computing (HPC) applications. EFA lets you achieve the application performance of an on-premises AI/ML or HPC cluster, with the scalability, flexibility, and elasticity of the AWS cloud.
Compared with the TCP transport traditionally used in cloud-based HPC systems, EFA provides lower and more consistent latency and higher throughput. It improves the performance of inter-instance communication, which is critical for scaling AI/ML and HPC applications. It is optimized to run on the existing AWS network infrastructure and can scale with application requirements.
EFA integrates with Libfabric 1.7.0 and later. It supports the NVIDIA Collective Communications Library (NCCL) for AI and ML applications, as well as Open MPI 4 and later and Intel MPI 2019 Update 5 and later for HPC applications.
Libfabric, also known as Open Fabrics Interfaces (OFI), defines a communication API for high-performance parallel and distributed applications. It is a low-level communication library that abstracts various networking technologies. Libfabric is developed by the OFI Working Group (OFIWG, pronounced "o-fee-wig"), a subgroup of the OpenFabrics Alliance (OFA). Participation in OFIWG is open to anyone, not just OFA members.
The goal of libfabric is to define interfaces that enable a tight semantic map between applications and the underlying fabric services. Specifically, the libfabric software interfaces were co-designed with fabric hardware providers and application developers, with a focus on the needs of HPC users. Libfabric supports multiple communication semantics, is agnostic to the fabric and hardware implementation, and leverages and expands the existing RDMA open-source community.
Libfabric is designed to minimize the impedance mismatch between applications, middleware such as MPI, SHMEM, data storage, and PGAS, and fabric communication hardware. Its interfaces target high-bandwidth, low-latency NICs, with the goal of scaling to tens of thousands of nodes.
Libfabric aims to support Linux, FreeBSD, Windows, and OS X. A reasonable effort is made to support all major, modern Linux distributions; however, validation is limited to the most recent two or three releases of Red Hat Enterprise Linux (RHEL) and SUSE Linux Enterprise Server (SLES). Support for a particular OS version or distribution is vendor-specific. The exception is the tcp- and udp-based socket providers, which are available on all platforms.
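As a rough sketch of how an application talks to libfabric (illustrative only; error handling trimmed), provider discovery goes through fi_getinfo with a hints structure describing the desired endpoint type and capabilities:
/* Sketch: list libfabric providers that can supply reliable-datagram (RDM)
 * endpoints with tagged-message support. Link with -lfabric. */
#include <stdio.h>
#include <rdma/fabric.h>

int main(void)
{
    struct fi_info *hints = fi_allocinfo();
    struct fi_info *info = NULL, *cur;

    hints->ep_attr->type = FI_EP_RDM;   /* reliable datagram endpoint */
    hints->caps = FI_TAGGED;            /* tagged messaging, as used by MPI */

    if (fi_getinfo(FI_VERSION(1, 9), NULL, NULL, 0, hints, &info) == 0) {
        for (cur = info; cur; cur = cur->next)
            printf("provider: %s, fabric: %s\n",
                   cur->fabric_attr->prov_name, cur->fabric_attr->name);
        fi_freeinfo(info);
    }
    fi_freeinfo(hints);
    return 0;
}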
https://github.com/ofiwg/libfabric/blob/main/fabtests/man/fabtests.7.md
The client and server exchange messages in a ping-pong fashion (for tests named pingpong) or transfer messages one way (for tests named bw). The tests can transfer messages of various sizes, control which features are exercised, and report performance numbers. They are modeled after the benchmarks provided by OSU MPI, and they are not guaranteed to produce the best latency or bandwidth numbers achievable on a given provider or system.
fi_dgram_pingpong : Latency test for datagram endpoints
fi_msg_bw : Message transfer bandwidth test for connected (MSG) endpoints.
fi_msg_pingpong : Message transfer latency test for connected (MSG) endpoints.
fi_rdm_cntr_pingpong : Message transfer latency test for reliable-datagram (RDM) endpoints that uses counters as the completion mechanism.
fi_rdm_pingpong : Message transfer latency test for reliable-datagram (RDM) endpoints.
fi_rdm_tagged_bw : Tagged message bandwidth test for reliable-datagram (RDM) endpoints.
fi_rdm_tagged_pingpong : Tagged message latency test for reliable-datagram (RDM) endpoints.
fi_rma_bw : An RMA read and write bandwidth test for reliable (MSG and RDM) endpoints.
fi_rma_pingpong : An RMA write and writedata latency test for reliable-datagram (RDM) endpoints.
Server:
export PATH=./build/fabtests/bin:$PATH
gdb --args ./build/fabtests/bin/fi_pingpong -e rdm -p "verbs;ofi_rxm" -m tagged -d mlx5_0 -v -I 2
#fi_pingpong -e rdm -p "verbs;ofi_rxm" -m tagged -d ib17-0 -v -I 2
Client:
export PATH=./build/fabtests/bin:$PATH
fi_pingpong -e rdm -p "verbs;ofi_rxm" -m tagged -d ib17-0 -v -I 2
mpivars.sh
MPI references
libfabric_mpi_collective
libfabric_mpi_collective_cmdline_and_log
libfabric_mpi_collective_log
libfabric_mpi_log
libfabric_mpi_runmultinode_sh_fi_multinode_coll
libfabric_mpi_send_recv
libfabric_mpi_send_recv_log
Commit: util/collective: Add basic collective infrastructure
util/collective: add the basic collective infrastructure. The struct util_coll_mc represents a collective multicast address. This patch adds a coll_mc pointer to the util av that corresponds to an av_set containing all addresses in the AV; the pointer is lazily assigned an appropriate value when a collective operation needs to run over the whole av or when an av_set is created. fi_av_set creates the av_set and the corresponding coll_mc for the whole av before any subset av_set is created. The util_av_set structure holds the fi_addr array of its members; when the corresponding coll_mc is created, each process uses the index of its fi_addr as its rank. The coll_mc creation in turn runs an allreduce collective over the members of the parent av_set to obtain the lowest common set bit in the context id variable; that bit position is used as the communicator id for the coll_mc. The comm id is then used in the tag to distinguish collective messages belonging to different coll_mc's.
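The comm-id selection described above can be pictured with a tiny standalone sketch (the masks below are invented for illustration): AND together every member's bitmask of free context ids and take the lowest bit that is still set everywhere.
/* Illustrative sketch of the comm-id allocation idea: AND-reduce the per-process
 * masks of free context ids, then take the lowest bit common to all of them. */
#include <stdint.h>
#include <stdio.h>

int main(void)
{
    /* pretend these are the free-context-id masks of three members of an av_set */
    uint64_t masks[3] = { 0xF0F0, 0xFF00, 0xFFF0 };

    uint64_t common = ~0ULL;
    for (int i = 0; i < 3; i++)
        common &= masks[i];              /* what a bitwise-AND allreduce would compute */

    if (common)
        printf("communicator id = %d\n", __builtin_ctzll(common)); /* lowest common free bit */
    return 0;
}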
fabtests/multinode/src/core_coll.c
struct coll_test tests[] = {
{
.name = "join_test",
.setup = coll_setup,
.run = join_test_run,
.teardown = coll_teardown,
.coll_op = FI_BARRIER,
.op = FI_NOOP,
.datatype = FI_VOID,
},
{
.name = "barrier_test",
.setup = coll_setup,
.run = barrier_test_run,
.teardown = coll_teardown,
.coll_op = FI_BARRIER,
.op = FI_NOOP,
.datatype = FI_VOID,
},
{
.name = "sum_all_reduce_test",
.setup = coll_setup,
.run = sum_all_reduce_test_run,
.teardown = coll_teardown,
.coll_op = FI_ALLREDUCE,
.op = FI_SUM,
.datatype = FI_UINT64,
},
{
.name = "sum_all_reduce_w_stride_test",
.setup = coll_setup_w_stride,
.run = sum_all_reduce_test_run,
.teardown = coll_teardown,
.coll_op = FI_ALLREDUCE,
.op = FI_SUM,
.datatype = FI_UINT64,
},
{
.name = "all_gather_test",
.setup = coll_setup,
.run = all_gather_test_run,
.teardown = coll_teardown,
.coll_op = FI_ALLGATHER,
.op = FI_NOOP,
.datatype = FI_UINT64,
},
{
.name = "scatter_test",
.setup = coll_setup,
.run = scatter_test_run,
.teardown = coll_teardown,
.coll_op = FI_SCATTER,
.op = FI_NOOP,
.datatype = FI_UINT64
},
{
.name = "broadcast_test",
.setup = coll_setup,
.run = broadcast_test_run,
.teardown = coll_teardown,
.coll_op = FI_BROADCAST,
.op = FI_NOOP,
.datatype = FI_UINT64
},
{
.name = "empty_test_to_stop_the_sequence_of_execution",
.run = NULL,
},
};
UCF (Unified Communication Framework), https://ucfconsortium.org/ -- a collaboration between industry, laboratories, and academia to create production-grade communication frameworks and open standards for data-centric and high-performance applications. About UCF: open communication frameworks are an important enabler of co-design, creating opportunities to innovate and develop heterogeneous co-processing elements that work together collaboratively and seamlessly, resulting in a powerful and capable ecosystem for exascale computing and beyond. The emergence of many-core processing architectures, custom FPGA processing elements, interconnect intelligence, and data-aware storage are just some of the key technologies that depend on the ability to communicate efficiently within such a framework in order to realize their potential.
Presentations: https://ucfconsortium.org/presentations/
Blog: https://ucfconsortium.org/blogs/
Project home: https://github.com/openucx/ucx/wiki, infrastructure and tools: https://github.com/openucx/ucx/wiki/Infrastructure-and-Tools, high-level design: https://github.com/openucx/ucx/wiki/High-Level-design
Architecture and benchmarking:
Main flow (server or client):
1. Parse command-line arguments in main (parse_cmd) and set the default server port.
2. Create the async context (ucs_async_context_create; the async event context manages timers and FD notifications). Internally this initializes the multi-producer/multi-consumer queue (ucs_mpmc_queue_init), the non-blocking async poller (ucs_async_poll_init), the re-entrant spinlock context, and so on.
3. Create a worker (uct_worker_create). A worker represents a progress engine; an application may create several progress engines, for example one per thread.
4. Look up the desired transport from the arguments (dev_tl_lookup; the device and transport to use are chosen by lowest latency).
5. Set the callback (uct_iface_set_am_handler) that the server runs when it receives data from the client.
6. Establish the socket connection (connect_common): the server listens on a port and waits for the client to initiate a socket connection.
7. After the client connects, the two sides exchange addresses (sendrecv: first send/receive the length over the socket, then send/receive the address itself).
8. Create the endpoint (uct_ep_create), get the endpoint address (uct_ep_get_address), and connect to the peer endpoint (uct_ep_connect_to_ep, which internally drives the QP state machine through ibv_modify_qp to establish the QP connection).
9. Once connected, the client sends data via the short (do_am_short), buffered-copy (do_am_bcopy), or zero-copy (do_am_zcopy) path.
10. Explicitly drive the worker (uct_worker_progress; this routine explicitly progresses any outstanding communication operations and active-message requests, and underneath polls the NIC completion queue via ibv_poll_cq) - see the sketch after this list.
11. Destroy resources (uct_ep_destroy, free the remaining resources, etc.).
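Step 10 boils down to a polling loop around uct_worker_progress; the sketch below is schematic only, and assumes a `done` flag flipped by the completion/AM callback (an invented name, not part of the example program):
/* Schematic progress loop for step 10: keep polling the worker until the
 * operation completes (assumes `done` is set by the completion callback). */
#include <uct/api/uct.h>

static volatile int done = 0;

static void drive_until_done(uct_worker_h worker)
{
    while (!done) {
        /* progresses outstanding communication operations and active-message
           requests; for verbs transports this ends up polling the CQ */
        uct_worker_progress(worker);
    }
}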
Code location: examples/uct_hello_world.c
Build: cd examples; make && ./uct_hello_world
• Server execution (specify the RDMA device and zero-copy mode):
/home/xb/project/ucx/examples/.libs/lt-uct_hello_world -d mlx5_0:1 -t rc_verbs -z
• Client execution (specify the RDMA device and zero-copy mode, plus the server IP):
/home/xb/project/ucx/examples/.libs/lt-uct_hello_world -d mlx5_0:1 -t rc_verbs -n 172.17.29.63 -z
Benchmarking:
Server:
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/home/xb/project/ucx/install-debug/lib
export PATH=/home/xb/project/ucx/install-debug/bin:$PATH
#gdb --args ucx_perftest -c 0
ucx_perftest -c 0
Client:
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/home/xb/project/ucx/install-debug/lib
export PATH=/home/xb/project/ucx/install-debug/bin:$PATH
#
ucx_perftest 172.17.29.63 -t tag_lat -c 1
UCT public API header: src/uct/api/uct.h
PUT operations
uct_ep_put_short
uct_ep_put_bcopy
uct_ep_put_zcopy
GET operations
uct_ep_get_short
uct_ep_get_bcopy
uct_ep_get_zcopy
AM (active message) operations (see the sketch after this list)
uct_ep_am_short
uct_ep_am_short_iov
uct_ep_am_bcopy
uct_ep_am_zcopy
Atomic operations
uct_ep_atomic_cswap64
uct_ep_atomic_cswap32
uct_ep_atomic32_post
uct_ep_atomic64_post
uct_ep_atomic32_fetch
uct_ep_atomic64_fetch
TAG operations
uct_ep_tag_eager_short
uct_ep_tag_eager_bcopy
uct_ep_tag_eager_zcopy
uct_ep_tag_rndv_zcopy
uct_ep_tag_rndv_cancel
uct_ep_tag_rndv_request
uct_iface_tag_recv_zcopy
uct_iface_tag_recv_cancel
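As a hedged sketch of the short active-message path (the endpoint `ep` and AM id 0 are assumed to come from setup steps like those in the hello-world flow above; this is not code from the UCX tree):
/* Sketch: send a short active message on an already-connected UCT endpoint. */
#include <uct/api/uct.h>
#include <string.h>

ucs_status_t send_short_am(uct_ep_h ep)
{
    const char payload[] = "hello";
    uint64_t header = 0x1234;        /* 64-bit header delivered with the payload */

    /* may return UCS_ERR_NO_RESOURCE if the transport is temporarily out of
       resources; the caller should then progress the worker and retry */
    return uct_ep_am_short(ep, /*id=*/0, header, payload, (unsigned)strlen(payload));
}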
Distributed Asynchronous Object Storage (DAOS) is an open-source object store designed for large-scale distributed non-volatile memory (NVM). DAOS takes advantage of next-generation NVM technologies such as Intel Optane persistent memory and NVM Express (NVMe), and presents a key-value storage interface on top of commodity hardware. It provides transactional non-blocking I/O, advanced data protection with self-healing, end-to-end data integrity, fine-grained data control, and elastic storage to optimize performance and cost. Besides on-premises deployments, DAOS can also be deployed in cloud environments.
Internal components
[component]
component=daos
[commit_versions]
argobots=v1.2
fuse=fuse-3.16.2
pmdk=2.1.0
isal=v2.30.0
isal_crypto=v2.23.0
spdk=v22.01.2
ofi=v1.22.0
mercury=v2.4.0
protobufc=v1.3.3
ucx=v1.14.1
[repos]
argobots=https://github.com/pmodels/argobots.git
fuse=https://github.com/libfuse/libfuse.git
pmdk=https://github.com/pmem/pmdk.git
isal=https://github.com/intel/isa-l.git
isal_crypto=https://github.com/intel/isa-l_crypto.git
spdk=https://github.com/spdk/spdk.git
ofi=https://github.com/ofiwg/libfabric.git
mercury=https://github.com/mercury-hpc/mercury.git
protobufc=https://github.com/protobuf-c/protobuf-c.git
ucx=https://github.com/openucx/ucx.git
[patch_versions]
spdk=https://github.com/spdk/spdk/commit/b0aba3fcd5aceceea530a702922153bc75664978.diff,https://github.com/spdk/spdk/commit/445a4c808badbad3942696ecf16fa60e8129a747.diff
fuse=https://github.com/libfuse/libfuse/commit/c9905341ea34ff9acbc11b3c53ba8bcea35eeed8.diff
mercury=https://raw.githubusercontent.com/daos-stack/mercury/f3dc286fb40ec1a3a38a2e17c45497bc2aa6290d/na_ucx.patch
pmdk=https://github.com/pmem/pmdk/commit/2abe15ac0b4eed894b6768cd82a3b0a7c4336284.diff
argobots=https://github.com/pmodels/argobots/pull/397/commits/411e5b344642ebc82190fd8b125db512e5b449d1.diff,https://github.com/pmodels/argobots/commit/bb0c908abfac4bfe37852eee621930634183c6aa.diff
DAOS and parallel file systems
https://docs.daos.io/latest/testing/ior/
The ior benchmark can be used to generate an HPC-type I/O load. It is an MPI-parallel tool and requires MPI to launch and control the IOR processes on the client nodes; any MPI implementation can be used for that. IOR also has an MPI-IO backend; to use this MPI-IO backend with DAOS, IOR must be built and run with an MPI stack that includes the DAOS ROMIO backend. See MPI-IO Support (https://docs.daos.io/latest/user/mpi-io/) for details. In addition to the default POSIX API, IOR natively supports a DFS backend that uses DAOS File System (DFS) I/O calls directly instead of POSIX I/O calls; for details on IOR's DAOS DFS backend, see README_DAOS in the IOR GitHub repository. The mdtest tool for benchmarking metadata performance is included in the same repository as IOR and, like IOR, is an MPI-parallel application. The Performance Tuning section of the administration guide contains more information on IOR and mdtest.
Building ior and option parsing:
Get IOR:
git clone https://github.com/hpc/ior.git
Build:
After configure and building MPICH (see directions here):
cd ior/
./bootstrap
mkdir build; cd build;
MPICC=mpicc ../configure --with-mpiio
make
Run:
Launch server(s)
Using all the directions from this page for client side execution:
create pool with dmg and export related env variables
cd into the ior build dir from above
cd src/
mpirun -np 8 --hostfile ~/cli_hosts ./ior -a MPIIO -w -W -r -R -t 1m -b 1g -o daos:testFile -V -S -C -c
the ior options can be varied for different MPI-IO configurations:
The main option is the file name, which must include the daos: prefix to use the MPI-IO DAOS ADIO driver
-a apistr
-b blockSize
-w -W -r -R
-k (keep file) then relaunch with -r -R to check data after relaunch
-F for file per process (default is single shared file)
-C reorderTasks, change task order for data read back
-c for using MPI-IO collective IO calls
-V for using MPI file views
-S to indicate strided IO pattern
-d interTestDelay
-t transferSize
-r readFile
-R checkRead
-w writeFile
-W checkWrite
-i repetitions
https://ior.readthedocs.io/en/latest/userDoc/options.html
option_help * createGlobalOptions(IOR_param_t * params){
char APIs[1024];
char APIs_legacy[1024];
aiori_supported_apis(APIs, APIs_legacy, IOR);
char * apiStr = safeMalloc(1024);
sprintf(apiStr, "API for I/O [%s]", APIs);
option_help o [] = {
{'a', NULL, apiStr, OPTION_OPTIONAL_ARGUMENT, 's', & params->api},
{'A', NULL, "refNum -- user supplied reference number to include in the summary", OPTION_OPTIONAL_ARGUMENT, 'd', & params->referenceNumber},
{'b', NULL, "blockSize -- contiguous bytes to write per task (e.g.: 8, 4k, 2m, 1g)", OPTION_OPTIONAL_ARGUMENT, 'l', & params->blockSize},
{'c', "collective", "Use collective I/O", OPTION_FLAG, 'd', & params->collective},
{'C', NULL, "reorderTasks -- changes task ordering for readback (useful to avoid client cache)", OPTION_FLAG, 'd', & params->reorderTasks},
{'d', NULL, "interTestDelay -- delay between reps in seconds", OPTION_OPTIONAL_ARGUMENT, 'd', & params->interTestDelay},
{'D', NULL, "deadlineForStonewalling -- seconds before stopping write or read phase", OPTION_OPTIONAL_ARGUMENT, 'd', & params->deadlineForStonewalling},
{.help=" -O stoneWallingWearOut=1 -- once the stonewalling timeout is over, all process finish to access the amount of data", .arg = OPTION_OPTIONAL_ARGUMENT},
{.help=" -O stoneWallingWearOutIterations=N -- stop after processing this number of iterations, needed for reading data back written with stoneWallingWearOut", .arg = OPTION_OPTIONAL_ARGUMENT},
{.help=" -O stoneWallingStatusFile=FILE -- this file keeps the number of iterations from stonewalling during write and allows to use them for read", .arg = OPTION_OPTIONAL_ARGUMENT},
{.help=" -O minTimeDuration=0 -- minimum Runtime for the run (will repeat from beginning of the file if time is not yet over)", .arg = OPTION_OPTIONAL_ARGUMENT},
#ifdef HAVE_CUDA
{.help=" -O allocateBufferOnGPU=X -- allocate I/O buffers on the GPU: X=1 uses managed memory - verifications are run on CPU; X=2 managed memory - verifications on GPU; X=3 device memory with verifications on GPU.", .arg = OPTION_OPTIONAL_ARGUMENT},
{.help=" -O GPUid=X -- select the GPU to use, use -1 for round-robin among local procs.", .arg = OPTION_OPTIONAL_ARGUMENT},
#ifdef HAVE_GPU_DIRECT
{0, "gpuDirect", "allocate I/O buffers on the GPU and use gpuDirect to store data; this option is incompatible with any option requiring CPU access to data.", OPTION_FLAG, 'd', & params->gpuDirect},
#endif
#endif
{'e', NULL, "fsync -- perform a fsync() operation at the end of each read/write phase", OPTION_FLAG, 'd', & params->fsync},
{'E', NULL, "useExistingTestFile -- do not remove test file before write access", OPTION_FLAG, 'd', & params->useExistingTestFile},
{'f', NULL, "scriptFile -- test script name", OPTION_OPTIONAL_ARGUMENT, 's', & params->testscripts},
{'F', NULL, "filePerProc -- file-per-process", OPTION_FLAG, 'd', & params->filePerProc},
{'g', NULL, "intraTestBarriers -- use barriers between open, write/read, and close", OPTION_FLAG, 'd', & params->intraTestBarriers},
/* This option toggles between Incompressible Seed and Time stamp sig based on -l,
* so we'll toss the value in both for now, and sort it out in initialization
* after all the arguments are in and we know which it keep.
*/
{'G', NULL, "setTimeStampSignature -- set value for time stamp signature/random seed", OPTION_OPTIONAL_ARGUMENT, 'd', & params->setTimeStampSignature},
{'i', NULL, "repetitions -- number of repetitions of test", OPTION_OPTIONAL_ARGUMENT, 'd', & params->repetitions},
{'j', NULL, "outlierThreshold -- warn on outlier N seconds from mean", OPTION_OPTIONAL_ARGUMENT, 'd', & params->outlierThreshold},
{'k', NULL, "keepFile -- don't remove the test file(s) on program exit", OPTION_FLAG, 'd', & params->keepFile},
{'K', NULL, "keepFileWithError -- keep error-filled file(s) after data-checking", OPTION_FLAG, 'd', & params->keepFileWithError},
{'l', "dataPacketType", "datapacket type-- type of packet that will be created [offset|incompressible|timestamp|random|o|i|t|r]", OPTION_OPTIONAL_ARGUMENT, 's', & params->buffer_type},
{'m', NULL, "multiFile -- use number of reps (-i) for multiple file count", OPTION_FLAG, 'd', & params->multiFile},
{'M', NULL, "memoryPerNode -- hog memory on the node (e.g.: 2g, 75%)", OPTION_OPTIONAL_ARGUMENT, 's', & params->memoryPerNodeStr},
{'N', NULL, "numTasks -- number of tasks that are participating in the test (overrides MPI)", OPTION_OPTIONAL_ARGUMENT, 'd', & params->numTasks},
{'o', NULL, "testFile -- full name for test", OPTION_OPTIONAL_ARGUMENT, 's', & params->testFileName},
{'O', NULL, "string of IOR directives (e.g. -O checkRead=1,GPUid=2)", OPTION_OPTIONAL_ARGUMENT, 'p', & decodeDirectiveWrapper},
{'Q', NULL, "taskPerNodeOffset for read tests use with -C & -Z options (-C constant N, -Z at least N)", OPTION_OPTIONAL_ARGUMENT, 'd', & params->taskPerNodeOffset},
{'r', NULL, "readFile -- read existing file", OPTION_FLAG, 'd', & params->readFile},
{'R', NULL, "checkRead -- verify that the output of read matches the expected signature (used with -G)", OPTION_FLAG, 'd', & params->checkRead},
{'s', NULL, "segmentCount -- number of segments", OPTION_OPTIONAL_ARGUMENT, 'l', & params->segmentCount},
{'t', NULL, "transferSize -- size of transfer in bytes (e.g.: 8, 4k, 2m, 1g)", OPTION_OPTIONAL_ARGUMENT, 'l', & params->transferSize},
{'T', NULL, "maxTimeDuration -- max time in minutes executing repeated test; it aborts only between iterations and not within a test!", OPTION_OPTIONAL_ARGUMENT, 'd', & params->maxTimeDuration},
{'u', NULL, "uniqueDir -- use unique directory name for each file-per-process", OPTION_FLAG, 'd', & params->uniqueDir},
{'v', NULL, "verbose -- output information (repeating flag increases level)", OPTION_FLAG, 'd', & params->verbose},
{'w', NULL, "writeFile -- write file", OPTION_FLAG, 'd', & params->writeFile},
{'W', NULL, "checkWrite -- check read after write", OPTION_FLAG, 'd', & params->checkWrite},
{'x', NULL, "singleXferAttempt -- do not retry transfer if incomplete", OPTION_FLAG, 'd', & params->singleXferAttempt},
{'X', NULL, "reorderTasksRandomSeed -- random seed for -Z option", OPTION_OPTIONAL_ARGUMENT, 'd', & params->reorderTasksRandomSeed},
{'y', NULL, "dualMount -- use dual mount points for a filesystem", OPTION_FLAG, 'd', & params->dualMount},
{'Y', NULL, "fsyncPerWrite -- perform sync operation after every write operation", OPTION_FLAG, 'd', & params->fsyncPerWrite},
{'z', NULL, "randomOffset -- access is to shuffled, not sequential, offsets within a file, specify twice for random (potentially overlapping)", OPTION_FLAG, 'd', & params->randomOffset},
{0, "randomPrefill", "For random -z access only: Prefill the file with this blocksize, e.g., 2m", OPTION_OPTIONAL_ARGUMENT, 'l', & params->randomPrefillBlocksize},
{0, "random-offset-seed", "The seed for -z", OPTION_OPTIONAL_ARGUMENT, 'd', & params->randomSeed},
{'Z', NULL, "reorderTasksRandom -- changes task ordering to random select regions for readback, use twice for shuffling", OPTION_FLAG, 'd', & params->reorderTasksRandom},
{0, "warningAsErrors", "Any warning should lead to an error.", OPTION_FLAG, 'd', & params->warningAsErrors},
{.help=" -O summaryFile=FILE -- store result data into this file", .arg = OPTION_OPTIONAL_ARGUMENT},
{.help=" -O summaryFormat=[default,JSON,CSV] -- use the format for outputting the summary", .arg = OPTION_OPTIONAL_ARGUMENT},
{.help=" -O saveRankPerformanceDetailsCSV=<FILE> -- store the performance of each rank into the named CSV file.", .arg = OPTION_OPTIONAL_ARGUMENT},
{.help=" -O savePerOpDataCSV=<FILE> -- store the performance of each rank into an individual file prefixed with this option.", .arg = OPTION_OPTIONAL_ARGUMENT},
{0, "dryRun", "do not perform any I/Os just run evtl. inputs print dummy output", OPTION_FLAG, 'd', & params->dryRun},
LAST_OPTION,
};
option_help * options = malloc(sizeof(o));
memcpy(options, & o, sizeof(o));
return options;
}
$ module load mpi/mpich-x86_64 # or any other MPI stack
$ mpirun -hostfile /path/to/hostfile_clients -np 30 <your_dir>/bin/ior -a POSIX -b 5G -t 1M -v -W -w -r -R -i 1 -o /tmp/daos_dfuse/testfile
IOR-3.4.0+dev: MPI Coordinated Test of Parallel I/O
Began : Thu Apr 29 23:23:09 2021
Command line : ior -a POSIX -b 5G -t 1M -v -W -w -r -R -i 1 -o /tmp/daos_dfuse/testfile
Machine : Linux wolf-86.wolf.hpdd.intel.com
Start time skew across all tasks: 0.00 sec
TestID : 0
StartTime : Thu Apr 29 23:23:09 2021
Path : /tmp/daos_dfuse/testfile
FS : 789.8 GiB Used FS: 16.5% Inodes: -0.0 Mi Used Inodes: 0.0%
Participating tasks : 30
Options:
api : POSIX
apiVersion :
test filename : /tmp/daos_dfuse/testfile
access : single-shared-file
type : independent
segments : 1
ordering in a file : sequential
ordering inter file : no tasks offsets
nodes : 3
tasks : 30
clients per node : 10
repetitions : 1
xfersize : 1 MiB
blocksize : 5 GiB
aggregate filesize : 150 GiB
verbose : 1
Results:
access bw(MiB/s) IOPS Latency(s) block(KiB) xfer(KiB) open(s) wr/rd(s) close(s) total(s) iter
------ --------- ---- ---------- ---------- --------- -------- -------- -------- -------- ----
Commencing write performance test: Thu Apr 29 23:23:09 2021
write 1299.23 1299.84 0.022917 5242880 1024.00 10.79 118.17 0.000377 118.22 0
Verifying contents of the file(s) just written.
Thu Apr 29 23:25:07 2021
Commencing read performance test: Thu Apr 29 23:25:35 2021
read 5429 5431 0.005523 5242880 1024.00 0.012188 28.28 0.000251 28.29 0
Max Write: 1299.23 MiB/sec (1362.35 MB/sec)
Max Read: 5429.38 MiB/sec (5693.11 MB/sec)
Summary of all tests:
Operation Max(MiB) Min(MiB) Mean(MiB) StdDev Max(OPs) Min(OPs) Mean(OPs) StdDev Mean(s) Stonewall(s) Stonewall(MiB) Test# #Tasks tPN reps fPP reord reordoff reordrand seed segcnt blksiz xsize aggs(MiB) API RefNum
write 1299.23 1299.23 1299.23 0.00 1299.23 1299.23 1299.23 0.00 118.22343 NA NA 0 30 10 1 0 0 1 0 0 1 5368709120 1048576 153600.0 POSIX 0
read 5429.38 5429.38 5429.38 0.00 5429.38 5429.38 5429.38 0.00 28.29054 NA NA 0 30 10 1 0 0 1 0 0 1 5368709120 1048576 153600.0 POSIX 0
Finished : Thu Apr 29 23:26:03 2021
Metadata test:
$ mpirun -hostfile /path/to/hostfile_clients -np 10 <your_dir>/bin/mdtest -a POSIX -z 0 -F -C -i 1 -n 3334 -e 4096 -d /tmp/daos_dfuse/ -w 4096
-- started at 04/29/2021 23:28:11 --
mdtest-3.4.0+dev was launched with 10 total task(s) on 3 node(s)
Command line used: mdtest '-a' 'POSIX' '-z' '0' '-F' '-C' '-i' '1' '-n' '3334' '-e' '4096' '-d' '/tmp/daos_dfuse/' '-w' '4096'
Path: /tmp/daos_dfuse
FS: 36.5 GiB Used FS: 18.8% Inodes: 2.3 Mi Used Inodes: 5.9%
Nodemap: 1001001001
10 tasks, 33340 files
SUMMARY rate: (of 1 iterations)
Operation Max Min Mean Std Dev
--------- --- --- ---- -------
File creation : 2943.697 2943.674 2943.686 0.006
File stat : 0.000 0.000 0.000 0.000
File read : 0.000 0.000 0.000 0.000
File removal : 0.000 0.000 0.000 0.000
Tree creation : 1079.858 1079.858 1079.858 0.000
Tree removal : 0.000 0.000 0.000 0.000
-- finished at 04/29/2021 23:28:22 --
Reference results:
Definition of the abstract I/O interface (aiori) callbacks in ior:
typedef struct ior_aiori {
char *name;
char *name_legacy;
aiori_fd_t *(*create)(char *, int iorflags, aiori_mod_opt_t *);
int (*mknod)(char *);
aiori_fd_t *(*open)(char *, int iorflags, aiori_mod_opt_t *);
/*
Allow to set generic transfer options that shall be applied to any subsequent IO call.
*/
void (*xfer_hints)(aiori_xfer_hint_t * params);
IOR_offset_t (*xfer)(int access, aiori_fd_t *, IOR_size_t *,
IOR_offset_t size, IOR_offset_t offset, aiori_mod_opt_t * module_options);
void (*close)(aiori_fd_t *, aiori_mod_opt_t * module_options);
void (*remove)(char *, aiori_mod_opt_t * module_options);
char* (*get_version)(void);
void (*fsync)(aiori_fd_t *, aiori_mod_opt_t * module_options);
IOR_offset_t (*get_file_size)(aiori_mod_opt_t * module_options, char * filename);
int (*statfs) (const char *, ior_aiori_statfs_t *, aiori_mod_opt_t * module_options);
int (*mkdir) (const char *path, mode_t mode, aiori_mod_opt_t * module_options);
int (*rmdir) (const char *path, aiori_mod_opt_t * module_options);
int (*access) (const char *path, int mode, aiori_mod_opt_t * module_options);
int (*stat) (const char *path, struct stat *buf, aiori_mod_opt_t * module_options);
void (*initialize)(aiori_mod_opt_t * options); /* called once per program before MPI is started */
void (*finalize)(aiori_mod_opt_t * options); /* called once per program after MPI is shutdown */
int (*rename) (const char *oldpath, const char *newpath, aiori_mod_opt_t * module_options);
option_help * (*get_options)(aiori_mod_opt_t ** init_backend_options, aiori_mod_opt_t* init_values); /* initializes the backend options as well and returns the pointer to the option help structure */
int (*check_params)(aiori_mod_opt_t *); /* check if the provided module_optionseters for the given test and the module options are correct, if they aren't print a message and exit(1) or return 1*/
void (*sync)(aiori_mod_opt_t * ); /* synchronize every pending operation for this storage */
bool enable_mdtest;
} ior_aiori_t;
The DAOS (DFS) callback implementation:
ior_aiori_t dfs_aiori = {
.name = "DFS",
.initialize = DFS_Init,
.finalize = DFS_Finalize,
.create = DFS_Create,
.open = DFS_Open,
.xfer = DFS_Xfer,
.close = DFS_Close,
.remove = DFS_Delete,
.get_version = DFS_GetVersion,
.fsync = DFS_Fsync,
.sync = DFS_Sync,
.get_file_size = DFS_GetFileSize,
.xfer_hints = DFS_init_xfer_options,
.statfs = DFS_Statfs,
.mkdir = DFS_Mkdir,
.rename = DFS_Rename,
.rmdir = DFS_Rmdir,
.access = DFS_Access,
.stat = DFS_Stat,
.get_options = DFS_options,
.check_params = DFS_check_params,
.enable_mdtest = true,
};
ior
int main(int argc, char **argv) -> ior_main(argc, argv)
MPI_CHECK(MPI_Init(&argc, &argv), "cannot initialize MPI")
MPI_CHECK(MPI_Comm_rank(MPI_COMM_WORLD, &rank), "cannot get rank")
InitTests(tests_head)
DistributeHints(com)
MPI_CHECK(MPI_Bcast(&hintCount, sizeof(hintCount), MPI_BYTE, 0, com), "cannot broadcast hints")
for (i = 0; i < hintCount; i++)
MPI_CHECK(MPI_Bcast(&hint[i], MAX_STR, MPI_BYTE, 0, com), "cannot broadcast hints")
params->tasksBlockMapping = QueryNodeMapping(com,false)
MPI_Comm_size(comm, &num_ranks)
MPI_Bcast(roothost, MAX_PATHLEN, MPI_CHAR, 0, comm)
MPI_Gather( &same_as_root, 1, MPI_INT, node_map, 1, MPI_INT, 0, comm)
MPI_Bcast(&ret, 1, MPI_INT, 0, comm)
for (tptr = tests_head; tptr != NULL; tptr = tptr->next)
int participate = test_initialize(tptr)
backend->initialize(test->params.backend_options) -> DFS_Init(aiori_mod_opt_t * options)
objectClass = daos_oclass_name2id(o->oclass)
rc = daos_init()
rc = d_hash_table_create(D_HASH_FT_EPHEMERAL | D_HASH_FT_NOLOCK | D_HASH_FT_LRU, 4, NULL, &hdl_hash_ops, &aiori_dfs_hash)
rc = daos_pool_connect(o->pool, o->group, DAOS_PC_RW, &poh, &pool_info, NULL)
rc = daos_cont_open(poh, o->cont, DAOS_COO_RW, &coh, &co_info, NULL)
rc = dfs_cont_create_with_label(poh, o->cont, NULL, NULL, &coh, NULL)
rc = dfs_mount(poh, coh, O_RDWR, &dfs)
HandleDistribute(POOL_HANDLE)
MPI_CHECK(MPI_Bcast(&global.iov_buf_len, 1, MPI_UINT64_T, 0, testComm),
MPI_CHECK(MPI_Bcast(global.iov_buf, global.iov_buf_len, MPI_BYTE, 0, testComm),
TestIoSys(tptr)
WriteOrReadSingle
amtXferred = backend->xfer(access, fd, buffer, transfer, offset, test->backend_options) -> static IOR_offset_t DFS_Xfer(int access, aiori_fd_t *file, IOR_size_t *buffer, IOR_offset_t length, IOR_offset_t off, aiori_mod_opt_t *param)
while (remaining > 0)
d_iov_set(&iov, (void *)ptr, remaining)
if (access == WRITE)
rc = dfs_write(dfs, obj, &sgl, off, NULL)
else rc = dfs_read(dfs, obj, &sgl, off, &ret, NULL)
ShowTestEnd(tptr)
test_finalize(tptr)
MPI_CHECK(MPI_Finalize(), "cannot finalize MPI")
DestroyTests(tests_head)
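The innermost step of that chain, the scatter-gather write through libdfs, looks roughly like the sketch below (an assumption-laden illustration: `dfs` and `obj` are taken to come from dfs_mount/dfs_open as above, and the helper name write_block is invented):
/* Sketch of the dfs_write step from the call chain above: wrap a buffer in a
 * scatter-gather list and write it at an explicit offset (blocking, no event). */
#include <daos.h>
#include <daos_fs.h>

int write_block(dfs_t *dfs, dfs_obj_t *obj, void *buf, daos_size_t len, daos_off_t off)
{
    d_sg_list_t sgl;
    d_iov_t     iov;

    d_iov_set(&iov, buf, len);   /* one contiguous segment */
    sgl.sg_nr     = 1;
    sgl.sg_nr_out = 0;
    sgl.sg_iovs   = &iov;

    return dfs_write(dfs, obj, &sgl, off, NULL /* no daos_event: synchronous */);
}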
src/mpi/romio/adio/include/adioi.h
struct ADIOI_Fns_struct {
void (*ADIOI_xxx_Open) (ADIO_File fd, int *error_code);
void (*ADIOI_xxx_OpenColl) (ADIO_File fd, int rank, int access_mode, int *error_code);
void (*ADIOI_xxx_ReadContig) (ADIO_File fd, void *buf, MPI_Aint count,
MPI_Datatype datatype, int file_ptr_type,
ADIO_Offset offset, ADIO_Status * status, int *error_code);
void (*ADIOI_xxx_WriteContig) (ADIO_File fd, const void *buf, MPI_Aint count,
MPI_Datatype datatype, int file_ptr_type,
ADIO_Offset offset, ADIO_Status * status, int *error_code);
void (*ADIOI_xxx_ReadStridedColl) (ADIO_File fd, void *buf, MPI_Aint count,
MPI_Datatype datatype, int file_ptr_type,
ADIO_Offset offset, ADIO_Status * status, int *error_code);
void (*ADIOI_xxx_WriteStridedColl) (ADIO_File fd, const void *buf, MPI_Aint count,
MPI_Datatype datatype, int file_ptr_type,
ADIO_Offset offset, ADIO_Status * status, int *error_code);
ADIO_Offset(*ADIOI_xxx_SeekIndividual) (ADIO_File fd, ADIO_Offset offset,
int whence, int *error_code);
void (*ADIOI_xxx_Fcntl) (ADIO_File fd, int flag, ADIO_Fcntl_t * fcntl_struct, int *error_code);
void (*ADIOI_xxx_SetInfo) (ADIO_File fd, MPI_Info users_info, int *error_code);
void (*ADIOI_xxx_ReadStrided) (ADIO_File fd, void *buf, MPI_Aint count,
MPI_Datatype datatype, int file_ptr_type,
ADIO_Offset offset, ADIO_Status * status, int *error_code);
void (*ADIOI_xxx_WriteStrided) (ADIO_File fd, const void *buf, MPI_Aint count,
MPI_Datatype datatype, int file_ptr_type,
ADIO_Offset offset, ADIO_Status * status, int *error_code);
void (*ADIOI_xxx_Close) (ADIO_File fd, int *error_code);
void (*ADIOI_xxx_IreadContig) (ADIO_File fd, void *buf, MPI_Aint count,
MPI_Datatype datatype, int file_ptr_type,
ADIO_Offset offset, ADIO_Request * request, int *error_code);
void (*ADIOI_xxx_IwriteContig) (ADIO_File fd, const void *buf, MPI_Aint count,
MPI_Datatype datatype, int file_ptr_type,
ADIO_Offset offset, ADIO_Request * request, int *error_code);
int (*ADIOI_xxx_ReadDone) (ADIO_Request * request, ADIO_Status * status, int *error_code);
int (*ADIOI_xxx_WriteDone) (ADIO_Request * request, ADIO_Status * status, int *error_code);
void (*ADIOI_xxx_ReadComplete) (ADIO_Request * request, ADIO_Status * status, int *error_code);
void (*ADIOI_xxx_WriteComplete) (ADIO_Request * request, ADIO_Status * status, int *error_code);
void (*ADIOI_xxx_IreadStrided) (ADIO_File fd, void *buf, MPI_Aint count,
MPI_Datatype datatype, int file_ptr_type,
ADIO_Offset offset, ADIO_Request * request, int *error_code);
void (*ADIOI_xxx_IwriteStrided) (ADIO_File fd, const void *buf, MPI_Aint count,
MPI_Datatype datatype, int file_ptr_type,
ADIO_Offset offset, ADIO_Request * request, int *error_code);
void (*ADIOI_xxx_Flush) (ADIO_File fd, int *error_code);
void (*ADIOI_xxx_Resize) (ADIO_File fd, ADIO_Offset size, int *error_code);
void (*ADIOI_xxx_Delete) (const char *filename, int *error_code);
int (*ADIOI_xxx_Feature) (ADIO_File fd, int flag);
const char *fsname;
void (*ADIOI_xxx_IreadStridedColl) (ADIO_File fd, void *buf, MPI_Aint count,
MPI_Datatype datatype, int file_ptr_type,
ADIO_Offset offset, ADIO_Request * request,
int *error_code);
void (*ADIOI_xxx_IwriteStridedColl) (ADIO_File fd, const void *buf, MPI_Aint count,
MPI_Datatype datatype, int file_ptr_type,
ADIO_Offset offset, ADIO_Request * request,
int *error_code);
int (*ADIOI_xxx_SetLock) (ADIO_File fd, int cmd, int type, ADIO_Offset offset, int whence,
ADIO_Offset len);
};
src/mpi/romio/adio/ad_daos/ad_daos.c
struct ADIOI_Fns_struct ADIO_DAOS_operations = {
ADIOI_DAOS_Open, /* Open */
ADIOI_DAOS_OpenColl, /* OpenColl */
ADIOI_DAOS_ReadContig, /* ReadContig */
ADIOI_DAOS_WriteContig, /* WriteContig */
ADIOI_GEN_ReadStridedColl, /* ReadStridedColl */
ADIOI_GEN_WriteStridedColl, /* WriteStridedColl */
ADIOI_GEN_SeekIndividual, /* SeekIndividual */
ADIOI_DAOS_Fcntl, /* Fcntl */
ADIOI_DAOS_SetInfo, /* SetInfo */
ADIOI_DAOS_ReadStrided, /* ReadStrided */
ADIOI_DAOS_WriteStrided, /* WriteStrided */
ADIOI_DAOS_Close, /* Close */
ADIOI_DAOS_IReadContig, /* IreadContig */
ADIOI_DAOS_IWriteContig, /* IwriteContig */
ADIOI_FAKE_IODone, /* ReadDone */
ADIOI_FAKE_IODone, /* WriteDone */
ADIOI_FAKE_IOComplete, /* ReadComplete */
ADIOI_FAKE_IOComplete, /* WriteComplete */
ADIOI_DAOS_IreadStrided, /* IreadStrided */
ADIOI_DAOS_IwriteStrided, /* IwriteStrided */
ADIOI_DAOS_Flush, /* Flush */
ADIOI_DAOS_Resize, /* Resize */
ADIOI_DAOS_Delete, /* Delete */
ADIOI_DAOS_Feature, /* Features */
"DAOS: ROMIO driver for DAOS",
ADIOI_GEN_IreadStridedColl, /* IreadStridedColl */
ADIOI_GEN_IwriteStridedColl, /* IwriteStridedColl */
#if defined(F_SETLKW64)
ADIOI_GEN_SetLock /* SetLock */
#else
ADIOI_GEN_SetLock64 /* SetLock */
#endif
};
MPI_File_iwrite_at -> nonblocking write using an explicit offset, https://www.mpich.org/static/docs/v3.0.x/www3/MPI_File_iwrite_at.html
MPIR_File_iwrite_at_impl
MPIOI_File_iwrite
ADIO_WriteContig -> ADIOI_DAOS_WriteContig
DAOS_IOContig(fd, (void *) buf, count, datatype, file_ptr_type, offset, status, NULL, DAOS_WRITE, error_code)
daos_event_init(&aio_req->daos_event, DAOS_HDL_INVAL, NULL)
memcpy(&(aio_req->req), request, sizeof(MPI_Request))
d_iov_set(iov, buf, len)
ret = dfs_write(cont->dfs, cont->obj, sgl, offset, (request ? &aio_req->daos_event : NULL))
or ret = dfs_read(cont->dfs, cont->obj, sgl, offset, nbytes, (request ? &aio_req->daos_event : NULL))
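For reference, a user-level call that enters this path might look like the sketch below (illustrative only; the file name and helper are invented, and the "daos:" prefix selects the DAOS ADIO driver as described in the IOR section):
/* Sketch: nonblocking write at an explicit offset through MPI-IO. */
#include <mpi.h>

int write_at(MPI_Comm comm, const int *buf, int count, MPI_Offset offset)
{
    MPI_File    fh;
    MPI_Request req;

    MPI_File_open(comm, "daos:testFile",
                  MPI_MODE_CREATE | MPI_MODE_WRONLY, MPI_INFO_NULL, &fh);

    /* returns immediately; ROMIO routes the request to the DAOS driver
       (DAOS_IOContig -> dfs_write, per the call chain above) */
    MPI_File_iwrite_at(fh, offset, buf, count, MPI_INT, &req);

    MPI_Wait(&req, MPI_STATUS_IGNORE);   /* complete the nonblocking write */
    return MPI_File_close(&fh);
}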
The MPID interface. A key design goal of MPICH is to make it easy for downstream vendors to create vendor-specific implementations. This is achieved through the Abstract Device Interface (ADI). The ADI is a set of MPID-prefixed functions that implement the functionality of MPI operations; for example, MPID_Send implements MPI_Send. Almost every MPI function first calls its MPID counterpart, which lets the device layer either provide the full functionality or simply fall back by calling the MPIR implementation. For performance-critical paths, such as the pt2pt and RMA paths, the MPID layer is called directly from the bindings, which allows a fully inlined build for maximum compiler optimization. The other ADIs are not performance-critical but are provided as hooks that the MPIR layer calls at key points, allowing the device to set itself up correctly and control implementation behavior. Note: all pt2pt communication in the ADI is nonblocking - it only initiates the communication, which is completed during MPID_Progress_wait/test. References: ch3/include/mpidpre.h, ch4/include/mpidch4.h.

ch3: Ch3 is currently in maintenance mode. Because some vendors are still based on ch3, it remains fully supported. There are two channels in ch3: ch3:sock is a pure socket implementation, while ch3:nemesis adds shared-memory communication and also supports network modules (netmods). Currently ch3:nemesis:tcp and ch3:nemesis:ofi are supported.

ch4: Ch4 is where active research and development happens. Many new features, such as per-VCI threading, GPU IPC, and partitioned communication, are only available in ch4. Ch4 introduces an additional ADI-like interface, usually referred to as the ch4 API or the shm/netmod API. For most MPID functions, the ch4 layer checks whether the communication is local (can go over shared memory) and calls either the shm API or the netmod API; shm can be disabled entirely. The ch4 API scaffolding involves a lot of boilerplate, because function tables are needed to allow both fully inlined and non-inlined builds, so scripts are used to generate most of these API files. References: the ch4 API autogen notes, the ch4 inlining notes, the ch4 namespace convention notes, ch4_api.txt, request.md.
The call stack from MPICH down into OFI (UCX is also supported) is as follows:
MPID_Send implements MPI_Send:
MPL_STATIC_INLINE_PREFIX int MPID_Send(const void *buf,
                                       MPI_Aint count,
                                       MPI_Datatype datatype,
                                       int rank,
                                       int tag, MPIR_Comm * comm, int attr, MPIR_Request ** request)
{
    return MPID_Isend(buf, count, datatype, rank, tag, comm, attr, request);
}
/* call chain below MPID_Isend:
 *   mpi_errno = MPIDI_isend
 *     MPIDI_NM_mpi_isend
 *       MPIDI_OFI_send                  // the UCX counterpart is MPIDI_UCX_send
 *         MPIDI_OFI_send_iov
 *           fi_tsendmsg(MPIDI_OFI_global.ctx[ctx_idx].tx, &msg, flags)  // OFI tagged-send interface
 *         or MPIDI_OFI_send_normal
 */
The Intel(R) MPI Benchmarks provide a set of elementary benchmarks that conform to the MPI-1, MPI-2, and MPI-3 standards. You can run all supported benchmarks from a single executable, or run a subset specified on the command line. Command-line parameters control settings such as time measurement, message length, and communicator selection. For details, see the Intel(R) MPI Benchmarks User Guide at: https://www.intel.com/content/www/us/en/docs/mpi-library/user-guide-benchmarks/2021-2/overview.html
Running the benchmarks:
mpirun -n <number_of_processes> IMB-<component> [arguments]
Open MPI's support for InfiniBand and RoCE devices has changed over time. In the Open MPI v5.1.x series, InfiniBand and RoCE devices are supported through the UCX (ucx) PML. Earlier versions of Open MPI also included an openib BTL for InfiniBand and RoCE devices; Open MPI v5.1.x no longer includes the openib BTL.
The Modular Component Architecture (MCA) is the backbone of Open MPI – most services and functionality are implemented through MCA components.
Continued in: https://cloud.tencent.com/developer/article/2508938