banner
innei

innei

写代码是因为爱,写到世界充满爱!
github
telegram
twitter

NestJS 服务端推送日志流输出

最近做了两个需求,一个是热更新后台面板,另一个是安装和升级依赖。两个都是满任务,为了在前端可视化任务进度,都需要做实时日志的输出。类似于 GitHub Action, Vercel 等构建的日志输出。一般来说,实时输出使用 WebSocket,虽然在项目中也用到了 Socket.IO,但是建立一个 Socket.IO 从设计上来说就是昂贵的。考虑到 HTTP2 SSE 可以值得一试。

SSE 接口设计#

SSE 指服务端推送。可以与服务器建立一条长连接,服务器可以像浏览器发送多次数据,很适合作为日志输出。

在 NestJS 中提供了 @Sse 装饰器,使用方式与传统 HTTP 请求一致,SSE 请求本质也是 HTTP GET 请求,只不过 Content-Type 变成了 text/event-stream

image

在 NestJS SSE 接口中,为了实现一个流的传输,需要使用 rxjs 的能力,利用 rxjs 中的可观察对象可以不停传递数据。一个简单的流为:

@Sse('/hello')
async helloSse() {
  const ob$ = new Observable((subscriber) => {
  subscriber.next('hello')
  subscriber.next('bye!')
  subscriber.complete()
  })
  return ob$
}

浏览器使用 EventSource 进行连接。

const ev = new EventSource('http://localhost:2333/debug/hello')
  ev.onmessage = (e) => {
  log.innerText += e.data + '\n'
}

ev.onerror = (e) => {
  if (!e) {
    ev.close()
  }
}

image

但是流传输成功之后,浏览器有重新发送了请求。只要不手动关闭连接,就会一直重复。这里暂时使用以下代码 HACK。

const ev = new EventSource('http://localhost:2333/debug/hello')
let opened = false
ev.onopen = (e) => {
  opened = true
  console.log('opened')
}
ev.onmessage = (e) => {
  log.innerText += e.data + '\n'
}

ev.onerror = (e) => {

  if (!e) {
    ev.close()
  }

  if (e.target.readyState === EventSource.CONNECTING && opened) {
    return ev.close()
  }

  if (e.target.readyState === EventSource.CLOSED) {
    log.innerText += 'closed'
    ev.close()
  }
}

因为在完成传输后,都会有一个 error 事件。

利用 zx 和 node-pty 实现 Shell 输出流的传输#

zx 是 Google 开源的一个方便写脚本的库。不止用来写脚本,在项目中我也大量使用,但是现在 zx drop 了对 node-cjs 的支持,这里安利一下 zx-cjs 这个库,通过修改官方的构建方式实时同步更新 node-cjs 兼容的 zx 版本。安装之后别忘了,修改 package 的应用路径。改成:

"zx": "npm:zx-cjs@latest"

这样就和原本一样引用了。

node-pty 是微软开源的一个 shell 实现,可以很方便的实现一个终端模拟器。

使用 zx 执行 shell 命令时添加 .pipe 方法,将 output 流 pipe 到另一个 Writable,Writable 实现 write 到 rxjs Observable。建议代码如下。

const ob$ = new Observable((subscriber) => {
  ;(async () => {
    const writable = new Stream.Writable({
      autoDestroy: false,
      write(chunk) {
        subscriber.next(chunk.toString())
      },
    })
    await $`ls -lh`.pipe(writable)
    subscriber.complete()
    writable.destroy()
  })()
})

注意目前 Observable 不支持传入 async 函数,否则 .complete() 执行会出现报错。

这样就实现了一个简单的日志输出。

当然这是一个简单的输出 zx 就能胜任了,遇到了 zip 这种可能就会出现问题。这时候可以使用 node-pty 来实现输出。以下是一个封装。

 runShellCommandPipeOutput(
    command: string,
    args: any[],
    subscriber: Subscriber<string>,
  ) {
    return new Promise((resolve) => {
      subscriber.next(`$ ${command} ${args.join(' ')}\n`)

      const pty = spawn(command, args, {})
      pty.onData((data) => {
        subscriber.next(data.toString())
      })
      pty.onExit(() => {
        resolve(null)
      })
    })
  }

node-pty 对 spawn 进行了包装,使用起来不像 zx 可以直接打长串命令来的灵活。命令需要拆分成程序和参数。以下是一个调用。

 await this.runShellCommandPipeOutput(
   'unzip',
   ['-o', 'admin-release.zip', '-d', folder],
   subscriber,
 )

就这样通过 Writable 和 rxjs Observable 实现了一个简单的日子输出。效果如下。

image

不足#

由于 SSE 只是一个 GET 请求的长连接,所以在设计上没有断线重连的能力,如果没有及时处理断开后的 EventSource 则会不停再次发送请求,造成资源的浪费。而且对连接的控制不友好,属于 SSE 依然使用很少,都会选择更加稳定的 WebSocket。

参看#

update.controller.ts#L24

加载中...
此文章数据所有权由区块链加密技术和智能合约保障仅归创作者所有。