在之前的文章中,我详细的介绍了站点的实时人数是如何实现的?,在方案中,我留下了一个悬念,如何使用 SharedWorker 在页面之间共享 Socket 实例,实现跨页面单例 WebSocket。
动机#
探索这个问题的背景是无意间在知乎看到了 WebSocket 的一个问题,其中有回答提到 WebSocket 连接过于占用服务器资源。当页面重复打开很多个时,每个页面都会建立 WebSocket 连接,确实无意间的增大了服务器压力。虽说这个问题我之前也有考虑过,但是由于个人站点并不会出现大量的用户同时在线,所以并没有深入研究。我们使用的是 Socket.IO 的方案建立 WebSocket 连接,Socket.IO 本身就是对 WebSocket 的封装,所以从开销上来说,Socket.IO 会比原生 WebSocket 多一些开销。关于 Socket.IO 连接对服务器侧的内存开销,文档中有所提及:Memory Usage | Socket.IO。
虽然这点性能优化不痛不痒也没有任何提升,但是既然有这个问题,那我们这次就试着去解决这个问题。
找到方案#
我们要解决的问题很明确,就是当一个浏览器打开两个或以上的我站的页面时(多个 Tab),复用同一个 Socket 实例。
通过搜索关键字,我们了解到一个 API,SharedWorker,SharedWorker 是一个在多个浏览上下文(例如多个窗口、标签或 iframe)之间共享的 Worker。SharedWorker 有一个全局作用域,可以在多个浏览上下文中使用,这样就可以实现我们的目标。
我们只需要把原本在 Socket 实例的代码放到 SharedWorker 中,然后在页面中与 Worker 中的 Socket 实例通信即可。由于之前的 SocketClient 对 Socket 进行了抽象,在此次重构中并不需要修改太多代码,而是只需要实现通信层即可。
SharedWorker 基本使用#
先来看看 SharedWorker 是如何使用的。
let i = 0
const ports = []
onconnect = (e) => {
const port = e.ports[0];
ports.push(port)
port.addEventListener("message", (e) => {
const workerResult = `Result: ${e.data[0] * e.data[1]}`;
port.postMessage(workerResult);
});
port.start();
};
setInterval(() => {
i++;
ports.forEach(port => port.postMessage("Message from worker, time: " + i))
}, 1000);
const worker = new SharedWorker('/shared-worker.js')
worker.port.onmessage = (e) => {
console.log('Message received from worker', e.data)
}
worker.port.start()
上面的例子中,我们创建了一个 SharedWorker,然后在页面中与 Worker 通信。在 Worker 中,我们监听了 onconnect
事件,当 Worker 连接时,我们监听了 port 的 message 事件,然后在页面中,我们监听了 port 的 message 事件,worker 中的消息在页面中进行进一步处理。
SharedWorker 同 Worker 一样,消息通过 MessageChannel 进行传递。而 worker.port
本质就是对 MessageChannel 的封装。
现在我们运行这个基本的例子,看看效果。
https://codesandbox.io/p/sandbox/ecstatic-jerry-6xsgvs
当我们同时打开两个 Tab 时,我们可以看到两个 Tab 都会收到 Worker 发送的消息。
通过 chrome://inspect/#workers
我们可以看到 Worker 的信息。当打开多个 Tab 时,SharedWorker 会在多个 Tab 之间共享;当多个页面都被关闭时,Worker 会被销毁。
在 Next.js 中使用 SharedWorker#
上面的例子中,我们把 worker 的实现放在了 public 目录下,这样的方式并不适用于工程化的项目中,因为在工程化项目中,我们不应该直接引用 public 目录下的文件,而是应该通过 import 的方式引入。第二,我们或许需要在 Worker 实现中引用外部库,或者使用 TypeScript 编写 Worker 实现。而对于这些需求,直接裸写 js 的方式是无法实现的。
对此,我们需要对 Next.js 的 webpack 配置进行一些修改,以支持 Worker 的引入。
首先安装 worker-loader
:
npm i -D worker-loader
修改 next.config.js
:
/** @type {import('next').NextConfig} */
let nextConfig = {
webpack: (config, { webpack }) => {
config.module.rules.unshift({
test: /\.worker\.ts$/,
loader: 'worker-loader',
options: {
publicPath: '/_next/',
worker: {
type: 'SharedWorker',
// https://v4.webpack.js.org/loaders/worker-loader/#worker
options: {
name: 'ws-worker',
},
},
},
})
return config
},
}
export default nextConfig
这里我们使用了 worker-loader
,并且配置了文件后缀 .worker.ts
为 SharedWorker
实现,这样我们就可以在 Next.js 中使用 SharedWorker 了。
const worker = new SharedWorker("/worker.js"); // ![code --]
import worker from './worker.worker' // ![code ++]
实现 SharedWorker Socket#
现在我们在 Worker 中实现 Socket 的逻辑。
大概梳理一下流程:
- 首先在建立 Worker 时候(也就是第一个页面会建立 Worker),传递 Socket 连接配置。(因为 Worker 中无法获得环境变量和主线程变量)
- 创建 Socket 实例,然后对 Socket 连接状态和消息事件监听,通过 MessageChannel 传递消息给主线程(页面)。
- 在新页面打开时,或者 Socket 连接完成后,传递 Socket 相关信息到主线程,这些数据需要被主进程存储并用于其他组件消费。
- 在主进程中,实现和 Worker 的通信,以及对 Socket 实例的操作。比如
emit
方法。
具体的代码实现如下:
Note
下面的例子中,我们使用了 TypeScript 去编写 Worker,并且可以直接在 Worker 中使用 import 引用外部库,为了更好的支持 Worker Scope 的类型,所以在顶部我们使用 /// <reference lib="webworker" />
增加相关的类型支持。
import { io } from 'socket.io-client'
import type { Socket } from 'socket.io-client'
/// <reference lib="webworker" />
let ws: Socket | null = null
function setupIo(config: { url: string }) {
if (ws) return
// 使用 socket.io
console.log('Connecting to io, url: ', config.url)
ws = io(config.url, {
timeout: 10000,
reconnectionDelay: 3000,
autoConnect: false,
reconnectionAttempts: 3,
transports: ['websocket'],
})
if (!ws) return
ws.on('disconnect', () => {
boardcast({
type: 'disconnect',
})
})
/**
* @param {any} payload
*/
ws.on('message', (payload) => {
console.log('ws', payload)
boardcast({
type: 'message',
payload,
})
})
ws.on('connect', () => {
console.log('Connected to ws.io server from SharedWorker')
if (waitingEmitQueue.length > 0) {
waitingEmitQueue.forEach((payload) => {
if (!ws) return
ws.emit('message', payload)
})
waitingEmitQueue.length = 0
}
boardcast({
type: 'connect',
// @ts-expect-error
payload: ws.id,
})
})
ws.open()
boardcast({
type: 'sid',
payload: ws.id,
})
}
const ports = [] as MessagePort[]
self.addEventListener('connect', (ev: any) => {
const event = ev as MessageEvent
const port = event.ports[0]
ports.push(port)
port.onmessage = (event) => {
const { type, payload } = event.data
console.log('get message from main', event.data)
switch (type) {
case 'config':
setupIo(payload)
break
case 'emit':
if (ws) {
if (ws.connected) ws.emit('message', payload)
else waitingEmitQueue.push(payload)
}
break
case 'reconnect':
if (ws) ws.open()
break
case 'init':
port.postMessage({ type: 'ping' })
if (ws) {
if (ws.connected) port.postMessage({ type: 'connect' })
port.postMessage({ type: 'sid', payload: ws.id })
}
break
default:
console.log('Unknown message type:', type)
}
}
port.start()
})
function boardcast(payload: any) {
console.log('[ws] boardcast', payload)
ports.forEach((port) => {
port.postMessage(payload)
})
}
const waitingEmitQueue: any[] = []
Worker 写完了,那么现在就要写主线程和 Worker 通信的代码了。
先看看流程:
具体的代码实现如下:
Important
下面的例子中,我们在 constructor 中异步加载去初始化 SharedWorker 是为了避免在 Server Side 不存在 Worker 的情况下报错。
interface WorkerSocket {
sid: string
}
class SocketWorker {
private socket: WorkerSocket | null = null
worker: SharedWorker | null = null
constructor() {
if (isServerSide) return
// @ts-expect-error
import('./io.worker').then(({ default: SharedWorker }) => {
if (isServerSide) return
const worker = new SharedWorker()
this.prepare(worker)
this.worker = worker
})
}
async getSid() {
return this.socket?.sid
}
private setSid(sid: string) {
this.socket = {
...this.socket,
sid,
}
}
bindMessageHandler = (worker: SharedWorker) => {
worker.port.onmessage = (event: MessageEvent) => {
const { data } = event
const { type, payload } = data
switch (type) {
case 'ping': {
worker?.port.postMessage({
type: 'pong',
})
console.log('[ws worker] pong')
break
}
case 'connect': {
window.dispatchEvent(new SocketConnectedEvent())
setSocketIsConnect(true)
const sid = payload
this.setSid(sid)
break
}
case 'disconnect': {
window.dispatchEvent(new SocketDisconnectedEvent())
setSocketIsConnect(false)
break
}
case 'sid': {
const sid = payload
this.setSid(sid)
break
}
case 'message': {
const typedPayload = payload as string | Record<'type' | 'data', any>
if (typeof typedPayload !== 'string') {
return this.handleEvent(
typedPayload.type,
camelcaseKeys(typedPayload.data),
)
}
const { data, type } = JSON.parse(typedPayload) as {
data: any
type: EventTypes
}
this.handleEvent(type, camelcaseKeys(data))
}
}
}
}
prepare(worker: SharedWorker) {
const gatewayUrlWithoutTrailingSlash = GATEWAY_URL.replace(/\/$/, '')
this.bindMessageHandler(worker)
worker.port.postMessage({
type: 'config',
payload: {
url: `${gatewayUrlWithoutTrailingSlash}/web`,
},
})
worker.port.start()
worker.port.postMessage({
type: 'init',
})
}
handleEvent(type: EventTypes, data: any) {
// Handle biz event
}
emit(event: SocketEmitEnum, payload: any) {
this.worker?.port.postMessage({
type: 'emit',
payload: { type: event, payload },
})
}
reconnect() {
this.worker?.port.postMessage({
type: 'reconnect',
})
}
static shared = new SocketWorker()
}
export const socketWorker = SocketWorker.shared
export type TSocketClient = SocketWorker
那么就大功告成了,SocketWorker
基本还是从原有的 SocketClient
中抽象出来的,基本实现了相同的方法,所以在业务中使用没有太大的变化,迁移过程也非常平滑。
对了,这次的重构位于 Shiro/550abd。可供参考。
完成的实现位于:
https://github.com/Innei/Shiro/blob/c399372f7cc1bff55f842ff68342ffb0071b5ae6/src/socket/io.worker.ts
此文由 Mix Space 同步更新至 xLog
原始链接为 https://innei.in/posts/tech/using-sharedworker-singleton-websocket-in-nextjs