FastAPI ejecuta llamadas api en serie en lugar de en paralelo
Tengo el siguiente código:
import time
from fastapi import FastAPI, Request
app = FastAPI()
@app.get("/ping")
async def ping(request: Request):
print("Hello")
time.sleep(5)
print("bye")
return {"ping": "pong!"}
Si ejecuto mi código en localhost, por ejemplo, http://localhost:8501/ping
en diferentes pestañas de la misma ventana del navegador, obtengo:
Hello
bye
Hello
bye
en lugar de:
Hello
Hello
bye
bye
He leído sobre el uso httpx
, pero aún así no puedo tener una verdadera paralelización. ¿Cuál es el problema?
Según la documentación de FastAPI :
Cuando declara una función de operación de ruta con normal
def
en lugar deasync def
, se ejecuta en un grupo de subprocesos externo que luego seawait
ed , en lugar de llamarse directamente (ya que bloquearía el servidor).
también, como se describe aquí :
Si está utilizando una biblioteca de terceros que se comunica con algo (una base de datos, una API, el sistema de archivos, etc.) y no tiene soporte para usar
await
(este es actualmente el caso de la mayoría de las bibliotecas de bases de datos), entonces declare su La operación de ruta funciona normalmente, con solodef
.Si su aplicación (de alguna manera) no tiene que comunicarse con nada más y esperar a que responda, use
async def
.Si no lo sabes, usa normal
def
.Nota : Puede mezclar
def
yasync def
en su ruta funciones de operación tantas como necesite y definir cada una usando la mejor opción para usted. FastAPI hará lo correcto con ellos.De todos modos, en cualquiera de los casos anteriores, FastAPI seguirá funcionando de forma asíncrona y será extremadamente rápido.
Pero si sigue los pasos anteriores, podrá realizar algunas optimizaciones de rendimiento.
Por lo tanto, def
los puntos finales (en el contexto de la programación asincrónica, una función definida con just def
se llama función sincrónica ), en FastAPI, se ejecutan en un subproceso separado de un grupo de subprocesos externo que luego se await
edita y, por lo tanto, FastAPI seguirá funcionando de forma asincrónica . En otras palabras, el servidor procesará las solicitudes a dichos puntos finales al mismo tiempo . Mientras que async def
los puntos finales se ejecutan directamente en el event loop
subproceso principal (único), es decir, el servidor también procesará solicitudes a dichos puntos finales de forma concurrente / asincrónica , siempre que haya una await
llamada a E/S sin bloqueo. operaciones dentro de dichos async def
puntos finales/rutas, como esperar a que (1) se envíen datos del cliente a través de la red, (2) se lea el contenido de un archivo en el disco, (3) finalice una operación de base de datos, etc. , (mira aquí ). Sin embargo, si un punto final definido con async def
no lo hace await
para alguna rutina interna, con el fin de dar tiempo para que se event loop
ejecuten otras tareas en el (por ejemplo, solicitudes al mismo u otros puntos finales, tareas en segundo plano, etc.), cada solicitud a dicho un punto final tendrá que estar completamente terminado (es decir, salir del punto final), antes de devolver el control event loop
y permitir que se ejecuten otras tareas. En otras palabras, en tales casos, el servidor procesará las solicitudes de forma secuencial . Tenga en cuenta que el mismo concepto también se aplica a las funciones definidas con normal def
que se utilizan como StreamingResponse
generadores (ver StreamingResponse
implementación de clase) o Background Tasks
(ver BackgroundTask
implementación de clase), lo que significa que FastAPI, detrás de escena, también ejecutará dichas funciones en un hilo separado del mismo grupo de subprocesos externo. El número predeterminado de subprocesos de trabajo de ese grupo de subprocesos externo es 40
y se puede ajustar según sea necesario; consulte esta respuesta sobre cómo hacerlo. Por lo tanto, después de leer esta respuesta hasta el final, debería poder decidir si debe definir un punto final FastAPI, StreamingResponse
un generador o una función de tarea en segundo plano con def
o async def
.
La palabra clave await
(que solo funciona dentro de una async def
función) devuelve el control de la función al archivo event loop
. En otras palabras, suspende la ejecución de la rutina circundante (es decir, un objeto de rutina es el resultado de llamar a una async def
función) y le dice event loop
que deje ejecutar alguna otra tarea, hasta que await
se complete esa tarea. Tenga en cuenta que el hecho de que pueda definir una función personalizada con async def
y luego await
dentro de su async def
punto final, no significa que su código funcionará de forma asíncrona, si esa función personalizada contiene, por ejemplo, llamadas a time.sleep()
tareas vinculadas a la CPU, no asíncronas. Bibliotecas de E/S o cualquier otra llamada de bloqueo que sea incompatible con el código Python asíncrono. En FastAPI, por ejemplo, cuando se utilizan los async
métodos de UploadFile
, como await file.read()
y await file.write()
, FastAPI/Starlette, detrás de escena, en realidad llama a los métodos de archivo sincrónicos correspondientes en un subproceso separado del grupo de subprocesos externo descrito anteriormente (usando la función) y lo envía; de lo contrario, dichos métodos/operaciones bloquearían la clase. Puede obtener más información observando la implementación de la clase .async
run_in_threadpool()
await
event loop
UploadFile
Tenga en cuenta que eso async
no significa paralelo , sino simultáneamente . Como se mencionó anteriormente, el código asincrónico con async
y await
muchas veces se resume como el uso de corrutinas . Las corrutinas son colaborativas (o multitarea cooperativa ), lo que significa que "en un momento dado, un programa con corrutinas ejecuta solo una de sus corrutinas, y esta corrutina en ejecución suspende su ejecución solo cuando solicita explícitamente ser suspendida" (ver aquí y aquí para obtener más información sobre corrutinas). Como se describe en este artículo :
Específicamente, cada vez que la ejecución de una corrutina actualmente en ejecución alcanza una
await
expresión, la corrutina puede suspenderse y otra corrutina previamente suspendida puede reanudar la ejecución si aquello en lo que se suspendió ha devuelto un valor desde entonces. La suspensión también puede ocurrir cuando unasync for
bloque solicita el siguiente valor de un iterador asincrónico o cuandoasync with
se ingresa o se sale de un bloque, como se usaawait
en estas operaciones.
Sin embargo, si una operación de bloqueo vinculada a E/S o vinculada a la CPU se ejecutara/llamara directamenteasync def
dentro de una función/punto final, entonces bloquearía el bucle de eventos y, por lo tanto, el subproceso principal (las event loop
ejecuciones en el subproceso principal de un proceso/trabajador). Por lo tanto, una operación de bloqueo, como time.sleep()
en un async def
punto final, bloquearía todo el servidor (como en el ejemplo de código proporcionado en su pregunta). Por lo tanto, si su punto final no va a realizar ninguna async
llamada, puede declararlo con normal def
, en cuyo caso, FastAPI lo ejecutará en un subproceso separado del grupo de subprocesos externo y await
, como se explicó anteriormente (se brindan más soluciones en el siguientes apartados). Ejemplo:
@app.get("/ping")
def ping(request: Request):
#print(request.client)
print("Hello")
time.sleep(5)
print("bye")
return "pong"
De lo contrario, si las funciones que tuvo que ejecutar dentro del punto final son async
funciones que tuvo que ejecutar await
, debe definir su punto final con async def
. Para demostrar esto, el siguiente ejemplo utiliza la asyncio.sleep()
función (de la asyncio
biblioteca), que proporciona una operación de suspensión sin bloqueo. El await asyncio.sleep()
método suspenderá la ejecución de la rutina circundante (hasta que se complete la operación de suspensión), permitiendo así que se event loop
ejecuten otras tareas en el. Se dan ejemplos similares aquí y aquí también.
import asyncio
@app.get("/ping")
async def ping(request: Request):
#print(request.client)
print("Hello")
await asyncio.sleep(5)
print("bye")
return "pong"
Los dos puntos finales anteriores imprimirán los mensajes especificados en la pantalla en el mismo orden mencionado en su pregunta, si dos solicitudes llegaron (aproximadamente) al mismo tiempo, es decir:
Hello
Hello
bye
bye
Nota IMPORTANTE
Cuando utilice un navegador para llamar al mismo punto final por segunda (tercera, y así sucesivamente), recuerde hacerlo desde una pestaña que esté aislada de la sesión principal del navegador ; de lo contrario, las solicitudes posteriores (es decir, las que siguen a la primera) podrían ser bloqueadas por el navegador (en el lado del cliente ), ya que el navegador podría estar esperando una respuesta a la solicitud anterior del servidor, antes de enviar la siguiente solicitud. Este es un comportamiento común al menos para el navegador Chrome, debido a esperar a ver el resultado de una solicitud y comprobar si el resultado se puede almacenar en caché, antes de volver a solicitar el mismo recurso.
Puede confirmarlo usando print(request.client)
el interior del punto final, donde verá que el número hostname
y port
es el mismo para todas las solicitudes entrantes, en caso de que las solicitudes se iniciaron desde pestañas abiertas en la misma ventana/sesión del navegador; de lo contrario, el port
número normalmente sería diferente para cada solicitud y, por lo tanto, esas solicitudes serían procesadas secuencialmente por el servidor, debido a que el navegador/cliente las envía secuencialmente en primer lugar. Para superar esto, podrías:
Vuelva a cargar la misma pestaña (como se está ejecutando), o
Abrir una nueva pestaña en una ventana de incógnito, o
Utilice un navegador/cliente diferente para enviar la solicitud, o
Utilice la
httpx
biblioteca para realizar solicitudes HTTP asincrónicas , junto con awaitableasyncio.gather()
, que permite ejecutar múltiples operaciones asincrónicas simultáneamente y luego devuelve una lista de resultados en el mismo orden en que se pasaron las awaitables (tareas) a esa función (eche un vistazo a esta respuesta para más detalles).Ejemplo :
import httpx import asyncio URLS = ['http://127.0.0.1:8000/ping'] * 2 async def send(url, client): return await client.get(url, timeout=10) async def main(): async with httpx.AsyncClient() as client: tasks = [send(url, client) for url in URLS] responses = await asyncio.gather(*tasks) print(*[r.json() for r in responses], sep='\n') asyncio.run(main())
En caso de que haya tenido que llamar a diferentes puntos finales que pueden tardar diferentes tiempos en procesar una solicitud y desee imprimir la respuesta en el lado del cliente tan pronto como la devuelva el servidor, en lugar de esperar a
asyncio.gather()
recopilar los resultados de todos tareas e imprimirlas en el mismo orden en que se pasaron las tareas a lasend()
función; puede reemplazar lasend()
función del ejemplo anterior con la que se muestra a continuación:async def send(url, client): res = await client.get(url, timeout=10) print(res.json()) return res
Async
/ await
y bloqueo de operaciones vinculadas a E/S o CPU
Si necesita definir un punto final async def
(como podría necesitarlo await
para algunas rutinas dentro de él), pero también tiene alguna operación de bloqueo síncrono vinculada a E/S o vinculada a la CPU (tarea computacionalmente intensiva) que bloquearía event loop
(esencialmente, todo el servidor) y no dejaría pasar otras solicitudes, por ejemplo:
@app.post("/ping")
async def ping(file: UploadFile = File(...)):
print("Hello")
try:
contents = await file.read()
res = cpu_bound_task(contents) # this will block the event loop
finally:
await file.close()
print("bye")
return "pong"
entonces:
Debe verificar si puede cambiar la definición de su punto final a normal
def
en lugar deasync def
. Por ejemplo, si el único método en su punto final que debe esperarse es el que lee el contenido del archivo (como mencionó en la sección de comentarios a continuación), podría declarar el tipo de parámetro del punto final como (es decir,bytes
)file: bytes = File()
y por lo tanto , FastAPI leerá el archivo por usted y usted recibirá el contenido comobytes
. Por lo tanto, no habría necesidad de utilizarawait file.read()
. Tenga en cuenta que el enfoque anterior debería funcionar para archivos pequeños, ya que todo el contenido del archivo se almacenaría en la memoria (consulte la documentación enFile
Parámetros ); y por lo tanto, si tu sistema no tiene suficiente RAM disponible para acomodar los datos acumulados (si, por ejemplo, tienes 8 GB de RAM, no puedes cargar un archivo de 50 GB), tu aplicación puede terminar fallando. Alternativamente, puede llamar al.read()
métodoSpooledTemporaryFile
directamente (al que se puede acceder a través del.file
atributo delUploadFile
objeto), de modo que nuevamente no tenga que usarawait
el.read()
método y, como ahora puede declarar su punto final con normaldef
, cada solicitud se ejecutará. en un hilo separado (se proporciona un ejemplo a continuación). Para obtener más detalles sobre cómo cargar un archivoFile
y cómo se utiliza Starlette/FastAPISpooledTemporaryFile
detrás de escena, consulte esta respuesta y esta respuesta .@app.post("/ping") def ping(file: UploadFile = File(...)): print("Hello") try: contents = file.file.read() res = cpu_bound_task(contents) finally: file.file.close() print("bye") return "pong"
Utilice la función FastAPI (Starlette)
run_in_threadpool()
delconcurrency
módulo, como sugirió @tiangolo aquí , que "ejecutará la función en un hilo separado para garantizar que el hilo principal (donde se ejecutan las rutinas) no se bloquee" (ver aquí ). Como lo describe @tiangolo aquí , "run_in_threadpool
es unaawait
función capaz; el primer parámetro es una función normal, los siguientes parámetros se pasan directamente a esa función. Admite argumentos de secuencia y argumentos de palabras clave ".from fastapi.concurrency import run_in_threadpool res = await run_in_threadpool(cpu_bound_task, contents)
Alternativamente, use
asyncio
'sloop.run_in_executor()
—después de obtener elevent loop
uso en ejecución—asyncio.get_running_loop()
para ejecutar la tarea, que, en este caso, puedeawait
completar y devolver los resultados, antes de pasar a la siguiente línea de código. PasandoNone
al argumento del ejecutor , se utilizará el ejecutor predeterminado ; el cual es unThreadPoolExecutor
:import asyncio loop = asyncio.get_running_loop() res = await loop.run_in_executor(None, cpu_bound_task, contents)
o, si en su lugar desea pasar argumentos de palabras clave , puede usar una
lambda
expresión (p. ej.,lambda: cpu_bound_task(some_arg=contents)
) o, preferiblemente,functools.partial()
, que se recomienda específicamente en la documentación paraloop.run_in_executor()
:import asyncio from functools import partial loop = asyncio.get_running_loop() res = await loop.run_in_executor(None, partial(cpu_bound_task, some_arg=contents))
En Python 3.9+, también se puede utilizar
asyncio.to_thread()
para ejecutar de forma asincrónica una función sincrónica en un subproceso separado, que, esencialmente, se usaawait loop.run_in_executor(None, func_call)
de forma interna, como se puede ver en la implementación deasyncio.to_thread()
. Lato_thread()
función toma el nombre de una función de bloqueo para ejecutar, así como cualquier argumento (*args
y/o**kwargs
) de la función, y luego devuelve una corrutina que se puedeawait
editar. Ejemplo:import asyncio res = await asyncio.to_thread(cpu_bound_task, contents)
Tenga en cuenta que, como se explica en esta respuesta , pasar
None
elexecutor
argumento no crea uno nuevoThreadPoolExecutor
cada vez que llamaawait loop.run_in_executor(None, ...)
, sino que reutiliza el ejecutor predeterminado con el número predeterminado de subprocesos de trabajo (es decir,min(32, os.cpu_count() + 4)
). Por lo tanto, dependiendo de los requisitos de su aplicación, ese número puede ser bastante bajo. En ese caso, deberías utilizar un archivo personalizadoThreadPoolExecutor
. Por ejemplo:import asyncio import concurrent.futures loop = asyncio.get_running_loop() with concurrent.futures.ThreadPoolExecutor() as pool: res = await loop.run_in_executor(pool, cpu_bound_task, contents)
Recomiendo encarecidamente echar un vistazo a la respuesta vinculada anterior para conocer la diferencia entre usar
run_in_threadpool()
yrun_in_executor()
, así como también cómo crear una personalización reutilizableThreadPoolExecutor
al inicio de la aplicación y ajustar la cantidad máxima de subprocesos de trabajo según sea necesario.ThreadPoolExecutor
will successfully prevent theevent loop
from being blocked, but won't give you the performance improvement you would expect from running code in parallel; especially, when one needs to performCPU-bound
tasks, such as the ones described here (e.g., audio or image processing, machine learning, and so on). It is thus preferable to run CPU-bound tasks in a separate process—usingProcessPoolExecutor
, as shown below—which, again, you can integrate withasyncio
, in order toawait
it to finish its work and return the result(s). As described here, it is important to protect the entry point of the program to avoid recursive spawning of subprocesses, etc. Basically, your code must be underif __name__ == '__main__'
.import concurrent.futures loop = asyncio.get_running_loop() with concurrent.futures.ProcessPoolExecutor() as pool: res = await loop.run_in_executor(pool, cpu_bound_task, contents)
Again, I'd suggest having a look at the linked answer earlier on how to create a re-usable
ProcessPoolExecutor
at the application startup. You might find this answer helpful as well.Use more workers to take advantage of multi-core CPUs, in order to run multiple processes in parallel and be able to serve more requests. For example,
uvicorn main:app --workers 4
(if you are using Gunicorn as a process manager with Uvicorn workers, please have a look at this answer). When using 1 worker, only one process is run. When using multiple workers, this will spawn multiple processes (all single threaded). Each process has a separate Global Interpreter Lock (GIL), as well as its ownevent loop
, which runs in the main thread of each process and executes all tasks in its thread. That means, there is only one thread that can take a lock on the interpreter of each process; unless, of course, you employ additional threads, either outside or inside theevent loop
, e.g., when using aThreadPoolExecutor
withloop.run_in_executor
, or defining endpoints/background tasks/StreamingResponse
's generator with normaldef
instead ofasync def
, as well as when callingUploadFile
's methods (see the first two paragraphs of this answer for more details).Note: Each worker "has its own things, variables and memory". This means that
global
variables/objects, etc., won't be shared across the processes/workers. In this case, you should consider using a database storage, or Key-Value stores (Caches), as described here and here. Additionally, note that "if you are consuming a large amount of memory in your code, each process will consume an equivalent amount of memory".If you need to perform heavy background computation and you don't necessarily need it to be run by the same process (for example, you don't need to share memory, variables, etc), you might benefit from using other bigger tools like Celery, as described in FastAPI's documentation.