最近做了两个需求,一个是热更新后台面板,另一个是安装和升级依赖。两个都是满任务,为了在前端可视化任务进度,都需要做实时日志的输出。类似于 GitHub Action, Vercel 等构建的日志输出。一般来说,实时输出使用 WebSocket,虽然在项目中也用到了 Socket.IO,但是建立一个 Socket.IO 从设计上来说就是昂贵的。考虑到 HTTP2 SSE 可以值得一试。
SSE 接口设计#
SSE 指服务端推送。可以与服务器建立一条长连接,服务器可以像浏览器发送多次数据,很适合作为日志输出。
在 NestJS 中提供了 @Sse
装饰器,使用方式与传统 HTTP 请求一致,SSE 请求本质也是 HTTP GET 请求,只不过 Content-Type 变成了 text/event-stream
。
在 NestJS SSE 接口中,为了实现一个流的传输,需要使用 rxjs 的能力,利用 rxjs 中的可观察对象可以不停传递数据。一个简单的流为:
@Sse('/hello')
async helloSse() {
const ob$ = new Observable((subscriber) => {
subscriber.next('hello')
subscriber.next('bye!')
subscriber.complete()
})
return ob$
}
浏览器使用 EventSource 进行连接。
const ev = new EventSource('http://localhost:2333/debug/hello')
ev.onmessage = (e) => {
log.innerText += e.data + '\n'
}
ev.onerror = (e) => {
if (!e) {
ev.close()
}
}
但是流传输成功之后,浏览器有重新发送了请求。只要不手动关闭连接,就会一直重复。这里暂时使用以下代码 HACK。
const ev = new EventSource('http://localhost:2333/debug/hello')
let opened = false
ev.onopen = (e) => {
opened = true
console.log('opened')
}
ev.onmessage = (e) => {
log.innerText += e.data + '\n'
}
ev.onerror = (e) => {
if (!e) {
ev.close()
}
if (e.target.readyState === EventSource.CONNECTING && opened) {
return ev.close()
}
if (e.target.readyState === EventSource.CLOSED) {
log.innerText += 'closed'
ev.close()
}
}
因为在完成传输后,都会有一个 error 事件。
利用 zx 和 node-pty 实现 Shell 输出流的传输#
zx 是 Google 开源的一个方便写脚本的库。不止用来写脚本,在项目中我也大量使用,但是现在 zx drop 了对 node-cjs 的支持,这里安利一下 zx-cjs 这个库,通过修改官方的构建方式实时同步更新 node-cjs 兼容的 zx 版本。安装之后别忘了,修改 package 的应用路径。改成:
"zx": "npm:zx-cjs@latest"
这样就和原本一样引用了。
node-pty 是微软开源的一个 shell 实现,可以很方便的实现一个终端模拟器。
使用 zx 执行 shell 命令时添加 .pipe
方法,将 output 流 pipe 到另一个 Writable,Writable 实现 write 到 rxjs Observable。建议代码如下。
const ob$ = new Observable((subscriber) => {
;(async () => {
const writable = new Stream.Writable({
autoDestroy: false,
write(chunk) {
subscriber.next(chunk.toString())
},
})
await $`ls -lh`.pipe(writable)
subscriber.complete()
writable.destroy()
})()
})
注意目前 Observable 不支持传入 async 函数,否则
.complete()
执行会出现报错。
这样就实现了一个简单的日志输出。
当然这是一个简单的输出 zx 就能胜任了,遇到了 zip
这种可能就会出现问题。这时候可以使用 node-pty 来实现输出。以下是一个封装。
runShellCommandPipeOutput(
command: string,
args: any[],
subscriber: Subscriber<string>,
) {
return new Promise((resolve) => {
subscriber.next(`$ ${command} ${args.join(' ')}\n`)
const pty = spawn(command, args, {})
pty.onData((data) => {
subscriber.next(data.toString())
})
pty.onExit(() => {
resolve(null)
})
})
}
node-pty 对 spawn 进行了包装,使用起来不像 zx 可以直接打长串命令来的灵活。命令需要拆分成程序和参数。以下是一个调用。
await this.runShellCommandPipeOutput(
'unzip',
['-o', 'admin-release.zip', '-d', folder],
subscriber,
)
就这样通过 Writable 和 rxjs Observable 实现了一个简单的日子输出。效果如下。
不足#
由于 SSE 只是一个 GET 请求的长连接,所以在设计上没有断线重连的能力,如果没有及时处理断开后的 EventSource 则会不停再次发送请求,造成资源的浪费。而且对连接的控制不友好,属于 SSE 依然使用很少,都会选择更加稳定的 WebSocket。