一、整体架构
- 服务器:用Go语言实现,基于TLS加密通信,负责接收客户端连接、验证文件内容、处理POW挑战、存储文件。
- 客户端:用Python实现,负责读取本地文件、与服务器建立加密连接、上传内容、完成POW挑战。
- 核心机制:TLS加密传输 + 自定义文本协议 + 工作量证明(防止垃圾内容)。
1. 初始化阶段(服务器)
- 证书加载:服务器启动时加载TLS证书(server.crt)和私钥(server.key),用于加密通信。
- TLS监听:在指定端口(默认6969)启动TLS监听,等待客户端连接。
- 环境准备:创建文件存储目录(./posts/),用于保存上传的文件(代码中在首次保存文件时通过 os.MkdirAll 按需创建)。
generate_cert.go
// Command generate_cert writes a self-signed TLS certificate (server.crt) and
// the matching RSA private key (server.key) into the current directory.
package main

import (
	"crypto/rand"
	"crypto/rsa"
	"crypto/x509"
	"crypto/x509/pkix"
	"encoding/pem"
	"math/big"
	"os"
	"time"
)

// newCertTemplate builds the template for the self-signed server certificate.
//
// The certificate is valid for one year starting now, carries "localhost" both
// as CommonName and as a DNS subject alternative name (clients that ignore
// CommonName need the SAN to match the host), and uses a random 128-bit serial
// so that regenerated certificates do not all share serial number 1.
func newCertTemplate() (x509.Certificate, error) {
	serialLimit := new(big.Int).Lsh(big.NewInt(1), 128)
	serial, err := rand.Int(rand.Reader, serialLimit)
	if err != nil {
		return x509.Certificate{}, err
	}

	now := time.Now()
	return x509.Certificate{
		SerialNumber: serial,
		Subject: pkix.Name{
			Organization: []string{"Pastebeam"},
			CommonName:   "localhost",
		},
		DNSNames:  []string{"localhost"},
		NotBefore: now,
		NotAfter:  now.Add(365 * 24 * time.Hour),
		// The certificate signs itself (IsCA below), so it needs CertSign in
		// addition to the usages required for a TLS server certificate.
		KeyUsage:              x509.KeyUsageKeyEncipherment | x509.KeyUsageDigitalSignature | x509.KeyUsageCertSign,
		ExtKeyUsage:           []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth},
		BasicConstraintsValid: true,
		IsCA:                  true,
	}, nil
}

// writePEM encodes der as a single PEM block of the given type into f and
// closes f. It returns the first error from encoding or closing, so a failed
// flush is not silently lost.
func writePEM(f *os.File, blockType string, der []byte) error {
	if err := pem.Encode(f, &pem.Block{Type: blockType, Bytes: der}); err != nil {
		f.Close()
		return err
	}
	return f.Close()
}

// main generates a 4096-bit RSA key, self-signs a certificate for it and
// writes both as PEM files. Any failure aborts the program with a panic.
func main() {
	privateKey, err := rsa.GenerateKey(rand.Reader, 4096)
	if err != nil {
		panic("无法生成私钥: " + err.Error())
	}

	template, err := newCertTemplate()
	if err != nil {
		panic("无法生成证书: " + err.Error())
	}
	// Template is used as both subject and issuer: the certificate is self-signed.
	derBytes, err := x509.CreateCertificate(rand.Reader, &template, &template, &privateKey.PublicKey, privateKey)
	if err != nil {
		panic("无法生成证书: " + err.Error())
	}

	certOut, err := os.Create("server.crt")
	if err != nil {
		panic("无法打开证书文件: " + err.Error())
	}
	if err := writePEM(certOut, "CERTIFICATE", derBytes); err != nil {
		panic("无法写入证书文件: " + err.Error())
	}

	privateKeyBytes, err := x509.MarshalPKCS8PrivateKey(privateKey)
	if err != nil {
		panic("无法编码私钥: " + err.Error())
	}
	// 0600: the private key must not be readable by other users. The mode only
	// applies when the file is created; an existing file keeps its permissions.
	keyOut, err := os.OpenFile("server.key", os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600)
	if err != nil {
		panic("无法打开私钥文件: " + err.Error())
	}
	if err := writePEM(keyOut, "PRIVATE KEY", privateKeyBytes); err != nil {
		panic("无法写入私钥文件: " + err.Error())
	}

	println("证书生成成功:")
	println(" - server.crt (证书文件)")
	println(" - server.key (私钥文件)")
}
pastebeam_server.go
package mainimport ("crypto/rand""crypto/sha256""crypto/tls""encoding/base64""encoding/hex""errors""fmt""net""os""path/filepath""strings""sync""time""unicode/utf8"
)
// Server-wide settings for networking, storage, proof-of-work and limits.
const (
	// DefaultPort is the TCP port the TLS listener binds to.
	DefaultPort = 6969
	// DefaultPostsRoot is the directory in which uploaded posts are stored.
	DefaultPostsRoot = "./posts/"
	// PostIDByteSize is the number of random bytes in a post ID; the
	// hex-encoded ID is twice as long.
	PostIDByteSize = 32

	// ChallengeLeadingZeros is the number of leading '0' hex digits the
	// proof-of-work hash must have.
	ChallengeLeadingZeros = 4
	// ChallengeTimeoutMs is how long, in milliseconds, the server waits for
	// the proof-of-work answer.
	ChallengeTimeoutMs = 60 * 1000
	// ChallengeByteSize is the number of random bytes in a challenge.
	ChallengeByteSize = 32

	// MaxLimitPerIP is the maximum number of simultaneous sessions per IP.
	MaxLimitPerIP = 5
	// PostByteSizeLimit is the maximum size of a post in bytes.
	PostByteSizeLimit = 4 * 1024
)
// Package-level mutable state.
var (
	// ipConnCount maps a client IP to its number of currently open sessions.
	// Access it only while holding mu.
	ipConnCount = map[string]int{}
	// mu guards ipConnCount.
	mu sync.Mutex
	// buf is a 1 KiB package-level scratch buffer.
	// NOTE(review): buf is not guarded by mu; any use from more than one
	// goroutine at a time is a data race. Prefer per-connection buffers.
	buf = make([]byte, 1024)
)
func generatePostID() string {buf := make([]byte, PostIDByteSize)_, err := rand.Read(buf)if err != nil {panic(err)}return hex.EncodeToString(buf)
}
func isValidPostID(id string) bool {if len(id) != PostIDByteSize*2 {return false}_, err := hex.DecodeString(id)return err == nil
}
func checkIPLimit(ip string) bool {mu.Lock()defer mu.Unlock()count := ipConnCount[ip]if count >= MaxLimitPerIP {return true }ipConnCount[ip]++return false
}
func releaseIPLimit(ip string) {mu.Lock()defer mu.Unlock()count := ipConnCount[ip]if count <= 1 {delete(ipConnCount, ip)} else {ipConnCount[ip]--}
}
// isValidUTF8 reports whether b consists entirely of valid UTF-8.
func isValidUTF8(b []byte) bool {
	valid := utf8.Valid(b)
	return valid
}
// countLeadingZeros returns how many '0' characters hash starts with.
func countLeadingZeros(hash string) int {
	n := 0
	for n < len(hash) && hash[n] == '0' {
		n++
	}
	return n
}
func handleSession(conn net.Conn) {addr, ok := conn.RemoteAddr().(*net.TCPAddr)if !ok {fmt.Println("invalid client address")conn.Close()return}clientIP := addr.IP.String()fmt.Printf("new connection from %s:%d\n", clientIP, addr.Port)if checkIPLimit(clientIP) {conn.Write([]byte("TOO MANY CONNECTIONS\r\n"))fmt.Printf("%s: too many connections\n", clientIP)conn.Close()releaseIPLimit(clientIP)return}defer func() {releaseIPLimit(clientIP)conn.Close()fmt.Printf("connection closed from %s:%d\n", clientIP, addr.Port)}()conn.SetReadDeadline(time.Now().Add(30 * time.Second))defer conn.SetReadDeadline(time.Time{}) conn.Write([]byte("HI\r\n"))n, err := conn.Read(buf)if err != nil {fmt.Printf("%s: read command failed: %v\n", clientIP, err)return}cmd := strings.TrimSpace(string(buf[:n]))switch cmd {case "PARAMS":params := fmt.Sprintf("challenge_leading_zeros %d\r\n"+"challenge_timeout_ms %d\r\n"+"challenge_byte_size %d\r\n"+"max_limit_per_ip %d\r\n"+"post_byte_size_limit %d\r\n",ChallengeLeadingZeros,ChallengeTimeoutMs,ChallengeByteSize,MaxLimitPerIP,PostByteSizeLimit,)conn.Write([]byte(params))case "POST":handlePost(conn, clientIP)case "GET":conn.Write([]byte("OK\r\n"))conn.SetReadDeadline(time.Now().Add(10 * time.Second))n, err := conn.Read(buf)if err != nil {if errors.Is(err, os.ErrDeadlineExceeded) {conn.Write([]byte("TIMEOUT\r\n"))fmt.Printf("%s: GET ID timeout\n", clientIP)} else {conn.Write([]byte("400\r\n"))fmt.Printf("%s: read GET ID failed: %v\n", clientIP, err)}return}id := strings.TrimSpace(string(buf[:n]))handleGet(conn, clientIP, id)case "CRASH":panic("client requested crash")default:conn.Write([]byte("INVALID COMMAND\r\n"))fmt.Printf("%s: invalid command: %s\n", clientIP, cmd)}
}
func handlePost(conn net.Conn, clientIP string) {conn.Write([]byte("OK\r\n"))fmt.Printf("%s: start POST upload\n", clientIP)var content []bytefor {conn.SetReadDeadline(time.Now().Add(30 * time.Second))n, err := conn.Read(buf)if err != nil {fmt.Printf("%s: read POST content failed: %v\n", clientIP, err)conn.Write([]byte("INTERNAL ERROR\r\n"))return}line := buf[:n]if strings.TrimSpace(string(line)) == "SUBMIT" {break}if !isValidUTF8(line) {conn.Write([]byte("INVALID UTF8\r\n"))fmt.Printf("%s: invalid UTF8 content\n", clientIP)return}if !strings.HasSuffix(string(line), "\r\n") {conn.Write([]byte("BAD LINE ENDING\r\n"))fmt.Printf("%s: bad line ending (expected \\r\\n)\n", clientIP)return}if len(content)+len(line) > PostByteSizeLimit {conn.Write([]byte("TOO BIG\r\n"))fmt.Printf("%s: post too big (exceeds %d bytes)\n", clientIP, PostByteSizeLimit)return}content = append(content, line...)conn.Write([]byte("OK\r\n"))}challengeBuf := make([]byte, ChallengeByteSize)_, err := rand.Read(challengeBuf)if err != nil {conn.Write([]byte("INTERNAL ERROR\r\n"))fmt.Printf("%s: generate challenge failed: %v\n", clientIP, err)return}challenge := base64.StdEncoding.EncodeToString(challengeBuf)challengeMsg := fmt.Sprintf("CHALLENGE sha256 %d %s\r\n", ChallengeLeadingZeros, challenge)conn.Write([]byte(challengeMsg))fmt.Printf("%s: sent challenge: %s\n", clientIP, challenge)conn.SetReadDeadline(time.Now().Add(time.Duration(ChallengeTimeoutMs) * time.Millisecond))n, err := conn.Read(buf)if err != nil {if errors.Is(err, os.ErrDeadlineExceeded) {conn.Write([]byte("TOO SLOW\r\n"))fmt.Printf("%s: POW timeout\n", clientIP)} else {conn.Write([]byte("INTERNAL ERROR\r\n"))fmt.Printf("%s: read POW solution failed: %v\n", clientIP, err)}return}solution := strings.TrimSpace(string(buf[:n]))if !strings.HasPrefix(solution, "ACCEPTED ") {conn.Write([]byte("INVALID COMMAND\r\n"))fmt.Printf("%s: invalid POW response\n", clientIP)return}prefix := strings.TrimPrefix(solution, "ACCEPTED ")powData := 
fmt.Sprintf("%s\r\n%s%s\r\n", prefix, string(content), challenge)hash := sha256.Sum256([]byte(powData))hashHex := hex.EncodeToString(hash[:])if countLeadingZeros(hashHex) < ChallengeLeadingZeros {conn.Write([]byte("CHALLENGE FAILED\r\n"))fmt.Printf("%s: POW failed (hash: %s)\n", clientIP, hashHex)return}fmt.Printf("%s: POW success (hash: %s)\n", clientIP, hashHex)postID := generatePostID()postPath := filepath.Join(DefaultPostsRoot, postID)if err := os.MkdirAll(DefaultPostsRoot, 0755); err != nil {conn.Write([]byte("INTERNAL ERROR\r\n"))fmt.Printf("%s: create posts dir failed: %v\n", clientIP, err)return}if err := os.WriteFile(postPath, content, 0644); err != nil {conn.Write([]byte("INTERNAL ERROR\r\n"))fmt.Printf("%s: write post failed: %v\n", clientIP, err)return}conn.Write([]byte(fmt.Sprintf("SENT %s\r\n", postID)))fmt.Printf("%s: post saved, ID: %s\n", clientIP, postID)
}
func handleGet(conn net.Conn, clientIP, id string) {if !isValidPostID(id) {conn.Write([]byte("404\r\n"))fmt.Printf("%s: invalid post ID: %s\n", clientIP, id)return}postPath := filepath.Join(DefaultPostsRoot, id)content, err := os.ReadFile(postPath)if err != nil {if errors.Is(err, os.ErrNotExist) {conn.Write([]byte("404\r\n"))fmt.Printf("%s: post not found: %s\n", clientIP, id)} else {conn.Write([]byte("500\r\n"))fmt.Printf("%s: read post failed: %v\n", clientIP, err)}return}conn.Write(content)fmt.Printf("%s: sent post: %s (size: %d bytes)\n", clientIP, id, len(content))
}
func main() {cert, err := tls.LoadX509KeyPair("server.crt", "server.key")if err != nil {fmt.Printf("load TLS cert failed: %v\n", err)os.Exit(1)}tlsConfig := &tls.Config{Certificates: []tls.Certificate{cert},MinVersion: tls.VersionTLS12, }listener, err := tls.Listen("tcp", fmt.Sprintf(":%d", DefaultPort), tlsConfig)if err != nil {fmt.Printf("start TLS listener failed: %v\n", err)os.Exit(1)}defer listener.Close()fmt.Printf("server started: TLS :%d, posts root: %s\n", DefaultPort, DefaultPostsRoot)for {conn, err := listener.Accept()if err != nil {fmt.Printf("accept connection failed: %v\n", err)continue}go handleSession(conn)}
}
python client script
get.py
import socket
import ssl
import sys
RECV_SIZE = 4096  # maximum number of bytes requested per recv() call
TIMEOUT = 15  # socket timeout in seconds for connecting and for each read


class ProtocolError(Exception):
    """Raised when a server reply does not match what the protocol expects."""


def check_response(client, expected: bytes) -> None:
    """Read one reply from the server and require it to equal ``expected``.

    Raises:
        TimeoutError: the server did not answer within TIMEOUT seconds.
        ConnectionError: the server closed the connection.
        ProtocolError: the reply differs from ``expected``.
    """
    client.settimeout(TIMEOUT)
    try:
        actual = client.recv(RECV_SIZE)
    except socket.timeout:
        raise TimeoutError(f"服务器在{TIMEOUT}秒内未响应")
    if not actual:
        raise ConnectionError("服务器意外关闭了连接")
    if actual != expected:
        raise ProtocolError(f"响应不匹配: 收到 {actual!r},预期 {expected!r}")


def read_response_prefix(client, prefix: bytes) -> bytes:
    """Read one reply, require it to start with ``prefix``, return the rest.

    Raises:
        TimeoutError: the server did not answer within TIMEOUT seconds.
        ConnectionError: the server closed the connection.
        ProtocolError: the reply does not start with ``prefix``.
    """
    client.settimeout(TIMEOUT)
    try:
        response = client.recv(RECV_SIZE)
    except socket.timeout:
        raise TimeoutError(f"服务器在{TIMEOUT}秒内未响应")
    if not response:
        raise ConnectionError("服务器意外关闭了连接")
    if not response.startswith(prefix):
        raise ProtocolError(f"响应前缀不匹配: 收到 {response!r},预期前缀 {prefix!r}")
    return response[len(prefix):]


def recv_all(client) -> bytes:
    """Read from ``client`` until the peer closes the connection.

    The raw bytes are returned undecoded so the caller can decode the whole
    payload at once; decoding chunk by chunk would corrupt any multi-byte
    UTF-8 character that happens to be split across two recv() calls.
    """
    client.settimeout(TIMEOUT)
    chunks = []
    while True:
        data = client.recv(RECV_SIZE)
        if not data:
            break
        chunks.append(data)
    return b"".join(chunks)


def usage(program_name: str) -> None:
    """Print the command-line usage and an example invocation."""
    print(f"Usage: {program_name} <host> <port> <post-id>")
    print("示例: python get_paste.py localhost 6969 f576933171d2eb19f43b72a5dfb99e7ce67941fe09d9c75cb09c9db556ec6efa")


def main() -> None:
    """Fetch the post named on the command line and print its content.

    Exits with status 1 on bad arguments, connection problems, timeouts or
    protocol violations.
    """
    args = sys.argv[1:]
    if len(args) != 3:
        usage(sys.argv[0])
        sys.exit(1)
    host, port_str, post_id = args
    try:
        port = int(port_str)
    except ValueError:
        print(f"错误: 无效的端口 '{port_str}' (必须是数字)")
        sys.exit(1)

    # NOTE(review): certificate verification is disabled so the self-signed
    # server certificate is accepted; the connection is encrypted but the
    # server is not authenticated.
    context = ssl.create_default_context()
    context.check_hostname = False
    context.verify_mode = ssl.CERT_NONE

    try:
        with socket.create_connection((host, port), timeout=TIMEOUT) as sock:
            with context.wrap_socket(sock, server_hostname=host) as client:
                print(f"✅ 已连接到Pastebeam服务器: {host}:{port}")
                check_response(client, b"HI\r\n")
                client.sendall(b"GET\r\n")
                print(f"📤 已发送命令: GET\r\n")
                check_response(client, b"OK\r\n")
                print(f"✅ 服务器接受GET请求,准备发送Post ID")
                client.sendall(f"{post_id}\r\n".encode("utf-8"))
                print(f"📤 已发送Post ID: {post_id}")
                print("------------------------------")

                raw = recv_all(client)
                # Decode once, after all bytes have arrived.
                full_content = raw.decode("utf-8", errors="replace")
                if full_content == "404\r\n":
                    print(f"❌ 错误: Post ID '{post_id}' 不存在 (404)")
                elif full_content.startswith("500\r\n"):
                    print(f"❌ 服务器错误: {full_content.strip()}")
                elif full_content.startswith("TIMEOUT\r\n"):
                    print(f"❌ 错误: 操作超时")
                elif not full_content:
                    print(f"❌ 错误: 未收到内容")
                else:
                    # len(raw) is the real byte count; len(full_content)
                    # would count characters.
                    print(f"✅ 成功接收内容 (长度: {len(raw)} 字节):")
                    print("------------------------------")
                    print(full_content)
                print("------------------------------")
                print("操作完成")
    except ConnectionRefusedError:
        print(f"错误: 无法连接到 {host}:{port} (服务器可能未运行)")
        sys.exit(1)
    except TimeoutError as e:
        print(f"错误: {e}")
        sys.exit(1)
    except ConnectionError as e:
        print(f"错误: {e}")
        sys.exit(1)
    except ProtocolError as e:
        print(f"错误: 协议不匹配: {e}")
        sys.exit(1)
    except Exception as e:
        print(f"错误: 发生意外错误: {str(e)}")
        sys.exit(1)


if __name__ == "__main__":
    main()
post.py
import hashlib
from random import randint, randbytes
from base64 import b64encode
import socket
import ssl
import sys
import os
import math

RECV_SIZE = 1024  # maximum number of bytes requested per recv() call
POW_LIMIT = 100_000_000  # maximum number of proof-of-work attempts


class ProtocolError(AssertionError):
    """Raised when a server reply violates the protocol.

    Subclasses AssertionError so code that caught the failures of the earlier
    ``assert``-based checks keeps working, while the checks themselves are no
    longer stripped when Python runs with ``-O``.
    """


def check_response(client, expected: bytes) -> None:
    """Read one reply from the server and require it to equal ``expected``.

    Raises:
        TimeoutError: no reply within 10 seconds.
        ProtocolError: the reply differs from ``expected``.
    """
    client.settimeout(10)
    try:
        actual = client.recv(RECV_SIZE)
    except socket.timeout:
        raise TimeoutError("server response timeout")
    if expected != actual:
        raise ProtocolError(f"server returned {actual!r} (expected {expected!r})")


def check_response_prefix(client, prefix: bytes) -> bytes:
    """Read one reply, require it to start with ``prefix``, and return the
    remainder with surrounding whitespace stripped.

    Raises:
        TimeoutError: no reply within 10 seconds.
        ProtocolError: the reply does not start with ``prefix``.
    """
    client.settimeout(10)
    try:
        response = client.recv(RECV_SIZE)
    except socket.timeout:
        raise TimeoutError("server response timeout")
    if not response.startswith(prefix):
        raise ProtocolError(f"server returned {response!r} (expected prefix {prefix!r})")
    return response[len(prefix):].strip()


def usage(program_name: str) -> None:
    """Print the command-line usage."""
    print(f"Usage: {program_name} <host> <port> <file-path>")


def solve_pow(content, challenge_str, leading_zeros, limit=POW_LIMIT):
    """Search for a prefix that satisfies the proof-of-work challenge.

    The hashed data is ``prefix``, every line of ``content`` and
    ``challenge_str``, each followed by CRLF. A prefix is accepted when the
    SHA-256 hex digest of that data starts with at least ``leading_zeros``
    '0' characters.

    Args:
        content: the uploaded lines, without line endings.
        challenge_str: the challenge token received from the server.
        leading_zeros: required number of leading '0' hex digits.
        limit: maximum number of attempts.

    Returns:
        ``(prefix, attempts)`` on success, ``None`` if ``limit`` attempts
        were made without finding a solution.
    """
    # Everything after the prefix is the same for every attempt; build and
    # encode it once instead of re-joining the whole post each time.
    tail = "\r\n".join(content + [challenge_str, ""]).encode('utf-8')
    target = '0' * leading_zeros
    spinner = "-\\|/"
    for counter in range(limit):
        if counter % 10000 == 0:
            spin_char = spinner[(counter // 10000) % len(spinner)]
            print(f"\rTrying... {spin_char} (attempt {counter})", end='', flush=True)
        prefix = b64encode(randbytes(randint(3, 100))).decode('utf-8')
        digest = hashlib.sha256(prefix.encode('utf-8') + b"\r\n" + tail).hexdigest()
        if digest.startswith(target):
            return prefix, counter + 1
    return None


def main() -> None:
    """Upload the file named on the command line and print the new post ID.

    Exits with status 1 on bad arguments, an unreadable or empty file,
    connection or protocol errors, or when no proof-of-work solution is found.
    """
    args = sys.argv[1:]
    if len(args) != 3:
        usage(sys.argv[0])
        sys.exit(1)
    host, port_str, file_path = args
    try:
        port = int(port_str)
    except ValueError:
        print(f"ERROR: invalid port {port_str!r} (must be a number)")
        sys.exit(1)

    if not os.path.isfile(file_path):
        print(f"ERROR: file {file_path!r} does not exist")
        sys.exit(1)
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            content = f.read().splitlines()
    except (OSError, UnicodeDecodeError) as e:
        print(f"ERROR: cannot read {file_path!r}: {e}")
        sys.exit(1)
    if not content:
        print("ERROR: file is empty")
        sys.exit(1)

    # NOTE(review): certificate verification is disabled so the self-signed
    # server certificate is accepted; the connection is encrypted but the
    # server is not authenticated.
    context = ssl.create_default_context()
    context.check_hostname = False
    context.verify_mode = ssl.CERT_NONE

    try:
        # Bounded connect/handshake time instead of hanging on a dead host.
        with socket.create_connection((host, port), timeout=10) as sock:
            with context.wrap_socket(sock, server_hostname=host) as client:
                check_response(client, b"HI\r\n")
                print(f"✅ Connected to server: {host}:{port}")
                client.sendall(b"POST\r\n")
                check_response(client, b"OK\r\n")
                print(f"✅ Server accepts POST request")

                total_lines = len(content)
                bar_len = 30
                print(f"📤 Uploading {total_lines} lines...")
                for idx, line in enumerate(content):
                    # The server acknowledges every line before the next is sent.
                    client.sendall(f"{line}\r\n".encode('utf-8'))
                    check_response(client, b"OK\r\n")
                    progress = (idx + 1) / total_lines
                    filled = math.floor(progress * bar_len)
                    bar = '#' * filled + '.' * (bar_len - filled)
                    print(f"\r[{bar}] {idx+1}/{total_lines}", end='', flush=True)
                print(f"\r[{ '#' * bar_len }] {total_lines}/{total_lines} ✅")

                client.sendall(b"SUBMIT\r\n")
                challenge_data = check_response_prefix(client, b"CHALLENGE ").split()
                if len(challenge_data) != 3:
                    raise ProtocolError(f"invalid challenge format: {challenge_data}")
                hash_func, zeros_str, challenge = challenge_data
                if hash_func != b"sha256":
                    raise ProtocolError(f"unsupported hash: {hash_func!r}")
                leading_zeros = int(zeros_str.decode('utf-8'))
                challenge_str = challenge.decode('utf-8')
                print(f"🔒 POW challenge: {leading_zeros} leading zeros")
                print(f"🔨 Mining solution (max {POW_LIMIT} attempts)...")

                solution = solve_pow(content, challenge_str, leading_zeros)
                if solution is None:
                    print(f"\n❌ Failed to solve POW (exceeded {POW_LIMIT} attempts)")
                    sys.exit(1)
                prefix, attempts = solution
                print(f"\rFound solution after {attempts} attempts ✅")
                client.sendall(f"ACCEPTED {prefix}\r\n".encode('utf-8'))
                post_id = check_response_prefix(client, b"SENT ").decode('utf-8')
                print(f"\n🎉 Upload success! Post ID: {post_id}")
    except ConnectionRefusedError:
        print(f"ERROR: Could not connect to {host}:{port} (server not running?)")
        sys.exit(1)
    except Exception as e:
        print(f"ERROR: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()
测试
- 先运行 generate_cert.go(go run generate_cert.go)生成证书 server.crt 和私钥 server.key,再启动 pastebeam 服务器(go run pastebeam_server.go)
- 然后用 post.py 上传文件并得到 Post ID,再用 get.py 凭该 ID 取回文件内容
ref link
- https://github.com/tsoding/pastebeam