FastAPI ejecuta llamadas api en serie en lugar de en paralelo

Resuelto Learning from masters asked hace 2 años • 2 respuestas

Tengo el siguiente código:

import time
from fastapi import FastAPI, Request
    
app = FastAPI()
    
@app.get("/ping")
async def ping(request: Request):
        print("Hello")
        time.sleep(5)
        print("bye")
        return {"ping": "pong!"}

Si ejecuto mi código en localhost, por ejemplo, http://localhost:8501/pingen diferentes pestañas de la misma ventana del navegador, obtengo:

Hello
bye
Hello
bye

en lugar de:

Hello
Hello
bye
bye

He leído sobre el uso httpx, pero aún así no puedo tener una verdadera paralelización. ¿Cuál es el problema?

Learning from masters avatar Mar 17 '22 23:03 Learning from masters
Aceptado

Según la documentación de FastAPI :

Cuando declara una función de operación de ruta con normal defen lugar de async def, se ejecuta en un grupo de subprocesos externo que luego se awaited , en lugar de llamarse directamente (ya que bloquearía el servidor).

también, como se describe aquí :

Si está utilizando una biblioteca de terceros que se comunica con algo (una base de datos, una API, el sistema de archivos, etc.) y no tiene soporte para usar await(este es actualmente el caso de la mayoría de las bibliotecas de bases de datos), entonces declare su La operación de ruta funciona normalmente, con solo def.

Si su aplicación (de alguna manera) no tiene que comunicarse con nada más y esperar a que responda, use async def.

Si no lo sabes, usa normal def.

Nota : Puede mezclar defy async defen su ruta funciones de operación tantas como necesite y definir cada una usando la mejor opción para usted. FastAPI hará lo correcto con ellos.

De todos modos, en cualquiera de los casos anteriores, FastAPI seguirá funcionando de forma asíncrona y será extremadamente rápido.

Pero si sigue los pasos anteriores, podrá realizar algunas optimizaciones de rendimiento.

Por lo tanto, deflos puntos finales (en el contexto de la programación asincrónica, una función definida con just defse llama función sincrónica ), en FastAPI, se ejecutan en un subproceso separado de un grupo de subprocesos externo que luego se awaitedita y, por lo tanto, FastAPI seguirá funcionando de forma asincrónica . En otras palabras, el servidor procesará las solicitudes a dichos puntos finales al mismo tiempo . Mientras que async deflos puntos finales se ejecutan directamente en el event loopsubproceso principal (único), es decir, el servidor también procesará solicitudes a dichos puntos finales de forma concurrente / asincrónica , siempre que haya una awaitllamada a E/S sin bloqueo. operaciones dentro de dichos async defpuntos finales/rutas, como esperar a que (1) se envíen datos del cliente a través de la red, (2) se lea el contenido de un archivo en el disco, (3) finalice una operación de base de datos, etc. , (mira aquí ). Sin embargo, si un punto final definido con async defno lo hace awaitpara alguna rutina interna, con el fin de dar tiempo para que se event loopejecuten otras tareas en el (por ejemplo, solicitudes al mismo u otros puntos finales, tareas en segundo plano, etc.), cada solicitud a dicho un punto final tendrá que estar completamente terminado (es decir, salir del punto final), antes de devolver el control event loopy permitir que se ejecuten otras tareas. En otras palabras, en tales casos, el servidor procesará las solicitudes de forma secuencial . Tenga en cuenta que el mismo concepto también se aplica a las funciones definidas con normal defque se utilizan como StreamingResponsegeneradores (ver StreamingResponseimplementación de clase) o Background Tasks(ver BackgroundTaskimplementación de clase), lo que significa que FastAPI, detrás de escena, también ejecutará dichas funciones en un hilo separado del mismo grupo de subprocesos externo. El número predeterminado de subprocesos de trabajo de ese grupo de subprocesos externo es 40y se puede ajustar según sea necesario; consulte esta respuesta sobre cómo hacerlo. Por lo tanto, después de leer esta respuesta hasta el final, debería poder decidir si debe definir un punto final FastAPI, StreamingResponseun generador o una función de tarea en segundo plano con defo async def.

La palabra clave await(que solo funciona dentro de una async deffunción) devuelve el control de la función al archivo event loop. En otras palabras, suspende la ejecución de la rutina circundante (es decir, un objeto de rutina es el resultado de llamar a una async deffunción) y le dice event loopque deje ejecutar alguna otra tarea, hasta que awaitse complete esa tarea. Tenga en cuenta que el hecho de que pueda definir una función personalizada con async defy luego awaitdentro de su async defpunto final, no significa que su código funcionará de forma asíncrona, si esa función personalizada contiene, por ejemplo, llamadas a time.sleep()tareas vinculadas a la CPU, no asíncronas. Bibliotecas de E/S o cualquier otra llamada de bloqueo que sea incompatible con el código Python asíncrono. En FastAPI, por ejemplo, cuando se utilizan los asyncmétodos de UploadFile, como await file.read()y await file.write(), FastAPI/Starlette, detrás de escena, en realidad llama a los métodos de archivo sincrónicos correspondientes en un subproceso separado del grupo de subprocesos externo descrito anteriormente (usando la función) y lo envía; de lo contrario, dichos métodos/operaciones bloquearían la clase. Puede obtener más información observando la implementación de la clase .async run_in_threadpool()awaitevent loopUploadFile

Tenga en cuenta que eso asyncno significa paralelo , sino simultáneamente . Como se mencionó anteriormente, el código asincrónico con asyncy awaitmuchas veces se resume como el uso de corrutinas . Las corrutinas son colaborativas (o multitarea cooperativa ), lo que significa que "en un momento dado, un programa con corrutinas ejecuta solo una de sus corrutinas, y esta corrutina en ejecución suspende su ejecución solo cuando solicita explícitamente ser suspendida" (ver aquí y aquí para obtener más información sobre corrutinas). Como se describe en este artículo :

Específicamente, cada vez que la ejecución de una corrutina actualmente en ejecución alcanza una awaitexpresión, la corrutina puede suspenderse y otra corrutina previamente suspendida puede reanudar la ejecución si aquello en lo que se suspendió ha devuelto un valor desde entonces. La suspensión también puede ocurrir cuando un async forbloque solicita el siguiente valor de un iterador asincrónico o cuando async withse ingresa o se sale de un bloque, como se usa awaiten estas operaciones.

Sin embargo, si una operación de bloqueo vinculada a E/S o vinculada a la CPU se ejecutara/llamara directamenteasync def dentro de una función/punto final, entonces bloquearía el bucle de eventos y, por lo tanto, el subproceso principal (las event loopejecuciones en el subproceso principal de un proceso/trabajador). Por lo tanto, una operación de bloqueo, como time.sleep()en un async defpunto final, bloquearía todo el servidor (como en el ejemplo de código proporcionado en su pregunta). Por lo tanto, si su punto final no va a realizar ninguna asyncllamada, puede declararlo con normal def, en cuyo caso, FastAPI lo ejecutará en un subproceso separado del grupo de subprocesos externo y await, como se explicó anteriormente (se brindan más soluciones en el siguientes apartados). Ejemplo:

@app.get("/ping")
def ping(request: Request):
    #print(request.client)
    print("Hello")
    time.sleep(5)
    print("bye")
    return "pong"

De lo contrario, si las funciones que tuvo que ejecutar dentro del punto final son asyncfunciones que tuvo que ejecutar await, debe definir su punto final con async def. Para demostrar esto, el siguiente ejemplo utiliza la asyncio.sleep()función (de la asynciobiblioteca), que proporciona una operación de suspensión sin bloqueo. El await asyncio.sleep()método suspenderá la ejecución de la rutina circundante (hasta que se complete la operación de suspensión), permitiendo así que se event loopejecuten otras tareas en el. Se dan ejemplos similares aquí y aquí también.

import asyncio
 
@app.get("/ping")
async def ping(request: Request):
    #print(request.client)
    print("Hello")
    await asyncio.sleep(5)
    print("bye")
    return "pong"

Los dos puntos finales anteriores imprimirán los mensajes especificados en la pantalla en el mismo orden mencionado en su pregunta, si dos solicitudes llegaron (aproximadamente) al mismo tiempo, es decir:

Hello
Hello
bye
bye

Nota IMPORTANTE

Cuando utilice un navegador para llamar al mismo punto final por segunda (tercera, y así sucesivamente), recuerde hacerlo desde una pestaña que esté aislada de la sesión principal del navegador ; de lo contrario, las solicitudes posteriores (es decir, las que siguen a la primera) podrían ser bloqueadas por el navegador (en el lado del cliente ), ya que el navegador podría estar esperando una respuesta a la solicitud anterior del servidor, antes de enviar la siguiente solicitud. Este es un comportamiento común al menos para el navegador Chrome, debido a esperar a ver el resultado de una solicitud y comprobar si el resultado se puede almacenar en caché, antes de volver a solicitar el mismo recurso.

Puede confirmarlo usando print(request.client)el interior del punto final, donde verá que el número hostnamey portes el mismo para todas las solicitudes entrantes, en caso de que las solicitudes se iniciaron desde pestañas abiertas en la misma ventana/sesión del navegador; de lo contrario, el portnúmero normalmente sería diferente para cada solicitud y, por lo tanto, esas solicitudes serían procesadas secuencialmente por el servidor, debido a que el navegador/cliente las envía secuencialmente en primer lugar. Para superar esto, podrías:

  1. Vuelva a cargar la misma pestaña (como se está ejecutando), o

  2. Abrir una nueva pestaña en una ventana de incógnito, o

  3. Utilice un navegador/cliente diferente para enviar la solicitud, o

  4. Utilice la httpxbiblioteca para realizar solicitudes HTTP asincrónicas , junto con awaitable asyncio.gather() , que permite ejecutar múltiples operaciones asincrónicas simultáneamente y luego devuelve una lista de resultados en el mismo orden en que se pasaron las awaitables (tareas) a esa función (eche un vistazo a esta respuesta para más detalles).

    Ejemplo :

    import httpx
    import asyncio
    
    URLS = ['http://127.0.0.1:8000/ping'] * 2
    
    async def send(url, client):
        return await client.get(url, timeout=10)
    
    async def main():
        async with httpx.AsyncClient() as client:
            tasks = [send(url, client) for url in URLS]
            responses = await asyncio.gather(*tasks)
            print(*[r.json() for r in responses], sep='\n')
    
    asyncio.run(main())
    

    En caso de que haya tenido que llamar a diferentes puntos finales que pueden tardar diferentes tiempos en procesar una solicitud y desee imprimir la respuesta en el lado del cliente tan pronto como la devuelva el servidor, en lugar de esperar a asyncio.gather()recopilar los resultados de todos tareas e imprimirlas en el mismo orden en que se pasaron las tareas a la send()función; puede reemplazar la send()función del ejemplo anterior con la que se muestra a continuación:

    async def send(url, client):
        res = await client.get(url, timeout=10)
        print(res.json())
        return res
    

Async/ awaity bloqueo de operaciones vinculadas a E/S o CPU

Si necesita definir un punto final async def(como podría necesitarlo awaitpara algunas rutinas dentro de él), pero también tiene alguna operación de bloqueo síncrono vinculada a E/S o vinculada a la CPU (tarea computacionalmente intensiva) que bloquearía event loop(esencialmente, todo el servidor) y no dejaría pasar otras solicitudes, por ejemplo:

@app.post("/ping")
async def ping(file: UploadFile = File(...)):
    print("Hello")
    try:
        contents = await file.read()
        res = cpu_bound_task(contents)  # this will block the event loop
    finally:
        await file.close()
    print("bye")
    return "pong"

entonces:

  1. Debe verificar si puede cambiar la definición de su punto final a normal defen lugar de async def. Por ejemplo, si el único método en su punto final que debe esperarse es el que lee el contenido del archivo (como mencionó en la sección de comentarios a continuación), podría declarar el tipo de parámetro del punto final como (es decir, bytes) file: bytes = File()y por lo tanto , FastAPI leerá el archivo por usted y usted recibirá el contenido como bytes. Por lo tanto, no habría necesidad de utilizar await file.read(). Tenga en cuenta que el enfoque anterior debería funcionar para archivos pequeños, ya que todo el contenido del archivo se almacenaría en la memoria (consulte la documentación en FileParámetros ); y por lo tanto, si tu sistema no tiene suficiente RAM disponible para acomodar los datos acumulados (si, por ejemplo, tienes 8 GB de RAM, no puedes cargar un archivo de 50 GB), tu aplicación puede terminar fallando. Alternativamente, puede llamar al .read()método SpooledTemporaryFiledirectamente (al que se puede acceder a través del .fileatributo del UploadFileobjeto), de modo que nuevamente no tenga que usar awaitel .read()método y, como ahora puede declarar su punto final con normal def, cada solicitud se ejecutará. en un hilo separado (se proporciona un ejemplo a continuación). Para obtener más detalles sobre cómo cargar un archivo Filey cómo se utiliza Starlette/FastAPI SpooledTemporaryFiledetrás de escena, consulte esta respuesta y esta respuesta .

    @app.post("/ping")
    def ping(file: UploadFile = File(...)):
        print("Hello")
        try:
            contents = file.file.read()
            res = cpu_bound_task(contents)
        finally:
            file.file.close()
        print("bye")
        return "pong"
    
  2. Utilice la función FastAPI (Starlette) run_in_threadpool()del concurrencymódulo, como sugirió @tiangolo aquí , que "ejecutará la función en un hilo separado para garantizar que el hilo principal (donde se ejecutan las rutinas) no se bloquee" (ver aquí ). Como lo describe @tiangolo aquí , " run_in_threadpooles una awaitfunción capaz; el primer parámetro es una función normal, los siguientes parámetros se pasan directamente a esa función. Admite argumentos de secuencia y argumentos de palabras clave ".

    from fastapi.concurrency import run_in_threadpool
    
    res = await run_in_threadpool(cpu_bound_task, contents)
    
  3. Alternativamente, use asyncio's loop.run_in_executor()—después de obtener el event loopuso en ejecución— asyncio.get_running_loop()para ejecutar la tarea, que, en este caso, puede awaitcompletar y devolver los resultados, antes de pasar a la siguiente línea de código. Pasando Noneal argumento del ejecutor , se utilizará el ejecutor predeterminado ; el cual es un ThreadPoolExecutor:

    import asyncio
    
    loop = asyncio.get_running_loop()
    res = await loop.run_in_executor(None, cpu_bound_task, contents)
    

    o, si en su lugar desea pasar argumentos de palabras clave , puede usar una lambdaexpresión (p. ej., lambda: cpu_bound_task(some_arg=contents)) o, preferiblemente, functools.partial(), que se recomienda específicamente en la documentación para loop.run_in_executor():

    import asyncio
    from functools import partial
    
    loop = asyncio.get_running_loop()
    res = await loop.run_in_executor(None, partial(cpu_bound_task, some_arg=contents))
    

    En Python 3.9+, también se puede utilizar asyncio.to_thread()para ejecutar de forma asincrónica una función sincrónica en un subproceso separado, que, esencialmente, se usa await loop.run_in_executor(None, func_call)de forma interna, como se puede ver en la implementación deasyncio.to_thread() . La to_thread()función toma el nombre de una función de bloqueo para ejecutar, así como cualquier argumento ( *argsy/o **kwargs) de la función, y luego devuelve una corrutina que se puede awaiteditar. Ejemplo:

    import asyncio
    
    res = await asyncio.to_thread(cpu_bound_task, contents)
    

    Tenga en cuenta que, como se explica en esta respuesta , pasar Noneel executorargumento no crea uno nuevo ThreadPoolExecutorcada vez que llama await loop.run_in_executor(None, ...), sino que reutiliza el ejecutor predeterminado con el número predeterminado de subprocesos de trabajo (es decir, min(32, os.cpu_count() + 4)). Por lo tanto, dependiendo de los requisitos de su aplicación, ese número puede ser bastante bajo. En ese caso, deberías utilizar un archivo personalizado ThreadPoolExecutor. Por ejemplo:

    import asyncio
    import concurrent.futures
    
    loop = asyncio.get_running_loop()
    with concurrent.futures.ThreadPoolExecutor() as pool:
        res = await loop.run_in_executor(pool, cpu_bound_task, contents)
    

    Recomiendo encarecidamente echar un vistazo a la respuesta vinculada anterior para conocer la diferencia entre usar run_in_threadpool()y run_in_executor(), así como también cómo crear una personalización reutilizable ThreadPoolExecutoral inicio de la aplicación y ajustar la cantidad máxima de subprocesos de trabajo según sea necesario.

  4. ThreadPoolExecutor will successfully prevent the event loop from being blocked, but won't give you the performance improvement you would expect from running code in parallel; especially, when one needs to perform CPU-bound tasks, such as the ones described here (e.g., audio or image processing, machine learning, and so on). It is thus preferable to run CPU-bound tasks in a separate process—using ProcessPoolExecutor, as shown below—which, again, you can integrate with asyncio, in order to await it to finish its work and return the result(s). As described here, it is important to protect the entry point of the program to avoid recursive spawning of subprocesses, etc. Basically, your code must be under if __name__ == '__main__'.

    import concurrent.futures
    
    loop = asyncio.get_running_loop()
    with concurrent.futures.ProcessPoolExecutor() as pool:
        res = await loop.run_in_executor(pool, cpu_bound_task, contents) 
    

    Again, I'd suggest having a look at the linked answer earlier on how to create a re-usable ProcessPoolExecutor at the application startup. You might find this answer helpful as well.

  5. Use more workers to take advantage of multi-core CPUs, in order to run multiple processes in parallel and be able to serve more requests. For example, uvicorn main:app --workers 4 (if you are using Gunicorn as a process manager with Uvicorn workers, please have a look at this answer). When using 1 worker, only one process is run. When using multiple workers, this will spawn multiple processes (all single threaded). Each process has a separate Global Interpreter Lock (GIL), as well as its own event loop, which runs in the main thread of each process and executes all tasks in its thread. That means, there is only one thread that can take a lock on the interpreter of each process; unless, of course, you employ additional threads, either outside or inside the event loop, e.g., when using a ThreadPoolExecutor with loop.run_in_executor, or defining endpoints/background tasks/StreamingResponse's generator with normal def instead of async def, as well as when calling UploadFile's methods (see the first two paragraphs of this answer for more details).

    Note: Each worker "has its own things, variables and memory". This means that global variables/objects, etc., won't be shared across the processes/workers. In this case, you should consider using a database storage, or Key-Value stores (Caches), as described here and here. Additionally, note that "if you are consuming a large amount of memory in your code, each process will consume an equivalent amount of memory".

  6. If you need to perform heavy background computation and you don't necessarily need it to be run by the same process (for example, you don't need to share memory, variables, etc), you might benefit from using other bigger tools like Celery, as described in FastAPI's documentation.

Chris avatar Mar 17 '2022 19:03 Chris