您好,登錄后才能下訂單哦!
什么是MQ
跨進程的消息隊列,主要角色包括生產者與消費者。
生產者只負責生產信息,無法感知消費者是誰,消息怎么處理,處理結果是什么。
消費者負責接收及處理消息,無法感知生產者是誰,怎么產生的。
Mq能做什么?
MQ 特性一般有異步,吞吐量大 ,延時低;
適合做:
由于MQ是異步處理消息的,所以MQ不適合做同步處理操作,如果需要及時的返回處理結果請不要用MQ;
MQ 個系統帶來了什么?
缺點:增加了系統的復雜性,除了代碼組件接入以外還需要考慮,高可用,集群,消息的可靠性等問題!
生產者:消息發送怎么保證可靠性,怎么保證不重復!
消費者:怎么保證冪等性,接收到重復消息怎么處理!
還有會帶來的處理延時等問題!
優點: 解耦,利用MQ我們可以很好的給我們系統解耦,特別是分布式/微服系統!
原來的同步操作,可以用異步處理,也可以帶來更快的響應速度;
場景 (1)
系統解耦,用戶系統或者其他系統需要發送短信可以通過 MQ 執行;很好的將 用戶系統 和 短信系統進行解耦;
場景(2)
順序執行的任務場景,假設 A B C 三個任務,B需要等待 A完成才去執行,C需要等待B完成才去執行;
我見過一些同學的做法是 ,用 三個定時器 錯開時間去執行的,假設 A定時器 9 點執行, B 定時器 10 點執行 , C 11 點執行 , 類似這樣子;
這樣做其實是 不安全的, 因為 后一個任務 無法知道 前一個任務是否 真的執行了! 假設 A 宕機了, 到 10 點 B 定時去 執行,這時候 數據就會產生異常!
當我們 引入 MQ 后 可以這么做, A執行完了 發送 消息給 B ,B收到消息后 執行,C 類似,收到 B消息后執行;
場景(3)
支付網關的通知,我們的系統常常需要接入支付功能,微信或者支付寶通常會以回調的形式通知我們系統支付結果。
我們可以將我們的支付網關獨立出來,通過MQ通知我們業務系統進行處理,這樣處理有利于系統的解耦和擴展!
假設我們還有一個積分系統,用戶支付成功,給用戶添加積分。只需要積分系統監聽這個消息,并處理積分就好,無需去修改再去修改網關層代碼!
如果沒有使用MQ ,我是不是還得去修改網關系統的代碼,遠程調用增加積分的接口?
這就是使用了MQ的好處,解耦和擴展!
當然我們的轉發規則也要保證每個感興趣的隊列能獲取到消息!
場景(4)
微服/分布式系統,分布式事務 - 最終一致性 處理方案!
詳情:?分布式事務處理方案,微服事務處理方案
場景(5)
我們以前的做法是 通常啟用一個定時器,每分鐘或者每小時,去跑一次取出需要處理的訂單或其他數據進行處理。
這種做法一個是 效率比較低,如果數據量大的話,每次都要掃庫,非常要命!
再者時效性不是很高,最差的時候可能需要等待一輪時長!
還有可能出現重復執行的結果,時效和輪詢的頻率難以平衡!
利用MQ(Rabbitmq),DLX (Dead Letter Exchanges)和 消息的 TTL (Time-To-Live Extensions)特性。我們可以高效的完成這個任務場景!不需要掃庫,時效性更好!
DLX:http://www.rabbitmq.com/dlx.html,
TTL:http://www.rabbitmq.com/ttl.html#per-message-ttl
原理:
發送到隊列的消息,可以設置一個存活時間 TTL,在存活時間內沒有被消費,可以設置這個消息轉發到其他隊列里面去;然后我們從這個其他隊列里面消費執行我們的任務,這樣就可以達到一個消息延時的效果!
設置過期時間:
過期時間可以統一設置到消息隊列里面,也可以單獨設置到某個消息!
PS 如果消息設置了過期時間,發生到了設置有過期時間的隊列,已隊列設置的過期時間為準!
已 SpringBoot 為例:
配置轉發隊列和被轉發隊列:
@Component
@Configuration
public class RabbitMqConfig {
@Bean
public Queue curQueue() {
Map<String, Object> args = new HashMap<String, Object>();
//超時后的轉發器 過期轉發到 delay_queue_exchange
args.put("x-dead-letter-exchange", "delay_queue_exchange");
//routingKey 轉發規則
args.put("x-dead-letter-routing-key", "user.#");
//過期時間 20 秒
args.put("x-message-ttl", 20000);
return new Queue("cur_queue", false, false, false, args);
}
@Bean
public Queue delayQueue() {
return new Queue("delay_queue");
}
@Bean
TopicExchange exchange() {
//當前隊列
return new TopicExchange("cur_queue_exchange");
}
@Bean
TopicExchange exchange2() {
//被轉發的隊列
return new TopicExchange("delay_queue_exchange");
}
@Bean
Binding bindingHelloQueue(Queue curQueue, TopicExchange exchange) {
//綁定隊列到轉發器
return BindingBuilder.bind(curQueue).to(exchange).with("user.#");
}
@Bean
Binding bindingHelloQueue2(Queue delayQueue, TopicExchange exchange2) {
return BindingBuilder.bind(delayQueue).to(exchange2).with("user.#");
}
}
發生消息:
@Component
public class MqEventSender {
Logger logger = LoggerFactory.getLogger(MqEventSender.class);
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 消息沒有設置 時間
* 發生到隊列 cur_queue_exchange
* @param msg
*/
public void sendMsg(String msg) {
logger.info("發送消息: " + msg);
rabbitTemplate.convertAndSend("cur_queue_exchange", "user.ss", msg);
}
/**
* 消息設置時間
* 發生到隊列 cur_queue_exchange
* @param msg
*/
public void sendMsgWithTime(String msg) {
logger.info("發送消息: " + msg);
MessageProperties messageProperties = new MessageProperties();
//過期時間設置 10 秒
messageProperties.setExpiration("10000");
Message message = rabbitTemplate.getMessageConverter().toMessage(msg, messageProperties);
rabbitTemplate.convertAndSend("cur_queue_exchange", "user.ss", message);
}
}
消息監聽:
監聽 的隊列是 delay_queue 而不是 cur_queue;
PS cur_queue 不應該有監聽者,否則消息被消費達不到想要的延時消息效果!
/**
* Created by linli on 2017/8/21.
* 監聽 被丟到 超時隊列內容
*/
@Component
@RabbitListener(queues = "delay_queue")
public class DelayQueueListener {
public static Logger logger = LoggerFactory.getLogger(AddCommentsEventListener.class);
@RabbitHandler
public void process(@Payload String msg) {
logger.info("收到消息 "+msg);
}
}
測試:
/**
* Created by linli on 2017/8/21.
*/
@RestController
@RequestMapping("/test")
public class TestContorller {
@Autowired
MqEventSender sender;
@RequestMapping("/mq/delay")
public String test() {
sender.sendMsg("隊列延時消息!");
sender.sendMsgWithTime("消息延時消息!");
return "";
}
}
結果:
觀察結果發現:發送時間 和 收到時間 間隔 20秒 ;
我們給消息設置的 10 秒 TTL 時間沒有生效!驗證了 : 如果消息設置了過期時間,發生到了設置有過期時間的隊列,已隊列設置的過期時間為準!
如果希望每個消息都要自己的存活時間,發送到隊列 不要設置
args.put(“x-message-ttl”, 20000);
消息的過期時間 設置在隊列還是消息,根據自己的業務場景去定!
MQ 是一個跨進程的消息隊列,我們可以很好的利用他進行系統的解耦;
引入MQ會給系統帶來一定的復雜度,需要評估!
MQ 適合做異步任務,不適合做同步任務!
針對于上面所涉及到的知識點我總結出了有1到5年開發經驗的程序員在面試中涉及到的絕大部分架構面試題及答案做成了文檔和架構視頻資料免費分享給大家(包括Dubbo、Redis、Netty、zookeeper、Spring cloud、分布式、高并發等架構技術資料),希望能幫助到您面試前的復習且找到一個好的工作,也節省大家在網上搜索資料的時間來學習,也可以關注我一下以后會有更多干貨分享。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。