FastAPI:拒绝带有 HTTP 响应的 WebSocket 连接

在基于 FastAPI 的 Web 应用程序中,我有一个 WebSocket 端点,只有在满足某些条件时才允许连接,否则它应该返回 HTTP 404 回复,而不是使用 HTTP 101 升级连接。

据我了解,协议完全支持这一点,但我找不到使用 FastAPI 或 Starlette 的任何方法。

如果我有类似的东西:

@router.websocket("/foo")
async def ws_foo(request: WebSocket):
    if _user_is_allowed(request):
        await request.accept()
        _handle_ws_connection(request)
    else:
        raise HTTPException(status_code=404)

该异常不会转换为 404 响应,因为 FastAPI 的 ExceptionMiddleware 似乎无法处理此类情况。

是否有任何原生/内置方式支持这种“拒绝”流程?

stack overflow FastAPI: reject a WebSocket connection with HTTP response
原文答案

答案:

作者头像

一旦握手完成, protocol changes from HTTP to WebSocket 。如果您尝试在 websocket 端点内引发 HTTP 异常,您会看到这是不可能的,或者返回 HTTP 响应(例如, return JSONResponse(...status_code=404) ),您会收到内部服务器错误,即, ASGI callable returned without sending handshake

选项1

因此,如果您想在协议升级之前有某种检查机制,则需要使用 Middleware ,如下所示。在中间件内部,不能引发异常,但可以返回响应(即 ResponseJSONResponsePlainTextResponse 等),这实际上是 FastAPI 背后的 handles exceptions 场景。作为参考,请看一下这个 post ,以及讨论 here

async def is_user_allowed(request: Request):
    # if conditions are not met, return False
    print(request['headers'])
    print(request.client)
    return False

@app.middleware("http")
async def add_process_time_header(request: Request, call_next):
    if not await is_user_allowed(request):
        return JSONResponse(content={"message": "User not allowed"}, status_code=404)
    response = await call_next(request)
    return response

或者,如果您愿意,您可以使用 is_user_allowed() 方法引发您需要使用 try-except 块捕获的自定义异常:

class UserException(Exception):
    def __init__(self, message):
        self.message = message
        super().__init__(message)

async def is_user_allowed(request: Request):
    # if conditions are not met, raise UserException
    raise UserException(message="User not allowed.")

@app.middleware("http")
async def add_process_time_header(request: Request, call_next):
    try:
        await is_user_allowed(request)
    except UserException as e:
        return JSONResponse(content={"message": f'{e.message}'}, status_code=404)
    response = await call_next(request)
    return response

选项 2

但是,如果您需要使用 websocket 实例来执行此操作,则可以使用与上述相同的逻辑,但相反,在 websocket 方法中传递 is_user_allowed() 实例,并在websocket 端点(受 this 启发)。

@app.websocket("/ws")
async def websocket_endpoint(ws: WebSocket):
    await ws.accept()
    try:
        await is_user_allowed(ws)
        await handle_conn(ws)
    except UserException as e:
        await ws.send_text(e.message) # optionally send a message to the client before closing the connection
        await ws.close()

但是,在上面,您必须先接受连接,以便在引发异常时调用 close() 方法来终止连接。如果你愿意,你可以使用类似下面的东西。但是,如前所述,嵌入 return 块的 except 语句会引发内部服务器错误(即 ASGI callable returned without sending handshake. )。

@app.websocket("/ws")
async def websocket_endpoint(ws: WebSocket):
    try:
        await is_user_allowed(ws)
    except UserException as e:
        return
    await ws.accept()
    await handle_conn(ws)