当前位置: 首页 > news >正文

分布式文件系统07-小文件系统的请求异步化高并发性能优化

小文件系统的请求异步化高并发性能优化

222_分布式图片存储系统中的高性能指的到底是什么?

重构系统架构,来实现一个高性能。然后就要做非常完善的一个测试,最后对这个系统做一个总结,说说后续我们还要做一些什么东西。另外,我还要给大家留一些作业,相当于是让大家课后自己去做的,就不是完全拷贝我的代码

高并发

前面已经通过Reactor模式实现了

高性能主要是两块

第一块:客户端现在是短连接,每次发送请求,都需要建立连接,然后断开连接。站在客户端的角度而言,发现每执行一次文件上传和下载的操作,速度都很慢

第二块:文件上传,需要多副本上传。一般来说,针对kafka,多副本的时候默认情况下只要写成功一个副本,就返回了。另外其他的副本的写都是异步慢慢来执行的,kafka采取的是副本pull数据的机制,只要在一个数据节点上写成功数据,别的数据节点会主从从这个写成功的数据节点上pull数据

Kafka,强调高性能,生产消息的行为都是尽快的可以完成

HDFS,不强调高性能,它主要针对的是几个GB的大文件上传到服务器上去,只要慢慢上传就可以了,速度慢点无所谓,只要能上传成功。所以,HDFS采用的是多个副本一定要依次上传成功,才可以说是本次文件上传成功了。所以,HDFS的上传速度肯定是很慢的,因为它们根本不强调文件上传过程的高性能。所以Kafka和HDFS的应用场景本身就不相同

高性能架构的重构

  • 短连接 -> 长连接;
  • 同步上传多副本 -> 写一个副本,其他副本在后台慢慢的异步复制和拉取

这样,文件上传和文件下载,性能至少会提升好几倍

223_回头审视一下客户端的短连接模式有哪些问题?

除了客户端有NioClient以外,数据节点也有NioClient,因为他在进行数据节点扩缩容时,需要从其他的数据节点拷贝副本过来写入本地,这个过程使用短连接也无所谓,因为这个过程都是后台慢慢执行的,但是当然最好也是重构成长连接模式

224_初步实现用于进行网络管理的NetworkManager组件

225_在NetworkManager中实现核心线程无限循环进行poll操作

NetworkManager

/*** 网络连接管理器*/
public class NetworkManager {// 正在连接中public static final Integer CONNECTING = 1;// 已经建立连接public static final Integer CONNECTED = 2;// 多路复用Selectorprivate Selector selector;// 所有的连接private Map<String, SocketChannel> connections;// 每个数据节点的连接状态private Map<String, Integer> connectState;// 等待建立连接的机器private ConcurrentLinkedQueue<Host> waitingConnectHosts;public NetworkManager() {try {this.selector = Selector.open();} catch (IOException e) {e.printStackTrace();}this.connections = new ConcurrentHashMap<String, SocketChannel>();this.connectState = new ConcurrentHashMap<String, Integer>();this.waitingConnectHosts = new ConcurrentLinkedQueue<Host>();new NetworkPollThread().start();}/*** 尝试连接到数据节点的端口上去*/public void maybeConnect(String hostname, Integer nioPort) throws Exception {synchronized(this) {if(!connectState.containsKey(hostname)) {connectState.put(hostname, CONNECTING);waitingConnectHosts.offer(new Host(hostname, nioPort)); }while(connectState.get(hostname).equals(CONNECTING)) {wait(100);}}}/*** 尝试把排队中的机器发起连接的请求*/private void tryConnect() {try {Host host = null;SocketChannel channel = null;while((host = waitingConnectHosts.poll()) != null) {channel = SocketChannel.open();  channel.configureBlocking(false);  channel.connect(new InetSocketAddress(host.hostname, host.nioPort)); channel.register(selector, SelectionKey.OP_CONNECT);  }} catch (Exception e) {e.printStackTrace();}}// 网络连接的核心线程class NetworkPollThread extends Thread {@Overridepublic void run() {while(true) {tryConnect();}}}// 代表了一台机器class Host {String hostname;Integer nioPort;public Host(String hostname, Integer nioPort) {this.hostname = hostname;this.nioPort = nioPort;}}}

226_在无限循环的poll方法中完成网络连接的建立

public class NetworkManager {// 正在连接中public static final Integer CONNECTING = 1;// 已经建立连接public static final Integer CONNECTED = 2;// 网络poll操作的超时时间public static final Long POLL_TIMEOUT = 500L; // 多路复用Selectorprivate Selector selector;// 所有的连接private Map<String, SocketChannel> connections;// 每个数据节点的连接状态private Map<String, Integer> connectState;// 等待建立连接的机器private ConcurrentLinkedQueue<Host> waitingConnectHosts;public NetworkManager() {try {this.selector = Selector.open();} catch (IOException e) {e.printStackTrace();}this.connections = new ConcurrentHashMap<String, SocketChannel>();this.connectState = new ConcurrentHashMap<String, Integer>();this.waitingConnectHosts = new ConcurrentLinkedQueue<Host>();new NetworkPollThread().start();}/*** 尝试连接到数据节点的端口上去*/public void maybeConnect(String hostname, Integer nioPort) throws Exception {synchronized(this) {if(!connectState.containsKey(hostname)) {connectState.put(hostname, CONNECTING);waitingConnectHosts.offer(new Host(hostname, nioPort)); }while(connectState.get(hostname).equals(CONNECTING)) {wait(100);}}}// 网络连接的核心线程class NetworkPollThread extends Thread {@Overridepublic void run() {while(true) {tryConnect();poll();}}/*** 尝试把排队中的机器发起连接的请求*/private void tryConnect() {try {Host host = null;SocketChannel channel = null;while((host = waitingConnectHosts.poll()) != null) {channel = SocketChannel.open();  channel.configureBlocking(false);  channel.connect(new InetSocketAddress(host.hostname, host.nioPort)); channel.register(selector, SelectionKey.OP_CONNECT);  }} catch (Exception e) {e.printStackTrace();}}/*** 尝试完成网络连接、请求发送、响应读取*/private void poll() {SocketChannel channel = null;try {int selectedKeys = selector.select(500);   if(selectedKeys <= 0) {return;}Iterator<SelectionKey> keysIterator = selector.selectedKeys().iterator();  while(keysIterator.hasNext()){  SelectionKey key = (SelectionKey) keysIterator.next();  keysIterator.remove();  // 如果是网络连接操作if(key.isConnectable()){  channel = (SocketChannel) key.channel();if(channel.isConnectionPending()){  while(!channel.finishConnect()) {Thread.sleep(100); }}   System.out.println("完成与服务端的连接的建立......"); InetSocketAddress remoteAddress = (InetSocketAddress)channel.getRemoteAddress();connectState.put(remoteAddress.getHostName(), CONNECTED);connections.put(remoteAddress.getHostName(), channel);}}} catch (Exception e) {e.printStackTrace();if(channel != null) {try {channel.close();} catch (IOException e1) {e1.printStackTrace();}}}}}// 代表了一台机器class Host {String hostname;Integer nioPort;public Host(String hostname, Integer nioPort) {this.hostname = hostname;this.nioPort = nioPort;}}}

227_客户端的核心业务方法对要发送的请求进行封装

228_将封装好的请求放入NetworkManager的请求队列中

229_如何实现异步发送请求以及同步等待响应两个接口

230_对每个数据节点获取一个请求缓存起来等待发送

231_在核心的poll方法中将每个机器暂存等待的请求发送出去

http://www.xdnf.cn/news/1251487.html

相关文章:

  • TCP的拥塞控制
  • CSS :is () 与 :where ():简化复杂选择器的 “语法糖”
  • NodeJs学习日志(1):windows安装使用node.js 安装express,suquelize,sqlite,nodemon
  • 基于Hadoop的股票大数据分析可视化及多模型的股票预测研究与实现
  • 笔试——Day30
  • 如何快速掌握大数据技术?大四学生用Spark和Python构建直肠癌数据分析与可视化系统
  • 【数据结构与算法-Day 12】深入浅出栈:从“后进先出”原理到数组与链表双实现
  • 开疆智能ModbusTCP转Profinet网关连接EPSON机器人配置案例
  • Gitlab+Jenkins+K8S+Registry 建立 CI/CD 流水线
  • MATLAB深度学习之数据集-数据库构建方法详解
  • 无人机开发分享——基于行为树的无人机集群机载自主决策算法框架搭建及开发
  • 2025国赛数学建模C题详细思路模型代码获取,备战国赛算法解析——决策树
  • 信息安全概述
  • Dart中回调函数的简单实现
  • NY112NY117美光固态闪存NY119NY123
  • C++之vector类的代码及其逻辑详解 (下)
  • 【Excel】通过Index函数向下拖动单元格并【重复引用/循环引用】数据源
  • 【Linux】调试器gdb/cgdb的使用
  • 推荐一款优质的开源博客与内容管理系统
  • Android PDFBox 的使用指南
  • 【数据结构与算法】刷题篇——环形链表的约瑟夫问题
  • 8.6笔记
  • 93、【OS】【Nuttx】【构建】cmake menuconfig 目标
  • vxe-table表格编辑单元格,进行正则验证,不符合验证,清空单元格数据。
  • 【“连亏十年” 川机器人,启动科创板IPO辅导】
  • 短剧小程序系统开发:技术驱动下的内容创新之路
  • 后端服务oom
  • [linux] Linux系统中断机制详解及用户空间中断使用方法
  • Java技术栈/面试题合集(19)-架构设计篇
  • Android—服务+通知=>前台服务