模拟ChatGPT流式数据——SSE最佳实践
发布时间:2024年06月06日
在使用 ChatGPT 时,发现输入 prompt 后,是使用流式的效果返回的数据,起初以为使用了 双工协议做的持久化连接,查看其网络请求,发现这个接口的通信方式并非传统的 http 接口或者 WebSockets,而是基于 EventStream 的事件流。
为什么要这样传输,从使用场景上来说,ChatGPT 是一个基于深度学习的大型语言模型,处理自然语言需要大量的计算资源和时间,响应速度肯定比普通的读数据库要慢
接口等待时间过长,显然不合适。对于这种对话场景,ChagtGPT 将先计算出的数据“推送”给用户,采用 SSE 技术边计算边返回,避免用户因为等待时间过长关闭页面。。
概述
Server-Sent Events服务器推送事件,简称 SSE,是一种服务端实时主动向浏览器推送消息的技术。
SSE 是 HTML5 中一个与通信相关的
API,主要由两部分组成:
·服务端与浏览器端的通信协议(HTTP
协议)
·浏览器端可供 JavaScript 使用的EventSource
对象。
从“服务端主动向浏览器实时推送消息”这一点来看,该 API 与 WebSockets API 有一些相似之处。但是,该 API 与 WebSockers API 的不同之处在于:
Server-Sent Events API |
WebSockets API |
基于 HTTP 协议 |
基于 TCP 协议 |
单工,只能服务端单向发送消息 |
全双工,可以同时发送和接收消息 |
轻量级,使用简单 |
相对复杂 |
内置断线重连和消息追踪的功能 |
不在协议范围内,需手动实现 |
文本或使用 Base64 编码和 gzip 压缩的二进制消息 |
类型广泛 |
支持自定义事件类型 |
不支持自定义事件类型 |
连接数 HTTP/1.1 6 个,HTTP/2 可协商(默认 100) |
连接数无限制 |
服务端实现
协议
本质是浏览器发起 http 请求,服务器在收到请求后,返回状态与数据,并附带以下
headers
:
Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive
·SSE API规定推送事件流的 MIME 类型为text/event-stream
。
·必须指定浏览器不缓存服务端发送的数据,以确保浏览器可以实时显示服务端发送的数据。
·SSE 是一个一直保持开启的 TCP 连接,所以 Connection 为 keep-alive。
消息格式
·EventStream(事件流)为UTF-8
格式编码的文本
或使用 Base64 编码和 gzip 压缩的二进制消息。
·每条消息由一行或多行字段(event
、id
、retry
、data
)组成,每个字段组成形式为:字段名:字段值
。字段以行为单位,每行一个(即以\n
结尾)。
·以冒号
开头的行为注释行,会被浏览器忽略。
·每次推送,可由多个消息组成,每个消息之间以空行分隔(即最后一个字段以\n\n
结尾)
tips:
·如果一行字段中不包含冒号,则整行文本将被视为字段名,字段值为空。
·注释行可以用来防止链接超时,服务端可以定期向浏览器发送一条消息注释行,以保持连接不断。
event
事件类型。如果指定了该字段,则在浏览器收到该条消息时,会在当前EventSource
对象(见 4)上触发一个事件,事件类型就是该字段的字段值。可以使用addEventListener
方法在当前EventSource
对象上监听任意类型的命名事件。
如果该条消息没有event
字段,则会触发EventSource
对象onmessage
属性上的事件处理函数。
id
事件ID。事件的唯一标识符,浏览器会跟踪事件ID,如果发生断连,浏览器会把收到的最后一个事件ID放到 HTTP Header Last-Event-Id
中进行重连,作为一种简单的同步机制。
例如可以在服务端将每次发送的事件ID值自动加 1,当浏览器接收到该事件ID后,下次与服务端建立连接后再请求的 Header 中将同时提交该事件ID,服务端检查该事件ID是否为上次发送的事件ID,如果与上次发送的事件ID不一致则说明浏览器存在与服务器连接失败的情况,本次需要同时发送前几次浏览器未接收到的数据。
retry
重连时间。整数值,单位 ms,如果与服务器的连接丢失,浏览器将等待指定时间,然后尝试重新连接。如果该字段不是整数值,会被忽略。
当服务端没有指定浏览器的重连时间时,由浏览器自行决定每隔多久与服务端建立一次连接(一般为 30s)。
data
消息数据。数据内容只能以一个字符串的文本形式进行发送,如果需要发送一个对象时,需要将该对象以一个 JSON 格式的字符串的形式进行发送。在浏览器接收到该字符串后,再把它还原为一个
JSON 对象。
浏览器 API
在浏览器端,可以使用 JavaScript 的 EventSource API 创建EventSource
对象监听服务器发送的事件。一旦建立连接,服务器就可以使用 HTTP 响应的 'text/event-stream' 内容类型发送事件消息,浏览器则可以通过监听
EventSource 对象的onmessage
、onopen
和onerror
事件来处理这些消息。
建立连接
EventSource 接受两个参数:URL 和 options。
URL 为 http 事件来源,一旦 EventSource 对象被创建后,浏览器立即开始对该 URL 地址发送过来的事件进行监听。
options 是一个可选的对象,包含
withCredentials 属性,表示是否发送凭证(cookie、HTTP认证信息等)到服务端,默认为 false。
const eventSource = new EventSource('http_api_url', { withCredentials: true })
与 XMLHttpRequest 对象类型,EventSource 对象有一个 readyState 属性值,具体含义如下表:
readyState |
含义 |
0 |
浏览器与服务端尚未建立连接或连接已被关闭 |
1 |
浏览器与服务端已成功连接,浏览器正在处理接收到的事件及数据 |
2 |
浏览器与服务端建立连接失败,客户端不再继续建立与服务端之间的连接 |
可以使用 EventSource 对象的close
方法关闭与服务端之间的连接,使浏览器不再建立与服务端之间的连接。
//
关闭连接
eventSource.close()
监听事件
EventSource 对象本身继承自 EventTarget 接口,因此可以使用 addEventListener() 方法来监听事件。EventSource 对象触发的事件主要包括以下三种:
·open 事件:当成功连接到服务端时触发。
·message 事件:当接收到服务器发送的消息时触发。该事件对象的 data 属性包含了服务器发送的消息内容。
·error 事件:当发生错误时触发。该事件对象的 event 属性包含了错误信息。
tips:
EventSource
对象的属性监听只能监听预定义的事件类型(open
、message
、error
)。不能用于监听自定义事件类型。如果要实现自定义事件类型的监听,可以使用addEventListener()
方法。
实践
服务端
使用 Node.js 实现 SSE 的简单示例:(知乎的markdown真难用~)
consthttp
=
require('http')
const
fs
=
require('fs')
http.createServer((req,
res)
=>
{
const
url
=
req.url
if
(url
===
'/'
||
url
===
'index.html')
{
// 如果请求根路径,返回 index.html 文件
fs.readFile('index.html',
(err,
data)
=>
{
if
(err)
{
res.writeHead(500)
res.end('Error loading')
}
else
{
res.writeHead(200,
{'Content-Type':
'text/html'})
res.end(data)
}
})
}
else
if
(url.includes('/sse'))
{
// 如果请求 /events 路径,建立 SSE 连接
res.writeHead(200,
{
'Content-Type':
'text/event-stream',
'Cache-Control':
'no-cache',
'Connection':
'keep-alive',
'Access-Control-Allow-Origin':
'*',
// 允许跨域 })
// 每隔 1 秒发送一条消息
let
id
=
0
const
intervalId
=
setInterval(()
=>
{
res.write(`event: customEvent\n`)
res.write(`id: ${id}\n`)
res.write(`retry: 30000\n`)
const
params
=
url.split('?')[1]
const
data
=
{
id,
time:
new
Date().toISOString(),
params
}
res.write(`data: ${JSON.stringify(data)}\n\n`)
id++
if
(id
>=
10)
{
clearInterval(intervalId)
res.end()
}
},
1000)
// 当客户端关闭连接时停止发送消息
req.on('close',
()
=>
{
clearInterval(intervalId)
id
=
0
res.end()
})
}
else
{
// 如果请求的路径无效,返回 404 状态码
res.writeHead(404)
res.end()
}
}).listen(3000)
console.log('Server listening on port 3000')
客户端
将上面的两份代码保存为 server.js
和 index.html
,并在命令行中执行 node
启动服务端,然后在浏览器中打开
server.jshttp://localhost:3000
即可看到 SSE 效果。
兼容性
发展至今,SSE 已具有广泛的的浏览器兼容性,几乎除 IE 之外的浏览器均已支持。
对于不支持 EventSource 的浏览器,可以使用polyfill实现。判断浏览器是否支持 EventSource:
if(typeof(EventSource)!==
“undefined”)
{
// 支持 }
else
{
// 不支持,使用 polyfill}
Fetch 实现
虽然使用 SSE 技术可以实现 ChatGPT 一样的打字机效果,但是通过上文请求 type 对比可以发现,在使用 SSE 时,type 为eventSource
,而 ChatGPT 为fetch
。且受浏览器 EventSource API 限制,在使用 SSE 时不能自定义请求头、只能发出 GET 请求,且在大多数浏览器中,URL 限制2000个字符,也无法满足 ChatGPT 参数传递需求。
此时,可以使用 Fetch API 实现一个替代接口,用于模拟 SSE 实现。简单实现如下:
服务端
浏览器
1
2
不同于XMLHttpRequest
,fetch
并未原生提供终止操作方法,可以通过 DOM API [AbortController](https://developer.mozilla.org/zh-CN/docs/Web/API/AbortController)
和AbortSignal
实现 fetch 请求终止操作。
将上面的两份代码保存为server-fetch.js
和index-fetch.html
,并在命令行中执行node
启动服务端,然后在浏览器中打开
server-fetch.jshttp://localhost:3001
即可看到 fetch 版 SSE 效果。
爬坑
代理环境
pending时间过长
在本地代理的加持下,存在eventstream在当前请求结束后一股脑发完的情况
后台限定1s一条,但到了客户端便成了上图这样(与云服务器流量大小也有一定关系)
重定向
在nginx重定向接口后,存在等待服务器返回的问题(与云服务器流量大小也有一定关系)
总结
SSE 技术是一种轻量级的实时通信技术,基于 HTTP 协议,具有服务端推送、断线重连、简单轻量等优点。但是,SSE 技术也有一些缺点,如不能进行双向通信、连接数受限、仅支持 get 请求等。
SSE 可以在 Web 应用程序中实现诸如股票在线数据、日志推送、聊天室实时人数等即时数据推送功能。需要注意的是,SSE 并不是适用于所有的实时推送场景。在需要高并发、高吞吐量和低延迟的场景下,WebSockets
可能更加适合。而在需要更轻量级的推送场景下,SSE 可能更加适合。因此,在选择即时更新方案时,需要根据具体的需求和场景进行选择。
出自:
https://zhuanlan.zhihu.com/p/656030695#showWechatShareTip?utm_source=wechat_session&utm_medium=social&wechatShare=1&s_r=0
DreaMoving,一种基于扩散的可控视频生成框架,用于生成高质量的定制人类舞蹈视频。