Skip to content

商务合作:vTikTok


公众号:



服务器端(最佳实现)

pip install flask-sse

客户端关闭连接

js
if (event.data === '[DONE]') {
    source.close()
} else {
    document.getElementById("result").innerHTML += event.data + "<br>";
}
if (event.data === '[DONE]') {
    source.close()
} else {
    document.getElementById("result").innerHTML += event.data + "<br>";
}

服务器端(基础的EventStream实现)

python
# -*- coding: UTF-8 -*-
import logging
import time

from flask import Blueprint
from flask import Flask, Response, stream_with_context

route_bp = Blueprint('', __name__)

def format_sse(data: str, event=None) -> str:
    msg = f'data: {data}\n\n'
    if event is not None:
        msg = f'event: {event}\n{msg}'
    return msg

def event_stream():
    for i in range(4):
        # yield f'Data {i}\n\n'
        yield format_sse(f'Data {i}', 'message')
        time.sleep(1)

@route_bp.route('/stream', methods=['GET'])
def stream():
    res = Response(stream_with_context(event_stream()), mimetype='text/event-stream')
    return res

app = Flask(__name__, static_folder="static", static_url_path="/static")
app.config['SESSION_TYPE'] = 'filesystem'
app.config['SECRET_KEY'] = b'axx1ab234szf_007'

app.register_blueprint(route_bp)

app.logger.setLevel(logging.INFO)
handler = logging.StreamHandler()
handler.setLevel(logging.INFO)
app.logger.addHandler(handler)

if __name__ == '__main__':
    # 只有改为0.0.0.0,同一局域网下的其他设备才可以访问,否则就只能自嗨:同一个设备下只能访问自身!
    app.run(host="0.0.0.0", port=8000)
# -*- coding: UTF-8 -*-
import logging
import time

from flask import Blueprint
from flask import Flask, Response, stream_with_context

route_bp = Blueprint('', __name__)

def format_sse(data: str, event=None) -> str:
    msg = f'data: {data}\n\n'
    if event is not None:
        msg = f'event: {event}\n{msg}'
    return msg

def event_stream():
    for i in range(4):
        # yield f'Data {i}\n\n'
        yield format_sse(f'Data {i}', 'message')
        time.sleep(1)

@route_bp.route('/stream', methods=['GET'])
def stream():
    res = Response(stream_with_context(event_stream()), mimetype='text/event-stream')
    return res

app = Flask(__name__, static_folder="static", static_url_path="/static")
app.config['SESSION_TYPE'] = 'filesystem'
app.config['SECRET_KEY'] = b'axx1ab234szf_007'

app.register_blueprint(route_bp)

app.logger.setLevel(logging.INFO)
handler = logging.StreamHandler()
handler.setLevel(logging.INFO)
app.logger.addHandler(handler)

if __name__ == '__main__':
    # 只有改为0.0.0.0,同一局域网下的其他设备才可以访问,否则就只能自嗨:同一个设备下只能访问自身!
    app.run(host="0.0.0.0", port=8000)

原理剖析

根据这段代码

python
for i in range(4):
    # yield f'Data {i}\n\n'
    yield format_sse(f'Data {i}', 'message')
    time.sleep(1)
for i in range(4):
    # yield f'Data {i}\n\n'
    yield format_sse(f'Data {i}', 'message')
    time.sleep(1)

以及GPT的代码

python
def gpt_35_api_stream(messages: list):

    completion = {'role': '', 'content': ''}

    try:
        response = openai.ChatCompletion.create(
            model='gpt-3.5-turbo',
            messages=messages,
            stream=True,
        )
        completion = {'role': '', 'content': ''}
        for event in response:
            if event['choices'][0]['finish_reason'] == 'stop':
                print(f'收到的完成数据: {completion}')
                break
            for delta_k, delta_v in event['choices'][0]['delta'].items():
                print(f'流响应数据: {delta_k} = {delta_v}')
                completion[delta_k] += delta_v

        messages.append(completion)  # 直接在传入参数 messages 中追加消息

    except Exception as err:
        return (False, f'OpenAI API 异常: {err}')
def gpt_35_api_stream(messages: list):

    completion = {'role': '', 'content': ''}

    try:
        response = openai.ChatCompletion.create(
            model='gpt-3.5-turbo',
            messages=messages,
            stream=True,
        )
        completion = {'role': '', 'content': ''}
        for event in response:
            if event['choices'][0]['finish_reason'] == 'stop':
                print(f'收到的完成数据: {completion}')
                break
            for delta_k, delta_v in event['choices'][0]['delta'].items():
                print(f'流响应数据: {delta_k} = {delta_v}')
                completion[delta_k] += delta_v

        messages.append(completion)  # 直接在传入参数 messages 中追加消息

    except Exception as err:
        return (False, f'OpenAI API 异常: {err}')

可以看出for event in response中,response相当于是一个 message队列,这和flask_sse的订阅模式的实现很像。

python
list = []
@redis_bp.route('/helloMe', methods=["GET", "POST"])
def helloMe():
    list.append(1)
    list.append(2)
    list.append(3)
    list.append(4)
    for i in list:
        print(i)
        time.sleep(3)
    print("循环结束")
    return "helloMe"

@redis_bp.route('/helloAdd', methods=["GET", "POST"])
def helloAdd():
    list.append(random.randint(1, 100))
    return "add"
list = []
@redis_bp.route('/helloMe', methods=["GET", "POST"])
def helloMe():
    list.append(1)
    list.append(2)
    list.append(3)
    list.append(4)
    for i in list:
        print(i)
        time.sleep(3)
    print("循环结束")
    return "helloMe"

@redis_bp.route('/helloAdd', methods=["GET", "POST"])
def helloAdd():
    list.append(random.randint(1, 100))
    return "add"

JavaScript

js
var source = new EventSource(url, { withCredentials: true });
source.onopen = function (event) {

};

source.onmessage = function (event) {
  var data = event.data;

};

source.onerror = function (event) {
  // handle error event
};

// close方法用于关闭 SSE 连接。
source.close();
var source = new EventSource(url, { withCredentials: true });
source.onopen = function (event) {

};

source.onmessage = function (event) {
  var data = event.data;

};

source.onerror = function (event) {
  // handle error event
};

// close方法用于关闭 SSE 连接。
source.close();

可以监听 open、error、close 等其他事件:

  • open: 连接建立时触发
  • error: 发生错误时触发
  • close: 连接关闭时触发

上面的监听也可以使用

js
source.addEventListener("open", (e) => {
  console.log("连接打开!");
}, false);

source.addEventListener("error", (e) => {
  if (e.readyState == EventSource.CLOSED) {
    console.log("连接已关闭!");
  } else {
    console.log("发生错误!");
  }  
}, false);
source.addEventListener("open", (e) => {
  console.log("连接打开!");
}, false);

source.addEventListener("error", (e) => {
  if (e.readyState == EventSource.CLOSED) {
    console.log("连接已关闭!");
  } else {
    console.log("发生错误!");
  }  
}, false);

在 EventSource 的 addEventListener 方法中,第三个参数为 false,表示:

  • 使用事件捕获模式而非冒泡模式。

事件传播有两种模式:

  • 冒泡(bubble):从目标元素开始,逐级向上祖先元素传播。
  • 捕获(capture):从祖先元素开始,逐级向下传播到目标元素。 所以 false 表示使用捕获模式,事件将从窗口根节点开始向下捕获到目标元素。 如果使用 true 表示使用冒泡模式,事件将从目标元素开始向上冒泡到窗口根节点。

Android端实现(客户端)

java
fun getDataCenterRecordBySSE() {
    mainScope.launch {
        try {
            val clientBuilder = OkHttpClient.Builder()
                .connectTimeout(10 * 1000, TimeUnit.MILLISECONDS)
                .readTimeout(5 * 1000, TimeUnit.MILLISECONDS)
                .writeTimeout(5 * 1000, TimeUnit.MILLISECONDS)

            val listener = object : EventSourceListener() {

                override fun onOpen(eventSource: EventSource, response: Response) {
                    super.onOpen(eventSource, response)
                    println("SSE链接 connection opened: $response")
                }

                override fun onEvent(eventSource: EventSource, id: String?, type: String?, data: String) {
                    println("SSE链接 Received event: $data id: $id type: $type")
                }

                override fun onClosed(eventSource: EventSource) {
                    super.onClosed(eventSource)
                    println("SSE链接 connection closed")
                }

                override fun onFailure(eventSource: EventSource, t: Throwable?, response: okhttp3.Response?) {
                    super.onFailure(eventSource, t, response)
                    println("SSE链接 connection failure: ${t?.message}")
                }
            }

            val httpClient = clientBuilder.build()
            val request = Request.Builder()
                .url("http://192.168.13.20:8000/stream")
                .addHeader("Accept", "text/event-stream")
                .build()

            val realEventSource1 = RealEventSource(request = request, listener = listener)
            realEventSource1.connect(httpClient)
            val realEventSource2 = EventSources.createFactory(httpClient).newEventSource(request, listener)

        } catch (e: java.lang.Exception) {
            e.printStackTrace()
        }
    }
}
fun getDataCenterRecordBySSE() {
    mainScope.launch {
        try {
            val clientBuilder = OkHttpClient.Builder()
                .connectTimeout(10 * 1000, TimeUnit.MILLISECONDS)
                .readTimeout(5 * 1000, TimeUnit.MILLISECONDS)
                .writeTimeout(5 * 1000, TimeUnit.MILLISECONDS)

            val listener = object : EventSourceListener() {

                override fun onOpen(eventSource: EventSource, response: Response) {
                    super.onOpen(eventSource, response)
                    println("SSE链接 connection opened: $response")
                }

                override fun onEvent(eventSource: EventSource, id: String?, type: String?, data: String) {
                    println("SSE链接 Received event: $data id: $id type: $type")
                }

                override fun onClosed(eventSource: EventSource) {
                    super.onClosed(eventSource)
                    println("SSE链接 connection closed")
                }

                override fun onFailure(eventSource: EventSource, t: Throwable?, response: okhttp3.Response?) {
                    super.onFailure(eventSource, t, response)
                    println("SSE链接 connection failure: ${t?.message}")
                }
            }

            val httpClient = clientBuilder.build()
            val request = Request.Builder()
                .url("http://192.168.13.20:8000/stream")
                .addHeader("Accept", "text/event-stream")
                .build()

            val realEventSource1 = RealEventSource(request = request, listener = listener)
            realEventSource1.connect(httpClient)
            val realEventSource2 = EventSources.createFactory(httpClient).newEventSource(request, listener)

        } catch (e: java.lang.Exception) {
            e.printStackTrace()
        }
    }
}

基于Flask-sse的实现(现已废弃,简化逻辑,订阅不如用Socket-IO这个库)

python
# -*- coding: UTF-8 -*-
import random
import time

import openai
from flask import Blueprint, request, render_template
from flask_sse import sse

import ServiceRedis
import ToolsAPI
from ResultEntity import ResultEntity

redis_bp = Blueprint('redis_bp', __name__)

@redis_bp.route('/test_sse_page', methods=["GET", "POST"])
def test_sse_page():
    channel = "my_channel"
    text = "这是给你们的消息"
    return render_template("sse_send_message.html", channel=channel, text=text)

@redis_bp.route('/test_sse_send', methods=["GET", "POST"])
def test_sse_send():
    # sse推送消息
    channel = request.get_json()['channel']
    text = request.get_json()['text']
    print("channel:{},message:{}".format(channel, text))

    # 线程内部推送消息,type为事件类型,retry为重试时间,单位为毫秒,channel为频道。
    # 匹配规则:
    # 1. 首先匹配频道channel,如果没有设置channel,默认为sse,onmessage事件可以接收到消息。
    # 2. 其次匹配事件type,即匹配了channel,又匹配了type,两者都匹配才能接收到事件消息。如果匹配到事件,那么订阅消息就不再发送了(因为事件更具体)。

    timestamp = int(round(time.time() * 1000))

    data = {
        "body": {
            "type": "text",
            "content": "已经处理事件"
        },
        "code": 0,
        "message": "success",
        "timestamp": timestamp,
        "ts_ms": timestamp
    }

    # sse.publish(data, channel=channel)
    # sse.publish(data, type='event_greeting')
    # sse.publish(data, channel=channel, type='event_girl')

    sse.publish(data, channel=channel, type='event_chat', retry=3000)

    return ResultEntity.success()

@redis_bp.route('/sse_chat', methods=["GET", "POST"])
def sseChat():
    channel = "my_channel"
    text = "这是给你们的消息"
    user_name = "user_name: {}".format(random.randint(0, 80))
    return render_template("sse_chat.html", channel=channel, text=text, user_name=user_name)

# 专属http长连接
@redis_bp.route('/stream', methods=["GET", "POST"])
def stream():
    print("客户端:stream连接")
    return sse.stream()

def handleChatGPT(channel, message_id, messages):
    # 记住上下文:role目前有3个取值:
    # user。表示提交prompt的一方。
    # assistant。表示给出completion响应的一方,实际上就是ChatGPT本身。
    # system。message里role为system,是为了让ChatGPT在对话过程中设定自己的行为,目前role为system的消息没有太大的实际作用。
    # 但是全部传递非常消耗Token,第一种方式就是每次发送请求时,不用带上全部历史对话记录,只带上最近几轮对话的记录。
    # 比如gpt-3.5-turbo支持的最大上下文长度是4097个token,如果单次请求和响应里包含的token数量超过这个数,ChatGPT就会返回如下错误
    # 如就带上最近6条对话记录(3条prompt,3条completion),减少单次请求里包含的token数量。
    # 在用户请求的时候判断长度,然后把提示给到用户,不再请求API。

    #  messages=[
    #         {"role": "system", "content": "You are a helpful assistant."},
    #         {"role": "user", "content": "Who won the world series in 2020?"},
    #         {"role": "assistant", "content": "The Los Angeles Dodgers won the World Series in 2020."},
    #         {"role": "user", "content": "Where was it played?"}
    #     ]

    try:
        response = openai.ChatCompletion.create(
            model='gpt-3.5-turbo',
            # model='gpt-4',
            # max_tokens=193,
            # temperature=0,
            messages=messages,
            stream=True,
        )
        completion = {'role': '', 'content': ''}

        for event in response:
            if event['choices'][0]['finish_reason'] == 'stop':
                # 这个回调和max_tokens有关,如果max_tokens设置的太小,会导致流响应数据不完整,这个回调就不会触发。
                # {'role': 'assistant', 'content': '题目:我的旅行日记\n\n我一直梦想着能够环游世界,去看看那些美丽的风景和神奇的文化。最近,我终于有机会去了一次旅行,这是我人生中最难忘的经历之一。\n\n我的旅行开始于一个阳光明媚的早晨。我和我的朋友们一起坐上了飞往巴黎的飞机。在飞机上,我兴奋地想着即将到来的冒险和新奇的体验。\n\n当我们到达巴黎时,我被这个城市的美丽和浪漫所吸引。我们参观了埃菲尔铁塔、卢浮宫和巴黎圣母院等著名景点。我还品尝了法国美食,如鹅肝、蜗牛和红酒等。\n\n接下来,我们前往意大利。在那里,我看到了许多美丽的建筑和艺术品,如梵蒂冈博物馆和斗兽场。我还品尝了意大利美食,如比萨和意大利面。\n\n最后,我们去了希腊。在那里,我看到了许多古代遗迹和美丽的海滩。我还品尝了希腊美食,如羊肉和橄榄油。\n\n整个旅行中,我学到了很多东西,如如何适应不同的文化和如何与不同的人交流。我也收获了许多美好的回忆和友谊。\n\n这次旅行让我更加热爱旅行和探索未知的世界。我相信,这只是我未来旅行的开始。'}
                print(f'收到的完成数据: {completion}')
                break
            for key_role, key_role_value in event['choices'][0]['delta'].items():
                print(f'流响应数据: {key_role} = {key_role_value}')
                # 每个key_role_value都是一个字符,+= key_role_value等于是把每个字符拼接起来。
                completion[key_role] += key_role_value

                # 响应中
                if "content" in key_role:
                    sse.publish(ResultEntity.getMinDict({
                        "type": "text",
                        "content": key_role_value,
                        "message_id": message_id
                    }), type="event_receive", channel=channel, retry=3000)
                else:
                    print("机器人:{} {}".format(key_role, key_role_value))
                    # 响应开始
                    sse.publish(ResultEntity.getMinDict({
                        "type": "text",
                        "content": "",
                        "message_id": message_id
                    }), type="event_start", channel=channel, retry=3000)

        # 记录上下文
        messages.append(completion)
    except Exception as err:
        print(f'OpenAI API 异常: {err}')

def handleMockGPT(channel, message_id, messages):
    completion = {'role': '', 'content': ''}

    # 响应开始
    sse.publish(ResultEntity.getMinDict({
        "type": "text",
        "content": "",
        "message_id": message_id
    }), type="event_start", channel=channel, retry=3000)

    message = "你好,我是机器人,我是来帮你解决问题的。如果你有什么问题,可以直接问我。我会尽力帮你解决的。但是我不是万能的,我只能回答我知道的问题。如果我不知道,我会告诉你我不知道。但是我不是万能的,我只能回答我知道的问题。如果我不知道,我会告诉你我不知道。\n\n但是我不是万能的,我只能回答我知道的问题。如果我不知道,我会告诉你我不知道。\n\n但是我不是万能的,我只能回答我知道的问题。如果我不知道,我会告诉你我不知道。\n\n但是我不是万能的,我只能回答我知道的问题。如果我不知道,我会告诉你我不知道。\n\n但是我不是万能的,我只能回答我知道的问题。如果我不知道,我会告诉你我不知道。\n\n但是我不是万能的,我只能回答我知道的问题。如果我不知道,我会告诉你我不知道。\n\n但是我不是万能的,我只能回答我知道的问题。如果我不知道,我会告诉你我不知道。\n\n"
    # 一个字符一个字符的发送
    for i in range(len(message)):
        sse.publish(ResultEntity.getMinDict({
            "type": "text",
            "content": message[i],
            "message_id": message_id
        }), type="event_receive", channel=channel, retry=3000)
        time.sleep(0.05)

@redis_bp.route('/fetch_stream', methods=["GET", "POST"])
def fetchStream():
    # 延迟1秒,等待客户端连接
    time.sleep(1)

    apiKey = ToolsAPI.getApiKey()

    if request.method == 'POST':
        characterId = request.form.get('cid')
        message_id = request.form.get('message_id')
        channel = request.form.get('channel') or 'sse'
        text = request.form.get('text')
        sign = request.form.get('sign')
        timestamp = request.form.get('timestamp')
    else:
        characterId = request.args.get('cid')
        message_id = request.args.get('message_id')
        channel = request.args.get('channel') or 'sse'
        text = request.args.get('text')
        sign = request.args.get('sign')
        timestamp = request.args.get('timestamp')

    prompt = ServiceRedis.getPrompt(characterId)
    if prompt is not None:
        # 替换prompt中的XXX为text
        text = prompt.replace("XXX", text)
    print("当前prompt:{}".format(prompt))
    print("注册频道:{},cid:{},text:{},sign:{},timestamp:{}".format(channel, characterId, text, sign, timestamp))

    openai.api_base = "https://api.chatanywhere.com.cn/v1"
    openai.api_key = apiKey

    messages = [
        {
            'role': 'user',
            'content': text
        }
    ]
    # 当前消息的唯一标识:message_id 由前端生成或者后端生成,仅仅是为了标识当前UI列表中的消息(用于更新UI控件),没有其他作用。
    if message_id is None or message_id == "":
        message_id = "message_{}".format(random.randint(0, 100000))

    # handleChatGPT(channel, message_id, messages)
    handleMockGPT(channel, message_id, messages)

    footer = "如果希望得到更多的有趣的功能,可以联系我哦!公众号:Alien的小窝"
    for i in range(len(footer)):
        sse.publish(ResultEntity.getMinDict({
            "type": "text",
            "content": footer[i],
            "message_id": message_id
        }), type="event_receive", channel=channel, retry=3000)
        time.sleep(0.05)

    # 响应结束
    sse.publish(ResultEntity.getMinDict({
        "type": "text",
        "content": "",
        "message_id": message_id
    }), channel=channel, type="event_end", retry=3000)

    return ResultEntity.success("success")
# -*- coding: UTF-8 -*-
import random
import time

import openai
from flask import Blueprint, request, render_template
from flask_sse import sse

import ServiceRedis
import ToolsAPI
from ResultEntity import ResultEntity

redis_bp = Blueprint('redis_bp', __name__)

@redis_bp.route('/test_sse_page', methods=["GET", "POST"])
def test_sse_page():
    channel = "my_channel"
    text = "这是给你们的消息"
    return render_template("sse_send_message.html", channel=channel, text=text)

@redis_bp.route('/test_sse_send', methods=["GET", "POST"])
def test_sse_send():
    # sse推送消息
    channel = request.get_json()['channel']
    text = request.get_json()['text']
    print("channel:{},message:{}".format(channel, text))

    # 线程内部推送消息,type为事件类型,retry为重试时间,单位为毫秒,channel为频道。
    # 匹配规则:
    # 1. 首先匹配频道channel,如果没有设置channel,默认为sse,onmessage事件可以接收到消息。
    # 2. 其次匹配事件type,即匹配了channel,又匹配了type,两者都匹配才能接收到事件消息。如果匹配到事件,那么订阅消息就不再发送了(因为事件更具体)。

    timestamp = int(round(time.time() * 1000))

    data = {
        "body": {
            "type": "text",
            "content": "已经处理事件"
        },
        "code": 0,
        "message": "success",
        "timestamp": timestamp,
        "ts_ms": timestamp
    }

    # sse.publish(data, channel=channel)
    # sse.publish(data, type='event_greeting')
    # sse.publish(data, channel=channel, type='event_girl')

    sse.publish(data, channel=channel, type='event_chat', retry=3000)

    return ResultEntity.success()

@redis_bp.route('/sse_chat', methods=["GET", "POST"])
def sseChat():
    channel = "my_channel"
    text = "这是给你们的消息"
    user_name = "user_name: {}".format(random.randint(0, 80))
    return render_template("sse_chat.html", channel=channel, text=text, user_name=user_name)

# 专属http长连接
@redis_bp.route('/stream', methods=["GET", "POST"])
def stream():
    print("客户端:stream连接")
    return sse.stream()

def handleChatGPT(channel, message_id, messages):
    # 记住上下文:role目前有3个取值:
    # user。表示提交prompt的一方。
    # assistant。表示给出completion响应的一方,实际上就是ChatGPT本身。
    # system。message里role为system,是为了让ChatGPT在对话过程中设定自己的行为,目前role为system的消息没有太大的实际作用。
    # 但是全部传递非常消耗Token,第一种方式就是每次发送请求时,不用带上全部历史对话记录,只带上最近几轮对话的记录。
    # 比如gpt-3.5-turbo支持的最大上下文长度是4097个token,如果单次请求和响应里包含的token数量超过这个数,ChatGPT就会返回如下错误
    # 如就带上最近6条对话记录(3条prompt,3条completion),减少单次请求里包含的token数量。
    # 在用户请求的时候判断长度,然后把提示给到用户,不再请求API。

    #  messages=[
    #         {"role": "system", "content": "You are a helpful assistant."},
    #         {"role": "user", "content": "Who won the world series in 2020?"},
    #         {"role": "assistant", "content": "The Los Angeles Dodgers won the World Series in 2020."},
    #         {"role": "user", "content": "Where was it played?"}
    #     ]

    try:
        response = openai.ChatCompletion.create(
            model='gpt-3.5-turbo',
            # model='gpt-4',
            # max_tokens=193,
            # temperature=0,
            messages=messages,
            stream=True,
        )
        completion = {'role': '', 'content': ''}

        for event in response:
            if event['choices'][0]['finish_reason'] == 'stop':
                # 这个回调和max_tokens有关,如果max_tokens设置的太小,会导致流响应数据不完整,这个回调就不会触发。
                # {'role': 'assistant', 'content': '题目:我的旅行日记\n\n我一直梦想着能够环游世界,去看看那些美丽的风景和神奇的文化。最近,我终于有机会去了一次旅行,这是我人生中最难忘的经历之一。\n\n我的旅行开始于一个阳光明媚的早晨。我和我的朋友们一起坐上了飞往巴黎的飞机。在飞机上,我兴奋地想着即将到来的冒险和新奇的体验。\n\n当我们到达巴黎时,我被这个城市的美丽和浪漫所吸引。我们参观了埃菲尔铁塔、卢浮宫和巴黎圣母院等著名景点。我还品尝了法国美食,如鹅肝、蜗牛和红酒等。\n\n接下来,我们前往意大利。在那里,我看到了许多美丽的建筑和艺术品,如梵蒂冈博物馆和斗兽场。我还品尝了意大利美食,如比萨和意大利面。\n\n最后,我们去了希腊。在那里,我看到了许多古代遗迹和美丽的海滩。我还品尝了希腊美食,如羊肉和橄榄油。\n\n整个旅行中,我学到了很多东西,如如何适应不同的文化和如何与不同的人交流。我也收获了许多美好的回忆和友谊。\n\n这次旅行让我更加热爱旅行和探索未知的世界。我相信,这只是我未来旅行的开始。'}
                print(f'收到的完成数据: {completion}')
                break
            for key_role, key_role_value in event['choices'][0]['delta'].items():
                print(f'流响应数据: {key_role} = {key_role_value}')
                # 每个key_role_value都是一个字符,+= key_role_value等于是把每个字符拼接起来。
                completion[key_role] += key_role_value

                # 响应中
                if "content" in key_role:
                    sse.publish(ResultEntity.getMinDict({
                        "type": "text",
                        "content": key_role_value,
                        "message_id": message_id
                    }), type="event_receive", channel=channel, retry=3000)
                else:
                    print("机器人:{} {}".format(key_role, key_role_value))
                    # 响应开始
                    sse.publish(ResultEntity.getMinDict({
                        "type": "text",
                        "content": "",
                        "message_id": message_id
                    }), type="event_start", channel=channel, retry=3000)

        # 记录上下文
        messages.append(completion)
    except Exception as err:
        print(f'OpenAI API 异常: {err}')

def handleMockGPT(channel, message_id, messages):
    completion = {'role': '', 'content': ''}

    # 响应开始
    sse.publish(ResultEntity.getMinDict({
        "type": "text",
        "content": "",
        "message_id": message_id
    }), type="event_start", channel=channel, retry=3000)

    message = "你好,我是机器人,我是来帮你解决问题的。如果你有什么问题,可以直接问我。我会尽力帮你解决的。但是我不是万能的,我只能回答我知道的问题。如果我不知道,我会告诉你我不知道。但是我不是万能的,我只能回答我知道的问题。如果我不知道,我会告诉你我不知道。\n\n但是我不是万能的,我只能回答我知道的问题。如果我不知道,我会告诉你我不知道。\n\n但是我不是万能的,我只能回答我知道的问题。如果我不知道,我会告诉你我不知道。\n\n但是我不是万能的,我只能回答我知道的问题。如果我不知道,我会告诉你我不知道。\n\n但是我不是万能的,我只能回答我知道的问题。如果我不知道,我会告诉你我不知道。\n\n但是我不是万能的,我只能回答我知道的问题。如果我不知道,我会告诉你我不知道。\n\n但是我不是万能的,我只能回答我知道的问题。如果我不知道,我会告诉你我不知道。\n\n"
    # 一个字符一个字符的发送
    for i in range(len(message)):
        sse.publish(ResultEntity.getMinDict({
            "type": "text",
            "content": message[i],
            "message_id": message_id
        }), type="event_receive", channel=channel, retry=3000)
        time.sleep(0.05)

@redis_bp.route('/fetch_stream', methods=["GET", "POST"])
def fetchStream():
    # 延迟1秒,等待客户端连接
    time.sleep(1)

    apiKey = ToolsAPI.getApiKey()

    if request.method == 'POST':
        characterId = request.form.get('cid')
        message_id = request.form.get('message_id')
        channel = request.form.get('channel') or 'sse'
        text = request.form.get('text')
        sign = request.form.get('sign')
        timestamp = request.form.get('timestamp')
    else:
        characterId = request.args.get('cid')
        message_id = request.args.get('message_id')
        channel = request.args.get('channel') or 'sse'
        text = request.args.get('text')
        sign = request.args.get('sign')
        timestamp = request.args.get('timestamp')

    prompt = ServiceRedis.getPrompt(characterId)
    if prompt is not None:
        # 替换prompt中的XXX为text
        text = prompt.replace("XXX", text)
    print("当前prompt:{}".format(prompt))
    print("注册频道:{},cid:{},text:{},sign:{},timestamp:{}".format(channel, characterId, text, sign, timestamp))

    openai.api_base = "https://api.chatanywhere.com.cn/v1"
    openai.api_key = apiKey

    messages = [
        {
            'role': 'user',
            'content': text
        }
    ]
    # 当前消息的唯一标识:message_id 由前端生成或者后端生成,仅仅是为了标识当前UI列表中的消息(用于更新UI控件),没有其他作用。
    if message_id is None or message_id == "":
        message_id = "message_{}".format(random.randint(0, 100000))

    # handleChatGPT(channel, message_id, messages)
    handleMockGPT(channel, message_id, messages)

    footer = "如果希望得到更多的有趣的功能,可以联系我哦!公众号:Alien的小窝"
    for i in range(len(footer)):
        sse.publish(ResultEntity.getMinDict({
            "type": "text",
            "content": footer[i],
            "message_id": message_id
        }), type="event_receive", channel=channel, retry=3000)
        time.sleep(0.05)

    # 响应结束
    sse.publish(ResultEntity.getMinDict({
        "type": "text",
        "content": "",
        "message_id": message_id
    }), channel=channel, type="event_end", retry=3000)

    return ResultEntity.success("success")

参考链接

Server-sent events(SSE)是一种用于实现服务器到客户端的单向通信的协议。使用SSE,服务器可以向客户端推送实时数据,而无需客户端发出请求。

参考文章阮一峰flask-sse知乎python+flask+eventSource