您好,登錄后才能下訂單哦!
airflow 是一個編排、調度和監控workflow的平臺,由Airbnb開源,現在在Apache Software Foundation 孵化。airflow 將workflow編排為由tasks組成的DAGs(有向無環圖),調度器在一組workers上按照指定的依賴關系執行tasks。同時,airflow 提供了豐富的命令行工具和簡單易用的用戶界面以便用戶查看和操作,并且airflow提供了監控和報警系統。
Airflow的調度依賴于crontab命令,與crontab相比airflow可以直觀的看到任務執行情況、任務之間的邏輯依賴關系、可以設定任務出錯時郵件提醒、可以查看任務執行日志。而crontab命令管理的方式存在以下幾方面的弊端:
1、在多任務調度執行的情況下,難以理清任務之間的依賴關系;
2、不便于查看當前執行到哪一個任務;
3、任務執行失敗時不便于查看執行日志,也即不方便定位報錯的任務和錯誤原因;
4、不便于查看調度流下每個任務執行的起止消耗時間,這對于優化task作業是非常重要的;
5、不便于記錄歷史調度任務的執行情況,而這對于優化作業和錯誤排查是很重要的;
未使用airflow | 使用airflow |
---|---|
需要自己添加調度代碼、調試復雜、功能單一、缺乏整體調度能力 | 框架調度,簡單易用,更穩定,功能全面,可以整體調度 |
缺乏圖形化能力,給任務的新增、排查等操作帶來很多困難。特別是當任務較多,結構復雜的時候 | 內置樹狀圖和流程圖,清晰明了的展現任務拓撲結構 |
需要自己添加任務實時監測代碼 | 任務實時狀態返回網頁界面,方便管理和查看 |
任務的各種操作大多需要編碼或命令行完成,不夠高效 | 常見操作方式轉化為圖形化界面,高效清晰 |
需要手動分離調度和業務代碼 | 調度和業務代碼分離,減少耦合性,方便運維和迭代 |
除了以上的優點,工程實踐中有一個不足就是分布式部署有點麻煩,容易出錯。
概要:DAG(Directed Acyclic Graph)是有向無環圖,也稱為有向無循環圖。在Airflow中,一個DAG定義了一個完整的作業。同一個DAG中的所有Task擁有相同的調度時間。
參數:
- dag_id: 唯一識別DAG,方便日后管理
- default_args: 默認參數,如果當前DAG實例的作業沒有配置相應參數,則采用DAG實例的default_args中的相應參數
- schedule_interval: 配置DAG的執行周期,可采用crontab語法
概要:Task為DAG中具體的作業任務,依賴于DAG,也就是必須存在于某個DAG中。Task在DAG中可以配置依賴關系(當然也可以配置跨DAG依賴,但是并不推薦。跨DAG依賴會導致DAG圖的直觀性降低,并給依賴管理帶來麻煩)。
參數:
- dag: 傳遞一個DAG實例,以使當前作業屬于相應DAG
- task_id: 給任務一個標識符(名字),方便日后管理
- owner: 任務的擁有者,方便日后管理
- start_date: 任務的開始時間,即任務將在這個時間點之后開始調度
在配置中,它是作業開始調度時間。而在談論執行狀況時,它是調度開始時間。
調度執行周期。
執行時間。在Airflow中稱為執行時間,但其實它并不是真實的執行時間。
[敲黑板,劃重點]
所以,第一次調度時間:在作業中配置的start_date,且滿足schedule_interval的時間點。記錄的execution_date為作業中配置的start_date的第一個滿足schedule_interval的時間。[舉個例子]
假設我們配置了一個作業的start_date為2019年6月2日,配置的schedule_interval為 00 12 ,那么第一次執行的時間將是2019年6月3日 12點。因此execution_date并不是如期字面說的表示執行時間,真正的執行時間是execution_date所顯示的時間的下一個滿足schedule_interval的時間點。
DAGs: 即有向無環圖(Directed Acyclic Graph),將所有需要運行的tasks按照依賴關系組織起來,描述的是所有tasks執行的順序。
Operators: airflow內置了很多operators,如BashOperator 執行一個bash 命令,PythonOperator 調用任意的Python 函數,EmailOperator 用于發送郵件,HTTPOperator 用于發送HTTP請求, SqlOperator 用于執行SQL命令...同時,用戶可以自定義Operator,這給用戶提供了極大的便利性。可以理解為用戶需要的一個操作,是Airflow提供的類
Tasks: Task 是 Operator的一個實例
Task Instance: 由于Task會被重復調度,每次task的運行就是不同的task instance了。Task instance 有自己的狀態,包括"running", "success", "failed", "skipped", "up for retry"等。
Task Relationships: DAGs中的不同Tasks之間可以有依賴關系
提供web端服務,以及會定時生成子進程去掃描對應的目錄下的dags,并更新數據庫
webserver 提供以下功能:
webserver 守護進程使用 gunicorn 服務器(相當于 java 中的 tomcat )處理并發請求,可通過修改{AIRFLOW_HOME}/airflow.cfg文件中 workers 的值來控制處理并發請求的進程數。
例如:workers = 4 #表示開啟4個gunicorn worker(進程)處理web請求
任務調度服務,周期性地輪詢任務的調度計劃,以確定是否觸發任務執行,根據dags生成任務,并提交到消息中間件隊列中 (redis或rabbitMq)
分布在不同的機器上,作為任務真正的的執行節點。通過監聽消息中間件: redis或rabbitMq 領取任務
當設置 airflow 的 executors 設置為 CeleryExecutor 時才需要開啟 worker 守護進程。推薦你在生產環境使用 CeleryExecutor :executor = CeleryExecutor
監控worker進程的存活性,啟動或關閉worker進程,查看運行的task
默認的端口為 5555,您可以在瀏覽器地址欄中輸入 “http://hostip:5555” 來訪問 flower ,對 celery 消息隊列進行監控。
ETL,是英文 Extract-Transform-Load 的縮寫,用來描述將數據從來源端經過抽取(extract)、轉換(transform)、加載(load)至目的端的過程。ETL一詞較常用在數據倉庫,但其對象并不限于數據倉庫。
Airflow設計時,只是為了很好的處理ETL任務而已,但是其精良的設計,正好可以用來解決任務的各種依賴問題。
通常,在一個運維系統,數據分析系統,或測試系統等大型系統中,我們會有各種各樣的依賴需求。
也許大家會覺得這些是在任務程序中的邏輯需要處理的部分,但是我認為,這些邏輯可以抽象為任務控制邏輯的部分,和實際任務執行邏輯解耦合。
現在讓我們來看下最常用的依賴管理系統,Crontab
在各種系統中,總有些定時任務需要處理,每當在這個時候,我們第一個想到的總是crontab。確實,crontab可以很好的處理定時執行任務的需求,但是對于crontab來說,執行任務,只是調用一個程序如此簡單,而程序中的各種邏輯都不屬于crontab的管轄范圍(很好的遵循了KISS)
所以我們可以抽象的認為:
crontab是一種依賴管理系統,而且只管理時間上的依賴。
Airflow的核心概念,是DAG(有向無環圖),DAG由一個或多個TASK組成,而這個DAG正是解決了上文所說的任務間依賴。Task A 執行完成后才能執行 Task B,多個Task之間的依賴關系可以很好的用DAG表示完善
Airflow完整的支持crontab表達式,也支持直接使用python的datatime表述時間,還可以用datatime的delta表述時間差。這樣可以解決任務的時間依賴問題。
Airflow在CeleryExecuter下可以使用不同的用戶啟動Worker,不同的Worker監聽不同的Queue,這樣可以解決用戶權限依賴問題。Worker也可以啟動在多個不同的機器上,解決機器依賴的問題。
Airflow可以為任意一個Task指定一個抽象的Pool,每個Pool可以指定一個Slot數。每當一個Task啟動時,就占用一個Slot,當Slot數占滿時,其余的任務就處于等待狀態。這樣就解決了資源依賴問題。
Airflow中有Hook機制(其實我覺得不應該叫Hook),作用時建立一個與外部數據系統之間的連接,比如Mysql,HDFS,本地文件系統(文件系統也被認為是外部系統)等,通過拓展Hook能夠接入任意的外部系統的接口進行連接,這樣就解決的外部系統依賴問題。
不同Executer 的架構圖Airflow執行任務的方式有多種,包括SequentialExecutor、LocalExecutor以及CeleryExecutor,用的較多的是LocalExecutor和CeleryExecutor,這里分別介紹一下三種執行方式的架構:
使用celery方式的系統架構圖(官方推薦使用這種方式,同時支持mesos方式部署)。turing為外部系統,GDags服務幫助拼接成dag,可以忽略。
1. master節點webui管理dags、日志等信息。scheduler負責調度,只支持單節點,多節點啟動scheduler可能會掛掉
2. worker負責執行具體dag中的task。這樣不同的task可以在不同的環境中執行。
另一種啟動方式的思考,一個dag分配到1臺機器上執行。如果task不復雜同時task環境相同,可以采用這種方式,方便擴容、管理,同時沒有master單點問題。
SequentialExecutor表示單進程順序執行,通常只用于測試。
但是有時候僅僅靠配置作業依賴和調度執行周期并不能滿足一些復雜的需求
1)跳過非最新DAG Run(作業中出現故障,一段時間后恢復)
2)當存在正在執行的DAG Run時,跳過當前DAG Run(作業執行時間過長,長到下一次作業開始)
3)Sensor的替代方案(Airflow中有一類Operator被稱為Sensor,Sensor可以感應預先設定的條件是否滿足,當滿足條件后Sensor作業變為Success使得下游的作業可以執行。弊端是,如果上游作業執行3個小時,那么會占用worker三個小時不釋放,資源浪費。)
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。