实现一个没有 async/asyncio 的 Python WebSocket 监听器

我在一个单独的线程中运行一个 websocket 侦听器。我想连接到 websocket 然后做:

while True:
    msg = sock.wait_for_message()
    f(msg)

即没有异步/异步

这是愚蠢的吗?有没有办法做到这一点?

stack overflow Implement a Python WebSocket listener without async/asyncio
原文答案
author avatar

接受的答案

在没有更好的答案的情况下,我发现 https://github.com/websocket-client/websocket-client 使用起来很轻松。


答案:

作者头像

我知道这个问题是关于 websocket 客户端的。但我对 websocket 服务器有同样的疑问。作为 asyncio 的新手和喜欢老​​式同步设计的人,看到人们对同步服务器设计的热情如此之低,我感到很困惑。我在网上找到的所有示例都假设整个应用程序是异步的,这可能不是真的。

所以我提出了这个解决方案,将异步 websockets 模块包装到一个同步类中。希望它可以帮助别人。

这个想法是:

1.专门为websockets模块创建一个事件循环

  1. 在每次循环迭代时,入队一个“停止”事件以更新一次事件循环。这可以多次完成而不会出现问题。 (参见 this link
  2. 将数据放入队列,供调用层读写websockets

注意:查看 websockets 模块源代码,似乎不推荐使用 loop= 输入参数。我不确定这到底是什么意思,作者是否会在将来删除它或提出替代方案。 (参见 here

import websockets
import websockets.server
import queue
import asyncio
import time

class SynchronousWebsocketServer:
    """
    Synchronous wrapper around asynchronous websockets server by Pier-Yves Lessard
    """
    def __init__(self, connect_callback=None, disconnect_callback=None):
        self.rxqueue = queue.Queue()
        self.txqueue = queue.Queue()
        self.loop = asyncio.new_event_loop()
        self.ws_server = None
        self.connect_callback = connect_callback
        self.disconnect_callback = disconnect_callback

    # Executed for each websocket
    async def server_routine(self, websocket, path):
        if self.connect_callback is not None:
            self.connect_callback(websocket)

        try:
            async for message in websocket:
                self.rxqueue.put( (websocket, message) )   # Possible improvement : Handle queue full scenario.
        except websockets.exceptions.ConnectionClosedError:
            pass
        finally:
            if self.disconnect_callback is not None:
                self.disconnect_callback(websocket)

    def process_tx_queue(self):
        while not self.txqueue.empty():
            (websocket, message) = self.txqueue.get()
            try:
                self.loop.run_until_complete(websocket.send(message))
            except websockets.exceptions.ConnectionClosedOK:
                pass    # Client is disconnected. Disconnect callback not called yet.

    def process(self, nloop=3) -> None:
        self.process_tx_queue()
        for i in range(nloop):  # Process events few times to make sure we handles events generated within the loop
            self.loop.call_soon(self.loop.stop)
            self.loop.run_forever()

    def start(self, host, port) -> None:
        # Warning. websockets source code says that loop argument might be deprecated. 
        self.ws_server = websockets.serve(self.server_routine, host, port, loop=self.loop)
        self.loop.run_until_complete(self.ws_server)    # Initialize websockets async server

    def stop(self) -> None:
        if self.ws_server is not None:
            self.ws_server.ws_server.close()
            self.loop.run_until_complete(asyncio.ensure_future(self.ws_server.ws_server.wait_closed(), loop=self.loop))
            self.loop.stop()

if __name__ == '__main__':
    # Demo on how to use the SynchronousWebsocketServer
    clients = set()
    def connect_callback(websocket):
        clients.add(websocket)
        print('New client. Websocket ID = %s. We now have %d clients' % (id(websocket), len(clients)))

    def diconnect_callback(websocket):
        clients.remove(websocket)
        print('Client diconnected. Websocket ID = %s. %d clients remaining' % (id(websocket), len(clients)))

    server = SynchronousWebsocketServer(connect_callback=connect_callback, disconnect_callback=diconnect_callback)
    print("Starting server")
    server.start('localhost', 5555)
    print("Server started")

    while True: # Synchornous loop
        try:
            server.process()
            if not server.rxqueue.empty():
                websocket, message = server.rxqueue.get_nowait()   # Non-blocking read. We need to keep call "server.process()" 
                print("Received message from websocket ID=%s. Echoing %s " % (id(websocket), message))
                server.txqueue.put((websocket, message))    # echo
            time.sleep(0.005)
        except KeyboardInterrupt:
            break

    print("Stopping server")
    server.stop()
    print("Server stopped")

为了测试此服务器,我在浏览器控制台中执行了以下操作:

ws1 = new WebSocket('ws://localhost:5555')  
ws2 = new WebSocket('ws://localhost:5555')
ws3 = new WebSocket('ws://localhost:5555')

ws2.send('Hello World')
ws2.close()
ws1.send('Hi') 

// Killed server here.

这产生了这个输出:

Starting server
Server started
New client. Websocket ID = 140097014008368. We now have 1 clients
New client. Websocket ID = 140097014017824. We now have 2 clients
New client. Websocket ID = 140097014018928. We now have 3 clients
Received message from websocket ID=140097014017824. Echoing Hello World 
Client diconnected. Websocket ID = 140097014017824. 2 clients remaining
Received message from websocket ID=140097014008368. Echoing Hi 
^C
Stopping server
Client diconnected. Websocket ID = 140097014018928. 1 clients remaining
Client diconnected. Websocket ID = 140097014008368. 0 clients remaining
Server stopped