当前位置: 首页 > news >正文

分布式系统中的网络编程

1.分布式系统通信基础

1. 分布式系统架构模式

▶ 常见分布式架构模式

▶ 分布式系统通信挑战

挑战

描述

解决方案

网络延迟

节点间物理距离导致延迟

缓存、就近访问

节点故障

部分节点不可用

冗余设计、故障检测

消息丢失

网络问题导致消息丢失

可靠传输协议、重试机制

一致性保证

数据在多节点间保持一致

共识算法、最终一致性

2. 远程通信范式

▶ RPC与消息队列对比

特性

RPC

消息队列

通信模式

请求 - 响应同步模式

异步发布 - 订阅模式

耦合度

高(客户端依赖服务接口)

低(生产者与消费者解耦)

可靠性

依赖网络和服务可用性

消息持久化,可靠性高

适用场景

服务间强依赖的实时调用

异步任务处理、流量削峰

2.远程方法调用(RPC)实现

1. 简易RPC框架设计

▶ RPC基本流程

2. Java实现简易RPC框架

// 服务接口定义
public interface HelloService {String sayHello(String name);
}// 服务实现
public class HelloServiceImpl implements HelloService {@Overridepublic String sayHello(String name) {return "Hello, " + name + "!";}
}// RPC请求对象
public class RpcRequest implements Serializable {private static final long serialVersionUID = 1L;private String className;private String methodName;private Object[] parameters;// getters and setters
}// RPC响应对象
public class RpcResponse implements Serializable {private static final long serialVersionUID = 1L;private Object result;private Throwable exception;// getters and setters
}// 服务端
public class RpcServer {private final int port;private final Map<String, Object> serviceMap = new HashMap<>();public RpcServer(int port) {this.port = port;}public void registerService(String serviceName, Object service) {
        serviceMap.put(serviceName, service);}public void start() throws IOException {try (ServerSocket serverSocket = new ServerSocket(port)) {System.out.println("RPC服务器启动,监听端口: " + port);while (true) {try (Socket socket = serverSocket.accept();ObjectInputStream input = new ObjectInputStream(socket.getInputStream());ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream())) {// 读取请求RpcRequest request = (RpcRequest) input.readObject();// 查找并调用服务Object service = serviceMap.get(request.getClassName());RpcResponse response = new RpcResponse();try {Method method = service.getClass().getMethod(
                                request.getMethodName(), getParameterTypes(request.getParameters()));Object result = method.invoke(service, request.getParameters());
                        response.setResult(result);} catch (Exception e) {
                        response.setException(e);}// 返回响应
                    output.writeObject(response);} catch (ClassNotFoundException e) {
                    e.printStackTrace();}}}}private Class<?>[] getParameterTypes(Object[] parameters) {if (parameters == null || parameters.length == 0) {return new Class[0];}Class<?>[] parameterTypes = new Class[parameters.length];for (int i = 0; i < parameters.length; i++) {
            parameterTypes[i] = parameters[i].getClass();}return parameterTypes;}
}// 客户端
public class RpcClient {private final String host;private final int port;public RpcClient(String host, int port) {this.host = host;this.port = port;}@SuppressWarnings("unchecked")public <T> T createProxy(Class<T> serviceClass) {return (T) Proxy.newProxyInstance(
                serviceClass.getClassLoader(),new Class<?>[]{serviceClass},(proxy, method, args) -> {try (Socket socket = new Socket(host, port);ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());ObjectInputStream input = new ObjectInputStream(socket.getInputStream())) {// 创建并发送请求RpcRequest request = new RpcRequest();
                        request.setClassName(serviceClass.getName());
                        request.setMethodName(method.getName());
                        request.setParameters(args);                        output.writeObject(request);// 接收响应RpcResponse response = (RpcResponse) input.readObject();if (response.getException() != null) {throw response.getException();}return response.getResult();} catch (ClassNotFoundException e) {throw new RuntimeException(e);}});}
}// 使用示例
public class RpcExample {public static void main(String[] args) {// 启动服务器new Thread(() -> {try {RpcServer server = new RpcServer(9000);
                server.registerService(HelloService.class.getName(), new HelloServiceImpl());
                server.start();} catch (IOException e) {
                e.printStackTrace();}}).start();// 客户端调用RpcClient client = new RpcClient("localhost", 9000);HelloService service = client.createProxy(HelloService.class);String result = service.sayHello("RPC");System.out.println("远程调用结果: " + result);}
}

3.分布式缓存实现

1. 分布式缓存架构

▶ 缓存模式对比

模式

描述

优点

缺点

本地缓存

缓存数据存储在应用进程内

访问速度极快

无法共享,内存浪费

集中式缓存

所有应用访问同一缓存服务器

数据共享,减少冗余

单点故障风险

分布式缓存

缓存数据分布在多个节点

高可用,可扩展性强

实现复杂度高

2. 简易分布式缓存系统

// 缓存节点接口
public interface CacheNode {void put(String key, Object value);Object get(String key);void remove(String key);Set<String> getKeys();
}// 本地缓存实现
public class LocalCache implements CacheNode {private final ConcurrentHashMap<String, Object> cache = new ConcurrentHashMap<>();@Overridepublic void put(String key, Object value) {
        cache.put(key, value);}@Overridepublic Object get(String key) {return cache.get(key);}@Overridepublic void remove(String key) {
        cache.remove(key);}@Overridepublic Set<String> getKeys() {return cache.keySet();}
}// 分布式缓存节点
public class DistributedCacheNode implements CacheNode {private final String nodeId;private final int port;private final LocalCache localCache = new LocalCache();private final Map<String, CacheNode> remoteNodes = new ConcurrentHashMap<>();private final ExecutorService threadPool = Executors.newFixedThreadPool(10);public DistributedCacheNode(String nodeId, int port) {this.nodeId = nodeId;this.port = port;}// 启动节点服务器public void start() {
        threadPool.submit(() -> {try (ServerSocket serverSocket = new ServerSocket(port)) {System.out.println("缓存节点 " + nodeId + " 启动,监听端口: " + port);while (true) {try (Socket socket = serverSocket.accept();ObjectInputStream input = new ObjectInputStream(socket.getInputStream());ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream())) {CacheRequest request = (CacheRequest) input.readObject();CacheResponse response = new CacheResponse();try {switch (request.getOperation()) {case PUT:
                                    localCache.put(request.getKey(), request.getValue());break;case GET:
                                    response.setValue(localCache.get(request.getKey()));break;case REMOVE:
                                    localCache.remove(request.getKey());break;case GET_KEYS:
                                    response.setKeys(localCache.getKeys());break;}} catch (Exception e) {
                            response.setException(e);}                        output.writeObject(response);} catch (ClassNotFoundException e) {
                        e.printStackTrace();}}} catch (IOException e) {
                e.printStackTrace();}});}// 连接到其他节点public void connectToNode(String nodeId, String host, int port) {
        remoteNodes.put(nodeId, new RemoteCacheNode(host, port));}// 分布式哈希算法确定节点private String getResponsibleNode(String key) {// 简化实现,实际应使用一致性哈希算法List<String> nodeIds = new ArrayList<>(remoteNodes.keySet());
        nodeIds.add(this.nodeId);Collections.sort(nodeIds);return nodeIds.get(Math.abs(key.hashCode()) % nodeIds.size());}@Overridepublic void put(String key, Object value) {String responsibleNode = getResponsibleNode(key);if (responsibleNode.equals(this.nodeId)) {
            localCache.put(key, value);} else {
            remoteNodes.get(responsibleNode).put(key, value);}}@Overridepublic Object get(String key) {String responsibleNode = getResponsibleNode(key);if (responsibleNode.equals(this.nodeId)) {return localCache.get(key);} else {return remoteNodes.get(responsibleNode).get(key);}}@Overridepublic void remove(String key) {String responsibleNode = getResponsibleNode(key);if (responsibleNode.equals(this.nodeId)) {
            localCache.remove(key);} else {
            remoteNodes.get(responsibleNode).remove(key);}}@Overridepublic Set<String> getKeys() {// 简化实现,实际应合并所有节点的keysreturn localCache.getKeys();}
}// 远程缓存节点代理
public class RemoteCacheNode implements CacheNode {private final String host;private final int port;public RemoteCacheNode(String host, int port) {this.host = host;this.port = port;}@Overridepublic void put(String key, Object value) {sendRequest(new CacheRequest(CacheOperation.PUT, key, value));}@Overridepublic Object get(String key) {CacheResponse response = sendRequest(new CacheRequest(CacheOperation.GET, key));return response.getValue();}@Overridepublic void remove(String key) {sendRequest(new CacheRequest(CacheOperation.REMOVE, key));}@Overridepublic Set<String> getKeys() {CacheResponse response = sendRequest(new CacheRequest(CacheOperation.GET_KEYS));return response.getKeys();}private CacheResponse sendRequest(CacheRequest request) {try (Socket socket = new Socket(host, port);ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());ObjectInputStream input = new ObjectInputStream(socket.getInputStream())) {            output.writeObject(request);return (CacheResponse) input.readObject();} catch (Exception e) {throw new RuntimeException("远程缓存调用失败", e);}}
}

4.消息队列实现

1. 消息队列核心组件

▶ 消息队列基本架构

2. 简易消息队列实现

// 消息接口
public interface Message {String getId();String getTopic();Object getPayload();long getTimestamp();
}// 消息实现
public class DefaultMessage implements Message {private final String id;private final String topic;private final Object payload;private final long timestamp;public DefaultMessage(String id, String topic, Object payload) {this.id = id;this.topic = topic;this.payload = payload;this.timestamp = System.currentTimeMillis();}@Overridepublic String getId() {return id;}@Overridepublic String getTopic() {return topic;}@Overridepublic Object getPayload() {return payload;}@Overridepublic long getTimestamp() {return timestamp;}
}// 消息队列接口
public interface MessageQueue {void publish(Message message);Message consume(String topic);void subscribe(String topic, MessageListener listener);void unsubscribe(String topic, MessageListener listener);
}// 消息队列实现
public class SimpleMessageQueue implements MessageQueue {private final Map<String, BlockingQueue<Message>> topicQueues = new ConcurrentHashMap<>();private final Map<String, List<MessageListener>> topicListeners = new ConcurrentHashMap<>();private final ExecutorService threadPool = Executors.newFixedThreadPool(10);@Overridepublic synchronized void publish(Message message) {String topic = message.getTopic();
        topicQueues.computeIfAbsent(topic, k -> new LinkedBlockingQueue<>()).add(message);// 异步通知所有订阅者List<MessageListener> listeners = topicListeners.getOrDefault(topic, Collections.emptyList());for (MessageListener listener : listeners) {
            threadPool.submit(() -> listener.onMessage(message));}}@Overridepublic Message consume(String topic) {BlockingQueue<Message> queue = topicQueues.get(topic);return queue != null ? queue.poll() : null;}@Overridepublic synchronized void subscribe(String topic, MessageListener listener) {
        topicListeners.computeIfAbsent(topic, k -> new CopyOnWriteArrayList<>()).add(listener);}@Overridepublic synchronized void unsubscribe(String topic, MessageListener listener) {List<MessageListener> listeners = topicListeners.get(topic);if (listeners != null) {
            listeners.remove(listener);}}
}// 分布式消息队列节点
public class DistributedMessageQueueNode implements MessageQueue {private final String nodeId;private final int port;private final SimpleMessageQueue localQueue = new SimpleMessageQueue();private final Map<String, MessageQueue> remoteNodes = new ConcurrentHashMap<>();private final ConsistentHashRouter router;public DistributedMessageQueueNode(String nodeId, int port, List<String> nodeIds) {this.nodeId = nodeId;this.port = port;this.router = new ConsistentHashRouter(nodeIds);}// 启动节点服务器public void start() {// 启动网络服务接收消息new Thread(() -> {try (ServerSocket serverSocket = new ServerSocket(port)) {System.out.println("消息队列节点 " + nodeId + " 启动,监听端口: " + port);while (true) {try (Socket socket = serverSocket.accept();ObjectInputStream input = new ObjectInputStream(socket.getInputStream());ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream())) {// 处理消息请求MessageRequest request = (MessageRequest) input.readObject();switch (request.getType()) {case PUBLISH:
                                localQueue.publish(request.getMessage());break;case CONSUME:Message message = localQueue.consume(request.getTopic());
                                output.writeObject(message);break;}} catch (ClassNotFoundException e) {
                        e.printStackTrace();}}} catch (IOException e) {
                e.printStackTrace();}}).start();}// 连接到其他节点public void connectToNode(String nodeId, String host, int port) {
        remoteNodes.put(nodeId, new RemoteMessageQueueNode(host, port));}@Overridepublic void publish(Message message) {String responsibleNode = router.getNode(message.getTopic());if (responsibleNode.equals(nodeId)) {
            localQueue.publish(message);} else {
            remoteNodes.get(responsibleNode).publish(message);}}@Overridepublic Message consume(String topic) {String responsibleNode = router.getNode(topic);if (responsibleNode.equals(nodeId)) {return localQueue.consume(topic);} else {return remoteNodes.get(responsibleNode).consume(topic);}}@Overridepublic void subscribe(String topic, MessageListener listener) {
        localQueue.subscribe(topic, listener);}@Overridepublic void unsubscribe(String topic, MessageListener listener) {
        localQueue.unsubscribe(topic, listener);}
}

4.分布式系统实践建议

1. 服务发现与注册

  • ZooKeeper:分布式协调服务,用于服务注册与发现
  • Consul:提供服务发现、健康检查、KV存储等功能
  • Etcd:高可用的键值存储,用于服务发现和配置共享

2. 负载均衡

  • 客户端负载均衡:服务消费者维护服务列表,自行选择节点
  • 服务端负载均衡:通过负载均衡器(如Nginx、HAProxy)分发请求

3. 容错与恢复

  • 熔断机制:当服务不可用时,快速失败而不是等待
  • 降级策略:当资源不足时,提供简化版服务
  • 重试机制:对临时性故障进行有限次数的重试

以上示例,介绍了分布式系统中网络编程的核心概念和实现方法。在实际开发中,建议使用成熟的开源框架(如gRPC、Redis、Kafka)来简化分布式系统的开发,避免重复造轮子。

http://www.xdnf.cn/news/669457.html

相关文章:

  • wordpress迁移到Hostinger
  • 爬虫入门指南-某专利网站的专利数据查询并存储
  • YOLOv2 深度解析:目标检测领域的进阶之路
  • 【文献阅读】EndoChat: Grounded Multimodal Large Language Model for Endoscopic Surgery
  • 【HW系列】—目录扫描、口令爆破、远程RCE流量特征
  • 攻防世界-ics-07
  • 【Web应用】基础篇04-功能详解-权限控制(创建菜单--分配角色--创建用户)
  • 使用 scikit-learn 库对乌克兰冲突事件数据集进行多维度分类分析
  • ABP VNext + Apache Flink 实时流计算:打造高可用“交易风控”系统
  • 【深度学习】9. CNN性能提升-轻量化模型专辑:SqueezeNet / MobileNet / ShuffleNet / EfficientNet
  • 汽车电子/电气(E/E)架构将朝着区域(分区)式架构方向发展
  • Filebeat es 同步服务器日志到es
  • C++ STL 容器:List 深度解析与实践指南
  • Linux编辑器——vim的使用
  • 文件上传白名单绕过(图片马 - 图片二次渲染绕过)
  • React从基础入门到高级实战:React 核心技术 - React 与 TypeScript:构建类型安全的应用
  • 第十章:构建之巅 · 打包与部署的终极试炼
  • uniapp-商城-72-shop(5-商品列表,步进器添加商品到的购物车实现)
  • Unsupervised Learning-Word Embedding
  • 如何提高CAD作图设计效率,技术分享
  • 每日算法 -【Swift 算法】实现回文数判断!
  • stm32f系列工程切换到H系列
  • 电芯单节精密焊接机:以先进功能与特点赋能电池制造科技升级
  • 传统数据表设计与Prompt驱动设计的范式对比:以NBA投篮数据表为例
  • PHPStudy 一键式网站搭建工具的下载使用
  • EfficientLLM: Efficiency in Large Language Models 高效大模型
  • AppArmor(Application Armor)是 Linux 内核的一个安全模块
  • 比亚迪“双剑”电池获中汽中心权威认证,堪称“移动安全堡垒”。
  • HTTPS 协议:数据传输安全的坚实堡垒
  • 视频监控汇聚平台EasyCVR工业与安全监控:防爆摄像机的安全应用与注意事项