Airflow XCom: Exclusive Best Practices
Remember: XCom is a messenger, not a data lake. Keep your messages brief, and your pipelines will thank you.
XCom rows accumulate in the metadata database with every DAG run and quickly become a performance bottleneck. Implement a maintenance DAG that runs weekly to purge expired or non-essential XCom rows, either with Airflow's built-in `airflow db clean` command (available since Airflow 2.3) or with a standard SQLAlchemy cleanup task.
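A minimal sketch of the purge step follows. It uses Python's built-in `sqlite3` against a simplified, hypothetical `xcom` table with `dag_id`, `key`, `value`, and `timestamp` columns. Airflow's real schema varies between versions, so against a production metadata database prefer `airflow db clean --tables xcom --clean-before-timestamp <date>` (Airflow 2.3+). The seven-day retention window and the `KEEP_KEYS` allowlist are illustrative assumptions.

```python
import sqlite3
from datetime import datetime, timedelta, timezone

# Hypothetical allowlist of keys that should survive cleanup.
KEEP_KEYS = ("model_registry_uri",)


def purge_old_xcoms(conn, retention_days=7, keep_keys=KEEP_KEYS, now=None):
    """Delete XCom rows older than `retention_days`, except rows whose key is
    in `keep_keys`. Returns the number of deleted rows."""
    now = now or datetime.now(timezone.utc)
    # Timestamps are assumed to be ISO-8601 UTC strings, so plain string
    # comparison orders them chronologically.
    cutoff = (now - timedelta(days=retention_days)).isoformat()
    placeholders = ", ".join("?" for _ in keep_keys)
    cur = conn.execute(
        f"DELETE FROM xcom WHERE timestamp < ? AND key NOT IN ({placeholders})",
        (cutoff, *keep_keys),
    )
    conn.commit()
    return cur.rowcount
```

Calling `purge_old_xcoms` from a single task in a DAG scheduled weekly gives the maintenance DAG described above.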
| Do | Don't |
| :--- | :--- |
| Pass small metadata values (status, paths, IDs, counts) | Pass large datasets, DataFrames, or binary blobs |
| Use the TaskFlow API for cleaner, less error-prone code | Use XComs as a replacement for a shared data lake |
| Set up a custom backend if your data exceeds 48 KB | Push hundreds of XComs per DAG run |
| Always test XCom values for JSON serializability | Rely on pickling or custom serialization unless absolutely necessary |
| Explicitly specify `task_ids` when pulling | Assume default `xcom_pull()` behavior is constant across versions |
| Store large payloads externally (S3, GCS) and pass the URI | Use XComs for state that must survive task retries |
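The serializability and size rows of this table can be enforced before a value is ever pushed. A small sketch using only the standard library; `check_xcom_value` and the 48 KB budget constant are illustrative assumptions, and Airflow's own XCom serializer may accept some types (for example datetimes, depending on version) that plain `json.dumps` rejects.

```python
import json

# Assumed size budget, taken from the table above; tune it for your database.
MAX_XCOM_BYTES = 48 * 1024


def check_xcom_value(value, max_bytes=MAX_XCOM_BYTES):
    """Raise if `value` is not JSON-serializable or its JSON encoding is larger
    than `max_bytes`. Returns the encoded size in bytes."""
    try:
        encoded = json.dumps(value).encode("utf-8")
    except (TypeError, ValueError) as exc:  # e.g. sets, objects, circular refs
        raise TypeError(f"XCom value is not JSON-serializable: {exc}") from exc
    if len(encoded) > max_bytes:
        raise ValueError(
            f"XCom value is {len(encoded)} bytes; store it externally and push a URI instead"
        )
    return len(encoded)
```

Calling this in unit tests for each task callable's return value catches oversized or non-serializable payloads before they reach the metadata database.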
```python
# Upstream task
def push_metadata(**kwargs):
    kwargs['ti'].xcom_push(key='model_accuracy', value=0.94)


# Downstream task
def pull_metadata(**kwargs):
    # 'push_metadata_task' is the task_id of the upstream PythonOperator
    accuracy = kwargs['ti'].xcom_pull(task_ids='push_metadata_task', key='model_accuracy')
    print(f"Model accuracy received: {accuracy}")
```

Implicit (return value)
```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator


def push_function(**context):
    # The return value is pushed to XCom automatically (key 'return_value')
    return "secret_data_123"


def pull_function(**context):
    ti = context['ti']
    # Pull the return value of the task 'push_task'
    value = ti.xcom_pull(task_ids='push_task')
    print(f"Pulled value: {value}")


with DAG('xcom_traditional_example', start_date=datetime(2023, 1, 1), schedule=None) as dag:
    push_task = PythonOperator(
        task_id='push_task',
        python_callable=push_function,
    )
    pull_task = PythonOperator(
        task_id='pull_task',
        python_callable=pull_function,
    )

    push_task >> pull_task
```

B. The TaskFlow API Approach (Recommended)
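Airflow 2.0 introduced pluggable XCom backends, configured through the `xcom_backend` setting. A custom backend removes the size limit discussed above: the metadata database typically keeps only a small reference, while the payload itself lives in external storage.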
In enterprise data platforms, multi-tenant environments demand strict data isolation. "Exclusive XComs" refers to architectural patterns ensuring that specific data payloads are accessible only to authorized downstream tasks, preventing data leaks, race conditions, or accidental overwrites across parallel DAG runs.

1. Preventing Cross-DAG Contamination
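Airflow already scopes each XCom row by DAG, run, and task, but any task can pull another task's XCom (`xcom_pull` even accepts a `dag_id`), and nothing restricts who reads a value. The class below is a hypothetical, in-memory illustration of the two "exclusive" rules described above, not an Airflow API: values are write-once per scope, and only allowlisted downstream task IDs may read them. In a real deployment this logic would have to live in a custom XCom backend or a wrapper around `xcom_pull`.

```python
class ExclusiveXComStore:
    """Hypothetical in-memory stand-in for an XCom backend that enforces
    write-once values and allowlisted readers. Entries are scoped by
    (dag_id, run_id, task_id, key), so parallel runs never share a slot."""

    def __init__(self):
        self._values = {}
        self._readers = {}

    def push(self, dag_id, run_id, task_id, key, value, allowed_readers):
        scope = (dag_id, run_id, task_id, key)
        if scope in self._values:
            raise KeyError(f"refusing to overwrite existing XCom {scope}")
        self._values[scope] = value
        self._readers[scope] = frozenset(allowed_readers)

    def pull(self, dag_id, run_id, task_id, key, reader_task_id):
        scope = (dag_id, run_id, task_id, key)
        if scope not in self._values:
            raise KeyError(f"no XCom stored for {scope}")
        if reader_task_id not in self._readers[scope]:
            raise PermissionError(f"task {reader_task_id!r} may not read {scope}")
        return self._values[scope]
```

Note that stock Airflow replaces an XCom when the same task pushes the same key again; the write-once rule here is a deliberate, stricter design choice for the exclusive pattern.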
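With the default backend, every push and pull is a query against the metadata database, so high-volume XCom usage can significantly slow down both the database and the scheduler that depends on it.

Best Practice: The "Reference" Pattern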
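In the reference pattern, the upstream task writes the heavy payload to external storage and pushes only its location; the downstream task pulls the location and reads the data itself. In this sketch a local temporary directory stands in for S3 or GCS, and `store_payload` and `load_payload` are hypothetical helper names.

```python
import json
import tempfile
import uuid
from pathlib import Path
from urllib.parse import urlparse
from urllib.request import url2pathname

# A local temp directory stands in for S3/GCS in this sketch.
STORAGE_ROOT = Path(tempfile.mkdtemp(prefix="xcom_payloads_")).resolve()


def store_payload(records):
    """Upstream side: write the heavy payload to storage and return a short
    URI, which is all that gets pushed to XCom."""
    path = STORAGE_ROOT / f"{uuid.uuid4().hex}.json"
    path.write_text(json.dumps(records), encoding="utf-8")
    return path.as_uri()


def load_payload(uri):
    """Downstream side: resolve the URI pulled from XCom and read the payload."""
    path = Path(url2pathname(urlparse(uri).path))
    return json.loads(path.read_text(encoding="utf-8"))
```

In a DAG, the upstream task would `return store_payload(rows)`, and the downstream task would pass the result of `ti.xcom_pull(...)` for that upstream task to `load_payload`. The XCom row stays a few dozen bytes no matter how large the dataset grows.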
Explicit access uses the execution context or the task instance (`ti`) object directly within your operators:
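```python
def extract(**context):
    # Explicit push under a custom key
    context['ti'].xcom_push(key='user_id', value=42)
    # The return value is pushed implicitly under the key 'return_value'
    return {"raw": "data"}
```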
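Because a callable like `extract` only talks to XCom through `context['ti']`, it can be unit-tested without a scheduler or database by passing a stub. `FakeTI` and `run_like_operator` are hypothetical test helpers, not Airflow classes; `run_like_operator` approximates how Airflow stores a non-`None` return value under the `return_value` key.

```python
class FakeTI:
    """Hypothetical test double that records xcom_push calls (not an Airflow class)."""

    def __init__(self):
        self.xcoms = {}

    def xcom_push(self, key, value):
        self.xcoms[key] = value


def extract(**context):
    # Same callable as in the example above
    context["ti"].xcom_push(key="user_id", value=42)
    return {"raw": "data"}


def run_like_operator(func):
    """Call `func` with a stub context and, approximating PythonOperator's
    default behavior, store a non-None return value under 'return_value'."""
    ti = FakeTI()
    result = func(ti=ti)
    if result is not None:
        ti.xcom_push(key="return_value", value=result)
    return ti.xcoms
```

A single task run therefore produces two XComs here: the explicit `user_id` and the implicit `return_value`.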
```bash
# Set the XCom backend to use object storage
AIRFLOW__CORE__XCOM_BACKEND='airflow.providers.common.io.xcom.backend.XComObjectStorageBackend'
```
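A backend like this decides per value whether to keep it in the database or offload it. The sketch below shows that decision in plain Python, with a local temp directory standing in for object storage. The function names echo `BaseXCom.serialize_value`/`deserialize_value` but are simplified stand-ins, not Airflow's signatures, and `THRESHOLD_BYTES` and `REF_PREFIX` are assumed values. The provider configures its own cutoff separately (recent versions expose an `xcom_objectstorage_threshold` option), so check its documentation for your version.

```python
import json
import tempfile
import uuid
from pathlib import Path

# Hypothetical settings for this sketch.
THRESHOLD_BYTES = 1024
STORAGE_DIR = Path(tempfile.mkdtemp(prefix="xcom_offload_"))
REF_PREFIX = "xcom-ref:"  # marks values that were offloaded


def serialize_value(value):
    """Return what would be written to the xcom table: inline JSON when small,
    otherwise a reference string pointing at the offloaded payload."""
    data = json.dumps(value)
    if len(data.encode("utf-8")) <= THRESHOLD_BYTES:
        return data
    path = STORAGE_DIR / f"{uuid.uuid4().hex}.json"
    path.write_text(data, encoding="utf-8")
    return REF_PREFIX + str(path)


def deserialize_value(stored):
    """Inverse of serialize_value: follow references, then parse the JSON."""
    if stored.startswith(REF_PREFIX):
        stored = Path(stored[len(REF_PREFIX):]).read_text(encoding="utf-8")
    return json.loads(stored)
```

Inline values are always JSON text, so a stored string value such as `"xcom-ref:..."` is encoded with surrounding quotes and can never be mistaken for a reference.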