最近、2 つの要件を実装しました。1 つはホット更新のバックエンドパネルで、もう 1 つは依存関係のインストールとアップグレードです。どちらもタスクが多く、フロントエンドでタスクの進捗を視覚化するためにリアルタイムのログ出力が必要です。GitHub Action や Vercel などでビルドされたログ出力に似ています。通常、リアルタイム出力には WebSocket が使用されますが、プロジェクトでは Socket.IO も使用していますが、Socket.IO を設計することはコストがかかります。HTTP2 SSE を試してみる価値があると考えました。
SSE インターフェースの設計#
SSE は Server-Sent Events の略で、サーバーとの間に長時間の接続を確立し、サーバーからブラウザに複数回データを送信することができます。ログ出力に適しています。
NestJS では、@Sse
デコレータを提供しており、従来の HTTP リクエストと同じように使用します。SSE リクエストは本質的には HTTP GET リクエストですが、Content-Type がtext/event-stream
に変わります。
NestJS の SSE インターフェースでは、ストリームの転送を実現するために rxjs の機能を使用する必要があります。rxjs の Observable を使用してデータを連続的に送信することができます。簡単なストリームの例は次のとおりです。
@Sse('/hello')
async helloSse() {
const ob$ = new Observable((subscriber) => {
subscriber.next('hello')
subscriber.next('bye!')
subscriber.complete()
})
return ob$
}
ブラウザは EventSource を使用して接続します。
const ev = new EventSource('http://localhost:2333/debug/hello')
ev.onmessage = (e) => {
log.innerText += e.data + '\n'
}
ev.onerror = (e) => {
if (!e) {
ev.close()
}
}
しかし、ストリームの転送が成功すると、ブラウザはリクエストを再送信します。接続を手動で閉じない限り、リクエストは繰り返し送信されます。ここでは、以下のコードを一時的に使用しています。
const ev = new EventSource('http://localhost:2333/debug/hello')
let opened = false
ev.onopen = (e) => {
opened = true
console.log('opened')
}
ev.onmessage = (e) => {
log.innerText += e.data + '\n'
}
ev.onerror = (e) => {
if (!e) {
ev.close()
}
if (e.target.readyState === EventSource.CONNECTING && opened) {
return ev.close()
}
if (e.target.readyState === EventSource.CLOSED) {
log.innerText += 'closed'
ev.close()
}
}
転送が完了すると、常にエラーイベントが発生するためです。
zx と node-pty を使用してシェルの出力ストリームを転送する#
zxは、スクリプトを簡単に書くための Google のオープンソースライブラリです。スクリプトの作成だけでなく、プロジェクトでも大量に使用していますが、現在の zx は node-cjs のサポートを終了しています。ここで、zx-cjsというライブラリを紹介します。これは、公式のビルド方法を変更して、node-cjs 互換の zx バージョンをリアルタイムに同期更新することができます。インストールした後、パッケージのアプリケーションパスを変更することを忘れないでください。次のように変更します。
"zx": "npm:zx-cjs@latest"
これで元のようにインポートできます。
node-ptyは、マイクロソフトがオープンソースで提供するシェルの実装で、簡単にターミナルエミュレータを実現することができます。
zx を使用してシェルコマンドを実行する際に.pipe
メソッドを追加し、出力ストリームを別の Writable にパイプします。Writable は rxjs Observable に書き込むための write メソッドを実装します。以下のコードをお勧めします。
const ob$ = new Observable((subscriber) => {
;(async () => {
const writable = new Stream.Writable({
autoDestroy: false,
write(chunk) {
subscriber.next(chunk.toString())
},
})
await $`ls -lh`.pipe(writable)
subscriber.complete()
writable.destroy()
})()
})
注意 現在、Observable は async 関数を受け付けないため、
.complete()
を実行するとエラーが発生します。
これにより、簡単なログ出力が実現されます。
もちろん、これは zx だけで実現できる簡単な出力です。zip
などの場合は問題が発生する可能性があります。その場合は、node-pty を使用して出力を実現できます。以下はラップする例です。
runShellCommandPipeOutput(
command: string,
args: any[],
subscriber: Subscriber<string>,
) {
return new Promise((resolve) => {
subscriber.next(`$ ${command} ${args.join(' ')}\n`)
const pty = spawn(command, args, {})
pty.onData((data) => {
subscriber.next(data.toString())
})
pty.onExit(() => {
resolve(null)
})
})
}
node-pty は spawn をラップしており、zx のように長いコマンドを直接入力することはできません。コマンドはプログラムと引数に分割する必要があります。以下は呼び出しの例です。
await this.runShellCommandPipeOutput(
'unzip',
['-o', 'admin-release.zip', '-d', folder],
subscriber,
)
これにより、Writable と rxjs Observable を使用して簡単なログ出力が実現されます。以下は効果です。
不足点#
SSE は GET リクエストの長時間接続であるため、切断と再接続の機能がありません。タイムリーに EventSource の切断を処理しない場合、リクエストが繰り返し送信され、リソースが浪費される可能性があります。また、接続の制御が不十分であり、SSE はまだあまり使用されていないため、より安定した WebSocket を選択する傾向があります。