服务器端(最佳实现)
pip install flask-sse
客户端关闭连接
js
if (event.data === '[DONE]') {
source.close()
} else {
document.getElementById("result").innerHTML += event.data + "<br>";
}
if (event.data === '[DONE]') {
source.close()
} else {
document.getElementById("result").innerHTML += event.data + "<br>";
}
服务器端(基础的EventStream实现)
python
# -*- coding: UTF-8 -*-
import logging
import time
from flask import Blueprint
from flask import Flask, Response, stream_with_context
route_bp = Blueprint('', __name__)
def format_sse(data: str, event=None) -> str:
msg = f'data: {data}\n\n'
if event is not None:
msg = f'event: {event}\n{msg}'
return msg
def event_stream():
for i in range(4):
# yield f'Data {i}\n\n'
yield format_sse(f'Data {i}', 'message')
time.sleep(1)
@route_bp.route('/stream', methods=['GET'])
def stream():
res = Response(stream_with_context(event_stream()), mimetype='text/event-stream')
return res
app = Flask(__name__, static_folder="static", static_url_path="/static")
app.config['SESSION_TYPE'] = 'filesystem'
app.config['SECRET_KEY'] = b'axx1ab234szf_007'
app.register_blueprint(route_bp)
app.logger.setLevel(logging.INFO)
handler = logging.StreamHandler()
handler.setLevel(logging.INFO)
app.logger.addHandler(handler)
if __name__ == '__main__':
# 只有改为0.0.0.0,同一局域网下的其他设备才可以访问,否则就只能自嗨:同一个设备下只能访问自身!
app.run(host="0.0.0.0", port=8000)
# -*- coding: UTF-8 -*-
import logging
import time
from flask import Blueprint
from flask import Flask, Response, stream_with_context
route_bp = Blueprint('', __name__)
def format_sse(data: str, event=None) -> str:
msg = f'data: {data}\n\n'
if event is not None:
msg = f'event: {event}\n{msg}'
return msg
def event_stream():
for i in range(4):
# yield f'Data {i}\n\n'
yield format_sse(f'Data {i}', 'message')
time.sleep(1)
@route_bp.route('/stream', methods=['GET'])
def stream():
res = Response(stream_with_context(event_stream()), mimetype='text/event-stream')
return res
app = Flask(__name__, static_folder="static", static_url_path="/static")
app.config['SESSION_TYPE'] = 'filesystem'
app.config['SECRET_KEY'] = b'axx1ab234szf_007'
app.register_blueprint(route_bp)
app.logger.setLevel(logging.INFO)
handler = logging.StreamHandler()
handler.setLevel(logging.INFO)
app.logger.addHandler(handler)
if __name__ == '__main__':
# 只有改为0.0.0.0,同一局域网下的其他设备才可以访问,否则就只能自嗨:同一个设备下只能访问自身!
app.run(host="0.0.0.0", port=8000)
原理剖析
根据这段代码
python
for i in range(4):
# yield f'Data {i}\n\n'
yield format_sse(f'Data {i}', 'message')
time.sleep(1)
for i in range(4):
# yield f'Data {i}\n\n'
yield format_sse(f'Data {i}', 'message')
time.sleep(1)
以及GPT的代码
python
def gpt_35_api_stream(messages: list):
completion = {'role': '', 'content': ''}
try:
response = openai.ChatCompletion.create(
model='gpt-3.5-turbo',
messages=messages,
stream=True,
)
completion = {'role': '', 'content': ''}
for event in response:
if event['choices'][0]['finish_reason'] == 'stop':
print(f'收到的完成数据: {completion}')
break
for delta_k, delta_v in event['choices'][0]['delta'].items():
print(f'流响应数据: {delta_k} = {delta_v}')
completion[delta_k] += delta_v
messages.append(completion) # 直接在传入参数 messages 中追加消息
except Exception as err:
return (False, f'OpenAI API 异常: {err}')
def gpt_35_api_stream(messages: list):
completion = {'role': '', 'content': ''}
try:
response = openai.ChatCompletion.create(
model='gpt-3.5-turbo',
messages=messages,
stream=True,
)
completion = {'role': '', 'content': ''}
for event in response:
if event['choices'][0]['finish_reason'] == 'stop':
print(f'收到的完成数据: {completion}')
break
for delta_k, delta_v in event['choices'][0]['delta'].items():
print(f'流响应数据: {delta_k} = {delta_v}')
completion[delta_k] += delta_v
messages.append(completion) # 直接在传入参数 messages 中追加消息
except Exception as err:
return (False, f'OpenAI API 异常: {err}')
可以看出for event in response中,response相当于是一个 message队列,这和flask_sse的订阅模式的实现很像。
python
list = []
@redis_bp.route('/helloMe', methods=["GET", "POST"])
def helloMe():
list.append(1)
list.append(2)
list.append(3)
list.append(4)
for i in list:
print(i)
time.sleep(3)
print("循环结束")
return "helloMe"
@redis_bp.route('/helloAdd', methods=["GET", "POST"])
def helloAdd():
list.append(random.randint(1, 100))
return "add"
list = []
@redis_bp.route('/helloMe', methods=["GET", "POST"])
def helloMe():
list.append(1)
list.append(2)
list.append(3)
list.append(4)
for i in list:
print(i)
time.sleep(3)
print("循环结束")
return "helloMe"
@redis_bp.route('/helloAdd', methods=["GET", "POST"])
def helloAdd():
list.append(random.randint(1, 100))
return "add"
JavaScript
js
var source = new EventSource(url, { withCredentials: true });
source.onopen = function (event) {
};
source.onmessage = function (event) {
var data = event.data;
};
source.onerror = function (event) {
// handle error event
};
// close方法用于关闭 SSE 连接。
source.close();
var source = new EventSource(url, { withCredentials: true });
source.onopen = function (event) {
};
source.onmessage = function (event) {
var data = event.data;
};
source.onerror = function (event) {
// handle error event
};
// close方法用于关闭 SSE 连接。
source.close();
可以监听 open、error、close 等其他事件:
- open: 连接建立时触发
- error: 发生错误时触发
- close: 连接关闭时触发
上面的监听也可以使用
js
source.addEventListener("open", (e) => {
console.log("连接打开!");
}, false);
source.addEventListener("error", (e) => {
if (e.readyState == EventSource.CLOSED) {
console.log("连接已关闭!");
} else {
console.log("发生错误!");
}
}, false);
source.addEventListener("open", (e) => {
console.log("连接打开!");
}, false);
source.addEventListener("error", (e) => {
if (e.readyState == EventSource.CLOSED) {
console.log("连接已关闭!");
} else {
console.log("发生错误!");
}
}, false);
在 EventSource 的 addEventListener 方法中,第三个参数为 false,表示:
- 使用事件捕获模式而非冒泡模式。
事件传播有两种模式:
- 冒泡(bubble):从目标元素开始,逐级向上祖先元素传播。
- 捕获(capture):从祖先元素开始,逐级向下传播到目标元素。 所以 false 表示使用捕获模式,事件将从窗口根节点开始向下捕获到目标元素。 如果使用 true 表示使用冒泡模式,事件将从目标元素开始向上冒泡到窗口根节点。
Android端实现(客户端)
java
fun getDataCenterRecordBySSE() {
mainScope.launch {
try {
val clientBuilder = OkHttpClient.Builder()
.connectTimeout(10 * 1000, TimeUnit.MILLISECONDS)
.readTimeout(5 * 1000, TimeUnit.MILLISECONDS)
.writeTimeout(5 * 1000, TimeUnit.MILLISECONDS)
val listener = object : EventSourceListener() {
override fun onOpen(eventSource: EventSource, response: Response) {
super.onOpen(eventSource, response)
println("SSE链接 connection opened: $response")
}
override fun onEvent(eventSource: EventSource, id: String?, type: String?, data: String) {
println("SSE链接 Received event: $data id: $id type: $type")
}
override fun onClosed(eventSource: EventSource) {
super.onClosed(eventSource)
println("SSE链接 connection closed")
}
override fun onFailure(eventSource: EventSource, t: Throwable?, response: okhttp3.Response?) {
super.onFailure(eventSource, t, response)
println("SSE链接 connection failure: ${t?.message}")
}
}
val httpClient = clientBuilder.build()
val request = Request.Builder()
.url("http://192.168.13.20:8000/stream")
.addHeader("Accept", "text/event-stream")
.build()
val realEventSource1 = RealEventSource(request = request, listener = listener)
realEventSource1.connect(httpClient)
val realEventSource2 = EventSources.createFactory(httpClient).newEventSource(request, listener)
} catch (e: java.lang.Exception) {
e.printStackTrace()
}
}
}
fun getDataCenterRecordBySSE() {
mainScope.launch {
try {
val clientBuilder = OkHttpClient.Builder()
.connectTimeout(10 * 1000, TimeUnit.MILLISECONDS)
.readTimeout(5 * 1000, TimeUnit.MILLISECONDS)
.writeTimeout(5 * 1000, TimeUnit.MILLISECONDS)
val listener = object : EventSourceListener() {
override fun onOpen(eventSource: EventSource, response: Response) {
super.onOpen(eventSource, response)
println("SSE链接 connection opened: $response")
}
override fun onEvent(eventSource: EventSource, id: String?, type: String?, data: String) {
println("SSE链接 Received event: $data id: $id type: $type")
}
override fun onClosed(eventSource: EventSource) {
super.onClosed(eventSource)
println("SSE链接 connection closed")
}
override fun onFailure(eventSource: EventSource, t: Throwable?, response: okhttp3.Response?) {
super.onFailure(eventSource, t, response)
println("SSE链接 connection failure: ${t?.message}")
}
}
val httpClient = clientBuilder.build()
val request = Request.Builder()
.url("http://192.168.13.20:8000/stream")
.addHeader("Accept", "text/event-stream")
.build()
val realEventSource1 = RealEventSource(request = request, listener = listener)
realEventSource1.connect(httpClient)
val realEventSource2 = EventSources.createFactory(httpClient).newEventSource(request, listener)
} catch (e: java.lang.Exception) {
e.printStackTrace()
}
}
}
基于Flask-sse的实现(现已废弃,简化逻辑,订阅不如用Socket-IO这个库)
python
# -*- coding: UTF-8 -*-
import random
import time
import openai
from flask import Blueprint, request, render_template
from flask_sse import sse
import ServiceRedis
import ToolsAPI
from ResultEntity import ResultEntity
redis_bp = Blueprint('redis_bp', __name__)
@redis_bp.route('/test_sse_page', methods=["GET", "POST"])
def test_sse_page():
channel = "my_channel"
text = "这是给你们的消息"
return render_template("sse_send_message.html", channel=channel, text=text)
@redis_bp.route('/test_sse_send', methods=["GET", "POST"])
def test_sse_send():
# sse推送消息
channel = request.get_json()['channel']
text = request.get_json()['text']
print("channel:{},message:{}".format(channel, text))
# 线程内部推送消息,type为事件类型,retry为重试时间,单位为毫秒,channel为频道。
# 匹配规则:
# 1. 首先匹配频道channel,如果没有设置channel,默认为sse,onmessage事件可以接收到消息。
# 2. 其次匹配事件type,即匹配了channel,又匹配了type,两者都匹配才能接收到事件消息。如果匹配到事件,那么订阅消息就不再发送了(因为事件更具体)。
timestamp = int(round(time.time() * 1000))
data = {
"body": {
"type": "text",
"content": "已经处理事件"
},
"code": 0,
"message": "success",
"timestamp": timestamp,
"ts_ms": timestamp
}
# sse.publish(data, channel=channel)
# sse.publish(data, type='event_greeting')
# sse.publish(data, channel=channel, type='event_girl')
sse.publish(data, channel=channel, type='event_chat', retry=3000)
return ResultEntity.success()
@redis_bp.route('/sse_chat', methods=["GET", "POST"])
def sseChat():
channel = "my_channel"
text = "这是给你们的消息"
user_name = "user_name: {}".format(random.randint(0, 80))
return render_template("sse_chat.html", channel=channel, text=text, user_name=user_name)
# 专属http长连接
@redis_bp.route('/stream', methods=["GET", "POST"])
def stream():
print("客户端:stream连接")
return sse.stream()
def handleChatGPT(channel, message_id, messages):
# 记住上下文:role目前有3个取值:
# user。表示提交prompt的一方。
# assistant。表示给出completion响应的一方,实际上就是ChatGPT本身。
# system。message里role为system,是为了让ChatGPT在对话过程中设定自己的行为,目前role为system的消息没有太大的实际作用。
# 但是全部传递非常消耗Token,第一种方式就是每次发送请求时,不用带上全部历史对话记录,只带上最近几轮对话的记录。
# 比如gpt-3.5-turbo支持的最大上下文长度是4097个token,如果单次请求和响应里包含的token数量超过这个数,ChatGPT就会返回如下错误
# 如就带上最近6条对话记录(3条prompt,3条completion),减少单次请求里包含的token数量。
# 在用户请求的时候判断长度,然后把提示给到用户,不再请求API。
# messages=[
# {"role": "system", "content": "You are a helpful assistant."},
# {"role": "user", "content": "Who won the world series in 2020?"},
# {"role": "assistant", "content": "The Los Angeles Dodgers won the World Series in 2020."},
# {"role": "user", "content": "Where was it played?"}
# ]
try:
response = openai.ChatCompletion.create(
model='gpt-3.5-turbo',
# model='gpt-4',
# max_tokens=193,
# temperature=0,
messages=messages,
stream=True,
)
completion = {'role': '', 'content': ''}
for event in response:
if event['choices'][0]['finish_reason'] == 'stop':
# 这个回调和max_tokens有关,如果max_tokens设置的太小,会导致流响应数据不完整,这个回调就不会触发。
# {'role': 'assistant', 'content': '题目:我的旅行日记\n\n我一直梦想着能够环游世界,去看看那些美丽的风景和神奇的文化。最近,我终于有机会去了一次旅行,这是我人生中最难忘的经历之一。\n\n我的旅行开始于一个阳光明媚的早晨。我和我的朋友们一起坐上了飞往巴黎的飞机。在飞机上,我兴奋地想着即将到来的冒险和新奇的体验。\n\n当我们到达巴黎时,我被这个城市的美丽和浪漫所吸引。我们参观了埃菲尔铁塔、卢浮宫和巴黎圣母院等著名景点。我还品尝了法国美食,如鹅肝、蜗牛和红酒等。\n\n接下来,我们前往意大利。在那里,我看到了许多美丽的建筑和艺术品,如梵蒂冈博物馆和斗兽场。我还品尝了意大利美食,如比萨和意大利面。\n\n最后,我们去了希腊。在那里,我看到了许多古代遗迹和美丽的海滩。我还品尝了希腊美食,如羊肉和橄榄油。\n\n整个旅行中,我学到了很多东西,如如何适应不同的文化和如何与不同的人交流。我也收获了许多美好的回忆和友谊。\n\n这次旅行让我更加热爱旅行和探索未知的世界。我相信,这只是我未来旅行的开始。'}
print(f'收到的完成数据: {completion}')
break
for key_role, key_role_value in event['choices'][0]['delta'].items():
print(f'流响应数据: {key_role} = {key_role_value}')
# 每个key_role_value都是一个字符,+= key_role_value等于是把每个字符拼接起来。
completion[key_role] += key_role_value
# 响应中
if "content" in key_role:
sse.publish(ResultEntity.getMinDict({
"type": "text",
"content": key_role_value,
"message_id": message_id
}), type="event_receive", channel=channel, retry=3000)
else:
print("机器人:{} {}".format(key_role, key_role_value))
# 响应开始
sse.publish(ResultEntity.getMinDict({
"type": "text",
"content": "",
"message_id": message_id
}), type="event_start", channel=channel, retry=3000)
# 记录上下文
messages.append(completion)
except Exception as err:
print(f'OpenAI API 异常: {err}')
def handleMockGPT(channel, message_id, messages):
completion = {'role': '', 'content': ''}
# 响应开始
sse.publish(ResultEntity.getMinDict({
"type": "text",
"content": "",
"message_id": message_id
}), type="event_start", channel=channel, retry=3000)
message = "你好,我是机器人,我是来帮你解决问题的。如果你有什么问题,可以直接问我。我会尽力帮你解决的。但是我不是万能的,我只能回答我知道的问题。如果我不知道,我会告诉你我不知道。但是我不是万能的,我只能回答我知道的问题。如果我不知道,我会告诉你我不知道。\n\n但是我不是万能的,我只能回答我知道的问题。如果我不知道,我会告诉你我不知道。\n\n但是我不是万能的,我只能回答我知道的问题。如果我不知道,我会告诉你我不知道。\n\n但是我不是万能的,我只能回答我知道的问题。如果我不知道,我会告诉你我不知道。\n\n但是我不是万能的,我只能回答我知道的问题。如果我不知道,我会告诉你我不知道。\n\n但是我不是万能的,我只能回答我知道的问题。如果我不知道,我会告诉你我不知道。\n\n但是我不是万能的,我只能回答我知道的问题。如果我不知道,我会告诉你我不知道。\n\n"
# 一个字符一个字符的发送
for i in range(len(message)):
sse.publish(ResultEntity.getMinDict({
"type": "text",
"content": message[i],
"message_id": message_id
}), type="event_receive", channel=channel, retry=3000)
time.sleep(0.05)
@redis_bp.route('/fetch_stream', methods=["GET", "POST"])
def fetchStream():
# 延迟1秒,等待客户端连接
time.sleep(1)
apiKey = ToolsAPI.getApiKey()
if request.method == 'POST':
characterId = request.form.get('cid')
message_id = request.form.get('message_id')
channel = request.form.get('channel') or 'sse'
text = request.form.get('text')
sign = request.form.get('sign')
timestamp = request.form.get('timestamp')
else:
characterId = request.args.get('cid')
message_id = request.args.get('message_id')
channel = request.args.get('channel') or 'sse'
text = request.args.get('text')
sign = request.args.get('sign')
timestamp = request.args.get('timestamp')
prompt = ServiceRedis.getPrompt(characterId)
if prompt is not None:
# 替换prompt中的XXX为text
text = prompt.replace("XXX", text)
print("当前prompt:{}".format(prompt))
print("注册频道:{},cid:{},text:{},sign:{},timestamp:{}".format(channel, characterId, text, sign, timestamp))
openai.api_base = "https://api.chatanywhere.com.cn/v1"
openai.api_key = apiKey
messages = [
{
'role': 'user',
'content': text
}
]
# 当前消息的唯一标识:message_id 由前端生成或者后端生成,仅仅是为了标识当前UI列表中的消息(用于更新UI控件),没有其他作用。
if message_id is None or message_id == "":
message_id = "message_{}".format(random.randint(0, 100000))
# handleChatGPT(channel, message_id, messages)
handleMockGPT(channel, message_id, messages)
footer = "如果希望得到更多的有趣的功能,可以联系我哦!公众号:Alien的小窝"
for i in range(len(footer)):
sse.publish(ResultEntity.getMinDict({
"type": "text",
"content": footer[i],
"message_id": message_id
}), type="event_receive", channel=channel, retry=3000)
time.sleep(0.05)
# 响应结束
sse.publish(ResultEntity.getMinDict({
"type": "text",
"content": "",
"message_id": message_id
}), channel=channel, type="event_end", retry=3000)
return ResultEntity.success("success")
# -*- coding: UTF-8 -*-
import random
import time
import openai
from flask import Blueprint, request, render_template
from flask_sse import sse
import ServiceRedis
import ToolsAPI
from ResultEntity import ResultEntity
redis_bp = Blueprint('redis_bp', __name__)
@redis_bp.route('/test_sse_page', methods=["GET", "POST"])
def test_sse_page():
channel = "my_channel"
text = "这是给你们的消息"
return render_template("sse_send_message.html", channel=channel, text=text)
@redis_bp.route('/test_sse_send', methods=["GET", "POST"])
def test_sse_send():
# sse推送消息
channel = request.get_json()['channel']
text = request.get_json()['text']
print("channel:{},message:{}".format(channel, text))
# 线程内部推送消息,type为事件类型,retry为重试时间,单位为毫秒,channel为频道。
# 匹配规则:
# 1. 首先匹配频道channel,如果没有设置channel,默认为sse,onmessage事件可以接收到消息。
# 2. 其次匹配事件type,即匹配了channel,又匹配了type,两者都匹配才能接收到事件消息。如果匹配到事件,那么订阅消息就不再发送了(因为事件更具体)。
timestamp = int(round(time.time() * 1000))
data = {
"body": {
"type": "text",
"content": "已经处理事件"
},
"code": 0,
"message": "success",
"timestamp": timestamp,
"ts_ms": timestamp
}
# sse.publish(data, channel=channel)
# sse.publish(data, type='event_greeting')
# sse.publish(data, channel=channel, type='event_girl')
sse.publish(data, channel=channel, type='event_chat', retry=3000)
return ResultEntity.success()
@redis_bp.route('/sse_chat', methods=["GET", "POST"])
def sseChat():
channel = "my_channel"
text = "这是给你们的消息"
user_name = "user_name: {}".format(random.randint(0, 80))
return render_template("sse_chat.html", channel=channel, text=text, user_name=user_name)
# 专属http长连接
@redis_bp.route('/stream', methods=["GET", "POST"])
def stream():
print("客户端:stream连接")
return sse.stream()
def handleChatGPT(channel, message_id, messages):
# 记住上下文:role目前有3个取值:
# user。表示提交prompt的一方。
# assistant。表示给出completion响应的一方,实际上就是ChatGPT本身。
# system。message里role为system,是为了让ChatGPT在对话过程中设定自己的行为,目前role为system的消息没有太大的实际作用。
# 但是全部传递非常消耗Token,第一种方式就是每次发送请求时,不用带上全部历史对话记录,只带上最近几轮对话的记录。
# 比如gpt-3.5-turbo支持的最大上下文长度是4097个token,如果单次请求和响应里包含的token数量超过这个数,ChatGPT就会返回如下错误
# 如就带上最近6条对话记录(3条prompt,3条completion),减少单次请求里包含的token数量。
# 在用户请求的时候判断长度,然后把提示给到用户,不再请求API。
# messages=[
# {"role": "system", "content": "You are a helpful assistant."},
# {"role": "user", "content": "Who won the world series in 2020?"},
# {"role": "assistant", "content": "The Los Angeles Dodgers won the World Series in 2020."},
# {"role": "user", "content": "Where was it played?"}
# ]
try:
response = openai.ChatCompletion.create(
model='gpt-3.5-turbo',
# model='gpt-4',
# max_tokens=193,
# temperature=0,
messages=messages,
stream=True,
)
completion = {'role': '', 'content': ''}
for event in response:
if event['choices'][0]['finish_reason'] == 'stop':
# 这个回调和max_tokens有关,如果max_tokens设置的太小,会导致流响应数据不完整,这个回调就不会触发。
# {'role': 'assistant', 'content': '题目:我的旅行日记\n\n我一直梦想着能够环游世界,去看看那些美丽的风景和神奇的文化。最近,我终于有机会去了一次旅行,这是我人生中最难忘的经历之一。\n\n我的旅行开始于一个阳光明媚的早晨。我和我的朋友们一起坐上了飞往巴黎的飞机。在飞机上,我兴奋地想着即将到来的冒险和新奇的体验。\n\n当我们到达巴黎时,我被这个城市的美丽和浪漫所吸引。我们参观了埃菲尔铁塔、卢浮宫和巴黎圣母院等著名景点。我还品尝了法国美食,如鹅肝、蜗牛和红酒等。\n\n接下来,我们前往意大利。在那里,我看到了许多美丽的建筑和艺术品,如梵蒂冈博物馆和斗兽场。我还品尝了意大利美食,如比萨和意大利面。\n\n最后,我们去了希腊。在那里,我看到了许多古代遗迹和美丽的海滩。我还品尝了希腊美食,如羊肉和橄榄油。\n\n整个旅行中,我学到了很多东西,如如何适应不同的文化和如何与不同的人交流。我也收获了许多美好的回忆和友谊。\n\n这次旅行让我更加热爱旅行和探索未知的世界。我相信,这只是我未来旅行的开始。'}
print(f'收到的完成数据: {completion}')
break
for key_role, key_role_value in event['choices'][0]['delta'].items():
print(f'流响应数据: {key_role} = {key_role_value}')
# 每个key_role_value都是一个字符,+= key_role_value等于是把每个字符拼接起来。
completion[key_role] += key_role_value
# 响应中
if "content" in key_role:
sse.publish(ResultEntity.getMinDict({
"type": "text",
"content": key_role_value,
"message_id": message_id
}), type="event_receive", channel=channel, retry=3000)
else:
print("机器人:{} {}".format(key_role, key_role_value))
# 响应开始
sse.publish(ResultEntity.getMinDict({
"type": "text",
"content": "",
"message_id": message_id
}), type="event_start", channel=channel, retry=3000)
# 记录上下文
messages.append(completion)
except Exception as err:
print(f'OpenAI API 异常: {err}')
def handleMockGPT(channel, message_id, messages):
completion = {'role': '', 'content': ''}
# 响应开始
sse.publish(ResultEntity.getMinDict({
"type": "text",
"content": "",
"message_id": message_id
}), type="event_start", channel=channel, retry=3000)
message = "你好,我是机器人,我是来帮你解决问题的。如果你有什么问题,可以直接问我。我会尽力帮你解决的。但是我不是万能的,我只能回答我知道的问题。如果我不知道,我会告诉你我不知道。但是我不是万能的,我只能回答我知道的问题。如果我不知道,我会告诉你我不知道。\n\n但是我不是万能的,我只能回答我知道的问题。如果我不知道,我会告诉你我不知道。\n\n但是我不是万能的,我只能回答我知道的问题。如果我不知道,我会告诉你我不知道。\n\n但是我不是万能的,我只能回答我知道的问题。如果我不知道,我会告诉你我不知道。\n\n但是我不是万能的,我只能回答我知道的问题。如果我不知道,我会告诉你我不知道。\n\n但是我不是万能的,我只能回答我知道的问题。如果我不知道,我会告诉你我不知道。\n\n但是我不是万能的,我只能回答我知道的问题。如果我不知道,我会告诉你我不知道。\n\n"
# 一个字符一个字符的发送
for i in range(len(message)):
sse.publish(ResultEntity.getMinDict({
"type": "text",
"content": message[i],
"message_id": message_id
}), type="event_receive", channel=channel, retry=3000)
time.sleep(0.05)
@redis_bp.route('/fetch_stream', methods=["GET", "POST"])
def fetchStream():
# 延迟1秒,等待客户端连接
time.sleep(1)
apiKey = ToolsAPI.getApiKey()
if request.method == 'POST':
characterId = request.form.get('cid')
message_id = request.form.get('message_id')
channel = request.form.get('channel') or 'sse'
text = request.form.get('text')
sign = request.form.get('sign')
timestamp = request.form.get('timestamp')
else:
characterId = request.args.get('cid')
message_id = request.args.get('message_id')
channel = request.args.get('channel') or 'sse'
text = request.args.get('text')
sign = request.args.get('sign')
timestamp = request.args.get('timestamp')
prompt = ServiceRedis.getPrompt(characterId)
if prompt is not None:
# 替换prompt中的XXX为text
text = prompt.replace("XXX", text)
print("当前prompt:{}".format(prompt))
print("注册频道:{},cid:{},text:{},sign:{},timestamp:{}".format(channel, characterId, text, sign, timestamp))
openai.api_base = "https://api.chatanywhere.com.cn/v1"
openai.api_key = apiKey
messages = [
{
'role': 'user',
'content': text
}
]
# 当前消息的唯一标识:message_id 由前端生成或者后端生成,仅仅是为了标识当前UI列表中的消息(用于更新UI控件),没有其他作用。
if message_id is None or message_id == "":
message_id = "message_{}".format(random.randint(0, 100000))
# handleChatGPT(channel, message_id, messages)
handleMockGPT(channel, message_id, messages)
footer = "如果希望得到更多的有趣的功能,可以联系我哦!公众号:Alien的小窝"
for i in range(len(footer)):
sse.publish(ResultEntity.getMinDict({
"type": "text",
"content": footer[i],
"message_id": message_id
}), type="event_receive", channel=channel, retry=3000)
time.sleep(0.05)
# 响应结束
sse.publish(ResultEntity.getMinDict({
"type": "text",
"content": "",
"message_id": message_id
}), channel=channel, type="event_end", retry=3000)
return ResultEntity.success("success")
参考链接
Server-sent events(SSE)是一种用于实现服务器到客户端的单向通信的协议。使用SSE,服务器可以向客户端推送实时数据,而无需客户端发出请求。