详细内容见:开源AIGC学习—文生视频模型本地运行
主要通过python 的fastapi方式,进行文生图、文生视频异步服务封装,详细代码见:
# -- utf-8 ---
import asyncio
import uvicorn
from asyncio import Queue
from random import randint
from typing import Annotated
from fastapi import FastAPI, Form
from contextlib import asynccontextmanager
from uuid import UUID, uuid1
from pydantic import BaseModel
import cv2
from modelscope.pipelines import pipeline
from modelscope.utils.constant import Tasks
import torch
from diffusers import DiffusionPipeline, DPMSolverMultistepScheduler
from diffusers.utils import export_to_video
task = Tasks.text_to_image_synthesis
model_id = '/mnt/d/aigc_model/modelscope/damo/multi-modal_chinese_stable_diffusion_v1'
image_pipe = pipeline(task=task, model=model_id)
viedo_pipe = DiffusionPipeline.from_pretrained("/mnt/d/aigc_model/hub/models--damo-vilab--text-to-video-ms-1.7b/snapshots/8227dddca75a8561bf858d604cc5dae52b954d01", torch_dtype=torch.float16, variant="fp16")
viedo_pipe.scheduler = DPMSolverMultistepScheduler.from_config(viedo_pipe.scheduler.config)
viedo_pipe.to("cuda")
async def commandText2Image(queue: Queue):
while True:
print("Start to get task from text_to_image_queue")
text_info: TextInof = await queue.get()
prompt = text_info.prompts
print(f"task prompts is {text_info.prompts}, task id is {str(text_info.tracking_id)}.")
output = image_pipe({'text': prompt})
image_output= "/mnt/d/aigc_result/" + str(text_info.tracking_id) + ".png"
cv2.imwrite(image_output, output['output_imgs'][0])
queue.task_done()
print(f"Completed the task from the text_to_image_queue and image file output path is {image_output}.")
async def commandText2Viedo(queue: Queue):
while True:
print("Start to get task from text_to_video_queue")
text_info: TextInof = await queue.get()
prompt = text_info.prompts
print(f"task prompts is {text_info.prompts}, task id is {str(text_info.tracking_id)}")
video_frames = viedo_pipe(prompt, num_inference_steps=25).frames
video_output= "/mnt/d/aigc_result/" + str(text_info.tracking_id) + ".mp4"
export_to_video(video_frames, video_output)
queue.task_done()
print(f"Completed the task from the text_to_video_queue and video file output path is {video_output}.")
@asynccontextmanager
async def lifespan(app: FastAPI):
asyncio.create_task(commandText2Image(text_to_image_queue))
asyncio.create_task(commandText2Viedo(text_to_video_queue))
yield
app = FastAPI(lifespan=lifespan)
text_to_image_queue = Queue()
text_to_video_queue = Queue()
class TextInof(BaseModel):
tracking_id: UUID
prompts: str
process_time: int = None
@app.post("/text2x")
async def post_task(prompts: Annotated[str, Form()], type_info:Annotated[str, Form()]):
user_request = TextInof(
tracking_id=uuid1(),
prompts=prompts,
process_time=randint(1,5)
)
if type_info == "image":
await text_to_image_queue.put(user_request)
elif type_info == "viedo":
await text_to_video_queue.put(user_request)
print(f"Received task, prompt is {user_request.prompts}, task id is {user_request.tracking_id}.")
info = "Received your text to {} request.".format(type_info)
return info
if __name__ == "__main__":
uvicorn.run(app=app, port=2000)
可以使用celery进行异步封装调用:
文件目录celery/celery_app.py
celery_text2x.py
#celery_app.py
# -- utf-8 ---
from celery import Celery
import cv2
from modelscope.pipelines import pipeline
from modelscope.utils.constant import Tasks
import torch
from diffusers import DiffusionPipeline, DPMSolverMultistepScheduler
from diffusers.utils import export_to_video
task = Tasks.text_to_image_synthesis
model_id = '/mnt/d/aigc_model/modelscope/damo/multi-modal_chinese_stable_diffusion_v1'
image_pipe = pipeline(task=task, model=model_id)
viedo_pipe = DiffusionPipeline.from_pretrained("/mnt/d/aigc_model/hub/models--damo-vilab--text-to-video-ms-1.7b/snapshots/8227dddca75a8561bf858d604cc5dae52b954d01", torch_dtype=torch.float16, variant="fp16")
viedo_pipe.scheduler = DPMSolverMultistepScheduler.from_config(viedo_pipe.scheduler.config)
viedo_pipe.to("cuda")
celery_app = Celery('text2xApp', broker='redis://localhost:6379/0', backend='redis://localhost:6379/1')
celery_app.conf.update(
result_expires=60
)
# 定义一个 Celery 文生图任务
@celery_app.task
def commandText2Image(prompts, tracking_id):
print(f"task prompts is {prompts}, task id is {tracking_id}.")
output = image_pipe({'text': prompts})
image_output= "/mnt/d/aigc_result/" + tracking_id + ".png"
cv2.imwrite(image_output, output['output_imgs'][0])
print(f"Completed the task from the text_to_image and image file output path is {image_output}.")
# 定义一个 Celery 文生视频任务
@celery_app.task
def commandText2Viedo(prompts, tracking_id):
print(f"task prompts is {text_info.prompts}, task id is {str(text_info.tracking_id)}")
video_frames = viedo_pipe(prompt, num_inference_steps=25).frames
video_output= "/mnt/d/aigc_result/" + tracking_id + ".mp4"
export_to_video(video_frames, video_output)
print(f"Completed the task from the text_to_video and video file output path is {video_output}.")
#celery_text2x.py
# -- utf-8 ---
import asyncio
import uvicorn
from typing import Annotated
from fastapi import FastAPI, Form
from uuid import UUID, uuid1
import celery_app
app = FastAPI()
@app.post("/text2x")
async def post_task(prompts: Annotated[str, Form()], type_info:Annotated[str, Form()]):
tracking_id=str(uuid1())
if type_info == "image":
# 触发 Celery 任务
task = commandText2Image.delay(prompts, tracking_id)
elif type_info == "viedo":
task = commandText2Viedo.delay(prompts, tracking_id)
print(f"Received task, prompt is {prompts}, task id is {tracking_id}.")
print(f"Received your text to {type_info} request.")
# 异步等待任务结果
result = await celery_app.AsyncResult(task.id).get()
return {"result": result}
if __name__ == "__main__":
uvicorn.run(app=app, port=2000)
执行启动命令:
/mnt/d/code/celery# celery -A celery_app worker -l info
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。