No se pueden crear plantillas de campos BaseOperator: task_id

Resuelto Petr Karol asked hace 9 meses • 0 respuestas

Actualmente, nuestro equipo está intentando actualizar Airflow de la versión 2.2.5 a 2.6.3 y encontramos un problema con la generación de DAG.

El error al que nos enfrentamos es el siguiente:

airflow.exceptions.AirflowException: Cannot template BaseOperator fields: task_id

A continuación se muestra un ejemplo de un DAG de prueba que desencadena el error:

DAG_ID="TestDAG"
with models.DAG(dag_id=DAG_ID,
                description="Description",
                default_args={**default_dag_settings, **{"catchup": False,
                                                         "start_date": datetime(2023, 5, 23)}},
                schedule_interval="0 5 * * *",
                doc_md=get_doc_dags(DAG_ID)) as dag:

    my_task = YamlOperator(
        task_id="task_id",
        trigger=TriggerETLConfigUrl(
            "{{ ds }}",
            GCSLoader(
                file_path="file_path").url(),
            "{{ ds }}",
        ),
        dag=dag
    )

La clase YamlOperator tiene el siguiente método de inicio :

class YamlOperator(BaseOperator):
    
    template_fields = ["args", "task_id", "arguments"]
    trigger = None

    def __init__(self,
                 task_id: str,
                 trigger: BaseTrigger,
                 files=None,
                 arguments=None,
                 *args, **kwargs
                 ):
        super(YamlOperator, self).__init__(task_id=task_id, *args, **kwargs)
        self.task_id = task_id
        self.trigger = trigger
        self.files = files
        self.args = self.trigger.command_line_args()
        self.arguments = arguments

¿Podría proporcionarnos información sobre por qué podría estar ocurriendo este problema?

Por favor, ayúdame a solucionarlo.

Petr Karol avatar Feb 16 '24 21:02 Petr Karol
Aceptado

Los parámetros de BaseOperator no se pueden crear plantillas. La excepción que está viendo es una verificación explícita para prohibir la creación de plantillas para esos campos específicamente.

Los campos de plantilla sólo son útiles para la ejecución de tareas. Las plantillas/expresiones de Jinja se evalúan justo antes de ejecutar una tarea.

El Programador, DAGFileProcessor y otros componentes necesitan conocer el valor de los parámetros de BaseOperator antes de la ejecución de la tarea por muchas razones. Por ejemplo, con task_id, existen comprobaciones de análisis para garantizar que no haya ID de tareas duplicadas dentro de los DAG. El valor real de task_idlas necesidades debe conocerse fuera de la ejecución de la tarea. Por lo tanto, la representación de las expresiones Jinja no se produciría para que la verificación se realice correctamente.

Escenarios como el descrito anteriormente y otros relacionados con la programación y la configuración (como pool, queuey similares) es la razón por la que esos tipos de parámetros no se pueden crear con plantillas.

Josh Fell avatar Feb 16 '2024 14:02 Josh Fell