在Airflow中編寫執行MySQL查詢的任務可以通過使用PythonOperator
來執行查詢的Python函數。以下是一個簡單的示例:
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.mysql_operator import MySqlOperator
from datetime import datetime
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2021, 1, 1),
'email_on_failure': False,
'email_on_retry': False,
'retries': 1
}
dag = DAG('mysql_query_dag', default_args=default_args, schedule_interval='@daily')
def execute_mysql_query():
# 連接MySQL數據庫
conn = MySQLdb.connect(host="localhost", user="root", passwd="password", db="database")
cursor = conn.cursor()
# 執行查詢
cursor.execute("SELECT * FROM table")
# 獲取結果
rows = cursor.fetchall()
for row in rows:
print(row)
# 關閉連接
conn.close()
mysql_task = PythonOperator(
task_id='execute_mysql_query',
python_callable=execute_mysql_query,
dag=dag
)
mysql_task
在這個例子中,首先創建了一個DAG,并定義了一個Python函數execute_mysql_query
,該函數連接到MySQL數據庫,執行查詢并打印結果。然后使用PythonOperator
來執行這個函數,并將其添加到DAG中。當DAG運行時,該任務將連接到MySQL數據庫執行查詢。