二叉堆
总结:二叉堆是一种能够动态排序的数据结构,是二叉树的一种延伸。
二叉堆的主要操作有两个:sink(下沉) 和 swim(上浮)。 二叉堆的主要运用:Priority Queue(优先级队列)与 Heap Sort(堆排序)
二叉树性质: 二叉堆是一种特殊的二叉树,这棵二叉树上的任意节点的值,都必须大于等于(或小于等于)其左右子树所有节点的值。如果是大于等于,我们称之为「大顶堆」,如果是小于等于,我们称之为「小顶堆」。 对于小顶堆,每个节点下方的所有节点的值都比它大,那么不难想象根节点就是整棵树上的最小值。同理,大顶堆的根节点就是整棵树上的最大值。所以二叉堆可以辅助我们快速找到最大值或最小值。 二叉堆还有个性质:一个二叉堆的左右子堆(子树)也是一个二叉堆。这个性质主要在 堆排序算法的优化 中有用到。
代码实现:
实际上,我们在实现二叉堆的时候,是使用数组来模拟二叉树结构。原因如下:
- 模拟二叉树结构时我们大多采用链表来构建,这种方式相比数组更加耗费空间。
push和pop方法需要先找到最底层右侧元素,而使用数组仅需O(1)的时间复杂度。- 需要注意的是:要想使用数组模拟,该二叉树必须为完全二叉树
最常见的应用:优先级队列
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
class MyPriorityQueue:
def __init__(self, capacity, comparator=None):
# 堆数组
self.heap = [None] * capacity
# 堆中元素的数量
self.size = 0
# 元素比较器
self.comparator = comparator if comparator is not None else lambda x, y: (x > y) - (x < y)
# 返回堆的大小
def size(self):
return self.size
# 判断堆是否为空
def is_empty(self):
return self.size == 0
# 父节点的索引
def parent(self, node):
return (node - 1) // 2
# 左子节点的索引
def left(self, node):
return node * 2 + 1
# 右子节点的索引
def right(self, node):
return node * 2 + 2
# 交换数组的两个元素
def swap(self, i, j):
self.heap[i], self.heap[j] = self.heap[j], self.heap[i]
# 查,返回堆顶元素,时间复杂度 O(1)
def peek(self):
if self.is_empty():
raise IndexError("Priority queue underflow")
return self.heap[0]
# 增,向堆中插入一个元素,时间复杂度 O(logN)
def push(self, x):
# 扩容
if self.size == len(self.heap):
self.resize(2 * len(self.heap))
# 把新元素追加到最后
self.heap[self.size] = x
# 然后上浮到正确位置
self.swim(self.size)
self.size += 1
# 删,删除堆顶元素,时间复杂度 O(logN)
def pop(self):
if self.is_empty():
raise IndexError("Priority queue underflow")
res = self.heap[0]
# 把堆底元素放到堆顶
self.swap(0, self.size - 1)
# 避免对象游离
self.heap[self.size - 1] = None
self.size -= 1
# 然后下沉到正确位置
self.sink(0)
# 缩容
if self.size > 0 and self.size == len(self.heap) // 4:
self.resize(len(self.heap) // 2)
return res
# 上浮操作,时间复杂度是树高 O(logN)
def swim(self, node):
while node > 0 and self.comparator(self.heap[self.parent(node)], self.heap[node]) > 0:
self.swap(self.parent(node), node)
node = self.parent(node)
# 下沉操作,时间复杂度是树高 O(logN)
def sink(self, node):
while self.left(node) < self.size:
# 比较自己和左右子节点,看看谁最小
min_node = node
if self.left(node) < self.size and self.comparator(self.heap[self.left(node)], self.heap[min_node]) < 0:
min_node = self.left(node)
if self.right(node) < self.size and self.comparator(self.heap[self.right(node)], self.heap[min_node]) < 0:
min_node = self.right(node)
if min_node == node:
break
# 如果左右子节点中有比自己小的,就交换
self.swap(node, min_node)
node = min_node
# 调整堆的大小
def resize(self, capacity):
assert capacity >= self.size
new_heap = [None] * capacity
for i in range(self.size):
new_heap[i] = self.heap[i]
self.heap = new_heap
# 测试代码
if __name__ == "__main__":
# 小顶堆
pq = MyPriorityQueue(3, comparator=lambda x, y: (x > y) - (x < y))
pq.push(3)
pq.push(1)
pq.push(4)
pq.push(1)
pq.push(5)
pq.push(9)
# 1 1 3 4 5 9
while not pq.is_empty():
print(pq.pop())
This post is licensed under CC BY 4.0 by the author.