当前位置: 首页 > backend >正文

Spring Batch学习,和Spring Cloud Stream区别

Spring Batch学习,和Spring Cloud Stream区别

  • 1. 使用Spring Initializr创建项目
  • 2. 使用步骤构建作业(Chunk 模式)
    • 🧩 场景说明
    • 🧰 1. 示例目录结构
    • 📄 2. 创建输入文件(`users.csv`)
    • 🧱 3. 创建实体类(`User.java`)
    • 🔄 4. 编写处理器(`UserItemProcessor.java`)
    • ⚙️ 5. Job 配置(`ChunkJobConfig.java`)
    • 🚀 6. 启动类(`BatchDemoApplication.java`)
    • 📄 7. 配置文件(`application.yml`)
    • ✅ 8. 启动后输出示例
    • 🎓 小结
  • 3. 添加容错功能(retry、skip)
    • 🎯 场景目标
    • 🧱 修改点概览
    • 🧩 1. 修改 `UserItemProcessor.java`,模拟失败逻辑
    • 🛠️ 2. 修改 `chunkStep()`:添加容错配置
    • 📄 3. 示例 CSV 文件(`users.csv`)
    • ✅ 4. 启动后控制台输出示例
    • 🔎 总结关键 API
  • 4. 多步骤 Job 示例(多个 Step 串联)
    • 🎯 模块目标
    • 🗂 项目结构说明(在原基础上新增)
    • 📄 1. 创建 Tasklet 步骤:通知任务(`NotificationTasklet.java`)
    • ⚙️ 2. 多步骤 Job 配置(`MultiStepJobConfig.java`)
    • ✅ 3. 控制台输出预期
    • 🧠 关键知识点
  • 5. 定时调度执行 Spring Batch Job
    • 📦 添加定时任务类(`ScheduledJobLauncher.java`)
    • ✅ 注意:
    • 🔧 开启定时任务功能(`BatchDemoApplication.java`)
    • 🛠 示例控制台输出(每 30 秒一次)
    • 🎓 小结
  • 6. Spring Cloud Stream和Spring Batch区别
    • 🧠 一句话总结
    • 🔍 核心区别详解

参考文章: Building a Batch Application with Spring Batch - Spring Academy

1. 使用Spring Initializr创建项目

[~/exercises] $ curl -o 'billing-job.zip' 'https://start.spring.io/starter.zip?type=gradle-project&language=java&dependencies=batch%2Cpostgresql&name=Billing+Job&groupId=example&artifactId=billing-job&description=Billing+job+for+Spring+Cellular&packaging=jar&packageName=example.billingjob&javaVersion=17' && unzip -d 'billing-job' 'billing-job.zip'
cd billing-job

查看目录

[~/exercises/billing-job] $ tree .
.
├── HELP.md
├── mvnw
├── mvnw.cmd
├── pom.xml
└── src├── main│   ├── java│   │   └── example│   │       └── billingjob│   │           └── BillingJobApplication.java│   └── resources│       └── application.properties└── test└── java└── example└── billingjob└── BillingJobApplicationTests.java11 directories, 8 files

创建配置文件

package example.billingjob;import org.springframework.context.annotation.Configuration;@Configuration
public class BillingJobConfiguration {// TODO add job definition here
}
[~/exercises] $ docker ps
CONTAINER ID   IMAGE                  COMMAND                  CREATED          STATUS          PORTS                      NAMES
c711e7873371   postgres:14.1-alpine   "docker-entrypoint.s…"   20 minutes ago   Up 20 minutes   127.0.0.1:5432->5432/tcp   postgres
[~/exercises] $

连接数据库创建表

[~/exercises] $ docker exec -it postgres psql -U postgres
psql (14.1)
Type "help" for help.
postgres=#
CREATE TABLE BATCH_JOB_INSTANCE  (JOB_INSTANCE_ID BIGINT  NOT NULL PRIMARY KEY ,VERSION BIGINT ,JOB_NAME VARCHAR(100) NOT NULL,JOB_KEY VARCHAR(32) NOT NULL,constraint JOB_INST_UN unique (JOB_NAME, JOB_KEY)
) ;
CREATE TABLE BATCH_JOB_EXECUTION  (JOB_EXECUTION_ID BIGINT  NOT NULL PRIMARY KEY ,VERSION BIGINT  ,JOB_INSTANCE_ID BIGINT NOT NULL,CREATE_TIME TIMESTAMP NOT NULL,START_TIME TIMESTAMP DEFAULT NULL ,END_TIME TIMESTAMP DEFAULT NULL ,STATUS VARCHAR(10) ,EXIT_CODE VARCHAR(2500) ,EXIT_MESSAGE VARCHAR(2500) ,LAST_UPDATED TIMESTAMP,constraint JOB_INST_EXEC_FK foreign key (JOB_INSTANCE_ID)references BATCH_JOB_INSTANCE(JOB_INSTANCE_ID)
) ;
CREATE TABLE BATCH_JOB_EXECUTION_PARAMS  (JOB_EXECUTION_ID BIGINT NOT NULL ,PARAMETER_NAME VARCHAR(100) NOT NULL ,PARAMETER_TYPE VARCHAR(100) NOT NULL ,PARAMETER_VALUE VARCHAR(2500) ,IDENTIFYING CHAR(1) NOT NULL ,constraint JOB_EXEC_PARAMS_FK foreign key (JOB_EXECUTION_ID)references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ;
CREATE TABLE BATCH_STEP_EXECUTION  (STEP_EXECUTION_ID BIGINT  NOT NULL PRIMARY KEY ,VERSION BIGINT NOT NULL,STEP_NAME VARCHAR(100) NOT NULL,JOB_EXECUTION_ID BIGINT NOT NULL,CREATE_TIME TIMESTAMP NOT NULL,START_TIME TIMESTAMP DEFAULT NULL ,END_TIME TIMESTAMP DEFAULT NULL ,STATUS VARCHAR(10) ,COMMIT_COUNT BIGINT ,READ_COUNT BIGINT ,FILTER_COUNT BIGINT ,WRITE_COUNT BIGINT ,READ_SKIP_COUNT BIGINT ,WRITE_SKIP_COUNT BIGINT ,PROCESS_SKIP_COUNT BIGINT ,ROLLBACK_COUNT BIGINT ,EXIT_CODE VARCHAR(2500) ,EXIT_MESSAGE VARCHAR(2500) ,LAST_UPDATED TIMESTAMP,constraint JOB_EXEC_STEP_FK foreign key (JOB_EXECUTION_ID)references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ;
CREATE TABLE BATCH_STEP_EXECUTION_CONTEXT  (STEP_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,SHORT_CONTEXT VARCHAR(2500) NOT NULL,SERIALIZED_CONTEXT TEXT ,constraint STEP_EXEC_CTX_FK foreign key (STEP_EXECUTION_ID)references BATCH_STEP_EXECUTION(STEP_EXECUTION_ID)
) ;
CREATE TABLE BATCH_JOB_EXECUTION_CONTEXT  (JOB_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,SHORT_CONTEXT VARCHAR(2500) NOT NULL,SERIALIZED_CONTEXT TEXT ,constraint JOB_EXEC_CTX_FK foreign key (JOB_EXECUTION_ID)references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ;
CREATE SEQUENCE BATCH_STEP_EXECUTION_SEQ MAXVALUE 9223372036854775807 NO CYCLE;
CREATE SEQUENCE BATCH_JOB_EXECUTION_SEQ MAXVALUE 9223372036854775807 NO CYCLE;
CREATE SEQUENCE BATCH_JOB_SEQ MAXVALUE 9223372036854775807 NO CYCLE;

检查创建的表

postgres=# \dList of relationsSchema |             Name             |   Type   |  Owner
--------+------------------------------+----------+----------public | batch_job_execution          | table    | postgrespublic | batch_job_execution_context  | table    | postgrespublic | batch_job_execution_params   | table    | postgrespublic | batch_job_execution_seq      | sequence | postgrespublic | batch_job_instance           | table    | postgrespublic | batch_job_seq                | sequence | postgrespublic | batch_step_execution         | table    | postgrespublic | batch_step_execution_context | table    | postgrespublic | batch_step_execution_seq     | sequence | postgres
(9 rows)
postgres=#

修改配置billing-job/src/main/resources/application.properties

spring.datasource.url=jdbc:postgresql://localhost:5432/postgres
spring.datasource.username=postgres
spring.datasource.password=postgres

2. 使用步骤构建作业(Chunk 模式)

很好!下面是 Spring Batch 模块 2:使用步骤构建作业(Chunk 模式) 的完整示例,模拟一个典型的「读取 → 处理 → 写入」的场景。


🧩 场景说明

我们将读取一个 CSV 文件中的用户信息(users.csv),处理数据(将用户名转为大写),然后将处理结果打印出来(模拟写入数据库)。


🧰 1. 示例目录结构

src/
├── main/
│   ├── java/
│   │   └── com/example/batch/
│   │       ├── BatchDemoApplication.java
│   │       ├── config/ChunkJobConfig.java
│   │       ├── model/User.java
│   │       ├── processor/UserItemProcessor.java
│   ├── resources/
│       ├── users.csv
│       └── application.yml

📄 2. 创建输入文件(users.csv

id,name
1,alice
2,bob
3,charlie

🧱 3. 创建实体类(User.java

public class User {private Long id;private String name;// 构造方法、getter/setterpublic User() {}public User(Long id, String name) {this.id = id;this.name = name;}public Long getId() { return id; }public void setId(Long id) { this.id = id; }public String getName() { return name; }public void setName(String name) { this.name = name; }@Overridepublic String toString() {return "User{id=" + id + ", name='" + name + "'}";}
}

🔄 4. 编写处理器(UserItemProcessor.java

@Component
public class UserItemProcessor implements ItemProcessor<User, User> {@Overridepublic User process(User user) {user.setName(user.getName().toUpperCase());return user;}
}

⚙️ 5. Job 配置(ChunkJobConfig.java

@Configuration
@EnableBatchProcessing
public class ChunkJobConfig {@Autowiredprivate JobBuilderFactory jobBuilderFactory;@Autowiredprivate StepBuilderFactory stepBuilderFactory;@Autowiredprivate UserItemProcessor processor;// 读取器:从 CSV 文件中读取数据@Beanpublic FlatFileItemReader<User> reader() {return new FlatFileItemReaderBuilder<User>().name("userItemReader").resource(new ClassPathResource("users.csv")).delimited().names("id", "name").fieldSetMapper(fieldSet -> new User(fieldSet.readLong("id"),fieldSet.readString("name"))).linesToSkip(1) // 跳过标题行.build();}// 写入器:将处理后的数据打印到控制台(模拟写数据库)@Beanpublic ItemWriter<User> writer() {return users -> {System.out.println("📥 写入数据:");users.forEach(System.out::println);};}@Beanpublic Step chunkStep() {return stepBuilderFactory.get("chunkStep").<User, User>chunk(2) // 每2个为一个块处理.reader(reader()).processor(processor).writer(writer()).build();}@Beanpublic Job chunkJob() {return jobBuilderFactory.get("chunkJob").start(chunkStep()).build();}
}

🚀 6. 启动类(BatchDemoApplication.java

@SpringBootApplication
public class BatchDemoApplication {public static void main(String[] args) {SpringApplication.run(BatchDemoApplication.class, args);}
}

📄 7. 配置文件(application.yml

spring:batch:job:enabled: truedatasource:url: jdbc:h2:mem:testdbdriver-class-name: org.h2.Driverusername: sapassword:h2:console:enabled: true

✅ 8. 启动后输出示例

📥 写入数据:
User{id=1, name='ALICE'}
User{id=2, name='BOB'}
📥 写入数据:
User{id=3, name='CHARLIE'}

🎓 小结

部件作用
ItemReader读取数据源(CSV)
ItemProcessor转换数据(小写转大写)
ItemWriter输出结果(控制台)
chunk(n)每 n 条数据为一个事务处理块

3. 添加容错功能(retry、skip)


🎯 场景目标

在上一个示例基础上,模拟某些数据处理失败 的情况,并配置:

  • ✅ 自动重试指定异常(如最多重试 2 次)
  • ✅ 跳过指定异常(跳过错误记录继续执行)

🧱 修改点概览

部分修改
processor模拟处理过程中抛异常
chunkStep()添加 .faultTolerant().retry().skip() 配置
控制台输出可看到错误被捕获、重试、跳过后的行为

🧩 1. 修改 UserItemProcessor.java,模拟失败逻辑

@Component
public class UserItemProcessor implements ItemProcessor<User, User> {@Overridepublic User process(User user) throws Exception {// 模拟用户 id 为 2 的处理会失败if (user.getId() == 2) {System.out.println("❌ 模拟处理异常:用户 " + user.getName());throw new IllegalArgumentException("处理用户失败!");}user.setName(user.getName().toUpperCase());return user;}
}

🛠️ 2. 修改 chunkStep():添加容错配置

@Bean
public Step chunkStep() {return stepBuilderFactory.get("chunkStep").<User, User>chunk(2).reader(reader()).processor(processor).writer(writer()).faultTolerant()                            // 开启容错模式.retry(IllegalArgumentException.class)     // 指定重试异常.retryLimit(2)                              // 最多重试 2 次.skip(IllegalArgumentException.class)      // 如果还失败则跳过.skipLimit(5)                               // 最多跳过 5 个.build();
}

📄 3. 示例 CSV 文件(users.csv

id,name
1,alice
2,bob      <-- 模拟失败
3,charlie

✅ 4. 启动后控制台输出示例

👉 正在读取用户数据...
📥 写入数据:
User{id=1, name='ALICE'}
❌ 模拟处理异常:用户 bob
⚠️ 重试第1次...
❌ 模拟处理异常:用户 bob
⚠️ 重试第2次...
❌ 模拟处理异常:用户 bob
⚠️ 达到重试次数限制,跳过该用户📥 写入数据:
User{id=3, name='CHARLIE'}

你将看到:

  • 用户 bob 被处理时故意失败
  • 自动重试 2 次
  • 重试仍失败 → 被跳过
  • Job 正常完成,未中断!

🔎 总结关键 API

方法含义
.faultTolerant()启用容错处理
.retry(Exception.class)出现该异常时会自动重试
.retryLimit(n)最大重试次数
.skip(Exception.class)如果重试仍失败,可以跳过
.skipLimit(n)最大跳过条数,超过就 fail

4. 多步骤 Job 示例(多个 Step 串联)


🎯 模块目标

我们将构建一个包含多个 Step 的 Job:

  1. Step 1:打印 Job 启动信息
  2. Step 2:执行 chunk 处理逻辑(读取、处理、写入用户)
  3. Step 3:清理任务或发送通知(模拟)

🗂 项目结构说明(在原基础上新增)

├── config/
│   └── MultiStepJobConfig.java    <-- 新增:多个步骤的 Job 配置
├── service/
│   └── NotificationTasklet.java   <-- 新增:通知步骤

📄 1. 创建 Tasklet 步骤:通知任务(NotificationTasklet.java

@Component
public class NotificationTasklet implements Tasklet {@Overridepublic RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) {System.out.println("📣 所有用户处理完成,发送通知!");return RepeatStatus.FINISHED;}
}

⚙️ 2. 多步骤 Job 配置(MultiStepJobConfig.java

@Configuration
public class MultiStepJobConfig {@Autowired private JobBuilderFactory jobBuilderFactory;@Autowired private StepBuilderFactory stepBuilderFactory;@Autowired private FlatFileItemReader<User> reader;@Autowired private ItemProcessor<User, User> processor;@Autowired private ItemWriter<User> writer;@Autowired private NotificationTasklet notificationTasklet;@Beanpublic Step startStep() {return stepBuilderFactory.get("startStep").tasklet((contribution, context) -> {System.out.println("🚀 Job 开始执行!");return RepeatStatus.FINISHED;}).build();}@Beanpublic Step chunkStep() {return stepBuilderFactory.get("chunkStep").<User, User>chunk(2).reader(reader).processor(processor).writer(writer).faultTolerant().retry(IllegalArgumentException.class).retryLimit(2).skip(IllegalArgumentException.class).skipLimit(5).build();}@Beanpublic Step notifyStep() {return stepBuilderFactory.get("notifyStep").tasklet(notificationTasklet).build();}@Beanpublic Job multiStepJob() {return jobBuilderFactory.get("multiStepJob").start(startStep()).next(chunkStep()).next(notifyStep()).build();}
}

✅ 3. 控制台输出预期

🚀 Job 开始执行!
📥 写入数据:
User{id=1, name='ALICE'}
❌ 模拟处理异常:用户 bob
⚠️ 重试中...
📥 写入数据:
User{id=3, name='CHARLIE'}
📣 所有用户处理完成,发送通知!

🧠 关键知识点

概念说明
多个 Step使用 .start().next().next() 串联
Tasklet Step适合执行单个逻辑(如日志、通知)
Chunk Step适合批量数据处理
JobBuilder构建包含多个 Step 的流程

5. 定时调度执行 Spring Batch Job

📦 添加定时任务类(ScheduledJobLauncher.java

@Component
public class ScheduledJobLauncher {@Autowiredprivate JobLauncher jobLauncher;@Autowired@Qualifier("multiStepJob")  // 使用我们前面定义的 Jobprivate Job job;@Scheduled(fixedRate = 30000) // 每 30 秒触发一次public void runJob() {try {JobParameters params = new JobParametersBuilder().addLong("timestamp", System.currentTimeMillis()) // 保证每次唯一.toJobParameters();System.out.println("⏰ 定时任务启动 Job...");jobLauncher.run(job, params);} catch (Exception e) {System.err.println("❌ Job 启动失败:" + e.getMessage());}}
}

✅ 注意:

Spring Batch 的 Job 每次执行都需要唯一参数(否则不会重复执行),因此我们加了:

.addLong("timestamp", System.currentTimeMillis())

🔧 开启定时任务功能(BatchDemoApplication.java

@SpringBootApplication
@EnableScheduling  // 启用定时任务功能
public class BatchDemoApplication {public static void main(String[] args) {SpringApplication.run(BatchDemoApplication.class, args);}
}

🛠 示例控制台输出(每 30 秒一次)

⏰ 定时任务启动 Job...
🚀 Job 开始执行!
📥 写入数据:
User{id=1, name='ALICE'}
❌ 模拟处理异常:用户 bob
📥 写入数据:
User{id=3, name='CHARLIE'}
📣 所有用户处理完成,发送通知!

🎓 小结

组件功能
@Scheduled每隔一段时间自动触发 Job
JobLauncher手动执行指定 Job
JobParametersBuilder创建 Job 参数,确保唯一性
@EnableScheduling启用定时任务功能

6. Spring Cloud Stream和Spring Batch区别

🧠 一句话总结

框架关注点用于什么
Spring Cloud Stream实时消息流处理处理从消息队列(如 Kafka、RabbitMQ)中来的事件流
Spring Batch批量任务处理处理大量结构化数据的离线批处理任务,如夜间账单

🔍 核心区别详解

特性 / 区别点Spring Cloud StreamSpring Batch
💡 处理模式异步、实时流式处理(事件驱动)同步、批量处理(定时或手动触发)
🕘 适用场景IoT 数据流、订单事件、消息队列消费者、微服务事件链路日终结算、数据库导入导出、文件解析、大规模数据迁移
🔌 输入来源Kafka、RabbitMQ、Pulsar 等消息中间件数据库、CSV、XML、REST 接口等
🔄 输出目标下游队列或服务数据库、文件、API
🧱 组成模型Supplier、Function、ConsumerStep、Job、ItemReader、ItemProcessor、ItemWriter
💥 故障处理支持 Retry、DLQ、分区等支持跳过、重试、事务、Job Restart
🛠️ 持久化状态一般无状态,靠中间件保证可靠传递有状态,支持 Job Execution 状态保存(如重启恢复)
🧪 测试/调试流处理链可拆解为小函数,易于集成测试Job 参数可控制执行,适合验证数据处理逻辑
🧰 配置方式application.yml(通道绑定)XML/Java DSL 配置 Job、Step、Reader 等
http://www.xdnf.cn/news/6689.html

相关文章:

  • MySQL面试知识点详解
  • 计算机图形学基础--Games101笔记(一)数学基础与光栅化
  • 生产级编排AI工作流套件:Flyte全面使用指南 — Core concepts Launch plans
  • 非受控组件在 React 中如何进行状态更新?
  • 好用的拓客APP有哪些?
  • C#学习第23天:面向对象设计模式
  • 基于WISE30sec制作中国1km分辨率土壤属性栅格数据(20种属性/0-200cm深度分层)
  • Flask框架搭建
  • 信号灯和旋钮在接地电阻柜内的作用主要包括以下几个方面
  • 吴恩达 Deep Learning(1-36)ppt逐行理解
  • React中使用openLayer画地图
  • 【大模型面试每日一题】Day 20:大模型出现“幻觉”(Hallucination)的可能原因有哪些?如何从数据或训练层面缓解?
  • 支持蓝牙5.0和2.4G私有协议芯片-PHY6222
  • ISBI 2012 EM 神经元结构分割数据集复现UNet
  • 前端实现流式输出《后端返回Markdown格式文本,前端输出类似于打字的那种》
  • DTC测试点归纳
  • 2025Linux安装配置文档(五)
  • 【Linux】iptables 命令详解
  • Tcping详细使用教程
  • [SpringBoot]Spring MVC(2.0)
  • 项目思维vs产品思维
  • 系统线程nt!CcPfBootWorker里面的nt!MmPrefetchPages函数分析
  • 光学设计核心
  • milvus学习笔记
  • 关于计算机系统和数据原子性的联系
  • 计算机网络-----6分层结构
  • Java百度身份证识别接口实现【配置即用】
  • 国芯思辰| 轮速传感器AH741对标TLE7471应用于汽车车轮速度感应
  • Sigmoid与Softmax:从二分类到多分类的深度解析
  • Flask 是否使用类似 Spring Boot 的核心注解机制