您好,登錄后才能下訂單哦!
kafka從0.11.0版本開始所使用的消息格式版本為v2,這個版本的消息相比于v0和v1的版本而言改動很大,同時還參考了Protocol Buffer而引入了變長整型(Varints)和ZigZag編碼。為了更加形象的說明問題,首先我們來了解一下變長整型。
Varints是使用一個或多個字節來序列化整數的一種方法。數值越小,其所占用的字節數就越少。Varints中每個字節都有一個位于最高位的msb位(most significant bit),除了最后一個字節外其余msb位都設置為1,最后一個字節的msb位設置為0。這個msb位表示其后的字節是否和當前字節一起來表示同一個整數。除msb位之外,剩余的7位用于存儲數據本身,這種表示類型又稱之為Base 128。通常而言,一個字節8位可以表示256個值,所以稱之為Base 256,而這里只能用7位表示,2的7次方即為128。Varints中采用小端字節序,即最小的字節放在最前面。
舉個例子,比如數字1,它只占一個字節,所以msb位為0:
0000 0001
再舉一個復雜點的例子,如數字300:
1010 1100 0000 0010
300的二進制表示原本為:0000 0001 0010 1100 = 256+32+8+4=300,那么為什么300的變長表示為上面的這種形式?
首先我們先去掉每個字節的msb位,表示如下:
1010 1100 0000 0010
-> 010 1100 000 0010
如前所述,varints是小端字節序的布局方式,所以這里兩個字節的位置需要翻轉一下:
010 1100 000 0010
-> 000 0010 010 1100 (翻轉)
-> 000 0010 ++ 010 1100
-> 0000 0001 0010 1100 = 256+32+8+4=300
Varints可以用來表示int32、int64、uint32、uint64、sint32、sint64、bool、enum等類型。在實際使用過程中,如果當前字段可以表示為負數,那么對于int32/int64和sint32/sint64而言,它們在進行編碼時存在著較大的區別。比如你使用int64表示一個負數,那么哪怕是-1,其編碼后的長度始終為10個字節(可以通過下面的代碼來測試長度),就如同對待一個很大的無符號長整型一樣。為了使得編碼更加的高效,Varints使用了ZigZag的編碼方式。
public int sizeOfLong(int v) {
int bytes = 1;
while ((v & 0xffffffffffffff80L) != 0L) {
bytes += 1;
v >>>= 7;
}
return bytes;
}
ZigZag編碼以一種鋸齒形(zig-zags)的方式來回穿梭于正負整數之間,以使得帶符號整數映射為無符號整數,這樣可以使得絕對值較小的負數仍然享有較小的Varints編碼值,比如-1編碼為1,1編碼為2,-2編碼為3,參考下表:
對應的公式為:
(n << 1) ^ (n >> 31)
這是對于sint32而言的,sint64對應的公式為:
(n << 1) ^ (n >> 63)
以-1為例,其二進制表現形式為:1111 1111 1111 1111 1111 1111 1111 1111(補碼)。
(n << 1) = 1111 1111 1111 1111 1111 1111 1111 1110
(n >> 31) = 0000 0000 0000 0000 0000 0000 0000 0001
(n << 1) ^ (n >> 31) = 1
最終-1的Varints編碼為0000 0001,這樣原本用4字節表示的-1現在可以用1個字節來表示了。對于1而言就顯得非常簡單了,其二進制表現形式為:0000 0000 0000 0000 0000 0000 0000 0001。
(n << 1) = 0000 0000 0000 0000 0000 0000 0000 0010
(n >> 31) = 0000 0000 0000 0000 0000 0000 0000 0000
(n << 1) ^ (n >> 31) = 2
最終1的Varints編碼為0000 0010,也只占用一個字節。
前面說過Varints中的一個字節中只有7位是有效數值位,即只能表示128個數值,轉變成絕對值之后其實質上只能表示64個數值。比如對于消息體長度而言,其值肯定是個大于等于0的正整數,那么一個字節長度的Varints最大只能表示63(從0開始計)。對于64而言,其二進制表示為:
0100 0000
經過ZigZag處理后為:
1000 0000 ^ 0000 0000 = 1000 0000
每個字節的低7位是有效數值位,所以1000 0000進一步轉變為:
000 0001 000 0000
而Varints又是小端字節序,所以需要翻轉一下位置:
000 0000 000 0001
設置非最后一個字節的msb位為1,最后一個字節的msb位為0,最終有:
1000 0000 0000 0001
所以最終64表示為:1000 0000 0000 0001,而63卻表示為:0111 1110。
具體的編碼實現如下(針對int32類型):
public static void writeVarint(int value, ByteBuffer buffer) {
int v = (value << 1) ^ (value >> 31);
while ((v & 0xffffff80) != 0L) {
byte b = (byte) ((v & 0x7f) | 0x80);
buffer.put(b);
v >>>= 7;
}
buffer.put((byte) v);
}
對應的解碼實現如下(針對int32類型):
public static int readVarint(ByteBuffer buffer) {
int value = 0;
int i = 0;
int b;
while (((b = buffer.get()) & 0x80) != 0) {
value |= (b & 0x7f) << i;
i += 7;
if (i > 28)
throw illegalVarintException(value);
}
value |= b << i;
return (value >>> 1) ^ -(value & 1);
}
回顧一下kafka v0和v1版本的消息格式,如果消息本身沒有key,那么key length字段為-1,int類型的需要4個字節來保存,而如果采用Varints來編碼則只需要一個字節。根據Varints的規則可以推導出0-63之間的數字占1個字節,64-8191之間的數字占2個字節,8192-1048575之間的數字占3個字節。而kafka broker的配置message.max.bytes的默認大小為1000012(Varints編碼占3個字節),如果消息格式中與長度有關的字段采用Varints的編碼的話,絕大多數情況下都會節省空間,而v2版本的消息格式也正是這樣做的。
不過需要注意的是Varints并非一直會省空間,一個int32最長會占用5個字節(大于默認的4字節),一個int64最長會占用10字節(大于默認的8字節)。下面代碼展示如何計算一個int32占用的字節個數:
public static int sizeOfVarint(int value) {
int v = (value << 1) ^ (value >> 31);
int bytes = 1;
while ((v & 0xffffff80) != 0L) {
bytes += 1;
v >>>= 7;
}
return bytes;
}
有關int32/int64的更多實現細節可以參考org.apache.kafka.common.utils.ByteUtils。
本文的重點是你有沒有收獲與成長,其余的都不重要,希望讀者們能謹記這一點。同時我經過多年的收藏目前也算收集到了一套完整的學習資料,包括但不限于:分布式架構、高可擴展、高性能、高并發、Jvm性能調優、Spring,MyBatis,Nginx源碼分析,Redis,ActiveMQ、、Mycat、Netty、Kafka、Mysql、Zookeeper、Tomcat、Docker、Dubbo、Nginx等多個知識點高級進階干貨,希望對想成為架構師的朋友有一定的參考和幫助
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。