借助WebSocket构建超高速实时协作体系

借助WebSocket构建超高速实时协作体系

第六章:全栈项目实战示例:实时协作系统

一、需求分析:实时白板/协同编辑场景

实时协作系统需支持多用户同时操作同一文档或白板,并实时同步所有变更。核心需求涵盖:
1. 毫秒级延迟:用户操作需在300毫秒内同步至所有参与者;
2. 操作一致性:确保最终所有客户端呈现一致内容;
3. 冲突处理:解决多用户同时修改同一区域的问题;
4. 状态恢复:断线重连后能自动同步最新状态。

participant 用户A
participant 用户B
participant 服务器
用户A->>服务器: 用户A登录
服务器-->>用户A: 登录成功响应
用户B->>服务器: 用户B登录
服务器-->>用户B: 登录成功响应
用户A->>服务器: 用户A创建新的白板会话
服务器-->>用户A: 白板会话ID
用户B->>服务器: 加入现有的白板会话 (会话ID)
服务器-->>用户B: 加入成功响应
用户A->>服务器: 用户A在白板上绘图/编辑
服务器-->>用户B: 更新操作
用户B->>服务器: 用户B在白板上绘图/编辑
服务器-->>用户A: 更新操作
用户A->>服务器: 保存白板内容
服务器-->>用户A: 保存成功确认
用户B->>服务器: 检视白板内容
服务器-->>用户B: 白板内容数据
用户A->>服务器: 退出白板会话
服务器-->>用户A: 退出成功确认
用户B->>服务器: 退出白板会话
服务器-->>用户B: 退出成功确认
二、后端WebSocket服务搭建

依赖安装

pip install fastapi==0.104.0 websockets==12.0 uvicorn==0.23.2 pydantic==2.5.2

核心代码实现

import asyncio
import logging
from typing import List, Dict

from fastapi import FastAPI, WebSocket
from pydantic import BaseModel

app = FastAPI()
logger = logging.getLogger("uvicorn.error")


class Operation(BaseModel):
    type: str  # "insert" or "delete"
    position: int
    content: str = ""  # 插入内容
    length: int = 1  # 删除长度
    client_id: str = ""  # 客户端标识
    version: int = 0  # 操作版本号


# OT转换引擎(示例)
class OTEngine:
    @staticmethod
    def transform(op1: Operation, op2: Operation) -> Operation:
        """操作转换核心算法"""
        # 插入 vs 插入
        if op1.type == "insert" and op2.type == "insert":
            if op1.position < op2.position:
                return Operation(**{**op2.dict(), "position": op2.position + len(op1.content)})
            elif op1.position > op2.position:
                return op2
            else:  # 相同位置按客户端ID排序
                return op2 if op1.client_id < op2.client_id else Operation(
                    **{**op2.dict(), "position": op2.position + len(op1.content)})

        # 插入 vs 删除
        elif op1.type == "insert" and op2.type == "delete":
            if op1.position <= op2.position:
                return Operation(**{**op2.dict(), "position": op2.position + len(op1.content)})
            else:
                return Operation(**{**op2.dict(), "position": op2.position})

        # 删除 vs 插入
        elif op1.type == "delete" and op2.type == "insert":
            if op1.position < op2.position:
                return Operation(**{**op2.dict(), "position": max(op2.position - op1.length, 0)})
            else:
                return op2

        # 删除 vs 删除
        else:
            if op1.position < op2.position:
                return Operation(**{**op2.dict(), "position": max(op2.position - op1.length, 0)})
            elif op1.position > op2.position:
                return Operation(**{**op2.dict(), "position": op2.position})
            else:  # 相同位置取范围更大的删除
                return op2 if op1.length >= op2.length else Operation(**{**op2.dict(), "length": op1.length})


# 协同编辑房间管理器
class CollaborationRoom:
    def __init__(self, room_id: str):
        self.room_id = room_id
        self.connections = set()  # 实际使用redis实现, 这里使用set模拟
        self.document = ""
        self.version = 0
        self.pending_ops: List[Operation] = []
        self.lock = asyncio.Lock()
        self.client_states: Dict[str, int] = {}  # 客户端最后确认版本

    async def add_connection(self, websocket: WebSocket, client_id: str):
        async with self.lock:
            self.connections.add(websocket)
            self.client_states[client_id] = self.version
            # 发送初始状态
            await websocket.send_json({
                "type": "snapshot",
                "document": self.document,
                "version": self.version
            })

    async def remove_connection(self, websocket: WebSocket, client_id: str):
        async with self.lock:
            self.connections.discard(websocket)
            if client_id in self.client_states:
                del self.client_states[client_id]

    async def apply_operation(self, operation: Operation):
        """应用操作转换并更新文档"""
        async with self.lock:
            # 转换所有待处理操作
            transformed_op = operation
            for pending in self.pending_ops:
                transformed_op = OTEngine.transform(pending, transformed_op)

            # 应用转换后的操作
            if transformed_op.type == "insert":
                self.document = (self.document[:transformed_op.position] +
                                 transformed_op.content +
                                 self.document[transformed_op.position:])
            elif transformed_op.type == "delete":
                start = transformed_op.position
                end = min(start + transformed_op.length, len(self.document))
                self.document = self.document[:start] + self.document[end:]

            # 更新状态
            self.version += 1
            self.pending_ops.append(transformed_op)

            # 广播转换后的操作
            broadcast_tasks = []
            for conn in self.connections:
                try:
                    broadcast_tasks.append(conn.send_json({
                        "type": "operation",
                        "operation": transformed_op.dict(),
                        "document": self.document,
                        "version": self.version
                    }))
                except:
                    pass
            await asyncio.gather(*broadcast_tasks, return_exceptions=True)

            # 清除已处理的操作
            min_client_version = min(self.client_states.values(), default=self.version)
            self.pending_ops = [op for op in self.pending_ops if op.version >= min_client_version]


# 全局房间管理
room_manager: Dict[str, CollaborationRoom] = {}  # 实际使用redis实现, 这里使用字典模拟
global_lock = asyncio.Lock()


@app.websocket("/ws/{room_id}/{client_id}")
async def websocket_endpoint(websocket: WebSocket, room_id: str, client_id: str):
    await websocket.accept()

    # 获取或创建房间
    async with global_lock:
        if room_id not in room_manager:
            room_manager[room_id] = CollaborationRoom(room_id)
        room = room_manager[room_id]

    # 加入房间
    await room.add_connection(websocket, client_id)
    logger.info(f"Client {client_id} joined room {room_id}")

    try:
        while True:
            data = await websocket.receive_json()
            op = Operation(**data)
            op.client_id = client_id

            # 应用操作转换
            await room.apply_operation(op)

    except Exception as e:
        logger.error(f"Error in room {room_id}: {str(e)}")
    finally:
        # 离开房间
        await room.remove_connection(websocket, client_id)
        logger.info(f"Client {client_id} left room {room_id}")

        # 清理空房间
        async with global_lock:
            if not room.connections:
                del room_manager[room_id]
                logger.info(f"Room {room_id} closed")

关键机制
- 采用WebSocket协议替代HTTP长轮询实现实时通信;
- 维护活动连接池管理连接;
- 通过Pydantic模型验证操作格式;
- 利用广播模式实现实时同步。

优化技巧
- 使用requestAnimationFrame合并高频操作;
- 添加操作版本号解决时序问题;
- 实现本地缓存防止数据丢失;
- 添加心跳机制检测连接状态。

三、前端Vue.js连接实现

组件核心代码
```vue