您好,登錄后才能下訂單哦!
這篇文章主要講解了“redis的bitmap使用實例分析”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“redis的bitmap使用實例分析”吧!
如果我們需要記錄某一用戶在一年中每天是否有登錄我們的系統這一需求該如何完成呢?如果使用KV存儲,每個用戶需要記錄365個,當用戶量上億時,這所需要的存儲空間是驚人的。
Redis 為我們提供了位圖這一數據結構,每個用戶每天的登錄記錄只占據一位,365天就是365位,僅僅需要46字節就可存儲,極大地節約了存儲空間。
位圖數據結構其實并不是一個全新的玩意,我們可以簡單的認為就是個數組,只是里面的內容只能為0或1而已(二進制位數組)。
Redis提供了SETBIT
、GETBIT
、BITCOUNT
、BITOP
四個常用命令用于處理二進制位數組。
SETBIT
:為位數組指定偏移量上的二進制位設置值,偏移量從0開始計數,二進制位的值只能為0或1。返回原位置值。
GETBIT
:獲取指定偏移量上二進制位的值。
BITCOUNT
:統計位數組中值為1的二進制位數量。
BITOP
:對多個位數組進行按位與、或、異或運算。
127.0.0.1:6379> SETBIT first 0 1 # 0000 0001 (integer) 0 127.0.0.1:6379> SETBIT first 3 1 # 0000 1001 (integer) 0 127.0.0.1:6379> SETBIT first 0 0 # 0000 1000 (integer) 1 127.0.0.1:6379> GETBIT first 0 (integer) 0 127.0.0.1:6379> GETBIT first 3 (integer) 1 127.0.0.1:6379> BITCOUNT first # 0000 1000 (integer) 1 127.0.0.1:6379> SETBIT first 0 1 # 0000 1001 (integer) 0 127.0.0.1:6379> BITCOUNT first # 0000 1001 (integer) 2 127.0.0.1:6379> SETBIT first 1 1 # 0000 1011 (integer) 0 127.0.0.1:6379> BITCOUNT first # 0000 1011 (integer) 3 127.0.0.1:6379> SETBIT x 3 1 (integer) 0 127.0.0.1:6379> SETBIT x 1 1 (integer) 0 127.0.0.1:6379> SETBIT x 0 1 # 0000 1011 (integer) 0 127.0.0.1:6379> SETBIT y 2 1 (integer) 0 127.0.0.1:6379> SETBIT y 1 1 # 0000 0110 (integer) 0 127.0.0.1:6379> SETBIT z 2 1 (integer) 0 127.0.0.1:6379> SETBIT z 0 1 # 0000 0101 (integer) 0 127.0.0.1:6379> BITOP AND andRes x y z #0000 0000 (integer) 1 127.0.0.1:6379> BITOP OR orRes x y z #0000 1111 (integer) 1 127.0.0.1:6379> BITOP XOR x y z #0000 1000 (integer) 1 # 對給定的位數組進行按位取反 127.0.0.1:6379> SETBIT value 0 1 (integer) 0 127.0.0.1:6379> SETBIT value 3 1 #0000 1001 (integer) 0 127.0.0.1:6379> BITOP NOT notValue value #1111 0110 (integer) 1
如下展示了一個用 SDS 表示的一字節(8位)長的位圖:
擴展:Redis 中的每個對象都是有一個 redisObject 結構表示的。
typedef struct redisObject { // 類型 unsigned type:4; // 編碼 unsigned encoding:4; unsigned lru:REDIS_LRU_BITS; /* lru time (relative to server.lruclock) */ // 引用計數 int refcount; // 執行底層實現的數據結構的指針 void *ptr; } robj;
type
的值為 REDIS_STRING
表示這是一個字符串對象
sdshdr.len
的值為1表示這個SDS保存了一個1字節大小的位數組
buf數組中的buf[0]
實際保存了位數組
buf數組中的buf[1]
為自動追加的\0
字符
為了便于我們觀察,buf數組的每個字節都用一行來表示,buf[i]表示這是buf數組的第i個字節,buf[i]之后的8個格子表示這個字節上的8位。
為再次拉齊各位思想,如下展示了另外一個位數組:
位數組由buf[0]、buf[1]和buf[2]三個字節保存,真實數據為
1111 0000 1100 0011 1010 0101
GETBIT
用于返回位數組在偏移量上的二進制位的值。值得我們注意的是,GETBIT
的時間復雜度是O(1)
。
GETBIT
命令的執行過程如下:
計算$ byte = \lfloor offset\p8 \rfloor $ (即>>3
),byte 值表示指定的
o
f
f
s
e
t
offset
offset 位于位數組的哪個字節(計算在第幾行);
指定 b u f [ i ] buf[i] buf[i] 中的 i i i了,接下來就要計算在8個字節中的第幾位呢?使用 $ bit = (offset\ %\ 8)+1 $計算可得;
根據 b y t e byte byte 和 b i t bit bit 在位數組中定位到目標值返回即可。
GETBIT
命令源碼如下所示:
void getbitCommand(client *c) { robj *o; char llbuf[32]; uint64_t bitoffset; size_t byte, bit; size_t bitval = 0; // 獲取offset if (getBitOffsetFromArgument(c,c->argv[2],&bitoffset,0,0) != C_OK) return; // 查找對應的位圖對象 if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL || checkType(c,o,OBJ_STRING)) return; // 計算offset位于位數組的哪一行 byte = bitoffset >> 3; // 計算offset在一行中的第幾位,等同于取模 bit = 7 - (bitoffset & 0x7); // #define sdsEncodedObject(objptr) (objptr->encoding == OBJ_ENCODING_RAW || objptr->encoding == OBJ_ENCODING_EMBSTR) if (sdsEncodedObject(o)) { // SDS 是RAW 或者 EMBSTR類型 if (byte < sdslen(o->ptr)) // 獲取指定位置的值 // 注意它不是真正的一個二維數組不能用((uint8_t*)o->ptr)[byte][bit]去獲取呀~ bitval = ((uint8_t*)o->ptr)[byte] & (1 << bit); } else { // SDS 是 REDIS_ENCODING_INT 類型的整數,先轉為String if (byte < (size_t)ll2string(llbuf,sizeof(llbuf),(long)o->ptr)) bitval = llbuf[byte] & (1 << bit); } addReply(c, bitval ? shared.cone : shared.czero); }
以GETBIT array 3
為例,array
表示上圖中三個字節的位數組。
1. $byte = \lfloor 3 \p8 \rfloor$ 得到值為0,說明在 $buf[0]$ 上 2. $bit = (3\ mod\ 8 ) + 1$得到值為4 3. 定位到 $buf[0]$ 字節的從左至右第4個位置上
因為 GETBIT
命令執行的所有操作都可以在常數時間內完成,所以該命令的算法復雜度為O(1)。
SETBIT
用于將位數組在偏移量的二進制位的值設為value,并向客戶端返回舊值。
SITBIT
命令的執行過程如下:
計算 l e n = ? o f f s e t ÷ 8 ? + 1 len = \lfloor offset÷8\rfloor + 1 len=?offset÷8?+1, l e n len len值記錄了保存 o f f s e t offset offset偏移量指定的二進制位至少需要多少字節
檢查位數組的長度是否小于 l e n len len,如果是的話,將SDS的長度擴展為len字節,并將所有新擴展空間的二進制位設置為0
計算 b y t e = ? o f f s e t ÷ 8 ? byte = \lfloor offset÷8\rfloor byte=?offset÷8?, b y t e byte byte值表示指定的 o f f s e t offset offset位于位數組的那個字節(就是計算在那個 b u f [ i ] buf[i] buf[i]中的 i i i)
使用 b i t = ( o f f s e t m o d 8 ) + 1 bit = (offset\ mod\ 8)+1 bit=(offset mod 8)+1計算可得目標 b u f [ i ] buf[i] buf[i]的具體第幾位
根據 b y t e byte byte和 b i t bit bit的值,首先保存 o l d V a l u e oldValue oldValue,然后將新值 v a l u e value value設置到目標位上
返回舊值
因為SETBIT命令執行的所有操作都可以在常數時間內完成,所以該命令的算法復雜度為O(1)。
SETBIT
命令源碼如下所示:
void setbitCommand(client *c) { robj *o; char *err = "bit is not an integer or out of range"; uint64_t bitoffset; ssize_t byte, bit; int byteval, bitval; long on; // 獲取offset if (getBitOffsetFromArgument(c,c->argv[2],&bitoffset,0,0) != C_OK) return; // 獲取我們需要設置的值 if (getLongFromObjectOrReply(c,c->argv[3],&on,err) != C_OK) return; /* 判斷指定值是否為0或1 */ if (on & ~1) { // 設置了0和1之外的值,直接報錯 addReplyError(c,err); return; } // 根據key查詢SDS對象(會自動擴容) if ((o = lookupStringForBitCommand(c,bitoffset)) == NULL) return; /* 獲得當前值 */ byte = bitoffset >> 3; byteval = ((uint8_t*)o->ptr)[byte]; bit = 7 - (bitoffset & 0x7); bitval = byteval & (1 << bit); /* 更新值并返回舊值 */ byteval &= ~(1 << bit); byteval |= ((on & 0x1) << bit); ((uint8_t*)o->ptr)[byte] = byteval; // 發送數據修改通知 signalModifiedKey(c,c->db,c->argv[1]); notifyKeyspaceEvent(NOTIFY_STRING,"setbit",c->argv[1],c->db->id); server.dirty++; addReply(c, bitval ? shared.cone : shared.czero); }
array
表示上圖中的三個字節位數組。以SETBIT array 10 1
為例:
$ len = \lfloor10÷8\rfloor + 1$得到值為2,說明至少需要2字節長的位數組
檢查是否需要擴容,不需要
計算$byte = 1 , 計 算 ,計算 ,計算bit = 3 , 保 存 ,保存 ,保存oldValue$,設置新值
返回 o l d V a l u e oldValue oldValue
假設目標位數組為1字節長度。執行SETBIT array 12 1
,執行如下:
l e n = ? 12 ÷ 8 ? + 1 len = ?12÷8? + 1 len=?12÷8?+1 得到值為2,說明需要2字節長的 SDS
檢查是否需要擴容,需要呀!由 SDS 的自動擴容機制可知, SDS 將按新長度的兩倍擴容。
計算 $byte = 1 $
計算 $bit = 5 $
保存 o l d V a l u e oldValue oldValue,設置新值
返回 o l d V a l u e oldValue oldValue
BITCOUNT
命令用于統計給定位數組中值為1的二進制位的數量。功能似乎不復雜,但實際上要高效地實現這個命令并不容易,需要用到一些精巧的算法。
統計一個位數組中非0二進制位的數量在數學上被稱為"計算漢明重量"。
實現BITCOUNT
命令最簡單直接的方法,就是遍歷位數組中的每個二進制位,并在遇到值為1的二進制位時將計數器加1。
小數據量還好,大數據量直接PASS!
對于一個有限集合來說,集合元素的排列方式是有限的,并且對于一個有限長度的位數組來說,它能表示的二進制位排列也是有限的。根據這個原理,我們可以創建一個表,表的鍵為某種排列的位數組,而表的值則是相應位數組中值為1的二進制位的數量。
對于8位長的位數組來說,我們可以創建下表,通過這個表格我們可以一次從位數組中讀入8位,然后根據這8位的值進行查表,直接知道這個值包含了多少個1。
可惜,查表法耗內存呀!
目前已知效率最好的通用算法為variable-precision SWAR
算法,該算法通過一系列位移和位運算操作,可以在常數時間(這就很牛逼了)內計算多個字節的漢明重量,并且不需要使用任何額外的內存。
SWAR
算法代碼如下所示:
uint32_t swar(uint32_t i) { // 5的二進制:0101 i = (i & 0x55555555) + ((i >> 1) & 0x55555555); // 3的二進制:0011 i = (i & 0x33333333) + ((i >> 2) & 0x33333333); i = (i & 0x0F0F0F0F) + ((i >> 4) & 0x0F0F0F0F); i = (i*(0x01010101) >> 24); return i; i = i - ((i >> 1) & 0x55555555); i = (i & 0x33333333) + ((i >> 2) & 0x33333333); return (((i + (i >> 4)) & 0x0F0F0F0F) * 0x01010101) >> 24; }
下面描述一下這幾步都干了啥:
步驟一計算出的值i的二進制表示可以按每兩個二進制位為一組進行分組,各組的十進制表示就是該組的1的數量;
步驟二計算出的值i的二進制表示可以按每四個二進制位為一組進行分組,各組的十進制表示就是該組的1的數量;
步驟三計算出的值i的二進制表示可以按每八個二進制位為一組進行分組,各組的十進制表示就是該組的1的數量;
步驟四的
i*0x01010101
語句計算出bitarray中1的數量并記錄在二進制位的最高八位,而>>24
語句則通過右移運算,將bitarray的漢明重量移動到最低八位,得出的結果就是bitarray的漢明重量。
對于調用swar(0xFBB4080B)
,步驟一將計算出值0xA6640406
,這個值表的每兩個二進制位的十進制表示記錄了0xFBB4080B
每兩個二進制位的漢明重量。
步驟二將計算出值0x43310103
,這個值表的每四個二進制位的十進制表示記錄了0xFBB4080B
每四個二進制位的漢明重量。
步驟三將計算出值0x7040103
,這個值表的每八個二進制位的十進制表示記錄了0xFBB4080B
每八個二進制位的漢明重量。
步驟四首先計算0x7040103 * 0x01010101 = 0xF080403
,將漢明重量聚集到二進制位的最高八位。
之后計算0xF080403 >> 24
,將漢明重量移動到低八位,得到最終值0x1111
,即十進制15。
如果您是Java程序員,可以去看看Integer.bitCount方法,也是基于SWAR算法的思想哦!
大家也可以看看StackFlow上大神對它的講解:[How does this algorithm to count the number of set bits in a 32-bit integer work?](https://stackoverflow.com/questions/22081738/how-does-this-algorithm-to-count-the-number-of-set-bits-in-a-32-bit-integer-work)3.4.4 源碼分析
Redis 中通過調用redisPopcount
方法統計漢明重量,源碼如下所示:
long long redisPopcount(void *s, long count) { long long bits = 0; unsigned char *p = s; uint32_t *p4; // 為查表法準備的表 static const unsigned char bitsinbyte[256] = {0,1,1,2,1,2,2,3,1,2,2,3,2,3,3,4,1,2,2,3,2,3,3,4,2,3,3,4,3,4,4,5,1,2,2,3,2,3,3,4,2,3,3,4,3,4,4,5,2,3,3,4,3,4,4,5,3,4,4,5,4,5,5,6,1,2,2,3,2,3,3,4,2,3,3,4,3,4,4,5,2,3,3,4,3,4,4,5,3,4,4,5,4,5,5,6,2,3,3,4,3,4,4,5,3,4,4,5,4,5,5,6,3,4,4,5,4,5,5,6,4,5,5,6,5,6,6,7,1,2,2,3,2,3,3,4,2,3,3,4,3,4,4,5,2,3,3,4,3,4,4,5,3,4,4,5,4,5,5,6,2,3,3,4,3,4,4,5,3,4,4,5,4,5,5,6,3,4,4,5,4,5,5,6,4,5,5,6,5,6,6,7,2,3,3,4,3,4,4,5,3,4,4,5,4,5,5,6,3,4,4,5,4,5,5,6,4,5,5,6,5,6,6,7,3,4,4,5,4,5,5,6,4,5,5,6,5,6,6,7,4,5,5,6,5,6,6,7,5,6,6,7,6,7,7,8}; // CPU一次性讀取8個字節,如果4字節跨了兩個8字節,需要讀取兩次才行 // 所以考慮4字節對齊,只需讀取一次就可以讀取完畢 while((unsigned long)p & 3 && count) { bits += bitsinbyte[*p++]; count--; } // 一次性處理28字節,單獨看一個aux就容易理解了,其實就是SWAR算法 // uint32_t:4字節 p4 = (uint32_t*)p; while(count>=28) { uint32_t aux1, aux2, aux3, aux4, aux5, aux6, aux7; aux1 = *p4++;// 一次性讀取4字節 aux2 = *p4++; aux3 = *p4++; aux4 = *p4++; aux5 = *p4++; aux6 = *p4++; aux7 = *p4++; count -= 28;// 共處理了4*7=28個字節,所以count需要減去28 aux1 = aux1 - ((aux1 >> 1) & 0x55555555); aux1 = (aux1 & 0x33333333) + ((aux1 >> 2) & 0x33333333); aux2 = aux2 - ((aux2 >> 1) & 0x55555555); aux2 = (aux2 & 0x33333333) + ((aux2 >> 2) & 0x33333333); aux3 = aux3 - ((aux3 >> 1) & 0x55555555); aux3 = (aux3 & 0x33333333) + ((aux3 >> 2) & 0x33333333); aux4 = aux4 - ((aux4 >> 1) & 0x55555555); aux4 = (aux4 & 0x33333333) + ((aux4 >> 2) & 0x33333333); aux5 = aux5 - ((aux5 >> 1) & 0x55555555); aux5 = (aux5 & 0x33333333) + ((aux5 >> 2) & 0x33333333); aux6 = aux6 - ((aux6 >> 1) & 0x55555555); aux6 = (aux6 & 0x33333333) + ((aux6 >> 2) & 0x33333333); aux7 = aux7 - ((aux7 >> 1) & 0x55555555); aux7 = (aux7 & 0x33333333) + ((aux7 >> 2) & 0x33333333); bits += ((((aux1 + (aux1 >> 4)) & 0x0F0F0F0F) + ((aux2 + (aux2 >> 4)) & 0x0F0F0F0F) + ((aux3 + (aux3 >> 4)) & 0x0F0F0F0F) + ((aux4 + (aux4 >> 4)) & 0x0F0F0F0F) + ((aux5 + (aux5 >> 4)) & 0x0F0F0F0F) + ((aux6 + (aux6 >> 4)) & 0x0F0F0F0F) + ((aux7 + (aux7 >> 4)) & 0x0F0F0F0F))* 0x01010101) >> 24; } /* 剩余的不足28字節,使用查表法統計 */ p = (unsigned char*)p4; while(count--) bits += bitsinbyte[*p++]; return bits; }
不難發現 Redis 中同時運用了查表法和SWAR算法完成BITCOUNT
功能。
如果沒有1GB的內存限制,我們可以使用排序和Set完成這個算法:
排序:① 首先將40億個QQ號進行排序;② 從小到大遍歷,跳過重復元素只取第一個元素。
Set:將40億個QQ號統統放進Set集合中,自動完成去重,Perfect
這樣回答是要GG的節奏呀!
對40億個QQ號進行排序需要多少時間?這個存儲了40億QQ號的數組容量已經超過1GB了,同理Set集合存儲這么多的QQ號也導致內存超限了。
**這不巧了么~我們可以使用剛剛學的BITMAP
來去重呀!**一個字節可以記錄8個數是否存在(類似于計數排序),將QQ號對應的offset的值設置為1表示此數存在,遍歷完40億個QQ號后直接統計BITMAP
上值為1的offset即可完成QQ號的去重。
如果是對40億個QQ號進行排序也是可以用位圖完成的哦~一樣的思想
既然我們深入了解了BITMAP
,那不進行個實戰項目可說不過去呀!
我們使用
BITMAP
實現GITHUB中統計每天提交次數的這個小功能,基于SpringBoot+Echarts實現
如果是記錄登錄狀態我們可以很方便的使用0和1記錄,如果是記錄提交次數就顯得BITMAP
無用了,沒關系,我們可以使用一個字節來記錄提交次數,只是在業務上需要處理好十進制和二進制直接的轉換而已。
public void genTestData() { if(redisUtils.isExist(CommonConstant.KEY)){ return; } // 獲取當前年的總天數 int days = getDays(); for (int i = 0; i < days; i++) { int random = ThreadLocalRandom.current().nextInt(64); // 生成隨機數表示每天的PR次數 String binaryString = Integer.toBinaryString(random); if (binaryString.length() < 8) { // 填充0 if(binaryString.length() == 0){binaryString = "00000000";} else if(binaryString.length() == 1){binaryString = "0000000"+binaryString;} else if(binaryString.length() == 2){binaryString = "000000"+binaryString;} else if(binaryString.length() == 3){binaryString = "00000"+binaryString;} else if(binaryString.length() == 4){binaryString = "0000"+binaryString;} else if(binaryString.length() == 5){binaryString = "000"+binaryString;} else if(binaryString.length() == 6){binaryString = "00"+binaryString;} else if(binaryString.length() == 7){binaryString = "0"+binaryString;} } char[] chars = binaryString.toCharArray(); for (int j = 0; j < chars.length; j++) { // 設置BitMap redisUtils.setBit(CommonConstant.KEY,i*8+j,chars[j]); } }}/** * 獲取當前年的總天數 * @return days 總天數 */private int getDays(){ Calendar calOne = Calendar.getInstance(); int year = calOne.get(Calendar.YEAR); System.out.println(year); Calendar calTwo = new GregorianCalendar(year, 11, 31); return calTwo.get(Calendar.DAY_OF_YEAR);}
public List<String> getPushData() { List<String> res = new ArrayList<>(366); // 沒有數據就先造數據 genTestData(); int days = getDays(); for(long i=0;i<days;i++){ StringBuilder sb = new StringBuilder(); for (int j = 0; j < 8; j++) { String bit = redisUtils.getBit(CommonConstant.KEY, i * 8 + j); sb.append(bit); } // 直接返回二進制串,前端轉換為十進制 res.add(sb.toString()); } return res;}
這里覺得可以直接將所有的bit統統返回,在前端進行分割處理
<script type="text/javascript"> var chartDom = document.getElementById('main'); var myChart = echarts.init(chartDom); var option; function getVirtulData(year) { var date = +echarts.number.parseDate(year + '-01-01'); var end = +echarts.number.parseDate(+year + 1 + '-01-01'); var dayTime = 3600 * 24 * 1000; var data = []; $.ajax({ "url":'http://localhost:8080/test/getPushData', "async":false, // ajax同步獲取 success:function (res){ for (let time = date,k=0; time < end && k < res.data.length; time += dayTime,k++) { data.push([ echarts.format.formatTime('yyyy-MM-dd', time), parseInt(res.data[k],2)//客戶端完成進制轉換,不放在服務端完成 ]); } } }) return data; } option = { title: { top: 30, left: 'left', text: 'BitMap Demo' }, tooltip: {}, visualMap: { min: 0, max: 32, type: 'piecewise', orient: 'horizontal', left: 'right', top: 220, pieces: [ {min: 0, max: 0,label:"less"}, {min: 1, max: 10,label:" "}, {min: 1, max: 20,label:" "}, {min: 21, max: 40,label:" "}, {min: 41, max: 64,label:"more"}, ], inRange: { color: [ '#EAEDF0', '#9AE9A8', '#41C363', '#31A14E', '#206D38' ],//顏色設置 colorAlpha: 0.9,//透明度 } }, calendar: { top: 120, left: 30, right: 30, cellSize: 13, range: '2022', splitLine: { show: false },//不展示邊線 itemStyle: { borderWidth: 0.5 }, yearLabel: { show: false } }, series: { type: 'heatmap', coordinateSystem: 'calendar', data: getVirtulData('2022') } }; option && myChart.setOption(option);</script>
感謝各位的閱讀,以上就是“redis的bitmap使用實例分析”的內容了,經過本文的學習后,相信大家對redis的bitmap使用實例分析這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。