在 Flink 中自定義觸發器需要實現 Trigger 接口,該接口定義如下:
public interface Trigger<T, W extends Window> extends Serializable {
// 初始化觸發器
void open(TriggerContext ctx) throws Exception;
// 每次元素到來時都會調用此方法,決定是否觸發窗口計算
TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception;
// 每次處理時間定時器到來時都會調用此方法,決定是否觸發窗口計算
TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception;
// 每次事件時間定時器到來時都會調用此方法,決定是否觸發窗口計算
TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception;
// 當窗口計算完成時會調用此方法
void clear(W window, TriggerContext ctx) throws Exception;
// 序列化
default void write(DataOutputView out) throws IOException {}
// 反序列化
default void read(DataInputView in) throws IOException {}
}
自定義觸發器需要實現 onElement、onProcessingTime、onEventTime、clear 這幾個方法,并在 open 方法中對觸發器進行初始化。此外,TriggerContext 提供了一些上下文信息,可以在觸發器中使用。通過實現 Trigger 接口,可以根據自己的業務需求定義觸發邏輯,實現更靈活的窗口計算方式。