Python SDK

最近更新时间:2026-05-07 11:54:32

我的收藏

操作场景

Python 凭借简洁的语法和丰富的生态,广泛应用于 Web 后端、数据分析和脚本自动化等场景。PyMongo 是 MongoDB 官方维护的 Python 驱动,提供了同步的数据库操作 API。本文提供使用 PyMongo 连接云数据库 MongoDB 的完整示例,覆盖连接配置和 CRUD 操作。

前提条件

已创建云数据库 MongoDB 实例,且实例状态为运行中
已准备一台与 MongoDB 实例在同一 VPC 内的云服务器 CVM,且已安装 Python 3.8 或以上版本。

安装驱动

推荐安装 PyMongo 4.6 或以上版本。如需 DNS SRV 连接支持,请额外安装 pip install pymongo[srv]
pip install pymongo

Running Environment

Operating System: Ubuntu 24.04.3 LTS / x86_64

Runtime Version: GNU bash, version 5.2.21(1)-release (x86_64-pc-linux-gnu)

注意事项

Python 版本:本文示例基于 Python 3.8 或以上版本。如您仍在使用 Python 2.x,请尽快升级,PyMongo 4.x 已不再支持 Python 2。
连接复用MongoClient 内置连接池,应在应用中全局复用同一个实例,避免重复创建。
密码转义:如密码中包含 @:/ 等特殊字符,请使用 urllib.parse.quote_plus() 进行 URL 编码。
超时设置:生产环境建议设置 connectTimeoutMSsocketTimeoutMS,避免因网络异常导致线程长时间阻塞。

连接示例

基础连接

from pymongo import MongoClient
from pymongo.errors import ConnectionFailure, OperationFailure

# 连接 URI(请替换为您的实际连接信息)
uri = "mongodb://mongouser:thepasswordA1@10.66.187.127:27017/admin"

# 副本集连接串(推荐,支持自动故障切换):
# uri = "mongodb://mongouser:thepasswordA1@10.66.187.127:27017,10.66.187.128:27017,10.66.187.129:27017/admin?replicaSet=cmgo-xxxxxxxx_0"

try:
# 创建客户端连接(含连接池配置)
client = MongoClient(
uri,
maxPoolSize=50, # 连接池上限
minPoolSize=5, # 连接池下限
connectTimeoutMS=10000, # 连接超时 10 秒
socketTimeoutMS=30000, # 套接字超时 30 秒
retryWrites=True, # 启用可重试写入
retryReads=True, # 启用可重试读取
w="majority" # 写入关注级别
)

# 验证连接是否成功
client.admin.command("ping")
print("连接成功")

except ConnectionFailure as e:
print(f"连接失败: {e}")
raise

CRUD 操作示例

from pymongo import MongoClient
from pymongo.errors import ConnectionFailure, OperationFailure
from bson.objectid import ObjectId
from datetime import datetime

def main():
uri = "mongodb://mongouser:thepasswordA1@10.66.187.127:27017/admin"

try:
client = MongoClient(uri, retryWrites=True, w="majority")
client.admin.command("ping")
print("连接成功")

# 选择数据库和集合
db = client["mydb"]
collection = db["users"]

# ========== 插入文档 ==========
# 插入单条文档
doc = {
"username": "jack",
"age": 31,
"email": "jack@example.com",
"created_at": datetime.now()
}
result = collection.insert_one(doc)
print(f"插入文档 ID: {result.inserted_id}")

# 批量插入文档
docs = [
{"username": "alice", "age": 28, "email": "alice@example.com"},
{"username": "bob", "age": 35, "email": "bob@example.com"},
]
result = collection.insert_many(docs)
print(f"批量插入 {len(result.inserted_ids)} 条文档")

# ========== 查询文档 ==========
# 查询单条文档
user = collection.find_one({"username": "jack"})
print(f"查询单条: {user}")

# 条件查询(年龄大于 30)
cursor = collection.find({"age": {"$gt": 30}})
print("年龄大于 30 的用户:")
for doc in cursor:
print(f" - {doc['username']}, 年龄: {doc['age']}")

# ========== 更新文档 ==========
result = collection.update_one(
{"username": "jack"},
{"$set": {"age": 32, "updated_at": datetime.now()}}
)
print(f"更新 {result.modified_count} 条文档")

# ========== 删除文档 ==========
result = collection.delete_many({"username": {"$in": ["jack", "alice", "bob"]}})
print(f"删除 {result.deleted_count} 条文档")

except ConnectionFailure as e:
print(f"连接失败: {e}")
except OperationFailure as e:
print(f"操作失败: {e}")
finally:
client.close()
print("连接已关闭")

if __name__ == "__main__":
main()
运行输出示例:
连接成功
插入文档 ID: 6789abcdef1234567890abcd
批量插入 2 条文档
查询单条: {'_id': ObjectId('6789abcdef1234567890abcd'), 'username': 'jack', 'age': 31, 'email': 'jack@example.com', 'created_at': datetime.datetime(2026, 4, 17, 15, 0, 0)}
年龄大于 30 的用户:
- jack, 年龄: 31
- bob, 年龄: 35
更新 1 条文档
删除 3 条文档
连接已关闭

读写分离配置

如需从从节点读取数据以分担主节点压力,可通过 read_preference 参数配置读写分离:
from pymongo import MongoClient, ReadPreference

client = MongoClient(
"mongodb://mongouser:thepasswordA1@IP1:27017,IP2:27017,IP3:27017/admin?replicaSet=cmgo-xxxxxxxx_0",
readPreference="secondaryPreferred"
)

# 或在集合级别指定
db = client["mydb"]
collection = db.get_collection("users", read_preference=ReadPreference.SECONDARY_PREFERRED)

常见问题

连接时提示 "pymongo.errors.OperationFailure: Authentication failed"

错误信息示例:
pymongo.errors.OperationFailure: Authentication failed., full error: {'ok': 0.0, 'errmsg': 'Authentication failed.', 'code': 18, 'codeName': 'AuthenticationFailed'}
请按以下步骤排查:
1. 确认 URI 中指定了认证库 admin
# 正确:URI 路径为 /admin
uri = "mongodb://mongouser:thepasswordA1@10.66.187.127:27017/admin"
# 正确:连接业务库时通过 authSource 指定认证库
uri = "mongodb://mongouser:thepasswordA1@10.66.187.127:27017/mydb?authSource=admin"

# 错误:连接业务库时未指定 authSource,驱动将默认以 mydb 库认证,导致认证失败
uri = "mongodb://mongouser:thepasswordA1@10.66.187.127:27017/mydb"

Running Environment

Operating System: Ubuntu 24.04.3 LTS / x86_64

Runtime Version: Python 3.11.1

2. 确认密码中的特殊字符已进行 URL 编码:使用 urllib.parse.quote_plus() 对密码进行编码:
from urllib.parse import quote_plus

password = quote_plus("pass@123") # 输出: pass%40123
uri = f"mongodb://mongouser:{password}@10.66.187.127:27017/admin"

Running Environment

Operating System: Ubuntu 24.04.3 LTS / x86_64

Runtime Version: Python 3.11.1

3. 确认用户名和密码正确:登录 MongoDB 控制台,在实例管理页面确认用户信息。如忘记密码,可在控制台重置。

连接时提示 "pymongo.errors.ServerSelectionTimeoutError"

错误信息示例:
pymongo.errors.ServerSelectionTimeoutError: 10.66.187.127:27017: [Errno 110] Connection timed out, Timeout: 30s
请按以下步骤排查:
1. 确认网络连通性:在 CVM 上执行以下命令,验证网络是否可达:
telnet <MongoDB实例IP> 27017
输出 Connected 表示网络可达。
输出 Connection timed out 表示网络不通,请确认 CVM 与 MongoDB 实例在同一 VPC 内,并检查安全组规则。
2. 检查 serverSelectionTimeoutMS 配置:PyMongo 默认的 Server Selection 超时为 30 秒。如需调整,可通过参数设置:
client = MongoClient(uri, serverSelectionTimeoutMS=10000) # 10 秒
3. 确认副本集名称正确:如使用副本集连接串,请确认 replicaSet 参数值与控制台显示一致:
uri = "mongodb://mongouser:thepasswordA1@IP1:27017,IP2:27017/admin?replicaSet=cmgo-xxxxxxxx_0"

Running Environment

Operating System: Ubuntu 24.04.3 LTS / x86_64

Runtime Version: Python 3.11.1

写入数据时提示 "pymongo.errors.AutoReconnect"

错误信息示例:
pymongo.errors.AutoReconnect: connection was closed
该错误通常由网络抖动或主从切换引起。处理方式如下:
1. 开启可重试写入:在创建客户端时设置 retryWrites=True,驱动会自动重试一次:
client = MongoClient(uri, retryWrites=True)
2. 在业务代码中添加重试逻辑:对于关键写操作,建议在应用层增加重试机制:
import time
from pymongo.errors import AutoReconnect

def insert_with_retry(collection, document, max_retries=3):
for attempt in range(max_retries):
try:
return collection.insert_one(document)
except AutoReconnect as e:
if attempt < max_retries - 1:
wait_time = 2 ** attempt # 指数退避: 1s, 2s, 4s
print(f"连接中断,{wait_time} 秒后重试(第 {attempt + 1} 次): {e}")
time.sleep(wait_time)
else:
raise