私のサイトを頻繁に訪れる読者は、サイトの下部に現在オンラインの人数が表示されているのを見たことがあるでしょう。最近、現在どの記事を読んでいるかの人数ランキングを表示する機能を追加し、特定の記事の左側にタイムラインを追加して、現在その記事を読んでいる読者の読書進捗を示すようにしました。

私のサイトのオープンソースプロジェクトを見たことがある方は、このサイトが Mix Space によって駆動されていることを知っているでしょう。サーバー駆動が必要な場合、他のプラットフォームが提供するサービスはほとんど利用せず、実現可能な機能は基本的に自分で実装しています。この機能については、SaaS サービスを使用する場合、Liveblocksを選択できます。もちろん、ここでは議論しません。この記事の目的は、サーバーサイドとフロントエンドの両方でこの機能をどのように実現するかです。
設計と実装#
リアルタイムの人数統計には、長い接続が必要です。ここでは WebSocket を使用できますが、特にリアルタイム性が必要ない場合はポーリング方式を使用することもできます。ここでは Socket.IO を使用して実装します。Socket.IO は WebSocket の上位ラッパーです。
上記の機能を分解して分析します。
サイトのリアルタイム人数#
すべてのリアルタイム人数は比較的簡単に統計できます。すべての WebSocket 接続を使用して現在のオンライン人数を統計することを考慮できます。
この方法を使用する場合、WebSocket に接続していない読者はオンライン人数にカウントされません。
サイトはログインなしでアクセスでき、各アクセスは WebSocket 接続を確立します。同じユーザーが同時に複数のページを開いてサイトにアクセスすると、オンライン人数が重複して計算されます。
上記の第一点については考慮しません。第二点については、いくつかのシナリオで制限をかけることができます。
各ページを開くたびに WebSocket 接続を確立することは許容されますが、同じユーザーの接続を重複しないようにする必要があります。
以下の方法を採用できます:
- 同じ接続の WebSocket の IP を重複排除する(大内網や CDN の場合は過度に攻撃的になる可能性があります)
- クライアント側で一意の接続の SessionId を生成し、ブラウザ内部で共有する(ローカルストレージを使用できます)
- ユーザーのログイン状態に基づいて重複排除するか、ローカルに記録された過去のコメントデータを使用する
以上を考慮し、最終的に 2 と 3 を実装として選択します。
https://cdn.jsdelivr.net/npm/@innei/[email protected]/excalidraw/realtime-socket/socket-client.json
したがって、フロントエンドでは SessionId がない場合に生成する必要があります。ローカルストレージに存在する場合は再利用します。
import { customAlphabet } from 'nanoid'
const alphabet = `1234567890abcdefghijklmnopqrstuvwxyz`
const nanoid = customAlphabet(alphabet)
const defaultSessionId = nanoid(8)
const storageKey = buildNSKey('web-session')
const getSocketWebSessionId = () => {
  if (!isClientSide) {
    return ''
  }
  const sessionId = localStorage.getItem(storageKey)
  if (sessionId) return sessionId
  localStorage.setItem(storageKey, defaultSessionId)
  return defaultSessionId
}
まず、nanoid を使用してデフォルトの sessionId を生成します。既存の SessionId を Socket Client に渡す必要があります。上記の第三点の Session Id はログイン状態の非同期に関わるため、ここでは現在の Socket Client の Session Id を後で更新することしかできません。
const client = io(GATEWAY_URL, {
      timeout: 10000,
      reconnectionDelay: 3000,
      autoConnect: false,
      reconnectionAttempts: 3,
      transports: ['websocket'],
      query: {
        socket_session_id: getSocketWebSessionId(),
      },
})
ここで生成された WebSessionId をクエリを通じて渡します。
Important
Socket.IO クライアントには sid という値があり、これは各クライアントの一意の ID です。生成したセッション ID と混同しないでください。
サーバー側で渡された sessionId を消費します。
 async handleConnection(socket: SocketIO.Socket) {
    const webSessionId =
      socket.handshake.query['socket_session_id'] ||
      // fallback sid
      socket.id
    await this.gatewayService.setSocketMetadata(socket, {
      sessionId: webSessionId,
    })
}
ここでは setSocketMetadata を使用してソケット上のメタデータを保存し、後でこれらの値を取得および更新できるようにします。これらのソケットに付加されたメタデータは Redis に保存され、具体的な実装は https://github.com/mx-space/core/blob/c82cb8ff54dc7b98bc411e03de81ede932aa612b/apps/core/src/processors/gateway/gateway.service.ts を参照できます。
次に、サーバー側でサイトの総人数を統計する実装は以下の通りです。
async getCurrentClientCount() {
    const server = this.namespace.server
    const socketsMeta = await Promise.all(
      await server
        .of(`/${namespace}`)
        .fetchSockets()
        .then((sockets) => {
          return sockets.map((socket) =>
            this.gatewayService.getSocketMetadata(socket),
          )
        }),
    )
    return uniqBy(socketsMeta, (x) => x?.sessionId).length
}
他の WebSocket 接続がある場合、すべての Socket Client に現在の人数をブロードキャストします。
async handleConnection(socket: SocketIO.Socket) {
  this.whenUserOnline()
}
whenUserOnline = async () => {
  this.broadcast(
    BusinessEvents.VISITOR_ONLINE,
    await this.sendOnlineNumber(),
  )
}
SessionId の更新#
フロントエンドユーザーのログイン状態が完了した後、SocketId を更新して、より正確なリアルタイム人数を取得する必要があります。
https://cdn.jsdelivr.net/npm/@innei/[email protected]/excalidraw/realtime-socket/update-session-id.json
特定の状態は Hooks に存在する可能性があるため、ここでも最後の SocketId を取得するための Hooks をラップします。
export const useSocketSessionId = () => {
  const user = useUser()
  const owner = useOwner()
  const ownerIsLogin = useIsLogged()
  return useMemo((): string => {
    const fallbackSid = getSocketWebSessionId()
    if (ownerIsLogin) {
      if (!owner) return fallbackSid
      return `owner-${owner.id}`
    } else if (user && user.isSignedIn) {
      return user.user.id.toLowerCase()
    }
    return fallbackSid
  }, [owner, ownerIsLogin, user])
}
次に、更新イベントを Emit します。
  const webSocketSessionId = useSocketSessionId()
  const previousWebSocketSessionIdRef = useRef(webSocketSessionId)
  const socketIsConnected = useSocketIsConnect()
  useEffect(() => {
    const previousWebSocketSessionId = previousWebSocketSessionIdRef.current
    previousWebSocketSessionIdRef.current = webSocketSessionId
    if (!socketIsConnected) return
    socketClient.emit(SocketEmitEnum.UpdateSid, {
      sessionId: webSocketSessionId,
    })
  }, [socketIsConnected, webSocketSessionId])
したがって、サーバー側でこのイベントを処理します。
@SubscribeMessage('message')
async handleMessageEvent(
  @MessageBody() data: MessageEventDto,
  @ConnectedSocket() socket: SocketIO.Socket,
) {
  const { payload, type } = data
  switch (type) {
    case SupportedMessageEvent.UpdateSid: {
      const { sessionId } = payload as { sessionId: string }
      if (sessionId) {
        await this.gatewayService.setSocketMetadata(socket, { sessionId })
        this.whenUserOnline() // broadcast all client online count
      }
    }
  }
}
記事のリアルタイム読者数#
各記事の現在のオンライン人数を統計する必要があります。Socket.IO の Room 機能を利用できます。Socket.IO を使用していない場合は、自分で一セットの Channel を実装することもできます。つまり、1 つの Socket を複数の Scope に分割し、各 Scope で異なるタイプのメッセージプッシュを購読できます。
したがって、各記事は対応する Room に対応し、同じ Room 内の Socket Client は相互にメッセージを受信できます。読者がページをスクロールする際に、Emit イベントを発生させ、サーバーがその Room 全体に読者の読書進捗をブロードキャストします。
記事に入ったら、すぐに Room への参加リクエストを発出し、記事を切り替えたりページを閉じたりすると、Room から退出します。
https://cdn.jsdelivr.net/npm/@innei/[email protected]/excalidraw/realtime-socket/reading-room.json
サーバー側では、まず Join と Leave イベントの処理を実装します。
@SubscribeMessage('message')
async handleMessageEvent(
  @MessageBody() data: MessageEventDto,
  @ConnectedSocket() socket: SocketIO.Socket,
) {
  const { payload, type } = data
  switch (type) {
    case SupportedMessageEvent.Join: {
      const { roomName } = payload as { roomName: string }
      if (roomName) {
        socket.join(roomName)
      }
      break
    }
    case SupportedMessageEvent.Leave: {
      const { roomName } = payload as { roomName: string }
      if (roomName) {
        socket.leave(roomName)
        const socketMeta = await this.gatewayService.getSocketMetadata(socket)
        if (socketMeta.presence) {
          this.webGateway.broadcast(
            BusinessEvents.ACTIVITY_LEAVE_PRESENCE,
            {
              identity: socketMeta.presence.identity,
              roomName,
            },
            {
              rooms: [roomName],
            },
          )
          handlePresencePersistToDb(socket)
        }
        const roomJoinedAtMap = await this.getSocketRoomJoinedAtMap(socket)
        delete roomJoinedAtMap[roomName]
        await this.gatewayService.setSocketMetadata(socket, {
          roomJoinedAtMap,
        })
      }
      break
    }
  }
}
RoomName は記事の ID に基づいて一意のキーを生成できます。次に、socket.join() を使用して Room に参加します。これは SocketIO の内部実装であり、私たちは気にする必要はありません。私たちがする必要があるのは、Join または Leave の際にいくつかのメタデータを追加または変更すること、または DB にいくつかのデータを永続化することです。
次に、フロントエンドでは、読者がページをスクロールしたときに、現在の読書進捗をインターフェースを通じてサーバーに送信し、サーバー側でその Room にいるすべての Socket Client にブロードキャストします。したがって、リクエストインターフェースを送信する際には、現在の RoomName と socket.sid、フロントエンドで生成された sessionId、および読書位置を送信する必要があります。
https://cdn.jsdelivr.net/npm/@innei/[email protected]/excalidraw/realtime-socket/presence-broadcast.json
フロントエンドが送信するデータ構造は次のようになります。
interface Persence {
    roomName: string;
    position: number;
    identity: string;
    sid: string;
    displayName?: string;
}
バックエンドの重要な実装は次のとおりです。
async updatePresence(data: UpdatePresenceDto) {
  const roomName = data.roomName
  if (!isValidRoomName(roomName)) {
    throw new BadRequestException('invalid room_name')
  }
  const roomSockets = await this.webGateway.getSocketsOfRoom(roomName)
  const presenceData: ActivityPresence = {
    ...data,
    operationTime: data.ts,
    updatedAt: Date.now(),
    connectedAt: +new Date(socket.handshake.time),
  }
  const roomJoinedAtMap =
    await this.webGateway.getSocketRoomJoinedAtMap(socket)
  Reflect.set(serializedPresenceData, 'joinedAt', roomJoinedAtMap[roomName])
  this.webGateway.broadcast(
    BusinessEvents.ACTIVITY_UPDATE_PRESENCE,
    serializedPresenceData,
    {
      rooms: [roomName],
    },
  )
  await this.gatewayService.setSocketMetadata(socket, {
    presence: presenceData,
  })
  return serializedPresenceData
}
インターフェースが到達すると、サーバーはリクエストインターフェースユーザーの読書状態をブロードキャストします。
現在のサイト内記事読書ランキング#
この機能は実際には上記の機能に基づいているため、上記の機能を実装した後、この機能は簡単に実現できます。既存のすべての Room を取得し、各 Room 内の Socket Client 数を取得するだけで済みます。
ここは理解しやすいため、詳細には触れません。
最後に#
上記のすべてのコードはオープンソースです。具体的な実装は:mx-space/core、Shiro にあります。
- https://github.com/mx-space/core/blob/310480f7b48d6460728a12a847575edd350c10c5/apps/core/src/modules/activity/activity.service.ts
- https://github.com/Innei/shiro/blob/c4bb476ac0fbc9f517d07bfdfc2e33b3890ef94a/src/components/modules/activity/Presence.tsx
- https://github.com/mx-space/core/blob/147441c2c99b163106c5fd02f0433510d1cea1b9/apps/core/src/processors/gateway/web/events.gateway.ts
この記事は Mix Space によって xLog に同期更新されました。元のリンクは https://innei.in/posts/dev-story/how-is-the-real-time-headcount-at-the-site-achieved