您好,登錄后才能下訂單哦!
本篇文章給大家分享的是有關基于Pulsar Functions的事件處理設計模式是什么,小編覺得挺實用的,因此分享給大家學習,希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著小編一起來看看吧。
下面將介紹一些常見的實時流式傳輸模式及其實現。
首先回顧一下如何使用 Apache Pulsar Functions 實現基于內容的路由。基于內容的路由是一種集成模式。該模式已經存在多年,通常用于事件中心和消息框架中。基本思路是檢查每條消息的內容,根據消息內容將消息路由到不同目的地。
下面的例子使用了Apache Pulsar SDK,SDK 允許用戶配置三個不同的值:
用于在消息中查找匹配的正則表達式
消息匹配表達式模式時被發送到的 topic
消息不匹配表達式模式時被發送到的 topic
這個例子證明了 Pulsar Functions 功能強大,可以基于功能邏輯動態決定將事件發送到哪里。
import java.util.regex.*;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
public ContentBasedRoutingFunction implements Function<String, String> {
String process(String input, Context context) throws Exception {
String regex = context
.getUserConfigValue(“regex”).toString();
String matchedTopic = context
.getUserConfigValue(“matched-topic”).toString();
String unmatchedTopic = context
.getUserConfigValue(“unmatched-topic”).toString();
Pattern p = Pattern.compile(regex);
Matcher m = p.matcher(input);
if (m.matches()) {
context.publish(matchedTopic, input);
} else {
context.publish(unmatchedTopic, input);
}
}
}
如果想通過僅保留滿足給定條件的事件來排除 topic 上的大多數事件時,應用選擇過濾模式。過濾模式對僅查找感興趣的事件特別有效,如信用卡付款超過了一定金額;日志文件中的 ERROR 消息;傳感器讀數超過特定閾值等等(見模式 4)。
假如用戶在監視信用卡交易的事件流,并嘗試檢測欺詐或可疑行為。由于交易量很大,選擇“同意/不同意”的時間有限,用戶必須先過濾掉有“風險”特征的交易,如預付現金、大額付款等。
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
import com.company.creditcard.Purchase;
public class FraudFilter implements Function<Purchase, Purchase> {
Purchase process(Purchase p, Context context) throws Exception {
if (p.getTransactionType() == ‘CASH ADVANCE’) ||
p.getAmount > 500.00) {
return p;
}
return null;
}
}
可以使用過濾器來過濾有“風險”特征的交易。過濾器可以識別這些“風險”特征,并只將這些交易路由到一個單獨的 topic 上以進行進一步評估。
經過過濾器過濾后,所有信用卡支付都可以被路由到一個“潛在欺詐行為”的 topic 上進行進一步評估,而其他事件則會被過濾掉,過濾器也不會對過濾掉的事件執行任何操作。
上圖是基于三個獨立支付對象的 FraudFilter function。第一次支付符合給定標準,被路由到“潛在欺詐行為”topic 上進行進一步評估;而第二次和第三次支付不符合欺詐標準,直接被過濾掉(沒有被路由到“潛在欺詐行為”過濾器上)。
轉換模式用于將事件從一種類型轉換為另一種類型,或用于添加、刪除或修改輸入事件的值。
|| 投影
投影模式類似于關系代數中的投影算子,選擇輸入事件的屬性子集,并創建僅包含這些屬性的輸出事件。投影模式可用于刪除事件中的敏感字段,或者只保留事件中的必要屬性。下圖為投影模式的一種應用,在將記錄發布到下游 topic 前,“屏蔽”傳入的社安全號碼。
|| 富集模式
富集模式用于將數據添加到輸入屬性中不存在的輸出事件中。典型的富集模式包含基于輸入事件中的某個鍵值對引用數據進行某種查找。以下示例展示了如何根據輸入事件中包含的 IP 地址將地理位置添加到輸出事件。
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
import com.company.creditcard.Purchase;
import com.company.services.GeoService;
public class IPLookup implements Function<Purchase, Purchase> {
Purchase process(Purchase p) throws Exception {
Geo g = GeoService.getByIp(p.getIPAddress());
// By default, these fields are blank, so we just modify the object
p.setLongitude(g.getLon());
p.setLatitiude(g.getLat());
return p;
}
}
|| 分離模式
在分離模式下,事件處理器接收單個輸入事件,并將其分為多個輸出事件。當輸入事件是一個包含多個單獨事件(如日志文件中的 entry)的批處理,并且想要單獨處理每個事件時,分離模式十分適用。下圖展示了分離模式的處理過程:先根據換行符分隔輸入,再逐行發布到配置的輸出 topic。
此 function 的實現過程如下:
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
public class Splitter implements Function<String, String> {
String process(String s, Context context) throws Exception {
Arrays.asLists(s.split(“\\R”).forEach(line ->
context.publish(context.getOutputTopic(), line));
return null;
}
}
警報和閾值模式可進行檢測,并根據檢測條件生成警報(如高溫警報)。可以基于簡單的值,也可以基于較復雜的條件(如增長率、數量的持續變化等)生成警報。
下面的示例為基于用戶配置的閾值參數(如 100.00,38.7 等)和接收警報通知的郵箱地址生成警報。當此 function 接收到超過配置閾值的傳感器事件時,將發送電子郵件。
import javax.mail.*;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
public SimpleAlertFunction implements Function<Sensor, Void> {
Void process(Sensor sensor, Context context) throws Exception {
Double threshold = context
.getUserConfigValue(“threshold”).toString();
String alertEmail = context
.getUserConfigValue(“alert-email”).toString();
if (sensor.getReading() >= threshold) {
Session s = Session.getDefaultInstance();
MimeMessage msg = new MineMessage(s);
msg.setText(“Alert for Sensor:” + sensor.getId());
Transport.send(msg);
}
return null;
}
}
下面是一個有狀態 function 示例,該 function 根據特定傳感器讀數的增長率生成警報。在決定是否生成警報時,需要訪問以前的傳感器讀數。
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
public ComplexAlertFunction implements Function<Sensor, Void> {
Void process(Sensor sensor, Context context) throws Exception {
Double threshold = context
.getUserConfigValue(“threshold”).toString();
String alertTopic = context
.getUserConfigValue(“alert-topic”).toString();
// Get previous & current metric values
Float previous = context.getState(sensor.getId() + “-metric”);
Long previous_time = context.getState(sensor.getId() + “-metric-time”);
Float current = sensor.getMetric();
Long current_time = sensor.getMetricTime();
// Calculate Rate of change & compare to threshold.
Double rateOfChange = (current-previous) /
(current_time-previous_time);
if (abs(rateOfChange) >= threshold) {
// Publish the sensor ID to the alert topic for handling
context.publish(alertTopic, sensor.getId());
}
// Update metric values
context.putState(sensor.getId() + “-metric”, current);
context.putState(sensor.getId() + “-metric-time”, current_time);
}
}
通過 Apache Pulsar Functions 狀態管理特性僅保留先前的度量讀數和時間,并將傳感器 ID 添加到這些值中(因為將會處理來自多個傳感器的度量,所以需要傳感器 ID)。為了簡單起見,假設事件以正確的順序到達,即始終是最新讀數,沒有亂序讀數。
另外,這一次我們將傳感器 ID 轉發到一個專門的警報 topic 以進行進一步處理,而不是僅發送電子郵件。通過這種方式,我們可以對事件進行額外的富集處理(通過 Pulsar Functions)。例如,查找獲取傳感器的地理位置,然后通知相關人員。
簡單計數和窗口計數模式使用了聚合函數,聚合函數將事件的集合作為輸入,并通過對輸入事件應用一個 function 生成一個所需的輸出事件。聚合函數包括:求和、平均值、最大值、最小值、百分位數等。
以下為使用 Pulsar Functions 實現“字數統計”的示例,計算每個單詞在給定 topic 中出現次數的總和。
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
public WordCountFunction implements Function<String, Void> {
Void process(String s, Context context) throws Exception {
Arrays.asLists(s.split(“\\.”).forEach(word -> context.incrCounter(word, 1));
return null;
}
}
考慮到流數據 source 無休止的特性,無限期聚合用處不大,因為通常是在數據窗口上進行這些計算(如前一小時內的故障次數)。
數據窗口代表事件流的有限子集,如上圖所示。但是,應該如何定義數據窗口的邊界?有兩個用于定義窗口的常用屬性:
觸發策略:控制執行或觸發 function 代碼的時間。Apache Pulsar Function 框架通過這些規則來通知代碼處理窗口中收集的全部數據。
清除策略:控制保留在窗口中的數據量。這些規則用于決定是否從窗口中清除數據元素。
這兩個策略都是由時間或窗口中的數據量驅動的。二者之間的區別是什么?又是怎樣協同工作的?在多種窗口技術中,最常用的是滾動窗口和滑動窗口。
窗口已滿是滾動窗口清除策略的唯一條件,因此,只需要指定想要使用觸發策略(基于計數或基于時間)即可。基于計數的滾動窗口是怎樣工作的?
在下圖的第一個示例中,觸發策略設置為 2,也就是說,在窗口中有兩個項目時,觸發器將會觸發,開始執行 Pulsar Function 代碼。這一系列行為與時間無關,窗口計數達到 2 用了 5 秒還是 5 個小時并不重要,重要的是窗口計數達到 2。
將上述基于計數的滾動窗口與基于時間的滾動窗口(時間設置為 10 秒)進行對比。經過 10 秒的間隔后,無論窗口中有多少事件,function 代碼都會被觸發。在下圖中,第一個窗口中有 7 個事件,而第二個窗口中只有 3 個事件。
滑動窗口計數定義了窗口的長度,窗口長度設置了清除策略以限制保留待處理的數據量;滑動間隔定義了觸發策略。滾動窗口策略和滑動窗口策略都可以根據時間(時間段)或長度(數據元素的數量)來定義。
在下圖中,窗口長度為 2 秒,也就是說,2 秒以前的數據會被清除,并且不會用于計算。滑動間隔為 1 秒,即每 1 秒鐘執行一次 Pulsar function 代碼。這樣,可以在整個窗口長度內處理數據。
前面的示例都是基于時間來定義清除策略和觸發策略,也可以根據長度來定義清除策略或觸發策略,或者同時定義這兩種策略。
在 Pulsar Functions 中實現這兩種類型的窗口 function 都很容易,只需要指定一個 java.util.Collection 作為輸入類型,如下所示,并在創建 function 時在 -userConfig 標志中指定適當的窗口配置屬性。
用于實現前面提到的時間窗口四種情形的配置參數如下:
“–windowLengthCount”:每個窗口的消息數量
“–windowLengthDurationMs”:窗口時間(以毫秒為單位)
“–slidingIntervalCount”:窗口滑動后的消息數量
“–slidingIntervalDurationMs”:窗口滑動后的時間
正確的組合方式如下表:
時間,滑動窗口 | -windowLengthDurationMs = XXXX -slidingIntervalDurationMs = XXXX |
時間,Batch Window(即滾動窗口) | -windowLengthDurationMs = XXXX |
長度,滑動窗口 | -windowLengthCount = XXXX -slidingIntervalCount = XXXX |
長度,Batch Window(即滾動窗口) | -windowLengthCount = XXXX |
以上就是基于Pulsar Functions的事件處理設計模式是什么,小編相信有部分知識點可能是我們日常工作會見到或用到的。希望你能通過這篇文章學到更多知識。更多詳情敬請關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。