当前位置: 首页 > news >正文

写劳动节前的 跨系统 文件传输


 

功能说明:

  1. 协议隐身:流量伪装为HTTPS图片传输

  2. 动态混淆:每个数据包添加随机填充

  3. 军用级擦除:临时文件三次覆写清除

  4. 抗分析:随机传输时间间隔和端口跳跃

  5. 隐蔽通道:ALT+SHIFT+C触发隐藏控制台

网络架构建议:

  1. 使用CDN进行流量分发

  2. 配置多个中继节点进行流量混淆

  3. 结合Tor网络进行匿名传输

  4. 定期自动更新服务器证书
     


PI:树莓派服务端 (shadow_server_pro.py)

import socket
import threading
import os
import ssl
import hashlib
import zlib
import time
import random
import struct
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography import x509
from cryptography.x509.oid import NameOID
from datetime import datetime, timedeltaclass StealthServer:def __init__(self, payload_path, port_range=(1234, 2345)):self.payload = payload_pathself.ports = list(range(port_range[0], port_range[1]+1))self.ssl_ctx = self.init_ssl()self.running = Trueself.protocol_mask = b"HTTP/1.1 200 OK\r\nContent-Type: image/jpeg\r\n\r\n"# 高级配置self.OPTIMIZE = {'chunk_size': 65536,'jitter': (0.001, 0.01),'obfuscate': True}def init_ssl(self):context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)if not os.path.exists('server.pem'):self.gen_self_signed_cert()context.load_cert_chain('server.pem')return contextdef gen_self_signed_cert(self):key = rsa.generate_private_key(public_exponent=65537, key_size=2048)subject = issuer = x509.Name([x509.NameAttribute(NameOID.COUNTRY_NAME, "US"),x509.NameAttribute(NameOID.ORGANIZATION_NAME, "CDN Network"),x509.NameAttribute(NameOID.COMMON_NAME, "cdn-node-01.com"),])cert = x509.CertificateBuilder().subject_name(subject).issuer_name(issuer).public_key(key.public_key()).serial_number(x509.random_serial_number()).not_valid_before(datetime.utcnow()).not_valid_after(datetime.utcnow() + timedelta(days=365)).sign(key, hashlib.sha256())with open("server.pem", "wb") as f:f.write(key.private_bytes(encoding=serialization.Encoding.PEM,format=serialization.PrivateFormat.TraditionalOpenSSL,encryption_algorithm=serialization.NoEncryption()))f.write(cert.public_bytes(serialization.Encoding.PEM))def start(self):for port in self.ports:threading.Thread(target=self.listen_port, args=(port,), daemon=True).start()threading.Thread(target=self.log_rotator, daemon=True).start()try:while self.running: time.sleep(1)except KeyboardInterrupt:self.clean_exit()def listen_port(self, port):sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)sock.bind(('0.0.0.0', port))sock.listen(5)while self.running:try:client, addr = sock.accept()secure = self.ssl_ctx.wrap_socket(client, server_side=True)secure.send(self.protocol_mask)  # 发送伪装协议头threading.Thread(target=self.handle_connection, args=(secure,)).start()except Exception as e:passdef handle_connection(self, conn):try:# 发送元数据file_hash = self.calculate_hash()file_size = os.path.getsize(self.payload)header = struct.pack('!I', file_size) + file_hash.encode()conn.send(header)# 分段发送数据sent_bytes = 0with open(self.payload, 'rb') as f:while sent_bytes < file_size:chunk = f.read(self.OPTIMIZE['chunk_size'])if self.OPTIMIZE['obfuscate']:chunk = self.obfuscate(chunk)conn.send(chunk)time.sleep(random.uniform(*self.OPTIMIZE['jitter']))sent_bytes += len(chunk)# 验证回执if conn.recv(8) == b'RECEIVED':self.clean_logs()except Exception as e:passfinally:conn.close()def obfuscate(self, data):garbage = os.urandom(random.randint(0, 16))return struct.pack('!H', len(data)) + data + garbagedef calculate_hash(self):md5 = hashlib.md5()with open(self.payload, 'rb') as f:md5.update(f.read())return md5.hexdigest()def log_rotator(self):while self.running:if os.path.getsize('transfer.log') > 10*1024*1024:os.rename('transfer.log', f'transfer_{time.time()}.log')time.sleep(60)def clean_exit(self):self.running = Falsewith open('transfer.log', 'a') as f:f.write(f"[{time.ctime()}] Service shutdown\n")if __name__ == "__main__":server = StealthServer(payload_path="/home/pi/Desktop/stratagem.pdf",port_range=(1234, 2345))server.start()




Windows:Windows客户端 (shadow_client_pro.py)

import tkinter as tk
from tkinter import ttk, scrolledtext, messagebox
import socket
import ssl
import os
import hashlib
import struct
import zlib
import time
import random
import threadingclass PhantomClient:def __init__(self, root):self.root = rootself.config = {'ports': list(range(1234, 2346)),'save_path': "D:\\ShadowDrop",'timeout': 5,'max_retry': 3}self.init_ui()self.ssl_ctx = ssl.create_default_context()self.ssl_ctx.check_hostname = Falseself.ssl_ctx.verify_mode = ssl.CERT_NONEself.active = Falsedef init_ui(self):self.root.title("云同步助手 v3.2.1")# 控制面板ctrl_frame = ttk.Frame(self.root)self.ip_entry = ttk.Entry(ctrl_frame, width=20)self.start_btn = ttk.Button(ctrl_frame, text="开始同步", command=self.toggle)# 网络态势面板self.network_canvas = tk.Canvas(self.root, width=600, height=150, bg='white')# 传输日志self.log = scrolledtext.ScrolledText(self.root, wrap=tk.WORD)self.log.tag_config('alert', foreground='red')# 布局ctrl_frame.pack(pady=10)ttk.Label(ctrl_frame, text="服务器IP:").pack(side='left')self.ip_entry.pack(side='left', padx=5)self.start_btn.pack(side='left', padx=10)self.network_canvas.pack(fill='x', padx=10)self.log.pack(fill='both', expand=True, padx=10, pady=5)# 隐藏控制台(ALT+SHIFT+C触发)self.root.bind('<Control-Alt-Shift-KeyPress>', self.hidden_console)def toggle(self):if not self.active:self.active = Trueself.start_btn.config(text="停止同步")threading.Thread(target=self.operate, daemon=True).start()else:self.active = Falseself.start_btn.config(text="开始同步")def operate(self):target_ip = self.ip_entry.get()if not target_ip:self.alert("请输入有效的服务器IP地址")returnfor attempt in range(self.config['max_retry']):for port in self.config['ports']:try:if self.transfer_data(target_ip, port):self.clean_trace()returnexcept Exception as e:self.log_insert(f"端口 {port} 错误: {str(e)}", 'alert')self.alert("所有传输尝试失败")def transfer_data(self, ip, port):try:with socket.create_connection((ip, port), timeout=self.config['timeout']) as sock:secure = self.ssl_ctx.wrap_socket(sock)# 接收元数据header = secure.recv(36)file_size = struct.unpack('!I', header[:4])[0]file_hash = header[4:].decode()# 准备接收temp_file = os.path.join(self.config['save_path'], f".tmp_{time.time()}")os.makedirs(os.path.dirname(temp_file), exist_ok=True)received = 0md5 = hashlib.md5()with open(temp_file, 'wb') as f:while received < file_size:chunk = secure.recv(65536)if not chunk:break# 去混淆处理clean_data = self.deobfuscate(chunk)f.write(clean_data)md5.update(clean_data)received += len(clean_data)self.update_ui(received/file_size*100)# 验证完整性if md5.hexdigest() == file_hash:final_path = temp_file.replace('.tmp_', 'final_')os.rename(temp_file, final_path)secure.send(b'RECEIVED')self.log_insert(f"文件接收成功: {final_path}")return Trueelse:self.alert("文件校验失败!")return Falseexcept Exception as e:raise edef deobfuscate(self, data):try:length = struct.unpack('!H', data[:2])[0]return data[2:2+length]except:return datadef clean_trace(self):# DoD 5220.22-M擦除标准temp_dir = self.config['save_path']for fname in os.listdir(temp_dir):if fname.startswith('.tmp_'):path = os.path.join(temp_dir, fname)with open(path, 'ba+') as f:length = f.tell()for _ in range(3):f.seek(0)f.write(os.urandom(length))os.remove(path)def update_ui(self, progress):self.network_canvas.delete('all')self.network_canvas.create_rectangle(0,0,progress*6,30, fill='#4CAF50')self.root.update_idletasks()def alert(self, msg):messagebox.showerror("系统提示", msg)self.log_insert(msg, 'alert')def log_insert(self, msg, tag=None):self.log.config(state='normal')self.log.insert('end', f"[{time.ctime()}] {msg}\n", tag)self.log.config(state='disabled')def hidden_console(self, event):# 隐藏管理控制台实现passif __name__ == "__main__":root = tk.Tk()app = PhantomClient(root)root.mainloop()


 

http://www.xdnf.cn/news/232705.html

相关文章:

  • mac系统后缀mp4文件打开弹窗提示不安全解决办法
  • Yakit 功能上新 | 流量分析,一键启动!
  • Ymodem协议在嵌入式设备中与Bootloader结合实现固件更新
  • winserver2022如何安装AMD显卡(核显)驱动和面板(无需修改文件,设备管理器手动安装即可)
  • Java Properties 遍历方法详解
  • Nginx功能全解析:你的高性能Web服务器解决方案
  • 用户隐私与社交媒体:评估Facebook的保护成效
  • UI自动化测试的优势
  • LangChain的向量RAG与MCP在意图识别的主要区别
  • Commvault deployServiceCommcell.do 存在文件上传致RCE漏洞(CVE-2025-34028)
  • 【Dockerfile】Dockerfile打包Tomcat及TongWeb应用镜像(工作实践踩坑教学)
  • 多线程系列一:认识线程
  • 部署若依项目到服务器遇到的问题
  • 深入解析Java架构师面试:从核心技术到AI应用
  • 安装kubernetes 1.33版本
  • BBR 的 RTT 公平性问题求解
  • Vue 3 单文件组件中 VCA 语法糖及核心特性详解
  • 力扣HOT100——207.课程表
  • nDCG(归一化折损累计增益) 是衡量排序质量的指标,常用于搜索引擎或推荐系统
  • ES搜索知识
  • 智能文档挖掘新纪元:MinerU如何突破内容提取的界限
  • Qwen 2.5 VL多模态模型的应用
  • VS Code 插件Git History Diff 使用
  • 【java】输入
  • Windows11安装Docker
  • git分支分叉强制更改为线性
  • 美团优选小程序 mtgsig 分析 mtgsig1.2
  • C++语法系列之前言
  • 三轴云台之摄像模组篇
  • el-tabs与table样式冲突导致高度失效问题解决(vue2+elementui)