PyQt5 K线图实现与性能优化详解
下面我将展示一个完整的PyQt5 K线图实现方案,包含详细说明和多项性能优化技术。
完整示例代码
import sys
import random
from PyQt5.QtWidgets import (QApplication, QMainWindow, QVBoxLayout, QWidget, QPushButton, QComboBox, QLabel)
from PyQt5.QtCore import Qt, QTimer, QRectF
from PyQt5.QtGui import QPainter, QColor, QPen, QFont
import numpy as np
from collections import dequeclass KLineItem:"""K线数据项,存储单个K线的所有信息"""def __init__(self, timestamp, open_price, close_price, high_price, low_price, volume):self.timestamp = timestampself.open = open_priceself.close = close_priceself.high = high_priceself.low = low_priceself.volume = volumedef is_rise(self):"""判断是阳线还是阴线"""return self.close >= self.openclass KLineWidget(QWidget):"""自定义K线图绘制控件"""def __init__(self, parent=None):super().__init__(parent)self.setBackgroundRole(Qt.white)self.setAutoFillBackground(True)# 数据相关self.kline_data = deque(maxlen=200) # 使用双端队列限制最大数据量self.current_data = [] # 当前显示的数据self.price_min = 0self.price_max = 0self.volume_max = 0# 显示设置self.candle_width = 8 # K线宽度self.margin = 10 # 边距self.show_volume = True # 是否显示成交量self.theme = "dark" # 主题# 性能优化相关self.cache_valid = Falseself.cache_image = Noneself.last_draw_rect = QRectF()# 模拟数据定时器self.timer = QTimer(self)self.timer.timeout.connect(self.add_random_data)def add_data(self, kline_item):"""添加K线数据"""self.kline_data.append(kline_item)self.update_price_range()self.cache_valid = False # 数据变化,缓存失效self.update()def add_random_data(self):"""添加随机数据(用于演示)"""if not self.kline_data:# 初始数据for _ in range(50):self.add_random_data()returnlast = self.kline_data[-1]# 基于最后一条数据生成新数据change_percent = random.uniform(-0.03, 0.03)new_close = last.close * (1 + change_percent)# 确保新价格在合理范围内new_close = max(new_close, self.price_min * 0.9)new_close = min(new_close, self.price_max * 1.1)# 生成新高低noise = random.uniform(-0.01, 0.01) * new_closenew_high = new_close * (1 + abs(noise))new_low = new_close * (1 - abs(noise))# 确保高低顺序正确new_high = max(new_high, new_low, new_close)new_low = min(new_high, new_low, new_close)# 生成成交量(随机波动)new_volume = max(100, int(last.volume * (1 + random.uniform(-0.2, 0.2))))self.add_data(KLineItem(timestamp=last.timestamp + 60, # 假设每分钟一条数据open_price=last.close,close_price=new_close,high_price=new_high,low_price=new_low,volume=new_volume))def update_price_range(self):"""更新价格范围"""if not self.kline_data:return# 计算价格范围(留出10%的边距)all_prices = []for item in self.kline_data:all_prices.extend([item.open, item.close, item.high, item.low])price_range = max(all_prices) - min(all_prices)self.price_min = min(all_prices) - price_range * 0.1self.price_max = max(all_prices) + price_range * 0.1# 计算成交量最大值self.volume_max = max(item.volume for item in self.kline_data) * 1.1def paintEvent(self, event):"""绘制K线图(核心性能优化点)"""# 1. 使用双缓冲减少闪烁painter = QPainter(self)painter.setRenderHint(QPainter.Antialiasing)# 2. 只重绘需要更新的区域draw_rect = event.rect()if self.cache_valid and self.last_draw_rect == draw_rect and self.cache_image:painter.drawImage(draw_rect, self.cache_image, draw_rect)return# 3. 创建缓存图像self.cache_image = QImage(self.size(), QImage.Format_ARGB32)cache_painter = QPainter(self.cache_image)cache_painter.setRenderHint(QPainter.Antialiasing)# 4. 绘制背景self.draw_background(cache_painter)# 5. 绘制坐标轴和网格self.draw_grid(cache_painter)# 6. 绘制K线(性能关键部分)self.draw_klines(cache_painter)# 7. 绘制成交量(可选)if self.show_volume:self.draw_volume(cache_painter)# 8. 完成缓存绘制cache_painter.end()# 9. 将缓存绘制到屏幕painter.drawImage(draw_rect, self.cache_image, draw_rect)# 10. 标记缓存有效self.cache_valid = Trueself.last_draw_rect = draw_rectdef draw_background(self, painter):"""绘制背景"""if self.theme == "dark":painter.fillRect(self.rect(), QColor(30, 30, 30))painter.setPen(QPen(QColor(50, 50, 50), 1))else:painter.fillRect(self.rect(), Qt.white)painter.setPen(QPen(QColor(220, 220, 220), 1))# 绘制背景网格(稍后在draw_grid中处理)def draw_grid(self, painter):"""绘制网格和坐标轴"""width = self.width()height = self.height()if self.show_volume:chart_height = height * 0.7 # 主图占70%volume_height = height * 0.2 # 成交量占20%else:chart_height = heightvolume_height = 0# 绘制主图网格painter.save()if self.theme == "dark":painter.setPen(QPen(QColor(50, 50, 50), 1))else:painter.setPen(QPen(QColor(230, 230, 230), 1))# 水平网格线(价格)price_steps = 5price_step = (self.price_max - self.price_min) / price_stepsfor i in range(price_steps + 1):y = chart_height - (i * chart_height / price_steps)painter.drawLine(0, y, width, y)# 绘制价格标签price = self.price_min + i * price_steppainter.drawText(5, y - 5, f"{price:.2f}")# 垂直网格线(时间) - 简化版,实际应根据数据量动态计算data_count = len(self.kline_data)if data_count > 1:time_steps = min(10, data_count // 2) # 最多显示10个时间标签for i in range(time_steps + 1):x = self.margin + i * (width - 2 * self.margin) / time_stepspainter.drawLine(x, 0, x, chart_height)# 绘制时间标签(简化版,实际应使用真实时间)if data_count > 0:idx = int(i * (data_count - 1) / time_steps)if idx < data_count:painter.drawText(x - 15, chart_height + 15, str(idx))painter.restore()# 绘制成交量区域网格(如果显示成交量)if self.show_volume and self.volume_max > 0:painter.save()if self.theme == "dark":painter.setPen(QPen(QColor(50, 50, 50), 1))else:painter.setPen(QPen(QColor(230, 230, 230), 1))volume_y_start = chart_height + 5# 成交量网格线(简化版)for i in range(1, 3):y = volume_y_start + i * volume_height / 3painter.drawLine(0, y, width, y)painter.restore()def draw_klines(self, painter):"""绘制K线(性能关键部分)"""width = self.width()height = self.height()if self.show_volume:chart_height = height * 0.7else:chart_height = heightdata_count = len(self.kline_data)if data_count == 0:return# 计算每个K线的宽度和间距total_width = width - 2 * self.marginavailable_width = total_width - (data_count + 1) * 2 # 2像素间距if available_width <= 0:returnself.candle_width = max(2, available_width / data_count) # 最小宽度2像素# 准备当前显示的数据范围(性能优化:只绘制可见区域)# 这里简化处理,实际应根据视口范围动态计算start_idx = max(0, data_count - int(total_width / (self.candle_width + 2)))self.current_data = list(self.kline_data)[start_idx:]data_count = len(self.current_data)# 绘制K线painter.save()for i, item in enumerate(self.current_data):x = width - self.margin - (i + 0.5) * (self.candle_width + 2)# 计算价格坐标high_y = chart_height - (item.high - self.price_min) / (self.price_max - self.price_min) * chart_heightlow_y = chart_height - (item.low - self.price_min) / (self.price_max - self.price_min) * chart_heightopen_y = chart_height - (item.open - self.price_min) / (self.price_max - self.price_min) * chart_heightclose_y = chart_height - (item.close - self.price_min) / (self.price_max - self.price_min) * chart_height# 绘制上下影线if self.theme == "dark":pen = QPen(QColor(100, 100, 100), 1)else:pen = QPen(QColor(150, 150, 150), 1)painter.setPen(pen)painter.drawLine(x, high_y, x, low_y)# 绘制实体candle_height = abs(open_y - close_y)if candle_height < 1: # 确保最小高度candle_height = 1candle_rect = QRectF(x - self.candle_width / 2,min(open_y, close_y),self.candle_width,candle_height)if item.is_rise():# 阳线(红色或绿色取决于主题)if self.theme == "dark":painter.setBrush(QColor(200, 50, 50))else:painter.setBrush(QColor(255, 50, 50))painter.setPen(QPen(QColor(200, 50, 50), 1))else:# 阴线if self.theme == "dark":painter.setBrush(QColor(50, 150, 50))else:painter.setBrush(QColor(50, 200, 50))painter.setPen(QPen(QColor(50, 150, 50), 1))painter.drawRect(candle_rect)painter.restore()def draw_volume(self, painter):"""绘制成交量"""if not self.current_data or self.volume_max <= 0:returnwidth = self.width()height = self.height()chart_height = height * 0.7volume_height = height * 0.2volume_y_start = chart_height + 5painter.save()for i, item in enumerate(self.current_data):x = width - self.margin - (i + 0.5) * (self.candle_width + 2)# 计算成交量高度vol_height = (item.volume / self.volume_max) * volume_heightif vol_height < 1:vol_height = 1vol_rect = QRectF(x - self.candle_width / 2,volume_y_start + volume_height - vol_height,self.candle_width,vol_height)# 根据涨跌设置不同颜色if item.is_rise():if self.theme == "dark":painter.setBrush(QColor(200, 50, 50))else:painter.setBrush(QColor(255, 100, 100))else:if self.theme == "dark":painter.setBrush(QColor(50, 150, 50))else:painter.setBrush(QColor(100, 200, 100))painter.setPen(Qt.NoPen)painter.drawRect(vol_rect)painter.restore()def resizeEvent(self, event):"""窗口大小变化时使缓存失效"""self.cache_valid = Falsesuper().resizeEvent(event)def clear_data(self):"""清除所有数据"""self.kline_data.clear()self.current_data.clear()self.cache_valid = Falseself.update()class KLineDemo(QMainWindow):"""主窗口类"""def __init__(self):super().__init__()self.setWindowTitle("PyQt5 K线图演示")self.setGeometry(100, 100, 1000, 600)# 创建UIself.init_ui()def init_ui(self):"""初始化用户界面"""# 主窗口部件central_widget = QWidget()self.setCentralWidget(central_widget)# 布局layout = QVBoxLayout()central_widget.setLayout(layout)# 标题title_label = QLabel("PyQt5 K线图演示")title_label.setAlignment(Qt.AlignCenter)title_label.setFont(QFont("Arial", 16))layout.addWidget(title_label)# K线图控件self.kline_widget = KLineWidget()layout.addWidget(self.kline_widget)# 控制面板control_panel = QWidget()control_layout = QVBoxLayout()control_panel.setLayout(control_layout)# 按钮区域button_layout = QHBoxLayout()# 开始按钮self.start_btn = QPushButton("开始模拟数据")self.start_btn.clicked.connect(self.start_simulation)button_layout.addWidget(self.start_btn)# 停止按钮self.stop_btn = QPushButton("停止")self.stop_btn.clicked.connect(self.stop_simulation)self.stop_btn.setEnabled(False)button_layout.addWidget(self.stop_btn)# 清除按钮self.clear_btn = QPushButton("清除数据")self.clear_btn.clicked.connect(self.clear_data)button_layout.addWidget(self.clear_btn)# 主题选择self.theme_combo = QComboBox()self.theme_combo.addItems(["暗色主题", "亮色主题"])self.theme_combo.currentTextChanged.connect(self.change_theme)button_layout.addWidget(self.theme_combo)# 成交量开关self.volume_check = QPushButton("隐藏成交量")self.volume_check.setCheckable(True)self.volume_check.setChecked(True)self.volume_check.clicked.connect(self.toggle_volume)button_layout.addWidget(self.volume_check)control_layout.addLayout(button_layout)layout.addWidget(control_panel)def start_simulation(self):"""开始模拟数据"""self.kline_widget.timer.start(500) # 每500ms添加一条数据self.start_btn.setEnabled(False)self.stop_btn.setEnabled(True)def stop_simulation(self):"""停止模拟数据"""self.kline_widget.timer.stop()self.start_btn.setEnabled(True)self.stop_btn.setEnabled(False)def clear_data(self):"""清除数据"""self.kline_widget.clear_data()def change_theme(self, theme):"""切换主题"""self.kline_widget.theme = "dark" if theme == "暗色主题" else "light"self.kline_widget.cache_valid = Falseself.kline_widget.update()def toggle_volume(self, checked):"""切换成交量显示"""self.kline_widget.show_volume = checkedself.volume_check.setText("隐藏成交量" if checked else "显示成交量")self.kline_widget.cache_valid = Falseself.kline_widget.update()if __name__ == "__main__":app = QApplication(sys.argv)# 性能优化:设置应用程序样式app.setStyle("Fusion")window = KLineDemo()window.show()sys.exit(app.exec_())
代码说明与性能优化技术
1. 数据结构优化
使用双端队列:
python
self.kline_data = deque(maxlen=200) # 限制最大数据量 |
- 自动淘汰旧数据,防止内存无限增长
- 比普通列表更高效的追加和弹出操作
K线数据项类:
python
class KLineItem: |
def __init__(self, timestamp, open_price, close_price, high_price, low_price, volume): |
self.timestamp = timestamp |
self.open = open_price |
self.close = close_price |
self.high = high_price |
self.low = low_price |
self.volume = volume |
- 结构化存储K线数据,便于管理和访问
2. 绘制性能优化
双缓冲技术:
python
# 在paintEvent中 |
self.cache_image = QImage(self.size(), QImage.Format_ARGB32) |
cache_painter = QPainter(self.cache_image) |
# ...绘制到缓存... |
painter.drawImage(draw_rect, self.cache_image, draw_rect) |
- 减少屏幕闪烁
- 避免重复计算
局部重绘:
python
draw_rect = event.rect() |
if self.cache_valid and self.last_draw_rect == draw_rect and self.cache_image: |
painter.drawImage(draw_rect, self.cache_image, draw_rect) |
return |
- 只重绘需要更新的区域
- 显著提高性能,特别是在窗口调整大小时
动态K线宽度计算:
python
available_width = total_width - (data_count + 1) * 2 # 2像素间距 |
self.candle_width = max(2, available_width / data_count) # 最小宽度2像素 |
- 根据显示区域自动调整K线宽度
- 确保在不同窗口大小下都能正常显示
3. 数据可视化优化
价格范围动态计算:
python
def update_price_range(self): |
# 计算价格范围(留出10%的边距) |
price_range = max(all_prices) - min(all_prices) |
self.price_min = min(all_prices) - price_range * 0.1 |
self.price_max = max(all_prices) + price_range * 0.1 |
- 自动适应数据范围
- 留出适当边距使图表更美观
智能数据裁剪:
python
start_idx = max(0, data_count - int(total_width / (self.candle_width + 2))) |
self.current_data = list(self.kline_data)[start_idx:] |
- 只绘制当前视口可见的数据
- 大幅减少绘制量,提高性能
4. 主题与样式优化
主题支持:
python
self.theme = "dark" # 或 "light" |
- 提供暗色和亮色两种主题
- 减少不必要的样式计算
条件渲染:
python
if self.theme == "dark": |
painter.setPen(QPen(QColor(50, 50, 50), 1)) |
else: |
painter.setPen(QPen(QColor(230, 230, 230), 1)) |
- 根据主题选择不同颜色
- 避免运行时重复计算
5. 内存管理优化
缓存失效机制:
python
self.cache_valid = False # 数据变化时标记缓存失效 |
- 数据变化时自动使缓存失效
- 确保显示内容始终最新
资源清理:
python
def clear_data(self): |
self.kline_data.clear() |
self.current_data.clear() |
self.cache_valid = False |
self.update() |
- 提供明确的资源清理方法
- 防止内存泄漏
扩展性能优化建议
- 使用OpenGL加速:
- 对于极大量数据,可以考虑使用QOpenGLWidget进行硬件加速绘制
- 需要安装PyQt5的OpenGL支持
- 数据分页加载:
- 实现数据分页,只加载当前视口需要的数据
- 特别适合历史数据回放场景
- WebWorker模式:
- 对于特别复杂的数据处理,可以考虑使用QThread在后台处理数据
- 通过信号槽机制与主线程通信
- 简化复杂图形:
- 在极小的K线宽度下,可以简化影线绘制
- 使用位图或预渲染技术优化极大量数据的显示
- 使用QCharts:
- PyQt5的QtCharts模块提供了 candlestick 图表类型
- 适合快速实现基本K线图,但自定义程度较低
实际应用建议
- 真实数据接入:
- 替换
add_random_data
方法为真实数据源 - 可以使用WebSocket接入实时行情数据
- 替换
- 技术指标计算:
- 在数据添加时计算MA、MACD、KDJ等指标
- 可以单独绘制指标曲线
- 交互功能增强:
- 添加十字光标功能
- 实现缩放和平移操作
- 添加K线选择和详细信息显示
- 多周期支持:
- 实现不同时间周期(1分钟、5分钟、日K等)的切换
- 可以缓存不同周期的数据
这个实现提供了良好的基础框架和多项性能优化技术,可以根据实际需求进一步扩展和完善。