您好,登錄后才能下訂單哦!
本篇內容主要講解“Springboot微服務項目整合Kafka如何實現文章上下架功能”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“Springboot微服務項目整合Kafka如何實現文章上下架功能”吧!
創建一個Controller包并編寫一個測試類用于發送消息
package com.my.kafka.controller; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; @RestController public class HelloController { @Autowired private KafkaTemplate<String,String> kafkaTemplate; @GetMapping("hello") public String helloProducer(){ kafkaTemplate.send("my-topic","Hello~"); return "ok"; } }
編寫測試類用于接收消息:
package com.my.kafka.listener; import org.junit.platform.commons.util.StringUtils; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; @Component public class HelloListener { @KafkaListener(topics = "my-topic") public void helloListener(String message) { if(StringUtils.isNotBlank(message)) { System.out.println(message); } } }
打開瀏覽器輸入localhost:9991/hello,然后到控制臺查看消息,可以看到成功消息監聽到并且進行了消費。
目前springboot整合后的kafka,因為序列化器是StringSerializer,這個時候如果需要傳遞對象可以有兩種方式:
方式一:可以自定義序列化器,對象類型眾多,這種方式通用性不強,這里不做介紹。
方式二:可以把要傳遞的對象進行轉json字符串,接收消息后再轉為對象即可,本項目采用這種方式。
@GetMapping("hello") public String helloProducer(){ User user = new User(); user.setName("趙四"); user.setAge(20); kafkaTemplate.send("my-topic", JSON.toJSONString(user)); return "ok"; }
可以看到成功接收都對象參數,后期要使用該對象只需要將其轉換成User對象即可。
發布文章之后,可能會由于文章出現某些錯誤或者其他原因,我們會在文章管理端實現文章的上下架功能(見下圖),也即當管理端實現對文章下架之后移動端將不會再展示該文章,只有該文章重新被上架之后才能在移動端看到該文章信息。
后端接收到前端傳過來的參數之后要先做一個校驗,參數不為空才能繼續往下執行,首先應該根據前端傳過來的文章id(自媒體端文章id)查詢自媒體數據庫的文章信息并判斷該文章是否已是發布狀態,因為只有審核成功并成功發布了的文章才能進行上下架操作。自媒體端微服務對文章上下架狀態進行修改之后便可以向Kafka發送一條消息,該消息為Map對象,里面存儲的數據為移動端的文章id以及前端傳過來的上下架參數enable,當然要將該Map對象轉換成JSON字符串才能進行發送。
文章微服務監聽到Kafka發送過來的消息之后將JSON字符串轉換成Map對象之后再獲取相關參數對移動端文章的上下架狀態進行修改。
<!-- kafkfa --> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> </dependency>
package com.my.common.constans; public class WmNewsMessageConstants { public static final String WM_NEWS_UP_OR_DOWN_TOPIC="wm.news.up.or.down.topic"; }
由于我是用Nacos來作為注冊中心,所以配置信息放置在Nacos上面即可。
(1)自媒體端配置
spring: kafka: bootstrap-servers: 4.234.52.122:9092 producer: retries: 10 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer
(2)移動端配置
spring: kafka: bootstrap-servers: 4.234.52.122:9092 consumer: group-id: ${spring.application.name}-test key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
@Autowired private KafkaTemplate<String,String> kafkaTemplate; /** * 文章下架或上架 * @param id * @param enable * @return */ @Override public ResponseResult downOrUp(Integer id,Integer enable) { log.info("執行文章上下架操作..."); if(id == null || enable == null) { return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID); } //根據id獲取文章 WmNews news = getById(id); if(news == null) { return ResponseResult.errorResult(AppHttpCodeEnum.DATA_NOT_EXIST,"文章信息不存在"); } //獲取當前文章狀態 Short status = news.getStatus(); if(!status.equals(WmNews.Status.PUBLISHED.getCode())) { return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID,"文章非發布狀態,不能上下架"); } //更改文章狀態 news.setEnable(enable.shortValue()); updateById(news); log.info("更改文章上架狀態{}-->{}",status,news.getEnable()); //發送消息到Kafka Map<String, Object> map = new HashMap<>(); map.put("articleId",news.getArticleId()); map.put("enable",enable.shortValue()); kafkaTemplate.send(WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC,JSON.toJSONString(map)); log.info("發送消息到Kafka..."); return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS); }
(1)設置監聽器
package com.my.article.listener; import com.baomidou.mybatisplus.core.toolkit.StringUtils; import com.my.article.service.ApArticleService; import com.my.common.constans.WmNewsMessageConstants; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.kafka.annotation.KafkaListener; @Slf4j @Component public class EnableListener { @Autowired private ApArticleService apArticleService; @KafkaListener(topics = WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC) public void downOrUp(String message) { if(StringUtils.isNotBlank(message)) { log.info("監聽到消息{}",message); apArticleService.downOrUp(message); } } }
(2)獲取消息并修改文章狀態
/** * 文章上下架 * @param message * @return */ @Override public ResponseResult downOrUp(String message) { Map map = JSON.parseObject(message, Map.class); //獲取文章id Long articleId = (Long) map.get("articleId"); //獲取文章待修改狀態 Integer enable = (Integer) map.get("enable"); //查詢文章配置 ApArticleConfig apArticleConfig = apArticleConfigMapper.selectOne (Wrappers.<ApArticleConfig>lambdaQuery().eq(ApArticleConfig::getArticleId, articleId)); if(apArticleConfig != null) { //上架 if(enable == 1) { log.info("文章重新上架"); apArticleConfig.setIsDown(false); apArticleConfigMapper.updateById(apArticleConfig); } //下架 if(enable == 0) { log.info("文章下架"); apArticleConfig.setIsDown(true); apArticleConfigMapper.updateById(apArticleConfig); } } else { throw new RuntimeException("文章信息不存在"); } return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS); }
到此,相信大家對“Springboot微服務項目整合Kafka如何實現文章上下架功能”有了更深的了解,不妨來實際操作一番吧!這里是億速云網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。