您好,登錄后才能下訂單哦!
本篇內容主要講解“Java使用Kafka的方法”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“Java使用Kafka的方法”吧!
1、maven依賴
<!-- Kafka Java client; version matches the 0.11.0.0 broker APIs used below. -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.11.0.0</version>
</dependency>
2、Producer
2.1、producer發送消息
import java.util.Properties; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; /** * @author Thomas * @Description:最簡單的kafka producer * @date 22:18 2019-7-5 */ public class ProducerDemo { public static void main(String[] args) { Properties properties =new Properties(); //zookeeper服務器集群地址,用逗號隔開 properties.put("bootstrap.servers", "172.16.0.218:9092,172.16.0.219:9092,172.16.0.217:9092"); properties.put("acks", "all"); properties.put("retries", 0); properties.put("batch.size", 16384); properties.put("linger.ms", 1); properties.put("buffer.memory", 33554432); properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //自定義producer攔截器 properties.put("interceptor.classes", "com.lt.kafka.producer.MyProducerInterceptor"); //自定義消息路由規則(消息發送到哪一個Partition中) //properties.put("partitioner.class", "com.lt.kafka.producer.MyPartition"); Producer<string, string=""> producer = null; try { producer = new KafkaProducer<string, string="">(properties); for (int i = 20; i < 40; i++) { String msg = "This is Message:" + i; /** * kafkaproducer中會同時調用自己的callback的onCompletion方法和producerIntercepter的onAcknowledgement方法。 * 關鍵源碼:Callback interceptCallback = this.interceptors == null * callback : new InterceptorCallback<>(callback, * this.interceptors, tp); */ producer.send(new ProducerRecord<string, string="">("leixiang", msg),new MyCallback()); } } catch (Exception e) { e.printStackTrace(); } finally { if(producer!=null) producer.close(); } } }
2.2、自定義producer攔截器
import java.util.Map; import org.apache.kafka.clients.producer.ProducerInterceptor; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; /** * @author Thomas * @Description:自定義producer攔截器 * @date 22:21 2019-7-5 */ public class MyProducerInterceptor implements ProducerInterceptor<string,string>{ /** * 打印配置相關信息 */ public void configure(Map<string,> configs) { // TODO Auto-generated method stub System.out.println(configs.toString()); } /** * producer發送信息攔截方法 */ public ProducerRecord<string,string> onSend(ProducerRecord<string, string=""> record) { System.out.println("攔截處理前============="); String topic=record.topic(); String value=record.value(); System.out.println("攔截處理前的消息====:"+value); ProducerRecord<string,string> record2=new ProducerRecord<string, string="">(topic, value+" (intercepted)"); System.out.println("攔截處理后的消息:"+record2.value()); System.out.println("攔截處理后==============="); return record2; } /** * 消息確認回調函數,和callback的onCompletion方法相似。 * 在kafkaProducer中,如果都設置,兩者都會調用。 */ public void onAcknowledgement(RecordMetadata metadata, Exception exception) { if (metadata != null) System.out.println("MyProducerInterceptor onAcknowledgement:RecordMetadata=" + metadata.toString()); if (exception != null) exception.printStackTrace(); } /** * interceptor關閉回調 */ public void close() { System.out.println("MyProducerInterceptor is closed!"); } }
2.3、自定義消息路由規則
自定義路由規則,可以根據自己的需要定義消息發送到哪個分區。自定義路由規則需要實現Partitioner。
import java.util.Map; import org.apache.kafka.clients.producer.Partitioner; import org.apache.kafka.common.Cluster; /** * @author Thomas * @Description: * @date 22:24 2019-7-5 */ public class MyPartition implements Partitioner { public void configure(Map<string,> arg0) { // TODO Auto-generated method stub } public void close() { // TODO Auto-generated method stub } public int partition(String arg0, Object arg1, byte[] arg2, Object arg3, byte[] arg4, Cluster arg5) { // TODO Auto-generated method stub return 0; } }
3、Consumer

3.1、自動提交
import java.util.Arrays; import java.util.Properties; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; /** * @author Thomas * @Description: * @date 22:26 2019-7-5 */ public class AutoCommitConsumerDemo { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "172.16.0.218:9092,172.16.0.219:9092,172.16.0.217:9092"); props.put("group.id", "leixiang"); props.put("enable.auto.commit", "true"); //想要讀取之前的數據,必須加上 //props.put("auto.offset.reset", "earliest"); /* 自動確認offset的時間間隔 */ props.put("auto.commit.interval.ms", "1000"); /* * 一旦consumer和kakfa集群建立連接, * consumer會以心跳的方式來高速集群自己還活著, * 如果session.timeout.ms 內心跳未到達服務器,服務器認為心跳丟失,會做rebalence */ props.put("session.timeout.ms", "30000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //配置自定義的攔截器,可以在攔截器中引入第三方插件實現日志記錄等功能。 //props.put("interceptor.classes", "com.lt.kafka.consumer.MyConsumerInterceptor"); @SuppressWarnings("resource") KafkaConsumer<string, string=""> consumer = new KafkaConsumer<string, string="">(props); try { /* 消費者訂閱的topic, 可同時訂閱多個 ,用逗號隔開*/ consumer.subscribe(Arrays.asList("leixiang")); while (true) { //輪詢數據。如果緩沖區中沒有數據,輪詢等待的時間為毫秒。如果0,立即返回緩沖區中可用的任何記錄,則返回空 ConsumerRecords<string, string=""> records = consumer.poll(100); for (ConsumerRecord<string, string=""> record : records) System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } } catch (Exception e) { // TODO: handle exception e.printStackTrace(); } } }
3.2、手動提交
import java.util.Arrays; import java.util.Properties; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; /** * @author Thomas * @Description: * @date 22:28 2019-7-5 */ public class ManualCommitConsumerDemo { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "172.16.0.218:9092,172.16.0.219:9092,172.16.0.217:9092"); props.put("group.id", "leixiang"); props.put("enable.auto.commit", "false");//手動確認 /* 自動確認offset的時間間隔 */ props.put("auto.commit.interval.ms", "1000"); props.put("auto.offset.reset", "earliest");//想要讀取之前的數據,必須加上 /* * 一旦consumer和kakfa集群建立連接, * consumer會以心跳的方式來高速集群自己還活著, * 如果session.timeout.ms 內心跳未到達服務器,服務器認為心跳丟失,會做rebalence */ props.put("session.timeout.ms", "30000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //配置自定義的攔截器,可以在攔截器中引入第三方插件實現日志記錄等功能。 props.put("interceptor.classes", "com.lt.kafka.consumer.MyConsumerInterceptor"); KafkaConsumer<string, string=""> consumer = new KafkaConsumer<string, string="">(props); /* 消費者訂閱的topic, 可同時訂閱多個 ,用逗號隔開*/ consumer.subscribe(Arrays.asList("leixiang")); while (true) { ConsumerRecords<string, string=""> records = consumer.poll(100); for (ConsumerRecord<string, string=""> record : records) { //處理消息 saveMessage(record); //手動提交,并且設置Offset提交回調方法 //consumer.commitAsync(new MyOffsetCommitCallback()); consumer.commitAsync(); } } } public static void saveMessage(ConsumerRecord<string, string=""> record){ System.out.printf("處理消息:offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } }
自定義Consumer攔截器
import java.util.Map; import org.apache.kafka.clients.consumer.ConsumerInterceptor; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; /** * @author Thomas * @Description: * @date 22:29 2019-7-5 */ public class MyConsumerInterceptor implements ConsumerInterceptor<string, string="">{ public void configure(Map<string,> configs) { System.out.println("MyConsumerInterceptor configs>>>"+configs.toString()); } public ConsumerRecords<string, string=""> onConsume(ConsumerRecords<string, string=""> records) { System.out.println("onConsume"); return records; } public void onCommit(Map<topicpartition, offsetandmetadata=""> offsets) { System.out.println("onCommit"); } public void close() { System.out.println("MyConsumerInterceptor is closed!"); } }
自定義Offset提交回調方法
import java.util.Map; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.OffsetCommitCallback; import org.apache.kafka.common.TopicPartition; /** * @author Thomas * @Description: * @date 22:31 2019-7-5 */ public class MyOffsetCommitCallback implements OffsetCommitCallback { public void onComplete(Map<topicpartition, offsetandmetadata=""> offsets, Exception exception) { if (offsets != null) System.out.println("offsets>>>" + offsets.toString()); if (exception != null) exception.printStackTrace(); } }
到此,相信大家對“Java使用Kafka的方法”有了更深的了解,不妨來實際操作一番吧!這里是億速云網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。