banner
innei

innei

写代码是因为爱,写到世界充满爱!
github
telegram
twitter

NestJS 服务端推送日志流输出

最近做了两个需求,一个是热更新后台面板,另一个是安装和升级依赖。两个都是满任务,为了在前端可视化任务进度,都需要做实时日志的输出。类似于 GitHub Action, Vercel 等构建的日志输出。一般来说,实时输出使用 WebSocket,虽然在项目中也用到了 Socket.IO,但是建立一个 Socket.IO 从设计上来说就是昂贵的。考虑到 HTTP2 SSE 可以值得一试。

SSE 接口设计

SSE 指服务端推送。可以与服务器建立一条长连接,服务器可以像浏览器发送多次数据,很适合作为日志输出。

在 NestJS 中提供了 @Sse 装饰器,使用方式与传统 HTTP 请求一致,SSE 请求本质也是 HTTP GET 请求,只不过 Content-Type 变成了 text/event-stream

image

在 NestJS SSE 接口中,为了实现一个流的传输,需要使用 rxjs 的能力,利用 rxjs 中的可观察对象可以不停传递数据。一个简单的流为:

@Sse('/hello')
async helloSse() {
  const ob$ = new Observable((subscriber) => {
  subscriber.next('hello')
  subscriber.next('bye!')
  subscriber.complete()
  })
  return ob$
}

浏览器使用 EventSource 进行连接。

const ev = new EventSource('http://localhost:2333/debug/hello')
  ev.onmessage = (e) => {
  log.innerText += e.data + '\n'
}

ev.onerror = (e) => {
  if (!e) {
    ev.close()
  }
}

image

但是流传输成功之后,浏览器有重新发送了请求。只要不手动关闭连接,就会一直重复。这里暂时使用以下代码 HACK。

const ev = new EventSource('http://localhost:2333/debug/hello')
let opened = false
ev.onopen = (e) => {
  opened = true
  console.log('opened')
}
ev.onmessage = (e) => {
  log.innerText += e.data + '\n'
}

ev.onerror = (e) => {

  if (!e) {
    ev.close()
  }

  if (e.target.readyState === EventSource.CONNECTING && opened) {
    return ev.close()
  }

  if (e.target.readyState === EventSource.CLOSED) {
    log.innerText += 'closed'
    ev.close()
  }
}

因为在完成传输后,都会有一个 error 事件。

利用 zx 和 node-pty 实现 Shell 输出流的传输

zx 是 Google 开源的一个方便写脚本的库。不止用来写脚本,在项目中我也大量使用,但是现在 zx drop 了对 node-cjs 的支持,这里安利一下 zx-cjs 这个库,通过修改官方的构建方式实时同步更新 node-cjs 兼容的 zx 版本。安装之后别忘了,修改 package 的应用路径。改成:

"zx": "npm:zx-cjs@latest"

这样就和原本一样引用了。

node-pty 是微软开源的一个 shell 实现,可以很方便的实现一个终端模拟器。

使用 zx 执行 shell 命令时添加 .pipe 方法,将 output 流 pipe 到另一个 Writable,Writable 实现 write 到 rxjs Observable。建议代码如下。

const ob$ = new Observable((subscriber) => {
  ;(async () => {
    const writable = new Stream.Writable({
      autoDestroy: false,
      write(chunk) {
        subscriber.next(chunk.toString())
      },
    })
    await $`ls -lh`.pipe(writable)
    subscriber.complete()
    writable.destroy()
  })()
})

注意目前 Observable 不支持传入 async 函数,否则 .complete() 执行会出现报错。

这样就实现了一个简单的日志输出。

当然这是一个简单的输出 zx 就能胜任了,遇到了 zip 这种可能就会出现问题。这时候可以使用 node-pty 来实现输出。以下是一个封装。

 runShellCommandPipeOutput(
    command: string,
    args: any[],
    subscriber: Subscriber<string>,
  ) {
    return new Promise((resolve) => {
      subscriber.next(`$ ${command} ${args.join(' ')}\n`)

      const pty = spawn(command, args, {})
      pty.onData((data) => {
        subscriber.next(data.toString())
      })
      pty.onExit(() => {
        resolve(null)
      })
    })
  }

node-pty 对 spawn 进行了包装,使用起来不像 zx 可以直接打长串命令来的灵活。命令需要拆分成程序和参数。以下是一个调用。

 await this.runShellCommandPipeOutput(
   'unzip',
   ['-o', 'admin-release.zip', '-d', folder],
   subscriber,
 )

就这样通过 Writable 和 rxjs Observable 实现了一个简单的日子输出。效果如下。

image

不足

由于 SSE 只是一个 GET 请求的长连接,所以在设计上没有断线重连的能力,如果没有及时处理断开后的 EventSource 则会不停再次发送请求,造成资源的浪费。而且对连接的控制不友好,属于 SSE 依然使用很少,都会选择更加稳定的 WebSocket。

参看

update.controller.ts#L24

Loading...
Ownership of this blog data is guaranteed by blockchain and smart contracts to the creator alone.