banner
innei

innei

写代码是因为爱,写到世界充满爱!
github
telegram
twitter

NestJS 服務器推送日誌流輸出

最近做了兩個需求,一個是熱更新後台面板,另一個是安裝和升級依賴。兩個都是滿任務,為了在前端可視化任務進度,都需要做實時日誌的輸出。類似於 GitHub Action, Vercel 等構建的日誌輸出。一般來說,實時輸出使用 WebSocket,雖然在項目中也用到了 Socket.IO,但是建立一個 Socket.IO 從設計上來說就是昂貴的。考慮到 HTTP2 SSE 可以值得一試。

SSE 接口設計#

SSE 指服務器推送。可以與服務器建立一條長連接,服務器可以像瀏覽器發送多次數據,很適合作為日誌輸出。

在 NestJS 中提供了 @Sse 裝飾器,使用方式與傳統 HTTP 請求一致,SSE 請求本質也是 HTTP GET 請求,只不過 Content-Type 變成了 text/event-stream

image

在 NestJS SSE 接口中,為了實現一個流的傳輸,需要使用 rxjs 的能力,利用 rxjs 中的可觀察對象可以不停傳遞數據。一個簡單的流為:

@Sse('/hello')
async helloSse() {
  const ob$ = new Observable((subscriber) => {
  subscriber.next('hello')
  subscriber.next('bye!')
  subscriber.complete()
  })
  return ob$
}

瀏覽器使用 EventSource 進行連接。

const ev = new EventSource('http://localhost:2333/debug/hello')
  ev.onmessage = (e) => {
  log.innerText += e.data + '\n'
}

ev.onerror = (e) => {
  if (!e) {
    ev.close()
  }
}

但是流傳輸成功之後,瀏覽器有重新發送了請求。只要不手動關閉連接,就會一直重複。這裡暫時使用以下代碼 HACK。

const ev = new EventSource('http://localhost:2333/debug/hello')
let opened = false
ev.onopen = (e) => {
  opened = true
  console.log('opened')
}
ev.onmessage = (e) => {
  log.innerText += e.data + '\n'
}

ev.onerror = (e) => {

  if (!e) {
    ev.close()
  }

  if (e.target.readyState === EventSource.CONNECTING && opened) {
    return ev.close()
  }

  if (e.target.readyState === EventSource.CLOSED) {
    log.innerText += 'closed'
    ev.close()
  }
}

因為在完成傳輸後,都會有一個 error 事件。

利用 zx 和 node-pty 實現 Shell 輸出流的傳輸#

zx 是 Google 開源的一個方便寫腳本的庫。不止用來寫腳本,在項目中我也大量使用,但是現在 zx drop 了對 node-cjs 的支持,這裡安利一下 zx-cjs 這個庫,通過修改官方的構建方式實時同步更新 node-cjs 兼容的 zx 版本。安裝之後別忘了,修改 package 的應用路徑。改成:

"zx": "npm:zx-cjs@latest"

這樣就和原本一樣引用了。

node-pty 是微軟開源的一個 shell 實現,可以很方便的實現一個終端模擬器。

使用 zx 執行 shell 命令時添加 .pipe 方法,將 output 流 pipe 到另一個 Writable,Writable 實現 write 到 rxjs Observable。建議代碼如下。

const ob$ = new Observable((subscriber) => {
  ;(async () => {
    const writable = new Stream.Writable({
      autoDestroy: false,
      write(chunk) {
        subscriber.next(chunk.toString())
      },
    })
    await $`ls -lh`.pipe(writable)
    subscriber.complete()
    writable.destroy()
  })()
})

注意目前 Observable 不支持傳入 async 函數,否則 .complete() 執行會出現報錯。

這樣就實現了一個簡單的日誌輸出。

當然這是一個簡單的輸出 zx 就能勝任了,遇到了 zip 這種可能就會出現問題。這時候可以使用 node-pty 來實現輸出。以下是一個封裝。

 runShellCommandPipeOutput(
    command: string,
    args: any[],
    subscriber: Subscriber<string>,
  ) {
    return new Promise((resolve) => {
      subscriber.next(`$ ${command} ${args.join(' ')}\n`)

      const pty = spawn(command, args, {})
      pty.onData((data) => {
        subscriber.next(data.toString())
      })
      pty.onExit(() => {
        resolve(null)
      })
    })
  }

node-pty 對 spawn 進行了包裝,使用起來不像 zx 可以直接打長串命令來的靈活。命令需要拆分成程序和參數。以下是一個調用。

 await this.runShellCommandPipeOutput(
   'unzip',
   ['-o', 'admin-release.zip', '-d', folder],
   subscriber,
 )

就這樣通過 Writable 和 rxjs Observable 實現了一個簡單的日子輸出。效果如下。

image

不足#

由於 SSE 只是一個 GET 請求的長連接,所以在設計上沒有斷線重連的能力,如果沒有及時處理斷開後的 EventSource 則會不停再次發送請求,造成資源的浪費。而且對連接的控制不友好,屬於 SSE 依然使用很少,都會選擇更加穩定的 WebSocket。

參看#

update.controller.ts#L24

載入中......
此文章數據所有權由區塊鏈加密技術和智能合約保障僅歸創作者所有。