您好,登錄后才能下訂單哦!
Airflow 是一個 Airbnb 的 Workflow 開源項目,在Github 上已經有超過兩千星。Airflow 使用 Python 寫的,支持 Python 2/3 兩個版本。 傳統 Workflow 通常使用 Text Files (json, xml / etc) 來定義 DAG, 然后 Scheduler 解析這些 DAG 文件形成具體的 Task Object 執行;Airflow 沒這么干,它直接用 Python 寫 DAG definition, 一下子突破了文本文件表達能力的局限,定義 DAG 變得簡單。 另外,Airflow 的權限設計、限流設計、以及 Hook/Plugin 的設計都挺有意思,功能性、擴展性良好。當然,項目里的代碼質量感覺比較一般,很多地方函數名和實現不太一致,造成理解障礙;也有很多 Flag 和重復出現的定義,顯然是當初沒有設計好、后面沒有精力 Refactor 轉而 Hack 的結果。但總體上,可讀性中上,系統的擴展性非常好。
左側 On/Off 按鈕控制 DAG 的運行狀態,Off 為暫停狀態,On 為運行狀態。注意:所有 DAG 腳本初次部署完成時均為 Off 狀態。
若 DAG 名稱處于不可點擊狀態,可能為 DAG 被刪除或未載入。若 DAG 未載入,可點擊右側刷新按鈕進行刷新。注意:由于可以部署若干 WebServer,所以單次刷新可能無法刷新所有 WebServer 緩存,可以嘗試多次刷新。
Recent Tasks 會顯示最近一次 DAG Run(可以理解為 DAG 的執行記錄)中 Task Instances(可以理解為作業的執行記錄)的運行狀態,如果 DAG Run 的狀態為 running,此時顯示最近完成的一次以及正在運行的 DAG Run 中所有 Task Instances 的狀態。
- Last Run 顯示最近一次的 execution date。注意:execution date 并不是真實執行時間,具體細節在下文 DAG 配置中詳述。將鼠標移至 execution date 右側 info 標記上,會顯示 start date,start date 為真實運行時間。start date 一般為 execution date 所對應的下次執行時間。
在 DAG 的樹狀圖和 DAG 圖中都可以點擊對應的 Task Instance 以彈出 Task Instance 模態框,以進行 Task Instance 的相關操作。注意:選擇的 Task Instance 為對應 DAG Run 中的 Task Instance。
在作業名字的右邊有一個漏斗符號,點擊后整個 DAG 的界面將只顯示該作業及該作業的依賴作業。當該作業所處的 DAG 較大時,此功能有較大的幫助。
Task Instance Details 顯示該 Task Instance 的詳情,可以從中得知該 Task Instance 的當前狀態,以及處于當前狀態的原因。例如,若該 Task Instance 為 no status 狀態,遲遲不進入 queued 及 running 狀態,此時就可通過 Task Instance Details 中的 Dependency 及 Reason 得知原因。
Rendered 顯示該 Task Instance 被渲染后的命令。
Run 指令可以直接執行當前作業。
Clear 指令為清除當前 Task Instance 狀態,清除任意一個 Task Instance 都會使當前 DAG Run 的狀態變更為 running。注意:如果被清除的 Task Instance 的狀態為 running,則會嘗試 kill 該 Task Instance 所執行指令,并進入 shutdown 狀態,并在 kill 完成后將此次執行標記為 failed(如果 retry 次數沒有用完,將標記為 up_for_retry)。Clear 有額外的5個選項,均為多選,這些選項從左到右依次為:
- Past: 同時清除所有過去的 DAG Run 中此 Task Instance 所對應的 Task Instance。
- Future: 同時清除所有未來的 DAG Run 中此 Task Instance 所對應的 Task Instance。注意:僅清除已生成的 DAG Run 中的 Task Instance。
- Upstream: 同時清除該 DAG Run 中所有此 Task Instance 上游的 Task Instance。
- Downstream: 同時清除該 DAG Run 中所有此 Task Instance 下游的 Task Instance。
- Recursive: 當此 Task Instance 為 sub DAG 時,循環清除所有該 sub DAG 中的 Task Instance。注意:若當此 Task Instance 不是 sub DAG 則忽略此選項。
Mark Success 指令為講當前 Task Instance 狀態標記為 success。注意:如果該 Task Instance 的狀態為 running,則會嘗試 kill 該 Task Instance 所執行指令,并進入 shutdown 狀態,并在 kill 完成后將此次執行標記為 failed(如果 retry 次數沒有用完,將標記為 up_for_retry)。
跨越時間的 DAG 的樹表示。如果 pipeline(管道)延遲了,您可以很快地看到哪里出現了錯誤的步驟并且辨別出堵塞的進程。
圖形視圖可能是最全面的一種表現形式了。它可以可視化您的 DAG 依賴以及某個運行實例的當前狀態。
過去 N 次運行的不同任務的持續時間。通過此視圖,您可以查找異常值并快速了解 DAG 在多次運行中花費的時間。
甘特圖可讓您分析任務持續時間和重疊情況。您可以快速識別系統瓶頸和哪些特定 DAG 在運行中花費了大量的時間。
透明就是一切。雖然您的 pipeline(管道)代碼在源代碼管理中,但這是一種快速獲取 DAG 代碼并提供更多上下文的方法。
從上面的頁面(樹視圖,圖形視圖,甘特圖......)中,始終可以單擊任務實例,并進入此豐富的上下文菜單,該菜單可以將您帶到更詳細的元數據并執行某些操作。
外部系統的連接信息存儲在 Airflow 元數據數據庫中,并在 UI 中進行管理(Menu -> Admin -> Connections)。在那里定義了conn_idconn_id而無需在任何地方硬編碼任何此類信息。
可以定義具有相同conn_id許多連接,并且在這種情況下,并且當掛鉤使用來自BaseHook的get_connection方法時,Airflow 將隨機選擇一個連接,允許在與重試一起使用時進行一些基本的負載平衡和容錯。
Airflow 還能夠通過操作系統中的環境變量引用連接。但它只支持 URI 格式。如果您需要為連接指定extra信息,請使用 Web UI。
如果在 Airflow 元數據數據庫和環境變量中都定義了具有相同conn_id連接,則 Airflow 將僅引用環境變量中的連接(例如,給定conn_idpostgres_master,在開始搜索元數據數據庫之前,Airflow 將優先在環境變量中搜索AIRFLOW_CONN_POSTGRES_MASTER并直接引用它)。
許多鉤子都有一個默認的conn_id,使用該掛鉤的 Operator 不需要提供顯式連接 ID。 例如,PostgresHook的默認conn_id是postgres_default
XComs 允許任務交換消息,允許更細微的控制形式和共享狀態。該名稱是“交叉通信”的縮寫。XComs 主要由鍵,值和時間戳定義,但也跟蹤創建 XCom 的任務/DAG 以及何時應該可見的屬性。任何可以被 pickle 的對象都可以用作 XCom 值,因此用戶應該確保使用適當大小的對象。
變量是將任意內容或設置存儲和檢索為 Airflow 中的簡單鍵值存儲的通用方法。可以從 UI(Admin -> Variables),代碼或 CLI 列出,創建,更新和刪除變量。此外,json 設置文件可以通過 UI 批量上傳。雖然管道代碼定義和大多數常量和變量應該在代碼中定義并存儲在源代碼控制中,但是通過 UI 可以訪問和修改某些變量或配置項會很有用。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。