Airflow 3
https://airflow.apache.org/docs/docker-stack/index.html
Start Airflow standalone with Docker
-
Create develop folder
- Git clone from https://github.com/uuboyscy/airflow-demo
git clone --branch airflow3 --depth 1 https://github.com/uuboyscy/airflow-demo.git
-
Start via Docker Container
-
Mac / Linux
# Start an Airflow 3 standalone container (macOS / Linux).
# PYTHONPATH=/opt/airflow lets DAGs import from the mounted utils/ and tasks/ dirs.
docker run -it -d \
  --name airflow3-server \
  -p 8080:8080 \
  -v $PWD/dags:/opt/airflow/dags \
  -v $PWD/logs:/opt/airflow/logs \
  -v $PWD/utils:/opt/airflow/utils \
  -v $PWD/tasks:/opt/airflow/tasks \
  -e PYTHONPATH=/opt/airflow \
  apache/airflow:3.0.3-python3.11 airflow standalone
Windows
# Start an Airflow 3 standalone container (Windows PowerShell).
# The backtick (`) is PowerShell's line-continuation character.
docker run -it -d `
--name airflow3-server `
-p 8080:8080 `
-v "${PWD}/dags:/opt/airflow/dags" `
-v "${PWD}/logs:/opt/airflow/logs" `
-v "${PWD}/utils:/opt/airflow/utils" `
-v "${PWD}/tasks:/opt/airflow/tasks" `
-e PYTHONPATH=/opt/airflow `
apache/airflow:3.0.3-python3.11 airflow standalone
-
-
Get login credentials
- Execute into container
docker exec -it airflow3-server /bin/bash
- Find the auto-generated password inside the container
cat /opt/airflow/simple_auth_manager_passwords.json.generated
In Airflow 3, there is no need to create a user manually. The password is auto-generated at startup.
Quick start
Decorator
In Airflow 3, the import path changed. Use
`from airflow.sdk import dag, task` instead of `from airflow.decorators import dag, task`.
d_04_example_dag_decorator.py
import pendulum
from airflow.sdk import dag, task


@dag(
    schedule="* 1-2,7-8 * * *",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
)
def d_04_example_dag_decorator():
    """Demonstrate Airflow 3 @dag/@task decorators with a simple fan-out."""

    @task
    def task1():
        print("Running Task 1")

    @task
    def task2():
        print("Running Task 2")

    @task.bash
    def task3():
        return "echo 'Hello from Task 3!'"

    # task1 runs first; task2 and task3 both depend on it.
    t1 = task1()
    t1 >> task2()
    t1 >> task3()


d_04_example_dag_decorator()
In Airflow 3, use
`@task.bash` instead of `@bash_task` for bash tasks.
Pass parameters
d_05_example_pass_parameters_decorator.py
from datetime import datetime, timedelta

from airflow.sdk import dag, task

# Shared operator defaults applied to every task in the DAG.
default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "email": ["[email protected]"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}


@dag(
    dag_id="d_05_example_pass_parameters_decorator",
    default_args=default_args,
    description="An example DAG with Python operators",
    schedule="*/15 * * * *",
    start_date=datetime(2023, 1, 1),
    catchup=False,
    tags=["example", "decorator"],
)
def d_05_example_pass_parameters_decorator():
    """Show how a TaskFlow return value reaches a downstream task via XCom."""

    @task
    def task1():
        print("Running Task 1")
        return "Hello from Task 1!"

    @task
    def task2(returned_value_from_task1):
        print("Running Task 2")
        print(f"Received from Task 1: {returned_value_from_task1}")

    # Passing task1's XComArg into task2 both wires the dependency
    # and delivers the value.
    task2(task1())


# Instantiate the DAG
d_05_example_pass_parameters_decorator()
d_06_example_pass_parameters_cli.py
"""
CLI command:
airflow dags trigger -c '{"start_date": "2024-01-01", "end_date": "2024-03-01"}' d_06_example_pass_parameters_cli
airflow dags test -c '{"start_date": "2024-01-01", "end_date": "2024-03-01"}' d_06_example_pass_parameters_cli
"""
from datetime import datetime
from airflow.sdk import dag, task
# Default values for start_date and end_date
DEFAULT_START_DATE = "2023-01-01"
DEFAULT_END_DATE = "2023-12-31"
@dag(
dag_id="d_06_example_pass_parameters_cli",
schedule=None, # Set to None for manual triggering
start_date=datetime(2023, 1, 1),
catchup=False,
tags=["example", "cli_variables_defaults"]
)
def d_06_example_pass_parameters_cli():
@task
def extract_parameters(ti):
config = ti.xcom_pull(task_ids='trigger', key='return_value') or {}
# Get parameters with defaults if not provided via CLI
start_date = config.get('start_date', DEFAULT_START_DATE)
end_date = config.get('end_date', DEFAULT_END_DATE)
return {"start_date": start_date, "end_date": end_date}
@task
def start_task(date_params):
print(f"Task runs with a start date of {date_params.get('start_date')} and end date of {date_params.get('end_date')}")
date_params = extract_parameters()
start_task(date_params)
# Create the DAG instance
d_06_example_pass_parameters_cli()
Data Pipeline
d_07_example_data_pipeline.py
from datetime import datetime, timedelta

import pandas as pd
from airflow.sdk import dag, task

# Shared operator defaults applied to every task in the DAG.
default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "email": ["[email protected]"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}


@dag(
    dag_id="d_07_example_data_pipeline",
    default_args=default_args,
    description="An example DAG with Python operators",
    schedule="* 10 10 * *",
    start_date=datetime(2023, 1, 1),
    catchup=False,
    tags=["example", "decorator"],
)
def d_07_example_data_pipeline():
    """Small ETL shape: two extract tasks fan in to a concat, then fan out to two loads."""

    @task
    def e_data_source_1() -> pd.DataFrame:
        print("Getting df1.")
        return pd.DataFrame(data=[[1], [2]], columns=["col"])

    @task
    def e_data_source_2() -> pd.DataFrame:
        print("Getting df2.")
        return pd.DataFrame(data=[[3], [4]], columns=["col"])

    @task
    def t_concat(df1: pd.DataFrame, df2: pd.DataFrame) -> pd.DataFrame:
        print("Concating df1 and df2.")
        return pd.concat([df1, df2]).reset_index(drop=True)

    @task
    def l_db1(df: pd.DataFrame) -> None:
        print("Loading df to db1.")
        print(df)
        print("===============")

    @task
    def l_db2(df: pd.DataFrame) -> None:
        print("Loading df to db2.")
        print(df)
        print("===============")

    # extract -> transform -> two parallel loads
    merged = t_concat(e_data_source_1(), e_data_source_2())
    l_db1(merged)
    l_db2(merged)


# Instantiate the DAG
d_07_example_data_pipeline()
Notification
Try import from utils
utils/testutils.py
def testfunc():
    """Print a small banner to prove the utils package is importable."""
    for line in ("===========", "test", "==========="):
        print(line)
d_08_example_import_utils.py
from datetime import datetime, timedelta

from airflow.sdk import dag, task
from utils.testutils import testfunc

# Shared operator defaults applied to every task in the DAG.
default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "email": ["[email protected]"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}


@dag(
    dag_id="d_08_example_import_utils",
    default_args=default_args,
    description="An example DAG with Python operators",
    schedule="*/30 * * * *",
    start_date=datetime(2023, 1, 1),
    catchup=False,
    tags=["example", "decorator"],
)
def d_08_example_import_utils():
    """Prove that modules under the mounted utils/ dir are importable (PYTHONPATH)."""

    @task
    def call_custom_utils() -> None:
        # Delegates to the shared helper from utils/testutils.py.
        testfunc()

    call_custom_utils()


# Instantiate the DAG
d_08_example_import_utils()
Implement
utils/my_notifier.py
import time

from airflow.notifications.basenotifier import BaseNotifier


def send_message(title, message) -> None:
    """Stand-in for a real notification channel: print it and drop a file in /tmp."""
    print(title)
    print(message)
    print("Message sent.")
    # time_ns() gives a unique filename per call so repeated notifications
    # never overwrite each other.
    with open(f"/tmp/test_notification_message_{time.time_ns()}.txt", "w") as f:
        f.write(f"{title}\n=====\n{message}\n")


class MyNotifier(BaseNotifier):
    """Minimal custom notifier; ``message`` is rendered as a template field."""

    template_fields = ("message",)

    def __init__(self, message):
        # Bug fix: BaseNotifier.__init__ must run for the notifier/template
        # machinery to be initialized correctly.
        super().__init__()
        self.message = message

    def notify(self, context):
        # Send notification here, below is an example
        # NOTE(review): the title hard-codes "failed" even though this notifier
        # may also be attached to on_success_callback — confirm intended wording.
        title = f"Task {context['task_instance'].task_id} failed"
        send_message(title, self.message)
Apply
d_09_example_notification.py
import random
from datetime import datetime, timedelta

from airflow.sdk import dag, task
from utils.my_notifier import MyNotifier

# Shared operator defaults applied to every task in the DAG.
default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "email": ["[email protected]"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}


@dag(
    dag_id="d_09_example_notification",
    default_args=default_args,
    description="An example DAG with Python operators",
    schedule="0,10,50 * * * *",
    start_date=datetime(2023, 1, 1),
    on_success_callback=MyNotifier(message="Success!"),
    on_failure_callback=MyNotifier(message="Failure!"),
    catchup=False,
    tags=["example", "decorator"]
)
def d_09_example_notification():
    """Fail roughly half the time so both notifier callbacks can be observed."""

    @task
    def success_or_failure() -> None:
        if random.randint(0, 1) == 0:
            print("Failure!")
            # Bug fix: a bare `raise` outside an except block has no active
            # exception and itself raises "RuntimeError: No active exception
            # to re-raise" — raise an explicit exception instead.
            raise RuntimeError("Simulated task failure")
        else:
            print("Success!")

    success_or_failure()


# Instantiate the DAG
d_09_example_notification()
Manage tasks
Import tasks
tasks/test_tasks.py
from airflow.sdk import task


@task
def do_something(some_str: str) -> list[str]:
    """Explode *some_str* into a list of its individual characters."""
    return [*some_str]
d_10_example_import_tasks.py
import random
from datetime import datetime, timedelta

from airflow.sdk import dag, task
from tasks.test_tasks import do_something

# Shared operator defaults applied to every task in the DAG.
default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "email": ["[email protected]"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}


@dag(
    dag_id="d_10_example_import_tasks",
    default_args=default_args,
    description="An example DAG with Python operators",
    schedule="*/20 * * * *",
    start_date=datetime(2023, 1, 1),
    catchup=False,
)
def d_10_example_import_tasks():
    """Use a @task imported from the shared tasks/ package."""

    @task
    def generate_some_str() -> str:
        return "HELLO"

    @task
    def print_something(something: str | list) -> None:
        print("======")
        print(something)
        print("======")

    # generate -> shared do_something -> print
    print_something(do_something(generate_some_str()))


# Instantiate the DAG
d_10_example_import_tasks()
TaskGroup
d_11_example_import_taskgroup.py
from datetime import datetime, timedelta

# Airflow 3: TaskGroup is exported from the Task SDK alongside dag/task,
# replacing the legacy airflow.utils.task_group import path.
from airflow.sdk import TaskGroup, dag, task
from tasks.test_tasks import do_something

# Shared operator defaults applied to every task in the DAG.
default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "email": ["[email protected]"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}


@dag(
    dag_id="d_11_example_import_taskgroup",
    default_args=default_args,
    description="An example DAG with Python operators",
    schedule="*/5 * * * *",
    start_date=datetime(2023, 1, 1),
    catchup=False,
)
def d_11_example_import_taskgroup():
    """Group dynamically-mapped tasks inside a TaskGroup."""

    @task
    def generate_some_str() -> str:
        return "HELLO"

    @task
    def print_something_separately(something: str | list) -> None:
        print("======")
        print(something)
        print("======")

    some_str = generate_some_str()
    result = do_something(some_str)

    with TaskGroup(group_id='do_something_task_group'):
        # Dynamically create one mapped task instance per element of `result`.
        print_something_separately.expand(something=result)


# Instantiate the DAG
d_11_example_import_taskgroup()