How to have a comfortable (e.g. GNU-readline style) input line in an asyncio task?(如何在异步任务中拥有舒适的(例如GNU-readline风格)输入行?)
本文介绍了如何在异步任务中拥有舒适的(例如GNU-readline风格)输入行?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
我有一个异步程序,有两个任务:
- 任务%1做了一些工作
- 任务2提供命令行界面(CLI),它从用户读取命令并将其发送到任务1进行处理
CLI基本上是从连接到标准输入的异步流中读取行的循环。
它能用,但不太舒服。问题是输入行没有提供编辑命令,除了在Linux终端级别处理的[backspace ctrl-H]和[ctrl-U],而不是在程序中。我至少需要[左]和[右]箭头以及[删除]。
我在导入readline
的情况下尝试了内置input()
,这给我带来了舒适的编辑,但它必须在单独的线程中运行,因为它正在阻塞(loop.run_in_executor
)。
KeyboardInterrupt
)。任务已取消,但其自身线程中的input()
仍在等待[Enter]键。只有在[Enter]键之后,应用程序才会退出。这太令人困惑了,而且对于这种方法来说,它是一个拦截器错误。遗憾的是,无法终止线程,因此我无法使[ctrl-C]键正常终止程序。
您是否知道如何使用input()
解决问题,或者是否知道在异步任务中启用基本输入行编辑的替代方法?
推荐答案
您是否考虑过将input
部分保留在主线程中,并在子线程中运行事件循环?
这可能有点老生常谈,肯定需要改进,但您可以使用您已有的内容:
import asyncio
from threading import Thread
streaming_queue = asyncio.Queue()
async def main_async():
while True:
chunk = await streaming_queue.get()
print("got chunk: ", chunk)
def event_loop(loop):
try:
loop.run_until_complete(main_async())
except asyncio.CancelledError:
loop.close()
print("loop closed")
def send_chunk(chunk):
streaming_queue.put_nowait(chunk)
def cancel_all():
for task in asyncio.all_tasks():
task.cancel()
if __name__ == "__main__":
loop = asyncio.get_event_loop()
event_thread = Thread(target=event_loop, args=(loop,))
event_thread.start()
try:
while True:
chunk = input()
loop.call_soon_threadsafe(send_chunk, chunk)
except KeyboardInterrupt:
print("cancelling all tasks")
loop.call_soon_threadsafe(cancel_all)
print("joining thread")
event_thread.join()
print("done")
最好的当然是input
的异步实现。不确定这样的东西是否存在,或者@Furas的链接是否有可能.
这篇关于如何在异步任务中拥有舒适的(例如GNU-readline风格)输入行?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持编程学习网!
沃梦达教程
本文标题为:如何在异步任务中拥有舒适的(例如GNU-readline风格)输入行?
基础教程推荐
猜你喜欢
- 合并具有多索引的两个数据帧 2022-01-01
- 如何在Python中绘制多元函数? 2022-01-01
- 将 YAML 文件转换为 python dict 2022-01-01
- Python 的 List 是如何实现的? 2022-01-01
- 哪些 Python 包提供独立的事件系统? 2022-01-01
- 症状类型错误:无法确定关系的真值 2022-01-01
- 如何在 Python 中检测文件是否为二进制(非文本)文 2022-01-01
- 使 Python 脚本在 Windows 上运行而不指定“.py";延期 2022-01-01
- 使用Python匹配Stata加权xtil命令的确定方法? 2022-01-01
- 使用 Google App Engine (Python) 将文件上传到 Google Cloud Storage 2022-01-01