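Spring Boot Series: Efficient Batch Data Writes
A Practical Guide to Efficient Batch Inserts with Spring Boot
In real-world development we often need to insert large volumes of data into a database. Inserting rows one at a time is slow and puts heavy load on the database. This article shows how to implement efficient batch inserts with Spring Boot and walks through the optimization with concrete code.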
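Project Background and Requirements
We need an endpoint that can efficiently insert a large number of user records into the database. The requirements are:
- Support batch inserts under high concurrency
- Keep inserts efficient and reduce load on the database
- Process the work asynchronously so that the request does not time out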
Technology Stack
- Framework: Spring Boot 3.3.2
- Database: MySQL
- ORM: Spring Data JPA
- Utility library: Hutool
- Build tool: Maven
Project Structure
springboot-batch-insert/
├── src/
│   ├── main/
│   │   ├── java/
│   │   │   └── com/
│   │   │       └── example/
│   │   │           └── springboot/
│   │   │               ├── SpringbootBatchInsertApplication.java
│   │   │               ├── controller/
│   │   │               │   └── BatchInsertController.java
│   │   │               ├── service/
│   │   │               │   └── BatchInsertService.java
│   │   │               ├── repository/
│   │   │               │   └── UserRepository.java
│   │   │               ├── model/
│   │   │               │   └── User.java
│   │   │               └── config/
│   │   │                   └── AsyncConfig.java
│   │   └── resources/
│   │       ├── application.properties
│   │       └── application.yml
│   └── test/
│       └── java/
│           └── com/
│               └── example/
│                   └── springboot/
│                       └── SpringBatchInsertApplicationTests.java
└── pom.xml
Core Code Implementation
1. Configuration File
First, configure the database connection and the JPA properties in application.yml:
server:
  port: 8080

spring:
  datasource:
    url: jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=Asia/Shanghai&rewriteBatchedStatements=true
    username: root
    password: root
    driver-class-name: com.mysql.cj.jdbc.Driver
  jpa:
    hibernate:
      ddl-auto: update
    show-sql: false
    properties:
      hibernate:
        format_sql: true
        # Deprecated in Hibernate 6; org.hibernate.dialect.MySQLDialect is the current name
        dialect: org.hibernate.dialect.MySQL8Dialect
        jdbc:
          batch_size: 1000
          batch_versioned_data: true
        order_inserts: true

logging:
  level:
    org.hibernate.SQL: WARN
    org.hibernate.type.descriptor.sql.BasicBinder: WARN
    com.example: INFO
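Key settings:
- rewriteBatchedStatements=true: lets the MySQL JDBC driver rewrite a batch of single-row INSERT statements into multi-row INSERT statements
- hibernate.jdbc.batch_size=1000: the maximum number of statements Hibernate groups into one JDBC batch
- hibernate.order_inserts=true: orders inserts by entity type so that statements for the same table can be batched together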
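To make the effect of rewriteBatchedStatements=true more concrete, the sketch below builds the kind of multi-row statement that a batch of single-row inserts can be collapsed into. It only illustrates the shape of the SQL and is not the driver's actual code; the class name MultiRowInsertSketch, the helper multiRowInsert, and the column list are assumptions made for this example.

```java
import java.util.Collections;
import java.util.List;

public class MultiRowInsertSketch {

    /** Builds "INSERT INTO t (a, b) VALUES (?, ?), (?, ?)" for the given number of rows. */
    public static String multiRowInsert(String table, List<String> columns, int rows) {
        if (rows <= 0 || columns.isEmpty()) {
            throw new IllegalArgumentException("rows must be positive and columns non-empty");
        }
        // One "(?, ?, ...)" group per row, with one placeholder per column.
        String group = "(" + String.join(", ", Collections.nCopies(columns.size(), "?")) + ")";
        String values = String.join(", ", Collections.nCopies(rows, group));
        return "INSERT INTO " + table + " (" + String.join(", ", columns) + ") VALUES " + values;
    }

    public static void main(String[] args) {
        // Three single-row inserts expressed as one statement, i.e. one round trip.
        System.out.println(multiRowInsert("sys_user", List.of("id", "username"), 3));
    }
}
```

Sending one statement for many rows saves network round trips and per-statement parsing, which is where most of the gain from this connection parameter comes from.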
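2. Entity Class
Create the User entity class:
package com.example.springboot.model;

import jakarta.persistence.*;
import lombok.Data;

import java.time.LocalDateTime;

/**
 * User entity.
 *
 * The id is assigned by the application, so there is no @GeneratedValue.
 * Note: with an assigned id and no @Version field, Spring Data JPA treats the
 * entity as not new and saves it with merge(), which issues a SELECT before
 * each INSERT. Implementing Persistable avoids that extra query.
 */
@Data
@Entity
@Table(name = "sys_user")
public class User {

    @Id
    private Long id;

    /** Username */
    @Column(nullable = false, length = 50)
    private String username;

    /** Password */
    @Column(nullable = false, length = 100)
    private String password;

    /** Email address */
    @Column(nullable = false, length = 100)
    private String email;

    /** Creation time */
    @Column(name = "create_time", nullable = false, updatable = false)
    private LocalDateTime createTime = LocalDateTime.now();

    /** Last modification time */
    @Column(name = "modify_time", nullable = false)
    private LocalDateTime modifyTime = LocalDateTime.now();
}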
3. Data Access Layer
Create the UserRepository interface. No custom methods are needed, because saveAll is inherited from JpaRepository:
package com.example.springboot.repository;

import com.example.springboot.model.User;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;

@Repository
public interface UserRepository extends JpaRepository<User, Long> {
}
4. Async Configuration
Create the AsyncConfig configuration class, which defines the thread pool:
package com.example.springboot.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * Thread pool configuration for asynchronous batch inserts.
 */
@Configuration
@EnableAsync
public class AsyncConfig {

    @Bean(name = "batchInsertExecutor")
    public Executor batchInsertExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        int coreThreads = Runtime.getRuntime().availableProcessors();
        executor.setCorePoolSize(coreThreads);
        executor.setMaxPoolSize(coreThreads * 2);
        executor.setQueueCapacity(1000);
        executor.setThreadNamePrefix("BatchInsert-");
        executor.setKeepAliveSeconds(60);
        // When the pool and the queue are both full, run the task on the submitting thread.
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        return executor;
    }
}
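The rejection policy chosen above is easy to overlook, so here is a minimal sketch of what CallerRunsPolicy does, using a plain java.util.concurrent.ThreadPoolExecutor instead of Spring's wrapper. The pool is deliberately tiny (one thread, one queue slot) to force a rejection; the class name CallerRunsDemo and the task labels are assumptions made for this example.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CallerRunsDemo {

    /** Submits three tasks to a 1-thread, 1-slot pool and records which thread ran each task. */
    public static List<String> run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 60, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                r -> new Thread(r, "BatchInsert-1"),
                new ThreadPoolExecutor.CallerRunsPolicy());

        List<String> ranOn = new CopyOnWriteArrayList<>();
        CountDownLatch release = new CountDownLatch(1);

        // Task 1 occupies the only worker until the latch is released.
        pool.execute(() -> {
            awaitQuietly(release);
            ranOn.add("task1@" + Thread.currentThread().getName());
        });
        // Task 2 fills the single queue slot.
        pool.execute(() -> ranOn.add("task2@" + Thread.currentThread().getName()));
        // Task 3 is rejected, so CallerRunsPolicy runs it on the submitting thread.
        pool.execute(() -> ranOn.add("task3@" + Thread.currentThread().getName()));

        release.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return ranOn;
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In this article's setup the submitting thread is the HTTP request thread, so once the queue of 1000 is full the request itself ends up performing an insert. That acts as natural back-pressure, but it also means the endpoint is no longer strictly non-blocking under heavy load.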
5. Service Layer
Create the BatchInsertService class, which implements the batch insert logic:
package com.example.springboot.service;

import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.date.LocalDateTimeUtil;
import cn.hutool.core.util.IdUtil;
import com.example.springboot.model.User;
import com.example.springboot.repository.UserRepository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;

import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

@Service
public class BatchInsertService {

    private static final Logger log = LoggerFactory.getLogger(BatchInsertService.class);

    private static final int BATCH_SIZE = 1000;

    @Autowired
    @Qualifier("batchInsertExecutor")
    private Executor batchInsertExecutor;

    @Autowired
    private UserRepository userRepository;

    public CompletableFuture<Void> processBatchInsert(Integer totalCount) {
        if (totalCount <= 0) {
            throw new IllegalArgumentException("Insert count must be greater than 0");
        }
        // Record the start time of the whole job
        Instant totalStart = Instant.now();
        log.info("Batch insert started, total count: {}", totalCount);

        // 1. Generate test data and record how long it takes
        Instant dataGenStart = Instant.now();
        List<User> testData = generateTestData(totalCount);
        long dataGenCost = Duration.between(dataGenStart, Instant.now()).toMillis();
        log.info("Test data generated, took {}ms", dataGenCost);

        // 2. Split the data into batches and record how long it takes
        Instant splitStart = Instant.now();
        List<List<User>> partitionedUserList = CollUtil.split(testData, BATCH_SIZE);
        long splitCost = Duration.between(splitStart, Instant.now()).toMillis();
        log.info("Data split into {} batches of up to {} rows, took {}ms",
                partitionedUserList.size(), BATCH_SIZE, splitCost);

        // 3. Insert each batch on the thread pool and record the time per batch
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        for (List<User> batch : partitionedUserList) {
            final int batchSize = batch.size();
            CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
                // Start the clock inside the task so that queue wait time is not counted
                Instant batchStart = Instant.now();
                try {
                    userRepository.saveAll(batch);
                    long batchCost = Duration.between(batchStart, Instant.now()).toMillis();
                    log.info("Batch inserted, batch size: {}, took {}ms", batchSize, batchCost);
                } catch (Exception e) {
                    log.error("Batch insert failed, batch size: {}, error: {}", batchSize, e.getMessage(), e);
                    throw e;
                }
            }, batchInsertExecutor);
            futures.add(future);
        }

        // 4. Wait for all batches to finish and record the total time
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                .whenComplete((v, e) -> {
                    long totalCost = Duration.between(totalStart, Instant.now()).toMillis();
                    if (e != null) {
                        log.error("Batch insert failed, total time: {}ms", totalCost, e);
                    } else {
                        // Floating-point division; integer division would round sub-millisecond averages to 0
                        double avgCost = (double) totalCost / totalCount;
                        log.info("Batch insert finished, total count: {}, total time: {}ms, average per row: {}ms",
                                totalCount, totalCost, avgCost);
                    }
                });
    }

    private List<User> generateTestData(Integer count) {
        List<User> users = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            User user = new User();
            user.setId(IdUtil.getSnowflake().nextId());
            user.setUsername("user" + i);
            user.setPassword(IdUtil.fastSimpleUUID());
            user.setEmail("user" + i + "@example.com");
            user.setCreateTime(LocalDateTimeUtil.now());
            user.setModifyTime(LocalDateTimeUtil.now());
            users.add(user);
        }
        return users;
    }
}
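The split-then-fan-out pattern used by the service can be tried without Spring, JPA, or Hutool. The sketch below is a JDK-only approximation under stated assumptions: partition stands in for CollUtil.split, a Consumer stands in for userRepository.saveAll, and the names BatchFanOutSketch, writeAll, and demo are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

public class BatchFanOutSketch {

    /** Splits a list into consecutive chunks of at most batchSize elements. */
    public static <T> List<List<T>> partition(List<T> items, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int from = 0; from < items.size(); from += batchSize) {
            int to = Math.min(from + batchSize, items.size());
            // Copy the view so that each batch is independent of the source list.
            batches.add(new ArrayList<>(items.subList(from, to)));
        }
        return batches;
    }

    /** Runs the writer once per batch on the executor; the returned future covers the whole job. */
    public static <T> CompletableFuture<Void> writeAll(List<T> items, int batchSize,
                                                       Consumer<List<T>> writer, Executor executor) {
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        for (List<T> batch : partition(items, batchSize)) {
            futures.add(CompletableFuture.runAsync(() -> writer.accept(batch), executor));
        }
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
    }

    /** Demo: "writes" n integers in batches and returns how many were written. */
    public static int demo(int n, int batchSize) {
        List<Integer> data = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            data.add(i);
        }
        AtomicInteger written = new AtomicInteger();
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            writeAll(data, batchSize, batch -> written.addAndGet(batch.size()), executor).join();
        } finally {
            executor.shutdown();
        }
        return written.get();
    }

    public static void main(String[] args) {
        System.out.println(demo(2500, 1000));
    }
}
```

With 2500 items and a batch size of 1000, partition yields three batches (1000, 1000, 500), and the future returned by writeAll completes only after all three have been written.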
6. Controller
Create the BatchInsertController class, which exposes the API endpoint:
package com.example.springboot.controller;

import com.example.springboot.service.BatchInsertService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/api/v1")
public class BatchInsertController {

    private static final Logger log = LoggerFactory.getLogger(BatchInsertController.class);

    @Autowired
    private BatchInsertService batchInsertService;

    /**
     * API endpoint that triggers a batch insert.
     */
    @PostMapping("/batch-insert")
    public ResponseEntity<String> triggerBatchInsert(@RequestParam(defaultValue = "10000") int count) {
        if (count <= 0) {
            return ResponseEntity.badRequest().body("Insert count must be greater than 0");
        }
        try {
            // The returned future is not awaited; the inserts continue in the background.
            batchInsertService.processBatchInsert(count);
            return ResponseEntity.accepted().body("Batch insert task started and will run asynchronously.");
        } catch (Exception e) {
            log.error("Failed to start batch insert", e);
            return ResponseEntity.internalServerError().body("Failed to start batch insert");
        }
    }
}
7. Application Entry Point
package com.example.springboot;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class SpringbootBatchInsertApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringbootBatchInsertApplication.class, args);
    }
}
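Key Technical Points
- Batch insert optimization:
  - The connection parameter rewriteBatchedStatements=true enables MySQL's batch insert rewriting
  - The Hibernate setting batch_size=1000 enables JDBC batching
  - The data is split into batches so that each saveAll call handles a bounded number of entities
- Asynchronous processing:
  - @EnableAsync turns on Spring's async support
  - A custom thread pool sets the core size, maximum size, and queue capacity explicitly
  - CompletableFuture runs the inserts off the request thread, so the endpoint responds quickly
- Transaction management:
  - Each saveAll call runs in its own transaction, because Spring Data JPA's repository methods are transactional, so a failed batch rolls back only that batch. Adding @Transactional to processBatchInsert would not cover the inserts, since they run on other threads
  - Keep transactions small and avoid long-running transactions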
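Because every batch commits or rolls back on its own, it helps to know which batches failed rather than only that the job as a whole failed. The following JDK-only sketch shows one possible way to capture per-batch outcomes with CompletableFuture.handle. It is not part of the service above; the class name BatchFailureSketch, the method failedRows, and the IntConsumer that stands in for a per-batch insert are all assumptions made for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.IntConsumer;

public class BatchFailureSketch {

    /**
     * Runs one task per batch and returns the number of rows that belonged to failed batches.
     * The writer stands in for a per-batch insert and receives the batch size.
     */
    public static int failedRows(List<Integer> batchSizes, IntConsumer writer) {
        List<CompletableFuture<Integer>> futures = new ArrayList<>();
        for (int size : batchSizes) {
            futures.add(CompletableFuture
                    .runAsync(() -> writer.accept(size))
                    // handle() turns a failure into a value, so every batch outcome can be inspected.
                    .handle((ignored, error) -> error == null ? 0 : size));
        }
        int failed = 0;
        for (CompletableFuture<Integer> future : futures) {
            failed += future.join();
        }
        return failed;
    }

    public static void main(String[] args) {
        int failed = failedRows(List.of(1000, 1000, 500), size -> {
            if (size == 500) {
                throw new IllegalStateException("simulated constraint violation");
            }
        });
        System.out.println("rows in failed batches: " + failed);
    }
}
```

A real implementation would more likely keep the failed batches themselves, so that they can be retried or reported to the caller.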
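Performance Tuning Suggestions
- Tune the batch size: adjust BATCH_SIZE to the database's performance and network conditions; values between 500 and 2000 usually work well
- Thread pool tuning:
  - Do not use too many core threads; the number of CPU cores is a common starting point
  - Set the maximum thread count according to system resources and the number of database connections
  - Choose a reasonable queue capacity to avoid running out of memory
- JVM tuning: size the JVM heap according to the data volume
- Database tuning:
  - Make sure the table structure and indexes are well designed
  - For very large loads, consider dropping secondary indexes before the insert and recreating them afterwards
  - Consider using partitioned tables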
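The sizing advice above comes down to a little arithmetic. The sketch below works it through under one assumption that is not stated in the article: that insert threads beyond the number of pooled database connections only wait for a connection (HikariCP, Spring Boot's default pool, allows 10 connections unless configured otherwise). The class and method names are hypothetical.

```java
public class BatchPlanSketch {

    /** Number of batches needed for totalRows, i.e. totalRows / batchSize rounded up. */
    public static int batchCount(int totalRows, int batchSize) {
        if (totalRows < 0 || batchSize <= 0) {
            throw new IllegalArgumentException("totalRows must be >= 0 and batchSize > 0");
        }
        return totalRows / batchSize + (totalRows % batchSize == 0 ? 0 : 1);
    }

    /** Caps the worker count so that concurrent inserts never outnumber the available connections. */
    public static int workerThreads(int cpuCores, int connectionPoolSize) {
        return Math.max(1, Math.min(cpuCores * 2, connectionPoolSize));
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        System.out.println("batches for 10000 rows: " + batchCount(10_000, 1000));
        System.out.println("worker threads: " + workerThreads(cores, 10));
    }
}
```

On an 8-core machine the configuration in this article allows up to 16 insert threads, so either the worker count should be capped at the connection pool size as sketched here, or the connection pool should be enlarged to match.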
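Summary
This article showed how to implement efficient batch inserts with Spring Boot. Batching, asynchronous execution, and database-level tuning together can significantly speed up inserting large volumes of data. In practice, the parameters still need to be adjusted to the specific business scenario and data volume to get the best performance.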