跳转至

第六章:变量与连接

Variables

创建变量

from airflow.models import Variable

# Store a plain string variable
Variable.set('my_var', 'my_value')

# Store a JSON variable. serialize_json=True makes Airflow write the dict as
# real JSON; without it the value is stored in a form that
# Variable.get(..., deserialize_json=True) cannot parse back.
Variable.set('my_json', {'key1': 'value1', 'key2': 'value2'}, serialize_json=True)

# Store a variable together with a human-readable description
Variable.set('my_var', 'my_value', description='My variable description')

获取变量

from airflow.models import Variable
from airflow.utils.session import create_session

# Read a variable
value = Variable.get('my_var')

# Read a variable, falling back to a default value
value = Variable.get('my_var', default_var='default_value')

# Read a variable that was stored as JSON
json_value = Variable.get('my_json', deserialize_json=True)

# Read every variable. The Variable model has no get_all() helper, so query
# the metadata database and build a {key: value} mapping instead.
# NOTE(review): this only lists variables stored in the metadata database,
# not ones served by a secrets backend or environment variables.
with create_session() as session:
    all_vars = {var.key: var.val for var in session.query(Variable)}

在 DAG 中使用变量

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator

def my_function():
    """Print the database endpoint configured through Airflow Variables."""
    # Variables are read inside the callable rather than at module level.
    host = Variable.get('db_host')
    # '5432' is passed as the default for db_port.
    port = Variable.get('db_port', default_var='5432')

    print(f"Connecting to {host}:{port}")

# DAG(...) is a placeholder: fill in the DAG arguments for a real pipeline.
with DAG(...) as dag:
    # Wrap my_function in a task so it runs as part of the DAG.
    task = PythonOperator(task_id='task', python_callable=my_function)

Jinja 模板中使用变量

from airflow.operators.bash import BashOperator

# bash_command is a Jinja template; {{ var.value.my_var }} refers to the
# Airflow variable named my_var.
task = BashOperator(
    bash_command='echo "{{ var.value.my_var }}"',
    task_id='bash_task',
)

Connections

创建连接

from airflow.models import Connection
from airflow import settings

# Build the connection record (all values below are example placeholders)
conn = Connection(
    conn_id='my_connection',
    conn_type='postgres',
    host='localhost',
    port=5432,
    login='user',
    password='password',
    schema='database',
)

# Persist the connection. Roll back if the insert fails and always close the
# session so a failed commit does not leave a dangling database session.
session = settings.Session()
try:
    session.add(conn)
    session.commit()
except Exception:
    session.rollback()
    raise
finally:
    session.close()

获取连接

from airflow.hooks.base import BaseHook

# Look up the stored connection by its ID
conn = BaseHook.get_connection('my_connection')

# Read the individual connection fields
host, port = conn.host, conn.port
login, password = conn.login, conn.password
schema = conn.schema

# Render the whole connection as a single URI
uri = conn.get_uri()

使用 Hook

from airflow.providers.postgres.hooks.postgres import PostgresHook

# Create a hook bound to the 'my_connection' connection
hook = PostgresHook(postgres_conn_id='my_connection')

# Run a query and fetch its rows
results = hook.get_records('SELECT * FROM users')

# Execute a statement; the value is passed as a bound parameter rather than
# being formatted into the SQL text
hook.run(
    'INSERT INTO users (name) VALUES (%s)',
    parameters=['John'],
)

# Get the underlying connection object from the hook
conn = hook.get_conn()

自定义 Hook

from airflow.hooks.base import BaseHook
import requests

class MyCustomHook(BaseHook):
    """Minimal HTTP hook that takes its host and credentials from an Airflow connection.

    Args:
        conn_id: ID of the Airflow connection to look up.
        timeout: Seconds to wait for the server before the request is aborted.
    """

    def __init__(self, conn_id: str, timeout: float = 30.0):
        super().__init__()
        self.conn_id = conn_id
        self.timeout = timeout

    def get_conn(self):
        """Return the connection's host, port, login and password as a dict."""
        conn = self.get_connection(self.conn_id)

        return {
            'host': conn.host,
            'port': conn.port,
            'login': conn.login,
            'password': conn.password,
        }

    def make_request(self, endpoint: str):
        """Send a GET request to ``endpoint`` and return the decoded JSON body.

        Args:
            endpoint: Path relative to the connection's host; a leading
                slash is optional.

        Raises:
            requests.HTTPError: If the server answers with an error status.
        """
        conn = self.get_conn()

        # Leave the port out when the connection does not define one,
        # instead of producing "host:None".
        netloc = conn['host']
        if conn['port'] is not None:
            netloc = f"{netloc}:{conn['port']}"

        # Strip a leading slash so the URL never contains "//" after the host.
        url = f"http://{netloc}/{endpoint.lstrip('/')}"

        # Only send basic-auth credentials when a login is configured.
        auth = None
        if conn['login']:
            auth = (conn['login'], conn['password'] or '')

        # A timeout keeps a stalled server from blocking the task forever.
        response = requests.get(url, auth=auth, timeout=self.timeout)

        # Surface HTTP errors instead of trying to parse an error page as JSON.
        response.raise_for_status()

        return response.json()

Secrets Backend

配置 Secrets Backend

# airflow.cfg
[secrets]
backend = airflow.providers.hashicorp.secrets.vault.VaultBackend
# Keep the JSON on a single line: a closing brace at column 0 is not read as
# part of the value by the INI parser. "my-token" is a placeholder; do not
# commit a real token to this file.
backend_kwargs = {"url": "http://vault:8200", "token": "my-token", "mount_point": "airflow"}

使用 AWS Secrets Manager

# airflow.cfg
[secrets]
backend = airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackend
# Keep the JSON on a single line: a closing brace at column 0 is not read as
# part of the value by the INI parser.
backend_kwargs = {"connections_prefix": "airflow/connections", "variables_prefix": "airflow/variables"}

使用 GCP Secret Manager

# airflow.cfg
[secrets]
backend = airflow.providers.google.cloud.secrets.secret_manager.CloudSecretManagerBackend
# Keep the JSON on a single line: a closing brace at column 0 is not read as
# part of the value by the INI parser.
backend_kwargs = {"connections_prefix": "airflow-connections", "variables_prefix": "airflow-variables"}

XCom

推送 XCom

from airflow.operators.python import PythonOperator

def push_function(**context):
    """Push one XCom entry under an explicit key and return a second value."""
    task_instance = context['ti']

    # Explicit push under the key 'my_key'
    task_instance.xcom_push(key='my_key', value='my_value')

    # The return value is pushed to XCom automatically
    return 'return_value'

# provide_context is not passed: it is deprecated in Airflow 2, where the
# context is handed to callables that accept **kwargs automatically.
push_task = PythonOperator(
    task_id='push_task',
    python_callable=push_function,
)

拉取 XCom

from airflow.operators.python import PythonOperator

def pull_function(**context):
    """Pull both XCom values published by 'push_task' and print them."""
    task_instance = context['ti']

    # Value that was pushed under the explicit key 'my_key'
    value = task_instance.xcom_pull(task_ids='push_task', key='my_key')

    # No key given: this pulls the task's return value
    return_value = task_instance.xcom_pull(task_ids='push_task')

    print(f"Value: {value}")
    print(f"Return value: {return_value}")

# provide_context is not passed: it is deprecated in Airflow 2, where the
# context is handed to callables that accept **kwargs automatically.
pull_task = PythonOperator(
    task_id='pull_task',
    python_callable=pull_function,
)

Jinja 模板中使用 XCom

from airflow.operators.bash import BashOperator

# bash_command is a Jinja template; the expression calls ti.xcom_pull for the
# value stored by 'push_task' under the key 'my_key'.
task = BashOperator(
    bash_command='echo "{{ ti.xcom_pull(task_ids="push_task", key="my_key") }}"',
    task_id='bash_task',
)

小结

变量与连接要点:

  • Variables:创建、获取、使用
  • Connections:创建、获取、Hook
  • Secrets Backend:Vault、AWS、GCP
  • XCom:推送、拉取、模板使用

下一章我们将学习调度与触发。