您好,登錄后才能下訂單哦!
本篇內容主要講解“RabbitMQ用多路由,多隊列來破除流控”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“RabbitMQ用多路由,多隊列來破除流控”吧!
流控機制是我們在使用RabbitMQ最頭疼的問題,一旦并發激增時,消費者消費隊列消息就像滴水一樣慢。
現在我們下單后,需要給通知中心發送消息,讓通知中心通知服務商收取訂單,并確認提供服務。
我們先給Order接口添加一個發送消息的方法。
public interface Order {public void makeOrder(Order order); public OrderSuccessResult getResult(Order order); public void postOrder(Order order);}
實現類實現該方法
@Data@AllArgsConstructor@NoArgsConstructor@ServiceOrderVersion(value = 1)@RequiredArgsConstructorpublic class ServiceOrder extends AbstractOrder {private Long id; @NonNull private String code; @NonNull private Store store; @NonNull private ProviderService service; @NonNull private Car car; @NonNull private Date serviceDate; @NonNull private String contact; @NonNull private String contactTel; private AppUser user; @NonNull private String content; private int status; private Date createDate; @Override public void makeOrder(Order order) { ServiceOrderDao serviceOrderDao = SpringBootUtil.getBean(ServiceOrderDao.class); IdService idService = SpringBootUtil.getBean(IdService.class); ((ServiceOrder)order).setId(idService.genId()); ((ServiceOrder)order).setCode(getCodeInfo(idService)); AppUser loginAppUser = AppUserUtil.getLoginAppUser(); AppUser user = new AppUser(); user.setId(loginAppUser.getId()); user.setUsername(loginAppUser.getUsername()); ((ServiceOrder)order).setUser(user); ((ServiceOrder)order).setStatus(1); ((ServiceOrder)order).setCreateDate(new Date()); serviceOrderDao.save((ServiceOrder) order); }@Override public OrderSuccessResult getResult(Order order) { ServiceOrderSuccessResultFactory orderSuccessResultFactory = SpringBootUtil.getBean(ServiceOrderSuccessResultFactory.class); this.orderSuccessResult = orderSuccessResultFactory.getOrderSuccessResult(); return this.orderSuccessResult.getResult(order); }@Override public void postOrder(Order order) { MessageSender sender = SpringBootUtil.getBean(MessageSender.class); CompletableFuture.runAsync(() ->sender.send(OwnerCarCenterMq.MQ_EXCHANGE_ORDER, OwnerCarCenterMq.ROUTING_KEY_ORDER, order) ); }private String getCodeInfo(IdService idService) { String flow = String.valueOf(idService.genId()); flow = flow.substring(14,flow.length()); String pre = DateUtils.format(new Date(), DateUtils.pattern9); return pre + flow; } }
其中我們定義了這么一組隊列名,交換機,和路由
public interface OwnerCarCenterMq {/** * 隊列名 */ String ORDER_QUEUE = "order"; /** * 服務系統exchange名 */ String MQ_EXCHANGE_ORDER = "order.topic.exchange"; /** * 服務添加routing key */ String ROUTING_KEY_ORDER = "post.order";}
為了避免流控,我們定義了10個隊列,并全部綁定到一個交換機上。
@Configurationpublic class RabbitmqConfig { @Bean public List<Queue> orderQueues() { List<Queue> queues = new ArrayList<>(); for (int i = 1;i < 11;i++) { Queue queue = new Queue(OwnerCarCenterMq.ORDER_QUEUE + "_" + i); queues.add(queue); } return queues; } @Bean public TopicExchange orderExchange() { return new TopicExchange(OwnerCarCenterMq.MQ_EXCHANGE_ORDER); } @Bean public List<Binding> bindingOrders() { List<Binding> bindings = new ArrayList<>(); for (int i = 1;i < 11;i++) { Binding binding = BindingBuilder.bind(orderQueues().get(i - 1)).to(orderExchange()) .with(OwnerCarCenterMq.ROUTING_KEY_ORDER + "_" + i); bindings.add(binding); } return bindings; } }
重新封裝消息提供者,每次發送都隨機選取一個路由來進行發送。
@Slf4j@Componentpublic class MessageSender implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {@Autowired private RabbitTemplate rabbitTemplate; public void send(String exchange,String routingKey,Object content) {log.info("send content=" + content); this.rabbitTemplate.setMandatory(true); this.rabbitTemplate.setConfirmCallback(this); this.rabbitTemplate.setReturnCallback(this); ThreadLocalRandom random = ThreadLocalRandom.current(); this.rabbitTemplate.convertAndSend(exchange,routingKey + "_" + random.nextInt(1,11),serialize(content)); }/** * 確認后回調: * @param correlationData * @param ack * @param cause */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) {if (!ack) {log.info("send ack fail, cause = " + cause); } else {log.info("send ack success"); } }/** * 失敗后return回調: * * @param message * @param replyCode * @param replyText * @param exchange * @param routingKey */ @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {log.info("send fail return-message = " + new String(message.getBody()) + ", replyCode: " + replyCode + ", replyText: " + replyText + ", exchange: " + exchange + ", routingKey: " + routingKey); }/** * 對消息對象進行二進制序列化 * @param o * @return */ private byte[] serialize(Object o) { Kryo kryo = new Kryo(); ByteArrayOutputStream stream = new ByteArrayOutputStream(); Output output = new Output(stream); kryo.writeObject(output, o); output.close(); return stream.toByteArray(); } }
我們可以看到在ServiceOrder里,我們是通過異步來進行發送到。
Controller如下
@Slf4j@RestControllerpublic class OrderController {private ThreadLocal<OrderFactory> orderFactory = new ThreadLocal<>(); private ThreadLocal<Order> orderService = new ThreadLocal<>(); @Autowired private OrderBean orderBean; @Transactional @SuppressWarnings("unchecked")@PostMapping("/makeeorder")public Result<OrderSuccessResult> makeOrder(@RequestBody String orderStr, @RequestParam("type") String type) {log.info(orderStr); Order order = setOrderFactory(orderStr,type); orderService.get().makeOrder(order); orderService.get().postOrder(order); return Result.success(orderService.get().getResult(order)); }/** * 判斷是哪一種類型的訂單來獲取哪一種類型的具體訂單工廠 * @param orderStr * @return */ private Order setOrderFactory(String orderStr,String type) { Class<?> classType = orderBean.getOrderMap().get(type); Object order = JSONObject.parseObject(orderStr, classType);// if (orderStr.contains("service")) {// order = JSON.parseObject(orderStr, ServiceOrder.class);// }else if (orderStr.contains("product")) {// order = JSON.parseObject(orderStr, ProductOrder.class);// } Class<?> classFactoryType = orderBean.getOrderFactoryMap().get(type + "Factory"); this.orderFactory.set((OrderFactory) SpringBootUtil.getBean(classFactoryType));// if (order instanceof ServiceOrder) {// this.orderFactory.set(SpringBootUtil.getBean(ServiceOrderFactory.class));// }else if (order instanceof ProductOrder) {// this.orderFactory.set(SpringBootUtil.getBean(ProductOrderFactory.class));// } orderService.set(orderFactory.get().getOrder()); return (Order) order; } }
最后是在我們的通知中心模塊接收消息,同時對這10個隊列實行監控
@Slf4j@Component@RabbitListener(queues = {OwnerCarCenterMq.ORDER_QUEUE + "_" + 1, OwnerCarCenterMq.ORDER_QUEUE + "_" + 2, OwnerCarCenterMq.ORDER_QUEUE + "_" + 3, OwnerCarCenterMq.ORDER_QUEUE + "_" + 4, OwnerCarCenterMq.ORDER_QUEUE + "_" + 5, OwnerCarCenterMq.ORDER_QUEUE + "_" + 6, OwnerCarCenterMq.ORDER_QUEUE + "_" + 7, OwnerCarCenterMq.ORDER_QUEUE + "_" + 8, OwnerCarCenterMq.ORDER_QUEUE + "_" + 9, OwnerCarCenterMq.ORDER_QUEUE + "_" + 10})public class ServiceOrderConsummer {@Getter private Queue<ServiceOrder> serviceOrders = new ConcurrentLinkedDeque<>(); @RabbitHandler public void receiceOrder(byte[] data, Channel channel, Message message) throws IOException {try {//告訴服務器收到這條消息 已經被我消費了 可以在隊列刪掉;否則消息服務器以為這條消息沒處理掉 后續還會在發 channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); ServiceOrder order = unSerialize(data); this.serviceOrders.add(order); log.info(String.valueOf(order)); } catch (IOException e) { e.printStackTrace(); //丟棄這條消息 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false); log.info("receiver fail"); } }/** * 反序列化 * @param data * @return */ private ServiceOrder unSerialize(byte[] data) { Input input = null; try { Kryo kryo = new Kryo(); input = new Input(new ByteArrayInputStream(data)); return kryo.readObject(input,ServiceOrder.class); }finally { input.close(); } } }
項目啟動后,我們可以看到rabbitmq的情況如下
現我們來對其進行壓測,啟動Jmeter,我們使用1000線程來進行壓測測試。各配置如下
保存文件上傳服務器,因為本人是華為云的服務器,故在服務器上進行壓測,不進行遠程壓測
在服務器的jmeter的bin目錄下輸入
./jmeter -n -t model/rabbit.jmx -l log.jtl
這里-n為不啟動圖形界面,-t使用我們上傳的配置文件,-l記錄日志
壓測結果如下
我們在壓測過程中來看一下rabbitmq的UI界面
消費基本上是實時的,沒有出現流控積壓現象。
到此,相信大家對“RabbitMQ用多路由,多隊列來破除流控”有了更深的了解,不妨來實際操作一番吧!這里是億速云網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。