创建Websocket客户端
💡
- 本节目标:创建一个Websocket客户端,用于与comfyui进行通信
💡
- 在”/core”目录中创建一个”ws_connection.py”文件,用于实现Websocket客户端组件,方便在后续接口中调用
- Websocket客户端这里使用aiohttp 库来实现。
/core/ws_connection.py
from typing import Optional, Dict, Any, AsyncGenerator
import aiohttp
from config import settings as cfg
class WebSocketClient:
def __init__(self, clientId: str):
"""
初始化WebSocket客户端
Args:
clientId (str): 客户端唯一标识
"""
self.client_id = clientId
self.websocket: Optional[aiohttp.ClientWebSocketResponse] = None
self.connected: bool = False
self._session: Optional[aiohttp.ClientSession] = None
@property
def session(self) -> aiohttp.ClientSession:
"""获取或创建aiohttp会话"""
if self._session is None or self._session.closed:
self._session = aiohttp.ClientSession()
return self._session
async def connect(self) -> bool:
"""
连接到WebSocket服务器
Returns:
bool: 连接是否成功
"""
try:
# 在连接URL中添加clientId查询参数
self.websocket = await self.session.ws_connect(
f"{cfg.COMFY_URL}/ws?clientId={self.client_id}"
)
self.connected = True
return True
except Exception as e:
print(f"WebSocket连接失败: {e}")
self.connected = False
return False
async def send_message(self, message: Dict[str, Any]) -> bool:
"""
发送消息到服务器
Args:
message (Dict[str, Any]): 要发送的消息
Returns:
bool: 发送是否成功
"""
if not self.connected or not self.websocket:
return False
try:
await self.websocket.send_json(message)
return True
except Exception as e:
print(f"发送消息失败: {e}")
return False
async def receive_message(self) -> Dict[str, Any]:
"""
接收服务器消息
Returns:
Dict[str, Any]: 接收到的消息,如果接收失败则返回空字典
"""
if not self.connected or not self.websocket:
return {}
try:
msg = await self.websocket.receive_json()
return msg
except Exception as e:
print(f"接收消息失败: {e}")
return {}
async def close(self) -> None:
"""安全关闭所有连接资源"""
try:
if self.connected and self.websocket:
await self.websocket.close()
except Exception as e:
print(f"关闭WebSocket时出错: {e}")
finally:
self.connected = False
self.websocket = None
try:
if self._session and not self._session.closed:
await self._session.close()
except Exception as e:
print(f"关闭session时出错: {e}")
@property
def is_connected(self) -> bool:
"""
检查是否已连接
Returns:
bool: True表示连接正常,False表示断开
"""
return self.connected and self.websocket is not None
async def __aenter__(self) -> 'WebSocketClient':
"""异步上下文管理器入口"""
await self.connect()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
"""异步上下文管理器退出"""
await self.close()
💡
- Websocket建立连接需要使用到url,为了方便管理,把连接信息写入配置文件
/config.py
import os
from dotenv import load_dotenv, find_dotenv
from pydantic_settings import BaseSettings
from typing import List
class Config(BaseSettings):
# 加载环境变量
load_dotenv(find_dotenv(), override=True)
# 调试模式
APP_DEBUG: bool = True
# 项目信息
VERSION: str = "0.0.1"
PROJECT_NAME: str = "api-gateway"
# 文档地址
DOCS_URL: str|None = "/docs"
REDOC_URL: str|None = ""
# 跨域请求
CORS_ORIGINS: List = ["*"] # 允许所有来源
CORS_ALLOW_CREDENTIALS: bool = True # 允许携带cookie
CORS_ALLOW_METHODS: List = ["*"] # 允许所有请求方法
CORS_ALLOW_HEADERS: List = ["*"] # 允许所有请求头
# WebSocket 配置
COMFY_HOST: str = "localhost"
COMFY_PORT: int = 8188
COMFY_URL: str = f"ws://{COMFY_HOST}:{COMFY_PORT}"
settings = Config()
## 目录结构
<FileTree>
<FileTree.Folder name="api-gateway" defaultOpen>
<FileTree.Folder name="api" defaultOpen>
<FileTree.File name="__init__.py" />
<FileTree.Folder name="endpoint" defaultOpen>
<FileTree.File name="comfy_server.py" />
</FileTree.Folder>
<FileTree.File name="api.py" />
</FileTree.Folder>
<FileTree.Folder name="core" defaultOpen>
<FileTree.File name="__init__.py" />
<FileTree.File name="ws_connection.py" />
</FileTree.Folder>
<FileTree.Folder name="schemas" defaultOpen>
<FileTree.File name="__init__.py" />
</FileTree.Folder>
<FileTree.Folder name="utils" defaultOpen>
<FileTree.File name="__init__.py" />
</FileTree.Folder>
<FileTree.File name="main.py" />
<FileTree.File name="config.py" />
<FileTree.File name="pyproject.toml" />
<FileTree.File name="README.md" />
<FileTree.File name=".python-version" />
</FileTree.Folder>
</FileTree>
Last updated on