当前位置: 首页 > backend >正文

Java多线程文件下载和处理程序

1.背景

使用java实现一个文件下载功能,需求:从文件服务器上下载一个csv格式的大文件(约50M),解析文件中的数据,每2万条开启一个线程将数据写入数据库,等待所有的线程把数据写入数据库完成后,调用一个函数告知数据已全部写入完毕,可以进行下一步操作了

2.文件解读代码


import java.io.*;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;public class FileDownloadProcessor {private static final int BATCH_SIZE = 20000;private static final int THREAD_POOL_SIZE = 10;public interface CompletionCallback {void onComplete();}public void processFile(String fileUrl, CompletionCallback callback) {try {// 1. 下载文件String localFilePath = downloadFile(fileUrl);// 2. 解析CSV文件List<String[]> data = parseCSV(localFilePath);// 3. 多线程处理数据ExecutorService executor = Executors.newFixedThreadPool(THREAD_POOL_SIZE);List<Future<?>> futures = new ArrayList<>();for (int i = 0; i < data.size(); i += BATCH_SIZE) {int end = Math.min(i + BATCH_SIZE, data.size());List<String[]> batch = data.subList(i, end);futures.add(executor.submit(() -> {DatabaseWriter writer = new DatabaseWriter();writer.writeBatch(batch);}));}// 4. 等待所有任务完成for (Future<?> future : futures) {future.get();}executor.shutdown();// 5. 回调通知完成callback.onComplete();// 6. 删除临时文件Files.deleteIfExists(Paths.get(localFilePath));} catch (Exception e) {e.printStackTrace();}}private String downloadFile(String fileUrl) throws IOException {String localPath = "temp_" + System.currentTimeMillis() + ".csv";try (InputStream in = new URL(fileUrl).openStream();FileOutputStream out = new FileOutputStream(localPath)) {byte[] buffer = new byte[1024];int bytesRead;while ((bytesRead = in.read(buffer)) != -1) {out.write(buffer, 0, bytesRead);}}return localPath;}private List<String[]> parseCSV(String filePath) throws IOException {List<String[]> data = new ArrayList<>();try (BufferedReader br = new BufferedReader(new FileReader(filePath))) {String line;while ((line = br.readLine()) != null) {data.add(line.split(","));}}return data;}
}

3.数据写入代码

 java.sql.*;
import java.util.List;public class DatabaseWriter {private static final String DB_URL = "jdbc:mysql://localhost:3306/yourdb";private static final String USER = "username";private static final String PASS = "password";public void writeBatch(List<String[]> batch) {try (Connection conn = DriverManager.getConnection(DB_URL, USER, PASS)) {String sql = "INSERT INTO your_table (col1, col2, ...) VALUES (?, ?, ...)";PreparedStatement stmt = conn.prepareStatement(sql);for (String[] row : batch) {for (int i = 0; i < row.length; i++) {stmt.setString(i + 1, row[i]);}stmt.addBatch();}stmt.executeBatch();} catch (SQLException e) {e.printStackTrace();}}
}

4.代码

public class Main {public static void main(String[] args) {FileDownloadProcessor processor = new FileDownloadProcessor();processor.processFile("http://example.com/largefile.csv", () -> {System.out.println("所有数据已写入数据库,可以执行下一步操作");// 这里添加你的后续处理逻辑});}
}

完美!

http://www.xdnf.cn/news/10201.html

相关文章:

  • 仿真每日一练 | 静力学分析与动力学分析的区别
  • QT 第一讲 --- 基础篇 Qt 基础环境搭建
  • 做销售讲究接地气
  • 【Python-Day 20】揭秘Python变量作用域:LEGB规则与global/nonlocal关键字详解
  • 太阳诱电多层陶瓷电容器的优势和特点
  • springboot java.lang.ClassNotFoundException: dm.jdbc.driver.DmDriver应该如何解决
  • leetcode题解513:找树左下角的值(递归中的回溯处理)!
  • 【CF】Day70——Codeforces Round 896 (Div. 2) CD1 (排列 + 构造 | ⭐思维 + 数学)
  • 20250530-C#知识:抽象类、抽象方法、接口
  • nt!FsRtlFindLargeIndex函数分析计算在那个Mapping[(I)]数组中
  • 基于Java 实现 IM 业务回调
  • Java 之殇:从中流砥柱到“被温柔替代”
  • LeetCode Hot100(动态规划)
  • 04-redis-分布式锁-edisson
  • yum安装nginx后无法通过服务方式启动
  • 企业知识库问答系统避坑指南:检索优化与生成一致性解决方案
  • [ctfshow web入门] web80
  • 2.测试项目启动和研读需求文档
  • js 动画库、2048核心逻辑、面试题add[1][2][3]+4
  • Datatable和实体集合互转
  • 华锐视点助力,虚拟旅游绽放更璀璨光彩​
  • 图书管理系统的设计与实现
  • 北京大学肖臻老师《区块链技术与应用》公开课:06-BTC-网络
  • canoe 排查配置相关【graphics,capl】
  • Python基本运算符
  • python装饰器
  • DSP处理数字信号做什么用的?
  • Unsafe.putOrderedInt与Volatile
  • 驱动灯珠芯片LT3743手册理解
  • phpmyadmin