最近做了兩個需求,一個是熱更新後台面板,另一個是安裝和升級依賴。兩個都是滿任務,為了在前端可視化任務進度,都需要做實時日誌的輸出。類似於 GitHub Action, Vercel 等構建的日誌輸出。一般來說,實時輸出使用 WebSocket,雖然在項目中也用到了 Socket.IO,但是建立一個 Socket.IO 從設計上來說就是昂貴的。考慮到 HTTP2 SSE 可以值得一試。
SSE 接口設計#
SSE 指服務器推送。可以與服務器建立一條長連接,服務器可以像瀏覽器發送多次數據,很適合作為日誌輸出。
在 NestJS 中提供了 @Sse
裝飾器,使用方式與傳統 HTTP 請求一致,SSE 請求本質也是 HTTP GET 請求,只不過 Content-Type 變成了 text/event-stream
。
在 NestJS SSE 接口中,為了實現一個流的傳輸,需要使用 rxjs 的能力,利用 rxjs 中的可觀察對象可以不停傳遞數據。一個簡單的流為:
@Sse('/hello')
async helloSse() {
const ob$ = new Observable((subscriber) => {
subscriber.next('hello')
subscriber.next('bye!')
subscriber.complete()
})
return ob$
}
瀏覽器使用 EventSource 進行連接。
const ev = new EventSource('http://localhost:2333/debug/hello')
ev.onmessage = (e) => {
log.innerText += e.data + '\n'
}
ev.onerror = (e) => {
if (!e) {
ev.close()
}
}
但是流傳輸成功之後,瀏覽器有重新發送了請求。只要不手動關閉連接,就會一直重複。這裡暫時使用以下代碼 HACK。
const ev = new EventSource('http://localhost:2333/debug/hello')
let opened = false
ev.onopen = (e) => {
opened = true
console.log('opened')
}
ev.onmessage = (e) => {
log.innerText += e.data + '\n'
}
ev.onerror = (e) => {
if (!e) {
ev.close()
}
if (e.target.readyState === EventSource.CONNECTING && opened) {
return ev.close()
}
if (e.target.readyState === EventSource.CLOSED) {
log.innerText += 'closed'
ev.close()
}
}
因為在完成傳輸後,都會有一個 error 事件。
利用 zx 和 node-pty 實現 Shell 輸出流的傳輸#
zx 是 Google 開源的一個方便寫腳本的庫。不止用來寫腳本,在項目中我也大量使用,但是現在 zx drop 了對 node-cjs 的支持,這裡安利一下 zx-cjs 這個庫,通過修改官方的構建方式實時同步更新 node-cjs 兼容的 zx 版本。安裝之後別忘了,修改 package 的應用路徑。改成:
"zx": "npm:zx-cjs@latest"
這樣就和原本一樣引用了。
node-pty 是微軟開源的一個 shell 實現,可以很方便的實現一個終端模擬器。
使用 zx 執行 shell 命令時添加 .pipe
方法,將 output 流 pipe 到另一個 Writable,Writable 實現 write 到 rxjs Observable。建議代碼如下。
const ob$ = new Observable((subscriber) => {
;(async () => {
const writable = new Stream.Writable({
autoDestroy: false,
write(chunk) {
subscriber.next(chunk.toString())
},
})
await $`ls -lh`.pipe(writable)
subscriber.complete()
writable.destroy()
})()
})
注意目前 Observable 不支持傳入 async 函數,否則
.complete()
執行會出現報錯。
這樣就實現了一個簡單的日誌輸出。
當然這是一個簡單的輸出 zx 就能勝任了,遇到了 zip
這種可能就會出現問題。這時候可以使用 node-pty 來實現輸出。以下是一個封裝。
runShellCommandPipeOutput(
command: string,
args: any[],
subscriber: Subscriber<string>,
) {
return new Promise((resolve) => {
subscriber.next(`$ ${command} ${args.join(' ')}\n`)
const pty = spawn(command, args, {})
pty.onData((data) => {
subscriber.next(data.toString())
})
pty.onExit(() => {
resolve(null)
})
})
}
node-pty 對 spawn 進行了包裝,使用起來不像 zx 可以直接打長串命令來的靈活。命令需要拆分成程序和參數。以下是一個調用。
await this.runShellCommandPipeOutput(
'unzip',
['-o', 'admin-release.zip', '-d', folder],
subscriber,
)
就這樣通過 Writable 和 rxjs Observable 實現了一個簡單的日子輸出。效果如下。
不足#
由於 SSE 只是一個 GET 請求的長連接,所以在設計上沒有斷線重連的能力,如果沒有及時處理斷開後的 EventSource 則會不停再次發送請求,造成資源的浪費。而且對連接的控制不友好,屬於 SSE 依然使用很少,都會選擇更加穩定的 WebSocket。