您好,登錄后才能下訂單哦!
這篇“Argo Event云原生的事件驅動架構怎么用”文章的知識點大部分人都不太理解,所以小編給大家總結了以下內容,內容詳細,步驟清晰,具有一定的借鑒價值,希望大家閱讀完這篇文章能有所收獲,下面我們一起來看看這篇“Argo Event云原生的事件驅動架構怎么用”文章吧。
Event事件大家都很熟悉,可以說Kubernetes就是完全由事件驅動的,不同的controller manager本質就是實現了不同的事件處理函數,比如所有ReplicaSet對象是由ReplicaSetController控制器管理,該控制器通過Informer監聽ReplicaSet以及其關聯的Pod的事件變化,從而維持運行狀態和我們聲明spec保持一致。
當然Kubernetes無論是什么Controller,其監聽和處理的都是內部事件,而在應用層上我們也有很多外部事件,比如CICD事件、Webhook事件、日志事件等等,如何處理這些事件呢,目前Kubernetes原生是無法實現的。
當然你可以自己實現一個event handler運行在Kubernetes平臺,不過實現難度也不小。而Argo Event組件完美解決了這個問題。
如圖是Argo Event官方提供的的流程圖:
首先事件源EventSource可以是Webhook、S3、Github、SQS等等,中間會經過一個叫Gateway(新版本叫EventBus)的組件,更準確地說老版本原來gateway的配置功能已經合并到EventSource了,EventBus是新引入的組件,后端默認基于高性能分布式消息中間件NATS[1]實現,當然其他中間件比如Kafka也是可以的。
這個EventBus可以看做是事件的一個消息隊列,消息生產者連接EvenSource,EventSource又連接到Sensor。更詳細地說EvenSource把事件發送給EvenBus,Sensor會訂閱EvenBus的消息隊列,EvenBus負責把事件轉發到已訂閱該事件的Sensor組件,EventSorce在上圖中沒有體現,具體設計文檔可以參考Argo-events Enhancement Proposals[2]。
有些人可能會說為什么EventBus不直接到Trigger,中間引入一個Sensor,這主要是兩個原因,一是為了使事件轉發和處理松耦合,二是為了實現Trigger事件的參數化,通過Sensor不僅可以實現事件的過濾,還可以實現事件的參數化,比如后面的Trigger是創建一個Kubernetes Pod,那這個Pod的metadata、env等,都可以根據事件內容進行填充。
Sensor組件注冊關聯了一個或者多個觸發器,這些觸發器可以觸發AWS Lambda事件、Argo Workflow事件、Kubernetes Objects等,通俗簡單地說,可以執行Lambda函數,可以動態地創建Kubernetes的對象或者創建前面的介紹的Workflow。
還記得前面介紹的Argo Rollout嗎,我們演示了手動promote實現應用發布或者回滾,通過Argo Event就可以很完美地和測試平臺或者CI/CD事件結合起來,實現自動應用自動發布或者回滾。
關于Argo Event的部署非常簡單,直接通過kubecl apply或者helm均可,可以參考文檔Installation[3],這里不再贅述。
Argo Event部署完成后注意還需要部署EventBus,官方推薦使用NATS中間件,文檔中有部署NATS stateful的文檔。
接下來我們以一個最簡單的Webhook事件為例,從而了解Argo Event的幾個組件功能以及用法。
首先按照前面的介紹,我們需要先定義EventSource:
apiVersion: argoproj.io/v1alpha1 kind: EventSource metadata: name: webhook spec: service: ports: - port: 12000 targetPort: 12000 webhook: webhook_example: port: "12000" endpoint: /webhook method: POST
這個EventSource定義了一個webhook webhook_example
,端口為12000,路徑為/webhook
,一般Webhook為POST方法,因此該Webhhok處理器我們配置只接收POST
方法。
為了把這個Webhook EventSource暴露,我們還創建了一個Service,端口也是12000。
此時我們可以手動curl該Service:
# kubectl get svc -l eventsource-name=webhook NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE webhook-eventsource-svc ClusterIP 10.96.93.24 <none> 12000/TCP 5m49s # curl -X POST -d '{}' 10.96.93.24:12000/webhook success
當然此時由于沒有注冊任何的Sensor,因此什么都不會發生。
接下來我們定義Sensor:
首先在dependencies
中定義了訂閱的EventSource以及具體的Webhook,由于一個EventSource可以定義多個Webhook,因此必須同時指定EventSource和Webhook兩個參數。
在Trigger中我們定義了對應Action為create一個workflow,這個workflow的spec定義在resource中配置。
最后的parameters
部分定義了workflow的參數,這些參數值從event中獲取,這里我們會把整個event都當作workflow的input。當然你可以通過dataKey只汲取body部分:dataKey: body.message
。
此時我們再次curl這個webhook事件:
curl -X POST -d '{"message": "HelloWorld!"}' 10.96.93.24:12000/webhook
此時我們獲取argo workflow列表發現新創建了一個實例:
# argo list NAME STATUS AGE DURATION PRIORITY webhook-8xt4s Succeeded 1m 18s 0
查看workflow輸出如下:
由于我們是把整個event作為workflow input發過去的,因此data內容部分是base64編碼,我們可以查看解碼后的內容如下:
{ "header": { "Accept": [ "*/*" ], "Content-Length": [ "26" ], "Content-Type": [ "application/x-www-form-urlencoded" ], "User-Agent": [ "curl/7.58.0" ] }, "body": { "message": "HelloWorld!" } }
從這里我們也可以看出Event包含兩個部分,一個是context,一個是data,data中又包含header部分以及body部分,在parameters
中可以通過Key獲取任意部分內容。
如上的webhook觸發是通過手動curl的,你可以很容易地在github或者bitbucket上配置到webhook中,這樣一旦代碼有更新就能觸發這個事件了。
前面的例子中的EventSource使用了Webhook,除了Webhook,Argo Event還支持很多的EventSource,比如:
amqp
aws-sns
aws-sqs
github/gitlab
hdfs
kafka
Kubernetes resource
...
Trigger也同樣支持很多,比如:
aws lambda
amqp
kafka
...
如上官方都提供了非常豐富的例子,可以參考argo events examples[4]。
這里以Kubernetes resource事件源為例,這個事件監聽Kubernetes的資源狀態,比如Pod創建、刪除等,這里以創建Pod為例:
apiVersion: argoproj.io/v1alpha1 kind: EventSource metadata: name: k8s-resource-demo spec: template: serviceAccountName: argo-events-sa resource: pod_demo: namespace: argo-events version: v1 resource: pods eventTypes: - ADD filter: afterStart: true labels: - key: app operation: "==" value: my-pod
如上例子監聽Pods的ADD事件,即創建Pod,filter中過濾只有包含app=my-pod標簽的Pod,特別需要注意的是使用的serviceaccount argo-events-sa
必須具有Pod的list、watch權限。
接下來我們使用AWS Lambda觸發器,Lambda函數已經在AWS提前創建好:
這個Lambda函數很簡單,直接返回event本身。
創建Sensor如下:
apiVersion: argoproj.io/v1alpha1 kind: Sensor metadata: name: aws-lambda-trigger-demo spec: template: serviceAccountName: argo-events-sa dependencies: - name: test-dep eventSourceName: k8s-resource-demo eventName: pod_demo triggers: - template: name: lambda-trigger awsLambda: functionName: hello accessKey: name: aws-secret key: accesskey secretKey: name: aws-secret key: secretkey namespace: argo-events region: cn-northwest-1 payload: - src: dependencyName: test-dep dataKey: body.name dest: name
如上AWS access key和access secret需要提前放到aws-secret中。
此時我們創建一個新的Pod my-pod:
apiVersion: v1 kind: Pod metadata: labels: app: my-pod name: my-pod spec: containers: - image: nginx name: my-pod dnsPolicy: ClusterFirst restartPolicy: Always
當Pod啟動后,我們發現AWS Lambda函數被觸發執行:
前面的例子中webhook中所有的事件都會被sensor觸發,我們有時不需要處理所有的事件,Argo Event支持基于data以及context過濾,比如我們只處理message為hello
或者為hey
的事件,其他消息忽略,只需要在原來的dependencies
中test-dep
增加filter即可:
dependencies: - name: test-dep eventSourceName: webhook eventName: webhook_example filters: - name: data-filter data: - path: body.message type: string value: - "hello" - "hey"
filter指定了基于data
過濾,過濾的字段為body.message
,匹配的內容為hello、hey
。
trigger policy主要用來判斷最后觸發器執行的結果是成功還是失敗,如果是創建Kubernetes資源比如Workflow,可以根據Workflow最終狀態決定這個Trigger的執行結果,而如果是觸發一個HTTP或者AWS Lambda,則需要自定義policy status。
awsLambda: functionName: hello accessKey: name: aws-secret key: accesskey secretKey: name: aws-secret key: secretkey namespace: argo-events region: us-east-1 payload: - src: dependencyName: test-dep dataKey: body.message dest: message policy: status: allow: - 200 - 201
如上表示當AWS Lambda返回200或者201時表示Trigger成功。
以上就是關于“Argo Event云原生的事件驅動架構怎么用”這篇文章的內容,相信大家都有了一定的了解,希望小編分享的內容對大家有幫助,若想了解更多相關的知識內容,請關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。