借助WebSocket构建超高速实时协作体系
第六章:全栈项目实战示例:实时协作系统
一、需求分析:实时白板/协同编辑场景
实时协作系统需支持多用户同时操作同一文档或白板,并实时同步所有变更。核心需求涵盖:
1. 毫秒级延迟:用户操作需在300毫秒内同步至所有参与者;
2. 操作一致性:确保最终所有客户端呈现一致内容;
3. 冲突处理:解决多用户同时修改同一区域的问题;
4. 状态恢复:断线重连后能自动同步最新状态。
participant 用户A
participant 用户B
participant 服务器
用户A->>服务器: 用户A登录
服务器-->>用户A: 登录成功响应
用户B->>服务器: 用户B登录
服务器-->>用户B: 登录成功响应
用户A->>服务器: 用户A创建新的白板会话
服务器-->>用户A: 白板会话ID
用户B->>服务器: 加入现有的白板会话 (会话ID)
服务器-->>用户B: 加入成功响应
用户A->>服务器: 用户A在白板上绘图/编辑
服务器-->>用户B: 更新操作
用户B->>服务器: 用户B在白板上绘图/编辑
服务器-->>用户A: 更新操作
用户A->>服务器: 保存白板内容
服务器-->>用户A: 保存成功确认
用户B->>服务器: 检视白板内容
服务器-->>用户B: 白板内容数据
用户A->>服务器: 退出白板会话
服务器-->>用户A: 退出成功确认
用户B->>服务器: 退出白板会话
服务器-->>用户B: 退出成功确认
二、后端WebSocket服务搭建
依赖安装:
pip install fastapi==0.104.0 websockets==12.0 uvicorn==0.23.2 pydantic==2.5.2
核心代码实现:
import asyncio
import logging
from typing import List, Dict
from fastapi import FastAPI, WebSocket
from pydantic import BaseModel
app = FastAPI()
logger = logging.getLogger("uvicorn.error")
class Operation(BaseModel):
type: str # "insert" or "delete"
position: int
content: str = "" # 插入内容
length: int = 1 # 删除长度
client_id: str = "" # 客户端标识
version: int = 0 # 操作版本号
# OT转换引擎(示例)
class OTEngine:
@staticmethod
def transform(op1: Operation, op2: Operation) -> Operation:
"""操作转换核心算法"""
# 插入 vs 插入
if op1.type == "insert" and op2.type == "insert":
if op1.position < op2.position:
return Operation(**{**op2.dict(), "position": op2.position + len(op1.content)})
elif op1.position > op2.position:
return op2
else: # 相同位置按客户端ID排序
return op2 if op1.client_id < op2.client_id else Operation(
**{**op2.dict(), "position": op2.position + len(op1.content)})
# 插入 vs 删除
elif op1.type == "insert" and op2.type == "delete":
if op1.position <= op2.position:
return Operation(**{**op2.dict(), "position": op2.position + len(op1.content)})
else:
return Operation(**{**op2.dict(), "position": op2.position})
# 删除 vs 插入
elif op1.type == "delete" and op2.type == "insert":
if op1.position < op2.position:
return Operation(**{**op2.dict(), "position": max(op2.position - op1.length, 0)})
else:
return op2
# 删除 vs 删除
else:
if op1.position < op2.position:
return Operation(**{**op2.dict(), "position": max(op2.position - op1.length, 0)})
elif op1.position > op2.position:
return Operation(**{**op2.dict(), "position": op2.position})
else: # 相同位置取范围更大的删除
return op2 if op1.length >= op2.length else Operation(**{**op2.dict(), "length": op1.length})
# 协同编辑房间管理器
class CollaborationRoom:
def __init__(self, room_id: str):
self.room_id = room_id
self.connections = set() # 实际使用redis实现, 这里使用set模拟
self.document = ""
self.version = 0
self.pending_ops: List[Operation] = []
self.lock = asyncio.Lock()
self.client_states: Dict[str, int] = {} # 客户端最后确认版本
async def add_connection(self, websocket: WebSocket, client_id: str):
async with self.lock:
self.connections.add(websocket)
self.client_states[client_id] = self.version
# 发送初始状态
await websocket.send_json({
"type": "snapshot",
"document": self.document,
"version": self.version
})
async def remove_connection(self, websocket: WebSocket, client_id: str):
async with self.lock:
self.connections.discard(websocket)
if client_id in self.client_states:
del self.client_states[client_id]
async def apply_operation(self, operation: Operation):
"""应用操作转换并更新文档"""
async with self.lock:
# 转换所有待处理操作
transformed_op = operation
for pending in self.pending_ops:
transformed_op = OTEngine.transform(pending, transformed_op)
# 应用转换后的操作
if transformed_op.type == "insert":
self.document = (self.document[:transformed_op.position] +
transformed_op.content +
self.document[transformed_op.position:])
elif transformed_op.type == "delete":
start = transformed_op.position
end = min(start + transformed_op.length, len(self.document))
self.document = self.document[:start] + self.document[end:]
# 更新状态
self.version += 1
self.pending_ops.append(transformed_op)
# 广播转换后的操作
broadcast_tasks = []
for conn in self.connections:
try:
broadcast_tasks.append(conn.send_json({
"type": "operation",
"operation": transformed_op.dict(),
"document": self.document,
"version": self.version
}))
except:
pass
await asyncio.gather(*broadcast_tasks, return_exceptions=True)
# 清除已处理的操作
min_client_version = min(self.client_states.values(), default=self.version)
self.pending_ops = [op for op in self.pending_ops if op.version >= min_client_version]
# 全局房间管理
room_manager: Dict[str, CollaborationRoom] = {} # 实际使用redis实现, 这里使用字典模拟
global_lock = asyncio.Lock()
@app.websocket("/ws/{room_id}/{client_id}")
async def websocket_endpoint(websocket: WebSocket, room_id: str, client_id: str):
await websocket.accept()
# 获取或创建房间
async with global_lock:
if room_id not in room_manager:
room_manager[room_id] = CollaborationRoom(room_id)
room = room_manager[room_id]
# 加入房间
await room.add_connection(websocket, client_id)
logger.info(f"Client {client_id} joined room {room_id}")
try:
while True:
data = await websocket.receive_json()
op = Operation(**data)
op.client_id = client_id
# 应用操作转换
await room.apply_operation(op)
except Exception as e:
logger.error(f"Error in room {room_id}: {str(e)}")
finally:
# 离开房间
await room.remove_connection(websocket, client_id)
logger.info(f"Client {client_id} left room {room_id}")
# 清理空房间
async with global_lock:
if not room.connections:
del room_manager[room_id]
logger.info(f"Room {room_id} closed")
关键机制:
- 采用WebSocket协议替代HTTP长轮询实现实时通信;
- 维护活动连接池管理连接;
- 通过Pydantic模型验证操作格式;
- 利用广播模式实现实时同步。
优化技巧:
- 使用requestAnimationFrame合并高频操作;
- 添加操作版本号解决时序问题;
- 实现本地缓存防止数据丢失;
- 添加心跳机制检测连接状态。
三、前端Vue.js连接实现
组件核心代码:
```vue
在线用户 ({{ users.length }})
{{ user }}