目录

Redis-底层数据结构源码剖析

Redis 底层数据结构源码剖析

一直好奇为什么 Redis 处理速度这么快,很大部分是因为数据结构设计的好,所以学习一下 Redis 数据结构的底层实现。

Redis 发展到现在已经有 9 种数据类型了,其中最基础、最常用的数据类型有 5 种,它们分别是:字符串类型(String)、列表类型(List)、哈希表类型(Hash)、集合类型(Set)、有序集合类型(SortedSet/Zset)

redisObject

在 Redis 中,所有的对象都会包含 redisObject 对象头,

//在 server.h 头文件中
typedef struct redisObject {
    unsigned type:4;    /* 4 bit */
    unsigned encoding:4;    /* 4 bit */
    unsigned lru:LRU_BITS; /* 3 个字节 */
    int refcount;   /* 4 个字节 */
    void *ptr;  /* 8 个字节 */
} robj;

它的参数说明如下:

  • type:对象的数据类型,例如:string、list、hash 等,占用 4 bits 也就是半个字符的大小;
  • encoding:对象数据编码,占用 4 bits;
  • lru:记录对象的 LRU(Least Recently Used 的缩写,即最近最少使用)信息,内存回收时会用到此属性,占用 24 bits(3 字节);
  • refcount:引用计数器,占用 32 bits(4 字节);
  • *ptr:对象指针用于指向具体的内容,占用 64 bits(8 字节)。

在我们使用 object encoding key 查看键值对的数据类型时,发现有不同的情况,

  1. int 类型
127.0.0.1:6379> set key 666
OK
127.0.0.1:6379> object encoding key
"int"
  1. embstr 类型
127.0.0.1:6379> set key abc
OK
127.0.0.1:6379> object encoding key
"embstr"
  1. raw 类型
127.0.0.1:6379> set key abcdefghigklmnopqrstyvwxyzabcdefghigklmnopqrs
OK
127.0.0.1:6379> object encoding key
"raw"

存储整数时是 int 类型,存储字符串时是 embstr 类型,存储的字符串长度 超过 44 个字节 (Redis 5.0后)时数据类型是 raw 类型。

在 Redis中,SDS 存储的值为 64字节时,Redis 的内存分配器会认为此对象为大字符串,并使用 raw 类型来存储。64个字节包括了 Redis 对象头字节长度+ SDS 结构头字节长度+结束标识\0。

https://i-blog.csdnimg.cn/direct/3d91705c1b8541f18d0e4316dee1051a.png

字符串

Redis 中的字符串类型本质是 sds,sds 定义的结构体有五种 sdshdr5,sdshdr8,sdshdr16,sdshdr32,sdshdr64。 sdshdr5已经不再使用 ,如果字符串长度小于2的5次方,会把字符串长度设为2的8次方,即 sdshdr8

https://i-blog.csdnimg.cn/direct/e0990c012d544a5389d0e3fbac065968.png

typedef char *sds;

struct __attribute__ ((__packed__)) sdshdr8 {
    uint8_t len; /* 字符串现有长度,8位无符号整型,小于 256 字节*/  /*1字节*/
    uint8_t alloc; /* 已分配的空间 */	/*1字节*/
    unsigned char flags; /* SDS 类型 */	/*1字节*/
    char buf[]; /* 字符数组 */
};

SDS 定义多种结构体是为了灵活应对不同的数据长度,因为不同类型的结构体结构头占有字节大小也不同,可以更好地节省内存空间。

除了结构头的优化外,Redis 还在编译器处理方面做了优化,比如 __attribute__ ((__packed__)) 告诉编译器不采用字节对齐的方式分配内存,而是采用 紧凑 的方式分配内存,这是因为在默认情况下,编译器会按照 8 字节对齐的方式,给变量分配内存。也就是说,即使一个变量的大小不到 8 个字节,编译器也会给它分配 8 个字节。

字典

字典类型(hash)本质上是通过数组加链表结构组成的,添加时,如果哈希冲突就添加到链表的尾部(尾插法)。

// dict.h
typedef struct dictEntry {	
    void *key;
    union {
        void *val;
        uint64_t u64;
        int64_t s64;
        double d;
    } v;
    struct dictEntry *next;	// 下一个 dictEntry
} dictEntry;

typedef struct dictht {
    dictEntry **table;
    unsigned long size;
    unsigned long sizemask;
    unsigned long used;
} dictht;

typedef struct dict {
    dictType *type;
    void *privdata;
    dictht ht[2];	/*两个dictht,另一个用来 rehash*/
    long rehashidx; /* 判断是否进行 rehash 的标识 */
    unsigned long iterators; /* number of iterators currently running */
} dict;

https://i-blog.csdnimg.cn/direct/54b62612201248bcadecf5ebe3492b0f.png

https://i-blog.csdnimg.cn/direct/98c114b36fa94c158826eee3fd785e7f.png

每次向字典中添加或删除时,需要判断是否进行 rehash,进行扩容或缩容

  • 扩容

    _dictExpandIfNeeded 方法中判断是否进行扩容或rehash

static int _dictExpandIfNeeded(dict *d)
{
    /* 1. 已经扩容后 rehashidx = 0,直接开始 rehash  */
    if (dictIsRehashing(d)) return DICT_OK;

    /* 2. 第一次初始化时,分配内存空间,不进行 rehash */
    if (d->ht[0].size == 0) return dictExpand(d, DICT_HT_INITIAL_SIZE);

    /* 3. ht[0]的已使用的长度大于ht[0]长度 或 ht[0]已使用的长度大于ht[0]长度的5倍时扩容 */
    /* dict_can_resize 是为了确保在 fork 时不进行 rehash =》 bgsave 或 bgrewriteaof*/
    if ((dict_can_resize == DICT_RESIZE_ENABLE &&
         d->ht[0].used >= d->ht[0].size) ||
        (dict_can_resize != DICT_RESIZE_FORBID &&
         d->ht[0].used / d->ht[0].size > dict_force_resize_ratio))
    {
    	/*扩容为原来已用空间的两倍*/
        return dictExpand(d, d->ht[0].used*2);
    }
    // DICT_OK = 0, rehashidx != -1, 开始进行 rehash
    return DICT_OK;
}
int dictExpand(dict *d, unsigned long size)
{
    /* 需要容量的小于当前容量,不需要扩容 */
    if (dictIsRehashing(d) || d->ht[0].used > size)
        return DICT_ERR;
    dictht n; /* the new hash table */
    unsigned long realsize = _dictNextPower(size);
    /* 计算后的 realsize 等于当前容量大小,不需要扩容 */
    if (realsize == d->ht[0].size) 
    	return DICT_ERR;
    /* 分配一个新的哈希表,并初始化所有的指针指向 NULL,==》开始扩容 */
    n.size = realsize;
    n.sizemask = realsize-1;
    n.table = zcalloc(realsize*sizeof(dictEntry*));
    n.used = 0;
    /* 如果是第一次初始化,为 ht[0] 分配空间,直接返回 */
    if (d->ht[0].table == NULL) {
        d->ht[0] = n;
        return DICT_OK;
    }
    /* 为 ht[1] 分配空间,rehashidx=0,准备 rehash */
    d->ht[1] = n;
    d->rehashidx = 0;
    return DICT_OK;
}

从以上源码可以看出,如果需要扩容则会申请一个新的内存地址赋值给 ht[1],并把字典的 rehashindex 设置为 0,表示之后需要进行 rehash 操作。

  • 缩容

    当字典的使用容量不足总空间的 10% 时就会触发缩容,Redis 在进行缩容时也会把 rehashindex 设置为 0,表示之后需要进行 rehash 操作。

扩容或缩容后,开始 rehash

static void _dictRehashStep(dict *d) {
    if (d->iterators == 0) dictRehash(d,1);
}
int dictRehash(dict *d, int n) {
	// rehash 时, 遍历的 bucket 为空的次数是 n*10
    int empty_visits = n*10;
	// ...
	// 开始拷贝n个哈希桶, 直到原ht[0]个数等于0或哈希桶个数为0
    while(n-- && d->ht[0].used != 0) {
        dictEntry *de, *nextde;

        // 从 empty_visit 个哈希桶中遍历 dictEntry!=null, 并开始 rehash
        while(d->ht[0].table[d->rehashidx] == NULL) {
            d->rehashidx++;
            // empty_visits 为有限个 n*10, 如果多个 bucket 都为null, 直接返回, 防止一直阻塞主线程
            if (--empty_visits == 0) return 1;
        }
        de = d->ht[0].table[d->rehashidx];
        /* 开始将 ht[0] 迁移到 ht[1] 中 */
        /* 反转链表 */
        while(de) {
            uint64_t h;
            
            nextde = de->next;
            h = dictHashKey(d, de->key) & d->ht[1].sizemask;
            de->next = d->ht[1].table[h];
            d->ht[1].table[h] = de;
            d->ht[0].used--;
            d->ht[1].used++;
            de = nextde;
        }
        d->ht[0].table[d->rehashidx] = NULL;
        d->rehashidx++;
    }
    
    /* 已经迁移完成,释放原 ht[0] */
    if (d->ht[0].used == 0) {
        zfree(d->ht[0].table);
        d->ht[0] = d->ht[1];
        _dictReset(&d->ht[1]);
        d->rehashidx = -1;
        return 0;
    }
    return 1;
}

rehashidx 变量表示的是当前 rehash 在对哪个 bucket 做数据迁移。比如,当 rehashidx 等于 0 时,表示对 ht[0]中的第一个 bucket 进行数据迁移;当 rehashidx 等于 1 时,表示对 ht[0]中的第二个 bucket 进行数据迁移,以此类推。

当ht[0]容量已满,已扩容且将要 rehash 时,因为整个 hash 结构容量过大,如果直接把所有 bucket 进行 rehash 可能会阻塞主线程,会产生较大的 rehash开销 , Redis 为了减少 rehash产生的开销(长时间阻塞主线程),采用了 渐进式rehash 的方式。在 rehash 时,如果遍历 bucket 次数为空的个数超过了 n*10( empty_visit ),则不再继续 rehash。

压缩链表

压缩链表在内存中是一块连续的内存空间,在 ziplist.h 中没有定义压缩链表的结构体

/* Create a new empty ziplist. */
unsigned char *ziplistNew(void) {
	//初始分配的大小
    unsigned int bytes = ZIPLIST_HEADER_SIZE+ZIPLIST_END_SIZE;
    unsigned char *zl = zmalloc(bytes);
    ZIPLIST_BYTES(zl) = intrev32ifbe(bytes);
    ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(ZIPLIST_HEADER_SIZE);
    ZIPLIST_LENGTH(zl) = 0;
	//将列表尾设置为ZIP_END
    zl[bytes-1] = ZIP_END;
    return zl;
}

创建一个新的 ziplist 后,ziplist的结构

https://i-blog.csdnimg.cn/direct/4d4329728667454ebb386bd1a8bf5566.png

当我们向 ziplist 中新增数据时,ziplist 会根据添加的是字符串还是数字,以及数据的大小进行不同的编码处理,这种根据数据大小进行相应编码的设计思想,正是 Redis 为了节省内存而采用的。

typedef struct zlentry {
    unsigned int prevrawlensize; // 前一个项使用的字节大小
    unsigned int prevrawlen;	 // 前一个项的长度
    unsigned int lensize;		 // 当前项使用字节大小
    unsigned int len;			 // 当前项的长度
    unsigned int headersize;     // headersize = prevrawlensize + lensize
    unsigned char encoding;      // 编码方式,比如 ZIP_STR_* 或 ZIP_INT_*
    unsigned char *p;			 // 指向每一个当前项的起始位置, 也可以通过前一项的长度prev-entry-len确定 
} zlentry;

https://i-blog.csdnimg.cn/direct/784df83130c64386b660931bbba39a8f.png

// ziplist.c
unsigned char *__ziplistInsert(unsigned char *zl, unsigned char *p, unsigned char *s, unsigned int slen) {
	...
	// 设置 prevlen 前一项的长度 1 或 5字节
	reqlen += zipStorePrevEntryLength(NULL,prevlen);
	// 针对字符串或整数设置不同的编码大小
    reqlen += zipStoreEntryEncoding(NULL,encoding,slen);
    ...
}

unsigned int zipStorePrevEntryLength(unsigned char *p, unsigned int len) {
	// ZIP_BIG_PREVLEN=254
	// len 前一项的字节大小
    if (p == NULL) {
        return (len < ZIP_BIG_PREVLEN) ? 1 : sizeof(len)+1;
    } else {
    	// 判断prevlen的长度是否小于ZIP_BIG_PREVLEN
        if (len < ZIP_BIG_PREVLEN) {
			// 如果前一项的长度小于254字节,那么返回prevlen为1字节
            p[0] = len;
            return 1;
        } else {
	        // 否则将 p[0] 设为 254, 且拷贝前一项长度从第二字节到第五字节
	        // 返回prevlen大小为5字节
            return zipStorePrevEntryLengthLarge(p,len);
        }
    }
}
  • 前一项长度 小于 254 字节 ,prevlen 使用 1字节 空间保存这个长度值
  • 前一项长度 大于等于 254 字节 ,prevlen 使用 5字节 空间保存这个长度值

压缩链表的问题及改进

  • 查找效率低

    压缩链表查找头节点尾节点效率很高,但是随机查找中间节点的效率较低

  • 连锁更新风险

    压缩链表可能存在 连锁更新或内存的重分配 问题,当前项的 prevlen 会根据前一个节点长度分配不同的空间,这可能会引发连锁更新的问题。

假如有多个 zlentry,每个 zlentry 大小都在 250~253字节,现在新增了一个前置项,且大小超过了 254字节,后一项的 prevlen 无法用 1字节来保存新添加前一项的长度,需要扩容为 5字节,扩容后的当前项总大小超过了 254字节,同样导致后一项 prevlen 的扩容更新。

所以压缩链表通常用来保存数量不多的情况,避免大量的连锁更新。Redis 后续版本对压缩链表发生的连锁更新等问题做了优化,比如quicklist 或 listpack。

quicklist

quicklist底层实际上使用了双向链表和压缩列表, 一个quicklist就是一个双向链表,链表内的元素(quicklistNode)是一个 ziplist

https://i-blog.csdnimg.cn/direct/0462a96052dd442abba3a70c488a8b5f.png

typedef struct quicklistNode {
    struct quicklistNode *prev;
    struct quicklistNode *next;
    unsigned char *zl;			 // 指向 ---- ziplist 的指针
    unsigned int sz;             /* ziplist size in bytes */
    unsigned int count : 16;     /* count of items in ziplist */
    unsigned int encoding : 2;   /* RAW==1 or LZF==2 */
	...
} quicklistNode;

typedef struct quicklist {
    quicklistNode *head;
    quicklistNode *tail;
    unsigned long count;        /* total count of all entries in all ziplists */
    unsigned long len;          /* number of quicklistNodes */
    int fill : QL_FILL_BITS;              /* fill factor for individual nodes */
    unsigned int compress : QL_COMP_BITS; /* depth of end nodes not to compress;0=off */
    unsigned int bookmark_count: QL_BM_BITS;
    quicklistBookmark bookmarks[];
} quicklist;

quicklist 通过控制每个 quicklistNode 中,ziplist 的大小或是元素个数,就有效减少了在 ziplist 中新增或修改元素后,发生连锁更新的情况,从而提供了更好的访问性能。

Redis 列表,底层就是 quicklist 实现的,虽然随机查询效率较低,但是 quicklist 的头尾节点查询效率很高,非常适合作为列表的数据结构,比如 lpush,rpush等从两端操作的命令。

listpack

listpack 和 ziplist 结构类似也是连续紧凑的内存空间,区别在于 listpack 每一项存储的不再是前一项的字节长度,而是当前项的字节大小 ===》 解决了连锁更新的问题

当我们在 listpack 中新增或修改元素时,实际上只会涉及每个列表项自己的操作,而不会影响后续列表项的长度变化,这就避免了连锁更新。

listpack 是对压缩列表的改善,适合存储元素个数较少的情况,所以当 有序集合 和 字典 中当元素个数较少时,用 listpack 存储。

集合

集合(Set)底层是通过整数集合(intSet)或哈希表(hashtable)的形式存储的。

  • 当集合类型以 hashtable 存储时,哈希表的 key 为要插入的元素值,而哈希表的 value 则为 Null。
  • 当集合中所有的值都为整数时,Redis 会使用 intset 结构来存储。

当所有元素都为整数时,集合会以 intset 结构进行存储。

当发生以下两种情况时,会导致集合类型使用 hashtable 存储:

1)当元素的个数超过一定数量时,默认是 512 个,该值可通过命令 set-max-intset-entries xxx 来配置

2)当元素为非整数时,集合将会使用 hashtable 来存储

// 传入的 value 是 sds 类型,需要判断编码方式是整数还是字符串
int setTypeAdd(robj *subject, sds value) {
    long long llval;
    // 编码方式是 hashtable
    if (subject->encoding == OBJ_ENCODING_HT) {
        dict *ht = subject->ptr;
        // hashkey = value, hashvalue = NULL
        dictEntry *de = dictAddRaw(ht,value,NULL);
        if (de) {
            dictSetKey(ht,de,sdsdup(value));
            dictSetVal(ht,de,NULL);
            return 1;
        }
    } else if (subject->encoding == OBJ_ENCODING_INTSET) {
        // 编码方式是 intset
        if (isSdsRepresentableAsLongLong(value,&llval) == C_OK) {
            uint8_t success = 0;
            subject->ptr = intsetAdd(subject->ptr,llval,&success);
            if (success) {
                size_t max_entries = server.set_max_intset_entries;
                // 超过 inset 的最大存储数量,则使用字典类型存储
                if (max_entries >= 1<<30) max_entries = 1<<30;
                if (intsetLen(subject->ptr) > max_entries)
                    setTypeConvert(subject,OBJ_ENCODING_HT);
                return 1;
            }
        } else {
            // 转化为整数类型失败,使用字典类型存储
            setTypeConvert(subject,OBJ_ENCODING_HT);
            
            serverAssert(dictAdd(subject->ptr,sdsdup(value),NULL) == DICT_OK);
            return 1;
        }
    } else {
        serverPanic("Unknown set encoding");
    }
    return 0;
}

有序集合

在使用 Redis 的 zset 查询存储的数据时,无论是范围查询还是等值查询效率都非常高,底层使用了字典和跳表存储

// server.h
typedef struct zset {
    dict *dict;		// 字典
    zskiplist *zsl; // 跳表
} zset;

typedef struct zskiplist {
    struct zskiplistNode *header, *tail;
    unsigned long length;
    int level;
} zskiplist;

typedef struct zskiplistNode {
	// zset 中的元素
    sds ele;
    // 元素的权重值
    double score;
    // 
    struct zskiplistNode *backward;
    struct zskiplistLevel {
        struct zskiplistNode *forward;
        unsigned long span;
    } level[];
} zskiplistNode;

当查询一个节点时,跳表会先从头节点的最高层开始,查找下一个节点。

在查找时,先根据节点的权重值查找当前层,如果权重值小于目标权重值,则继续访问当前层的下一个节点,如果权重值相等,则比较 SDS 值是否和目标 SDS 值相等,如果小于 目标 SDS 值,则继续访问当前层的下一个节点。当这两个条件都不满足时,则使用当前 level 数组里的指针,跳转到下一层,接着查找。

当跳表从最高层开始进行查找时,由于每一层结点数都约是下一层结点数的一半,这种查找过程就 类似于二分查找 ,查找复杂度可以降低到 O(logN)。但这种设计方法也会带来负面影响,那就是为了维持相邻两层上结点数的比例为 2:1,一旦有新的结点插入或是有结点被删除,那么插入或删除处的结点,及其后续结点的层数都需要进行调整,而这样就带来了额外的开销。

跳表在创建结点时,采用的是另一种设计方法,即 随机生成每个结点的层数 。此时,相邻两层链表上的结点数并不需要维持在严格的 2:1 关系。这样一来,当新插入一个结点时,只需要修改前后结点的指针,而其他结点的层数就不需要随之改变了,这就降低了插入操作的复杂度。

// t_zset.c
#define ZSKIPLIST_MAXLEVEL 64  //最大层数为64
#define ZSKIPLIST_P 0.25       //随机数的值为0.25
int zslRandomLevel(void) {
	// 初始化层数为 1
    int level = 1;
    while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
        level += 1;
    return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
}
int zsetAdd(robj *zobj, double score, sds ele, int *flags, double *newscore) {
	...
	if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
		...
	}
	// 编码方式是 skiplist
	if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
        zset *zs = zobj->ptr;
        zskiplistNode *znode;
        dictEntry *de;

        // 从哈希表中查询新增的元素 ele
        de = dictFind(zs->dict,ele);
        // 如果新元素存在
        if (de != NULL) {
            /* NX? Return, same element already exists. */
            if (nx) {
                *flags |= ZADD_NOP;
                return 1;
            }
            // 从 dict 中获取之前 key=ele 的 value:score
            curscore = *(double*)dictGetVal(de);

            /* Prepare the score for the increment if needed. */
            // 如果要更新元素的权重值
            if (incr) {
                score += curscore;
                if (isnan(score)) {
                    *flags |= ZADD_NAN;
                    return 0;
                }
                if (newscore) *newscore = score;
            }

            /* Remove and re-insert when score changes. */
            //如果权重发生变化了
            if (score != curscore) {
                //更新跳表结点
                znode = zslUpdateScore(zs->zsl,curscore,ele,score);
                //让哈希表元素的值指向跳表结点的权重
                dictGetVal(de) = &znode->score;
                *flags |= ZADD_UPDATED;
            }
            return 1;
        } 
        // 如果新元素不存在
        else if (!xx) {
            ele = sdsdup(ele);
            // 新插入跳表结点
            znode = zslInsert(zs->zsl,score,ele);
            // 新插入哈希表元素
            serverAssert(dictAdd(zs->dict,ele,&znode->score) == DICT_OK);
            *flags |= ZADD_ADDED;
            if (newscore) *newscore = score;
            return 1;
        } 
        ...
    }
}

Sorted Set 先是通过在它的数据结构中同时定义了跳表和哈希表,来实现同时使用这两种索引结构。然后,Sorted Set 在执行数据插入或是数据更新的过程中,会依次在跳表和哈希表中插入或更新相应的数据,从而保证了跳表和哈希表中记录的信息一致。

Sorted Set 既可以使用跳表支持数据的范围查询,还能使用哈希表支持根据元素直接查询它的权重。