操作场景
Python 凭借简洁的语法和丰富的生态,广泛应用于 Web 后端、数据分析和脚本自动化等场景。PyMongo 是 MongoDB 官方维护的 Python 驱动,提供了同步的数据库操作 API。本文提供使用 PyMongo 连接云数据库 MongoDB 的完整示例,覆盖连接配置和 CRUD 操作。
前提条件
已创建云数据库 MongoDB 实例,且实例状态为运行中。
已准备一台与 MongoDB 实例在同一 VPC 内的云服务器 CVM,且已安装 Python 3.8 或以上版本。
安装驱动
推荐安装 PyMongo 4.6 或以上版本。如需 DNS SRV 连接支持,请额外安装
pip install pymongo[srv]。pip install pymongo
Running Environment
Operating System: Ubuntu 24.04.3 LTS / x86_64
Runtime Version: GNU bash, version 5.2.21(1)-release (x86_64-pc-linux-gnu)
注意事项
Python 版本:本文示例基于 Python 3.8 或以上版本。如您仍在使用 Python 2.x,请尽快升级,PyMongo 4.x 已不再支持 Python 2。
连接复用:
MongoClient 内置连接池,应在应用中全局复用同一个实例,避免重复创建。密码转义:如密码中包含
@、:、/ 等特殊字符,请使用 urllib.parse.quote_plus() 进行 URL 编码。超时设置:生产环境建议设置
connectTimeoutMS 和 socketTimeoutMS,避免因网络异常导致线程长时间阻塞。连接示例
基础连接
from pymongo import MongoClientfrom pymongo.errors import ConnectionFailure, OperationFailure# 连接 URI(请替换为您的实际连接信息)uri = "mongodb://mongouser:thepasswordA1@10.66.187.127:27017/admin"# 副本集连接串(推荐,支持自动故障切换):# uri = "mongodb://mongouser:thepasswordA1@10.66.187.127:27017,10.66.187.128:27017,10.66.187.129:27017/admin?replicaSet=cmgo-xxxxxxxx_0"try:# 创建客户端连接(含连接池配置)client = MongoClient(uri,maxPoolSize=50, # 连接池上限minPoolSize=5, # 连接池下限connectTimeoutMS=10000, # 连接超时 10 秒socketTimeoutMS=30000, # 套接字超时 30 秒retryWrites=True, # 启用可重试写入retryReads=True, # 启用可重试读取w="majority" # 写入关注级别)# 验证连接是否成功client.admin.command("ping")print("连接成功")except ConnectionFailure as e:print(f"连接失败: {e}")raise
CRUD 操作示例
from pymongo import MongoClientfrom pymongo.errors import ConnectionFailure, OperationFailurefrom bson.objectid import ObjectIdfrom datetime import datetimedef main():uri = "mongodb://mongouser:thepasswordA1@10.66.187.127:27017/admin"try:client = MongoClient(uri, retryWrites=True, w="majority")client.admin.command("ping")print("连接成功")# 选择数据库和集合db = client["mydb"]collection = db["users"]# ========== 插入文档 ==========# 插入单条文档doc = {"username": "jack","age": 31,"email": "jack@example.com","created_at": datetime.now()}result = collection.insert_one(doc)print(f"插入文档 ID: {result.inserted_id}")# 批量插入文档docs = [{"username": "alice", "age": 28, "email": "alice@example.com"},{"username": "bob", "age": 35, "email": "bob@example.com"},]result = collection.insert_many(docs)print(f"批量插入 {len(result.inserted_ids)} 条文档")# ========== 查询文档 ==========# 查询单条文档user = collection.find_one({"username": "jack"})print(f"查询单条: {user}")# 条件查询(年龄大于 30)cursor = collection.find({"age": {"$gt": 30}})print("年龄大于 30 的用户:")for doc in cursor:print(f" - {doc['username']}, 年龄: {doc['age']}")# ========== 更新文档 ==========result = collection.update_one({"username": "jack"},{"$set": {"age": 32, "updated_at": datetime.now()}})print(f"更新 {result.modified_count} 条文档")# ========== 删除文档 ==========result = collection.delete_many({"username": {"$in": ["jack", "alice", "bob"]}})print(f"删除 {result.deleted_count} 条文档")except ConnectionFailure as e:print(f"连接失败: {e}")except OperationFailure as e:print(f"操作失败: {e}")finally:client.close()print("连接已关闭")if __name__ == "__main__":main()
运行输出示例:
连接成功插入文档 ID: 6789abcdef1234567890abcd批量插入 2 条文档查询单条: {'_id': ObjectId('6789abcdef1234567890abcd'), 'username': 'jack', 'age': 31, 'email': 'jack@example.com', 'created_at': datetime.datetime(2026, 4, 17, 15, 0, 0)}年龄大于 30 的用户:- jack, 年龄: 31- bob, 年龄: 35更新 1 条文档删除 3 条文档连接已关闭
读写分离配置
如需从从节点读取数据以分担主节点压力,可通过
read_preference 参数配置读写分离:from pymongo import MongoClient, ReadPreferenceclient = MongoClient("mongodb://mongouser:thepasswordA1@IP1:27017,IP2:27017,IP3:27017/admin?replicaSet=cmgo-xxxxxxxx_0",readPreference="secondaryPreferred")# 或在集合级别指定db = client["mydb"]collection = db.get_collection("users", read_preference=ReadPreference.SECONDARY_PREFERRED)
常见问题
连接时提示 "pymongo.errors.OperationFailure: Authentication failed"
错误信息示例:
pymongo.errors.OperationFailure: Authentication failed., full error: {'ok': 0.0, 'errmsg': 'Authentication failed.', 'code': 18, 'codeName': 'AuthenticationFailed'}
请按以下步骤排查:
1. 确认 URI 中指定了认证库
admin。# 正确:URI 路径为 /adminuri = "mongodb://mongouser:thepasswordA1@10.66.187.127:27017/admin"# 正确:连接业务库时通过 authSource 指定认证库uri = "mongodb://mongouser:thepasswordA1@10.66.187.127:27017/mydb?authSource=admin"# 错误:连接业务库时未指定 authSource,驱动将默认以 mydb 库认证,导致认证失败uri = "mongodb://mongouser:thepasswordA1@10.66.187.127:27017/mydb"
Running Environment
Operating System: Ubuntu 24.04.3 LTS / x86_64
Runtime Version: Python 3.11.1
2. 确认密码中的特殊字符已进行 URL 编码:使用
urllib.parse.quote_plus() 对密码进行编码:from urllib.parse import quote_pluspassword = quote_plus("pass@123") # 输出: pass%40123uri = f"mongodb://mongouser:{password}@10.66.187.127:27017/admin"
Running Environment
Operating System: Ubuntu 24.04.3 LTS / x86_64
Runtime Version: Python 3.11.1
3. 确认用户名和密码正确:登录 MongoDB 控制台,在实例管理页面确认用户信息。如忘记密码,可在控制台重置。
连接时提示 "pymongo.errors.ServerSelectionTimeoutError"
错误信息示例:
pymongo.errors.ServerSelectionTimeoutError: 10.66.187.127:27017: [Errno 110] Connection timed out, Timeout: 30s
请按以下步骤排查:
1. 确认网络连通性:在 CVM 上执行以下命令,验证网络是否可达:
telnet <MongoDB实例IP> 27017
输出
Connected 表示网络可达。输出
Connection timed out 表示网络不通,请确认 CVM 与 MongoDB 实例在同一 VPC 内,并检查安全组规则。2. 检查
serverSelectionTimeoutMS 配置:PyMongo 默认的 Server Selection 超时为 30 秒。如需调整,可通过参数设置:client = MongoClient(uri, serverSelectionTimeoutMS=10000) # 10 秒
3. 确认副本集名称正确:如使用副本集连接串,请确认
replicaSet 参数值与控制台显示一致:uri = "mongodb://mongouser:thepasswordA1@IP1:27017,IP2:27017/admin?replicaSet=cmgo-xxxxxxxx_0"
Running Environment
Operating System: Ubuntu 24.04.3 LTS / x86_64
Runtime Version: Python 3.11.1
写入数据时提示 "pymongo.errors.AutoReconnect"
错误信息示例:
pymongo.errors.AutoReconnect: connection was closed
该错误通常由网络抖动或主从切换引起。处理方式如下:
1. 开启可重试写入:在创建客户端时设置
retryWrites=True,驱动会自动重试一次:client = MongoClient(uri, retryWrites=True)
2. 在业务代码中添加重试逻辑:对于关键写操作,建议在应用层增加重试机制:
import timefrom pymongo.errors import AutoReconnectdef insert_with_retry(collection, document, max_retries=3):for attempt in range(max_retries):try:return collection.insert_one(document)except AutoReconnect as e:if attempt < max_retries - 1:wait_time = 2 ** attempt # 指数退避: 1s, 2s, 4sprint(f"连接中断,{wait_time} 秒后重试(第 {attempt + 1} 次): {e}")time.sleep(wait_time)else:raise