Recently I finished two requirements: hot-updating the admin backend panel, and installing and upgrading dependencies. Both are long-running tasks, and the frontend needs real-time log output to visualize their progress, similar to the build logs of GitHub Actions, Vercel, and other platforms. WebSocket is the usual choice for real-time output. Although the project already uses Socket.IO, setting up another Socket.IO channel is costly in terms of design, so HTTP/2 SSE seemed worth a try.
SSE Interface Design#
SSE stands for Server-Sent Events. It keeps a long-lived connection open to the server, over which the server can push multiple messages to the browser, which makes it a good fit for log output.
NestJS provides the @Sse decorator, which is used in the same way as the decorators for traditional HTTP requests. An SSE request is essentially an HTTP GET request, except that the Content-Type of the response is text/event-stream.
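To make the text/event-stream format concrete, here is a minimal sketch of how a single event is laid out on the wire: optional event, id, and retry fields, one data line per line of payload, and a blank line to end the event. The SseEvent type and the formatSseEvent helper are illustrative names, not part of NestJS; with @Sse, NestJS takes care of this serialization.

```typescript
// Illustrative shape of one server-sent event (hypothetical type name).
interface SseEvent {
  data: string
  event?: string
  id?: string
  retry?: number
}

// Serialize one event into the text/event-stream wire format.
function formatSseEvent({ data, event, id, retry }: SseEvent): string {
  const lines: string[] = []
  if (event !== undefined) lines.push(`event: ${event}`)
  if (id !== undefined) lines.push(`id: ${id}`)
  if (retry !== undefined) lines.push(`retry: ${retry}`)
  // A multi-line payload becomes several "data:" lines.
  for (const line of data.split(/\r\n|\r|\n/)) {
    lines.push(`data: ${line}`)
  }
  // A blank line terminates the event.
  return lines.join('\n') + '\n\n'
}
```

For instance, a log line sent as `{ data: 'hello' }` ends up as `data: hello` followed by an empty line.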
In a NestJS SSE endpoint, the stream is implemented with rxjs: an Observable can keep emitting data over time. A simple stream looks like this:
@Sse('/hello')
async helloSse() {
  const ob$ = new Observable((subscriber) => {
    subscriber.next('hello')
    subscriber.next('bye!')
    subscriber.complete()
  })
  return ob$
}
The browser uses EventSource to establish a connection.
const ev = new EventSource('http://localhost:2333/debug/hello')
ev.onmessage = (e) => {
  log.innerText += e.data + '\n'
}
ev.onerror = (e) => {
  if (!e) {
    ev.close()
  }
}
However, once the stream completes, the browser sends the request again: when the server ends the response, EventSource treats it as a dropped connection and reconnects automatically. Unless the connection is closed manually, this repeats indefinitely. Here is a temporary hack.
const ev = new EventSource('http://localhost:2333/debug/hello')
let opened = false
ev.onopen = (e) => {
  opened = true
  console.log('opened')
}
ev.onmessage = (e) => {
  log.innerText += e.data + '\n'
}
ev.onerror = (e) => {
  if (!e) {
    ev.close()
  }
  // CONNECTING after a successful open means the stream ended and
  // the browser is about to reconnect, so close it instead
  if (e.target.readyState === EventSource.CONNECTING && opened) {
    return ev.close()
  }
  if (e.target.readyState === EventSource.CLOSED) {
    log.innerText += 'closed'
    ev.close()
  }
}
This works because an error event is fired once the transmission completes.
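Rather than inferring completion from the error event, another option is to have the server send an explicit end-of-stream event and close the connection on the client when it arrives. The sketch below assumes a hypothetical event name `done` that the server emits last; EventSourceLike and followLogStream are illustrative names, and the structural interface only exists so the logic can also be exercised outside a browser.

```typescript
// Minimal structural view of an EventSource (hypothetical helper type).
interface EventSourceLike {
  addEventListener(type: string, listener: (e: { data?: string }) => void): void
  close(): void
}

function followLogStream(
  source: EventSourceLike,
  onLine: (line: string) => void,
  doneEvent = 'done', // assumed event name; must match what the server sends
): void {
  source.addEventListener('message', (e) => {
    if (e.data !== undefined) onLine(e.data)
  })
  source.addEventListener(doneEvent, () => {
    // Closing here stops the automatic reconnect after the stream ends.
    source.close()
  })
  source.addEventListener('error', () => {
    // This sketch gives up on any error instead of letting the browser retry.
    source.close()
  })
}
```

In the browser, an EventSource instance would be passed as `source` (a type cast may be needed depending on the DOM typings), with `onLine` appending to the log element.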
Transmitting Shell Output Stream Using zx and node-pty#
zx is a library open-sourced by Google that makes writing scripts convenient. Beyond scripts, I also use it a lot inside projects. However, zx has dropped CommonJS support. Here I recommend zx-cjs, a package that changes the official build so that it tracks upstream zx releases in a CommonJS-compatible form. After installing it, don't forget to change the dependency entry in package.json to:
"zx": "npm:zx-cjs@latest"
This way, it can be imported as before.
node-pty is an open-source pseudoterminal binding for Node.js from Microsoft, which makes it easy to build a terminal emulator.
When executing shell commands with zx, call the .pipe method to pipe the output stream into another Writable, and have that Writable write into an rxjs Observable. The recommended code is as follows.
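const ob$ = new Observable((subscriber) => {
  // Run the async work in an IIFE so the subscribe callback itself stays synchronous
  ;(async () => {
    const writable = new Stream.Writable({
      autoDestroy: false,
      write(chunk, _encoding, callback) {
        subscriber.next(chunk.toString())
        // Signal that the chunk was consumed; otherwise later writes stall
        callback()
      },
    })
    await $`ls -lh`.pipe(writable)
    subscriber.complete()
    writable.destroy()
  })().catch((err) => subscriber.error(err))
})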
Note: the function passed to the Observable constructor must not be async. rxjs treats its return value as teardown logic, and a returned Promise is not a valid teardown, so an error occurs when .complete() is executed.
This way, a simple log output is implemented.
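One detail worth watching when turning chunks into text: a chunk boundary can fall in the middle of a multi-byte UTF-8 character, in which case chunk.toString() yields broken characters. The following is a minimal sketch, using only Node's built-in stream and string_decoder modules, of a Writable that decodes safely and forwards text to a callback. createTextForwarder is a hypothetical helper name; in the setup above, the callback would be `(text) => subscriber.next(text)`.

```typescript
import { Writable } from 'node:stream'
import { StringDecoder } from 'node:string_decoder'

// Build a Writable that decodes UTF-8 safely and forwards text to onText.
function createTextForwarder(onText: (text: string) => void): Writable {
  const decoder = new StringDecoder('utf8')
  return new Writable({
    write(chunk: Buffer, _encoding, callback) {
      // The decoder holds back incomplete multi-byte sequences.
      const text = decoder.write(chunk)
      if (text.length > 0) onText(text)
      // Always acknowledge the chunk so the stream keeps flowing.
      callback()
    },
    final(callback) {
      // Flush whatever the decoder is still holding when the stream ends.
      const rest = decoder.end()
      if (rest.length > 0) onText(rest)
      callback()
    },
  })
}
```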
Of course, this only covers simple output that zx can handle. With something like zip, there may be problems. In that case, node-pty can be used for the output. Here is a wrapper.
runShellCommandPipeOutput(
  command: string,
  args: any[],
  subscriber: Subscriber<string>,
) {
  return new Promise((resolve) => {
    // Echo the command line first, like a terminal prompt
    subscriber.next(`$ ${command} ${args.join(' ')}\n`)
    // spawn comes from node-pty
    const pty = spawn(command, args, {})
    pty.onData((data) => {
      subscriber.next(data.toString())
    })
    pty.onExit(() => {
      resolve(null)
    })
  })
}
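The wrapper above resolves no matter how the command exits. If a failed command should stop the task, the exit code that node-pty passes to the onExit listener can be checked. The sketch below is written against a small structural interface (PtyLike, a hypothetical name) instead of importing node-pty, so it stays self-contained; waitForPty and LogSink are likewise illustrative.

```typescript
// Structural subset of a node-pty process used by this sketch (hypothetical type).
interface PtyLike {
  onData(listener: (data: string) => void): unknown
  onExit(listener: (e: { exitCode: number; signal?: number }) => void): unknown
}

// Anything with a next() method, e.g. an rxjs Subscriber<string>.
interface LogSink {
  next(chunk: string): void
}

// Forward output to the sink; resolve on exit code 0, reject otherwise.
function waitForPty(pty: PtyLike, sink: LogSink): Promise<number> {
  return new Promise((resolve, reject) => {
    pty.onData((data) => sink.next(data))
    pty.onExit(({ exitCode }) => {
      if (exitCode === 0) {
        resolve(exitCode)
      } else {
        reject(new Error(`command exited with code ${exitCode}`))
      }
    })
  })
}
```

The caller can then catch the rejection and report the failure through the stream, for example with subscriber.error.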
node-pty exposes a spawn-style API, which is less flexible than zx: zx can execute a whole command line directly, whereas here the command must be split into a program and its arguments. Here is an example of calling it.
await this.runShellCommandPipeOutput(
  'unzip',
  ['-o', 'admin-release.zip', '-d', folder],
  subscriber,
)
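If commands arrive as a single string, they have to be split before being passed to a spawn-style API. Here is a deliberately naive sketch of such a split. splitCommand is a hypothetical helper; it only understands whitespace-separated tokens and tokens fully wrapped in single or double quotes, with no escapes, variable expansion, or pipes.

```typescript
// Naive command-line splitter: whitespace-separated tokens plus fully quoted tokens.
function splitCommand(commandLine: string): { program: string; args: string[] } {
  const tokens: string[] = []
  const pattern = /"([^"]*)"|'([^']*)'|(\S+)/g
  let match: RegExpExecArray | null
  while ((match = pattern.exec(commandLine)) !== null) {
    // Exactly one of the three groups participates in each match.
    tokens.push(match[1] ?? match[2] ?? match[3])
  }
  if (tokens.length === 0) {
    throw new Error('empty command')
  }
  const [program, ...args] = tokens
  return { program, args }
}
```

The result can be passed on as `runShellCommandPipeOutput(program, args, subscriber)`. Anything that relies on real shell syntax is better left to zx.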
In this way, a simple log output is implemented using Writable and rxjs Observable. The effect is as follows.
Shortcomings#
Since SSE is just a long-lived GET request, the client has little control over reconnection: if the EventSource is not closed promptly after a disconnect, it keeps re-sending requests, which wastes resources. Control over the connection in general is also unfriendly. SSE still sees little use, and the more stable WebSocket is usually chosen instead.