且构网

分享程序员开发的那些事...
且构网 - 分享程序员编程开发的那些事

检测空闲异步事件循环

更新时间:2022-06-08 22:41:08

正如Philip Couling指出的,下面提供的解决方案不起作用。***不允许删除接受的答案,因此我添加此免责声明。


您所说的&q;空闲&q;可能更准确地描述为&q;正在等待IO或超时&q;。在正确编写的异步代码中,应该不需要检测循环是否处于该状态,因为它不应该重要-循环正在执行它的工作,这取决于像asyncio.gatherasyncio.waitloop.run_until_complete这样的工具来确保它在适当的时间结束。然而,事情并不总是完美的,如果你真的想做到这一点,那当然是有可能的。

在事件循环的每个步骤中,它都会检查准备运行的任务。如果有,则调用它们的步骤。一旦没有更多的任务就绪,事件循环将等待IO事件或最早的超时(以最先发生的为准)。需要注意的重要一点是,正在运行的任务始终优先于等待IO。因此,要检测没有任务就绪的情况,可以计划一个已知会立即触发的虚拟IO事件。

以下协同例程设置此类事件并等待其触发:

import socket, asyncio

async def detect_iowait():
    loop = asyncio.get_event_loop()
    rsock, wsock = socket.socketpair()
    wsock.close()
    await loop.sock_recv(rsock, 1)
    rsock.close()
它设置socket pair,其中从一个套接字读取将返回写入另一个套接字的数据。它立即关闭其中一个套接字,以便从另一个套接字读取数据时立即返回EOF,表示为空字节数组。等待从该套接字读取基本上是非阻塞的-但是asyncio不知道这一点,所以它将该套接字放在IO等待列表中。如上所述,一旦没有可运行的任务,asyncio将等待IO,detect_iowait将等待对套接字的读取并退出。因此等待detect_iowait()本身会检测到IO等待。

使用detect_iowait()的测试代码可能如下所示:

# stop loop.run_forever once iowait is detected
async def stop_on_iowait():
    await detect_iowait()
    print('iowait detected, stopping!')
    asyncio.get_event_loop().stop()

# a dummy calculation coroutine, emulating your execution path
async def calc(n):
    print('calc %d start' % n)
    async def noop():
        pass
    for i in range(n):
        await noop()
    print('calc %d end' % n)

# coroutine that waits on IO forever, also (ab)using a socket pair,
# this time creating a socket whose recv will never complete
async def io_forever():
    loop = asyncio.get_event_loop()
    sock, _ = socket.socketpair()
    sock.setblocking(False)
    await loop.sock_recv(sock, 1)

loop = asyncio.get_event_loop()
for t in calc(1000), calc(10000), calc(100000), io_forever():
    loop.create_task(t)
loop.create_task(stop_on_iowait())
loop.run_forever()