經常光顧我站的讀者應該都看到過我站底部有顯示當前在線的人數。在不久之前我還為其增加了顯示當前正在閱讀具體哪篇文章的人數排行,並且在具體的某篇文章左側增加時間線,用於展示當前所有正在閱讀此文章的讀者閱讀進度。
如果你看過我站的開源項目話大概知道此站是由 Mix Space 驅動的,既然已經需要有伺服器驅動,那麼我基本沒有其他平台提供的服務了,能自己去實現的基本都是自己去實現。比如這個功能,如果你用 SasS 服務,則可以選擇 Liveblocks。當然這裡不做討論,因為此文章的目的是如何自己去實現這樣一個功能,服務端以及前端。
設計與實現#
即時人數的統計,需要長連接。這裡可以使用 WebSocket,當然如果你不需要特別即時的情況也可以使用輪詢的方式。這裡我們還是以 Socket.IO 實現,Socket.IO 是 WebSocket 的上層封裝。
下面對上面的功能進行拆解分析。
站點的即時人數#
所有的即時人數是比較好統計的。我們可以考慮用所有的 WebSocket 連接去統計當前的在線人數。
如果使用這個方法,那麼沒有連接到 WebSocket 的讀者,不計入在線人數。
因為站點不需要登錄就能訪問,而每個訪問都會建立一條 WebSocket 連接,如果同一個用戶同時打開多個頁面訪問站點就會重複計算在線人數。
對於上面的第一點,我們不作考慮。對於第二點,我們可以稍作分析,在一些場景進行限制。
每個打開的頁面都要建立 WebSocket 連接是可以接受的,但是需要對這些同用戶的進行去重。
可以採用下面的辦法:
- 去重同一個連接 WebSocket 的 IP(如果在大內網或者 CDN 的情況下會過於激進)
- 在客戶端生成唯一的連接的 SessionId 在瀏覽器內部共享(可採用 Local Storage)
- 根據用戶的登錄狀態去重,或者本地記錄的歷史評論數據
綜上,最後我們選擇 2,3 作為最後的實現。
https://cdn.jsdelivr.net/npm/@innei/[email protected]/excalidraw/realtime-socket/socket-client.json
那麼,在前端我們需要在沒有 SessionId 的時候生成一個,如果在 Local Storage 存在則復用。
import { customAlphabet } from 'nanoid'
const alphabet = `1234567890abcdefghijklmnopqrstuvwxyz`
const nanoid = customAlphabet(alphabet)
const defaultSessionId = nanoid(8)
const storageKey = buildNSKey('web-session')
const getSocketWebSessionId = () => {
if (!isClientSide) {
return ''
}
const sessionId = localStorage.getItem(storageKey)
if (sessionId) return sessionId
localStorage.setItem(storageKey, defaultSessionId)
return defaultSessionId
}
首先使用 nanoid 生成一個默認的 sessionId,我們需要把已經存在的 SessionId 傳遞給 Socket Client。至於上面第三點的 Session Id 因為設計到登錄態的異步,所以我這裡只能選擇後續更新當前 Socket Client 的 Session Id。
const client = io(GATEWAY_URL, {
timeout: 10000,
reconnectionDelay: 3000,
autoConnect: false,
reconnectionAttempts: 3,
transports: ['websocket'],
query: {
socket_session_id: getSocketWebSessionId(),
},
})
此處把生成的 WebSessionId 通過 query 傳遞。
Important
Socket.IO client 會有一個 sid
的值,這是每個 client 的唯一 id,請不要和我們生成的 session id 混淆
在服務端消費傳遞的 sessionId。
async handleConnection(socket: SocketIO.Socket) {
const webSessionId =
socket.handshake.query['socket_session_id'] ||
// fallback sid
socket.id
await this.gatewayService.setSocketMetadata(socket, {
sessionId: webSessionId,
})
}
這裡我使用了 setSocketMetadata
去存儲 socket 上的元數據,以便後續獲取和更新這些值。這些 Socket 附加的元數據是存儲在 Redis 上,具體實現可以參考 https://github.com/mx-space/core/blob/c82cb8ff54dc7b98bc411e03de81ede932aa612b/apps/core/src/processors/gateway/gateway.service.ts 。
那麼,我們在服務端的統計站點總人數,實現如下:
async getCurrentClientCount() {
const server = this.namespace.server
const socketsMeta = await Promise.all(
await server
.of(`/${namespace}`)
.fetchSockets()
.then((sockets) => {
return sockets.map((socket) =>
this.gatewayService.getSocketMetadata(socket),
)
}),
)
return uniqBy(socketsMeta, (x) => x?.sessionId).length
}
在有其他 WebSocket 連接時,廣播給所有的 Socket Client 當前的人數。
async handleConnection(socket: SocketIO.Socket) {
this.whenUserOnline()
}
whenUserOnline = async () => {
this.broadcast(
BusinessEvents.VISITOR_ONLINE,
await this.sendOnlineNumber(),
)
}
更新 SessionId#
當前端用戶登錄態完成之後,我們需要更新 SocketId,以獲取更加精確的即時人數。
https://cdn.jsdelivr.net/npm/@innei/[email protected]/excalidraw/realtime-socket/update-session-id.json
某些狀態可能存在於 Hooks 中,這裡也封裝一個 Hooks 取得最後的 SocketId。
export const useSocketSessionId = () => {
const user = useUser()
const owner = useOwner()
const ownerIsLogin = useIsLogged()
return useMemo((): string => {
const fallbackSid = getSocketWebSessionId()
if (ownerIsLogin) {
if (!owner) return fallbackSid
return `owner-${owner.id}`
} else if (user && user.isSignedIn) {
return user.user.id.toLowerCase()
}
return fallbackSid
}, [owner, ownerIsLogin, user])
}
然後 Emit 更新事件。
const webSocketSessionId = useSocketSessionId()
const previousWebSocketSessionIdRef = useRef(webSocketSessionId)
const socketIsConnected = useSocketIsConnect()
useEffect(() => {
const previousWebSocketSessionId = previousWebSocketSessionIdRef.current
previousWebSocketSessionIdRef.current = webSocketSessionId
if (!socketIsConnected) return
socketClient.emit(SocketEmitEnum.UpdateSid, {
sessionId: webSocketSessionId,
})
}, [socketIsConnected, webSocketSessionId])
那麼,在服務端上,對此事件處理。
@SubscribeMessage('message')
async handleMessageEvent(
@MessageBody() data: MessageEventDto,
@ConnectedSocket() socket: SocketIO.Socket,
) {
const { payload, type } = data
switch (type) {
case SupportedMessageEvent.UpdateSid: {
const { sessionId } = payload as { sessionId: string }
if (sessionId) {
await this.gatewayService.setSocketMetadata(socket, { sessionId })
this.whenUserOnline() // broadcast all client online count
}
}
}
}
文章的即時閱讀人數#
每篇文章我們都需要統計當前的在線人數,可以借助 Socket.IO 的 Room 特徵。如果沒有使用 Socket.IO 則可以自己實現一套 Channel,也就是把一個 Socket 拆分成多次 Scope,在每個 Scope 可以訂閱不同類型的消息推送。
那麼,每一篇文章就是對應一個 Room,在同一個 Room 下的 Socket Client 可以接受到相互的消息。在讀者滾動頁面的時候,Emit 事件,在服務端向整個 Room 廣播一位讀者的閱讀進度即可。
在文章進入後,立即發出加入 Room 的請求,在切換文章或者關閉頁面,退出 Room。
https://cdn.jsdelivr.net/npm/@innei/[email protected]/excalidraw/realtime-socket/reading-room.json
那麼在服務端首先實現 Join 和 Leave 事件的處理。
@SubscribeMessage('message')
async handleMessageEvent(
@MessageBody() data: MessageEventDto,
@ConnectedSocket() socket: SocketIO.Socket,
) {
const { payload, type } = data
switch (type) {
case SupportedMessageEvent.Join: {
const { roomName } = payload as { roomName: string }
if (roomName) {
socket.join(roomName)
}
break
}
case SupportedMessageEvent.Leave: {
const { roomName } = payload as { roomName: string }
if (roomName) {
socket.leave(roomName)
const socketMeta = await this.gatewayService.getSocketMetadata(socket)
if (socketMeta.presence) {
this.webGateway.broadcast(
BusinessEvents.ACTIVITY_LEAVE_PRESENCE,
{
identity: socketMeta.presence.identity,
roomName,
},
{
rooms: [roomName],
},
)
handlePresencePersistToDb(socket)
}
const roomJoinedAtMap = await this.getSocketRoomJoinedAtMap(socket)
delete roomJoinedAtMap[roomName]
await this.gatewayService.setSocketMetadata(socket, {
roomJoinedAtMap,
})
}
break
}
}
}
RoomName
我們可以根據文章的 id 去生成一個唯一的 key。然後使用 socket.join()
去加入一個 room,這是 SocketIO 的內部實現,我們無需關注,我們只需要在 Join 或者 Leave 時候附加或者修改一些元數據,或者持久化一些數據到 DB 即可。
接下來,前端這邊,當讀者滾動了頁面,那麼我們將此時的閱讀進度通過接口發送到伺服器,然後在伺服器端廣播給所有處於這個 Room 下的 Socket Client。所以在請求接口時我們需要發送當前的 RoomName
和 socket.sid
和前端生成的 sessionId
以及閱讀位置。
https://cdn.jsdelivr.net/npm/@innei/[email protected]/excalidraw/realtime-socket/presence-broadcast.json
假設前端發送的數據結構是這樣的。
interface Persence {
roomName: string;
position: number;
identity: string;
sid: string;
displayName?: string;
}
後端的關鍵實現為:
async updatePresence(data: UpdatePresenceDto) {
const roomName = data.roomName
if (!isValidRoomName(roomName)) {
throw new BadRequestException('invalid room_name')
}
const roomSockets = await this.webGateway.getSocketsOfRoom(roomName)
const presenceData: ActivityPresence = {
...data,
operationTime: data.ts,
updatedAt: Date.now(),
connectedAt: +new Date(socket.handshake.time),
}
const roomJoinedAtMap =
await this.webGateway.getSocketRoomJoinedAtMap(socket)
Reflect.set(serializedPresenceData, 'joinedAt', roomJoinedAtMap[roomName])
this.webGateway.broadcast(
BusinessEvents.ACTIVITY_UPDATE_PRESENCE,
serializedPresenceData,
{
rooms: [roomName],
},
)
await this.gatewayService.setSocketMetadata(socket, {
presence: presenceData,
})
return serializedPresenceData
}
當接口到達後,服務端將廣播請求接口用戶的閱讀狀態。
當前站內文章閱讀排名#
這個功能其實是建立在上面的基礎上的,所以當我們實現完了上面一個功能之後,這個功能就迎刃而解了。我們只需要獲取現有的所有的 Room,然後獲取每個 Room 中的 Socket Client 數量就可以了。
這裡比較好理解,就不做展開了。
寫在後面#
上述所有代碼均開源。具體實現位於:mx-space/core,Shiro
- https://github.com/mx-space/core/blob/310480f7b48d6460728a12a847575edd350c10c5/apps/core/src/modules/activity/activity.service.ts
- https://github.com/Innei/shiro/blob/c4bb476ac0fbc9f517d07bfdfc2e33b3890ef94a/src/components/modules/activity/Presence.tsx
- https://github.com/mx-space/core/blob/147441c2c99b163106c5fd02f0433510d1cea1b9/apps/core/src/processors/gateway/web/events.gateway.ts
此文由 Mix Space 同步更新至 xLog 原始鏈接為 https://innei.in/posts/dev-story/how-is-the-real-time-headcount-at-the-site-achieved