您好,登錄后才能下訂單哦!
這篇文章將為大家詳細講解有關RocketMQ中怎么實現權限控制,文章內容質量較高,因此小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關知識有一定的了解。
1、簡單使用
ACL是access control list的簡稱,俗稱訪問控制列表。訪問控制,基本上會涉及到用戶、資源、權限、角色等概念,那在RocketMQ中上述會對應哪些對象呢?
用戶:用戶是訪問控制的基礎要素,RocketMQ ACL必然也會引入用戶的概念,即支持用戶名、密碼。 資源:需要保護的對象,消息發送涉及的Topic、消息消費涉及的消費組,應該進行保護,故可以抽象成資源。 權限:針對資源,能進行的操作。 角色:RocketMQ中,只定義兩種角色:是否是管理員。
acl默認的配置文件名:plain_acl.yml,需要放在${ROCKETMQ_HOME}/store/config目錄下
需要使用acl必須在服務端開啟此功能,在Broker的配置文件中配置,aclEnable = true開啟此功能
配置plain_acl.yml文件
globalWhiteRemoteAddresses: - 10.10.15.* - 192.168.0.* accounts: - accessKey: RocketMQ secretKey: 12345678 whiteRemoteAddress: admin: false defaultTopicPerm: DENY defaultGroupPerm: SUB topicPerms: - topicA=DENY - topicB=PUB|SUB - topicC=SUB groupPerms: # the group should convert to retry topic - groupA=DENY - groupB=PUB|SUB - groupC=SUB - accessKey: rocketmq2 secretKey: 12345678 whiteRemoteAddress: 192.168.1.* # if it is admin, it could access all resources admin: true
下面我們介紹一下plain_acl.yml文件中相關的參數含義及使用
字段 | 取值 | 含義 |
---|---|---|
globalWhiteRemoteAddresses | *;192.168.*.*;192.168.0.1 | 全局IP白名單 |
accessKey | 字符串 | Access Key 用戶名 |
secretKey | 字符串 | Secret Key 密碼 |
whiteRemoteAddress | *;192.168.*.*;192.168.0.1 | 用戶IP白名單 |
admin | true;false | 是否管理員賬戶 |
defaultTopicPerm | DENY;PUB;SUB;PUB|SUB | 默認的Topic權限 |
defaultGroupPerm | DENY;PUB;SUB;PUB|SUB | 默認的ConsumerGroup權限 |
topicPerms | topic=權限 | 各個Topic的權限 |
groupPerms | group=權限 | 各個ConsumerGroup的權限 |
權限標識符的含義
權限 | 含義 |
---|---|
DENY | 拒絕 |
ANY | PUB 或者 SUB 權限 |
PUB | 發送權限 |
SUB | 訂閱權限 |
處理流程
特殊的請求例如 UPDATE_AND_CREATE_TOPIC 等,只能由 admin 賬戶進行操作;
對于某個資源,如果有顯性配置權限,則采用配置的權限;如果沒有顯性配置權限,則采用默認的權限
RocketMQ的權限控制存儲的默認實現是基于yml配置文件。用戶可以動態修改權限控制定義的屬性,而不需重新啟動Broker服務節點
如果ACL與高可用部署(Master/Slave架構)同時啟用,那么需要在Broker Master節點的${ROCKETMQ_HOME}/store/conf/plain_acl.yml配置文件中 設置全局白名單信息,即為將Slave節點的ip地址設置至Master節點plain_acl.yml配置文件的全局白名單中
public class AclProducer { public static void main(String[] args) throws MQClientException, InterruptedException { DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name", getAclRPCHook()); producer.setNamesrvAddr("10.10.15.246:9876;10.10.15.247:9876"); producer.start(); for (int i = 0; i < 10; i++) { try { Message msg = new Message("topicA" ,"TagA" , ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } catch (Exception e) { e.printStackTrace(); Thread.sleep(1000); } } producer.shutdown(); } static RPCHook getAclRPCHook() { return new AclClientRPCHook(new SessionCredentials("RocketMQ","12345678")); } }
查看結果
報錯提示topicA沒有權限,我們在plain_acl.yml文件中配置的也確實是RocketMQ用戶拒絕,生產消費topicA主題信息,我們改變主題為topicB,則發現發送消息成功,topicB=PUB|SUB設置的權限是生產消費都可以。
查看結果
public class AclConsumer { public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("groupA", getAclRPCHook(),new AllocateMessageQueueAveragely()); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe("topicB", "*"); consumer.setNamesrvAddr("10.10.15.246:9876;10.10.15.247:9876"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.printf("Consumer Started.%n"); } static RPCHook getAclRPCHook() { return new AclClientRPCHook(new SessionCredentials("RocketMQ","12345678")); } }
查看結果:發現沒有任何消息被消費,也沒有報錯信息,對于RocketMQ用戶topicB設置的就是可以可以生產可以消費的,但是我們發現其groupA=DENY是拒絕的,說明消費組是groupA則拒絕消費任何消息,我們改成groupB或者groupC查看結果。
Broker端ACL原理圖
Broker服務啟動時創建BrokerController并初始化initialize()時調用acl相關的初始化方法initialAcl()
private void initialAcl() { //broker配置文件中是否開啟ACL功能,默認關閉 if (!this.brokerConfig.isAclEnable()) { log.info("The broker dose not enable acl"); return; } //獲取權限訪問校驗器的列表,加載的META-INF/service/org.apache.rocketmq.acl.AccessValidator文件中指向 //org.apache.rocketmq.acl.plain.PlainAccessValidator,默認只有一個 List<AccessValidator> accessValidators = ServiceProvider.load(ServiceProvider.ACL_VALIDATOR_ID, AccessValidator.class); if (accessValidators == null || accessValidators.isEmpty()) { log.info("The broker dose not load the AccessValidator"); return; } for (AccessValidator accessValidator: accessValidators) { final AccessValidator validator = accessValidator; //注冊服務端就的“鉤子”對象,對權限進行校驗 this.registerServerRPCHook(new RPCHook() { @Override public void doBeforeRequest(String remoteAddr, RemotingCommand request) { //Do not catch the exception validator.validate(validator.parse(request, remoteAddr)); } @Override public void doAfterResponse(String remoteAddr, RemotingCommand request, RemotingCommand response) { } }); } }
源碼中有相關的注解,我們查看一下注冊registerServerRPCHook方法
public void registerServerRPCHook(RPCHook rpcHook) { //服務端的NettyRemotingServer服務注冊“鉤子”函數 getRemotingServer().registerRPCHook(rpcHook); this.fastRemotingServer.registerRPCHook(rpcHook); }
關于NettyRemotingServer服務和NettyRemotingClient服務配合使用,后面章節RocketMQ Remoting會重點分析
PlainAccessValidator.parse(),根據客戶端不同的請求Code其需要的檢驗資源也不一樣
switch (request.getCode()) { //發送消息需要校驗當前的賬戶的topic是否具有PUB權限 case RequestCode.SEND_MESSAGE: accessResource.addResourceAndPerm(request.getExtFields().get("topic"), Permission.PUB); break; case RequestCode.SEND_MESSAGE_V2: accessResource.addResourceAndPerm(request.getExtFields().get("b"), Permission.PUB); break; case RequestCode.CONSUMER_SEND_MSG_BACK: accessResource.addResourceAndPerm(request.getExtFields().get("originTopic"), Permission.PUB); accessResource.addResourceAndPerm(getRetryTopic(request.getExtFields().get("group")), Permission.SUB); break; //拉取消息時需要知道該consumer賬戶下拉取的topic是否具有SUB權限,并且還要知道訂閱組consumerGroup是否有sub權限 case RequestCode.PULL_MESSAGE: accessResource.addResourceAndPerm(request.getExtFields().get("topic"), Permission.SUB); accessResource.addResourceAndPerm(getRetryTopic(request.getExtFields().get("consumerGroup")), Permission.SUB); break; case RequestCode.QUERY_MESSAGE: accessResource.addResourceAndPerm(request.getExtFields().get("topic"), Permission.SUB); break; case RequestCode.HEART_BEAT: HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class); for (ConsumerData data : heartbeatData.getConsumerDataSet()) { accessResource.addResourceAndPerm(getRetryTopic(data.getGroupName()), Permission.SUB); for (SubscriptionData subscriptionData : data.getSubscriptionDataSet()) { accessResource.addResourceAndPerm(subscriptionData.getTopic(), Permission.SUB); } } break; case RequestCode.UNREGISTER_CLIENT: final UnregisterClientRequestHeader unregisterClientRequestHeader = (UnregisterClientRequestHeader) request .decodeCommandCustomHeader(UnregisterClientRequestHeader.class); accessResource.addResourceAndPerm(getRetryTopic(unregisterClientRequestHeader.getConsumerGroup()), Permission.SUB); break; case RequestCode.GET_CONSUMER_LIST_BY_GROUP: final GetConsumerListByGroupRequestHeader getConsumerListByGroupRequestHeader = (GetConsumerListByGroupRequestHeader) request .decodeCommandCustomHeader(GetConsumerListByGroupRequestHeader.class); accessResource.addResourceAndPerm(getRetryTopic(getConsumerListByGroupRequestHeader.getConsumerGroup()), Permission.SUB); break; case RequestCode.UPDATE_CONSUMER_OFFSET: final UpdateConsumerOffsetRequestHeader updateConsumerOffsetRequestHeader = (UpdateConsumerOffsetRequestHeader) request .decodeCommandCustomHeader(UpdateConsumerOffsetRequestHeader.class); accessResource.addResourceAndPerm(getRetryTopic(updateConsumerOffsetRequestHeader.getConsumerGroup()), Permission.SUB); accessResource.addResourceAndPerm(updateConsumerOffsetRequestHeader.getTopic(), Permission.SUB); break; default: break; }
根據request.getCode()獲取當前的操作需要的權限標識集合,供后面與系統的權限配置文件plain_acl.yml中的權限標識符校驗時使用
Broker初始化相關服務的時候創建了PlainAccessValidator,我們發現其默認的構造方法中調用了其權限資源加載器PlainPermissionLoader
public PlainAccessValidator() { aclPlugEngine = new PlainPermissionLoader(); }
創建PlainPermissionLoader對象
public PlainPermissionLoader() { //加載服務端的權限文件plain_acl.yml load(); //開啟線程每500ms檢測權限文件是否改變,若改變則執行load()從新加載權限文件 watch(); }
查看load方法流程
public void load() { Map<String, PlainAccessResource> plainAccessResourceMap = new HashMap<>(); List<RemoteAddressStrategy> globalWhiteRemoteAddressStrategy = new ArrayList<>(); JSONObject plainAclConfData = AclUtils.getYamlDataObject(fileHome + File.separator + fileName, JSONObject.class); if (plainAclConfData == null || plainAclConfData.isEmpty()) { throw new AclException(String.format("%s file is not data", fileHome + File.separator + fileName)); } log.info("Broker plain acl conf data is : ", plainAclConfData.toString()); //獲取全局白名單IP集合 JSONArray globalWhiteRemoteAddressesList = plainAclConfData.getJSONArray("globalWhiteRemoteAddresses"); if (globalWhiteRemoteAddressesList != null && !globalWhiteRemoteAddressesList.isEmpty()) { for (int i = 0; i < globalWhiteRemoteAddressesList.size(); i++) { globalWhiteRemoteAddressStrategy.add(remoteAddressStrategyFactory. getRemoteAddressStrategy(globalWhiteRemoteAddressesList.getString(i))); } } //獲取賬戶權限集合 JSONArray accounts = plainAclConfData.getJSONArray("accounts"); if (accounts != null && !accounts.isEmpty()) { List<PlainAccessConfig> plainAccessConfigList = accounts.toJavaList(PlainAccessConfig.class); for (PlainAccessConfig plainAccessConfig : plainAccessConfigList) { //構建每個賬戶的權限資源 PlainAccessResource plainAccessResource = buildPlainAccessResource(plainAccessConfig); //放入Map中AccessKey作為key,該賬戶的權限資源作為value plainAccessResourceMap.put(plainAccessResource.getAccessKey(),plainAccessResource); } } this.globalWhiteRemoteAddressStrategy = globalWhiteRemoteAddressStrategy; this.plainAccessResourceMap = plainAccessResourceMap; }
加載資源文件,解析其中的權限標識,等待權限校驗器PlainAccessValidator調用其validate()對權限校驗
核心的校驗方法PlainPermissionLoader.validate()
public void validate(PlainAccessResource plainAccessResource) { //全局的白名單IP進行校驗 for (RemoteAddressStrategy remoteAddressStrategy : globalWhiteRemoteAddressStrategy) { //匹配成功說明是全局的白名單IP,具有所有權限,直接返回。 if (remoteAddressStrategy.match(plainAccessResource)) { return; } } //判斷用戶名是否為空,null則拋出AclException異常 if (plainAccessResource.getAccessKey() == null) { throw new AclException(String.format("No accessKey is configured")); } //校驗賬戶是否存在于服務端的權限資源文件中plain_acl.yml,不在則拋出異常 if (!plainAccessResourceMap.containsKey(plainAccessResource.getAccessKey())) { throw new AclException(String.format("No acl config for %s", plainAccessResource.getAccessKey())); } PlainAccessResource ownedAccess = plainAccessResourceMap.get(plainAccessResource.getAccessKey()); //檢查該賬戶的白名單IP是否匹配上客戶端IP,匹配成功具有所有權限,除UPDATE_AND_CREATE_TOPIC等特殊權限需要管理員權限 if (ownedAccess.getRemoteAddressStrategy().match(plainAccessResource)) { return; } //校驗簽名 String signature = AclUtils.calSignature(plainAccessResource.getContent(), ownedAccess.getSecretKey()); if (!signature.equals(plainAccessResource.getSignature())) { throw new AclException(String.format("Check signature failed for accessKey=%s", plainAccessResource.getAccessKey())); } //校驗賬戶內的資源權限 checkPerm(plainAccessResource, ownedAccess); }
查看其對于當前賬戶內部的資源校驗
void checkPerm(PlainAccessResource needCheckedAccess, PlainAccessResource ownedAccess) { //判斷請求的命令的Code是否需要管理員權限,并判斷該用戶是否是管理員 if (Permission.needAdminPerm(needCheckedAccess.getRequestCode()) && !ownedAccess.isAdmin()) { throw new AclException(String.format("Need admin permission for request code=%d, but accessKey=%s is not", needCheckedAccess.getRequestCode(), ownedAccess.getAccessKey())); } Map<String, Byte> needCheckedPermMap = needCheckedAccess.getResourcePermMap(); Map<String, Byte> ownedPermMap = ownedAccess.getResourcePermMap(); if (needCheckedPermMap == null) { // If the needCheckedPermMap is null,then return return; } for (Map.Entry<String, Byte> needCheckedEntry : needCheckedPermMap.entrySet()) { String resource = needCheckedEntry.getKey(); Byte neededPerm = needCheckedEntry.getValue(); //判斷是否是group,在構建resourcePermMap時候,group的key=RETRY_GROUP_TOPIC_PREFIX + consumerGroup boolean isGroup = PlainAccessResource.isRetryTopic(resource); //系統的權限配置文件中配置項包不含該客戶端命令請求需要的權限 if (!ownedPermMap.containsKey(resource)) { //判斷其是否是topic還是group的權限標識,獲取該類型的全局的權限是什么 byte ownedPerm = isGroup ? needCheckedAccess.getDefaultGroupPerm() : needCheckedAccess.getDefaultTopicPerm(); //核對權限 if (!Permission.checkPermission(neededPerm, ownedPerm)) { throw new AclException(String.format("No default permission for %s", PlainAccessResource.printStr(resource, isGroup))); } continue; } //系統的權限配置文件中配置項包含該客戶端命令請求需要的權限,則直接判斷其權限 if (!Permission.checkPermission(neededPerm, ownedPermMap.get(resource))) { throw new AclException(String.format("No default permission for %s", PlainAccessResource.printStr(resource, isGroup))); } } }
所有的檢驗流程如果有一項不滿足則拋出AclException異常
上面圖中只是分析了Broker服務端的處理流程,客戶端如何調用我們具體分析下我們以發送消息為例:
我們之前分析過Producer的消息發送的核心方法是DefaultMQProducerImpl.sendKernelImpl()該方法
//是否注冊了“鉤子” if (this.hasSendMessageHook()) { context = new SendMessageContext(); context.setProducer(this); context.setProducerGroup(this.defaultMQProducer.getProducerGroup()); context.setCommunicationMode(communicationMode); context.setBornHost(this.defaultMQProducer.getClientIP()); context.setBrokerAddr(brokerAddr); context.setMessage(msg); context.setMq(mq); String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED); if (isTrans != null && isTrans.equals("true")) { context.setMsgType(MessageType.Trans_Msg_Half); } if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) { context.setMsgType(MessageType.Delay_Msg); } //封裝其ACL請求的參數信息 this.executeSendMessageHookBefore(context); }
hasSendMessageHook(),我們在構建Producer的時候創建了該對象,加入到DefaultMQProducerImpl的sendMessageHookList屬性中。
我們查看其發送消息NettyRemotingClient類中調用AclClientRPCHook.doBeforeRequest()發送前的數據準備
public void doBeforeRequest(String remoteAddr, RemotingCommand request) { byte[] total = AclUtils.combineRequestContent(request, parseRequestContent(request, sessionCredentials.getAccessKey(), sessionCredentials.getSecurityToken())); String signature = AclUtils.calSignature(total, sessionCredentials.getSecretKey()); request.addExtField(SIGNATURE, signature); request.addExtField(ACCESS_KEY, sessionCredentials.getAccessKey()); // The SecurityToken value is unneccessary,user can choose this one. if (sessionCredentials.getSecurityToken() != null) { request.addExtField(SECURITY_TOKEN, sessionCredentials.getSecurityToken()); } }
關于RocketMQ中怎么實現權限控制就分享到這里了,希望以上內容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。