Java多线程文件下载和处理程序
1.背景
使用 Java 实现一个文件下载功能。需求：从文件服务器上下载一个 CSV 格式的大文件（约 50 MB），解析文件中的数据，每 2 万条数据作为一个批次交给一个线程写入数据库；等待所有线程都把数据写入数据库后，调用一个回调函数告知数据已全部写入完毕，可以进行下一步操作。
2. 文件下载与解析代码
import java.io.*;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;public class FileDownloadProcessor {private static final int BATCH_SIZE = 20000;private static final int THREAD_POOL_SIZE = 10;public interface CompletionCallback {void onComplete();}public void processFile(String fileUrl, CompletionCallback callback) {try {// 1. 下载文件String localFilePath = downloadFile(fileUrl);// 2. 解析CSV文件List<String[]> data = parseCSV(localFilePath);// 3. 多线程处理数据ExecutorService executor = Executors.newFixedThreadPool(THREAD_POOL_SIZE);List<Future<?>> futures = new ArrayList<>();for (int i = 0; i < data.size(); i += BATCH_SIZE) {int end = Math.min(i + BATCH_SIZE, data.size());List<String[]> batch = data.subList(i, end);futures.add(executor.submit(() -> {DatabaseWriter writer = new DatabaseWriter();writer.writeBatch(batch);}));}// 4. 等待所有任务完成for (Future<?> future : futures) {future.get();}executor.shutdown();// 5. 回调通知完成callback.onComplete();// 6. 删除临时文件Files.deleteIfExists(Paths.get(localFilePath));} catch (Exception e) {e.printStackTrace();}}private String downloadFile(String fileUrl) throws IOException {String localPath = "temp_" + System.currentTimeMillis() + ".csv";try (InputStream in = new URL(fileUrl).openStream();FileOutputStream out = new FileOutputStream(localPath)) {byte[] buffer = new byte[1024];int bytesRead;while ((bytesRead = in.read(buffer)) != -1) {out.write(buffer, 0, bytesRead);}}return localPath;}private List<String[]> parseCSV(String filePath) throws IOException {List<String[]> data = new ArrayList<>();try (BufferedReader br = new BufferedReader(new FileReader(filePath))) {String line;while ((line = br.readLine()) != null) {data.add(line.split(","));}}return data;}
}
3.数据写入代码
import java.sql.*;
import java.util.List;

/**
 * Inserts batches of CSV rows into the database using a JDBC batch insert.
 *
 * <p>The class holds no mutable state and opens a new connection on every call.
 */
public class DatabaseWriter {
    // NOTE(review): placeholder connection settings; load them from configuration rather than hard-coding credentials.
    // For MySQL, adding rewriteBatchedStatements=true to the URL lets the driver send batches as multi-row INSERTs.
    private static final String DB_URL = "jdbc:mysql://localhost:3306/yourdb";
    private static final String USER = "username";
    private static final String PASS = "password";

    /**
     * Inserts every row of {@code batch} inside a single transaction.
     *
     * @param batch rows to insert; value {@code i} of a row is bound to placeholder {@code i + 1}.
     *     A {@code null} or empty batch is a no-op.
     * @throws IllegalStateException if connecting or inserting fails; its cause is the original
     *     {@link SQLException}. The transaction is rolled back first (effective only on a
     *     transactional storage engine).
     */
    public void writeBatch(List<String[]> batch) {
        if (batch == null || batch.isEmpty()) {
            return; // nothing to insert; avoid opening a connection
        }
        // NOTE(review): placeholder statement; the number of '?' must match the CSV column count.
        String sql = "INSERT INTO your_table (col1, col2, ...) VALUES (?, ?, ...)";
        try (Connection conn = DriverManager.getConnection(DB_URL, USER, PASS)) {
            // One commit per batch instead of one per row (autocommit), and no half-written batch on failure.
            conn.setAutoCommit(false);
            try (PreparedStatement stmt = conn.prepareStatement(sql)) {
                for (String[] row : batch) {
                    for (int i = 0; i < row.length; i++) {
                        stmt.setString(i + 1, row[i]);
                    }
                    stmt.addBatch();
                }
                stmt.executeBatch();
                conn.commit();
            } catch (SQLException e) {
                try {
                    conn.rollback();
                } catch (SQLException rollbackFailure) {
                    e.addSuppressed(rollbackFailure); // keep the original failure as the primary error
                }
                throw e;
            }
        } catch (SQLException e) {
            // Propagate instead of swallowing so the caller's Future.get() fails and completion is not reported.
            throw new IllegalStateException("Failed to write batch of " + batch.size() + " rows", e);
        }
    }
}
4. 调用示例
public class Main {public static void main(String[] args) {FileDownloadProcessor processor = new FileDownloadProcessor();processor.processFile("http://example.com/largefile.csv", () -> {System.out.println("所有数据已写入数据库,可以执行下一步操作");// 这里添加你的后续处理逻辑});}
}