您好,登錄后才能下訂單哦!
解決相關依賴:
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
<!-- Kafka 2.1.0 artifact used by every example in this article. NOTE(review): the "_2.12" suffix is presumably the Scala build version, and this artifact is presumably what provides kafka.admin.TopicCommand - confirm. -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
<version>2.1.0</version>
</dependency>
生產者:
package com.zy.kafka;

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

/**
 * Minimal Kafka producer example: sends 100 keyed string messages
 * ("key0"/"value0" .. "key99"/"value99") to the topic "test_topic".
 */
public class KafkaTest {

    /**
     * Builds the producer configuration, sends the messages and closes the producer.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // 1. Collect the producer configuration.
        Properties prps = new Properties();
        // Broker address used to bootstrap the connection.
        prps.put("bootstrap.servers", "hadoop02:9092");
        // Acknowledgement level: 0, 1 or -1 (all).
        prps.put("acks", "all");
        // Number of retries.
        prps.put("retries", 3);
        prps.put("batch.size", 16384);
        prps.put("linger.ms", 1);
        prps.put("buffer.memory", 33554432);
        // Serializers for the message key and value.
        prps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        prps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // 2. Create the producer (generic parameters are the key and value types).
        //    try-with-resources closes it even when a send fails.
        try (Producer<String, String> producer = new KafkaProducer<>(prps)) {
            for (int i = 0; i < 100; i++) {
                // ProducerRecord(topic, key, value):
                //   topic - name of the target topic
                //   key   - message key
                //   value - message payload
                ProducerRecord<String, String> pr =
                        new ProducerRecord<String, String>("test_topic", "key" + i, "value" + i);
                producer.send(pr);
            }
        }
    }
}
消費者:
package com.zy.kafka;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

/**
 * Minimal Kafka consumer example: subscribes to "kafka_test" with automatic
 * offset commits and prints every record it receives.
 */
public class KafkaTest {

    /**
     * Builds the consumer configuration and polls forever, printing each record.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // 1. Collect the consumer configuration.
        Properties prps = new Properties();
        // Broker address used to bootstrap the connection.
        prps.put("bootstrap.servers", "hadoop02:9092");
        // Id of the consumer group this consumer belongs to.
        prps.put("group.id", "test");
        // Commit consumed offsets automatically.
        prps.put("enable.auto.commit", "true");
        // Interval between automatic commits, in milliseconds.
        prps.put("auto.commit.interval.ms", "1000");
        // Deserializers for the message key and value.
        prps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        prps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // 2. Create the consumer; try-with-resources closes it if the loop is left by an exception.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prps)) {
            // NOTE(review): the producer example in this article publishes to "test_topic",
            // while this consumer subscribes to "kafka_test" - confirm which topic is intended.
            consumer.subscribe(Arrays.asList("kafka_test"));
            // 3. Consume forever.
            while (true) {
                // The argument is the maximum time to wait for records, not a start position.
                // poll(Duration) replaces poll(long), which is deprecated in the 2.x client.
                ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(10));
                for (ConsumerRecord<String, String> p : poll) {
                    System.out.printf("offset=%d,key=%s,value=%s\n", p.offset(), p.key(), p.value());
                }
            }
        }
    }
}
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import kafka.admin.TopicCommand;
/**
 * Drives Kafka's topic command-line tool from Java by handing it the same
 * arguments that kafka-topics.sh would receive.
 *
 * <p>Without arguments the existing topics are listed. Passing {@code create}
 * as the first argument creates the topic "zy_topic" instead.
 */
public class KafkaAPI {

    /**
     * Runs either the list command (default) or the create command.
     *
     * @param args optional; {@code args[0]} equal to "create" (case-insensitive) selects topic creation
     * @throws IOException kept for signature compatibility; nothing in this method throws it directly
     */
    public static void main(String[] args) throws IOException {
        /*
        Shell equivalent of a create command:
        kafka-topics.sh \
        --create \
        --zookeeper hadoop02:2181,hadoop03:2181,hadoop04:2181 \
        --replication-factor 3 \
        --partitions 10 \
        --topic kafka_test11
        */
        // Arguments that create a topic.
        String[] ops = new String[] {
            "--create",
            "--zookeeper", "hadoop01:2181,hadoop02:2181,hadoop03:2181",
            "--replication-factor", "3",
            "--topic", "zy_topic", "--partitions", "5"
        };
        // Arguments that list the existing topics.
        String[] list = new String[] {
            "--list",
            "--zookeeper",
            "hadoop01:2181,hadoop02:2181,hadoop03:2181"
        };
        boolean create = args != null && args.length > 0 && "create".equalsIgnoreCase(args[0]);
        // Submit the selected command exactly as the CLI would.
        // NOTE(review): TopicCommand.main is believed to terminate the JVM when it finishes,
        // which is why only one command is issued per run - confirm against the Kafka version in use.
        TopicCommand.main(create ? ops : list);
    }
}
shell中常用操作:
#!/usr/bin/env bash
# List the Kafka topics.
kafka-topics.sh --list --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181
# Show the offsets of a topic.
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list hadoop01:9092,hadoop02:9092,hadoop03:9092 --topic kafka_api_r1p1
# Create a topic.
kafka-topics.sh --create --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 --partitions 3 --replication-factor 1 --topic kafka_api_r1p3
# Delete a topic.
kafka-topics.sh --delete --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 --topic act_inventory_r1p1_test1
# Show the offsets of a specific consumer group (replace the group name as needed).
kafka-consumer-groups.sh --bootstrap-server hadoop01:9092,hadoop02:9092,hadoop03:9092 --describe --group kafka_api_group_1
①簡單實現,kafka的消費者,并且將由kafka自動管理偏移量(單分區消費)
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
/**
 * Created with IntelliJ IDEA.
 * User: ZZY
 * Date: 2019/9/9
 * Time: 19:44
 * Description: a simple Kafka consumer whose offsets are committed
 * automatically (single-partition consumption).
 */
public class MyConsumer01 {
    private static final Properties props = new Properties();

    static {
        props.put("group.id", "kafka_api_group_2");
        // Addresses of the Kafka cluster.
        props.put("bootstrap.servers", "hadoop01:9092,hadoop02:9092,hadoop03:9092");
        // Commit offsets automatically; set this to "false" to commit manually instead
        // (for example with consumer.commitAsync() after processing a batch).
        props.put("enable.auto.commit", "true");
        // Interval between automatic commits, in milliseconds.
        props.put("auto.commit.interval.ms", "100");
        // Offset reset policy.
        props.put("auto.offset.reset", "earliest");
        // Deserializers for the message key and value.
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    }

    /**
     * Subscribes to "kafka_api_r1p1" and prints every record, pausing two seconds between polls.
     *
     * @param args unused
     * @throws InterruptedException if the thread is interrupted while pausing between polls
     */
    public static void main(String[] args) throws InterruptedException {
        String topic = "kafka_api_r1p1";
        // try-with-resources closes the consumer when the loop is left by an exception.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // A consumer may subscribe to several topics at once; here it is a single one.
            consumer.subscribe(Arrays.asList(topic));
            // Keep pulling data from the brokers.
            while (true) {
                // poll(Duration) replaces poll(long), which is deprecated in the 2.x client.
                ConsumerRecords<String, String> records = consumer.poll(java.time.Duration.ofMillis(10));
                for (ConsumerRecord<String, String> record : records) {
                    // %n terminates each record; without it all records ran together on one line.
                    System.out.printf("offset=%d,key=%s,value=%s%n", record.offset(),
                            record.key(), record.value());
                }
                Thread.sleep(2000);
            }
        }
    }
}
②實現多分區消費
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
/**
 * Created with IntelliJ IDEA.
 * User: ZZY
 * Date: 2019/9/10
 * Time: 8:55
 * Description: consumes a multi-partition topic and commits the offset of
 * each partition manually.
 */
public class MyConsumer02 {
    private static final Properties props = new Properties();

    static {
        // Addresses of the Kafka cluster.
        props.put("bootstrap.servers", "hadoop01:9092,hadoop02:9092,hadoop03:9092");
        // Consumer group; consumers sharing a group name belong to the same group.
        props.put("group.id", "kafka_api_group_1");
        // Automatic offset commit is disabled: offsets are committed explicitly in main.
        props.put("enable.auto.commit", "false");
        // Deserializers for the message key and value.
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    }

    /**
     * Polls "kafka_api_r1p3", prints the records partition by partition and commits
     * each partition's offset after its records were printed.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        String topicName = "kafka_api_r1p3";
        boolean interrupted = false;
        // try-with-resources closes the consumer on every exit path.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // A consumer may subscribe to several topics at once; here it is a single one.
            consumer.subscribe(Arrays.asList(topicName));
            while (!interrupted) {
                // poll(Duration) replaces poll(long), which is deprecated in the 2.x client.
                ConsumerRecords<String, String> records =
                        consumer.poll(java.time.Duration.ofMillis(Long.MAX_VALUE));
                // Handle the data of each partition separately.
                for (TopicPartition partition : records.partitions()) {
                    System.out.println("開始消費第" + partition.partition() + "分區數據!");
                    List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                    for (ConsumerRecord<String, String> partitionRecord : partitionRecords) {
                        // The "value" label was missing its colon in the original output.
                        System.out.println("partition:" + partition.partition()
                                + ",key:" + partitionRecord.key()
                                + ",value:" + partitionRecord.value()
                                + ",offset:" + partitionRecord.offset());
                    }
                    // Commit last offset + 1, i.e. the position of the next record to read.
                    long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                    consumer.commitSync(
                            Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
                }
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    // Stop consuming; the flag is restored only after the consumer has been closed.
                    interrupted = true;
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
③實現消費者從指定分區拉取數據
注意:
(1)kafka提供的消費者組內的協調功能就不再有效
(2)這樣的寫法可能出現不同消費者分配了相同的分區,為了避免偏移量提交沖突,每個消費者實例的group_id要不重復
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
/**
 * Created with IntelliJ IDEA.
 * User: ZZY
 * Date: 2019/9/10
 * Time: 10:10
 * Description: a consumer that pulls data from one explicitly assigned partition.
 * When a specific partition is assigned, keep in mind:
 * (1) the coordination that Kafka provides inside a consumer group no longer applies;
 * (2) different consumers may end up assigned to the same partition, so to avoid
 *     conflicting offset commits every consumer instance should use its own group id.
 */
public class MyConsumer03 {
    private static final Properties props = new Properties();
    // The consumer instance, created once in the static initializer below.
    static KafkaConsumer<String, String> consumer;

    static {
        // Addresses of the Kafka cluster.
        props.put("bootstrap.servers", "hadoop01:9092,hadoop02:9092,hadoop03:9092");
        // Consumer group; consumers sharing a group name belong to the same group.
        // NOTE(review): MyConsumer02 in this article uses the same group id on the same topic,
        // which contradicts point (2) of the class comment - confirm whether a distinct id is wanted.
        props.put("group.id", "kafka_api_group_1");
        // Automatic offset commit is disabled: offsets are committed explicitly in main.
        props.put("enable.auto.commit", "false");
        // Deserializers for the message key and value.
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumer = new KafkaConsumer<>(props);
    }

    /**
     * Assigns partition 0 of "kafka_api_r1p3" to the consumer, prints every record
     * and commits the partition offset after each batch.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        String topic = "kafka_api_r1p3";
        int partitionNum = 0;
        // Assign the partition directly instead of subscribing to the topic.
        TopicPartition partition0 = new TopicPartition(topic, partitionNum);
        try {
            consumer.assign(Arrays.asList(partition0));
            while (true) {
                // poll(Duration) replaces poll(long), which is deprecated in the 2.x client.
                ConsumerRecords<String, String> records =
                        consumer.poll(java.time.Duration.ofMillis(Long.MAX_VALUE));
                for (TopicPartition partition : records.partitions()) {
                    List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                    for (ConsumerRecord<String, String> partitionRecord : partitionRecords) {
                        // A separator before "offset:" was missing in the original output.
                        System.out.println("分區:" + partitionRecord.partition()
                                + ",key:" + partitionRecord.key()
                                + ",value:" + partitionRecord.value()
                                + ",offset:" + partitionRecord.offset());
                    }
                    // Commit last offset + 1, i.e. the position of the next record to read.
                    long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                    consumer.commitSync(
                            Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
                }
            }
        } finally {
            // The loop only ends through an exception; release the consumer in that case.
            consumer.close();
        }
    }
}
④重置kafka組的offset
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;
import java.util.Properties;
/**
* ?* Created with IntelliJ IDEA.
* ?* User: ZZY
* ?* Date: 2019/9/10
* ?* Time: 9:46
* ?* Description:? 該API用于重置kafka組的offset
*/
public class ReSetOffset {
//用于重置的offset
final private static String group="kafka_api_group_1";
final private static Properties props = new Properties();
static KafkaConsumer<String,String> consumer;
static{
props.put("bootstrap.servers", "hadoop01:9092,hadoop02:9092,hadoop03:9092");
props.put("group.id",group);
props.put("enable.auto.commit", "true");
//props.put("auto.offset.reset","earliest");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
consumer=new KafkaConsumer<String, String>(props);
}
public static String resetOffset(String topic,long offset){
int partitionNums=getTopicPartitionNum(topic);
for(int i=0;i<partitionNums;i++){
TopicPartition tp=new TopicPartition(topic,i);
//這里每重置一個分區的offset,就需要重新創建一個新的KafkaConsumer
KafkaConsumer consumer_temp= new KafkaConsumer<String, String>(props);
consumer_temp.assign(Arrays.asList(tp));
consumer_temp.seek(tp,offset);
consumer_temp.close();
}
consumer.close();
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss ");
return dateFormat.format(new Date())+ group +" ResetOffset Succeed!!";
}
private static int getTopicPartitionNum(String topic){
int partitionNums=consumer.partitionsFor(topic).size();
return partitionNums;
}
public static void main(String[] args) {
String topic="kafka_api_r1p1";
System.out.println(ReSetOffset.resetOffset(topic,0));
}
}
⑤多線程版本的消費者
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
/**
 * Created with IntelliJ IDEA.
 * User: ZZY
 * Date: 2019/9/10
 * Time: 10:45
 * Description: a consumer task meant to run on its own thread. It polls
 * "kafka_api_r1p1" until {@link #shutdown()} is called, then closes the consumer
 * and counts down the latch it was given.
 */
public class ConsumerRunner implements Runnable {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final KafkaConsumer<String, String> consumer;
    private final CountDownLatch latch;

    /**
     * @param consumer consumer this task polls and finally closes
     * @param latch    latch counted down once when this task ends
     */
    public ConsumerRunner(KafkaConsumer<String, String> consumer, CountDownLatch latch) {
        this.consumer = consumer;
        this.latch = latch;
    }

    /**
     * Polls and prints records until shutdown is requested.
     *
     * @throws WakeupException if the consumer is woken up while shutdown was not requested
     */
    @Override
    public void run() {
        System.out.println("threadName....." + Thread.currentThread().getName());
        boolean interrupted = false;
        try {
            consumer.subscribe(Arrays.asList("kafka_api_r1p1"));
            while (!closed.get()) {
                // poll(Duration) replaces poll(long), which is deprecated in the 2.x client.
                ConsumerRecords<String, String> records = consumer.poll(java.time.Duration.ofMillis(150));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("threadName= %s, offset = %d, key = %s, value = %s%n",
                            Thread.currentThread().getName(), record.offset(), record.key(), record.value());
                }
            }
            // This pause runs once, after the loop ended normally; a wakeup skips it.
            // NOTE(review): it may have been meant as a per-iteration throttle - confirm.
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Remember the interrupt; the flag is restored after the consumer is closed.
                interrupted = true;
            }
        } catch (WakeupException e) {
            // A wakeup is the expected way out after shutdown(); anything else is an error.
            if (!closed.get()) {
                throw e;
            }
        } finally {
            try {
                consumer.close();
            } finally {
                // Count down even when close() throws, so the waiting thread is never stuck.
                latch.countDown();
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /**
     * Requests termination: marks the task closed and wakes the consumer.
     */
    public void shutdown() {
        System.out.println("close ConsumerRunner");
        closed.set(true);
        consumer.wakeup();
    }
}
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
 * Created with IntelliJ IDEA.
 * User: ZZY
 * Date: 2019/9/10
 * Time: 10:52
 * Description: runs several ConsumerRunner tasks on a thread pool to try out
 * multi-threaded consumption.
 */
public class RunConsumer {
    // Number of consumers, pool threads and latch counts; these must stay equal.
    private static final int CONSUMER_COUNT = 2;
    private static final Properties props = new Properties();

    static {
        // Addresses of the Kafka cluster.
        props.put("bootstrap.servers", "hadoop01:9092,hadoop02:9092,hadoop03:9092");
        // Consumer group; consumers sharing a group name belong to the same group.
        props.put("group.id", "kafka_api_group_1");
        // Commit offsets automatically.
        props.put("enable.auto.commit", "true");
        // Interval between automatic commits, in milliseconds.
        props.put("auto.commit.interval.ms", "1000");
        // Deserializers for the message key and value.
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    }

    /**
     * Starts the consumer tasks, registers a JVM shutdown hook that stops them,
     * and blocks until every task has finished.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        final List<ConsumerRunner> consumers = new ArrayList<>();
        // The latch blocks the main thread in await() until every task has counted down,
        // much like joining the worker threads.
        final CountDownLatch latch = new CountDownLatch(CONSUMER_COUNT);
        final ExecutorService executor = Executors.newFixedThreadPool(CONSUMER_COUNT);
        for (int i = 0; i < CONSUMER_COUNT; i++) {
            // Each task gets its own KafkaConsumer instance.
            ConsumerRunner c = new ConsumerRunner(new KafkaConsumer<String, String>(props), latch);
            consumers.add(c);
            executor.submit(c);
        }
        // A shutdown hook runs while the JVM is shutting down, which makes it the place
        // to stop the consumer tasks and release the thread pool.
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            System.out.println("....................");
            for (ConsumerRunner consumer : consumers) {
                consumer.shutdown();
            }
            executor.shutdown();
            try {
                // Wait up to five seconds; the original waited 5000 microseconds (5 ms).
                executor.awaitTermination(5000, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }));
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。