当前位置: 首页 > ai >正文

图像解码失败检测

图像解码失败检测

视频解码失败时,出现一些反复重复或者高相似度的的行或者列,使用经典算法进行检测,未使用ai

1. 简易版本

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
快速图像行分析工具 - 专门检测行间相似性和重复
适用于检测解码失败的图像
"""import numpy as np
import cv2
import matplotlib.pyplot as plt
from scipy.stats import pearsonr
from collections import defaultdict
import hashlibdef quick_row_analysis(image_path_or_array):"""快速分析图像行间相似性Args:image_path_or_array: 图像路径或numpy数组"""# 加载图像if isinstance(image_path_or_array, str):image = cv2.imread(image_path_or_array, cv2.IMREAD_GRAYSCALE)if image is None:raise ValueError(f"无法加载图像: {image_path_or_array}")print(f"已加载图像: {image_path_or_array}")else:image = image_path_or_arrayif len(image.shape) == 3:image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)height, width = image.shapeprint(f"图像尺寸: {width} x {height}")# 1. 检测完全重复的行print("\n1. 检测完全重复的行...")row_hashes = {}duplicate_groups = defaultdict(list)for i in range(height):row_hash = hashlib.md5(image[i].tobytes()).hexdigest()if row_hash in row_hashes:duplicate_groups[row_hash].append(i)if len(duplicate_groups[row_hash]) == 1:duplicate_groups[row_hash].insert(0, row_hashes[row_hash])else:row_hashes[row_hash] = iif duplicate_groups:print(f"   发现 {len(duplicate_groups)} 组完全重复的行:")for i, (hash_val, rows) in enumerate(duplicate_groups.items()):print(f"   组{i+1}: 行 {rows} (共{len(rows)}行)")else:print("   未发现完全重复的行")# 2. 检测高相似行 (快速采样版本,避免过慢)print("\n2. 检测高相似行...")similar_pairs = []threshold = 0.95# 如果图像太大,进行采样if height > 500:print(f"   图像较大,采样分析前 500 行...")sample_indices = np.linspace(0, height-1, 500, dtype=int)sample_image = image[sample_indices]sample_height = len(sample_indices)for i in range(sample_height):for j in range(i+1, min(i+50, sample_height)):  # 只检查附近50行corr, _ = pearsonr(sample_image[i], sample_image[j])if corr >= threshold:actual_i, actual_j = sample_indices[i], sample_indices[j]similar_pairs.append((actual_i, actual_j, corr))else:for i in range(height):for j in range(i+1, min(i+20, height)):  # 只检查附近20行corr, _ = pearsonr(image[i], image[j])if corr >= threshold:similar_pairs.append((i, j, corr))if similar_pairs:print(f"   发现 {len(similar_pairs)} 对高相似行 (相关系数 >= {threshold}):")for i, (row1, row2, corr) in enumerate(similar_pairs[:10]):print(f"   行{row1} 与 行{row2}: 相关系数 {corr:.4f}")if len(similar_pairs) > 10:print(f"   ... 还有 {len(similar_pairs)-10} 对")else:print("   未发现高相似行")# 3. 分析行均值变化模式print("\n3. 分析行模式...")row_means = np.mean(image, axis=1)# 计算相邻行差异row_diffs = np.abs(np.diff(row_means))avg_diff = np.mean(row_diffs)std_diff = np.std(row_diffs)# 检测异常平稳的区域 (可能的重复区域)stable_regions = []stable_threshold = avg_diff * 0.1  # 变化小于平均值的10%current_start = Nonefor i, diff in enumerate(row_diffs):if diff < stable_threshold:if current_start is None:current_start = ielse:if current_start is not None and i - current_start > 5:  # 至少5行stable_regions.append((current_start, i))current_start = Noneif current_start is not None and len(row_diffs) - current_start > 5:stable_regions.append((current_start, len(row_diffs)))print(f"   平均行间差异: {avg_diff:.2f}")print(f"   行间差异标准差: {std_diff:.2f}")if stable_regions:print(f"   发现 {len(stable_regions)} 个异常平稳区域 (可能的重复区域):")for start, end in stable_regions:print(f"   行 {start}{end} (共{end-start+1}行)")else:print("   未发现异常平稳区域")# 4. 简单的周期性检测print("\n4. 检测周期性模式...")# 使用自相关检测周期if len(row_means) > 100:# 只分析前面部分避免计算过慢sample_means = row_means[:min(1000, len(row_means))]autocorr = np.correlate(sample_means, sample_means, mode='full')autocorr = autocorr[len(autocorr)//2:]# 寻找可能的周期potential_periods = []for period in range(2, min(100, len(autocorr)//4)):if autocorr[period] > 0.8 * autocorr[0]:  # 80%的自相关potential_periods.append(period)if potential_periods:print(f"   检测到可能的周期: {potential_periods[:5]}")else:print("   未检测到明显周期性")# 5. 生成简单可视化print("\n5. 生成分析图表...")fig, axes = plt.subplots(2, 2, figsize=(12, 8))plt.rcParams['font.sans-serif'] = ['SimHei']plt.rcParams['axes.unicode_minus'] = False# 原图axes[0,0].imshow(image, cmap='gray', aspect='auto')axes[0,0].set_title('原始图像')axes[0,0].axis('off')# 行均值变化axes[0,1].plot(row_means)axes[0,1].set_title('各行均值变化')axes[0,1].set_xlabel('行索引')axes[0,1].set_ylabel('均值')axes[0,1].grid(True)# 标记重复区域for hash_val, rows in duplicate_groups.items():for row in rows:axes[0,1].axvline(x=row, color='red', alpha=0.5, linestyle='--')# 行间差异axes[1,0].plot(row_diffs)axes[1,0].set_title('相邻行差异')axes[1,0].set_xlabel('行索引')axes[1,0].set_ylabel('差异')axes[1,0].grid(True)# 标记平稳区域for start, end in stable_regions:axes[1,0].axvspan(start, end, alpha=0.3, color='yellow')# 行标准差row_stds = np.std(image, axis=1)axes[1,1].plot(row_stds)axes[1,1].set_title('各行标准差')axes[1,1].set_xlabel('行索引')axes[1,1].set_ylabel('标准差')axes[1,1].grid(True)plt.tight_layout()plt.savefig('quick_analysis_result.png', dpi=300, bbox_inches='tight')print("   分析结果已保存为 'quick_analysis_result.png'")plt.show()# 6. 总结评估print("\n" + "="*50)print("分析总结")print("="*50)risk_score = 0issues = []if duplicate_groups:risk_score += len(duplicate_groups) * 15issues.append(f"存在 {len(duplicate_groups)} 组完全重复的行")if len(similar_pairs) > height * 0.05:  # 相似对超过5%risk_score += 20issues.append(f"高相似行对较多 ({len(similar_pairs)} 对)")if len(stable_regions) > 3:risk_score += 10issues.append(f"存在多个异常平稳区域 ({len(stable_regions)} 个)")if std_diff < avg_diff * 0.3:  # 变化过于规律risk_score += 15issues.append("行间变化过于规律,可能存在模式重复")print(f"图像尺寸: {width} x {height}")print(f"总行数: {height}")if risk_score > 40:print(f"⚠️  风险等级: 高 (得分: {risk_score})")print("   很可能是解码失败或包含人工模式的图像")elif risk_score > 20:print(f"⚠️  风险等级: 中等 (得分: {risk_score})")print("   存在一些可疑模式,建议进一步检查")else:print(f"✅ 风险等级: 低 (得分: {risk_score})")print("   图像看起来相对正常")if issues:print(f"发现的问题:")for issue in issues:print(f"   • {issue}")return {'duplicate_groups': dict(duplicate_groups),'similar_pairs': similar_pairs,'stable_regions': stable_regions,'risk_score': risk_score,'row_means': row_means,'row_diffs': row_diffs}def main():"""主函数"""print("快速图像行分析工具")print("专门检测解码失败、行重复等问题")print("="*40)image_input = input("请输入图像路径 (或回车创建测试图像): ").strip()if not image_input:print("创建测试图像...")# 创建有问题的测试图像height, width = 720, 1280test_image = np.random.randint(0, 256, (height, width), dtype=np.uint8)# 添加重复行for i in range(100, 120):test_image[i] = test_image[99]# 添加周期性for i in range(300, 500):test_image[i] = test_image[300 + (i-300) % 10]result = quick_row_analysis(test_image)else:try:result = quick_row_analysis(image_input)except Exception as e:print(f"分析失败: {e}")returnprint(f"\n分析完成!结果已保存到当前目录。")if __name__ == "__main__":main() 

2. 复杂版本

import numpy as np
import cv2
import matplotlib.pyplot as plt
import seaborn as sns
from scipy import signal
from scipy.stats import pearsonr
from sklearn.metrics.pairwise import cosine_similarity
from collections import defaultdict
import hashlibclass ImageRowAnalyzer:"""图像行级分析器 - 检测解码失败、伪造数据等异常"""def __init__(self, image_path=None, image_array=None):"""初始化分析器Args:image_path: 图像文件路径image_array: 直接传入的图像数组 (H, W) 或 (H, W, C)"""if image_path:self.image = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)if self.image is None:raise ValueError(f"无法加载图像: {image_path}")elif image_array is not None:self.image = image_arrayif len(self.image.shape) == 3:self.image = cv2.cvtColor(self.image, cv2.COLOR_BGR2GRAY)else:raise ValueError("必须提供 image_path 或 image_array")self.height, self.width = self.image.shapeprint(f"图像尺寸: {self.width}x{self.height}")def calculate_row_similarity_matrix(self):"""计算行间相似性矩阵"""print("计算行间相似性矩阵...")# 方法1: 皮尔逊相关系数correlation_matrix = np.zeros((self.height, self.height))# 方法2: 余弦相似度cosine_matrix = cosine_similarity(self.image)# 方法3: 归一化互相关for i in range(self.height):for j in range(i, self.height):# 皮尔逊相关系数corr, _ = pearsonr(self.image[i], self.image[j])correlation_matrix[i, j] = correlation_matrix[j, i] = corrreturn correlation_matrix, cosine_matrixdef detect_duplicate_rows(self, threshold=0.99):"""检测重复或高度相似的行"""print(f"检测重复行 (阈值: {threshold})...")duplicate_groups = defaultdict(list)row_hashes = {}# 使用哈希快速检测完全相同的行for i in range(self.height):row_hash = hashlib.md5(self.image[i].tobytes()).hexdigest()if row_hash in row_hashes:duplicate_groups[row_hash].append(i)if len(duplicate_groups[row_hash]) == 1:duplicate_groups[row_hash].insert(0, row_hashes[row_hash])else:row_hashes[row_hash] = i# 检测高度相似但不完全相同的行similar_pairs = []for i in range(self.height):for j in range(i+1, self.height):# 计算相关系数corr, _ = pearsonr(self.image[i], self.image[j])if corr >= threshold:similar_pairs.append((i, j, corr))return dict(duplicate_groups), similar_pairsdef analyze_row_patterns(self):"""分析行模式和周期性"""print("分析行模式...")# 计算行间差异row_diffs = []for i in range(self.height - 1):diff = np.mean(np.abs(self.image[i+1] - self.image[i]))row_diffs.append(diff)# 检测周期性模式row_means = np.mean(self.image, axis=1)# 使用自相关检测周期autocorr = signal.correlate(row_means, row_means, mode='full')autocorr = autocorr[autocorr.size // 2:]# 寻找峰值peaks, _ = signal.find_peaks(autocorr[1:], height=np.max(autocorr) * 0.1)return row_diffs, row_means, autocorr, peaksdef fourier_analysis(self):"""对行数据进行傅里叶变换分析"""print("进行傅里叶变换分析...")# 对每一行进行1D FFTrow_ffts = []row_frequencies = []for i in range(self.height):fft = np.fft.fft(self.image[i])fft_magnitude = np.abs(fft)freqs = np.fft.fftfreq(self.width)row_ffts.append(fft_magnitude)row_frequencies.append(freqs)row_ffts = np.array(row_ffts)# 对行均值序列进行FFT(检测垂直方向的周期性)row_means = np.mean(self.image, axis=1)vertical_fft = np.fft.fft(row_means)vertical_fft_magnitude = np.abs(vertical_fft)vertical_freqs = np.fft.fftfreq(self.height)return row_ffts, row_frequencies, vertical_fft_magnitude, vertical_freqsdef detect_anomalies(self):"""检测异常行"""print("检测异常行...")# 计算每行的统计特征row_stats = []for i in range(self.height):stats = {'mean': np.mean(self.image[i]),'std': np.std(self.image[i]),'entropy': self._calculate_entropy(self.image[i]),'edge_density': self._calculate_edge_density(self.image[i])}row_stats.append(stats)# 识别异常值anomalies = []for feature in ['mean', 'std', 'entropy', 'edge_density']:values = [stats[feature] for stats in row_stats]mean_val = np.mean(values)std_val = np.std(values)for i, val in enumerate(values):if abs(val - mean_val) > 2 * std_val:  # 2倍标准差外的异常值anomalies.append((i, feature, val))return row_stats, anomaliesdef _calculate_entropy(self, row):"""计算行的信息熵"""hist, _ = np.histogram(row, bins=256, range=(0, 255))hist = hist[hist > 0]  # 去除0值prob = hist / np.sum(hist)entropy = -np.sum(prob * np.log2(prob))return entropydef _calculate_edge_density(self, row):"""计算行的边缘密度"""edges = np.abs(np.diff(row.astype(np.float32)))return np.mean(edges)def visualize_analysis(self, save_plots=True):"""可视化分析结果"""print("生成可视化图表...")# 获取分析结果corr_matrix, cosine_matrix = self.calculate_row_similarity_matrix()duplicate_groups, similar_pairs = self.detect_duplicate_rows()row_diffs, row_means, autocorr, peaks = self.analyze_row_patterns()row_ffts, _, vertical_fft, vertical_freqs = self.fourier_analysis()row_stats, anomalies = self.detect_anomalies()# 创建子图fig, axes = plt.subplots(2, 3, figsize=(18, 12))plt.rcParams['font.sans-serif'] = ['SimHei']  # 支持中文显示plt.rcParams['axes.unicode_minus'] = False# 1. 原始图像axes[0, 0].imshow(self.image, cmap='gray')axes[0, 0].set_title('原始图像')axes[0, 0].axis('off')# 2. 行间相似性矩阵im1 = axes[0, 1].imshow(corr_matrix, cmap='coolwarm', vmin=-1, vmax=1)axes[0, 1].set_title('行间相关系数矩阵')axes[0, 1].set_xlabel('行索引')axes[0, 1].set_ylabel('行索引')plt.colorbar(im1, ax=axes[0, 1])# 3. 行均值变化axes[0, 2].plot(row_means)axes[0, 2].set_title('各行均值变化')axes[0, 2].set_xlabel('行索引')axes[0, 2].set_ylabel('均值')axes[0, 2].grid(True)# 4. 行间差异axes[1, 0].plot(row_diffs)axes[1, 0].set_title('相邻行差异')axes[1, 0].set_xlabel('行索引')axes[1, 0].set_ylabel('平均绝对差异')axes[1, 0].grid(True)# 5. 垂直方向FFTaxes[1, 1].plot(vertical_freqs[:len(vertical_freqs)//2], vertical_fft[:len(vertical_fft)//2])axes[1, 1].set_title('垂直方向频谱分析')axes[1, 1].set_xlabel('频率')axes[1, 1].set_ylabel('幅度')axes[1, 1].grid(True)# 6. 行统计特征分布entropies = [stats['entropy'] for stats in row_stats]axes[1, 2].hist(entropies, bins=30, alpha=0.7)axes[1, 2].set_title('行信息熵分布')axes[1, 2].set_xlabel('信息熵')axes[1, 2].set_ylabel('频次')axes[1, 2].grid(True)plt.tight_layout()if save_plots:plt.savefig('row_analysis_results.png', dpi=300, bbox_inches='tight')print("分析结果已保存为 'row_analysis_results.png'")plt.show()# 打印分析报告self._print_analysis_report(duplicate_groups, similar_pairs, anomalies, peaks)def _print_analysis_report(self, duplicate_groups, similar_pairs, anomalies, peaks):"""打印分析报告"""print("\n" + "="*50)print("图像行分析报告")print("="*50)print(f"\n1. 基本信息:")print(f"   - 图像尺寸: {self.width} x {self.height}")print(f"   - 总行数: {self.height}")print(f"\n2. 重复行检测:")if duplicate_groups:print(f"   - 发现 {len(duplicate_groups)} 组完全重复的行:")for i, (hash_val, rows) in enumerate(duplicate_groups.items()):print(f"     组{i+1}: 行 {rows} (共{len(rows)}行)")else:print("   - 未发现完全重复的行")print(f"\n3. 高相似行检测:")if similar_pairs:print(f"   - 发现 {len(similar_pairs)} 对高相似行:")for i, (row1, row2, corr) in enumerate(similar_pairs[:10]):  # 只显示前10对print(f"     行{row1} 与 行{row2}: 相关系数 {corr:.4f}")if len(similar_pairs) > 10:print(f"     ... 还有 {len(similar_pairs)-10} 对")else:print("   - 未发现高相似行")print(f"\n4. 周期性检测:")if len(peaks) > 0:print(f"   - 检测到 {len(peaks)} 个可能的周期:")for peak in peaks[:5]:  # 显示前5个周期print(f"     周期长度: {peak+1} 行")else:print("   - 未检测到明显的周期性模式")print(f"\n5. 异常行检测:")if anomalies:print(f"   - 发现 {len(anomalies)} 个异常:")anomaly_dict = defaultdict(list)for row, feature, val in anomalies:anomaly_dict[row].append(f"{feature}={val:.4f}")for row, features in list(anomaly_dict.items())[:10]:  # 显示前10个异常行print(f"     行{row}: {', '.join(features)}")if len(anomaly_dict) > 10:print(f"     ... 还有 {len(anomaly_dict)-10} 个异常行")else:print("   - 未发现明显异常的行")print(f"\n6. 伪造数据可能性评估:")# 简单的评分系统score = 0reasons = []if duplicate_groups:score += len(duplicate_groups) * 10reasons.append(f"存在 {len(duplicate_groups)} 组重复行")if len(similar_pairs) > self.height * 0.1:  # 如果相似对超过10%score += 20reasons.append(f"高相似行对过多 ({len(similar_pairs)} 对)")if len(peaks) > 3:  # 如果周期性过强score += 15reasons.append(f"检测到强周期性 ({len(peaks)} 个周期)")if score > 30:print(f"   - 风险等级: 高 (得分: {score})")print(f"   - 可能原因: {'; '.join(reasons)}")elif score > 15:print(f"   - 风险等级: 中等 (得分: {score})")print(f"   - 可能原因: {'; '.join(reasons)}")else:print(f"   - 风险等级: 低 (得分: {score})")print("   - 图像看起来相对正常")def create_test_image():"""创建一个测试图像用于演示"""print("创建测试图像...")# 创建一个有问题的图像:某些行重复,某些行相似height, width = 720, 1280test_image = np.random.randint(0, 256, (height, width), dtype=np.uint8)# 添加一些重复行for i in range(50, 60):test_image[i] = test_image[45]  # 重复第45行# 添加一些相似行(添加噪声)for i in range(100, 120):noise = np.random.randint(-10, 10, width)test_image[i] = np.clip(test_image[90].astype(np.int16) + noise, 0, 255).astype(np.uint8)# 添加周期性模式for i in range(200, 400):if i % 10 < 5:  # 每10行重复一次模式test_image[i] = test_image[200]else:test_image[i] = test_image[205]cv2.imwrite('test_problematic_image.jpg', test_image)print("测试图像已保存为 'test_problematic_image.jpg'")return test_imagedef main():"""主函数 - 演示如何使用分析器"""print("图像行分析工具")print("="*30)# 选择分析方式choice = input("请选择: 1) 分析已有图像  2) 创建并分析测试图像  (输入1或2): ").strip()if choice == "1":image_path = input("请输入图像路径: ").strip()try:analyzer = ImageRowAnalyzer(image_path=image_path)except Exception as e:print(f"错误: {e}")returnelif choice == "2":test_image = create_test_image()analyzer = ImageRowAnalyzer(image_array=test_image)else:print("无效选择,使用测试图像...")test_image = create_test_image()analyzer = ImageRowAnalyzer(image_array=test_image)# 运行分析analyzer.visualize_analysis(save_plots=True)if __name__ == "__main__":main()
http://www.xdnf.cn/news/13715.html

相关文章:

  • 健康管理实训室建设方案:构建智慧康养人才培养生态体系
  • PERST#、Hot Reset、Link Disable
  • React16,17,18,19更新对比
  • slam--高斯分布
  • 《树状数组》
  • 消除信息屏障推动系统联动,IBMS系统成为建筑智能控制核心枢纽
  • EtherCAT转Modbus TCP网关实现倍福CX9020与科尔摩根NDC8AGV控制器设备之间的通讯案例
  • C语言入门教程
  • 2.4 创建视图
  • python爬虫ip封禁应对办法
  • Word 文件转md文件 在 Word 中没有直接将文档另存为 Markdown(.md)格式的选项,但你可以使用一些工具或手动转换来实现
  • spring系列---拦截器
  • NLP基础与词嵌入:让AI理解文字(superior哥深度学习系列第13期)
  • 计算机组成原理-主存储器
  • RedHat主机配置日志留存策略:从4周延长至6个月
  • 预训练模型适应下游任务?模型参数Freezing 与 微调 !
  • 基于Jenkins与Kubernetes的系统化变更管理实践
  • 《前端面试题:call、apply、bind 区别》
  • 1.sql连接语句
  • 软件测试相关问题
  • 柑橘检测模型
  • 直白话 OAuth 2 流程
  • langchain runnables 概念指南
  • 2025年硬件实习/秋招面试准备
  • 小熊派开发板显示图片
  • 机器人导航中的高程图 vs 高度筛选障碍物点云投影 —— 如何高效处理避障问题?
  • Oracle 条件索引 case when 报错解决方案(APP)
  • HTTP 网络协议演进过程
  • 【Docker基础】Docker核心概念:容器(Container)与镜像(Image)的区别与联系
  • Vue3 计算属性 computed