什么是跳表?
跳表(Skip List)是一种基于概率的数据结构,结合了链表的简单性和二叉搜索树的查找效率。它通过在普通链表上增加多层索引,允许快速查找、插入和删除操作。Redis 使用跳表作为 ZSET
(有序集合)的底层实现之一,用于高效支持范围查询和排名操作。
想象你在一个图书馆找一本书。如果只有一排书架(普通链表),你得一本一本翻,效率很低。但如果图书馆有“索引层”,比如每隔 10 本书有一个标记,告诉你这 10 本的范围,你可以先跳到大概的位置,再细找。跳表就是这个思路:通过多层索引“跳跃”到目标附近。
Redis 为什么要用跳表?
在 Redis 的 ZSET
中,每个元素有值(value)和分值(score),需要按分值排序并支持以下操作:
- 快速查找某个分值的元素(
ZSCORE
)。
- 按分值范围查询(如
ZRANGEBYSCORE
)。
- 获取排名(如
ZRANK
)。
- 高效插入和删除。
Redis 的 ZSET
底层使用跳表 + 哈希表的组合:
- 哈希表:存储 value 到 score 的映射,O(1) 时间查找分值。
- 跳表:维护按 score 排序的结构,支持范围查询和排名。
为什么不用红黑树或 AVL 树?跳表有以下优势:
- 实现简单:跳表基于链表,代码逻辑比平衡树直观。
- 并发友好:跳表的插入和删除操作局部性强,适合并发场景。
- 范围查询高效:跳表的多层索引天然适合范围扫描。
- 概率均衡:通过随机层高,跳表在平均情况下达到 O(log N) 的查找效率,无需复杂的旋转操作。
跳表的结构原理
1. 基本结构
跳表是一个多层链表,底层是包含所有元素的完整链表(称为 Level 0),上层是稀疏的索引层(Level 1, Level 2, …)。每个节点可能出现在多层中,层数越高,节点越稀疏。
用一个生活化的例子解释:
- 假设你在火车站排队买票,队伍很长(底层链表)。
- 站长每隔 5 个人放一个“快速通道”标记(Level 1),让你快速跳到某个区域。
- 更高层的“超级快速通道”(Level 2)每隔 25 个人一个标记,覆盖更大的范围。
- 你可以从最高层开始,快速定位到目标区域,再逐层向下细化。
在 Redis 中,跳表节点(zskiplistNode
)的结构如下:
- 值(value):存储元素的实际数据(字符串或二进制数据)。
- 分值(score):用于排序的浮点数。
- 后向指针(backward):指向前一个节点,方便反向遍历。
- 层(level):一个数组,每个元素包含:
- 前向指针(forward):指向该层中的下一个节点。
- 跨度(span):表示从当前节点到下一个节点在底层链表中的距离(用于计算排名)。
跳表的整体结构(zskiplist
)包含:
- 头节点(header):一个空的起始节点,方便操作。
- 尾节点(tail):指向最后一个节点。
- 长度(length):底层链表的节点数。
- 最大层数(maxlevel):当前跳表最高的层数。
2. 随机层数
跳表的关键是每个节点的层数是随机的。Redis 使用一个概率算法来决定新节点的层数,规则是:
- 每次以概率
p
(Redis 中 p = 0.25
)决定是否增加一层。
- 最多允许 32 层(
ZSKIPLIST_MAXLEVEL
)。
以下是用 Go 语言实现的随机层数生成代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
package main
import (
"math/rand"
"time"
)
const (
ZSKIPLIST_MAXLEVEL = 32 // 最大层数
ZSKIPLIST_P = 0.25 // 概率 p = 0.25
)
// randomLevel 生成新节点的随机层数
func randomLevel() int {
rand.Seed(time.Now().UnixNano())
level := 1
for rand.Float64() < ZSKIPLIST_P && level < ZSKIPLIST_MAXLEVEL {
level++
}
return level
}
func main() {
// 示例:生成 10 个随机层数
for i := 0; i < 10; i++ {
level := randomLevel()
fmt.Printf("节点 %d 的层数: %d\n", i+1, level)
}
}
|
这个算法确保:
- 大多数节点层数较低(接近 1 或 2 层)。
- 少数节点有较高层数,形成稀疏的索引层。
- 平均情况下,跳表的层高分布符合对数复杂度。
3. 查找过程
查找一个分值(score)的过程从最高层开始,逐步向下:
- 从头节点的最高层出发,检查当前节点的下一个节点(forward)。
- 如果下一个节点的分值小于目标分值,或者为空,继续向右移动。
- 如果下一个节点的分值大于或等于目标分值,或者到达末尾,回到当前节点的下一层。
- 重复直到到达底层(Level 0),找到目标节点或确认不存在。
以下是用 Go 模拟查找过程的代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
|
package main
import "fmt"
// 节点结构
type SkipListNode struct {
score float64
value string
forward []*SkipListNode // 前向指针数组
span []int // 跨度数组
}
// 跳表结构
type SkipList struct {
header *SkipListNode
maxLevel int
length int
}
// search 查找分值为 score 的节点
func (sl *SkipList) search(score float64) *SkipListNode {
current := sl.header
// 从最高层开始
for level := sl.maxLevel - 1; level >= 0; level-- {
// 向右遍历当前层
for current.forward[level] != nil && current.forward[level].score < score {
current = current.forward[level]
}
}
// 到达底层,检查是否找到
current = current.forward[0]
if current != nil && current.score == score {
return current
}
return nil
}
func main() {
// 示例:初始化跳表并查找
sl := &SkipList{
header: &SkipListNode{forward: make([]*SkipListNode, ZSKIPLIST_MAXLEVEL)},
maxLevel: 1,
length: 0,
}
// 假设已插入节点
result := sl.search(42.0)
if result != nil {
fmt.Printf("找到节点,分值: %f, 值: %s\n", result.score, result.value)
} else {
fmt.Println("未找到节点")
}
}
|
查找的平均时间复杂度是 O(log N),因为每层节点数量大约是下一层的 1/p(Redis 中 p = 0.25,所以每层节点数约为下一层的 1/4)。
4. 插入过程
插入新节点需要:
- 查找插入位置:类似查找过程,记录每层经过的最后一个节点(称为
update
数组)。
- 生成随机层数:决定新节点的层数。
- 更新指针:将新节点插入到每一层的正确位置,调整前向指针和跨度。
- 更新跳表元数据:如最大层数和长度。
以下是插入的 Go 代码示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
|
package main
import (
"fmt"
"math/rand"
"time"
)
const (
ZSKIPLIST_MAXLEVEL = 32
ZSKIPLIST_P = 0.25
)
type SkipListNode struct {
score float64
value string
forward []*SkipListNode
span []int
}
type SkipList struct {
header *SkipListNode
maxLevel int
length int
}
func randomLevel() int {
rand.Seed(time.Now().UnixNano())
level := 1
for rand.Float64() < ZSKIPLIST_P && level < ZSKIPLIST_MAXLEVEL {
level++
}
return level
}
func (sl *SkipList) insert(score float64, value string) {
// 记录每层的前驱节点
update := make([]*SkipListNode, ZSKIPLIST_MAXLEVEL)
rank := make([]int, ZSKIPLIST_MAXLEVEL) // 记录跨度
current := sl.header
// 查找插入位置
for level := sl.maxLevel - 1; level >= 0; level-- {
for current.forward[level] != nil && current.forward[level].score < score {
rank[level] += current.span[level]
current = current.forward[level]
}
update[level] = current
}
// 生成新节点的层数
level := randomLevel()
if level > sl.maxLevel {
sl.maxLevel = level
}
// 创建新节点
newNode := &SkipListNode{
score: score,
value: value,
forward: make([]*SkipListNode, level),
span: make([]int, level),
}
// 更新指针和跨度
for i := 0; i < level; i++ {
newNode.forward[i] = update[i].forward[i]
update[i].forward[i] = newNode
newNode.span[i] = update[i].span[i] - rank[i]
update[i].span[i] = rank[i] + 1
}
sl.length++
}
func main() {
sl := &SkipList{
header: &SkipListNode{forward: make([]*SkipListNode, ZSKIPLIST_MAXLEVEL)},
maxLevel: 1,
length: 0,
}
sl.insert(42.0, "answer")
fmt.Printf("插入节点,分值: 42.0, 值: answer\n")
}
|
5. 删除过程
删除过程类似插入:
- 查找目标节点,记录每层的前驱节点。
- 更新前驱节点的指针和跨度,移除目标节点。
- 如果删除后最大层数减少,更新
maxLevel
。
6. 跨度和排名
跨度(span)记录了节点在底层链表中的距离,用于快速计算排名。例如:
- 节点 A 到 B 的跨度是 3,说明 A 和 B 之间有 2 个节点。
- 要计算节点 X 的排名,从头节点遍历,累加跨度。
Redis 跳表的实现细节
Redis 的跳表实现(t_zset.c
)有一些独特的设计:
- 内存优化:节点的值(value)只在底层存储一次,上层通过指针引用,减少内存占用。
- 双精度浮点数:分值使用
double
类型,支持高精度排序。
- 后向指针:方便反向遍历,用于实现
ZREVRANGE
等命令。
- 层数限制:最多 32 层,平衡性能和内存。
性能分析
- 时间复杂度:
- 查找、插入、删除:平均 O(log N),最坏 O(N)。
- 范围查询:O(log N + k),其中 k 是返回的元素数。
- 空间复杂度:O(N),平均每节点约 1.33 层(1 / (1 - p))。
- 并发性:Redis 单线程模型下,跳表无需锁,简化实现。
实际应用场景
跳表在 Redis 的 ZSET
中广泛应用于:
- 排行榜:游戏分数排行,按分值排序,快速获取 Top N。
- 消息队列:按时间戳排序的消息队列,支持范围查询。
- 地理位置:存储经纬度分数,快速查找附近的用户。
总结
跳表是 Redis ZSET
的核心数据结构,通过随机层数和多层索引实现高效的排序和范围查询。它结合了链表的简单性和树的查找效率,适合动态数据场景。通过本文的讲解和 Go 代码示例,你应该对跳表的原理和实现有了深入理解。
评论 0