banner
innei

innei

写代码是因为爱,写到世界充满爱!
github
telegram
twitter

利好 SharedWorker 實現跨頁面單例 WebSocket

在之前的文章中,我詳細的介紹了站點的即時人數是如何實現的?,在方案中,我留下了一個懸念,如何使用 SharedWorker 在頁面之間共享 Socket 實例,實現跨頁面單例 WebSocket。

動機#

探索這個問題的背景是無意間在知乎看到了 WebSocket 的一個問題,其中有回答提到 WebSocket 連接過於佔用伺服器資源。當頁面重複打開很多個時,每個頁面都會建立 WebSocket 連接,確實無意間的增大了伺服器壓力。雖說這個問題我之前也有考慮過,但是由於個人站點並不會出現大量的用戶同時在線,所以並沒有深入研究。我們使用的是 Socket.IO 的方案建立 WebSocket 連接,Socket.IO 本身就是對 WebSocket 的封裝,所以從開銷上來說,Socket.IO 會比原生 WebSocket 多一些開銷。關於 Socket.IO 連接對伺服器側的記憶體開銷,文檔中有所提及:Memory Usage | Socket.IO

雖然這點性能優化不痛不癢也沒有任何提升,但是既然有這個問題,那我們這次就試著去解決這個問題。

找到方案#

我們要解決的問題很明確,就是當一個瀏覽器打開兩個或以上的我站的頁面時(多個 Tab),復用同一個 Socket 實例。

通過搜索關鍵字,我們了解到一個 API,SharedWorker,SharedWorker 是一個在多個瀏覽上下文(例如多個窗口、標籤或 iframe)之間共享的 Worker。SharedWorker 有一個全局作用域,可以在多個瀏覽上下文中使用,這樣就可以實現我們的目標。

我們只需要把原本在 Socket 實例的代碼放到 SharedWorker 中,然後在頁面中與 Worker 中的 Socket 實例通信即可。由於之前的 SocketClient 對 Socket 進行了抽象,在此次重構中並不需要修改太多代碼,而是只需要實現通信層即可。

SharedWorker 基本使用#

先來看看 SharedWorker 是如何使用的。

let i = 0

const ports = []

onconnect = (e) => {
  const port = e.ports[0];

  ports.push(port)

  port.addEventListener("message", (e) => {
    const workerResult = `Result: ${e.data[0] * e.data[1]}`;
    port.postMessage(workerResult);
  });
  port.start();
};
setInterval(() => {
  i++;
  ports.forEach(port => port.postMessage("Message from worker, time: " + i))
}, 1000);
const worker = new SharedWorker('/shared-worker.js')
worker.port.onmessage = (e) => {
  console.log('Message received from worker', e.data)
}
worker.port.start()

上面的例子中,我們創建了一個 SharedWorker,然後在頁面中與 Worker 通信。在 Worker 中,我們監聽了 onconnect 事件,當 Worker 連接時,我們監聽了 port 的 message 事件,然後在頁面中,我們監聽了 port 的 message 事件,worker 中的消息在頁面中進行進一步處理。

SharedWorker 同 Worker 一樣,消息通過 MessageChannel 進行傳遞。而 worker.port 本質就是對 MessageChannel 的封裝。

現在我們運行這個基本的例子,看看效果。

https://codesandbox.io/p/sandbox/ecstatic-jerry-6xsgvs

當我們同時打開兩個 Tab 時,我們可以看到兩個 Tab 都會收到 Worker 發送的消息。

image

通過 chrome://inspect/#workers 我們可以看到 Worker 的信息。當打開多個 Tab 時,SharedWorker 會在多個 Tab 之間共享;當多個頁面都被關閉時,Worker 會被銷毀。

在 Next.js 中使用 SharedWorker#

上面的例子中,我們把 worker 的實現放在了 public 目錄下,這樣的方式並不適用於工程化的項目中,因為在工程化項目中,我們不應該直接引用 public 目錄下的文件,而是應該通過 import 的方式引入。第二,我們或許需要在 Worker 實現中引用外部庫,或者使用 TypeScript 編寫 Worker 實現。而對於這些需求,直接裸寫 js 的方式是無法實現的。

對此,我們需要對 Next.js 的 webpack 配置進行一些修改,以支持 Worker 的引入。

首先安裝 worker-loader

npm i -D worker-loader

修改 next.config.js

/** @type {import('next').NextConfig} */

let nextConfig = {
  webpack: (config, { webpack }) => {
    config.module.rules.unshift({
      test: /\.worker\.ts$/,
      loader: 'worker-loader',
      options: {
        publicPath: '/_next/',
        worker: {
          type: 'SharedWorker',
          // https://v4.webpack.js.org/loaders/worker-loader/#worker
          options: {
            name: 'ws-worker',
          },
        },
      },
    })

    return config
  },
}

export default nextConfig

這裡我們使用了 worker-loader,並且配置了文件後綴 .worker.tsSharedWorker 實現,這樣我們就可以在 Next.js 中使用 SharedWorker 了。

const worker = new SharedWorker("/worker.js"); // ![code --]
import worker from './worker.worker' // ![code ++]

實現 SharedWorker Socket#

現在我們在 Worker 中實現 Socket 的邏輯。

大概梳理一下流程:

  1. 首先在建立 Worker 時(也就是第一個頁面會建立 Worker),傳遞 Socket 連接配置。(因為 Worker 中無法獲得環境變量和主線程變量)
  2. 創建 Socket 實例,然後對 Socket 連接狀態和消息事件監聽,通過 MessageChannel 傳遞消息給主線程(頁面)。
  3. 在新頁面打開時,或者 Socket 連接完成後,傳遞 Socket 相關信息到主線程,這些數據需要被主進程存儲並用於其他組件消費。
  4. 在主進程中,實現和 Worker 的通信,以及對 Socket 實例的操作。比如 emit 方法。
Mermaid Loading...

具體的代碼實現如下:

Note

下面的例子中,我們使用了 TypeScript 去編寫 Worker,並且可以直接在 Worker 中使用 import 引用外部庫,為了更好的支持 Worker Scope 的類型,所以在頂部我們使用 /// <reference lib="webworker" /> 增加相關的類型支持。

import { io } from 'socket.io-client'
import type { Socket } from 'socket.io-client'

/// <reference lib="webworker" />

let ws: Socket | null = null

function setupIo(config: { url: string }) {
  if (ws) return
  // 使用 socket.io
  console.log('Connecting to io, url: ', config.url)

  ws = io(config.url, {
    timeout: 10000,
    reconnectionDelay: 3000,
    autoConnect: false,
    reconnectionAttempts: 3,
    transports: ['websocket'],
  })
  if (!ws) return

  ws.on('disconnect', () => {
    boardcast({
      type: 'disconnect',
    })
  })

  /**
   * @param {any} payload
   */
  ws.on('message', (payload) => {
    console.log('ws', payload)

    boardcast({
      type: 'message',
      payload,
    })
  })

  ws.on('connect', () => {
    console.log('Connected to ws.io server from SharedWorker')

    if (waitingEmitQueue.length > 0) {
      waitingEmitQueue.forEach((payload) => {
        if (!ws) return
        ws.emit('message', payload)
      })
      waitingEmitQueue.length = 0
    }
    boardcast({
      type: 'connect',
      // @ts-expect-error
      payload: ws.id,
    })
  })

  ws.open()
  boardcast({
    type: 'sid',
    payload: ws.id,
  })
}

const ports = [] as MessagePort[]

self.addEventListener('connect', (ev: any) => {
  const event = ev as MessageEvent

  const port = event.ports[0]

  ports.push(port)

  port.onmessage = (event) => {
    const { type, payload } = event.data
    console.log('get message from main', event.data)

    switch (type) {
      case 'config':
        setupIo(payload)
        break
      case 'emit':
        if (ws) {
          if (ws.connected) ws.emit('message', payload)
          else waitingEmitQueue.push(payload)
        }
        break
      case 'reconnect':
        if (ws) ws.open()
        break
      case 'init':
        port.postMessage({ type: 'ping' })

        if (ws) {
          if (ws.connected) port.postMessage({ type: 'connect' })
          port.postMessage({ type: 'sid', payload: ws.id })
        }
        break
      default:
        console.log('Unknown message type:', type)
    }
  }

  port.start()
})

function boardcast(payload: any) {
  console.log('[ws] boardcast', payload)
  ports.forEach((port) => {
    port.postMessage(payload)
  })
}

const waitingEmitQueue: any[] = []

Worker 寫完了,那麼現在就要寫主線程和 Worker 通信的代碼了。

先看看流程:

Mermaid Loading...

具體的代碼實現如下:

Important

下面的例子中,我們在 constructor 中異步加載去初始化 SharedWorker 是為了避免在 Server Side 不存在 Worker 的情況下報錯。

interface WorkerSocket {
  sid: string
}

class SocketWorker {
  private socket: WorkerSocket | null = null

  worker: SharedWorker | null = null

  constructor() {
    if (isServerSide) return
    // @ts-expect-error
    import('./io.worker').then(({ default: SharedWorker }) => {
      if (isServerSide) return
      const worker = new SharedWorker()

      this.prepare(worker)
      this.worker = worker
    })
  }

  async getSid() {
    return this.socket?.sid
  }

  private setSid(sid: string) {
    this.socket = {
      ...this.socket,
      sid,
    }
  }
  bindMessageHandler = (worker: SharedWorker) => {
    worker.port.onmessage = (event: MessageEvent) => {
      const { data } = event
      const { type, payload } = data

      switch (type) {
        case 'ping': {
          worker?.port.postMessage({
            type: 'pong',
          })
          console.log('[ws worker] pong')
          break
        }
        case 'connect': {
          window.dispatchEvent(new SocketConnectedEvent())
          setSocketIsConnect(true)

          const sid = payload
          this.setSid(sid)
          break
        }
        case 'disconnect': {
          window.dispatchEvent(new SocketDisconnectedEvent())
          setSocketIsConnect(false)
          break
        }
        case 'sid': {
          const sid = payload
          this.setSid(sid)
          break
        }
        case 'message': {
          const typedPayload = payload as string | Record<'type' | 'data', any>
          if (typeof typedPayload !== 'string') {
            return this.handleEvent(
              typedPayload.type,
              camelcaseKeys(typedPayload.data),
            )
          }
          const { data, type } = JSON.parse(typedPayload) as {
            data: any
            type: EventTypes
          }
          this.handleEvent(type, camelcaseKeys(data))
        }
      }
    }
  }

  prepare(worker: SharedWorker) {
    const gatewayUrlWithoutTrailingSlash = GATEWAY_URL.replace(/\/$/, '')
    this.bindMessageHandler(worker)
    worker.port.postMessage({
      type: 'config',

      payload: {
        url: `${gatewayUrlWithoutTrailingSlash}/web`,
      },
    })

    worker.port.start()

    worker.port.postMessage({
      type: 'init',
    })
  }
  handleEvent(type: EventTypes, data: any) {
    // Handle biz event
  }

  emit(event: SocketEmitEnum, payload: any) {
    this.worker?.port.postMessage({
      type: 'emit',
      payload: { type: event, payload },
    })
  }

  reconnect() {
    this.worker?.port.postMessage({
      type: 'reconnect',
    })
  }

  static shared = new SocketWorker()
}

export const socketWorker = SocketWorker.shared
export type TSocketClient = SocketWorker

那麼就大功告成了,SocketWorker 基本還是從原有的 SocketClient 中抽象出來的,基本實現了相同的方法,所以在業務中使用沒有太大的變化,遷移過程也非常平滑。

對了,這次的重構位於 Shiro/550abd。可供參考。

完成的實現位於:

https://github.com/Innei/Shiro/blob/c399372f7cc1bff55f842ff68342ffb0071b5ae6/src/socket/io.worker.ts

此文由 Mix Space 同步更新至 xLog 原始鏈接為 https://innei.in/posts/tech/using-sharedworker-singleton-websocket-in-nextjs

載入中......
此文章數據所有權由區塊鏈加密技術和智能合約保障僅歸創作者所有。