У меня есть конечная точка FastAPI, где нужно загрузить некоторые файлы из HDFS на локальный сервер.
Я пытаюсь использовать asyncio для запуска функции, которая загружает файлы в отдельном процессе.
Я использую FastAPI Depends для создания клиента HDFS и внедрения объекта в выполнение конечной точки.
from fastapi import Depends, FastAPI, Request, Response, status
from hdfs import InsecureClient
import asyncio
from concurrent.futures.process import ProcessPoolExecutor
app = FastAPI()
HDFS_URLS = ['http://hdfs-srv.local:50070']
async def run_in_process(fn, *args):
loop = asyncio.get_event_loop()
return await loop.run_in_executor(app.state.executor, fn, *args) # wait and return result
def connectHDFS():
client = InsecureClient(url)
yield client
def fr(id, img, client):
# my code here
client.download(id_identifica_foto_dir_hdfs, id_identifica_foto_dir_local, True, n_threads=2)
# my code here
return jsonReturn
@app.post("/")
async def main(request: Request, hdfsclient: InsecureClient = Depends(connectHDFS)):
# Decode the received message
data = await request.json()
message = base64.b64decode(data['data']).decode('utf-8').replace("'", '"')
message = json.loads(message)
res = await run_in_process(fr, message['id'], message['img'], hdfsclient)
return {
"message": res
}
@app.on_event("startup")
async def on_startup():
app.state.executor = ProcessPoolExecutor()
@app.on_event("shutdown")
async def on_shutdown():
app.state.executor.shutdown()
Но я не могу передать объект hdfsclient
:
res = await run_in_process(fr, message['id'], message['img'], hdfsclient)
Я получаю следующую ошибку:
Traceback (most recent call last):
File "/home/kleyson/.virtualenvs/reconhecimentofacial/lib/python3.7/site-packages/uvicorn/protocols/http/h11_impl.py", line 396, in run_asgi
result = await app(self.scope, self.receive, self.send)
File "/home/kleyson/.virtualenvs/reconhecimentofacial/lib/python3.7/site-packages/uvicorn/middleware/proxy_headers.py", line 45, in __call__
return await self.app(scope, receive, send)
File "/home/kleyson/.virtualenvs/reconhecimentofacial/lib/python3.7/site-packages/fastapi/applications.py", line 199, in __call__
await super().__call__(scope, receive, send)
File "/home/kleyson/.virtualenvs/reconhecimentofacial/lib/python3.7/site-packages/starlette/applications.py", line 111, in __call__
await self.middleware_stack(scope, receive, send)
File "/home/kleyson/.virtualenvs/reconhecimentofacial/lib/python3.7/site-packages/starlette/middleware/errors.py", line 181, in __call__
raise exc from None
File "/home/kleyson/.virtualenvs/reconhecimentofacial/lib/python3.7/site-packages/starlette/middleware/errors.py", line 159, in __call__
await self.app(scope, receive, _send)
File "/home/kleyson/.virtualenvs/reconhecimentofacial/lib/python3.7/site-packages/starlette/exceptions.py", line 82, in __call__
raise exc from None
File "/home/kleyson/.virtualenvs/reconhecimentofacial/lib/python3.7/site-packages/starlette/exceptions.py", line 71, in __call__
await self.app(scope, receive, sender)
File "/home/kleyson/.virtualenvs/reconhecimentofacial/lib/python3.7/site-packages/starlette/routing.py", line 566, in __call__
await route.handle(scope, receive, send)
File "/home/kleyson/.virtualenvs/reconhecimentofacial/lib/python3.7/site-packages/starlette/routing.py", line 227, in handle
await self.app(scope, receive, send)
File "/home/kleyson/.virtualenvs/reconhecimentofacial/lib/python3.7/site-packages/starlette/routing.py", line 41, in app
response = await func(request)
File "/home/kleyson/.virtualenvs/reconhecimentofacial/lib/python3.7/site-packages/fastapi/routing.py", line 202, in app
dependant=dependant, values=values, is_coroutine=is_coroutine
File "/home/kleyson/.virtualenvs/reconhecimentofacial/lib/python3.7/site-packages/fastapi/routing.py", line 148, in run_endpoint_function
return await dependant.call(**values)
File "./asgi.py", line 86, in main
res = await run_in_process(fr, message['id'], message['img'], hdfsclient)
File "./asgi.py", line 22, in run_in_process
return await loop.run_in_executor(app.state.executor, fn, *args) # wait and return result
File "/usr/lib/python3.7/multiprocessing/queues.py", line 236, in _feed
obj = _ForkingPickler.dumps(obj)
File "/usr/lib/python3.7/multiprocessing/reduction.py", line 51, in dumps
cls(buf, protocol).dump(obj)
TypeError: can't pickle _thread.lock objects
Как сделать hdfsclient
доступным внутри функции def fr()
без необходимости создавать новое соединение при каждом новом запросе? Я имею в виду, как создать hdfsclient
при запуске приложения и иметь возможность использовать его внутри функции?