Redis 跳表的实现原理

什么是跳表?

跳表(Skip List)是一种基于概率的数据结构,结合了链表的简单性和二叉搜索树的查找效率。它通过在普通链表上增加多层索引,允许快速查找、插入和删除操作。Redis 使用跳表作为 ZSET(有序集合)的底层实现之一,用于高效支持范围查询和排名操作。

想象你在一个图书馆找一本书。如果只有一排书架(普通链表),你得一本一本翻,效率很低。但如果图书馆有“索引层”,比如每隔 10 本书有一个标记,告诉你这 10 本的范围,你可以先跳到大概的位置,再细找。跳表就是这个思路:通过多层索引“跳跃”到目标附近。

Redis 为什么要用跳表?

在 Redis 的 ZSET 中,每个元素有(value)和分值(score),需要按分值排序并支持以下操作:

  1. 快速查找某个分值的元素(ZSCORE)。
  2. 按分值范围查询(如 ZRANGEBYSCORE)。
  3. 获取排名(如 ZRANK)。
  4. 高效插入和删除。

Redis 的 ZSET 底层使用跳表 + 哈希表的组合:

  • 哈希表:存储 value 到 score 的映射,O(1) 时间查找分值。
  • 跳表:维护按 score 排序的结构,支持范围查询和排名。

为什么不用红黑树或 AVL 树?跳表有以下优势:

  1. 实现简单:跳表基于链表,代码逻辑比平衡树直观。
  2. 并发友好:跳表的插入和删除操作局部性强,适合并发场景。
  3. 范围查询高效:跳表的多层索引天然适合范围扫描。
  4. 概率均衡:通过随机层高,跳表在平均情况下达到 O(log N) 的查找效率,无需复杂的旋转操作。

跳表的结构原理

1. 基本结构

跳表是一个多层链表,底层是包含所有元素的完整链表(称为 Level 0),上层是稀疏的索引层(Level 1, Level 2, …)。每个节点可能出现在多层中,层数越高,节点越稀疏。

用一个生活化的例子解释:

  • 假设你在火车站排队买票,队伍很长(底层链表)。
  • 站长每隔 5 个人放一个“快速通道”标记(Level 1),让你快速跳到某个区域。
  • 更高层的“超级快速通道”(Level 2)每隔 25 个人一个标记,覆盖更大的范围。
  • 你可以从最高层开始,快速定位到目标区域,再逐层向下细化。

在 Redis 中,跳表节点(zskiplistNode)的结构如下:

  • 值(value):存储元素的实际数据(字符串或二进制数据)。
  • 分值(score):用于排序的浮点数。
  • 后向指针(backward):指向前一个节点,方便反向遍历。
  • 层(level):一个数组,每个元素包含:
    • 前向指针(forward):指向该层中的下一个节点。
    • 跨度(span):表示从当前节点到下一个节点在底层链表中的距离(用于计算排名)。

跳表的整体结构(zskiplist)包含:

  • 头节点(header):一个空的起始节点,方便操作。
  • 尾节点(tail):指向最后一个节点。
  • 长度(length):底层链表的节点数。
  • 最大层数(maxlevel):当前跳表最高的层数。

2. 随机层数

跳表的关键是每个节点的层数是随机的。Redis 使用一个概率算法来决定新节点的层数,规则是:

  • 每次以概率 p(Redis 中 p = 0.25)决定是否增加一层。
  • 最多允许 32 层(ZSKIPLIST_MAXLEVEL)。

以下是用 Go 语言实现的随机层数生成代码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package main

import (
    "math/rand"
    "time"
)

const (
    ZSKIPLIST_MAXLEVEL = 32 // 最大层数
    ZSKIPLIST_P        = 0.25 // 概率 p = 0.25
)

// randomLevel 生成新节点的随机层数
func randomLevel() int {
    rand.Seed(time.Now().UnixNano())
    level := 1
    for rand.Float64() < ZSKIPLIST_P && level < ZSKIPLIST_MAXLEVEL {
        level++
    }
    return level
}

func main() {
    // 示例:生成 10 个随机层数
    for i := 0; i < 10; i++ {
        level := randomLevel()
        fmt.Printf("节点 %d 的层数: %d\n", i+1, level)
    }
}

这个算法确保:

  • 大多数节点层数较低(接近 1 或 2 层)。
  • 少数节点有较高层数,形成稀疏的索引层。
  • 平均情况下,跳表的层高分布符合对数复杂度。

3. 查找过程

查找一个分值(score)的过程从最高层开始,逐步向下:

  1. 从头节点的最高层出发,检查当前节点的下一个节点(forward)。
  2. 如果下一个节点的分值小于目标分值,或者为空,继续向右移动。
  3. 如果下一个节点的分值大于或等于目标分值,或者到达末尾,回到当前节点的下一层。
  4. 重复直到到达底层(Level 0),找到目标节点或确认不存在。

以下是用 Go 模拟查找过程的代码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
package main

import "fmt"

// 节点结构
type SkipListNode struct {
    score   float64
    value   string
    forward []*SkipListNode // 前向指针数组
    span    []int           // 跨度数组
}

// 跳表结构
type SkipList struct {
    header    *SkipListNode
    maxLevel  int
    length    int
}

// search 查找分值为 score 的节点
func (sl *SkipList) search(score float64) *SkipListNode {
    current := sl.header
    // 从最高层开始
    for level := sl.maxLevel - 1; level >= 0; level-- {
        // 向右遍历当前层
        for current.forward[level] != nil && current.forward[level].score < score {
            current = current.forward[level]
        }
    }
    // 到达底层,检查是否找到
    current = current.forward[0]
    if current != nil && current.score == score {
        return current
    }
    return nil
}

func main() {
    // 示例:初始化跳表并查找
    sl := &SkipList{
        header:   &SkipListNode{forward: make([]*SkipListNode, ZSKIPLIST_MAXLEVEL)},
        maxLevel: 1,
        length:   0,
    }
    // 假设已插入节点
    result := sl.search(42.0)
    if result != nil {
        fmt.Printf("找到节点,分值: %f, 值: %s\n", result.score, result.value)
    } else {
        fmt.Println("未找到节点")
    }
}

查找的平均时间复杂度是 O(log N),因为每层节点数量大约是下一层的 1/p(Redis 中 p = 0.25,所以每层节点数约为下一层的 1/4)。

4. 插入过程

插入新节点需要:

  1. 查找插入位置:类似查找过程,记录每层经过的最后一个节点(称为 update 数组)。
  2. 生成随机层数:决定新节点的层数。
  3. 更新指针:将新节点插入到每一层的正确位置,调整前向指针和跨度。
  4. 更新跳表元数据:如最大层数和长度。

以下是插入的 Go 代码示例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
package main

import (
    "fmt"
    "math/rand"
    "time"
)

const (
    ZSKIPLIST_MAXLEVEL = 32
    ZSKIPLIST_P        = 0.25
)

type SkipListNode struct {
    score   float64
    value   string
    forward []*SkipListNode
    span    []int
}

type SkipList struct {
    header    *SkipListNode
    maxLevel  int
    length    int
}

func randomLevel() int {
    rand.Seed(time.Now().UnixNano())
    level := 1
    for rand.Float64() < ZSKIPLIST_P && level < ZSKIPLIST_MAXLEVEL {
        level++
    }
    return level
}

func (sl *SkipList) insert(score float64, value string) {
    // 记录每层的前驱节点
    update := make([]*SkipListNode, ZSKIPLIST_MAXLEVEL)
    rank := make([]int, ZSKIPLIST_MAXLEVEL) // 记录跨度
    current := sl.header

    // 查找插入位置
    for level := sl.maxLevel - 1; level >= 0; level-- {
        for current.forward[level] != nil && current.forward[level].score < score {
            rank[level] += current.span[level]
            current = current.forward[level]
        }
        update[level] = current
    }

    // 生成新节点的层数
    level := randomLevel()
    if level > sl.maxLevel {
        sl.maxLevel = level
    }

    // 创建新节点
    newNode := &SkipListNode{
        score:   score,
        value:   value,
        forward: make([]*SkipListNode, level),
        span:    make([]int, level),
    }

    // 更新指针和跨度
    for i := 0; i < level; i++ {
        newNode.forward[i] = update[i].forward[i]
        update[i].forward[i] = newNode
        newNode.span[i] = update[i].span[i] - rank[i]
        update[i].span[i] = rank[i] + 1
    }

    sl.length++
}

func main() {
    sl := &SkipList{
        header:   &SkipListNode{forward: make([]*SkipListNode, ZSKIPLIST_MAXLEVEL)},
        maxLevel: 1,
        length:   0,
    }
    sl.insert(42.0, "answer")
    fmt.Printf("插入节点,分值: 42.0, 值: answer\n")
}

5. 删除过程

删除过程类似插入:

  1. 查找目标节点,记录每层的前驱节点。
  2. 更新前驱节点的指针和跨度,移除目标节点。
  3. 如果删除后最大层数减少,更新 maxLevel

6. 跨度和排名

跨度(span)记录了节点在底层链表中的距离,用于快速计算排名。例如:

  • 节点 A 到 B 的跨度是 3,说明 A 和 B 之间有 2 个节点。
  • 要计算节点 X 的排名,从头节点遍历,累加跨度。

Redis 跳表的实现细节

Redis 的跳表实现(t_zset.c)有一些独特的设计:

  1. 内存优化:节点的值(value)只在底层存储一次,上层通过指针引用,减少内存占用。
  2. 双精度浮点数:分值使用 double 类型,支持高精度排序。
  3. 后向指针:方便反向遍历,用于实现 ZREVRANGE 等命令。
  4. 层数限制:最多 32 层,平衡性能和内存。

性能分析

  • 时间复杂度
    • 查找、插入、删除:平均 O(log N),最坏 O(N)。
    • 范围查询:O(log N + k),其中 k 是返回的元素数。
  • 空间复杂度:O(N),平均每节点约 1.33 层(1 / (1 - p))。
  • 并发性:Redis 单线程模型下,跳表无需锁,简化实现。

实际应用场景

跳表在 Redis 的 ZSET 中广泛应用于:

  • 排行榜:游戏分数排行,按分值排序,快速获取 Top N。
  • 消息队列:按时间戳排序的消息队列,支持范围查询。
  • 地理位置:存储经纬度分数,快速查找附近的用户。

总结

跳表是 Redis ZSET 的核心数据结构,通过随机层数和多层索引实现高效的排序和范围查询。它结合了链表的简单性和树的查找效率,适合动态数据场景。通过本文的讲解和 Go 代码示例,你应该对跳表的原理和实现有了深入理解。

评论 0