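Java NIO
In Java, NIO (New I/O) is a key technology for high-concurrency, data-heavy workloads thanks to its efficient I/O handling. Combining NIO's network programming with its file handling makes practical systems such as file transfer possible. This article walks through the core of Java NIO and then shows how to combine network programming and file handling in practice.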
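Core components of Java NIO
Java NIO is built on three core components: Buffer, Channel, and Selector.
Buffer: the data container
A Buffer stores data of one primitive type and is described by four key attributes: capacity, position, limit, and mark. Taking ByteBuffer as an example, the following code creates a ByteBuffer with a capacity of 10, writes data into it, flips it into read mode to read the data back, and clears the buffer when done: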
import java.nio.ByteBuffer;

public class BufferExample {
    public static void main(String[] args) {
        // Allocate a heap buffer that holds up to 10 bytes
        ByteBuffer buffer = ByteBuffer.allocate(10);
        buffer.put((byte) 'H');
        buffer.put((byte) 'e');
        buffer.put((byte) 'l');
        buffer.put((byte) 'l');
        buffer.put((byte) 'o');

        // Switch from write mode to read mode
        buffer.flip();
        while (buffer.hasRemaining()) {
            System.out.print((char) buffer.get());
        }

        // Reset position and limit so the buffer can be written again
        buffer.clear();
    }
}
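To make the attributes concrete, the following sketch prints position, limit, and capacity as a buffer moves through writing, flipping, a mark/reset, and compacting. The class name BufferStateDemo and the helper describe are illustrative names chosen here, not part of the original example.

```java
import java.nio.ByteBuffer;

public class BufferStateDemo {
    // Formats the three cursor values of a buffer (hypothetical helper).
    static String describe(ByteBuffer b) {
        return "pos=" + b.position() + " lim=" + b.limit() + " cap=" + b.capacity();
    }

    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(10);
        System.out.println("new:     " + describe(buffer)); // pos=0 lim=10 cap=10

        buffer.put(new byte[] {'H', 'e', 'l', 'l', 'o'});
        System.out.println("written: " + describe(buffer)); // pos=5 lim=10 cap=10

        buffer.flip(); // limit = old position, position = 0
        System.out.println("flipped: " + describe(buffer)); // pos=0 lim=5 cap=10

        buffer.get();   // consume 'H'
        buffer.mark();  // remember position 1
        buffer.get();   // consume 'e'
        buffer.reset(); // jump back to the mark
        System.out.println("reset:   " + describe(buffer)); // pos=1 lim=5 cap=10

        buffer.compact(); // keep the 4 unread bytes and return to write mode
        System.out.println("compact: " + describe(buffer)); // pos=4 lim=10 cap=10
    }
}
```

Unlike clear(), compact() preserves unread bytes, which matters when a read delivers only part of a message.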
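Channel: a bidirectional data path
A Channel is an upgrade over the traditional I/O stream: it can carry data in both directions and works directly with a Buffer. Common implementations include FileChannel and SocketChannel. When copying a file with FileChannel, data can be transferred directly between channels, which is more efficient: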
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.channels.FileChannel;

public class FileChannelExample {
    public static void main(String[] args) {
        try (FileInputStream fis = new FileInputStream("source.txt");
             FileOutputStream fos = new FileOutputStream("target.txt");
             FileChannel inChannel = fis.getChannel();
             FileChannel outChannel = fos.getChannel()) {
            // transferTo may move fewer bytes than requested, so loop until done
            long position = 0;
            long size = inChannel.size();
            while (position < size) {
                position += inChannel.transferTo(position, size - position, outChannel);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
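For contrast with transferTo, here is a minimal sketch of the same copy done by hand, moving data from a Channel into a Buffer and back out to another Channel. The class name BufferedChannelCopy, the helper copy, and the temporary file names are assumptions made for illustration.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class BufferedChannelCopy {
    // Copies source to target through an intermediate buffer; returns bytes copied.
    static long copy(Path source, Path target) throws IOException {
        try (FileChannel in = FileChannel.open(source, StandardOpenOption.READ);
             FileChannel out = FileChannel.open(target, StandardOpenOption.CREATE,
                     StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING)) {
            ByteBuffer buffer = ByteBuffer.allocate(4096);
            long total = 0;
            while (in.read(buffer) != -1) {
                buffer.flip(); // read mode: drain what was just read
                while (buffer.hasRemaining()) {
                    total += out.write(buffer);
                }
                buffer.clear(); // write mode: ready for the next read
            }
            return total;
        }
    }

    public static void main(String[] args) throws IOException {
        Path source = Files.createTempFile("nio-copy-src", ".txt");
        Path target = Files.createTempFile("nio-copy-dst", ".txt");
        Files.write(source, "Hello, channel".getBytes(StandardCharsets.UTF_8));
        System.out.println("copied " + copy(source, target) + " bytes");
        Files.delete(source);
        Files.delete(target);
    }
}
```

The manual loop is useful when the data has to be inspected or transformed on the way through; when it does not, transferTo lets the operating system do the work.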
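Selector: the key to multiplexing
A Selector detects events on multiple Channels, so a single thread can manage many Channels. To use it, create a Selector instance, register each Channel with the Selector along with the events of interest, then call select() to wait for events and handle them. The following is a simple TCP server built on a Selector: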
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Set;

public class NioServer {
    private static final int PORT = 8080;
    private static final int BUFFER_SIZE = 1024;

    public static void main(String[] args) {
        try (Selector selector = Selector.open();
             ServerSocketChannel serverSocketChannel = ServerSocketChannel.open()) {
            serverSocketChannel.bind(new InetSocketAddress(PORT));
            serverSocketChannel.configureBlocking(false);
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            System.out.println("Server started, listening on port: " + PORT);

            while (true) {
                int readyChannels = selector.select();
                if (readyChannels == 0) continue;

                Set<SelectionKey> selectedKeys = selector.selectedKeys();
                Iterator<SelectionKey> keyIterator = selectedKeys.iterator();
                while (keyIterator.hasNext()) {
                    SelectionKey key = keyIterator.next();
                    if (key.isAcceptable()) {
                        // Accept the connection and watch it for incoming data
                        ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
                        SocketChannel socketChannel = ssc.accept();
                        socketChannel.configureBlocking(false);
                        socketChannel.register(selector, SelectionKey.OP_READ);
                        System.out.println("New client connected: " + socketChannel.getRemoteAddress());
                    } else if (key.isReadable()) {
                        SocketChannel socketChannel = (SocketChannel) key.channel();
                        try {
                            ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
                            int bytesRead = socketChannel.read(buffer);
                            if (bytesRead > 0) {
                                buffer.flip();
                                byte[] data = new byte[buffer.remaining()];
                                buffer.get(data);
                                String message = new String(data, StandardCharsets.UTF_8);
                                System.out.println("Received from client: " + message);
                                ByteBuffer response = ByteBuffer.wrap(
                                        ("Server received: " + message).getBytes(StandardCharsets.UTF_8));
                                // A non-blocking write may be partial, so write until drained
                                while (response.hasRemaining()) {
                                    socketChannel.write(response);
                                }
                            } else if (bytesRead == -1) {
                                System.out.println("Client disconnected: " + socketChannel.getRemoteAddress());
                                socketChannel.close();
                            }
                        } catch (IOException e) {
                            // A broken connection drops this client only, not the server
                            closeQuietly(socketChannel);
                        }
                    }
                    keyIterator.remove();
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    // Closes a client channel without letting a close failure escape
    private static void closeQuietly(SocketChannel channel) {
        try {
            channel.close();
        } catch (IOException ignored) {
            // Nothing more can be done for this connection
        }
    }
}
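The register, select, and iterate cycle can also be tried without a network. The sketch below uses java.nio.channels.Pipe inside a single process: one end is registered for OP_READ, a message is written to the other end, and the Selector reports the readable key. The class name PipeSelectorDemo and the method roundTrip are illustrative; the sketch assumes the message is short enough to arrive in one read.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

public class PipeSelectorDemo {
    // Writes a short message into a pipe and reads it back through a Selector.
    static String roundTrip(String message) throws IOException {
        Pipe pipe = Pipe.open();
        try (Selector selector = Selector.open();
             Pipe.SinkChannel sink = pipe.sink();
             Pipe.SourceChannel source = pipe.source()) {
            // Only non-blocking channels can be registered with a Selector
            source.configureBlocking(false);
            source.register(selector, SelectionKey.OP_READ);

            ByteBuffer out = ByteBuffer.wrap(message.getBytes(StandardCharsets.UTF_8));
            while (out.hasRemaining()) {
                sink.write(out);
            }

            selector.select(); // blocks until the source end is readable
            StringBuilder received = new StringBuilder();
            Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
            while (keys.hasNext()) {
                SelectionKey key = keys.next();
                keys.remove(); // selected keys must be removed by hand
                if (key.isReadable()) {
                    ByteBuffer in = ByteBuffer.allocate(256);
                    ((Pipe.SourceChannel) key.channel()).read(in);
                    in.flip();
                    received.append(StandardCharsets.UTF_8.decode(in));
                }
            }
            return received.toString();
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(roundTrip("hello selector"));
    }
}
```

The same three steps apply to SocketChannel; only the channel type and the set of interest operations change.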
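Java NIO for file handling
Basic file reads and writes
NIO offers strong file handling capabilities. For reading, FileChannel combined with ByteBuffer, using a direct buffer to cut down on memory copies, gives efficient reads: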
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public class FileReadExample {
    public static void main(String[] args) {
        Path path = Paths.get("example.txt");
        try (FileChannel channel = FileChannel.open(path, StandardOpenOption.READ)) {
            // A direct buffer lives outside the Java heap
            ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
            while (channel.read(buffer) != -1) {
                buffer.flip();
                while (buffer.hasRemaining()) {
                    // Casting a byte to char is only correct for single-byte (ASCII) text
                    System.out.print((char) buffer.get());
                }
                buffer.clear();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
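Casting each byte to char, as the read example does, works for ASCII but garbles multi-byte text. A minimal sketch of one alternative, assuming the file is UTF-8 and small enough to fit in a single buffer, is to read all bytes first and decode them in one step. The class name Utf8FileRead and the method readAll are illustrative.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class Utf8FileRead {
    // Reads a whole file through a FileChannel and decodes it as UTF-8.
    // Assumption: the file is small (well under 2 GB), so one buffer holds it.
    static String readAll(Path path) throws IOException {
        try (FileChannel channel = FileChannel.open(path, StandardOpenOption.READ)) {
            ByteBuffer buffer = ByteBuffer.allocate((int) channel.size());
            while (buffer.hasRemaining() && channel.read(buffer) != -1) {
                // keep reading until the buffer is full or the file ends
            }
            buffer.flip();
            return StandardCharsets.UTF_8.decode(buffer).toString();
        }
    }

    public static void main(String[] args) throws IOException {
        Path path = Files.createTempFile("nio-utf8", ".txt");
        // "\u4f60\u597d" is a two-character Chinese greeting, three bytes each in UTF-8
        Files.write(path, "\u4f60\u597d, NIO".getBytes(StandardCharsets.UTF_8));
        System.out.println(readAll(path));
        Files.delete(path);
    }
}
```

Decoding chunk by chunk is also possible, but then a character split across two reads has to be handled, for example with a CharsetDecoder; decoding the complete byte sequence avoids that case.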
When writing a file, you can specify how it is opened, for example creating it, writing to it, and truncating any existing content:
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public class FileWriteExample {
    public static void main(String[] args) {
        Path path = Paths.get("output.txt");
        String content = "Hello, NIO File Channel!";
        // CREATE: create the file if missing; TRUNCATE_EXISTING: discard old content
        try (FileChannel channel = FileChannel.open(path,
                StandardOpenOption.CREATE,
                StandardOpenOption.WRITE,
                StandardOpenOption.TRUNCATE_EXISTING)) {
            ByteBuffer buffer = ByteBuffer.wrap(content.getBytes(StandardCharsets.UTF_8));
            // write may consume only part of the buffer, so loop until it is empty
            while (buffer.hasRemaining()) {
                channel.write(buffer);
            }
            System.out.println("File written successfully");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
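Large files and other operations
For copying large files, memory-mapped files can be used: a portion of the file is mapped directly into memory, which reduces system calls and data copies. NIO also supports appending to files, reading and changing file attributes, and traversing directories: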
// Large file copy (memory-mapped)
import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public class LargeFileCopyExample {
    private static final int BUFFER_SIZE = 1024 * 1024;

    public static void main(String[] args) {
        Path source = Paths.get("source_large_file.dat");
        Path target = Paths.get("target_large_file.dat");
        // TRUNCATE_EXISTING prevents stale bytes when the target already exists and is longer
        try (FileChannel inChannel = FileChannel.open(source, StandardOpenOption.READ);
             FileChannel outChannel = FileChannel.open(target,
                     StandardOpenOption.CREATE,
                     StandardOpenOption.WRITE,
                     StandardOpenOption.TRUNCATE_EXISTING)) {
            long size = inChannel.size();
            long position = 0;
            while (position < size) {
                // Map one chunk of at most BUFFER_SIZE bytes at a time
                long chunkSize = Math.min(size - position, BUFFER_SIZE);
                MappedByteBuffer inBuffer = inChannel.map(FileChannel.MapMode.READ_ONLY, position, chunkSize);
                while (inBuffer.hasRemaining()) {
                    outChannel.write(inBuffer);
                }
                position += chunkSize;
            }
            System.out.println("Large file copy complete");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
// Appending to a file
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public class FileAppendExample {
    public static void main(String[] args) {
        Path path = Paths.get("append.txt");
        String appendContent = "\nThis is appended content.";
        // APPEND positions every write at the current end of the file
        try (FileChannel channel = FileChannel.open(path,
                StandardOpenOption.CREATE,
                StandardOpenOption.WRITE,
                StandardOpenOption.APPEND)) {
            ByteBuffer buffer = ByteBuffer.wrap(appendContent.getBytes(StandardCharsets.UTF_8));
            while (buffer.hasRemaining()) {
                channel.write(buffer);
            }
            System.out.println("Content appended successfully");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
// File attribute operations
import java.io.IOException;
import java.nio.file.*;
import java.nio.file.attribute.BasicFileAttributes;
import java.nio.file.attribute.FileTime;
import java.time.Instant;

public class FileAttributeExample {
    public static void main(String[] args) {
        Path path = Paths.get("example.txt");
        try {
            if (Files.exists(path)) {
                // Read the basic attributes in a single call
                BasicFileAttributes attrs = Files.readAttributes(path, BasicFileAttributes.class);
                System.out.println("Creation time: " + attrs.creationTime());
                System.out.println("Last modified time: " + attrs.lastModifiedTime());
                System.out.println("Is directory: " + attrs.isDirectory());
                System.out.println("File size: " + attrs.size() + " bytes");

                Files.setLastModifiedTime(path, FileTime.from(Instant.now()));
                System.out.println("Last modified time updated");

                // Rename the file, replacing the target if it already exists
                Path newPath = Paths.get("example_renamed.txt");
                Files.move(path, newPath, StandardCopyOption.REPLACE_EXISTING);
                System.out.println("File renamed");
            } else {
                System.out.println("File does not exist");
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
// Directory traversal
import java.io.IOException;
import java.nio.file.*;

public class DirectoryTraversalExample {
    public static void main(String[] args) {
        Path dir = Paths.get(".");
        // DirectoryStream lists the direct children of dir and must be closed
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir)) {
            System.out.println("Directory contents:");
            for (Path path : stream) {
                if (Files.isDirectory(path)) {
                    System.out.println("[dir]  " + path.getFileName());
                } else {
                    System.out.println("[file] " + path.getFileName() + " (size: " + Files.size(path) + " bytes)");
                }
            }
        } catch (IOException | DirectoryIteratorException e) {
            e.printStackTrace();
        }
    }
}
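The directory listing above visits one level only. As a possible extension, the sketch below uses Files.walk to collect regular files at any depth; the class name RecursiveListing and the method regularFiles are illustrative names, and sorting the result is a choice made here for stable output.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class RecursiveListing {
    // Returns every regular file under root, at any depth, sorted by path.
    static List<Path> regularFiles(Path root) throws IOException {
        // The stream holds open directory handles, so close it when done
        try (Stream<Path> paths = Files.walk(root)) {
            return paths.filter(Files::isRegularFile)
                        .sorted()
                        .collect(Collectors.toList());
        }
    }

    public static void main(String[] args) throws IOException {
        for (Path file : regularFiles(Paths.get("."))) {
            System.out.println(file + " (" + Files.size(file) + " bytes)");
        }
    }
}
```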
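Combining NIO network programming and file handling in practice
Combining NIO's network programming with its file handling makes it possible to build a file transfer system. Below is a simple file transfer server and client.
Server implementation
The server uses a Selector for non-blocking I/O so it can handle multiple client connections at once. It supports file upload and download, stores files in the server_files directory, and uses a state machine to track the stage each client connection is in. The core code is as follows: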
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.StandardCharsets;
import java.nio.file.*;
import java.util.Iterator;
import java.util.Set;

public class FileTransferServer {
    private static final int PORT = 8080;
    private static final int BUFFER_SIZE = 8192;
    private static final Path FILE_DIR = Paths.get("server_files");

    public static void main(String[] args) {
        // Create the server file directory
        try {
            Files.createDirectories(FILE_DIR);
        } catch (IOException e) {
            System.err.println("Cannot create file directory: " + e.getMessage());
            return;
        }

        try (Selector selector = Selector.open();
             ServerSocketChannel serverChannel = ServerSocketChannel.open()) {
            // Configure the server channel
            serverChannel.bind(new InetSocketAddress(PORT));
            serverChannel.configureBlocking(false);
            serverChannel.register(selector, SelectionKey.OP_ACCEPT);
            System.out.println("File transfer server started, listening on port: " + PORT);

            // Event loop
            while (true) {
                selector.select();
                Set<SelectionKey> selectedKeys = selector.selectedKeys();
                Iterator<SelectionKey> keyIterator = selectedKeys.iterator();
                while (keyIterator.hasNext()) {
                    SelectionKey key = keyIterator.next();
                    keyIterator.remove();
                    if (key.isAcceptable()) {
                        handleAccept(key, selector);
                        continue;
                    }
                    try {
                        if (key.isReadable()) {
                            handleRead(key);
                        } else if (key.isWritable()) {
                            handleWrite(key);
                        }
                    } catch (IOException | InvalidPathException e) {
                        // One failing client must not stop the event loop
                        System.err.println("Closing client after error: " + e.getMessage());
                        closeClient(key);
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    // Handle a new connection
    private static void handleAccept(SelectionKey key, Selector selector) throws IOException {
        ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
        SocketChannel clientChannel = serverChannel.accept();
        if (clientChannel == null) return; // a non-blocking accept may return null
        clientChannel.configureBlocking(false);
        // Register for reads and wait for the client's command
        clientChannel.register(selector, SelectionKey.OP_READ, new ClientState());
        System.out.println("New client connected: " + clientChannel.getRemoteAddress());
    }

    // Handle a read event
    private static void handleRead(SelectionKey key) throws IOException {
        SocketChannel clientChannel = (SocketChannel) key.channel();
        ClientState state = (ClientState) key.attachment();
        ByteBuffer buffer = state.buffer; // stays in write mode between reads

        int bytesRead = clientChannel.read(buffer);
        if (bytesRead == -1) {
            // Client closed the connection; this also ends a running upload
            if (state.operation == Operation.UPLOAD) {
                System.out.println("File upload finished: " + state.fileName);
            }
            System.out.println("Client disconnected: " + clientChannel.getRemoteAddress());
            closeClient(key);
            return;
        }

        buffer.flip();
        if (!state.commandProcessed) {
            processCommand(key, buffer, state);
        }
        if (state.commandProcessed && state.operation == Operation.UPLOAD) {
            // Also covers file bytes that arrived in the same read as the command
            processFileUpload(buffer, state);
        }
        // Keep unconsumed bytes (for example half a command) for the next read
        buffer.compact();
    }

    // Handle a write event
    private static void handleWrite(SelectionKey key) throws IOException {
        SocketChannel clientChannel = (SocketChannel) key.channel();
        ClientState state = (ClientState) key.attachment();
        if (state.operation == Operation.DOWNLOAD && state.fileChannel != null) {
            // Send the next slice of the file
            long bytesWritten = state.fileChannel.transferTo(state.position, BUFFER_SIZE, clientChannel);
            state.position += bytesWritten;
            if (state.position >= state.fileSize) {
                System.out.println("File download complete: " + state.fileName);
                cleanupFileTransfer(state);
                key.interestOps(SelectionKey.OP_READ);
            }
        }
    }

    // Parse a client command once all of its bytes have arrived
    private static void processCommand(SelectionKey key, ByteBuffer buffer, ClientState state) throws IOException {
        // Wait for the fixed 5-byte header: command type plus file name length
        if (buffer.remaining() < 5) return;
        int fileNameLength = buffer.getInt(buffer.position() + 1); // peek without consuming
        if (fileNameLength <= 0 || fileNameLength > BUFFER_SIZE - 5) {
            throw new IOException("Invalid file name length: " + fileNameLength);
        }
        // Wait for the complete file name
        if (buffer.remaining() < 5 + fileNameLength) return;

        byte command = buffer.get();
        buffer.getInt();
        byte[] fileNameBytes = new byte[fileNameLength];
        buffer.get(fileNameBytes);
        String fileName = new String(fileNameBytes, StandardCharsets.UTF_8);

        // Reject names that would escape the storage directory
        Path filePath = FILE_DIR.resolve(fileName).normalize();
        if (!filePath.startsWith(FILE_DIR)) {
            throw new IOException("Illegal file name: " + fileName);
        }
        state.fileName = fileName;
        SocketChannel clientChannel = (SocketChannel) key.channel();

        if (command == 'U') {
            // Upload command
            state.operation = Operation.UPLOAD;
            state.fileChannel = FileChannel.open(filePath, StandardOpenOption.CREATE,
                    StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING);
            state.commandProcessed = true;
            System.out.println("Ready to receive file: " + fileName);
        } else if (command == 'D') {
            // Download command
            if (Files.isRegularFile(filePath)) {
                state.operation = Operation.DOWNLOAD;
                state.fileSize = Files.size(filePath);
                state.fileChannel = FileChannel.open(filePath, StandardOpenOption.READ);
                state.commandProcessed = true;
                // Send the file size to the client first
                ByteBuffer sizeBuffer = ByteBuffer.allocate(8);
                sizeBuffer.putLong(state.fileSize);
                sizeBuffer.flip();
                while (sizeBuffer.hasRemaining()) {
                    clientChannel.write(sizeBuffer);
                }
                key.interestOps(SelectionKey.OP_WRITE);
                System.out.println("Ready to send file: " + fileName + " (size: " + state.fileSize + " bytes)");
            } else {
                // File does not exist
                ByteBuffer response = ByteBuffer.wrap("FILE_NOT_FOUND".getBytes(StandardCharsets.US_ASCII));
                while (response.hasRemaining()) {
                    clientChannel.write(response);
                }
                state.reset();
            }
        }
    }

    // Write received upload bytes to the file
    private static void processFileUpload(ByteBuffer buffer, ClientState state) throws IOException {
        // The protocol carries no upload length, so the file stays open until
        // the client disconnects; closeClient then closes it.
        while (buffer.hasRemaining()) {
            state.fileChannel.write(buffer);
        }
    }

    // Release everything tied to one client connection
    private static void closeClient(SelectionKey key) {
        ClientState state = (ClientState) key.attachment();
        if (state != null) {
            cleanupFileTransfer(state);
        }
        try {
            key.channel().close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    // Release file transfer resources
    private static void cleanupFileTransfer(ClientState state) {
        try {
            if (state.fileChannel != null) {
                state.fileChannel.close();
                state.fileChannel = null;
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        state.reset();
    }

    // Per-client state
    private static class ClientState {
        ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
        Operation operation = null;
        String fileName = null;
        FileChannel fileChannel = null;
        long fileSize = 0;
        long position = 0;
        boolean commandProcessed = false;

        void reset() {
            operation = null;
            fileName = null;
            fileSize = 0;
            position = 0;
            commandProcessed = false;
        }
    }

    // Operation type
    private enum Operation {
        UPLOAD, DOWNLOAD
    }
}
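A non-blocking read can deliver any fraction of a command, so the command stage of the state machine has to tolerate partial input. The sketch below isolates that step: it accumulates bytes across calls and only consumes a command once the whole frame is present. The class name CommandDecoder, the method feed, the "U:name" result format, and the 1024-byte limit are assumptions for illustration; a chunk larger than the free space would raise BufferOverflowException.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class CommandDecoder {
    // Bytes received so far; sized for one command frame (assumed limit)
    private final ByteBuffer pending = ByteBuffer.allocate(1024);

    // Feeds newly received bytes. Returns "<type>:<fileName>" once a whole
    // frame has arrived, or null while the frame is still incomplete.
    String feed(byte[] chunk) {
        pending.put(chunk);
        pending.flip(); // read mode
        String result = null;
        if (pending.remaining() >= 5) {
            // Peek at the length field without moving the position
            int nameLength = pending.getInt(pending.position() + 1);
            if (nameLength < 0 || nameLength > pending.capacity() - 5) {
                throw new IllegalArgumentException("bad file name length: " + nameLength);
            }
            if (pending.remaining() >= 5 + nameLength) {
                char type = (char) pending.get();
                pending.getInt(); // skip the length that was already peeked
                byte[] name = new byte[nameLength];
                pending.get(name);
                result = type + ":" + new String(name, StandardCharsets.UTF_8);
            }
        }
        pending.compact(); // keep unconsumed bytes for the next call
        return result;
    }

    public static void main(String[] args) {
        CommandDecoder decoder = new CommandDecoder();
        // The frame for uploading "a.txt", delivered in two pieces
        System.out.println(decoder.feed(new byte[] {'U', 0, 0}));                          // null
        System.out.println(decoder.feed(new byte[] {0, 5, 'a', '.', 't', 'x', 't'}));      // U:a.txt
    }
}
```

Peeking with the absolute getInt and finishing with compact() means an incomplete frame leaves the buffer exactly as it was, apart from the newly appended bytes.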
Complete client code
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.*;
import java.util.Scanner;

public class FileTransferClient {
    private static final String SERVER_HOST = "localhost";
    private static final int SERVER_PORT = 8080;
    private static final int BUFFER_SIZE = 8192;
    private static final Path DOWNLOAD_DIR = Paths.get("client_downloads");
    // Text the server sends instead of a size when the file is missing
    private static final byte[] NOT_FOUND = "FILE_NOT_FOUND".getBytes(StandardCharsets.US_ASCII);

    public static void main(String[] args) {
        // Create the download directory
        try {
            Files.createDirectories(DOWNLOAD_DIR);
        } catch (IOException e) {
            System.err.println("Cannot create download directory: " + e.getMessage());
            return;
        }

        try (SocketChannel socketChannel = SocketChannel.open();
             Scanner scanner = new Scanner(System.in)) {
            // Connect to the server (the channel stays in blocking mode)
            socketChannel.connect(new InetSocketAddress(SERVER_HOST, SERVER_PORT));
            System.out.println("Connected to server: " + SERVER_HOST + ":" + SERVER_PORT);

            while (true) {
                System.out.println("\nChoose an operation:");
                System.out.println("1. Upload file");
                System.out.println("2. Download file");
                System.out.println("3. Exit");
                System.out.print("Enter option: ");
                // Read the whole line so non-numeric input cannot crash the client
                String choice = scanner.nextLine().trim();
                switch (choice) {
                    case "1":
                        uploadFile(socketChannel, scanner);
                        break;
                    case "2":
                        downloadFile(socketChannel, scanner);
                        break;
                    case "3":
                        System.out.println("Exiting client");
                        return;
                    default:
                        System.out.println("Invalid option");
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    // Send one command frame: [type (1 byte)][name length (4 bytes)][name]
    private static void sendCommand(SocketChannel socketChannel, char type, String fileName) throws IOException {
        byte[] nameBytes = fileName.getBytes(StandardCharsets.UTF_8);
        ByteBuffer commandBuffer = ByteBuffer.allocate(5 + nameBytes.length);
        commandBuffer.put((byte) type);
        commandBuffer.putInt(nameBytes.length);
        commandBuffer.put(nameBytes);
        commandBuffer.flip();
        while (commandBuffer.hasRemaining()) {
            socketChannel.write(commandBuffer);
        }
    }

    // Fill the buffer completely; returns false if the server closes first
    private static boolean readFully(SocketChannel channel, ByteBuffer buffer) throws IOException {
        while (buffer.hasRemaining()) {
            if (channel.read(buffer) == -1) return false;
        }
        return true;
    }

    // Upload a file
    private static void uploadFile(SocketChannel socketChannel, Scanner scanner) throws IOException {
        System.out.print("Enter the path of the file to upload: ");
        String filePath = scanner.nextLine();
        Path path = Paths.get(filePath);
        if (!Files.exists(path) || Files.isDirectory(path)) {
            System.out.println("File does not exist or is a directory");
            return;
        }
        String fileName = path.getFileName().toString();
        long fileSize = Files.size(path);

        // Send the upload command ('U')
        sendCommand(socketChannel, 'U', fileName);

        // Send the file content
        try (FileChannel fileChannel = FileChannel.open(path, StandardOpenOption.READ)) {
            long bytesTransferred = 0;
            System.out.println("Starting upload: " + fileName + " (size: " + fileSize + " bytes)");
            while (bytesTransferred < fileSize) {
                // Continue from where the previous call stopped
                bytesTransferred += fileChannel.transferTo(
                        bytesTransferred, fileSize - bytesTransferred, socketChannel);
                System.out.printf("Upload progress: %.2f%%\r", (100.0 * bytesTransferred / fileSize));
            }
            System.out.println("\nFile upload complete");
        }
    }

    // Download a file
    private static void downloadFile(SocketChannel socketChannel, Scanner scanner) throws IOException {
        System.out.print("Enter the name of the file to download: ");
        String fileName = scanner.nextLine();

        // Send the download command ('D')
        sendCommand(socketChannel, 'D', fileName);

        // Receive the 8-byte file size
        ByteBuffer sizeBuffer = ByteBuffer.allocate(8);
        if (!readFully(socketChannel, sizeBuffer)) {
            System.out.println("Server closed the connection");
            return;
        }
        sizeBuffer.flip();
        // A missing file is answered with the 14 ASCII bytes FILE_NOT_FOUND; the
        // first 8 of them cannot be a realistic size, so they mark the error
        if (sizeBuffer.equals(ByteBuffer.wrap(NOT_FOUND, 0, 8))) {
            readFully(socketChannel, ByteBuffer.allocate(NOT_FOUND.length - 8)); // drain the rest
            System.out.println("Download failed: FILE_NOT_FOUND");
            return;
        }
        long fileSize = sizeBuffer.getLong();
        Path downloadPath = DOWNLOAD_DIR.resolve(fileName);

        // Download the file
        try (FileChannel fileChannel = FileChannel.open(downloadPath, StandardOpenOption.CREATE,
                StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING)) {
            System.out.println("Starting download: " + fileName + " (size: " + fileSize + " bytes)");
            ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
            long bytesDownloaded = 0;
            while (bytesDownloaded < fileSize) {
                buffer.clear();
                // Never read past the end of this file
                buffer.limit((int) Math.min(buffer.capacity(), fileSize - bytesDownloaded));
                int bytesReadThisTime = socketChannel.read(buffer);
                if (bytesReadThisTime == -1) {
                    System.out.println("\nDownload interrupted; the file may be incomplete");
                    return;
                }
                buffer.flip();
                while (buffer.hasRemaining()) {
                    fileChannel.write(buffer);
                }
                bytesDownloaded += bytesReadThisTime;
                System.out.printf("Download progress: %.2f%%\r", (100.0 * bytesDownloaded / fileSize));
            }
            System.out.println("\nFile download complete, saved to: " + downloadPath);
        }
    }
}
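Protocol notes
The file transfer system uses a simple custom protocol:
- Command format:
  - [command type (1 byte)][file name length (4 bytes)][file name (n bytes)]
  - Upload command: 'U' + file name
  - Download command: 'D' + file name
- Data transfer:
  - File data is sent directly over the Channel
  - Before a download, the server first sends the file size as an 8-byte long; if the file does not exist, it sends the text FILE_NOT_FOUND instead
  - On upload, the server creates the file automatically from the received data; the upload command carries no file size, so the server cannot tell where the file data ends while the connection stays open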
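As a worked instance of the command format above, this sketch encodes a download command for a file named a.txt and prints the resulting bytes. The class name CommandFrame and the method encode are illustrative; UTF-8 for the file name is an assumption, since the format itself does not name a charset.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class CommandFrame {
    // Builds [type (1 byte)][name length (4 bytes)][name (n bytes)].
    static ByteBuffer encode(char type, String fileName) {
        byte[] name = fileName.getBytes(StandardCharsets.UTF_8); // assumed charset
        ByteBuffer frame = ByteBuffer.allocate(1 + 4 + name.length);
        frame.put((byte) type);
        frame.putInt(name.length); // ByteBuffer is big-endian by default
        frame.put(name);
        frame.flip(); // ready to be written to a channel
        return frame;
    }

    public static void main(String[] args) {
        ByteBuffer frame = encode('D', "a.txt");
        // prints [68, 0, 0, 0, 5, 97, 46, 116, 120, 116]
        System.out.println(Arrays.toString(frame.array()));
    }
}
```

The first byte is the ASCII code of 'D', the next four are the length 5 in big-endian order, and the last five are the file name.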