Как использовать объекты FastAPI Depends с asyncio?

У меня есть конечная точка FastAPI, где нужно загрузить некоторые файлы из HDFS на локальный сервер.

Я пытаюсь использовать asyncio для запуска функции, которая загружает файлы в отдельном процессе.

Я использую FastAPI Depends для создания клиента HDFS и внедрения объекта в выполнение конечной точки.

from fastapi import Depends, FastAPI, Request, Response, status
from hdfs import InsecureClient
import asyncio
from concurrent.futures.process import ProcessPoolExecutor

app = FastAPI()

HDFS_URLS = ['http://hdfs-srv.local:50070']

async def run_in_process(fn, *args):
    loop = asyncio.get_event_loop()
    return await loop.run_in_executor(app.state.executor, fn, *args)  # wait and return result

def connectHDFS():
    client = InsecureClient(url)
    yield client

def fr(id, img, client):
    # my code here

    client.download(id_identifica_foto_dir_hdfs,  id_identifica_foto_dir_local, True, n_threads=2)

    # my code here

    return jsonReturn


@app.post("/")
async def main(request: Request, hdfsclient: InsecureClient = Depends(connectHDFS)):

    # Decode the received message
    data = await request.json()
    message = base64.b64decode(data['data']).decode('utf-8').replace("'", '"')
    message = json.loads(message)

    res = await run_in_process(fr, message['id'], message['img'], hdfsclient)

    return {
        "message": res
    }

@app.on_event("startup")
async def on_startup():
    app.state.executor = ProcessPoolExecutor()

@app.on_event("shutdown")
async def on_shutdown():
    app.state.executor.shutdown()

Но я не могу передать объект hdfsclient:

res = await run_in_process(fr, message['id'], message['img'], hdfsclient)

Я получаю следующую ошибку:

Traceback (most recent call last):
  File "/home/kleyson/.virtualenvs/reconhecimentofacial/lib/python3.7/site-packages/uvicorn/protocols/http/h11_impl.py", line 396, in run_asgi
    result = await app(self.scope, self.receive, self.send)
  File "/home/kleyson/.virtualenvs/reconhecimentofacial/lib/python3.7/site-packages/uvicorn/middleware/proxy_headers.py", line 45, in __call__
    return await self.app(scope, receive, send)
  File "/home/kleyson/.virtualenvs/reconhecimentofacial/lib/python3.7/site-packages/fastapi/applications.py", line 199, in __call__
    await super().__call__(scope, receive, send)
  File "/home/kleyson/.virtualenvs/reconhecimentofacial/lib/python3.7/site-packages/starlette/applications.py", line 111, in __call__
    await self.middleware_stack(scope, receive, send)
  File "/home/kleyson/.virtualenvs/reconhecimentofacial/lib/python3.7/site-packages/starlette/middleware/errors.py", line 181, in __call__
    raise exc from None
  File "/home/kleyson/.virtualenvs/reconhecimentofacial/lib/python3.7/site-packages/starlette/middleware/errors.py", line 159, in __call__
    await self.app(scope, receive, _send)
  File "/home/kleyson/.virtualenvs/reconhecimentofacial/lib/python3.7/site-packages/starlette/exceptions.py", line 82, in __call__
    raise exc from None
  File "/home/kleyson/.virtualenvs/reconhecimentofacial/lib/python3.7/site-packages/starlette/exceptions.py", line 71, in __call__
    await self.app(scope, receive, sender)
  File "/home/kleyson/.virtualenvs/reconhecimentofacial/lib/python3.7/site-packages/starlette/routing.py", line 566, in __call__
    await route.handle(scope, receive, send)
  File "/home/kleyson/.virtualenvs/reconhecimentofacial/lib/python3.7/site-packages/starlette/routing.py", line 227, in handle
    await self.app(scope, receive, send)
  File "/home/kleyson/.virtualenvs/reconhecimentofacial/lib/python3.7/site-packages/starlette/routing.py", line 41, in app
    response = await func(request)
  File "/home/kleyson/.virtualenvs/reconhecimentofacial/lib/python3.7/site-packages/fastapi/routing.py", line 202, in app
    dependant=dependant, values=values, is_coroutine=is_coroutine
  File "/home/kleyson/.virtualenvs/reconhecimentofacial/lib/python3.7/site-packages/fastapi/routing.py", line 148, in run_endpoint_function
    return await dependant.call(**values)
  File "./asgi.py", line 86, in main
    res = await run_in_process(fr, message['id'], message['img'], hdfsclient)
  File "./asgi.py", line 22, in run_in_process
    return await loop.run_in_executor(app.state.executor, fn, *args)  # wait and return result
  File "/usr/lib/python3.7/multiprocessing/queues.py", line 236, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "/usr/lib/python3.7/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
TypeError: can't pickle _thread.lock objects

Как сделать hdfsclient доступным внутри функции def fr() без необходимости создавать новое соединение при каждом новом запросе? Я имею в виду, как создать hdfsclient при запуске приложения и иметь возможность использовать его внутри функции?


person Kleyson Rios    schedule 27.04.2021    source источник


Ответы (1)


Вся суть asyncio в том, чтобы делать то, что вы пытаетесь достичь, в одном процессе.

Типичным примером является поисковый робот, в котором вы открываете несколько запросов в одном потоке / процессе, а затем ждете их завершения. Таким образом, вы получите данные из нескольких urls, не дожидаясь каждого отдельного запроса перед запуском следующего.

То же самое применимо и в вашем случае: вызовите вашу async функцию, которая загружает файл, сделайте свою работу и затем дождитесь завершения загрузки файла (если она еще не завершена). Обмен данными между процессами не является тривиальным делом, и ваша функция не работает по этой причине.

Я предлагаю вам сначала понять, что такое async и как оно работает, прежде чем приступать к тому, что вы не понимаете.

Некоторые руководства по asyncio

https://www.datacamp.com/community/tutorials/asyncio-introduction

https://realpython.com/lessons/what-asyncio/

https://docs.python.org/3/library/asyncio.html

person lsabi    schedule 28.04.2021
comment
Спасибо за ответ. Я понимаю все те случаи, о которых вы упомянули. Я просто ищу возможное решение для своего случая. - person Kleyson Rios; 28.04.2021
comment
Дело в том, что вы используете отвертку вместо молотка, чтобы вешать гвоздь. Совместное использование переменных между процессами нетривиально и может привести к ошибкам. Вы можете использовать нужный инструмент, но любой другой ответ будет либо действительно сложным, либо предложит использовать asyncio так, как это было задумано. - person lsabi; 28.04.2021