第六章:变量与连接¶
Variables¶
创建变量¶
from airflow.models import Variable
# Store a plain string variable.
Variable.set('my_var', 'my_value')
# Store a JSON variable. serialize_json=True makes Airflow JSON-encode the
# dict; without it the value is saved as str(dict), which is not valid JSON
# and cannot be read back with Variable.get(..., deserialize_json=True).
Variable.set(
    'my_json',
    {'key1': 'value1', 'key2': 'value2'},
    serialize_json=True,
)
# Store a variable together with a human-readable description.
Variable.set('my_var', 'my_value', description='My variable description')
获取变量¶
from airflow.models import Variable
from airflow.utils.session import create_session

# Fetch a variable by key.
value = Variable.get('my_var')
# Fetch a variable, falling back to a default when the key is missing.
value = Variable.get('my_var', default_var='default_value')
# Fetch a JSON variable and decode it into a Python object.
json_value = Variable.get('my_json', deserialize_json=True)
# Fetch every variable. The Variable model has no get_all() helper, so read
# the rows from the metadata database instead. This only covers variables
# stored in the database, not ones served by environment variables or a
# secrets backend.
with create_session() as session:
    all_vars = {var.key: var.val for var in session.query(Variable)}
在 DAG 中使用变量¶
from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
def my_function():
    """Print the database endpoint configured through Airflow Variables.

    Reads ``db_host`` (no default is supplied) and ``db_port`` (falls back
    to ``'5432'`` when the variable is not set).
    """
    # Variables are read inside the callable, i.e. when the task runs.
    db_host = Variable.get('db_host')
    db_port = Variable.get('db_port', default_var='5432')
    print(f"Connecting to {db_host}:{db_port}")
# DAG(...) is a placeholder: supply the real DAG arguments here.
with DAG(...) as dag:
    # Run my_function as a task of this DAG.
    task = PythonOperator(
        task_id='task',
        python_callable=my_function,
    )
Jinja 模板中使用变量¶
from airflow.operators.bash import BashOperator
# The Jinja expression {{ var.value.my_var }} is rendered into the command
# before it is executed.
task = BashOperator(
    bash_command='echo "{{ var.value.my_var }}"',
    task_id='bash_task',
)
Connections¶
创建连接¶
from airflow.models import Connection
from airflow import settings
# Describe the connection.
# NOTE: the credentials are hard-coded for illustration only; do not commit
# real passwords to source control.
conn = Connection(
    conn_id='my_connection',
    conn_type='postgres',
    host='localhost',
    port=5432,
    login='user',
    password='password',
    schema='database',
)
# Persist the connection. Roll back on failure and always close the session
# so it is not leaked.
session = settings.Session()
try:
    session.add(conn)
    session.commit()
except Exception:
    session.rollback()
    raise
finally:
    session.close()
获取连接¶
from airflow.hooks.base import BaseHook
# Look the connection up by its ID.
conn = BaseHook.get_connection('my_connection')
# Read the individual connection parameters.
host, port = conn.host, conn.port
login, password = conn.login, conn.password
schema = conn.schema
# Build the connection URI.
uri = conn.get_uri()
使用 Hook¶
from airflow.providers.postgres.hooks.postgres import PostgresHook
# Create a hook bound to the 'my_connection' connection.
hook = PostgresHook(postgres_conn_id='my_connection')
# Run a query and fetch the resulting rows.
results = hook.get_records(sql='SELECT * FROM users')
# Execute a statement; the value is passed as a bound parameter.
hook.run(
    sql='INSERT INTO users (name) VALUES (%s)',
    parameters=['John'],
)
# Obtain the underlying connection object.
conn = hook.get_conn()
自定义 Hook¶
from airflow.hooks.base import BaseHook
import requests
class MyCustomHook(BaseHook):
    """Custom hook that calls an HTTP service described by an Airflow connection.

    Args:
        conn_id: ID of the Airflow connection that holds the host, port and
            credentials of the service.
    """

    def __init__(self, conn_id: str):
        super().__init__()
        self.conn_id = conn_id

    def get_conn(self):
        """Return the connection parameters as a plain dict.

        Returns:
            A dict with the keys ``host``, ``port``, ``login`` and
            ``password`` taken from the connection ``self.conn_id``.
        """
        conn = self.get_connection(self.conn_id)
        return {
            'host': conn.host,
            'port': conn.port,
            'login': conn.login,
            'password': conn.password,
        }

    def make_request(self, endpoint: str, timeout: float = 30):
        """Send a GET request to ``endpoint`` and return the decoded JSON body.

        Args:
            endpoint: Path relative to the service root; a leading ``/`` is
                ignored.
            timeout: Seconds to wait for the server before giving up.

        Returns:
            The JSON-decoded response body.

        Raises:
            requests.HTTPError: If the server answers with a 4xx/5xx status.
        """
        conn = self.get_conn()
        base_url = f"http://{conn['host']}"
        # Only append the port when one is configured, to avoid "host:None".
        if conn['port'] is not None:
            base_url = f"{base_url}:{conn['port']}"
        url = f"{base_url}/{endpoint.lstrip('/')}"
        # Send basic-auth credentials only when the connection defines a login.
        auth = (conn['login'], conn['password'] or '') if conn['login'] else None
        # Without a timeout a stalled server would block the caller forever.
        response = requests.get(url, auth=auth, timeout=timeout)
        # Surface HTTP errors instead of trying to decode an error page.
        response.raise_for_status()
        return response.json()
Secrets Backend¶
配置 Secrets Backend¶
# airflow.cfg
[secrets]
backend = airflow.providers.hashicorp.secrets.vault.VaultBackend
# backend_kwargs is a JSON object; keeping it on one line avoids relying on
# indented continuation lines, which the INI format requires for multi-line values.
# NOTE: "my-token" is a placeholder; do not commit a real Vault token.
backend_kwargs = {"url": "http://vault:8200", "token": "my-token", "mount_point": "airflow"}
使用 AWS Secrets Manager¶
# airflow.cfg
[secrets]
backend = airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackend
# backend_kwargs is a JSON object; keeping it on one line avoids relying on
# indented continuation lines, which the INI format requires for multi-line values.
backend_kwargs = {"connections_prefix": "airflow/connections", "variables_prefix": "airflow/variables"}
使用 GCP Secret Manager¶
# airflow.cfg
[secrets]
backend = airflow.providers.google.cloud.secrets.secret_manager.CloudSecretManagerBackend
# backend_kwargs is a JSON object; keeping it on one line avoids relying on
# indented continuation lines, which the INI format requires for multi-line values.
backend_kwargs = {"connections_prefix": "airflow-connections", "variables_prefix": "airflow-variables"}
XCom¶
推送 XCom¶
from airflow.operators.python import PythonOperator
def push_function(**context):
    """Push one explicit XCom value and return a second one.

    Args:
        **context: Task context; must contain the task instance under ``ti``.

    Returns:
        The string ``'return_value'``.
    """
    # Push an XCom under an explicit key.
    context['ti'].xcom_push(key='my_key', value='my_value')
    # The return value of the callable is pushed to XCom automatically.
    return 'return_value'
# The task context is passed to the callable automatically; the legacy
# provide_context flag (deprecated since Airflow 2.0) is not needed.
push_task = PythonOperator(
    task_id='push_task',
    python_callable=push_function,
)
拉取 XCom¶
from airflow.operators.python import PythonOperator
def pull_function(**context):
    """Pull the XCom values published by ``push_task`` and print them.

    Args:
        **context: Task context; must contain the task instance under ``ti``.
    """
    # Pull the XCom that was pushed under an explicit key.
    value = context['ti'].xcom_pull(
        task_ids='push_task',
        key='my_key',
    )
    # Pull without a key to get the upstream task's return value.
    return_value = context['ti'].xcom_pull(
        task_ids='push_task',
    )
    print(f"Value: {value}")
    print(f"Return value: {return_value}")
# The task context is passed to the callable automatically; the legacy
# provide_context flag (deprecated since Airflow 2.0) is not needed.
pull_task = PythonOperator(
    task_id='pull_task',
    python_callable=pull_function,
)
Jinja 模板中使用 XCom¶
from airflow.operators.bash import BashOperator
# The Jinja expression is rendered first, so the inner double quotes belong
# to the template and never reach the shell.
task = BashOperator(
    bash_command='echo "{{ ti.xcom_pull(task_ids="push_task", key="my_key") }}"',
    task_id='bash_task',
)
小结¶
变量与连接要点:
- Variables:创建、获取、使用
- Connections:创建、获取、Hook
- Secrets Backend:Vault、AWS、GCP
- XCom:推送、拉取、模板使用
下一章我们将学习调度与触发。