Sharing python objects across multiple workers(在多个工作进程之间共享Python对象)
问题描述
我们已经使用FastAPI创建了一个服务。当我们的服务启动时,它创建几个Python对象,然后端点使用这些对象来存储或检索数据。
生产中的FastAPI从多个工作进程开始。我们的问题是,每个员工都创建了自己的对象,而不是共享单个对象。
下面的脚本显示了我们正在执行的操作的(简化)示例,不过在我们的示例中,Meta()的用法要复杂得多。
from fastapi import FastAPI, status
class Meta:
   def __init__(self):
      self.count = 0  
app = FastAPI()
meta = Meta()
# increases the count variable in the meta object by 1
@app.get("/increment")
async def increment():
    meta.count += 1
    return status.HTTP_200_OK
# returns a json containing the current count from the meta object
@app.get("/report")
async def report():
    return {'count':meta.count}
# resets the count in the meta object to 0
@app.get("/reset")
async def reset():
    meta.count = 0
    return status.HTTP_200_OK
如上所述,多个工作进程的问题在于每个工作进程都有自己的meta对象。请注意,当使用单个Worker运行API时,此问题不可见。
更明确地说,当我们第一次到达/increment端点时,我们将只看到两个工作进程中的一个响应调用(这是正确的,我们不希望两个工作进程执行相同的操作)。但是,因为有两个单独的meta对象,所以这两个对象中只有一个会递增。
到达/report终结点时,将根据响应请求的工作进程返回1或0。
那么问题是,我们如何让员工共享同一对象并对其进行操作?
作为一个次要问题,上面的问题也会影响/reset端点。如果调用此终结点,则只有一个工作进程将重置其对象。是否有办法强制所有工作人员响应终结点上的单个呼叫?
谢谢!
编辑:我忘记提到我们已经尝试(没有成功)将meta对象存储在app.state中。本质:
app.state.meta = Meta()
...
@app.get("/report")
async def report():
    return {'count':app.state.meta.count}
推荐答案
不可能在不同进程之间直接共享Python对象。
multiprocessing模块中包含的工具(如管理器或共享内存)不适合在工作进程之间共享资源,因为它们需要创建资源的主进程,并且没有持久性属性。此外,服务器进程可以在不同的计算机上运行。
员工之间共享资源的首选方式:
- 数据库-在需要可靠存储和可扩展性的持久性资源的情况下。示例:
PostgreSQL、MariaDB、MongoDB等。 - 缓存(键/值)-在数据的临时性情况下,比数据库更快,但没有这样的可伸缩性,而且通常不符合ACID。示例:
Redis、Memcached等 
FastAPI应用程序中的数据。作为示例,我以Redis为后端的aiocache库和PostgreSQL为后端的Tortoise ORM库为例。因为FastAPI是异步框架,所以我选择了基于asyncio的库。
测试项目结构如下:
.
├── app_cache.py
├── app_db.py
├── docker-compose.yml
├── __init__.py
Docker-Compose文件:
对于实验,您可以使用以下docker组成文件将5432(Postgres)和6379(Redis)端口公开到localhost。
version: '3'
services:
  database:
    image: postgres:12-alpine
    ports:
      - "5432:5432"
    environment:
      POSTGRES_PASSWORD: test_pass
      POSTGRES_USER: test_user
      POSTGRES_DB: test_db
  redis:
    image: redis:6-alpine
    ports:
      - "6379:6379"
开始:
docker-compose up -d
缓存(aiocache)
Aiocache提供3个主要实体:
- 后端:允许您指定要用于缓存的后端。目前支持:
 SimpleMemoryCache、RedisCache使用aioredis和MemCache使用aiomcache。serializers:序列化和反序列化代码和后端之间的数据。此允许您将任何Python对象保存到缓存。目前支持:StringSerializer、PickleSerializer、JsonSerializer、MsgPackSerializer。但您也可以自定义。- 插件:实现允许在每个命令前后执行额外行为的挂钩系统。
 
开始:
uvicorn app_cache:app --host localhost --port 8000 --workers 5
# app_cache.py
import os
from aiocache import Cache
from fastapi import FastAPI, status
app = FastAPI()
cache = Cache(Cache.REDIS, endpoint="localhost", port=6379, namespace="main")
class Meta:
    def __init__(self):
        pass
    async def get_count(self) -> int:
        return await cache.get("count", default=0)
    async def set_count(self, value: int) -> None:
        await cache.set("count", value)
    async def increment_count(self) -> None:
        await cache.increment("count", 1)
meta = Meta()
# increases the count variable in the meta object by 1
@app.post("/increment")
async def increment():
    await meta.increment_count()
    return status.HTTP_200_OK
# returns a json containing the current count from the meta object
@app.get("/report")
async def report():
    count = await meta.get_count()
    return {'count': count, "current_process_id": os.getpid()}
# resets the count in the meta object to 0
@app.post("/reset")
async def reset():
    await meta.set_count(0)
    return status.HTTP_200_OK
数据库(Tortoise ORM+PostgreSQL)
启动: 为简单起见,我们首先运行一个Worker在数据库中创建模式:
uvicorn app_db:app --host localhost --port 8000 --workers 1
[Ctrl-C] 
uvicorn app_db:app --host localhost --port 8000 --workers 5
# app_db.py
from fastapi import FastAPI, status
from tortoise import Model, fields
from tortoise.contrib.fastapi import register_tortoise
class MetaModel(Model):
    count = fields.IntField(default=0)
app = FastAPI()
# increases the count variable in the meta object by 1
@app.get("/increment")
async def increment():
    meta, is_created = await MetaModel.get_or_create(id=1)
    meta.count += 1  # it's better do it in transaction
    await meta.save()
    return status.HTTP_200_OK
# returns a json containing the current count from the meta object
@app.get("/report")
async def report():
    meta, is_created = await MetaModel.get_or_create(id=1)
    return {'count': meta.count}
# resets the count in the meta object to 0
@app.get("/reset")
async def reset():
    meta, is_created = await MetaModel.get_or_create(id=1)
    meta.count = 0
    await meta.save()
    return status.HTTP_200_OK
register_tortoise(
    app,
    db_url="postgres://test_user:test_pass@localhost:5432/test_db",  # Don't expose login/pass in src, use environment variables
    modules={"models": ["app_db"]},
    generate_schemas=True,
    add_exception_handlers=True,
)
                        这篇关于在多个工作进程之间共享Python对象的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持编程学习网!
本文标题为:在多个工作进程之间共享Python对象
				
        
 
            
        基础教程推荐
- 在Python中从Azure BLOB存储中读取文件 2022-01-01
 - 使用大型矩阵时禁止 Pycharm 输出中的自动换行符 2022-01-01
 - 包装空间模型 2022-01-01
 - PermissionError: pip 从 8.1.1 升级到 8.1.2 2022-01-01
 - 修改列表中的数据帧不起作用 2022-01-01
 - Plotly:如何设置绘图图形的样式,使其不显示缺失日期的间隙? 2022-01-01
 - 在同一图形上绘制Bokeh的烛台和音量条 2022-01-01
 - PANDA VALUE_COUNTS包含GROUP BY之前的所有值 2022-01-01
 - 无法导入 Pytorch [WinError 126] 找不到指定的模块 2022-01-01
 - 求两个直方图的卷积 2022-01-01
 
    	
    	
    	
    	
    	
    	
    	
    	
				
				
				
				