您好,登錄后才能下訂單哦!
這一篇開始講解moqutte對SUBSCRIBE報文的處理
代碼不復雜
public void processSubscribe(Channel channel, MqttSubscribeMessage msg) {
String clientID = NettyUtils.clientID(channel);//從channel里面獲取clientId,具體原理看下文
int messageID = messageId(msg);
LOG.info("Processing SUBSCRIBE message. CId={}, messageId={}", clientID, messageID);
RunningSubscription executionKey = new RunningSubscription(clientID, messageID);
SubscriptionState currentStatus = subscriptionInCourse.putIfAbsent(executionKey, SubscriptionState.VERIFIED);
if (currentStatus != null) {
LOG.warn("Client sent another SUBSCRIBE message while this one was being processed CId={}, messageId={}",
clientID, messageID);
return;
}
String username = NettyUtils.userName(channel);
List<MqttTopicSubscription> ackTopics = doVerify(clientID, username, msg);
MqttSubAckMessage ackMessage = doAckMessageFromValidateFilters(ackTopics, messageID);
if (!this.subscriptionInCourse.replace(executionKey, SubscriptionState.VERIFIED, SubscriptionState.STORED)) {
LOG.warn("Client sent another SUBSCRIBE message while the topic filters were being verified CId={}, " +
"messageId={}", clientID, messageID);
return;
}
LOG.info("Creating and storing subscriptions CId={}, messageId={}, topics={}", clientID, messageID, ackTopics);
List<Subscription> newSubscriptions = doStoreSubscription(ackTopics, clientID);
// save session, persist subscriptions from session
for (Subscription subscription : newSubscriptions) {
subscriptions.add(subscription.asClientTopicCouple());
}
LOG.info("Sending SUBACK response CId={}, messageId={}", clientID, messageID);
channel.writeAndFlush(ackMessage);
// fire the persisted messages in session
for (Subscription subscription : newSubscriptions) {
publishRetainedMessagesInSession(subscription, username);
}
boolean success = this.subscriptionInCourse.remove(executionKey, SubscriptionState.STORED);
if (!success) {
LOG.warn("Unable to perform the final subscription state update CId={}, messageId={}", clientID, messageID);
}
}
1.channel里面為什么會存在clientid呢?這個問題也可以這樣描述,當連接建立之后,client發布消息的時候,netty接收到socket里面的數據之后,他怎么知道是哪個client的數據呢?這里面就需要確定client與channel的映射關系。moquette是這么做的,
在處理CONNECT的第5步,詳見https://blog.51cto.com/13579730/2073630的時候會做如下處理
private void initializeKeepAliveTimeout(Channel channel, MqttConnectMessage msg, final String clientId) {
int keepAlive = msg.variableHeader().keepAliveTimeSeconds();
LOG.info("Configuring connection. CId={}", clientId);
NettyUtils.keepAlive(channel, keepAlive);
// session.attr(NettyUtils.ATTR_KEY_CLEANSESSION).set(msg.variableHeader().isCleanSession());
NettyUtils.cleanSession(channel, msg.variableHeader().isCleanSession());
// used to track the client in the subscription and publishing phases.
// session.attr(NettyUtils.ATTR_KEY_CLIENTID).set(msg.getClientID());
NettyUtils.clientID(channel, clientId);
int idleTime = Math.round(keepAlive * 1.5f);
setIdleTime(channel.pipeline(), idleTime);
if(LOG.isDebugEnabled()){
LOG.debug("The connection has been configured CId={}, keepAlive={}, cleanSession={}, idleTime={}",
clientId, keepAlive, msg.variableHeader().isCleanSession(), idleTime);
}
}
這里面有一步NettyUtils.clientID(channel, clientId);這個不起眼的方法做了將channel與clientId映射的動作,接著跟蹤
public static void clientID(Channel channel, String clientID) {
channel.attr(NettyUtils.ATTR_KEY_CLIENTID).set(clientID);
}
原來是把clientId作為一個屬性存到了channel里面,因為channel是集成AttributeMap的,所以可以這么做。
只要有channel與clientId的映射關系,就好說了,這也就是為什么moquette的NettyMQTTHandler是這樣處理的
@Override
public void channelRead(ChannelHandlerContext ctx, Object message) {
MqttMessage msg = (MqttMessage) message;
MqttMessageType messageType = msg.fixedHeader().messageType();
if(LOG.isDebugEnabled())
LOG.debug("Processing MQTT message, type={}", messageType);
try {
switch (messageType) {
case CONNECT:
m_processor.processConnect(ctx.channel(), (MqttConnectMessage) msg);
break;
case SUBSCRIBE:
m_processor.processSubscribe(ctx.channel(), (MqttSubscribeMessage) msg);
break;
case UNSUBSCRIBE:
m_processor.processUnsubscribe(ctx.channel(), (MqttUnsubscribeMessage) msg);
break;
case PUBLISH:
m_processor.processPublish(ctx.channel(), (MqttPublishMessage) msg);
break;
case PUBREC:
m_processor.processPubRec(ctx.channel(), msg);
break;
case PUBCOMP:
m_processor.processPubComp(ctx.channel(), msg);
break;
case PUBREL:
m_processor.processPubRel(ctx.channel(), msg);
break;
case DISCONNECT:
m_processor.processDisconnect(ctx.channel());
break;
case PUBACK:
m_processor.processPubAck(ctx.channel(), (MqttPubAckMessage) msg);
break;
case PINGREQ:
MqttFixedHeader pingHeader = new MqttFixedHeader(
MqttMessageType.PINGRESP,
false,
AT_MOST_ONCE,
false,
0);
MqttMessage pingResp = new MqttMessage(pingHeader);
ctx.writeAndFlush(pingResp);
break;
default:
LOG.error("Unkonwn MessageType:{}", messageType);
break;
哪個tcp-socket對應哪個channel由netty負責處理,當client發送數據的時候,netty負責從ChannelHandlerContext取出channel傳給相應的業務自定義的handler進行處理。
2.創建一個正在運行中的RunningSubscription對象,之所以要創建這個對象,是為了防止重復訂閱,同時到存儲了所有的RunningSubscription的ConcurrentMap里面查詢所有已經存在這個對象,如果存在,說明是重復的訂閱包,則不處理,這里面調用了putIfAbsent方法,同時重寫了RunningSubscription的equals方法。packetId和clientID相同時代表是相同的RunningSubscription
3.從channel里面取出用戶名,驗證該client下的該username是否有權利讀取該topic(訂閱該topic)的權限,這里貼一下相關的代碼進行講解
rivate List<MqttTopicSubscription> doVerify(String clientID, String username, MqttSubscribeMessage msg) {
ClientSession clientSession = m_sessionsStore.sessionForClient(clientID);
List<MqttTopicSubscription> ackTopics = new ArrayList<>();
final int messageId = messageId(msg);
for (MqttTopicSubscription req : msg.payload().topicSubscriptions()) {
Topic topic = new Topic(req.topicName());
if (!m_authorizator.canRead(topic, username, clientSession.clientID)) {
// send SUBACK with 0x80, the user hasn't credentials to read the topic
LOG.error("Client does not have read permissions on the topic CId={}, username={}, messageId={}, " +
"topic={}", clientID, username, messageId, topic);
ackTopics.add(new MqttTopicSubscription(topic.toString(), FAILURE));
} else {
MqttQoS qos;
if (topic.isValid()) {
LOG.info("Client will be subscribed to the topic CId={}, username={}, messageId={}, topic={}",
clientID, username, messageId, topic);
qos = req.qualityOfService();
} else {
LOG.error("Topic filter is not valid CId={}, username={}, messageId={}, topic={}", clientID,
username, messageId, topic);
qos = FAILURE;
}
ackTopics.add(new MqttTopicSubscription(topic.toString(), qos));
}
}
return ackTopics;
}
從報文的payload里面取出所有的訂閱請求,遍歷,然后驗證是否有權限,這個權限是在配置文件里面配置的,詳見https://blog.51cto.com/13579730/2072467
如果沒有權限,返回SUBACK報文中標記該訂閱狀態為失敗,如果有權限,檢查topic是否有效如果有效,獲取qos,如果無效標記為失敗。
校驗之后得到一個List<MqttTopicSubscription>,再根據這個list生成SUBACK
4.將RunningSubscription的狀態從VERIFIED修改成STORED,這里面用到了ConcurrentHashMap.replace(key,oldvalue,newvlaue)這個原子操作,如果修改失敗表面,這個訂閱請求已經存在。
5.開始存儲訂閱請求,這里存儲訂閱請求
private List<Subscription> doStoreSubscription(List<MqttTopicSubscription> ackTopics, String clientID) {
ClientSession clientSession = m_sessionsStore.sessionForClient(clientID);
List<Subscription> newSubscriptions = new ArrayList<>();
for (MqttTopicSubscription req : ackTopics) {
// TODO this is SUPER UGLY
if (req.qualityOfService() == FAILURE) {
continue;
}
Subscription newSubscription =
new Subscription(clientID, new Topic(req.topicName()), req.qualityOfService());
clientSession.subscribe(newSubscription);//存儲到用戶的session里面,用以表明該client訂閱了哪些請求
newSubscriptions.add(newSubscription);
}
return newSubscriptions;
}
我們先看存儲到用戶的session這一步
public boolean subscribe(Subscription newSubscription) {
LOG.info("Adding new subscription. ClientId={}, topics={}, qos={}", newSubscription.getClientId(),
newSubscription.getTopicFilter(), newSubscription.getRequestedQos());
boolean validTopic = newSubscription.getTopicFilter().isValid();
if (!validTopic) {
LOG.warn("The topic filter is not valid. ClientId={}, topics={}", newSubscription.getClientId(),
newSubscription.getTopicFilter());
// send SUBACK with 0x80 for this topic filter
return false;
}
ClientTopicCouple matchingCouple = new ClientTopicCouple(this.clientID, newSubscription.getTopicFilter());
Subscription existingSub = subscriptionsStore.getSubscription(matchingCouple);
// update the selected subscriptions if not present or if has a greater qos
if (existingSub == null || existingSub.getRequestedQos().value() < newSubscription.getRequestedQos().value()) {
if (existingSub != null) {
LOG.info("Subscription already existed with a lower QoS value. It will be updated. ClientId={}, " +
"topics={}, existingQos={}, newQos={}", newSubscription.getClientId(),
newSubscription.getTopicFilter(), existingSub.getRequestedQos(), newSubscription.getRequestedQos());
subscriptions.remove(newSubscription);
}
subscriptions.add(newSubscription);//存儲到內存的session
subscriptionsStore.addNewSubscription(newSubscription);//存儲到別的地方
}
return true;
}
這里面先創建了一個ClientTopicCouple對,然后從訂閱集合里面查詢是否已經存在這個訂閱,如果不存在或者新的訂閱的qos要高于就的訂閱的qos,則會把訂閱添加到訂閱集合里面,這里有兩個存儲,一個是Set<Subscription>,一個是Map<Topic, Subscription> subscriptions(這個在ISessionsStore的具體實現里面)moquette在這里面做了冗余,即內存里面會存一分,同時允許通過ISessionsStore存儲到外部。
6.我們接著看processSubscribe,這個方法會返回一個新的list
接著會遍歷這個返回的list,存儲到SubscriptionsDirectory里面,這個維護所有的client直接的發布訂閱關系,是moquette里面一個非常重要的對象了,里面維護者一顆topic樹,這個后面單獨講
7.發送SUBACK
8.發布retain消息,這里面也講解一下,這一步的作用在于,如果一個client發布了新的訂閱,那么必須遍歷那些retain消息,如果這些新的訂閱,確實能夠匹配這些retain消息,必須將這些retain消息發送給他們。//這里moquette的處理是遍歷map,這樣的話,當retain消息特別大的時候,效率是非常低的,會很容易拖垮那些對吞吐率和性能要求比較高的系統的。
private void publishRetainedMessagesInSession(final Subscription newSubscription, String username) {
LOG.info("Retrieving retained messages CId={}, topics={}", newSubscription.getClientId(),
newSubscription.getTopicFilter());
// scans retained messages to be published to the new subscription
// TODO this is ugly, it does a linear scan on potential big dataset
Collection<IMessagesStore.StoredMessage> messages = m_messagesStore.searchMatching(new IMatchingCondition() {
@Override
public boolean match(Topic key) {
return key.match(newSubscription.getTopicFilter());
}
});
if (!messages.isEmpty()) {
LOG.info("Publishing retained messages CId={}, topics={}, messagesNo={}",
newSubscription.getClientId(), newSubscription.getTopicFilter(), messages.size());
}
ClientSession targetSession = m_sessionsStore.sessionForClient(newSubscription.getClientId());
this.internalRepublisher.publishRetained(targetSession, messages);
// notify the Observables
m_interceptor.notifyTopicSubscribed(newSubscription, username);
}
另外,用以匹配訂閱的topic與retain消息的topic是否匹配的方法也非常不完善。具體的原因大家可以看一下這里
io.moquette.spi.impl.subscriptions.Topic#match,另外注意以下,對于moquette來說有兩個對象能夠發送消息,分別是MessagesPublisher和InternalRepublisher,這兩個類是有區別的,第一個是正常的發消息和遺愿消息,第二個屬于重發之前未發送成功的qos1和qos2消息(在client發送connect建立連接的時候)還有一個作用是發送retain消息(在subscribe的時候)。
9.從ConcurrentMap<RunningSubscription, SubscriptionState>移除該訂閱請求。
整個RunningSubscription的狀態會從VERIFIED到STORED,這代表了整個處理過程的最重要的兩個步驟。
重新分析一下第第5步和第6步
我們發現對于Subscribe,實際上是有三個地方存儲了的
1.ClientSession里面有一個Set<Subscription> subscriptions
2.MemorySessionStore.里面的Session對象里面的Map<Topic, Subscription> subscriptions
3.SubscriptionsDirectory里面的Treenode里面的Set<ClientTopicCouple> m_subscriptions
這里面我們思考一下為什么要存三分呢?有必要嗎?
先說1和2個人覺得冗余的有點沒必要,唯一的好處就是查詢的時候一個在內存一個在redis等其他存儲
,性能稍微好一點,但是這樣就會有數據一致性問題,趕緊有點得不償失,當然也有可能是我沒看懂
再說說2和3,關鍵在這里,我們重看這段邏輯
public boolean subscribe(Subscription newSubscription) {
LOG.info("Adding new subscription. ClientId={}, topics={}, qos={}", newSubscription.getClientId(),
newSubscription.getTopicFilter(), newSubscription.getRequestedQos());
boolean validTopic = newSubscription.getTopicFilter().isValid();
if (!validTopic) {
LOG.warn("The topic filter is not valid. ClientId={}, topics={}", newSubscription.getClientId(),
newSubscription.getTopicFilter());
// send SUBACK with 0x80 for this topic filter
return false;
}
ClientTopicCouple matchingCouple = new ClientTopicCouple(this.clientID, newSubscription.getTopicFilter());
Subscription existingSub = subscriptionsStore.getSubscription(matchingCouple);
// update the selected subscriptions if not present or if has a greater qos
if (existingSub == null || existingSub.getRequestedQos().value() < newSubscription.getRequestedQos().value()) {
if (existingSub != null) {
LOG.info("Subscription already existed with a lower QoS value. It will be updated. ClientId={}, " +
"topics={}, existingQos={}, newQos={}", newSubscription.getClientId(),
newSubscription.getTopicFilter(), existingSub.getRequestedQos(), newSubscription.getRequestedQos());
subscriptions.remove(newSubscription);
}
subscriptions.add(newSubscription);
subscriptionsStore.addNewSubscription(newSubscription);
}
return true;
}
發現當同一個client訂閱對同一個topic-filter發送了另外一個qos等級的訂閱的時候,1和2,其實是更新了的,因為不論是set還是map,當equals相等時,會產生覆蓋。而這個時候并沒有對topic樹里面的subscribe進行更新,而是直接添加,這說明,對于目錄樹來說,一個topic下是可能存在某一個client的重復訂閱的。這說明2和3的作用不同,因為3即topic目錄里面更關系的是某個client到底有沒有訂閱該topic-filter,而不關心這個topic究竟應該怎么發,是以qos1還是qos2或者qos0發,他并不關系,而且更新一個普通的樹的消耗成本還是挺大的。2存儲是最新的訂閱。包含了等級信息。這也就能夠解釋為什么在發布消息的時候會有下面的一段過濾的邏輯了
public List<Subscription> matches(Topic topic) {
Queue<Token> tokenQueue = new LinkedBlockingDeque<>(topic.getTokens());
List<ClientTopicCouple> matchingSubs = new ArrayList<>();
subscriptions.get().matches(tokenQueue, matchingSubs);
// remove the overlapping subscriptions, selecting ones with greatest qos
Map<String, Subscription> subsForClient = new HashMap<>();
for (ClientTopicCouple matchingCouple : matchingSubs) {//遍歷從topic樹獲取的訂閱者
Subscription existingSub = subsForClient.get(matchingCouple.clientID);//看一下map里面是否已經存在
Subscription sub = this.subscriptionsStore.getSubscription(matchingCouple);//看一下該客戶端是否還在線
if (sub == null) {
// if the m_sessionStore hasn't the sub because the client disconnected
continue;
}
// update the selected subscriptions if not present or if has a greater qos
if (existingSub == null || existingSub.getRequestedQos().value() < sub.getRequestedQos().value()) {//
subsForClient.put(matchingCouple.clientID, sub);
}
}
return new ArrayList<>(subsForClient.values());
}
這就是為什么需要從目錄樹找訂閱者,但是卻需要從ISubscriptionsStore里面獲取最新的subscribe了。同時會有一個去重的邏輯,因為目錄樹下本來就可能重復,但是ISubscriptionsStore由于是一個map所以是不可能有重復的。
下一篇會講解moquette對PUBLISH報文的處理
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。