SSE用 Python 像 ChatGPT 一样返回流式响应

2024-01-1509:08:23后端程序开发Comments1,422 views字数 8116阅读模式

HTTP/1.1 采用的是标准的请求-响应模型,客户端主动发请求,服务端被动地返回响应。这种模型在客户端需要实时获取结果的场景下是不合适的,因为这意味着客户端需要不断地轮询,所以最好的做法是服务端生成结果之后,主动推送给客户端。文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/58625.html

比如 ChatGPT,它在生成内容时,也是生成一部分,就主动向客户端推送一部分。而在这个过程中,客户端不需要做任何事情,只需等待 ChatGPT 服务端返回内容即可。文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/58625.html

说到这儿,你肯定想到了 WebSocket,没错这是一种解决方案。但 WebSocket 太重了,它和 HTTP 都是基于 TCP 的应用层传输协议,只不过在握手的时候搭了 HTTP 的便车,利用 HTTP 本身的协议升级特性,伪装成 HTTP,这样就能绕过浏览器沙箱、网络防火墙等限制。文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/58625.html

当完成握手之后,后续传输的数据就不再是 HTTP 报文,而是 WebSocket 格式的二进制帧。所以这两者完全是不同的协议,那有没有一种办法,我们仍然使用 HTTP 协议,同时还能让服务端主动推送数据呢?文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/58625.html

答案是有的,也就是本文将要介绍的 SSE 技术,它的英文全称是 Server-Sent Events(服务端推送事件)。通过 SSE 可以让服务端即时推送数据到客户端,而不需要客户端轮询服务端以获取更新。文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/58625.html

SSE用 Python 像 ChatGPT 一样返回流式响应文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/58625.html

到这你可能会问,那 WebSocket 和 SSE 有什么区别呢?文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/58625.html

1)通信方式文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/58625.html

WebSocket 提供全双工通信,服务端和客户端都可以在同一个连接上同时发送和接收数据。最重要的是,WebSocket 独立于 HTTP 协议,尽管它开始于一个 HTTP 握手。文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/58625.html

SSE 仅提供服务端到客户端的单向通信,客户端不能通过 SSE 给服务端发信息。文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/58625.html

2)协议和实现文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/58625.html

WebSocket 使用自己的协议(ws:// 或 wss://),需要服务端和客户端都支持,并且协议比较复杂。文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/58625.html

SSE 则是使用标准的 HTTP 协议,实现起来更简单,尤其是在服务端。文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/58625.html

3)适用场景文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/58625.html

WebSocket 适用于服务端和客户端之间双向实时通信的场景,如在线游戏、聊天应用等。文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/58625.html

SSE 适用于服务端向客户端单向推送数据的场景,如消息通知、数据更新。并且 SSE 自动支持断线重连,而 WebSocket 则需要额外部署。文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/58625.html

4)复杂性和资源使用文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/58625.html

WebSocket 由于其双向通信的能力,通常比 SSE 更复杂,可能需要更多的资源来维护和管理连接。文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/58625.html

SSE 因为其单向性和基于 HTTP 的特性,它可以利用现有的网络基础设施,如代理服务器、负载均衡器和防火墙等等,通常更容易实现和维护。文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/58625.html


相信现在你已经明白 SSE 是做什么的了,它的目的就是让服务端能够主动推送数据给客户端。如果不需要和服务端动态交互,只是希望服务端在有数据的时候推过来,那么 WebSocket 就有些太重了,因为这意味着要替换 HTTP 协议,而使用 SSE 无疑是更好的选择。文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/58625.html

SSE 是什么我们已经知道了,那它是怎么实现的呢?原理是什么呢?文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/58625.html

1)建立连接文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/58625.html

客户端发起一个标准的 HTTP 请求来开启 SSE 会话,这个请求的特殊之处在于它包含一个头字段。文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/58625.html

Accept: text/event-stream

相当于客户端告诉服务端,期望接收 SSE 消息流。而服务端在看到该字段时,也知道这是一个 SSE 请求,于是立即向客户端返回响应头,注意:返回的只有响应头,里面会包含如下头字段。文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/58625.html

Content-Type: text/event-stream

响应头返回之后标志着 SSE 连接成功建立,并且连接会保持开放状态,服务端后续可以随时通过此连接向客户端发送数据。此外当连接不小心断开时,客户端也会自动进行重连。文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/58625.html

所以在普通的 HTTP 请求中,一旦服务端返回,那么请求结束了。虽然可以将 Connection 头字段设置为 keep-alive 保证连接不断开,但每次访问都包含了 HTTP 请求/响应的完整过程。文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/58625.html

而在 SSE 中,服务端会保持一个开放的连接,只要有新数据可用,就会直接发送给客户端。所以服务端会将响应以流的形式发送给客户端,每次发送的消息都是响应流的一部分,而不是独立的 HTTP 响应。文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/58625.html

因此 SSE 的服务端在发送数据时,并不遵循传统的一次请求,一次响应模式。它在建立连接之后会保持连接开放,并通过这个持续的连接流式地发送数据,这种方式就使得 SSE 非常适合实时数据推送的场景。文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/58625.html

2)发送消息文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/58625.html

客户端发送请求,服务端返回响应头之后,SSE 连接就建立成功了。此时客户端只需要躺平,安静地等待服务端的输出即可。所以现在的关键就在于服务端要返回什么格式的数据呢?很简单,一个基本的消息由以下几部分组成:文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/58625.html

  • data:实际的消息数据;
  • id:可选,消息的唯一标识符,用于在连接重新建立时同步消息;
  • event:可选,定义事件类型,用于客户端区分消息的类型;
  • retry:可选,自动重连的时间(毫秒),如果连接中断,客户端在自动重新连接之前,需要等待多长时间;

注意:每个消息要以两个换行符(\n\n)结束,举个例子,我们发送一个 Hello World。文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/58625.html

data: Hello World\n\n

也可以发送带有事件类型的消息:文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/58625.html

event: userUpdate
data: {"username": "Serpen", "age": 18}\n\n

还是比较简单的,服务端可以保持连接并随时发送更多数据。然后客户端在收到时会进行处理,但不需要(也不能)对服务端作出任何回应,它只需要被动地接收来自服务端的数据即可。当服务端认为数据已经全部发送完毕、无需再发时,那么便可以主动断开连接。文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/58625.html


 文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/58625.html

关于 SSE 的原理我们就解释清楚了,下面来实际编程实现它,这里我们先使用原生的 asyncio 实现 SSE。文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/58625.html

import asyncio
from asyncio import StreamReader, StreamWriter

class SSE:

    def __init__(self, host="0.0.0.0", port=9999):
        self.host = host
        self.port = port

    @staticmethod
    def parse_request_headers(data: bytes) -> dict:
        """
        此函数负责从原始字节流中解析出请求头
        """
        headers = data.split(b"\r\n\r\n")[0].split(b"\r\n")
        header_dict = {}
        for header in headers[1:]:
            key, val = header.decode("utf-8").split(":", 1)
            header_dict[key.lower()] = val.strip()
        return header_dict

    async def handler_requests(self,
                               reader: StreamReader,
                               writer: StreamWriter):
        """
        负责处理来自客户端的请求
        每来一个客户端连接,就会基于此函数创建一个协程
        并且自动传递两个参数:reader 和 writer
        reader.read  负责读取数据,等价于 socket.recv
        writer.write 负责发送数据,等价于 socket.send
        """
        # 获取客户端的请求报文,这里对请求方法、请求地址不做限制
        data = await reader.readuntil(b"\r\n\r\n")
        # 解析出请求头
        request_headers = self.parse_request_headers(data)
        # 简单检测一下 accept 字段,如果不是建立 SSE,那么直接关闭连接
        if request_headers.get("accept") != "text/event-stream":
            writer.close()
            return await writer.wait_closed()
        # 如果是 SSE 连接,那么返回响应头
        response_header = (
            b"HTTP/1.1 200 OK\r\n"
            b"Content-Type: text/event-stream\r\n"
            b"Cache-Control: no-cache\r\n"
            b"Connection: keep-alive\r\n"
            b'Access-Control-Allow-Origin: *\r\n'
            b"\r\n"
        )
        writer.write(response_header)
        await writer.drain()

        # 然后便可以不断地向客户端返回数据了
        for _ in range(5):
            # 每隔 1 秒返回数据
            data = "data: 高老师总能分享出好东西\r\n\r\n".encode("utf-8")
            writer.write(data)
            await writer.drain()
            await asyncio.sleep(1)
        # 数据传输完毕
        writer.close()
        await writer.wait_closed()

    async def __create_server(self):
        # 创建服务,第一个参数是一个回调函数
        # 当连接过来的时候就会根据此函数创建一个协程
        # 后面是绑定的 ip 和 端口
        server = await asyncio.start_server(self.handler_requests,
                                            self.host,
                                            self.port)
        # 然后开启无限循环
        async with server:
            await server.serve_forever()

    def run_server(self):
        loop = asyncio.get_event_loop()
        loop.run_until_complete(self.__create_server())

if __name__ == '__main__':
    sse = SSE()
    sse.run_server()

服务端代码编写完毕,下面编写前端代码。文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/58625.html

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
    <style>
        #data {
            font-weight: bold;
            color: cadetblue;
            font-size: large;
        }
    </style>
</head>
<body>
    <h1>SSE Test</h1>
    <div id="data"></div>
    <script>
        document.addEventListener("DOMContentLoaded", function () {
            // 和服务端建立 SSE 连接
            var eventSource = new EventSource("http://localhost:9999");

            eventSource.onmessage = function (e) {
                // 将数据渲染在 <div id="data"></div> 的内部
                var data = e.data + "\n";
                document.getElementById('data').innerText += data;
            };

            eventSource.onerror = function (e) {
                console.error('Error occurred:', e);
                eventSource.close();
            };
        });
    </script>
</body>
</html>

代码编写完毕,我们用浏览器打开 HTML 文件,便可看到如下效果。文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/58625.html

SSE用 Python 像 ChatGPT 一样返回流式响应文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/58625.html

以上我们就简单实现了 SSE,当然为了加深印象,这里的后端是使用原生的 asyncio 编写的,但在工作中,我们会使用现成的 Web 框架,比如 FastAPI,Blacksheep 等等。文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/58625.html

需要说明的是,虽然通过 SSE 技术可以实现类似 ChatGPT 的效果,但 ChatGPT 内部并没有用到 SSE,它内部是基于 HTTP 的分块传输实现的。因为 SSE 只能通过 GET 请求发出,并且无法自定义请求头。文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/58625.html

如果想实现 ChatGPT 的效果,需要使用 HTTP 的分块传输。而像 FastAPI、BlackSheep 等框架提供的流式响应,便是基于 HTTP 的分块传输实现的,比如 FastAPI:文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/58625.html

import asyncio
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from fastapi.middleware.cors import CORSMiddleware
import uvicorn

app = FastAPI()
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

async def event_generator():
    for _ in range(5):
        # 每隔 1 秒返回数据
        data = "data: 高老师总能分享出好东西\r\n\r\n".encode("utf-8")
        yield data
        await asyncio.sleep(1)

@app.get("/")
async def sse():
    return StreamingResponse(event_generator(), 
                             media_type="text/event-stream")

if __name__ == '__main__':
    uvicorn.run(app, host="0.0.0.0", port=9999)

首先之前的前端代码依旧可以正常访问,通过修改数据格式和 Content-Type 可以让其支持 SSE。但最正确的做法是直接访问 localhost:9999,效果如下:文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/58625.html

SSE用 Python 像 ChatGPT 一样返回流式响应文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/58625.html

所以基于 StreamingResponse 可以实现 SSE,也可以直接访问。而直接访问的话,此时里面的 data: 和 \r\n 就是实体数据的一部分。并且这种方式和 ChatGPT 的工作机制是相似的,都使用了 HTTP 的分块传输,支持所有的请求方法,而 SSE 只支持 GET 请求。文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/58625.html

BlackSheep 也是类似的,它同样也支持流式响应。文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/58625.html

import asyncio
from blacksheep import Application, Response, StreamedContent
import uvicorn

app = Application()
app.use_cors(
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

async def event_generator():
    for _ in range(5):
        # 每隔 1 秒返回数据
        data = "data: 高老师总能分享出好东西\r\n\r\n".encode("utf-8")
        yield data
        await asyncio.sleep(1)

@app.router.get("/")
async def sse():
    return Response(
        200,
        content=StreamedContent(b"text/event-stream", event_generator),
    )

if __name__ == '__main__':
    uvicorn.run(app, host="0.0.0.0", port=9999)

可以测试一下,效果是一样的。如果你不想实现 SSE,只是希望固定的数据以流的形式一点一点返回,那么记得将数据中多余的 data: 和 \r\n 给去掉,并最好修改 Content-Type 为合适的类型。文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/58625.html

所以 SSE 一般用于需要服务端推数据,但数据不知道什么时候会过来,于是通过 SSE 保持连接开放。后续当服务端有数据了,直接通过连接发送给客户端即可。文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/58625.html

而 FastAPI 和 BlackSheep 提供的流式响应更像是,返回的数据比较庞大,如果全部准备好再一次性返回,会让用户陷入长时间的等待,造成不好的体验。于是通过分块传输,准备好一部分就返回一部分。虽然整体时间没变,但可以让用户立刻获取到数据,从而提升用户体验。文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/58625.html

比如 ChatGPT,当它回答的内容比较多的时候,那么整个过程耗费几十秒钟是常有的事情,假设 30 秒。相比让用户等待 30 秒,然后内容一下子刷出来,显然生成一部分返回一部分这种方式更让人喜欢。文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/58625.html

因此使用 SSE 还是流式响应,则取决于你当前的业务。如果你返回的数据是确定的,只是准备的时间比较长,或者数据量比较大,那么推荐使用流式响应。文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/58625.html

至于 SSE,在这些现成的 Web 框架里面,也可以通过流式响应来实现,只需要将 Content-Type 设置为 text/event-stream,并将数据加上前缀 data: 和后缀 \r\n\r\n。文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/58625.html

但说实话,如果想实现 SSE,不建议通过流式响应来实现,而是使用专门的库。以 FastAPI 为例:文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/58625.html

from sse_starlette.sse import EventSourceResponse

FastAPI 其实就是在 starlette 的基础上套了一层壳,通过安装 sse_starlette 可以让 FastAPI 更好地支持 SSE。文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/58625.html

来源于古明地觉的编程教室 ,作者古明地觉文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/58625.html

  • 本站内容整理自互联网,仅提供信息存储空间服务,以方便学习之用。如对文章、图片、字体等版权有疑问,请在下方留言,管理员看到后,将第一时间进行处理。
  • 转载请务必保留本文链接:https://www.cainiaoxueyuan.com/bc/58625.html

Comment

匿名网友 填写信息

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定