当前位置: 首页 > java >正文

SpringBoot系列之实现高效批量写入数据

Spring Boot 实现高效批量插入数据的实践指南

在实际开发中,我们经常会遇到需要批量插入大量数据到数据库的场景。如果使用传统的单条插入方式,不仅效率低下,还会给数据库带来巨大压力。本文将介绍如何使用 Spring Boot 实现高效

批量数据插入,并通过具体代码示例展示优化过程。

项目背景与需求

我们需要实现一个接口,能够高效地向数据库插入大量用户数据。具体要求如下:

  • 支持高并发场景下的批量插入需求

  • 保证插入效率,减少数据库压力

  • 实现异步处理,避免接口超时

技术选型

  • 框架:Spring Boot 3.3.2
  • 数据库:MySQL
  • ORM:Spring Data JPA
  • 工具类:Hutool
  • 构建工具:Maven

项目结构

springboot-batch-insert/
├── src/
│   ├── main/
│   │   ├── java/
│   │   │   └── com/
│   │   │       └── example/
│   │   │           └── springboot/
│   │   │               ├── SpringbootBatchInsertApplication.java
│   │   │               ├── controller/
│   │   │               │   └── BatchInsertController.java
│   │   │               ├── service/
│   │   │               │   └── BatchInsertService.java
│   │   │               ├── repository/
│   │   │               │   └── UserRepository.java
│   │   │               ├── model/
│   │   │               │   └── User.java
│   │   │               └── configuration/
│   │   │                   └── AsyncConfig.java
│   │   └── resources/
│   │       ├── application.properties
│   │       └── application.yml
│   └── test/
│       └── java/
│           └── com/
│               └── example/
│                   └── springboot/
│                       └── SpringBatchInsertApplicationTests.java
└── pom.xml

核心代码实现

1. 配置文件

首先,我们需要配置数据库连接和 JPA 相关属性,在 application.yml 中:

server:port: 8080spring:datasource:url: jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=Asia/Shanghai&rewriteBatchedStatements=trueusername: rootpassword: rootdriver-class-name: com.mysql.cj.jdbc.Driverjpa:hibernate:ddl-auto: updateshow-sql: falseproperties:hibernate:format_sql: truedialect: org.hibernate.dialect.MySQL8Dialectjdbc:batch_size: 1000order_inserts: truebatch_versioned_data: truelogging:level:org.hibernate.SQL: WARNorg.hibernate.type.descriptor.sql.BasicBinder: WARNcom.example: INFO

关键配置说明:

  • rewriteBatchedStatements=true:开启 MySQL 的批量插入优化

  • hibernate.jdbc.batch_size=1000:设置 Hibernate 批量操作大小

  • hibernate.order_inserts=true:优化批量插入性能

2. 实体类定义

创建 User 实体类:

package com.example.springboot.model;import jakarta.persistence.*;
import lombok.Data;import java.time.LocalDateTime;/*** 用户实体类*/@Data
@Entity
@Table(name = "sys_user")
public class User {@Idprivate Long id;/*** 用户名*/@Column(nullable = false, length = 50)private String username;/*** 密码*/@Column(nullable = false, length = 100)private String password;/*** 邮箱*/@Column(nullable = false, length = 100)private String email;/*** 创建时间*/@Column(name = "create_time", nullable = false, updatable = false)@Builder.Defaultprivate LocalDateTime createTime = LocalDateTime.now();/*** 修改时间*/@Column(name = "modify_time", nullable = false)@Builder.Defaultprivate LocalDateTime modifyTime = LocalDateTime.now();}

3. 数据库访问层

创建 UserRepository 接口,定义批量插入方法:

package com.example.springboot.repository;import com.example.springboot.model.User;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;@Repository
public interface UserRepository extends JpaRepository<User, Long> {}

4. 异步配置

创建 AsyncConfig 配置类,定义线程池:

package com.example.springboot.config;import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;/*** 异步线程池配置*/
// 修改 AsyncConfig.java
@Configuration
@EnableAsync
public class AsyncConfig {@Bean(name = "batchInsertExecutor")public Executor batchInsertExecutor() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();int coreThreads = Runtime.getRuntime().availableProcessors();executor.setCorePoolSize(coreThreads);executor.setMaxPoolSize(coreThreads * 2);executor.setQueueCapacity(1000);executor.setThreadNamePrefix("BatchInsert-");executor.setKeepAliveSeconds(60);executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());executor.initialize();return executor;}
}

5. 服务层实现

创建 BatchInsertService 服务类,实现批量插入逻辑:

package com.example.springboot.service;import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.date.LocalDateTimeUtil;
import cn.hutool.core.util.IdUtil;
import com.example.springboot.model.User;
import com.example.springboot.repository.UserRepository;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;@Service
public class BatchInsertService {private Logger log = LoggerFactory.getLogger(BatchInsertService.class);@Autowired@Qualifier("batchInsertExecutor")private Executor batchInsertExecutor;@Autowiredprivate UserRepository userRepository;private static final Integer BATCH_SIZE = 1000;public CompletableFuture<Void> processBatchInsert(Integer totalCount) {if (totalCount <= 0) {throw new IllegalArgumentException("插入数量必须大于0");}// 记录总任务开始时间Instant totalStart = Instant.now();log.info("批量插入开始,总数量:{} ", totalCount);// 1. 生成测试数据并记录耗时Instant dataGenStart = Instant.now();List<User> testData = generateTestData(totalCount);long dataGenCost = java.time.Duration.between(dataGenStart, Instant.now()).toMillis();log.info("测试数据生成完成,耗时:{}ms", dataGenCost);// 2. 分割数据并记录耗时Instant splitStart = Instant.now();List<List<User>> partitionedUserList = CollUtil.split(testData, BATCH_SIZE);long splitCost = java.time.Duration.between(splitStart, Instant.now()).toMillis();log.info("数据分割完成,分成 {} 批,每批 {} 条,耗时:{}ms",partitionedUserList.size(), BATCH_SIZE, splitCost);// 3. 批量插入并记录每批耗时List<CompletableFuture<Void>> futures = new ArrayList<>();for (List<User> batch : partitionedUserList) {final int batchSize = batch.size();// 记录当前批次开始时间Instant batchStart = Instant.now();CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {try {userRepository.saveAll(batch);// 计算当前批次耗时long batchCost = java.time.Duration.between(batchStart, Instant.now()).toMillis();log.info("批次插入完成,批次大小:{},耗时:{}ms", batchSize, batchCost);} catch (Exception e) {log.error("批次插入失败,批次大小:{},错误:{}", batchSize, e.getMessage(), e);throw e;}}, batchInsertExecutor);futures.add(future);}// 4. 等待所有批次完成并记录总耗时return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).whenComplete((v, e) -> {long totalCost = java.time.Duration.between(totalStart, Instant.now()).toMillis();if (e != null) {log.error("批量插入失败,总耗时:{}ms ", totalCost, e);} else {log.info("批量插入完成,总数量:{},总耗时:{}ms,平均每条耗时:{}ms",totalCount, totalCost, totalCount > 0 ? (totalCost / totalCount) : 0);}});}private List<User> generateTestData(Integer count) {List<User> users = new ArrayList<>(count);for (int i = 0; i < count; i++) {User user = new User();user.setId(IdUtil.getSnowflake().nextId());user.setUsername("user" + i);user.setPassword(IdUtil.fastSimpleUUID());user.setEmail("user" + i + "@example.com");user.setCreateTime(LocalDateTimeUtil.now());user.setModifyTime(LocalDateTimeUtil.now());users.add(user);}return users;}
}

6. 控制器实现

创建 BatchInsertController 控制器,提供 API 接口:

package com.example.springboot.controller;import com.example.springboot.service.BatchInsertService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;@RestController
@RequestMapping("/api/v1")
public class BatchInsertController {private Logger log = LoggerFactory.getLogger(BatchInsertController.class);@Autowiredprivate BatchInsertService batchInsertService;/*** 提供API接口,用于触发批量插入*/@PostMapping("/batch-insert")public ResponseEntity<String> triggerBatchInsert(@RequestParam(defaultValue = "10000") int count) {if (count <= 0) {return ResponseEntity.badRequest().body("插入数量必须大于0");}try {batchInsertService.processBatchInsert(count);return ResponseEntity.accepted().body("批量插入任务已启动,将异步执行!");} catch (Exception e) {log.error("Failed to start batch insert", e);return ResponseEntity.internalServerError().body("启动批量插入失败");}}
}

7. 启动类

package com.example.springboot;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class SpringbootBatchInsertApplication {public static void main(String[] args) {SpringApplication.run(SpringbootBatchInsertApplication.class, args);}}

关键技术点解析

  1. 批量插入优化
  • 数据库连接参数 rewriteBatchedStatements=true 开启 MySQL 批量插入优化

  • Hibernate 配置 batch_size=1000 实现批量操作

  • 分批次处理数据,避免一次性加载过多数据到内存

  1. 异步处理
  • 使用 @EnableAsync 开启异步功能

  • 自定义线程池,合理设置核心线程数、最大线程数等参数

  • 使用 CompletableFuture 实现异步编程,提高接口响应速度

  1. 事务管理
  • 在批量插入方法上添加 @Transactional 注解,保证数据一致性

  • 合理控制事务粒度,避免长事务

性能优化建议

  1. 调整批次大小:根据数据库性能和网络情况,调整 BATCH_SIZE 参数,通常在 500-2000 之间效果较好

  2. 线程池优化

  • 核心线程数不宜过多,一般设置为 CPU 核心数
  • 最大线程数根据系统资源和数据库连接数合理设置
  • 合理设置队列容量,避免内存溢出
  1. JVM 优化:根据数据量大小,适当调整 JVM 堆内存大小

  2. 数据库优化

  • 确保表结构合理,索引设计恰当
  • 批量插入期间可暂时关闭索引,插入完成后再重建
  • 考虑使用数据库分区表

总结

本文介绍了如何使用 Spring Boot 实现高效的批量数据插入功能,通过分批次处理、异步执行和数据库优化等手段,显著提高了大量数据插入的效率。在实际应用中,还需要根据具体业务场景和数据量大小,进一步调整和优化参数,以达到最佳性能。

http://www.xdnf.cn/news/19013.html

相关文章:

  • 基础IO详解
  • 【前缀和】
  • Pandas的数据结构
  • 第十七章 Java基础-常用API-System
  • [p2p-Magnet] 数据模型(GORM) | DHT爬虫 | 分类器
  • React Hook+Ts+Antd+SpringBoot实现分片上传(前端)
  • 数据湖与数据仓库
  • Qt 中日志级别
  • ArcGIS+Fragstats:土地利用统计分析、景观格局指数计算与地图制图
  • Android Keystore签名文件详解与安全防护
  • AI视频生成工具全景对比:元宝AI、即梦AI、清影AI和Vidu AI
  • 【贪心 单调栈】P10334 [UESTCPC 2024] 饮料|普及+
  • 工业 5G + AI:智能制造的未来引擎
  • Day16_【机器学习建模流程】
  • 【Rust】 3. 语句与表达式笔记
  • Java HTTP 请求:Unirest 使用指南及与 HttpClient 对比
  • .Net Core Web 架构(Request Pipeline)的底层实现
  • 自己定义的模型如何用hf的from_pretrained
  • Linux(一) | 初识Linux与目录管理基础命令掌握
  • 测试题ansible临时命令模块
  • CuTe C++ 简介01,从示例开始
  • imx6ull-驱动开发篇47——Linux SPI 驱动实验
  • Electron解压缩文件
  • hive on tez为什么写表时,要写临时文件到hdfs目录
  • docker 1分钟 快速搭建 redis 哨兵集群
  • 配置nginx.conf (增加21001端口实例操作)
  • 医疗AI时代的生物医学Go编程:高性能计算与精准医疗的案例分析(三)
  • [灵动微电子 MM32BIN560CN MM32SPIN0280]读懂电机MCU之比较器
  • jQuery 从入门到实践:基础语法、事件与元素操作全解析
  • mac电脑双屏显示时程序坞跑到副屏的解决方法