您好,登錄后才能下訂單哦!
先上一個圖,大概說明一下moquette 的類之間的關系
一.ProtocolProcessor類
該類是moquette里面的最終要的類,負責所有報文的處理,持有所有各模塊功能的實現對象的引用, 下面詳細介紹
protected ConnectionDescriptorStore connectionDescriptors;//所有的連接描述符文存儲,即clientId與通道之間的映射集合
protected ConcurrentMap<RunningSubscription, SubscriptionState> subscriptionInCourse;//所有當前正在處理的
訂閱關系的存儲,之所以有這個是過濾無效的訂閱請求
private SubscriptionsDirectory subscriptions;//訂閱目錄,本質上是topic樹
private ISubscriptionsStore subscriptionStore;//所有的訂閱的集合
private boolean allowAnonymous;//是否允許匿名連接
private boolean allowZeroByteClientId;//是否允許clientId為空
private IAuthorizator m_authorizator; //對topic的讀寫權限認證
private IMessagesStore m_messagesStore;//retainMessage的存儲
private ISessionsStore m_sessionsStore;//session 存儲
private IAuthenticator m_authenticator;//連接時候的鑒權認證
private BrokerInterceptor m_interceptor;//各個層面的攔截器
private Qos0PublishHandler qos0PublishHandler;//qos0攔截器
private Qos1PublishHandler qos1PublishHandler;//qos1攔截器
private Qos2PublishHandler qos2PublishHandler;/qos2攔截器
private MessagesPublisher messagesPublisher;//分發消息,遺愿消息,以及集權間同步消息
private InternalRepublisher internalRepublisher;//保留消息,qos1,qos2消息重發器
ConcurrentMap<String, WillMessage> m_willStore//遺愿消息存儲
幾乎所有的功能的源頭都在這個類里面
二.對14種報文的處理,都在ProtocolProcessor類,后面會分篇挨個講解moquette對這14個報文的處理
具體哪14中文報文如下
名字 值 報文流動方向 描述
Reserved 0 禁止 保留
CONNECT 1 客戶端到服務端 客戶端請求連接服務端
CONNACK 2 服務端到客戶端 連接報文確認
PUBLISH 3 兩個方向都允許 發布消息
PUBACK 4 兩個方向都允許 QoS 1消息發布收到確認
PUBREC 5 兩個方向都允許 發布收到(保證交付第一步)
PUBREL 6 兩個方向都允許 發布釋放(保證交付第二步)
PUBCOMP 7 兩個方向都允許 QoS 2消息發布完成(保證交互第三步)
SUBSCRIBE 8 客戶端到服務端 客戶端訂閱請求
SUBACK 9 服務端到客戶端 訂閱請求報文確認
UNSUBSCRIBE 10 客戶端到服務端 客戶端取消訂閱請求
UNSUBACK 11 服務端到客戶端 取消訂閱報文確認
PINGREQ 12 客戶端到服務端 心跳請求
PINGRESP 13 服務端到客戶端 心跳響應
DISCONNECT 14 客戶端到服務端 客戶端斷開連接
Reserved 15 禁止 保留
或者到這里看更詳細的mqtt中文翻譯
https://github.com/mcxiaoke/mqtt/blob/master/mqtt/02-ControlPacketFormat.md
非常感謝作者的辛勞工作和無私分享
三.debug跟蹤moquette 對CONNECT報文的處理
大概分為以下幾步
1.驗證協議版本,如果不是mqtt-3.1或者mqtt-3.1.1則拒絕連接
2.驗證clientId是否為空,如果為空,但是配置的時候(在上篇介紹的moquette.cof里面配置)要求不允許唯恐,即上面的allowZeroByteClientId或者cleanSession為false即要求保存會話,則視為不合法,拒絕連接,否則由moquette生成clientId
3.驗證是否有登錄的權限
這里面貼上源碼講解一下
private boolean login(Channel channel, MqttConnectMessage msg, final String clientId) {
// handle user authentication
if (msg.variableHeader().hasUserName()) {
byte[] pwd = null;
if (msg.variableHeader().hasPassword()) {
pwd = msg.payload().passwordInBytes();
} else if (!this.allowAnonymous) {
LOG.error("Client didn't supply any password and MQTT anonymous mode is disabled CId={}", clientId);
failedCredentials(channel);
return false;
}
if (!m_authenticator.checkValid(clientId, msg.payload().userName(), pwd)) {
LOG.error("Authenticator has rejected the MQTT credentials CId={}, username={}, password={}",
clientId, msg.payload().userName(), pwd);
failedCredentials(channel);
return false;
}
NettyUtils.userName(channel, msg.payload().userName());
} else if (!this.allowAnonymous) {
LOG.error("Client didn't supply any credentials and MQTT anonymous mode is disabled. CId={}", clientId);
failedCredentials(channel);
return false;
}
return true;
}
3.1.如果CONNETCT報文里面的可變頭里面沒有用戶名,直接返回true
3.2.如果有用戶名,同時有密碼,從可變頭取出密碼,調用m_authenticator進行驗證
3.3 如果有用戶名,沒有密碼,認證失敗,拒絕連接
3.4 如果沒有用戶名,同時配置為不允許匿名,則認證失敗
4.創建連接描述符,連接描述符包括clientId,channel,isCleanSession,ConnectState,同時判斷連接描述符集合里面是否包括該連接描述符,如果包含,代表該連接以及建立,斷開連接
5.根據CONNECT報文里面的Keep Alive time 來設置tcp參數
6.根據CONNECT報文遺愿消息標志位,覺得是否存儲遺愿消息
7.返回CONNACK報文,這里面把返回CONNACK報文單獨講解一下
private boolean sendAck(ConnectionDescriptor descriptor, MqttConnectMessage msg, final String clientId) {
LOG.info("Sending connect ACK. CId={}", clientId);
final boolean success = descriptor.assignState(DISCONNECTED, SENDACK);
if (!success) {
return false;
}
MqttConnAckMessage okResp;
ClientSession clientSession = m_sessionsStore.sessionForClient(clientId);
boolean isSessionAlreadyStored = clientSession != null;
if (!msg.variableHeader().isCleanSession() && isSessionAlreadyStored) {
okResp = connAckWithSessionPresent(CONNECTION_ACCEPTED);
} else {
okResp = connAck(CONNECTION_ACCEPTED);
}
if (isSessionAlreadyStored) {
LOG.info("Cleaning session. CId={}", clientId);
clientSession.cleanSession(msg.variableHeader().isCleanSession());
}
descriptor.writeAndFlush(okResp);
LOG.info("The connect ACK has been sent. CId={}", clientId);
return true;
}
7.1 判斷當前連接的狀態,怎么判斷的呢?這里面用了AtomicReference<ConnectionState>通過調用原子引用類 compareAndSet(DISCONNECTED, SENDACK)來解決并發修改連接狀態的問題。
7.2如果狀態是disConnect,將狀態修改為sendAck
7.3 如果CONNETCT報文里面的CleanSession標識設置為0同時broker已經有了client的會話,將CONNACK報文里面的連接確認標志設為1,告訴客戶端,broker已經有了響應的會話信息。否則將連接確認標志設為0
7.4 如果已經存在相應的client的會話,則根據新的連接,更新clientSession里面的是否清理session屬性
8.喚醒攔截器記錄連接事件
9.創建或者從新加載clientSession,這里面單獨講解一下
private ClientSession createOrLoadClientSession(ConnectionDescriptor descriptor, MqttConnectMessage msg,
String clientId) {
final boolean success = descriptor.assignState(SENDACK, SESSION_CREATED);
if (!success) {
return null;
}
ClientSession clientSession = m_sessionsStore.sessionForClient(clientId);
boolean isSessionAlreadyStored = clientSession != null;
if (!isSessionAlreadyStored) {
clientSession = m_sessionsStore.createNewSession(clientId, msg.variableHeader().isCleanSession());
}
if (msg.variableHeader().isCleanSession()) {
LOG.info("Cleaning session. CId={}", clientId);
clientSession.cleanSession();
}
return clientSession;
}
9.1 AtomicReference<ConnectionState>通過調用原子引用類 compareAndSet(SENDACK, SESSION_CREATED)將連接狀態從sendAck修改為session_create
9.2 session存儲結合里面,是否已經存在會話信息,如果不存在,創建一個新的clientsession
9.3 如果存在,根據CONNETCT報文里面的cleansession自動決定是否清理調舊的會話信息。
10.如果CONNETCT報文要求不清理會話信息(cleansession標志位為0),則重發QoS1 and QoS2 messages,同時將連接狀態從session_create修改成message_republish
11.將連接狀態從session_create修改成established
到此,broker和client直接的mqtt連接正式建立,后面client可以開始發送SUBSCRIBE或者PUBLISH報文了。
在這里再補充一點,對于broker來說,建立連接的過程中,連接狀態會從disConnect->sendAck->session_create->message_republish->established,之所以要設置這些狀態,是因為,每一步后面的操作都要基于前面的狀態來決定是否需要真正執行,這里面用到了原子引用類來保證,狀態的修改這個操作的原子行,確保了在并發的情況下,每一步操作都是條件滿足的。
下面一篇將會講解SUBSCRIBE報文的處理
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。