在Java中,使用EMqtt庫可以實現MQTT消息的訂閱和過濾。以下是一個簡單的示例,展示了如何使用EMqtt客戶端訂閱主題并根據主題過濾消息。
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.5</version>
</dependency>
import org.eclipse.paho.client.mqttv3.*;
public class EMqttClient {
public static void main(String[] args) {
String brokerUrl = "tcp://broker.emqx.io:1883";
String clientId = "JavaEMqttClient";
MqttClient client = new MqttClient(brokerUrl, clientId);
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(true);
connOpts.setAutomaticReconnect(true);
try {
client.connect(connOpts);
} catch (MqttException e) {
System.out.println("Failed to connect to MQTT broker.");
e.printStackTrace();
return;
}
}
}
test/topic
,并且只處理包含單詞"hello"的消息:import org.eclipse.paho.client.mqttv3.*;
public class EMqttClient {
// ... (省略連接到MQTT代理的代碼)
public static void main(String[] args) {
// ... (省略連接到MQTT代理的代碼)
try {
// 訂閱主題
String topic = "test/topic";
client.subscribe(topic);
// 處理接收到的消息
client.setCallback(new MqttCallback() {
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
String payload = new String(message.getPayload());
if (payload.contains("hello")) {
System.out.println("Received message: " + payload);
}
}
@Override
public void connectionLost(Throwable cause) throws Exception {
System.out.println("Connection lost.");
cause.printStackTrace();
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("Delivery complete.");
}
});
// 保持客戶端運行,以便持續接收消息
Thread.sleep(10000);
} catch (MqttException | InterruptedException e) {
System.out.println("Error occurred.");
e.printStackTrace();
} finally {
try {
client.disconnect();
} catch (MqttException e) {
e.printStackTrace();
}
}
}
}
在這個示例中,我們訂閱了test/topic
主題,并在messageArrived
回調方法中檢查消息負載是否包含單詞"hello"。如果包含,則打印消息。這樣,我們就實現了消息過濾功能。