在之前的文章中,我詳細的介紹了站點的即時人數是如何實現的?,在方案中,我留下了一個懸念,如何使用 SharedWorker 在頁面之間共享 Socket 實例,實現跨頁面單例 WebSocket。
動機#
探索這個問題的背景是無意間在知乎看到了 WebSocket 的一個問題,其中有回答提到 WebSocket 連接過於佔用伺服器資源。當頁面重複打開很多個時,每個頁面都會建立 WebSocket 連接,確實無意間的增大了伺服器壓力。雖說這個問題我之前也有考慮過,但是由於個人站點並不會出現大量的用戶同時在線,所以並沒有深入研究。我們使用的是 Socket.IO 的方案建立 WebSocket 連接,Socket.IO 本身就是對 WebSocket 的封裝,所以從開銷上來說,Socket.IO 會比原生 WebSocket 多一些開銷。關於 Socket.IO 連接對伺服器側的記憶體開銷,文檔中有所提及:Memory Usage | Socket.IO。
雖然這點性能優化不痛不癢也沒有任何提升,但是既然有這個問題,那我們這次就試著去解決這個問題。
找到方案#
我們要解決的問題很明確,就是當一個瀏覽器打開兩個或以上的我站的頁面時(多個 Tab),復用同一個 Socket 實例。
通過搜索關鍵字,我們了解到一個 API,SharedWorker,SharedWorker 是一個在多個瀏覽上下文(例如多個窗口、標籤或 iframe)之間共享的 Worker。SharedWorker 有一個全局作用域,可以在多個瀏覽上下文中使用,這樣就可以實現我們的目標。
我們只需要把原本在 Socket 實例的代碼放到 SharedWorker 中,然後在頁面中與 Worker 中的 Socket 實例通信即可。由於之前的 SocketClient 對 Socket 進行了抽象,在此次重構中並不需要修改太多代碼,而是只需要實現通信層即可。
SharedWorker 基本使用#
先來看看 SharedWorker 是如何使用的。
let i = 0
const ports = []
onconnect = (e) => {
const port = e.ports[0];
ports.push(port)
port.addEventListener("message", (e) => {
const workerResult = `Result: ${e.data[0] * e.data[1]}`;
port.postMessage(workerResult);
});
port.start();
};
setInterval(() => {
i++;
ports.forEach(port => port.postMessage("Message from worker, time: " + i))
}, 1000);
const worker = new SharedWorker('/shared-worker.js')
worker.port.onmessage = (e) => {
console.log('Message received from worker', e.data)
}
worker.port.start()
上面的例子中,我們創建了一個 SharedWorker,然後在頁面中與 Worker 通信。在 Worker 中,我們監聽了 onconnect
事件,當 Worker 連接時,我們監聽了 port 的 message 事件,然後在頁面中,我們監聽了 port 的 message 事件,worker 中的消息在頁面中進行進一步處理。
SharedWorker 同 Worker 一樣,消息通過 MessageChannel 進行傳遞。而 worker.port
本質就是對 MessageChannel 的封裝。
現在我們運行這個基本的例子,看看效果。
https://codesandbox.io/p/sandbox/ecstatic-jerry-6xsgvs
當我們同時打開兩個 Tab 時,我們可以看到兩個 Tab 都會收到 Worker 發送的消息。
通過 chrome://inspect/#workers
我們可以看到 Worker 的信息。當打開多個 Tab 時,SharedWorker 會在多個 Tab 之間共享;當多個頁面都被關閉時,Worker 會被銷毀。
在 Next.js 中使用 SharedWorker#
上面的例子中,我們把 worker 的實現放在了 public 目錄下,這樣的方式並不適用於工程化的項目中,因為在工程化項目中,我們不應該直接引用 public 目錄下的文件,而是應該通過 import 的方式引入。第二,我們或許需要在 Worker 實現中引用外部庫,或者使用 TypeScript 編寫 Worker 實現。而對於這些需求,直接裸寫 js 的方式是無法實現的。
對此,我們需要對 Next.js 的 webpack 配置進行一些修改,以支持 Worker 的引入。
首先安裝 worker-loader
:
npm i -D worker-loader
修改 next.config.js
:
/** @type {import('next').NextConfig} */
let nextConfig = {
webpack: (config, { webpack }) => {
config.module.rules.unshift({
test: /\.worker\.ts$/,
loader: 'worker-loader',
options: {
publicPath: '/_next/',
worker: {
type: 'SharedWorker',
// https://v4.webpack.js.org/loaders/worker-loader/#worker
options: {
name: 'ws-worker',
},
},
},
})
return config
},
}
export default nextConfig
這裡我們使用了 worker-loader
,並且配置了文件後綴 .worker.ts
為 SharedWorker
實現,這樣我們就可以在 Next.js 中使用 SharedWorker 了。
const worker = new SharedWorker("/worker.js"); // ![code --]
import worker from './worker.worker' // ![code ++]
實現 SharedWorker Socket#
現在我們在 Worker 中實現 Socket 的邏輯。
大概梳理一下流程:
- 首先在建立 Worker 時(也就是第一個頁面會建立 Worker),傳遞 Socket 連接配置。(因為 Worker 中無法獲得環境變量和主線程變量)
- 創建 Socket 實例,然後對 Socket 連接狀態和消息事件監聽,通過 MessageChannel 傳遞消息給主線程(頁面)。
- 在新頁面打開時,或者 Socket 連接完成後,傳遞 Socket 相關信息到主線程,這些數據需要被主進程存儲並用於其他組件消費。
- 在主進程中,實現和 Worker 的通信,以及對 Socket 實例的操作。比如
emit
方法。
具體的代碼實現如下:
Note
下面的例子中,我們使用了 TypeScript 去編寫 Worker,並且可以直接在 Worker 中使用 import 引用外部庫,為了更好的支持 Worker Scope 的類型,所以在頂部我們使用 /// <reference lib="webworker" />
增加相關的類型支持。
import { io } from 'socket.io-client'
import type { Socket } from 'socket.io-client'
/// <reference lib="webworker" />
let ws: Socket | null = null
function setupIo(config: { url: string }) {
if (ws) return
// 使用 socket.io
console.log('Connecting to io, url: ', config.url)
ws = io(config.url, {
timeout: 10000,
reconnectionDelay: 3000,
autoConnect: false,
reconnectionAttempts: 3,
transports: ['websocket'],
})
if (!ws) return
ws.on('disconnect', () => {
boardcast({
type: 'disconnect',
})
})
/**
* @param {any} payload
*/
ws.on('message', (payload) => {
console.log('ws', payload)
boardcast({
type: 'message',
payload,
})
})
ws.on('connect', () => {
console.log('Connected to ws.io server from SharedWorker')
if (waitingEmitQueue.length > 0) {
waitingEmitQueue.forEach((payload) => {
if (!ws) return
ws.emit('message', payload)
})
waitingEmitQueue.length = 0
}
boardcast({
type: 'connect',
// @ts-expect-error
payload: ws.id,
})
})
ws.open()
boardcast({
type: 'sid',
payload: ws.id,
})
}
const ports = [] as MessagePort[]
self.addEventListener('connect', (ev: any) => {
const event = ev as MessageEvent
const port = event.ports[0]
ports.push(port)
port.onmessage = (event) => {
const { type, payload } = event.data
console.log('get message from main', event.data)
switch (type) {
case 'config':
setupIo(payload)
break
case 'emit':
if (ws) {
if (ws.connected) ws.emit('message', payload)
else waitingEmitQueue.push(payload)
}
break
case 'reconnect':
if (ws) ws.open()
break
case 'init':
port.postMessage({ type: 'ping' })
if (ws) {
if (ws.connected) port.postMessage({ type: 'connect' })
port.postMessage({ type: 'sid', payload: ws.id })
}
break
default:
console.log('Unknown message type:', type)
}
}
port.start()
})
function boardcast(payload: any) {
console.log('[ws] boardcast', payload)
ports.forEach((port) => {
port.postMessage(payload)
})
}
const waitingEmitQueue: any[] = []
Worker 寫完了,那麼現在就要寫主線程和 Worker 通信的代碼了。
先看看流程:
具體的代碼實現如下:
Important
下面的例子中,我們在 constructor 中異步加載去初始化 SharedWorker 是為了避免在 Server Side 不存在 Worker 的情況下報錯。
interface WorkerSocket {
sid: string
}
class SocketWorker {
private socket: WorkerSocket | null = null
worker: SharedWorker | null = null
constructor() {
if (isServerSide) return
// @ts-expect-error
import('./io.worker').then(({ default: SharedWorker }) => {
if (isServerSide) return
const worker = new SharedWorker()
this.prepare(worker)
this.worker = worker
})
}
async getSid() {
return this.socket?.sid
}
private setSid(sid: string) {
this.socket = {
...this.socket,
sid,
}
}
bindMessageHandler = (worker: SharedWorker) => {
worker.port.onmessage = (event: MessageEvent) => {
const { data } = event
const { type, payload } = data
switch (type) {
case 'ping': {
worker?.port.postMessage({
type: 'pong',
})
console.log('[ws worker] pong')
break
}
case 'connect': {
window.dispatchEvent(new SocketConnectedEvent())
setSocketIsConnect(true)
const sid = payload
this.setSid(sid)
break
}
case 'disconnect': {
window.dispatchEvent(new SocketDisconnectedEvent())
setSocketIsConnect(false)
break
}
case 'sid': {
const sid = payload
this.setSid(sid)
break
}
case 'message': {
const typedPayload = payload as string | Record<'type' | 'data', any>
if (typeof typedPayload !== 'string') {
return this.handleEvent(
typedPayload.type,
camelcaseKeys(typedPayload.data),
)
}
const { data, type } = JSON.parse(typedPayload) as {
data: any
type: EventTypes
}
this.handleEvent(type, camelcaseKeys(data))
}
}
}
}
prepare(worker: SharedWorker) {
const gatewayUrlWithoutTrailingSlash = GATEWAY_URL.replace(/\/$/, '')
this.bindMessageHandler(worker)
worker.port.postMessage({
type: 'config',
payload: {
url: `${gatewayUrlWithoutTrailingSlash}/web`,
},
})
worker.port.start()
worker.port.postMessage({
type: 'init',
})
}
handleEvent(type: EventTypes, data: any) {
// Handle biz event
}
emit(event: SocketEmitEnum, payload: any) {
this.worker?.port.postMessage({
type: 'emit',
payload: { type: event, payload },
})
}
reconnect() {
this.worker?.port.postMessage({
type: 'reconnect',
})
}
static shared = new SocketWorker()
}
export const socketWorker = SocketWorker.shared
export type TSocketClient = SocketWorker
那麼就大功告成了,SocketWorker
基本還是從原有的 SocketClient
中抽象出來的,基本實現了相同的方法,所以在業務中使用沒有太大的變化,遷移過程也非常平滑。
對了,這次的重構位於 Shiro/550abd。可供參考。
完成的實現位於:
https://github.com/Innei/Shiro/blob/c399372f7cc1bff55f842ff68342ffb0071b5ae6/src/socket/io.worker.ts
此文由 Mix Space 同步更新至 xLog 原始鏈接為 https://innei.in/posts/tech/using-sharedworker-singleton-websocket-in-nextjs