python标准库--heapq - 堆队列算法(优先队列)在算法比赛的应用
目录
一、基本操作
1.构造堆
2.访问堆顶元素(返回堆顶元素)
3.删除堆顶元素(返回堆顶元素)
4.插入新元素,时间复杂度为 O (log n)
5. 插入并删除元素(高效操作)
6. 高级操作- 合并多个有序序列
7.高级操作-获取最大 / 最小的 K 个元素
8.高级操作-实现最大堆
9.自定义对象的堆
10.时间复杂度
二、实例
1.优先队列
2. Top-K 问题
3. 合并有序序列
4. 动态维护中位数
5. 区间调度问题
一、基本操作
1.构造堆
import heapq
# 方法1
heap = []
heapq.heappush(heap, 3)
heapq.heappush(heap, 1)
heapq.heappush(heap, 2)
print(heap) # 输出: [1, 3, 2](最小堆的内部结构)# 方法2
arr = [3, 1, 2]
heapq.heapify(arr)
print(arr) # 输出: [1, 3, 2]
2.访问堆顶元素(返回堆顶元素)
arr[0]
3.删除堆顶元素(返回堆顶元素)
heapq.heappop(arr)
4.插入新元素,时间复杂度为 O (log n)
heapq.heappush(arr, 5)
5. 插入并删除元素(高效操作)
使用 heappushpop()
或 heapreplace()
:
heappushpop(heap, item)
:先插入item
,再删除并返回堆顶元素。heapreplace(heap, item)
:先删除并返回堆顶元素,再插入item
。
arr = [2, 3, 5]
print(heapq.heappushpop(arr, 0)) # 输出: 0(插入0后弹出最小值0)
print(arr) # 输出: [2, 3, 5]print(heapq.heapreplace(arr, 1)) # 输出: 2(弹出原堆顶2,插入1)
print(arr) # 输出: [1, 3, 5]
6. 高级操作- 合并多个有序序列
a = [1, 4, 7]
b = [2, 5, 8]
merged = heapq.merge(a, b)
print(list(merged)) # 输出: [1, 2, 4, 5, 7, 8]
7.高级操作-获取最大 / 最小的 K 个元素
nlargest(k, iterable, key=None)
:返回可迭代对象中最大的 K 个元素。nsmallest(k, iterable, key=None)
:返回可迭代对象中最小的 K 个元素。
arr = [3, 1, 4, 1, 5, 9, 2, 6, 5]
print(heapq.nlargest(3, arr)) # 输出: [9, 6, 5]
print(heapq.nsmallest(3, arr)) # 输出: [1, 1, 2]# 结合 key 参数(例如对字典按值排序)
students = [{'name': 'Alice', 'score': 85},{'name': 'Bob', 'score': 92},{'name': 'Charlie', 'score': 78}
]
print(heapq.nlargest(2, students, key=lambda x: x['score']))
# 输出: [{'name': 'Bob', 'score': 92}, {'name': 'Alice', 'score': 85}]
8.高级操作-实现最大堆
heapq
默认为最小堆。若需最大堆,可将元素取负后存储,取出时再取负还原。
arr = [3, 1, 4]
max_heap = [-x for x in arr]
heapq.heapify(max_heap)# 获取最大值
max_val = -heapq.heappop(max_heap)
print(max_val) # 输出: 4
9.自定义对象的堆
通过实现 __lt__
方法定义对象的比较规则
class Item:def __init__(self, value, priority):self.value = valueself.priority = prioritydef __lt__(self, other):# 定义优先级比较规则(优先级高的元素更小)return self.priority < other.priorityheap = []
heapq.heappush(heap, Item("task1", 3))
heapq.heappush(heap, Item("task2", 1))
heapq.heappush(heap, Item("task3", 2))# 按优先级出队
print(heapq.heappop(heap).value) # 输出: "task2"(优先级1最高)
10.时间复杂度
操作 | 代码示例 | 时间复杂度 |
---|---|---|
插入元素 | heapq.heappush(heap, item) | O(log n) |
删除堆顶元素 | heapq.heappop(heap) | O(log n) |
获取堆顶元素 | heap[0] | O(1) |
列表转换为堆 | heapq.heapify(arr) | O(n) |
获取最大 / 最小 K 元素 | heapq.nlargest(k, arr) | O(n log k) |
二、实例
1.优先队列
典型问题:
- Dijkstra 算法(单源最短路径)
- 哈夫曼编码(最优前缀编码)
- 任务调度(按优先级执行任务)
示例:Dijkstra 算法
import heapqdef dijkstra(graph, start):n = len(graph)dist = [float('inf')] * ndist[start] = 0heap = [(0, start)] # (距离, 节点)while heap:d, u = heapq.heappop(heap)if d > dist[u]:continuefor v, w in graph[u]:if dist[v] > d + w:dist[v] = d + wheapq.heappush(heap, (dist[v], v))return dist# 示例图的邻接表表示
graph = [[(1, 4), (2, 1)], # 节点0的邻接节点及边权[(3, 2)], # 节点1的邻接节点及边权[(1, 2), (3, 5)], # 节点2的邻接节点及边权[] # 节点3的邻接节点及边权
]print(dijkstra(graph, 0)) # 输出: [0, 3, 1, 5]
2. Top-K 问题
求数组中最大 / 最小的 K 个元素,时间复杂度为 O (N log K)。
示例:最小的 K 个数
import heapqdef smallest_k(arr, k):if k == 0:return []heap = []for num in arr:# 维护一个大小为K的最大堆(用负数模拟)if len(heap) < k:heapq.heappush(heap, -num)else:if -num > heap[0]: # 当前数比堆顶的数更小heapq.heappop(heap)heapq.heappush(heap, -num)return [-x for x in heap]arr = [3, 2, 1, 5, 6, 4]
print(smallest_k(arr, 2)) # 输出: [1, 2]
3. 合并有序序列
高效合并多个有序数组 / 链表,时间复杂度为 O (N log M)(M 为序列数)。
示例:合并 K 个有序链表
import heapq
from typing import List, Optionalclass ListNode:def __init__(self, val=0, next=None):self.val = valself.next = nextdef merge_k_lists(lists: List[Optional[ListNode]]) -> Optional[ListNode]:dummy = ListNode(0)curr = dummyheap = []# 初始化堆:将每个链表的头节点加入堆for i, node in enumerate(lists):if node:heapq.heappush(heap, (node.val, i, node))while heap:val, idx, node = heapq.heappop(heap)curr.next = nodecurr = curr.nextif node.next:heapq.heappush(heap, (node.next.val, idx, node.next))return dummy.next
4. 动态维护中位数
使用两个堆(最大堆 + 最小堆)动态维护数据流的中位数。
示例:数据流的中位数
import heapqclass MedianFinder:def __init__(self):self.max_heap = [] # 存储较小的一半数(取负)self.min_heap = [] # 存储较大的一半数def addNum(self, num: int) -> None:if not self.max_heap or num <= -self.max_heap[0]:heapq.heappush(self.max_heap, -num)else:heapq.heappush(self.min_heap, num)# 平衡两个堆的大小if len(self.max_heap) > len(self.min_heap) + 1:heapq.heappush(self.min_heap, -heapq.heappop(self.max_heap))elif len(self.min_heap) > len(self.max_heap):heapq.heappush(self.max_heap, -heapq.heappop(self.min_heap))def findMedian(self) -> float:if len(self.max_heap) == len(self.min_heap):return (-self.max_heap[0] + self.min_heap[0]) / 2else:return -self.max_heap[0]# 示例用法
median_finder = MedianFinder()
median_finder.addNum(1)
median_finder.addNum(2)
print(median_finder.findMedian()) # 输出: 1.5
5. 区间调度问题
例如会议室预订、活动安排等,需高效判断区间重叠。
示例:会议室预订
import heapqdef min_meeting_rooms(intervals):if not intervals:return 0# 按开始时间排序intervals.sort(key=lambda x: x[0])heap = [] # 存储会议室的结束时间for start, end in intervals:if heap and start >= heap[0]:# 最早结束的会议室可用,更新该会议室的结束时间heapq.heappop(heap)# 分配新会议室或更新已分配会议室的结束时间heapq.heappush(heap, end)return len(heap)intervals = [[0, 30], [5, 10], [15, 20]]
print(min_meeting_rooms(intervals)) # 输出: 2