Post

二叉堆

总结:二叉堆是一种能够动态排序的数据结构,是二叉树的一种延伸。

二叉堆的主要操作有两个:sink(下沉) 和 swim(上浮)。 二叉堆的主要运用:Priority Queue(优先级队列)与 Heap Sort(堆排序)

二叉树性质: 二叉堆是一种特殊的二叉树,这棵二叉树上的任意节点的值,都必须大于等于(或小于等于)其左右子树所有节点的值。如果是大于等于,我们称之为「大顶堆」,如果是小于等于,我们称之为「小顶堆」。 对于小顶堆,每个节点下方的所有节点的值都比它大,那么不难想象根节点就是整棵树上的最小值。同理,大顶堆的根节点就是整棵树上的最大值。所以二叉堆可以辅助我们快速找到最大值或最小值。 二叉堆还有个性质:一个二叉堆的左右子堆(子树)也是一个二叉堆。这个性质主要在 堆排序算法的优化 中有用到。

代码实现:

实际上,我们在实现二叉堆的时候,是使用数组来模拟二叉树结构。原因如下:

  1. 模拟二叉树结构时我们大多采用链表来构建,这种方式相比数组更加耗费空间。
  2. pushpop 方法需要先找到最底层右侧元素,而使用数组仅需 O(1)的时间复杂度。
  3. 需要注意的是:要想使用数组模拟,该二叉树必须为完全二叉树

最常见的应用:优先级队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
class MyPriorityQueue:
    def __init__(self, capacity, comparator=None):
        # 堆数组
        self.heap = [None] * capacity
        
        # 堆中元素的数量
        self.size = 0
        
        # 元素比较器
        self.comparator = comparator if comparator is not None else lambda x, y: (x > y) - (x < y)

    # 返回堆的大小
    def size(self):
        return self.size

    # 判断堆是否为空
    def is_empty(self):
        return self.size == 0

    # 父节点的索引
    def parent(self, node):
        return (node - 1) // 2

    # 左子节点的索引
    def left(self, node):
        return node * 2 + 1

    # 右子节点的索引
    def right(self, node):
        return node * 2 + 2

    # 交换数组的两个元素
    def swap(self, i, j):
        self.heap[i], self.heap[j] = self.heap[j], self.heap[i]

    # 查,返回堆顶元素,时间复杂度 O(1)
    def peek(self):
        if self.is_empty():
            raise IndexError("Priority queue underflow")
        return self.heap[0]

    # 增,向堆中插入一个元素,时间复杂度 O(logN)
    def push(self, x):
        # 扩容
        if self.size == len(self.heap):
            self.resize(2 * len(self.heap))
        
        # 把新元素追加到最后
        self.heap[self.size] = x
        # 然后上浮到正确位置
        self.swim(self.size)
        self.size += 1

    # 删,删除堆顶元素,时间复杂度 O(logN)
    def pop(self):
        if self.is_empty():
            raise IndexError("Priority queue underflow")
        
        res = self.heap[0]
        # 把堆底元素放到堆顶
        self.swap(0, self.size - 1)
        # 避免对象游离
        self.heap[self.size - 1] = None
        self.size -= 1
        
        # 然后下沉到正确位置
        self.sink(0)
        
        # 缩容
        if self.size > 0 and self.size == len(self.heap) // 4:
            self.resize(len(self.heap) // 2)
        
        return res

    # 上浮操作,时间复杂度是树高 O(logN)
    def swim(self, node):
        while node > 0 and self.comparator(self.heap[self.parent(node)], self.heap[node]) > 0:
            self.swap(self.parent(node), node)
            node = self.parent(node)

    # 下沉操作,时间复杂度是树高 O(logN)
    def sink(self, node):
        while self.left(node) < self.size:
            # 比较自己和左右子节点,看看谁最小
            min_node = node
            if self.left(node) < self.size and self.comparator(self.heap[self.left(node)], self.heap[min_node]) < 0:
                min_node = self.left(node)
            if self.right(node) < self.size and self.comparator(self.heap[self.right(node)], self.heap[min_node]) < 0:
                min_node = self.right(node)
            if min_node == node:
                break
            # 如果左右子节点中有比自己小的,就交换
            self.swap(node, min_node)
            node = min_node

    # 调整堆的大小
    def resize(self, capacity):
        assert capacity >= self.size
        new_heap = [None] * capacity
        for i in range(self.size):
            new_heap[i] = self.heap[i]
        self.heap = new_heap

# 测试代码
if __name__ == "__main__":
    # 小顶堆
    pq = MyPriorityQueue(3, comparator=lambda x, y: (x > y) - (x < y))
    pq.push(3)
    pq.push(1)
    pq.push(4)
    pq.push(1)
    pq.push(5)
    pq.push(9)

    # 1 1 3 4 5 9
    while not pq.is_empty():
        print(pq.pop())
This post is licensed under CC BY 4.0 by the author.