您好,登錄后才能下訂單哦!
業務場景
我們公司是做共享充電寶的業務的。有一些比較大的代理商或者ka商戶,他們需要了解到他們自己下面的商戶的訂單數據,這些訂單數據需要由我們推送給他們。
大致架構為數據部門通過canal訂閱訂單表的數據,然后推送到kafka ,我們訂閱數據部門kafka獲取到代理商下商戶的實時訂單數據再推送給代理商。比如,代理商下商戶產生了一筆訂單,整個過程會產生,訂單生成,訂單已支付,充電寶已被取走,充電寶已歸還等多種狀態的訂單消息,我們需要實時把這些訂單消息推送給代理商。我們的業務場景需要消息的順序推送和多線程并發消費以提高性能
kafka多線程消費方案
消費者程序啟動多個線程,每個線程維護專屬的KafkaConsumer實例,負責完整的消息獲取、消息處理
流程。如下圖所示:
消費者程序使用單或多線程獲取消息,同時創建多個消費線程執行消息處理邏輯。獲取消息的線程可以 是一個,也可以是多個,每個線程維護專屬的KafkaConsumer實例,處理消息則交由特定的線程池來 做,從而實現消息獲取與消息處理的真正解耦。具體架構如下圖所示:
這兩種方案孰優孰劣呢?應該說是各有千秋。這兩種方案的優缺點,我們先來看看下面這張表格。
kafka怎么保證順序消費
保證順序消費,需要滿足如下條件
保證相同訂單編號的消息需要發送到同一個分區。
@Configuration
public class SenderConfig {
@Value("${kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public Map producerConfigs() {
Map props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
@Bean
public ProducerFactory producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public Sender sender() {
return new Sender();
}
}
public class Sender {
@Autowired
private KafkaTemplate kafkaTemplate;
public void send(String topic, String data) {
kafkaTemplate.send(topic, data);
}
public void send(String topic, int partition, String data) {
kafkaTemplate.send(topic, partition, data);
}
}
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringKafkaApplicationTest {
private static String BATCH_TOPIC = "batch.t";
private static Integer PARTITIONS = 6;
/**
* 已支付
*/
private static Integer PAYED_STATUS = 2;
/**
* 已取走
*/
private static Integer SEND_BACK_STATUS = 3;
@Autowired
private Sender sender;
private static DelayQueue delayQueue = new DelayQueue();
@Test
public void testReceive() throws Exception {
for (int i = 1; i < 50; i++) {
Integer orderNum = 800010 + i;
Integer orderPrice = RandomUtil.randomInt(1, 20);
// 用戶支付成功,訂單狀態為支付成功
OrderDTO order = new OrderDTO(orderNum, orderPrice, PAYED_STATUS);
// 發送支付成功訂單消息到對應的kafka分區
Integer destinationPartition = orderNum % PARTITIONS;
sender.send(BATCH_TOPIC, destinationPartition, JSONUtil.toJsonStr(order));
// 創建任務放入延遲隊列(模擬用戶支付成功到取走充電寶花費的時間)
long delayTime = 200;
OrderTask orderTask = new OrderTask(delayTime, order);
delayQueue.offer(orderTask);
}
while (true) {
// 用戶取走充電寶,訂單狀態更改為 已取走
OrderTask orderTask = (OrderTask) delayQueue.take();
OrderDTO orderDTO = orderTask.getOrderDTO();
Integer destinationPartition = orderDTO.getOrderNum() % PARTITIONS;
orderDTO.setOrderStatus(SEND_BACK_STATUS);
// 發送已取走訂單消息到對應的kafka 分區
sender.send(BATCH_TOPIC, destinationPartition, JSONUtil.toJsonStr(orderDTO));
}
}
}
可以看出我們通過訂單號對分區數進行取余,來確定該消息發送到哪一個分區,保證相同訂單號的消息被發送到相同的分區。當然也可以對字符串這些進行hash ,獲得hash值來對分區數取余
Integer destinationPartition=orderDTO.getOrderNum()%PARTITIONS;
保證同一個分區的消息由同一個線程來消費。
我們的業務場景需要采用多線程方案一來處理我們的業務
普通方式實現方案一
public class KafkaConsumerRunner implements Runnable {
private final AtomicBoolean closed = new AtomicBoolean(false);
private final KafkaConsumer consumer;
public KafkaConsumerRunner(KafkaConsumer consumer) {
this.consumer = consumer;
}
@Override
public void run() {
try {
consumer.subscribe(Arrays.asList("topic"));
while (!closed.get()) {
// 執行消息處理邏輯
ConsumerRecords records = consumer.poll(10000);
}
} catch (Exception e) {
// Ignore exception if closing
if (!closed.get()) {
throw e;
}
} finally {
consumer.close();
}
}
/**
* Shutdown hook which can be called from a separate thread
*/
public void shutdown() {
closed.set(true);
consumer.wakeup();
}
}
spring-kafka為我們做的封裝
消費者相關配置:
這里我們需要注意的是factory.setConcurrency(4)。
這個是配置主要是設置KafkaConsumer的數量,最大為topic 的分區數。當然你如果設置的值超過topic 分區數,spring-kafka 還是只會為我們創建最大分區數的KafkaConsumer數量,也就是創建KafkaConsumer數量能少于分區數,但不會超過分區數。少于分區數的話,一個KafkaConsumer會消費多個分區的數據,保證所有的分區數據都有對應的KafkaConsumer來進行消費;但不會出現多個KafkaConsumer消費同一個分區的情況,因為如果是這樣也就無法保證消息的順序消費機制。
一般情況下如果數據量較大,我們需要把此值設置為topic分區數,這樣一個KafkaConsumer消費一個分區的數據,提高數據的并發消費能力。
@Configuration
@EnableKafka
public class ReceiverConfig {
@Value("${kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public Map consumerConfigs() {
Map props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "batch");
// maximum records per poll
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");
return props;
}
@Bean
public ConsumerFactory consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean(name = "kafkaListenerContainerFactory")
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
// enable batch listening
factory.setBatchListener(true);
factory.setConcurrency(4);
return factory;
}
@Bean
public Receiver receiver() {
return new Receiver();
}
}
Receiver 代碼
public class Receiver {
@Autowired
private PushOrderService pushOrderService;
private static final Logger LOGGER = LoggerFactory.getLogger(Receiver.class);
private static final String BATCH_TOPIC = "batch.t";
@KafkaListener(topics = BATCH_TOPIC, containerFactory = "kafkaListenerContainerFactory")
public void receivePartitions(List data,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) List partitions,
@Header(KafkaHeaders.OFFSET) List offsets) {
for (int i = 0; i < data.size(); i++) {
Long threadId = Thread.currentThread().getId();
// 向第三方推送訂單消息
String orderStr = data.get(i);
pushOrderService.pushOrderToPlatform(orderStr);
OrderDTO orderDTO = JSONUtil.toBean(orderStr, OrderDTO.class);
LOGGER.info("推送訂單消息成功,訂單號為:{},狀態為:{},分區為{},處理線程為:{}", orderDTO.getOrderNum(), orderDTO.getOrderStatus(), partitions.get(i), threadId);
}
}
}
/**
* 模擬網絡推送訂單信息給第三方平臺
*/
@Service
public class PushOrderService {
/**
* 已支付
*/
private static Integer PAYED_STATUS = 2;
public void pushOrderToPlatform(String orderString) {
// 模擬網絡推送訂單信息給第三方平臺(同步推送)
OrderDTO orderDTO = JSONUtil.toBean(orderString, OrderDTO.class);
// 已支付 訂單消息
if (orderDTO.getOrderStatus().equals(PAYED_STATUS)) {
ThreadUtil.sleep(500);
} else {
// 已取走 訂單消息
ThreadUtil.sleep(200);
}
}
}
測試結果: 無錫做人流手術多少錢 http://www.ytsg029.com/
16:17:47.026 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800014,狀態為:2,分區為4,處理線程為:67
16:17:47.026 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800012,狀態為:2,分區為2,處理線程為:66
16:17:47.534 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800020,狀態為:2,分區為4,處理線程為:67
16:17:47.534 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800018,狀態為:2,分區為2,處理線程為:66
16:17:48.035 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800026,狀態為:2,分區為4,處理線程為:67
16:17:48.036 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800024,狀態為:2,分區為2,處理線程為:66
16:17:48.537 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800015,狀態為:2,分區為5,處理線程為:67
16:17:48.539 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800016,狀態為:2,分區為0,處理線程為:66
16:17:49.044 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800022,狀態為:2,分區為0,處理線程為:66
16:17:49.045 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800021,狀態為:2,分區為5,處理線程為:67
16:17:49.546 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800013,狀態為:2,分區為3,處理線程為:67
16:17:49.547 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800028,狀態為:2,分區為0,處理線程為:66
16:17:50.051 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800019,狀態為:2,分區為3,處理線程為:67
16:17:50.051 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800011,狀態為:2,分區為1,處理線程為:66
16:17:50.554 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800025,狀態為:2,分區為3,處理線程為:67
16:17:50.554 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800017,狀態為:2,分區為1,處理線程為:66
16:17:51.060 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800023,狀態為:2,分區為1,處理線程為:66
16:17:51.576 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800034,狀態為:2,分區為0,處理線程為:66
16:17:51.579 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800031,狀態為:2,分區為3,處理線程為:67
16:17:51.583 [org.springframework.kafka.KafkaListenerEndpointContainer#0-2-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800032,狀態為:2,分區為4,處理線程為:70
16:17:51.583 [org.springframework.kafka.KafkaListenerEndpointContainer#0-3-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800027,狀態為:2,分區為5,處理線程為:72
16:17:52.079 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800040,狀態為:2,分區為0,處理線程為:66
16:17:52.083 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800037,狀態為:2,分區為3,處理線程為:67
16:17:52.088 [org.springframework.kafka.KafkaListenerEndpointContainer#0-2-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800038,狀態為:2,分區為4,處理線程為:70
16:17:52.088 [org.springframework.kafka.KafkaListenerEndpointContainer#0-3-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800033,狀態為:2,分區為5,處理線程為:72
16:17:52.583 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800046,狀態為:2,分區為0,處理線程為:66
16:17:52.588 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800043,狀態為:2,分區為3,處理線程為:67
16:17:52.589 [org.springframework.kafka.KafkaListenerEndpointContainer#0-2-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800044,狀態為:2,分區為4,處理線程為:70
16:17:52.590 [org.springframework.kafka.KafkaListenerEndpointContainer#0-3-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800039,狀態為:2,分區為5,處理線程為:72
16:17:53.089 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800052,狀態為:2,分區為0,處理線程為:66
16:17:53.091 [org.springframework.kafka.KafkaListenerEndpointContainer#0-2-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800050,狀態為:2,分區為4,處理線程為:70
16:17:53.091 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800049,狀態為:2,分區為3,處理線程為:67
16:17:53.095 [org.springframework.kafka.KafkaListenerEndpointContainer#0-3-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800045,狀態為:2,分區為5,處理線程為:72
16:17:53.591 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800058,狀態為:2,分區為0,處理線程為:66
16:17:53.592 [org.springframework.kafka.KafkaListenerEndpointContainer#0-2-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800056,狀態為:2,分區為4,處理線程為:70
16:17:53.593 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800055,狀態為:2,分區為3,處理線程為:67
16:17:53.600 [org.springframework.kafka.KafkaListenerEndpointContainer#0-3-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800051,狀態為:2,分區為5,處理線程為:72
16:17:53.795 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800016,狀態為:3,分區為0,處理線程為:66
16:17:53.796 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800013,狀態為:3,分區為3,處理線程為:67
16:17:53.796 [org.springframework.kafka.KafkaListenerEndpointContainer#0-2-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800014,狀態為:3,分區為4,處理線程為:70
16:17:54.000 [org.springframework.kafka.KafkaListenerEndpointContainer#0-2-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800020,狀態為:3,分區為4,處理線程為:70
16:17:54.000 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800019,狀態為:3,分區為3,處理線程為:67
16:17:54.000 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800022,狀態為:3,分區為0,處理線程為:66
16:17:54.101 [org.springframework.kafka.KafkaListenerEndpointContainer#0-3-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800057,狀態為:2,分區為5,處理線程為:72
16:17:54.205 [org.springframework.kafka.KafkaListenerEndpointContainer#0-2-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800026,狀態為:3,分區為4,處理線程為:70
16:17:54.206 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800025,狀態為:3,分區為3,處理線程為:67
16:17:54.206 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800028,狀態為:3,分區為0,處理線程為:66
16:17:54.306 [org.springframework.kafka.KafkaListenerEndpointContainer#0-3-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800015,狀態為:3,分區為5,處理線程為:72
16:17:54.410 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800037,狀態為:3,分區為3,處理線程為:67
16:17:54.410 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800034,狀態為:3,分區為0,處理線程為:66
16:17:54.410 [org.springframework.kafka.KafkaListenerEndpointContainer#0-2-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800032,狀態為:3,分區為4,處理線程為:70
16:17:54.510 [org.springframework.kafka.KafkaListenerEndpointContainer#0-3-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800021,狀態為:3,分區為5,處理線程為:72
16:17:54.614 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800031,狀態為:3,分區為3,處理線程為:67
16:17:54.615 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800046,狀態為:3,分區為0,處理線程為:66
16:17:54.615 [org.springframework.kafka.KafkaListenerEndpointContainer#0-2-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800038,狀態為:3,分區為4,處理線程為:70
16:17:54.711 [org.springframework.kafka.KafkaListenerEndpointContainer#0-3-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800027,狀態為:3,分區為5,處理線程為:72
16:17:54.820 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800043,狀態為:3,分區為3,處理線程為:67
16:17:54.820 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800040,狀態為:3,分區為0,處理線程為:66
16:17:54.820 [org.springframework.kafka.KafkaListenerEndpointContainer#0-2-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800056,狀態為:3,分區為4,處理線程為:70
16:17:54.914 [org.springframework.kafka.KafkaListenerEndpointContainer#0-3-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800033,狀態為:3,分區為5,處理線程為:72
16:17:55.025 [org.springframework.kafka.KafkaListenerEndpointContainer#0-2-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800044,狀態為:3,分區為4,處理線程為:70
16:17:55.025 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800052,狀態為:3,分區為0,處理線程為:66
16:17:55.025 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800055,狀態為:3,分區為3,處理線程為:67
16:17:55.118 [org.springframework.kafka.KafkaListenerEndpointContainer#0-3-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800039,狀態為:3,分區為5,處理線程為:72
16:17:55.231 [org.springframework.kafka.KafkaListenerEndpointContainer#0-2-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800050,狀態為:3,分區為4,處理線程為:70
16:17:55.231 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800049,狀態為:3,分區為3,處理線程為:67
16:17:55.231 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800058,狀態為:3,分區為0,處理線程為:66
16:17:55.321 [org.springframework.kafka.KafkaListenerEndpointContainer#0-3-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800057,狀態為:3,分區為5,處理線程為:72
16:17:55.525 [org.springframework.kafka.KafkaListenerEndpointContainer#0-3-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800051,狀態為:3,分區為5,處理線程為:72
16:17:55.728 [org.springframework.kafka.KafkaListenerEndpointContainer#0-3-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800045,狀態為:3,分區為5,處理線程為:72
16:17:55.735 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800029,狀態為:2,分區為1,處理線程為:66
16:17:55.737 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800030,狀態為:2,分區為2,處理線程為:67
16:17:56.239 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800036,狀態為:2,分區為2,處理線程為:67
16:17:56.239 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800035,狀態為:2,分區為1,處理線程為:66
16:17:56.743 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800042,狀態為:2,分區為2,處理線程為:67
16:17:56.743 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800041,狀態為:2,分區為1,處理線程為:66
16:17:57.247 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800047,狀態為:2,分區為1,處理線程為:66
16:17:57.247 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800048,狀態為:2,分區為2,處理線程為:67
16:17:57.751 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800053,狀態為:2,分區為1,處理線程為:66
16:17:57.751 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800054,狀態為:2,分區為2,處理線程為:67
16:17:57.953 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800012,狀態為:3,分區為2,處理線程為:67
16:17:58.159 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800018,狀態為:3,分區為2,處理線程為:67
16:17:58.256 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800059,狀態為:2,分區為1,處理線程為:66
16:17:58.361 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800024,狀態為:3,分區為2,處理線程為:67
16:17:58.457 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800011,狀態為:3,分區為1,處理線程為:66
16:17:58.566 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800048,狀態為:3,分區為2,處理線程為:67
16:17:58.662 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800017,狀態為:3,分區為1,處理線程為:66
16:17:58.771 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800042,狀態為:3,分區為2,處理線程為:67
16:17:58.868 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800023,狀態為:3,分區為1,處理線程為:66
16:17:58.975 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800030,狀態為:3,分區為2,處理線程為:67
16:17:59.073 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800029,狀態為:3,分區為1,處理線程為:66
16:17:59.177 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800036,狀態為:3,分區為2,處理線程為:67
16:17:59.279 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800041,狀態為:3,分區為1,處理線程為:66
16:17:59.383 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800054,狀態為:3,分區為2,處理線程為:67
16:17:59.481 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800035,狀態為:3,分區為1,處理線程為:66
16:17:59.685 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800053,狀態為:3,分區為1,處理線程為:66
16:17:59.891 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800047,狀態為:3,分區為1,處理線程為:66
16:18:00.092 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800059,狀態為:3,分區為1,處理線程為:66
完整代碼
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。