Skip to Content

创建Websocket客户端

💡
  • 本节目标:创建一个Websocket客户端,用于与comfyui进行通信
💡
  • 在”/core”目录中创建一个”ws_connection.py”文件,用于实现Websocket客户端组件,方便在后续接口中调用
  • Websocket客户端这里使用aiohttp库来实现。
/core/ws_connection.py
from typing import Optional, Dict, Any, AsyncGenerator import aiohttp from config import settings as cfg class WebSocketClient: def __init__(self, clientId: str): """ 初始化WebSocket客户端 Args: clientId (str): 客户端唯一标识 """ self.client_id = clientId self.websocket: Optional[aiohttp.ClientWebSocketResponse] = None self.connected: bool = False self._session: Optional[aiohttp.ClientSession] = None @property def session(self) -> aiohttp.ClientSession: """获取或创建aiohttp会话""" if self._session is None or self._session.closed: self._session = aiohttp.ClientSession() return self._session async def connect(self) -> bool: """ 连接到WebSocket服务器 Returns: bool: 连接是否成功 """ try: # 在连接URL中添加clientId查询参数 self.websocket = await self.session.ws_connect( f"{cfg.COMFY_URL}/ws?clientId={self.client_id}" ) self.connected = True return True except Exception as e: print(f"WebSocket连接失败: {e}") self.connected = False return False async def send_message(self, message: Dict[str, Any]) -> bool: """ 发送消息到服务器 Args: message (Dict[str, Any]): 要发送的消息 Returns: bool: 发送是否成功 """ if not self.connected or not self.websocket: return False try: await self.websocket.send_json(message) return True except Exception as e: print(f"发送消息失败: {e}") return False async def receive_message(self) -> Dict[str, Any]: """ 接收服务器消息 Returns: Dict[str, Any]: 接收到的消息,如果接收失败则返回空字典 """ if not self.connected or not self.websocket: return {} try: msg = await self.websocket.receive_json() return msg except Exception as e: print(f"接收消息失败: {e}") return {} async def close(self) -> None: """安全关闭所有连接资源""" try: if self.connected and self.websocket: await self.websocket.close() except Exception as e: print(f"关闭WebSocket时出错: {e}") finally: self.connected = False self.websocket = None try: if self._session and not self._session.closed: await self._session.close() except Exception as e: print(f"关闭session时出错: {e}") @property def is_connected(self) -> bool: """ 检查是否已连接 Returns: bool: True表示连接正常,False表示断开 """ return self.connected and self.websocket is not None async def __aenter__(self) -> 'WebSocketClient': """异步上下文管理器入口""" await self.connect() return self async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: """异步上下文管理器退出""" await self.close()
💡
  • Websocket建立连接需要使用到url,为了方便管理,把连接信息写入配置文件
/config.py
import os from dotenv import load_dotenv, find_dotenv from pydantic_settings import BaseSettings from typing import List class Config(BaseSettings): # 加载环境变量 load_dotenv(find_dotenv(), override=True) # 调试模式 APP_DEBUG: bool = True # 项目信息 VERSION: str = "0.0.1" PROJECT_NAME: str = "api-gateway" # 文档地址 DOCS_URL: str|None = "/docs" REDOC_URL: str|None = "" # 跨域请求 CORS_ORIGINS: List = ["*"] # 允许所有来源 CORS_ALLOW_CREDENTIALS: bool = True # 允许携带cookie CORS_ALLOW_METHODS: List = ["*"] # 允许所有请求方法 CORS_ALLOW_HEADERS: List = ["*"] # 允许所有请求头 # WebSocket 配置 COMFY_HOST: str = "localhost" COMFY_PORT: int = 8188 COMFY_URL: str = f"ws://{COMFY_HOST}:{COMFY_PORT}" settings = Config() ## 目录结构 <FileTree> <FileTree.Folder name="api-gateway" defaultOpen> <FileTree.Folder name="api" defaultOpen> <FileTree.File name="__init__.py" /> <FileTree.Folder name="endpoint" defaultOpen> <FileTree.File name="comfy_server.py" /> </FileTree.Folder> <FileTree.File name="api.py" /> </FileTree.Folder> <FileTree.Folder name="core" defaultOpen> <FileTree.File name="__init__.py" /> <FileTree.File name="ws_connection.py" /> </FileTree.Folder> <FileTree.Folder name="schemas" defaultOpen> <FileTree.File name="__init__.py" /> </FileTree.Folder> <FileTree.Folder name="utils" defaultOpen> <FileTree.File name="__init__.py" /> </FileTree.Folder> <FileTree.File name="main.py" /> <FileTree.File name="config.py" /> <FileTree.File name="pyproject.toml" /> <FileTree.File name="README.md" /> <FileTree.File name=".python-version" /> </FileTree.Folder> </FileTree>
Last updated on