Spring Batch学习,和Spring Cloud Stream区别
Spring Batch学习,和Spring Cloud Stream区别
- 1. 使用Spring Initializr创建项目
- 2. 使用步骤构建作业(Chunk 模式)
- 🧩 场景说明
- 🧰 1. 示例目录结构
- 📄 2. 创建输入文件(`users.csv`)
- 🧱 3. 创建实体类(`User.java`)
- 🔄 4. 编写处理器(`UserItemProcessor.java`)
- ⚙️ 5. Job 配置(`ChunkJobConfig.java`)
- 🚀 6. 启动类(`BatchDemoApplication.java`)
- 📄 7. 配置文件(`application.yml`)
- ✅ 8. 启动后输出示例
- 🎓 小结
- 3. 添加容错功能(retry、skip)
- 🎯 场景目标
- 🧱 修改点概览
- 🧩 1. 修改 `UserItemProcessor.java`,模拟失败逻辑
- 🛠️ 2. 修改 `chunkStep()`:添加容错配置
- 📄 3. 示例 CSV 文件(`users.csv`)
- ✅ 4. 启动后控制台输出示例
- 🔎 总结关键 API
- 4. 多步骤 Job 示例(多个 Step 串联)
- 🎯 模块目标
- 🗂 项目结构说明(在原基础上新增)
- 📄 1. 创建 Tasklet 步骤:通知任务(`NotificationTasklet.java`)
- ⚙️ 2. 多步骤 Job 配置(`MultiStepJobConfig.java`)
- ✅ 3. 控制台输出预期
- 🧠 关键知识点
- 5. 定时调度执行 Spring Batch Job
- 📦 添加定时任务类(`ScheduledJobLauncher.java`)
- ✅ 注意:
- 🔧 开启定时任务功能(`BatchDemoApplication.java`)
- 🛠 示例控制台输出(每 30 秒一次)
- 🎓 小结
- 6. Spring Cloud Stream和Spring Batch区别
- 🧠 一句话总结
- 🔍 核心区别详解
参考文章: Building a Batch Application with Spring Batch - Spring Academy
1. 使用Spring Initializr创建项目
[~/exercises] $ curl -o 'billing-job.zip' 'https://start.spring.io/starter.zip?type=gradle-project&language=java&dependencies=batch%2Cpostgresql&name=Billing+Job&groupId=example&artifactId=billing-job&description=Billing+job+for+Spring+Cellular&packaging=jar&packageName=example.billingjob&javaVersion=17' && unzip -d 'billing-job' 'billing-job.zip'
cd billing-job
查看目录
[~/exercises/billing-job] $ tree .
.
├── HELP.md
├── mvnw
├── mvnw.cmd
├── pom.xml
└── src├── main│ ├── java│ │ └── example│ │ └── billingjob│ │ └── BillingJobApplication.java│ └── resources│ └── application.properties└── test└── java└── example└── billingjob└── BillingJobApplicationTests.java11 directories, 8 files
创建配置文件
package example.billingjob;import org.springframework.context.annotation.Configuration;@Configuration
public class BillingJobConfiguration {// TODO add job definition here
}
[~/exercises] $ docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
c711e7873371 postgres:14.1-alpine "docker-entrypoint.s…" 20 minutes ago Up 20 minutes 127.0.0.1:5432->5432/tcp postgres
[~/exercises] $
连接数据库创建表
[~/exercises] $ docker exec -it postgres psql -U postgres
psql (14.1)
Type "help" for help.
postgres=#
CREATE TABLE BATCH_JOB_INSTANCE (JOB_INSTANCE_ID BIGINT NOT NULL PRIMARY KEY ,VERSION BIGINT ,JOB_NAME VARCHAR(100) NOT NULL,JOB_KEY VARCHAR(32) NOT NULL,constraint JOB_INST_UN unique (JOB_NAME, JOB_KEY)
) ;
CREATE TABLE BATCH_JOB_EXECUTION (JOB_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY ,VERSION BIGINT ,JOB_INSTANCE_ID BIGINT NOT NULL,CREATE_TIME TIMESTAMP NOT NULL,START_TIME TIMESTAMP DEFAULT NULL ,END_TIME TIMESTAMP DEFAULT NULL ,STATUS VARCHAR(10) ,EXIT_CODE VARCHAR(2500) ,EXIT_MESSAGE VARCHAR(2500) ,LAST_UPDATED TIMESTAMP,constraint JOB_INST_EXEC_FK foreign key (JOB_INSTANCE_ID)references BATCH_JOB_INSTANCE(JOB_INSTANCE_ID)
) ;
CREATE TABLE BATCH_JOB_EXECUTION_PARAMS (JOB_EXECUTION_ID BIGINT NOT NULL ,PARAMETER_NAME VARCHAR(100) NOT NULL ,PARAMETER_TYPE VARCHAR(100) NOT NULL ,PARAMETER_VALUE VARCHAR(2500) ,IDENTIFYING CHAR(1) NOT NULL ,constraint JOB_EXEC_PARAMS_FK foreign key (JOB_EXECUTION_ID)references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ;
CREATE TABLE BATCH_STEP_EXECUTION (STEP_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY ,VERSION BIGINT NOT NULL,STEP_NAME VARCHAR(100) NOT NULL,JOB_EXECUTION_ID BIGINT NOT NULL,CREATE_TIME TIMESTAMP NOT NULL,START_TIME TIMESTAMP DEFAULT NULL ,END_TIME TIMESTAMP DEFAULT NULL ,STATUS VARCHAR(10) ,COMMIT_COUNT BIGINT ,READ_COUNT BIGINT ,FILTER_COUNT BIGINT ,WRITE_COUNT BIGINT ,READ_SKIP_COUNT BIGINT ,WRITE_SKIP_COUNT BIGINT ,PROCESS_SKIP_COUNT BIGINT ,ROLLBACK_COUNT BIGINT ,EXIT_CODE VARCHAR(2500) ,EXIT_MESSAGE VARCHAR(2500) ,LAST_UPDATED TIMESTAMP,constraint JOB_EXEC_STEP_FK foreign key (JOB_EXECUTION_ID)references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ;
CREATE TABLE BATCH_STEP_EXECUTION_CONTEXT (STEP_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,SHORT_CONTEXT VARCHAR(2500) NOT NULL,SERIALIZED_CONTEXT TEXT ,constraint STEP_EXEC_CTX_FK foreign key (STEP_EXECUTION_ID)references BATCH_STEP_EXECUTION(STEP_EXECUTION_ID)
) ;
CREATE TABLE BATCH_JOB_EXECUTION_CONTEXT (JOB_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,SHORT_CONTEXT VARCHAR(2500) NOT NULL,SERIALIZED_CONTEXT TEXT ,constraint JOB_EXEC_CTX_FK foreign key (JOB_EXECUTION_ID)references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ;
CREATE SEQUENCE BATCH_STEP_EXECUTION_SEQ MAXVALUE 9223372036854775807 NO CYCLE;
CREATE SEQUENCE BATCH_JOB_EXECUTION_SEQ MAXVALUE 9223372036854775807 NO CYCLE;
CREATE SEQUENCE BATCH_JOB_SEQ MAXVALUE 9223372036854775807 NO CYCLE;
检查创建的表
postgres=# \dList of relationsSchema | Name | Type | Owner
--------+------------------------------+----------+----------public | batch_job_execution | table | postgrespublic | batch_job_execution_context | table | postgrespublic | batch_job_execution_params | table | postgrespublic | batch_job_execution_seq | sequence | postgrespublic | batch_job_instance | table | postgrespublic | batch_job_seq | sequence | postgrespublic | batch_step_execution | table | postgrespublic | batch_step_execution_context | table | postgrespublic | batch_step_execution_seq | sequence | postgres
(9 rows)
postgres=#
修改配置billing-job/src/main/resources/application.properties
spring.datasource.url=jdbc:postgresql://localhost:5432/postgres
spring.datasource.username=postgres
spring.datasource.password=postgres
2. 使用步骤构建作业(Chunk 模式)
很好!下面是 Spring Batch 模块 2:使用步骤构建作业(Chunk 模式) 的完整示例,模拟一个典型的「读取 → 处理 → 写入」的场景。
🧩 场景说明
我们将读取一个 CSV 文件中的用户信息(users.csv
),处理数据(将用户名转为大写),然后将处理结果打印出来(模拟写入数据库)。
🧰 1. 示例目录结构
src/
├── main/
│ ├── java/
│ │ └── com/example/batch/
│ │ ├── BatchDemoApplication.java
│ │ ├── config/ChunkJobConfig.java
│ │ ├── model/User.java
│ │ ├── processor/UserItemProcessor.java
│ ├── resources/
│ ├── users.csv
│ └── application.yml
📄 2. 创建输入文件(users.csv
)
id,name
1,alice
2,bob
3,charlie
🧱 3. 创建实体类(User.java
)
public class User {private Long id;private String name;// 构造方法、getter/setterpublic User() {}public User(Long id, String name) {this.id = id;this.name = name;}public Long getId() { return id; }public void setId(Long id) { this.id = id; }public String getName() { return name; }public void setName(String name) { this.name = name; }@Overridepublic String toString() {return "User{id=" + id + ", name='" + name + "'}";}
}
🔄 4. 编写处理器(UserItemProcessor.java
)
@Component
public class UserItemProcessor implements ItemProcessor<User, User> {@Overridepublic User process(User user) {user.setName(user.getName().toUpperCase());return user;}
}
⚙️ 5. Job 配置(ChunkJobConfig.java
)
@Configuration
@EnableBatchProcessing
public class ChunkJobConfig {@Autowiredprivate JobBuilderFactory jobBuilderFactory;@Autowiredprivate StepBuilderFactory stepBuilderFactory;@Autowiredprivate UserItemProcessor processor;// 读取器:从 CSV 文件中读取数据@Beanpublic FlatFileItemReader<User> reader() {return new FlatFileItemReaderBuilder<User>().name("userItemReader").resource(new ClassPathResource("users.csv")).delimited().names("id", "name").fieldSetMapper(fieldSet -> new User(fieldSet.readLong("id"),fieldSet.readString("name"))).linesToSkip(1) // 跳过标题行.build();}// 写入器:将处理后的数据打印到控制台(模拟写数据库)@Beanpublic ItemWriter<User> writer() {return users -> {System.out.println("📥 写入数据:");users.forEach(System.out::println);};}@Beanpublic Step chunkStep() {return stepBuilderFactory.get("chunkStep").<User, User>chunk(2) // 每2个为一个块处理.reader(reader()).processor(processor).writer(writer()).build();}@Beanpublic Job chunkJob() {return jobBuilderFactory.get("chunkJob").start(chunkStep()).build();}
}
🚀 6. 启动类(BatchDemoApplication.java
)
@SpringBootApplication
public class BatchDemoApplication {public static void main(String[] args) {SpringApplication.run(BatchDemoApplication.class, args);}
}
📄 7. 配置文件(application.yml
)
spring:batch:job:enabled: truedatasource:url: jdbc:h2:mem:testdbdriver-class-name: org.h2.Driverusername: sapassword:h2:console:enabled: true
✅ 8. 启动后输出示例
📥 写入数据:
User{id=1, name='ALICE'}
User{id=2, name='BOB'}
📥 写入数据:
User{id=3, name='CHARLIE'}
🎓 小结
部件 | 作用 |
---|---|
ItemReader | 读取数据源(CSV) |
ItemProcessor | 转换数据(小写转大写) |
ItemWriter | 输出结果(控制台) |
chunk(n) | 每 n 条数据为一个事务处理块 |
3. 添加容错功能(retry、skip)
🎯 场景目标
在上一个示例基础上,模拟某些数据处理失败 的情况,并配置:
- ✅ 自动重试指定异常(如最多重试 2 次)
- ✅ 跳过指定异常(跳过错误记录继续执行)
🧱 修改点概览
部分 | 修改 |
---|---|
processor | 模拟处理过程中抛异常 |
chunkStep() | 添加 .faultTolerant() 、.retry() 、.skip() 配置 |
控制台输出 | 可看到错误被捕获、重试、跳过后的行为 |
🧩 1. 修改 UserItemProcessor.java
,模拟失败逻辑
@Component
public class UserItemProcessor implements ItemProcessor<User, User> {@Overridepublic User process(User user) throws Exception {// 模拟用户 id 为 2 的处理会失败if (user.getId() == 2) {System.out.println("❌ 模拟处理异常:用户 " + user.getName());throw new IllegalArgumentException("处理用户失败!");}user.setName(user.getName().toUpperCase());return user;}
}
🛠️ 2. 修改 chunkStep()
:添加容错配置
@Bean
public Step chunkStep() {return stepBuilderFactory.get("chunkStep").<User, User>chunk(2).reader(reader()).processor(processor).writer(writer()).faultTolerant() // 开启容错模式.retry(IllegalArgumentException.class) // 指定重试异常.retryLimit(2) // 最多重试 2 次.skip(IllegalArgumentException.class) // 如果还失败则跳过.skipLimit(5) // 最多跳过 5 个.build();
}
📄 3. 示例 CSV 文件(users.csv
)
id,name
1,alice
2,bob <-- 模拟失败
3,charlie
✅ 4. 启动后控制台输出示例
👉 正在读取用户数据...
📥 写入数据:
User{id=1, name='ALICE'}
❌ 模拟处理异常:用户 bob
⚠️ 重试第1次...
❌ 模拟处理异常:用户 bob
⚠️ 重试第2次...
❌ 模拟处理异常:用户 bob
⚠️ 达到重试次数限制,跳过该用户📥 写入数据:
User{id=3, name='CHARLIE'}
你将看到:
- 用户
bob
被处理时故意失败 - 自动重试 2 次
- 重试仍失败 → 被跳过
- Job 正常完成,未中断!
🔎 总结关键 API
方法 | 含义 |
---|---|
.faultTolerant() | 启用容错处理 |
.retry(Exception.class) | 出现该异常时会自动重试 |
.retryLimit(n) | 最大重试次数 |
.skip(Exception.class) | 如果重试仍失败,可以跳过 |
.skipLimit(n) | 最大跳过条数,超过就 fail |
4. 多步骤 Job 示例(多个 Step 串联)
🎯 模块目标
我们将构建一个包含多个 Step 的 Job:
- Step 1:打印 Job 启动信息
- Step 2:执行 chunk 处理逻辑(读取、处理、写入用户)
- Step 3:清理任务或发送通知(模拟)
🗂 项目结构说明(在原基础上新增)
├── config/
│ └── MultiStepJobConfig.java <-- 新增:多个步骤的 Job 配置
├── service/
│ └── NotificationTasklet.java <-- 新增:通知步骤
📄 1. 创建 Tasklet 步骤:通知任务(NotificationTasklet.java
)
@Component
public class NotificationTasklet implements Tasklet {@Overridepublic RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) {System.out.println("📣 所有用户处理完成,发送通知!");return RepeatStatus.FINISHED;}
}
⚙️ 2. 多步骤 Job 配置(MultiStepJobConfig.java
)
@Configuration
public class MultiStepJobConfig {@Autowired private JobBuilderFactory jobBuilderFactory;@Autowired private StepBuilderFactory stepBuilderFactory;@Autowired private FlatFileItemReader<User> reader;@Autowired private ItemProcessor<User, User> processor;@Autowired private ItemWriter<User> writer;@Autowired private NotificationTasklet notificationTasklet;@Beanpublic Step startStep() {return stepBuilderFactory.get("startStep").tasklet((contribution, context) -> {System.out.println("🚀 Job 开始执行!");return RepeatStatus.FINISHED;}).build();}@Beanpublic Step chunkStep() {return stepBuilderFactory.get("chunkStep").<User, User>chunk(2).reader(reader).processor(processor).writer(writer).faultTolerant().retry(IllegalArgumentException.class).retryLimit(2).skip(IllegalArgumentException.class).skipLimit(5).build();}@Beanpublic Step notifyStep() {return stepBuilderFactory.get("notifyStep").tasklet(notificationTasklet).build();}@Beanpublic Job multiStepJob() {return jobBuilderFactory.get("multiStepJob").start(startStep()).next(chunkStep()).next(notifyStep()).build();}
}
✅ 3. 控制台输出预期
🚀 Job 开始执行!
📥 写入数据:
User{id=1, name='ALICE'}
❌ 模拟处理异常:用户 bob
⚠️ 重试中...
📥 写入数据:
User{id=3, name='CHARLIE'}
📣 所有用户处理完成,发送通知!
🧠 关键知识点
概念 | 说明 |
---|---|
多个 Step | 使用 .start().next().next() 串联 |
Tasklet Step | 适合执行单个逻辑(如日志、通知) |
Chunk Step | 适合批量数据处理 |
JobBuilder | 构建包含多个 Step 的流程 |
5. 定时调度执行 Spring Batch Job
📦 添加定时任务类(ScheduledJobLauncher.java
)
@Component
public class ScheduledJobLauncher {@Autowiredprivate JobLauncher jobLauncher;@Autowired@Qualifier("multiStepJob") // 使用我们前面定义的 Jobprivate Job job;@Scheduled(fixedRate = 30000) // 每 30 秒触发一次public void runJob() {try {JobParameters params = new JobParametersBuilder().addLong("timestamp", System.currentTimeMillis()) // 保证每次唯一.toJobParameters();System.out.println("⏰ 定时任务启动 Job...");jobLauncher.run(job, params);} catch (Exception e) {System.err.println("❌ Job 启动失败:" + e.getMessage());}}
}
✅ 注意:
Spring Batch 的 Job 每次执行都需要唯一参数(否则不会重复执行),因此我们加了:
.addLong("timestamp", System.currentTimeMillis())
🔧 开启定时任务功能(BatchDemoApplication.java
)
@SpringBootApplication
@EnableScheduling // 启用定时任务功能
public class BatchDemoApplication {public static void main(String[] args) {SpringApplication.run(BatchDemoApplication.class, args);}
}
🛠 示例控制台输出(每 30 秒一次)
⏰ 定时任务启动 Job...
🚀 Job 开始执行!
📥 写入数据:
User{id=1, name='ALICE'}
❌ 模拟处理异常:用户 bob
📥 写入数据:
User{id=3, name='CHARLIE'}
📣 所有用户处理完成,发送通知!
🎓 小结
组件 | 功能 |
---|---|
@Scheduled | 每隔一段时间自动触发 Job |
JobLauncher | 手动执行指定 Job |
JobParametersBuilder | 创建 Job 参数,确保唯一性 |
@EnableScheduling | 启用定时任务功能 |
6. Spring Cloud Stream和Spring Batch区别
🧠 一句话总结
框架 | 关注点 | 用于什么 |
---|---|---|
Spring Cloud Stream | 实时消息流处理 | 处理从消息队列(如 Kafka、RabbitMQ)中来的事件流 |
Spring Batch | 批量任务处理 | 处理大量结构化数据的离线批处理任务,如夜间账单 |
🔍 核心区别详解
特性 / 区别点 | Spring Cloud Stream | Spring Batch |
---|---|---|
💡 处理模式 | 异步、实时流式处理(事件驱动) | 同步、批量处理(定时或手动触发) |
🕘 适用场景 | IoT 数据流、订单事件、消息队列消费者、微服务事件链路 | 日终结算、数据库导入导出、文件解析、大规模数据迁移 |
🔌 输入来源 | Kafka、RabbitMQ、Pulsar 等消息中间件 | 数据库、CSV、XML、REST 接口等 |
🔄 输出目标 | 下游队列或服务 | 数据库、文件、API |
🧱 组成模型 | Supplier、Function、Consumer | Step、Job、ItemReader、ItemProcessor、ItemWriter |
💥 故障处理 | 支持 Retry、DLQ、分区等 | 支持跳过、重试、事务、Job Restart |
🛠️ 持久化状态 | 一般无状态,靠中间件保证可靠传递 | 有状态,支持 Job Execution 状态保存(如重启恢复) |
🧪 测试/调试 | 流处理链可拆解为小函数,易于集成测试 | Job 参数可控制执行,适合验证数据处理逻辑 |
🧰 配置方式 | application.yml(通道绑定) | XML/Java DSL 配置 Job、Step、Reader 等 |