写劳动节前的 跨系统 文件传输
功能说明:
- 协议隐身:流量伪装为HTTPS图片传输
- 动态混淆:每个数据包添加随机填充
- 军用级擦除:临时文件三次覆写清除
- 抗分析:随机传输时间间隔和端口跳跃
- 隐蔽通道:CTRL+ALT+SHIFT+任意键触发隐藏控制台(当前代码中仅为占位,尚未实现)
网络架构建议:
- 使用CDN进行流量分发
- 配置多个中继节点进行流量混淆
- 结合Tor网络进行匿名传输
- 定期自动更新服务器证书
PI:树莓派服务端 (shadow_server_pro.py)
import socket
import threading
import os
import ssl
import hashlib
import zlib
import time
import random
import struct
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography import x509
from cryptography.x509.oid import NameOID
from datetime import datetime, timedelta, timezone


class StealthServer:
    """TLS file server that sends one payload file to every client that connects.

    Wire format, as produced by this class:

    1. ``protocol_mask`` -- a fixed text preamble ending in ``\\r\\n\\r\\n``.
    2. A 36-byte header: 4-byte big-endian file size followed by the
       32-character hex MD5 digest of the payload.
    3. The payload, either raw or as frames built by :meth:`obfuscate`.
    4. The server then waits for the 8-byte receipt ``b'RECEIVED'``.
    """

    # Largest data length that fits the 2-byte ('!H') prefix used by obfuscate().
    MAX_FRAME_PAYLOAD = 0xFFFF

    def __init__(self, payload_path, port_range=(1234, 2345)):
        """Prepare the TLS context and the list of ports to listen on.

        Args:
            payload_path: Path of the file served to clients.
            port_range: Inclusive ``(first, last)`` port pair.
        """
        self.payload = payload_path
        self.ports = list(range(port_range[0], port_range[1] + 1))
        self.ssl_ctx = self.init_ssl()
        self.running = True
        self.protocol_mask = b"HTTP/1.1 200 OK\r\nContent-Type: image/jpeg\r\n\r\n"
        # Advanced settings
        self.OPTIMIZE = {
            'chunk_size': 65536,
            'jitter': (0.001, 0.01),
            'obfuscate': True,
        }

    def init_ssl(self):
        """Return a server-side SSL context, creating ``server.pem`` if missing."""
        context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
        if not os.path.exists('server.pem'):
            self.gen_self_signed_cert()
        context.load_cert_chain('server.pem')
        return context

    def gen_self_signed_cert(self):
        """Write a one-year self-signed RSA-2048 key and certificate to ``server.pem``."""
        # Local import: sign() requires a cryptography HashAlgorithm instance,
        # not a hashlib object, and `hashes` is not imported at file level.
        from cryptography.hazmat.primitives import hashes

        key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
        subject = issuer = x509.Name([
            x509.NameAttribute(NameOID.COUNTRY_NAME, "US"),
            x509.NameAttribute(NameOID.ORGANIZATION_NAME, "CDN Network"),
            x509.NameAttribute(NameOID.COMMON_NAME, "cdn-node-01.com"),
        ])
        now = datetime.now(timezone.utc)
        cert = (
            x509.CertificateBuilder()
            .subject_name(subject)
            .issuer_name(issuer)
            .public_key(key.public_key())
            .serial_number(x509.random_serial_number())
            .not_valid_before(now)
            .not_valid_after(now + timedelta(days=365))
            .sign(key, hashes.SHA256())
        )
        # The file holds an unencrypted private key, so create it owner-only.
        fd = os.open("server.pem", os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
        with os.fdopen(fd, "wb") as f:
            f.write(key.private_bytes(
                encoding=serialization.Encoding.PEM,
                format=serialization.PrivateFormat.TraditionalOpenSSL,
                encryption_algorithm=serialization.NoEncryption(),
            ))
            f.write(cert.public_bytes(serialization.Encoding.PEM))

    def start(self):
        """Start one listener thread per port plus the log rotator, then block.

        Returns after ``running`` becomes false or on Ctrl-C (which calls
        :meth:`clean_exit`).
        """
        for port in self.ports:
            threading.Thread(target=self.listen_port, args=(port,), daemon=True).start()
        threading.Thread(target=self.log_rotator, daemon=True).start()
        try:
            while self.running:
                time.sleep(1)
        except KeyboardInterrupt:
            self.clean_exit()

    def listen_port(self, port):
        """Accept connections on ``port`` and hand each one to its own thread."""
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            try:
                sock.bind(('0.0.0.0', port))
                sock.listen(5)
            except OSError as exc:
                # A port that is already taken must not produce an unhandled
                # traceback; record it and give up on this port only.
                self._log(f"port {port}: cannot listen ({exc})")
                return
            while self.running:
                try:
                    client, _addr = sock.accept()
                except OSError as exc:
                    self._log(f"port {port}: accept failed ({exc})")
                    time.sleep(0.1)  # avoid a hot loop on a persistent error
                    continue
                # Handshake happens in the worker so a slow client cannot
                # block further accepts on this port.
                threading.Thread(target=self._serve_client, args=(client,)).start()

    def _serve_client(self, client):
        """Run the TLS handshake, send the preamble, then stream the payload."""
        try:
            secure = self.ssl_ctx.wrap_socket(client, server_side=True)
        except OSError as exc:  # ssl.SSLError is an OSError subclass
            client.close()
            self._log(f"TLS handshake failed: {exc}")
            return
        try:
            secure.sendall(self.protocol_mask)  # fixed preamble, sent first
        except OSError as exc:
            secure.close()
            self._log(f"preamble send failed: {exc}")
            return
        self.handle_connection(secure)

    def handle_connection(self, conn):
        """Send the metadata header and the payload over ``conn``, then close it.

        Args:
            conn: Connected socket-like object; always closed on return.
        """
        try:
            # Metadata: 4-byte size + 32 hex chars of MD5.
            file_hash = self.calculate_hash()
            file_size = os.path.getsize(self.payload)
            header = struct.pack('!I', file_size) + file_hash.encode()
            conn.sendall(header)

            use_frames = self.OPTIMIZE['obfuscate']
            read_size = self.OPTIMIZE['chunk_size']
            if use_frames:
                # obfuscate() stores the length in 2 bytes; larger reads
                # would raise struct.error.
                read_size = min(read_size, self.MAX_FRAME_PAYLOAD)

            # Count raw file bytes (not framed bytes) so the whole file is sent.
            remaining = file_size
            with open(self.payload, 'rb') as f:
                while remaining > 0:
                    chunk = f.read(min(read_size, remaining))
                    if not chunk:
                        break  # file shrank after its size was announced
                    remaining -= len(chunk)
                    conn.sendall(self.obfuscate(chunk) if use_frames else chunk)
                    time.sleep(random.uniform(*self.OPTIMIZE['jitter']))

            # Receipt from the client.
            if conn.recv(8) == b'RECEIVED':
                # NOTE(review): no clean_logs method is defined in this class;
                # call it only if a subclass or later code provides one.
                clean_logs = getattr(self, 'clean_logs', None)
                if callable(clean_logs):
                    clean_logs()
        except (OSError, struct.error) as exc:
            # Best-effort per connection, but leave a trace of what failed.
            self._log(f"transfer failed: {exc}")
        finally:
            conn.close()

    def obfuscate(self, data):
        """Return ``data`` framed as 2-byte length + data + 0..16 random bytes.

        Raises:
            struct.error: If ``len(data)`` exceeds 65535.
        """
        garbage = os.urandom(random.randint(0, 16))
        return struct.pack('!H', len(data)) + data + garbage

    def calculate_hash(self):
        """Return the hex MD5 digest of the payload file (integrity check only)."""
        md5 = hashlib.md5()
        with open(self.payload, 'rb') as f:
            # Stream the file instead of loading it into memory at once.
            for block in iter(lambda: f.read(1 << 20), b''):
                md5.update(block)
        return md5.hexdigest()

    def log_rotator(self):
        """Once a minute, rename ``transfer.log`` if it exceeds 10 MiB."""
        while self.running:
            try:
                if os.path.getsize('transfer.log') > 10 * 1024 * 1024:
                    os.rename('transfer.log', f'transfer_{time.time()}.log')
            except OSError:
                # The log may not exist yet; try again on the next cycle.
                pass
            time.sleep(60)

    def _log(self, message):
        """Append a timestamped line to ``transfer.log``; never raises OSError."""
        try:
            with open('transfer.log', 'a') as f:
                f.write(f"[{time.ctime()}] {message}\n")
        except OSError:
            pass

    def clean_exit(self):
        """Stop the main loop and record the shutdown in ``transfer.log``."""
        self.running = False
        self._log("Service shutdown")


if __name__ == "__main__":
    server = StealthServer(
        payload_path="/home/pi/Desktop/stratagem.pdf",
        port_range=(1234, 2345),
    )
    server.start()
Windows:Windows客户端 (shadow_client_pro.py)
import tkinter as tk
from tkinter import ttk, scrolledtext, messagebox
import socket
import ssl
import os
import hashlib
import struct
import zlib
import time
import random
import threading


class PhantomClient:
    """Tkinter client that downloads one file from the server over TLS.

    The client tries each configured port in turn, reads the server's
    preamble and 36-byte metadata header, stores the payload in a temporary
    file under ``save_path`` and renames it once the MD5 digest matches.
    """

    # Metadata header: 4-byte big-endian size + 32 hex characters of MD5.
    HEADER_SIZE = 36
    # The server sends a text preamble terminated by a blank line first.
    PREAMBLE_END = b"\r\n\r\n"
    MAX_PREAMBLE = 1024

    def __init__(self, root):
        """Build the UI on ``root`` and create a non-verifying TLS context."""
        self.root = root
        self.config = {
            'ports': list(range(1234, 2346)),
            'save_path': "D:\\ShadowDrop",
            'timeout': 5,
            'max_retry': 3,
        }
        self.init_ui()
        # Certificate and hostname checks are disabled (the server in this
        # project uses a self-signed certificate).
        self.ssl_ctx = ssl.create_default_context()
        self.ssl_ctx.check_hostname = False
        self.ssl_ctx.verify_mode = ssl.CERT_NONE
        self.active = False

    def init_ui(self):
        """Create and lay out the control row, progress canvas and log view."""
        self.root.title("云同步助手 v3.2.1")

        # Control row
        ctrl_frame = ttk.Frame(self.root)
        self.ip_entry = ttk.Entry(ctrl_frame, width=20)
        self.start_btn = ttk.Button(ctrl_frame, text="开始同步", command=self.toggle)

        # Progress canvas
        self.network_canvas = tk.Canvas(self.root, width=600, height=150, bg='white')

        # Transfer log
        self.log = scrolledtext.ScrolledText(self.root, wrap=tk.WORD)
        self.log.tag_config('alert', foreground='red')

        # Layout
        ctrl_frame.pack(pady=10)
        ttk.Label(ctrl_frame, text="服务器IP:").pack(side='left')
        self.ip_entry.pack(side='left', padx=5)
        self.start_btn.pack(side='left', padx=10)
        self.network_canvas.pack(fill='x', padx=10)
        self.log.pack(fill='both', expand=True, padx=10, pady=5)

        # Fires on any key press while Control+Alt+Shift are held; the
        # handler is currently an empty placeholder.
        self.root.bind('<Control-Alt-Shift-KeyPress>', self.hidden_console)

    def toggle(self):
        """Start a background transfer, or request the running one to stop."""
        if not self.active:
            self.active = True
            self.start_btn.config(text="停止同步")
            threading.Thread(target=self.operate, daemon=True).start()
        else:
            self.active = False
            self.start_btn.config(text="开始同步")

    def operate(self):
        """Try every configured port, up to ``max_retry`` rounds, until one succeeds."""
        target_ip = self.ip_entry.get()
        if not target_ip:
            self.alert("请输入有效的服务器IP地址")
            self._reset_controls()
            return
        for _attempt in range(self.config['max_retry']):
            for port in self.config['ports']:
                if not self.active:
                    return  # stop requested; toggle() already reset the button
                try:
                    if self.transfer_data(target_ip, port):
                        self.clean_trace()
                        self._reset_controls()
                        return
                except Exception as e:
                    # Worker-thread boundary: report and move on to the next port.
                    self.log_insert(f"端口 {port} 错误: {str(e)}", 'alert')
        self.alert("所有传输尝试失败")
        self._reset_controls()

    def _reset_controls(self):
        """Return the start button to its idle state after a run ends."""
        self.active = False
        self.start_btn.config(text="开始同步")

    def transfer_data(self, ip, port):
        """Download the file from ``ip:port``.

        Returns:
            True if the file was received and its MD5 digest matched,
            False on a digest mismatch.

        Raises:
            OSError: On connection, TLS or file errors (includes
                ConnectionError for a truncated preamble or header).
        """
        with socket.create_connection((ip, port), timeout=self.config['timeout']) as sock:
            with self.ssl_ctx.wrap_socket(sock) as secure:
                file_size, file_hash = self._read_header(secure)

                # Prepare the temporary target file.
                temp_file = os.path.join(self.config['save_path'], f".tmp_{time.time()}")
                os.makedirs(os.path.dirname(temp_file), exist_ok=True)

                received = 0
                md5 = hashlib.md5()
                with open(temp_file, 'wb') as f:
                    while received < file_size:
                        chunk = secure.recv(65536)
                        if not chunk:
                            break
                        # NOTE(review): this treats every recv() result as
                        # exactly one frame. The stream does not preserve
                        # frame boundaries and the frame format does not
                        # state how much padding follows the data, so larger
                        # transfers fail the checksum below. Fixing this
                        # needs a coordinated wire-format change on both ends.
                        clean_data = self.deobfuscate(chunk)
                        f.write(clean_data)
                        md5.update(clean_data)
                        received += len(clean_data)
                        self.update_ui(received / file_size * 100)

                # Integrity check
                if md5.hexdigest() == file_hash:
                    # Rename only the file name, never a directory component.
                    folder, name = os.path.split(temp_file)
                    final_path = os.path.join(folder, name.replace('.tmp_', 'final_', 1))
                    os.rename(temp_file, final_path)
                    secure.sendall(b'RECEIVED')
                    self.log_insert(f"文件接收成功: {final_path}")
                    return True
                self.alert("文件校验失败!")
                return False

    def _read_header(self, conn):
        """Skip the server preamble and return ``(file_size, md5_hex)``."""
        self._skip_preamble(conn)
        header = self._recv_exact(conn, self.HEADER_SIZE)
        file_size = struct.unpack('!I', header[:4])[0]
        return file_size, header[4:].decode('ascii')

    def _skip_preamble(self, conn):
        """Consume bytes up to and including the first blank line."""
        seen = bytearray()
        # Read byte-wise so nothing past the terminator is consumed.
        while not seen.endswith(self.PREAMBLE_END):
            if len(seen) >= self.MAX_PREAMBLE:
                raise ConnectionError("preamble terminator not found")
            byte = conn.recv(1)
            if not byte:
                raise ConnectionError("connection closed during preamble")
            seen += byte

    @staticmethod
    def _recv_exact(conn, count):
        """Read exactly ``count`` bytes; recv() may return fewer per call."""
        parts = []
        missing = count
        while missing:
            part = conn.recv(missing)
            if not part:
                raise ConnectionError("connection closed before header was complete")
            parts.append(part)
            missing -= len(part)
        return b"".join(parts)

    def deobfuscate(self, data):
        """Return the data part of one frame (2-byte length + data + padding).

        Input shorter than the 2-byte prefix is returned unchanged.
        """
        try:
            length = struct.unpack('!H', data[:2])[0]
        except struct.error:
            return data
        return data[2:2 + length]

    def clean_trace(self):
        """Overwrite every ``.tmp_*`` file in ``save_path`` three times, then delete it.

        Each pass writes random bytes over the full file length. The original
        comment labels this "DoD 5220.22-M"; the code itself only performs
        three random-data passes.
        """
        temp_dir = self.config['save_path']
        block_size = 1 << 20
        for fname in os.listdir(temp_dir):
            if not fname.startswith('.tmp_'):
                continue
            path = os.path.join(temp_dir, fname)
            # 'r+b', not append mode: in append mode writes go to the end of
            # the file regardless of seek(), so nothing would be overwritten.
            with open(path, 'r+b') as f:
                length = f.seek(0, os.SEEK_END)
                for _ in range(3):
                    f.seek(0)
                    left = length
                    while left > 0:
                        step = min(block_size, left)
                        f.write(os.urandom(step))
                        left -= step
                    # Push each pass to disk before starting the next one.
                    f.flush()
                    os.fsync(f.fileno())
            os.remove(path)

    def update_ui(self, progress):
        """Redraw the progress bar; ``progress`` is a percentage (0-100)."""
        self.network_canvas.delete('all')
        self.network_canvas.create_rectangle(0, 0, progress * 6, 30, fill='#4CAF50')
        self.root.update_idletasks()

    def alert(self, msg):
        """Show ``msg`` in an error dialog and add it to the log in red."""
        messagebox.showerror("系统提示", msg)
        self.log_insert(msg, 'alert')

    def log_insert(self, msg, tag=None):
        """Append a timestamped line to the log widget, optionally tagged."""
        self.log.config(state='normal')
        self.log.insert('end', f"[{time.ctime()}] {msg}\n", tag)
        self.log.config(state='disabled')

    def hidden_console(self, event):
        """Placeholder key-binding handler; intentionally does nothing."""
        pass


if __name__ == "__main__":
    root = tk.Tk()
    app = PhantomClient(root)
    root.mainloop()