ChatGPT is an AI dialogue system based on natural language processing, trained on the GPT (Generative Pre-trained Transformer) architecture. It can generate natural-language responses to a wide range of questions and is used in many areas, such as customer service, language translation, and automated Q&A. You can think of it as an enhanced search engine that helps you find the answer you want quickly.
This article looks at how to build an automatic voice-dialogue program on top of ChatGPT.
To access ChatGPT from a program you need to register for an OpenAI API key. Once you have an account, registration is straightforward, and after signing up OpenAI also grants an $18 trial credit.
Install the client library with:
pip install openai
After that, only a few lines of code are needed to call the API.
import openai

# Set your own OpenAI API key here.
openai.api_key = "apikey"


class ChatGpt:
    def GetAnswer(self, input):
        # Send a single user message and return the model's reply.
        msg = {"role": "user", "content": input}
        completion = openai.ChatCompletion.create(
            model="gpt-3.5-turbo", messages=[msg])
        if not completion.choices:
            return ""
        return completion.choices[0].message.content


if __name__ == "__main__":
    chatgpt = ChatGpt()
    answer = chatgpt.GetAnswer("你是谁")
    print(answer)
Run it and the answer is returned.
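The snippet above sends a single, stateless question. For a real dialogue you usually want ChatGPT to see the earlier turns as context. Below is a minimal sketch of that idea, assuming the same openai library and gpt-3.5-turbo model; the ChatSession class is a hypothetical helper, not part of the original code.

import openai

openai.api_key = "apikey"


class ChatSession:
    # Hypothetical helper that keeps the conversation history across turns.
    def __init__(self):
        self.history = []

    def Ask(self, text):
        self.history.append({"role": "user", "content": text})
        completion = openai.ChatCompletion.create(
            model="gpt-3.5-turbo", messages=self.history)
        answer = completion.choices[0].message.content
        # Remember the assistant reply so the next question has context.
        self.history.append({"role": "assistant", "content": answer})
        return answer


if __name__ == "__main__":
    session = ChatSession()
    print(session.Ask("你是谁"))
    print(session.Ask("请再简短一点"))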
After we receive ChatGPT's text reply we want to play it back as speech. For that we use the text-to-speech service from iFlytek (科大讯飞).
We use its online speech synthesis API: once registration succeeds you are granted 50,000 free calls for one year. The official Python demo can be downloaded and works essentially out of the box.
Here it is wrapped into a class. Before using it, install the dependency:
pip install websocket-client
The example code is as follows:
import websocket
import hashlib
import base64
import hmac
import json
from urllib.parse import urlencode
import time
import ssl
from wsgiref.handlers import format_date_time
from datetime import datetime
from time import mktime
import os
import threading


# Handle websocket errors.
def on_error(ws, error):
    print("###Text2Audio ws error:", error)
    if ws.at is not None:
        ws.at.StopAudio2Text()


# Handle websocket close.
def on_close(ws, a, b):
    print("###Text2Audio ws closed ###")
    if ws.at is not None:
        ws.at.StopAudio2Text()


# Handle websocket open: send the text to be synthesized.
def on_open(ws):
    print("###Text2Audio ws open###")
    d = {"common": ws.at.CommonArgs,
         "business": ws.at.BusinessArgs,
         "data": ws.at.Data,
         }
    d = json.dumps(d)
    print("------>start sending text data")
    ws.send(d)


# Handle incoming websocket messages.
def on_message(ws, message):
    if ws.at is not None:
        ws.at.HandleMessage(message)


# Thread entry point: run the websocket event loop.
def Text2AudioRun(self):
    self.ws.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE})


class Text2Audio():
    def __init__(self, APPID, APIKey, APISecret, OnTranslateEnd=None):
        self.APPID = APPID
        self.APIKey = APIKey
        self.APISecret = APISecret
        self.OnTranslateEnd = OnTranslateEnd
        # Common parameters.
        self.CommonArgs = {"app_id": self.APPID}
        # Business parameters; more options are documented on the official site.
        self.BusinessArgs = {
            "aue": "raw", "auf": "audio/L16;rate=16000", "vcn": "xiaoyan", "tte": "utf8"}
        self.thread = threading.Thread(target=Text2AudioRun, args=(self,))

    def create_url(self):
        url = 'wss://tts-api.xfyun.cn/v2/tts'
        # Generate an RFC1123-formatted timestamp.
        now = datetime.now()
        date = format_date_time(mktime(now.timetuple()))
        # Build the string to sign.
        signature_origin = "host: " + "ws-api.xfyun.cn" + "\n"
        signature_origin += "date: " + date + "\n"
        signature_origin += "GET " + "/v2/tts " + "HTTP/1.1"
        # Sign it with HMAC-SHA256.
        signature_sha = hmac.new(self.APISecret.encode('utf-8'), signature_origin.encode('utf-8'),
                                 digestmod=hashlib.sha256).digest()
        signature_sha = base64.b64encode(
            signature_sha).decode(encoding='utf-8')
        authorization_origin = "api_key=\"%s\", algorithm=\"%s\", headers=\"%s\", signature=\"%s\"" % (
            self.APIKey, "hmac-sha256", "host date request-line", signature_sha)
        authorization = base64.b64encode(
            authorization_origin.encode('utf-8')).decode(encoding='utf-8')
        # Combine the authentication parameters into a dict.
        v = {
            "authorization": authorization,
            "date": date,
            "host": "ws-api.xfyun.cn"
        }
        # Append the authentication parameters to form the final url.
        url = url + '?' + urlencode(v)
        # print("date: ", date)
        # print("v: ", v)
        # Uncomment the prints above to compare the generated url with the official demo.
        # print('websocket url :', url)
        return url

    def StartText2Audio(self, Text, pcm="./demo.pcm"):
        self.pcm = pcm
        self.Text = Text
        self.Data = {"status": 2, "text": str(
            base64.b64encode(self.Text.encode('utf-8')), "UTF8")}
        if os.path.exists(pcm):
            os.remove(pcm)
        websocket.enableTrace(False)
        wsUrl = self.create_url()
        self.openws = False
        self.ws = websocket.WebSocketApp(
            wsUrl, on_open=on_open, on_close=on_close, on_error=on_error, on_message=on_message)
        self.ws.at = self
        self.thread.start()

    def StopAudio2Text(self):
        # End the synthesis session and notify the caller, if a callback was given.
        self.ws.at = None
        if self.OnTranslateEnd is not None:
            self.OnTranslateEnd()

    def HandleMessage(self, message):
        try:
            message = json.loads(message)
            code = message["code"]
            sid = message["sid"]
            audio = message["data"]["audio"]
            audio = base64.b64decode(audio)
            status = message["data"]["status"]
            if code != 0:
                errMsg = message["message"]
                print("Text2Audio sid:%s call error:%s code is:%s" %
                      (sid, errMsg, code))
            else:
                # Append the decoded audio chunk to the PCM file.
                with open(self.pcm, 'ab') as f:
                    f.write(audio)
            if status == 2:
                # Last frame: all audio has been written, end the session.
                print("Text2Audio Translation End")
                self.StopAudio2Text()
                self.ws.close()
        except Exception as e:
            print("receive msg, but parse exception:", e)


if __name__ == "__main__":
    def TranslateEnd():
        print("TranslateEnd")

    text2Audio = Text2Audio(APPID='APPID', APISecret='APISecret',
                            APIKey='APIKey', OnTranslateEnd=TranslateEnd)
    text2Audio.StartText2Audio("我是chatgpt小助手")
    time.sleep(20)
The code above converts the text to audio and saves it as a PCM file; with the pyaudio library the sound can easily be played back. Install the library:
pip install pyaudio
Note that playback is blocking: the code after it will not run until the sound has finished playing, so in a real application you should start a separate thread for it (a sketch follows the block below).
import pyaudio

# Open the PCM file produced by the text-to-speech step.
with open('demo.pcm', 'rb') as f:
    pcm_data = f.read()

# Create a PyAudio instance.
p = pyaudio.PyAudio()

# Open an output stream: 16-bit mono at 16 kHz, matching the TTS output.
stream = p.open(format=pyaudio.paInt16,
                channels=1,
                rate=16000,
                output=True)

# Write the PCM data; this call blocks until playback finishes.
stream.write(pcm_data)

# Close the stream and release PyAudio.
stream.stop_stream()
stream.close()
p.terminate()
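As noted above, stream.write blocks until playback ends. Below is a minimal sketch of running playback on its own thread so the caller is not blocked; the PlayPcm helper is my own addition, not part of the original code.

import threading
import pyaudio


def PlayPcmRun(pcm_path):
    # Worker: open the PCM file and play it; only this thread blocks.
    with open(pcm_path, 'rb') as f:
        pcm_data = f.read()
    p = pyaudio.PyAudio()
    stream = p.open(format=pyaudio.paInt16, channels=1, rate=16000, output=True)
    stream.write(pcm_data)
    stream.stop_stream()
    stream.close()
    p.terminate()


def PlayPcm(pcm_path="./demo.pcm"):
    # Start playback on a background thread and return immediately.
    t = threading.Thread(target=PlayPcmRun, args=(pcm_path,))
    t.start()
    return t


if __name__ == "__main__":
    t = PlayPcm("demo.pcm")
    print("playback started, main thread keeps running")
    t.join()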
Since we are building a voice dialogue, we also need to be able to read audio from the microphone, again using the pyaudio library. The example below makes it easy to capture microphone audio. Because reading audio blocks the current thread, a new thread is started to do the reading.
import time
import threading
import multiprocessing
import os
import pyaudio


# Thread entry point: keep reading microphone frames until asked to stop.
def ReadAudioRun(obj):
    pa = pyaudio.PyAudio()
    stream = pa.open(format=pyaudio.paInt16,
                     channels=1,
                     rate=obj.sample_rate,
                     input=True,
                     frames_per_buffer=obj.chunk_size)
    if obj.savepcm:
        wf = open(obj.pcm, 'ab')
    while True:
        # Read the shared run flag under its lock.
        with obj.run.get_lock():
            isRun = obj.run.value
        if isRun:
            data = stream.read(obj.chunk_size)
            if obj.savepcm:
                wf.write(data)
            if obj.OnAudio is not None:
                obj.OnAudio(data)
        else:
            # Stop requested: release the stream and exit the thread.
            stream.stop_stream()
            stream.close()
            pa.terminate()
            if obj.savepcm:
                wf.close()
            break


class ReadAudio:
    def __init__(self, pcm="./demo.pcm", savepcm=False, OnAudio=None):
        self.run = multiprocessing.Value('b', False, lock=True)
        self.thread = threading.Thread(target=ReadAudioRun, args=(self,))
        self.pcm = pcm
        self.chunk_size = 1280
        self.sample_rate = 16000
        self.OnAudio = OnAudio
        self.savepcm = savepcm
        if os.path.exists(self.pcm):
            os.remove(self.pcm)

    def StartRead(self):
        print("StartRead Audio")
        self.run.value = True
        self.thread.start()

    def StopRead(self):
        self.run.value = False
        self.thread.join()


if __name__ == "__main__":
    readAudio = ReadAudio()
    print("start reading")
    readAudio.StartRead()
    time.sleep(5)
    readAudio.StopRead()
    print("stop reading")
Since ChatGPT currently only accepts text input, we also need to convert the audio into text. As with text-to-speech, we use iFlytek's online service; first-time registration likewise comes with 50,000 free calls for one year.
Again, the official Python demo is used as the starting point.
It is wrapped into a class for convenience. It supports streaming conversion, so audio can be converted to text while it is being read.
from readaudio import ReadAudio
import threading
import queue
import websocket
import hashlib
import base64
import hmac
import json
from urllib.parse import urlencode
import time
import ssl
from wsgiref.handlers import format_date_time
from datetime import datetime
from time import mktime

STATUS_FIRST_FRAME = 0     # marker for the first frame
STATUS_CONTINUE_FRAME = 1  # marker for intermediate frames
STATUS_LAST_FRAME = 2      # marker for the last frame


# Handle websocket errors.
def on_error(ws, error):
    print("###Audio2Text ws error:", error)


# Handle websocket close.
def on_close(ws, a, b):
    print("###Audio2Text ws closed ###")


# Handle websocket open: mark the connection as ready for sending audio.
def on_open(ws):
    print("###Audio2Text ws open###")
    if ws.at is not None:
        ws.at.openws = True


# Handle incoming websocket messages.
def on_message(ws, message):
    if ws.at is not None:
        ws.at.HandleMessage(message)


# Thread entry point: run the websocket event loop.
def Audio2TextRun(self):
    self.ws.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE})


class Audio2Text():
    def __init__(self, APPID, APIKey, APISecret):
        self.APPID = APPID
        self.APIKey = APIKey
        self.APISecret = APISecret
        self.Text = ""
        self.q = queue.Queue()
        self.openws = False
        self.thread = threading.Thread(target=Audio2TextRun, args=(self,))
        # Common parameters.
        self.CommonArgs = {"app_id": self.APPID}
        # Business parameters; more options are documented on the official site.
        self.BusinessArgs = {"domain": "iat", "language": "zh_cn",
                             "accent": "mandarin", "vinfo": 1, "vad_eos": 10000}

    # Build the authenticated url.
    def create_url(self):
        url = 'wss://ws-api.xfyun.cn/v2/iat'
        # Generate an RFC1123-formatted timestamp.
        now = datetime.now()
        date = format_date_time(mktime(now.timetuple()))
        # Build the string to sign.
        signature_origin = "host: " + "ws-api.xfyun.cn" + "\n"
        signature_origin += "date: " + date + "\n"
        signature_origin += "GET " + "/v2/iat " + "HTTP/1.1"
        # Sign it with HMAC-SHA256.
        signature_sha = hmac.new(self.APISecret.encode('utf-8'), signature_origin.encode('utf-8'),
                                 digestmod=hashlib.sha256).digest()
        signature_sha = base64.b64encode(
            signature_sha).decode(encoding='utf-8')
        authorization_origin = "api_key=\"%s\", algorithm=\"%s\", headers=\"%s\", signature=\"%s\"" % (
            self.APIKey, "hmac-sha256", "host date request-line", signature_sha)
        authorization = base64.b64encode(
            authorization_origin.encode('utf-8')).decode(encoding='utf-8')
        # Combine the authentication parameters into a dict.
        v = {
            "authorization": authorization,
            "date": date,
            "host": "ws-api.xfyun.cn"
        }
        # Append the authentication parameters to form the final url.
        url = url + '?' + urlencode(v)
        # print("date: ", date)
        # print("v: ", v)
        # Uncomment the prints above to compare the generated url with the official demo.
        # print('websocket url :', url)
        return url

    def StartAudio2Text(self):
        print("StartAudio2Text")
        websocket.enableTrace(False)
        wsUrl = self.create_url()
        self.openws = False
        self.ws = websocket.WebSocketApp(
            wsUrl, on_open=on_open, on_close=on_close, on_error=on_error, on_message=on_message)
        self.ws.at = self
        self.thread.start()

    def StopAudio2Text(self):
        self.ws.close()
        self.ws.at = None

    def SendPcm(self, pcm, status):
        if status == STATUS_FIRST_FRAME:
            print("Audio2Text SendPcm")
            # The first frame carries the common and business parameters.
            d = {"common": self.CommonArgs,
                 "business": self.BusinessArgs,
                 "data": {"status": status, "format": "audio/L16;rate=16000",
                          "audio": str(base64.b64encode(pcm), 'utf-8'),
                          "encoding": "raw"}}
        else:
            d = {"data": {"status": status, "format": "audio/L16;rate=16000",
                          "audio": str(base64.b64encode(pcm), 'utf-8'),
                          "encoding": "raw"}}
        d = json.dumps(d)
        # Queue the frame; once the websocket is open, flush everything queued so far.
        self.q.put(d)
        if self.openws:
            while not self.q.empty():
                item = self.q.get()
                self.ws.send(item)

    def HandleMessage(self, message):
        try:
            message = json.loads(message)
            code = message["code"]
            sid = message["sid"]
            if code != 0:
                errMsg = message["message"]
                print("sid:%s call error:%s code is:%s" % (sid, errMsg, code))
            else:
                data = message["data"]["result"]["ws"]
                # print(message)
                # Append every recognized word to the accumulated text.
                for i in data:
                    for w in i["cw"]:
                        self.Text += w["w"]
        except Exception as e:
            print("receive msg, but parse exception:", e)


if __name__ == "__main__":
    audio2text = Audio2Text(APPID='APPID', APISecret='APISecret',
                            APIKey='APIKey')
    stats = STATUS_FIRST_FRAME
    canSend = True

    def OnAudio(pcm):
        # Feed each microphone chunk into the speech-to-text stream.
        global canSend, stats
        if not canSend:
            return
        audio2text.SendPcm(pcm, stats)
        if stats == STATUS_FIRST_FRAME:
            stats = STATUS_CONTINUE_FRAME
        if stats == STATUS_LAST_FRAME:
            canSend = False

    readAudio = ReadAudio(OnAudio=OnAudio)
    audio2text.StartAudio2Text()
    readAudio.StartRead()
    print("start converting")
    time.sleep(5)
    print("stop converting")
    stats = STATUS_LAST_FRAME
    time.sleep(1)
    readAudio.StopRead()
    audio2text.StopAudio2Text()
    print(audio2text.Text)
At this point we have the full pipeline: read microphone audio -> speech-to-text -> ChatGPT -> text-to-speech -> playback. Since this is just a fun side project, it was built in Python.
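As a rough illustration, here is one way the pieces could be wired together: record the question for a few seconds, collect the recognized text, ask ChatGPT, synthesize the reply, and play the resulting PCM file when synthesis finishes. This is only a sketch; the module names in the imports (chatgpt, audio2text, text2audio, readaudio, playpcm) are my assumed file names for the classes defined above, and PlayPcm is the threaded playback helper sketched earlier.

import time
from chatgpt import ChatGpt
from audio2text import Audio2Text, STATUS_FIRST_FRAME, STATUS_CONTINUE_FRAME, STATUS_LAST_FRAME
from text2audio import Text2Audio
from readaudio import ReadAudio
from playpcm import PlayPcm

if __name__ == "__main__":
    audio2text = Audio2Text(APPID='APPID', APISecret='APISecret', APIKey='APIKey')
    stats = STATUS_FIRST_FRAME
    canSend = True

    def OnAudio(pcm):
        # Stream microphone frames into the speech-to-text service.
        global canSend, stats
        if not canSend:
            return
        audio2text.SendPcm(pcm, stats)
        if stats == STATUS_FIRST_FRAME:
            stats = STATUS_CONTINUE_FRAME
        if stats == STATUS_LAST_FRAME:
            canSend = False

    def OnTranslateEnd():
        # Called when text-to-speech has finished writing demo.pcm.
        PlayPcm("./demo.pcm")

    # 1. Record the question for a few seconds and convert it to text.
    readAudio = ReadAudio(OnAudio=OnAudio)
    audio2text.StartAudio2Text()
    readAudio.StartRead()
    time.sleep(5)
    stats = STATUS_LAST_FRAME
    time.sleep(1)
    readAudio.StopRead()
    audio2text.StopAudio2Text()
    question = audio2text.Text
    print("question:", question)

    # 2. Ask ChatGPT.
    answer = ChatGpt().GetAnswer(question)
    print("answer:", answer)

    # 3. Synthesize the answer; playback starts in OnTranslateEnd.
    text2Audio = Text2Audio(APPID='APPID', APISecret='APISecret',
                            APIKey='APIKey', OnTranslateEnd=OnTranslateEnd)
    text2Audio.StartText2Audio(answer)
    time.sleep(20)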
The Inveta team consists of developers, art designers, and 3D modelers. Team introduction: https://www.inveta.cn/about.html Team open-source projects: https://github.com/inveta