首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >用FastAPI全局访问云修复数据库

用FastAPI全局访问云修复数据库
EN

Stack Overflow用户
提问于 2022-08-26 14:53:06
回答 1查看 163关注 0票数 -1

我正在使用FastAPI作为我的数据库。我能够连接到我的app/server/app.py文件中的云修复数据库,但我希望能够在配置完其他文件后使用/编辑其他文件中的数据库。

我尝试使用“依赖”并尝试将其初始化为另一个文件,并将其导入到所需的文件中(没有成功)。在下面的示例中,我有一个启动函数,然后调用从另一个文件导入的"get_data()“。我希望"get_data()“(以及其他文件中的任何其他函数)能够编辑已配置过一次的数据库。

代码语言:javascript
复制
#app/server/app.py
from fastapi import FastAPI
from app.server.routes.users import router as User
from app.server.routes.users import get_data
import firebase_admin
import os
from firebase_admin import credentials, firestore, initialize_app, auth

os.environ["FIREBASE_AUTH_EMULATOR_HOST"] =  "127.0.0.1:9099"
os.environ["FIRESTORE_EMULATOR_HOST"] = "127.0.0.1:8080"

app = FastAPI()
app.include_router(User, tags=["User"], prefix="/a1")

#initialize database
cred = credentials.Certificate("path_to_file.json") 
firebase_admin.initialize_app(cred) 
db = firestore.client()

@app.on_event("startup")
async def start_up():
    await get_data()
代码语言:javascript
复制
#app/server/routes/users.py
from fastapi import APIRouter, HTTPException, Request
from fastapi_utils.tasks import repeat_every

router = APIRouter()

@repeat_every(seconds=5, max_repetitions=1)
async def get_data():
    #would like to be able to add/edit database here...for example:
    #db.collection("collection1").document("doc1").set({"a":1})
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2022-08-27 03:20:07

使用内置的pydantic模型进行解析非常有帮助。

下面是如何通过电子邮件获取用户的示例:

代码语言:javascript
复制
    async def get_user_by_email(self, email: str) -> FirestoreUser:
        """Retrieves a user from Firestore with the given email address.
        Args:
            email (str): users email
        Raises:
            `firebase_admin.exceptions`: [Firebase Admin Exceptions](https://firebase.google.com/docs/reference/admin/python/firebase_admin.exceptions)
            Exception: Multiple users found at email address
            Exception: No users found at email address
        Returns:
            FirestoreUser: Firebase 10X User object
        """
        users_found: List[FirestoreUser] = []
        user: FirestoreUser = {}

        user_docs = self.db.collection("users").where("email", "==", email).stream()
        for doc in user_docs:
            users_found.append(doc.to_dict())
        if len(users_found) > 1:
            raise Exception("Multiple users found at email address ", email)
        if len(users_found) == 0:
            raise Exception("No user found at email address ", email)
        user = users_found[0]
        return user

下面是如何更新用户的示例:

代码语言:javascript
复制
    async def update_user(self, userData, userId):
        """Update an individual user
        Args:
            user (User): User to be updated
            userId (str): User ID
        Raises:
            `firebase_admin.exceptions`: [Firebase Admin Exceptions](https://firebase.google.com/docs/reference/admin/python/firebase_admin.exceptions)
        """
        user_ref = self.db.collection("courses").document(userId)
        user_ref.set(userData.dict(), merge=True)

注*将.setmerge=True结合使用与使用.update相同。

另外,仿真器在Python中不工作,因为它们需要NodeJs运行时。这是可能的,但很困难。更好的解决方案是使用FastAPI读取和命中云函数端点来写入(这些端点可以在本地IP上模拟)。

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/73502898

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档