如何在 FastAPI 中进行多处理

How to do multiprocessing in FastAPI(如何在 FastAPI 中进行多处理)

本文介绍了如何在 FastAPI 中进行多处理的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

在处理 FastAPI 请求时,我需要对列表的每个元素执行 CPU 密集型任务.我想在多个 CPU 内核上进行此处理.

While serving a FastAPI request, I have a CPU-bound task to do on every element of a list. I'd like to do this processing on multiple CPU cores.

在 FastAPI 中执行此操作的正确方法是什么?我可以使用标准的 multiprocessing 模块吗?到目前为止,我发现的所有教程/问题都只涉及 I/O 绑定任务,例如 Web 请求.

What's the proper way to do this within FastAPI? Can I use the standard multiprocessing module? All the tutorials/questions I found so far only cover I/O-bound tasks like web requests.

推荐答案

async def端点

您可以使用 loop.run_in_executor使用 ProcessPoolExecutor 在单独的进程中启动函数.

async def endpoint

You could use loop.run_in_executor with ProcessPoolExecutor to start function at a separate process.

@app.post("/async-endpoint")
async def test_endpoint():
    loop = asyncio.get_event_loop()
    with concurrent.futures.ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, cpu_bound_func)  # wait result

def端点

由于 def 端点是 运行隐式 在单独的线程中,您可以使用模块的全部功能 multiprocessing 和 concurrent.futures.请注意,在 def 函数内部,可能不使用 await.样品:

def endpoint

Since def endpoints are run implicitly in a separate thread, you can use the full power of modules multiprocessing and concurrent.futures. Note that inside def function, await may not be used. Samples:

@app.post("/def-endpoint")
def test_endpoint():
    ...
    with multiprocessing.Pool(3) as p:
        result = p.map(f, [1, 2, 3])

@app.post("/def-endpoint/")
def test_endpoint():
    ...
    with concurrent.futures.ProcessPoolExecutor(max_workers=3) as executor:
      results = executor.map(f, [1, 2, 3])

注意:应该记住,在端点中创建进程池以及创建大量线程会导致响应速度变慢,因为请求增加.

在单独的进程中执行函数并立即等待结果的最简单和最原生的方法是使用 loop.run_in_executor 与 ProcessPoolExecutor.

The easiest and most native way to execute a function in a separate process and immediately wait for the results is to use the loop.run_in_executor with ProcessPoolExecutor.

可以在应用程序启动时创建池,如下例所示,并且不要忘记在应用程序退出时关闭.可以使用 设置池中使用的进程数max_workers ProcessPoolExecutor 构造函数参数.如果 max_workersNone 或未给出,则默认为机器上的处理器数.

A pool, as in the example below, can be created when the application starts and do not forget to shutdown on application exit. The number of processes used in the pool can be set using the max_workers ProcessPoolExecutor constructor parameter. If max_workers is None or not given, it will default to the number of processors on the machine.

这种方法的缺点是请求处理程序(路径操作)在单独的进程中等待计算完成,而客户端连接保持打开状态.而如果由于某种原因失去了连接,那么结果将无处可返回.

The disadvantage of this approach is that the request handler (path operation) waits for the computation to complete in a separate process, while the client connection remains open. And if for some reason the connection is lost, then the results will have nowhere to return.

import asyncio
from concurrent.futures.process import ProcessPoolExecutor
from fastapi import FastAPI

from calc import cpu_bound_func

app = FastAPI()


async def run_in_process(fn, *args):
    loop = asyncio.get_event_loop()
    return await loop.run_in_executor(app.state.executor, fn, *args)  # wait and return result


@app.get("/{param}")
async def handler(param: int):
    res = await run_in_process(cpu_bound_func, param)
    return {"result": res}


@app.on_event("startup")
async def on_startup():
    app.state.executor = ProcessPoolExecutor()


@app.on_event("shutdown")
async def on_shutdown():
    app.state.executor.shutdown()

移至背景

通常,CPU 密集型任务在后台执行.FastAPI 提供了运行后台任务的能力,以便在之后运行> 返回一个响应,您可以在其中启动并异步等待 CPU 绑定任务的结果.

Move to background

Usually, CPU bound tasks are executed in the background. FastAPI offers the ability to run background tasks to be run after returning a response, inside which you can start and asynchronously wait for the result of your CPU bound task.

在这种情况下,例如,您可以立即返回Accepted"的响应(HTTP代码202)和唯一的任务ID,继续计算后台,客户端可以稍后使用此ID请求任务的状态.

In this case, for example, you can immediately return a response of "Accepted" (HTTP code 202) and a unique task ID, continue calculations in the background, and the client can later request the status of the task using this ID.

BackgroundTasks 提供了一些功能,特别是,您可以运行其中的几个(包括在依赖项中).在它们中,您可以使用在依赖项中获得的资源,这些资源只有在所有任务完成后才会被清理,而在出现异常时,可以正确处理它们.这可以在这个 中更清楚地看到图.

BackgroundTasks provide some features, in particular, you can run several of them (including in dependencies). And in them you can use the resources obtained in the dependencies, which will be cleaned only when all tasks are completed, while in case of exceptions it will be possible to handle them correctly. This can be seen more clearly in this diagram.

以下是执行最小任务跟踪的示例.假定应用程序的一个实例正在运行.

Below is an example that performs minimal task tracking. One instance of the application running is assumed.

import asyncio
from concurrent.futures.process import ProcessPoolExecutor
from http import HTTPStatus

from fastapi import BackgroundTasks
from typing import Dict
from uuid import UUID, uuid4
from fastapi import FastAPI
from pydantic import BaseModel, Field

from calc import cpu_bound_func


class Job(BaseModel):
    uid: UUID = Field(default_factory=uuid4)
    status: str = "in_progress"
    result: int = None


app = FastAPI()
jobs: Dict[UUID, Job] = {}


async def run_in_process(fn, *args):
    loop = asyncio.get_event_loop()
    return await loop.run_in_executor(app.state.executor, fn, *args)  # wait and return result


async def start_cpu_bound_task(uid: UUID, param: int) -> None:
    jobs[uid].result = await run_in_process(cpu_bound_func, param)
    jobs[uid].status = "complete"


@app.post("/new_cpu_bound_task/{param}", status_code=HTTPStatus.ACCEPTED)
async def task_handler(param: int, background_tasks: BackgroundTasks):
    new_task = Job()
    jobs[new_task.uid] = new_task
    background_tasks.add_task(start_cpu_bound_task, new_task.uid, param)
    return new_task


@app.get("/status/{uid}")
async def status_handler(uid: UUID):
    return jobs[uid]


@app.on_event("startup")
async def startup_event():
    app.state.executor = ProcessPoolExecutor()


@app.on_event("shutdown")
async def on_shutdown():
    app.state.executor.shutdown()

更强大的解决方案

上面所有的例子都很简单,但是如果你需要一些更强大的系统来进行繁重的分布式计算,那么你可以把消息代理放在一边RabbitMQKafkaNATS 等.以及使用它们的库,如 Celery.

More powerful solutions

All of the above examples were pretty simple, but if you need some more powerful system for heavy distributed computing, then you can look aside message brokers RabbitMQ, Kafka, NATS and etc. And libraries using them like Celery.

这篇关于如何在 FastAPI 中进行多处理的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持编程学习网!

本文标题为:如何在 FastAPI 中进行多处理

基础教程推荐