我正在使用FastAPI作为我的数据库。我能够连接到我的app/server/app.py文件中的云修复数据库,但我希望能够在配置完其他文件后使用/编辑其他文件中的数据库。
我尝试使用“依赖”并尝试将其初始化为另一个文件,并将其导入到所需的文件中(没有成功)。在下面的示例中,我有一个启动函数,然后调用从另一个文件导入的"get_data()“。我希望"get_data()“(以及其他文件中的任何其他函数)能够编辑已配置过一次的数据库。
#app/server/app.py
from fastapi import FastAPI
from app.server.routes.users import router as User
from app.server.routes.users import get_data
import firebase_admin
import os
from firebase_admin import credentials, firestore, initialize_app, auth
os.environ["FIREBASE_AUTH_EMULATOR_HOST"] = "127.0.0.1:9099"
os.environ["FIRESTORE_EMULATOR_HOST"] = "127.0.0.1:8080"
app = FastAPI()
app.include_router(User, tags=["User"], prefix="/a1")
#initialize database
cred = credentials.Certificate("path_to_file.json")
firebase_admin.initialize_app(cred)
db = firestore.client()
@app.on_event("startup")
async def start_up():
await get_data()#app/server/routes/users.py
from fastapi import APIRouter, HTTPException, Request
from fastapi_utils.tasks import repeat_every
router = APIRouter()
@repeat_every(seconds=5, max_repetitions=1)
async def get_data():
#would like to be able to add/edit database here...for example:
#db.collection("collection1").document("doc1").set({"a":1})发布于 2022-08-27 03:20:07
使用内置的pydantic模型进行解析非常有帮助。
下面是如何通过电子邮件获取用户的示例:
async def get_user_by_email(self, email: str) -> FirestoreUser:
"""Retrieves a user from Firestore with the given email address.
Args:
email (str): users email
Raises:
`firebase_admin.exceptions`: [Firebase Admin Exceptions](https://firebase.google.com/docs/reference/admin/python/firebase_admin.exceptions)
Exception: Multiple users found at email address
Exception: No users found at email address
Returns:
FirestoreUser: Firebase 10X User object
"""
users_found: List[FirestoreUser] = []
user: FirestoreUser = {}
user_docs = self.db.collection("users").where("email", "==", email).stream()
for doc in user_docs:
users_found.append(doc.to_dict())
if len(users_found) > 1:
raise Exception("Multiple users found at email address ", email)
if len(users_found) == 0:
raise Exception("No user found at email address ", email)
user = users_found[0]
return user下面是如何更新用户的示例:
async def update_user(self, userData, userId):
"""Update an individual user
Args:
user (User): User to be updated
userId (str): User ID
Raises:
`firebase_admin.exceptions`: [Firebase Admin Exceptions](https://firebase.google.com/docs/reference/admin/python/firebase_admin.exceptions)
"""
user_ref = self.db.collection("courses").document(userId)
user_ref.set(userData.dict(), merge=True)注*将.set与merge=True结合使用与使用.update相同。
另外,仿真器在Python中不工作,因为它们需要NodeJs运行时。这是可能的,但很困难。更好的解决方案是使用FastAPI读取和命中云函数端点来写入(这些端点可以在本地IP上模拟)。
https://stackoverflow.com/questions/73502898
复制相似问题