Overview of the Spring Batch Framework
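Spring Batch is a lightweight, comprehensive batch processing framework from the Spring portfolio for building robust batch applications. It is designed for large volumes of data and handles reading, processing, and writing that data efficiently. Its main features are:
- Reusability: it provides a set of reusable components, such as readers, processors, and writers, which developers can combine as needed.
- Transaction management: work is committed in transactions (in a chunk-oriented step, one transaction per chunk), which keeps data consistent and intact during a batch run.
- Error handling and retry: errors raised during a batch run can be caught and handled, and failed operations can be retried or the offending items skipped.
- Monitoring and management: job executions are recorded in a job repository, so jobs can be monitored and managed, including starting, stopping, and restarting them.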
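To make the retry idea concrete, here is a minimal plain-Java sketch of what a retry policy does: call an operation up to a fixed number of times and rethrow the last failure if every attempt fails. It is not Spring Batch's API; in Spring Batch itself retry (and skip) is configured declaratively on a step, for example with faultTolerant().retry(...).retryLimit(...). The class name RetrySketch and the limit of 3 attempts are illustrative assumptions.

```java
import java.util.function.Supplier;

// Plain-Java illustration of a retry policy; NOT Spring Batch's retry API.
public class RetrySketch {

    // Runs the action up to maxAttempts times and rethrows the last failure
    static <T> T retry(Supplier<T> action, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.get();
            } catch (RuntimeException e) {
                last = e; // remember the failure, then try again
            }
        }
        throw last; // every attempt failed
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated transient error: fails twice, succeeds on the third call
        String result = retry(() -> {
            calls[0]++;
            if (calls[0] < 3) {
                throw new IllegalStateException("transient failure");
            }
            return "ok";
        }, 3);
        System.out.println(result + " after " + calls[0] + " attempts"); // ok after 3 attempts
    }
}
```

The sketch retries on any RuntimeException; a real policy usually retries only exceptions known to be transient (a dropped connection, a lock timeout) and fails fast on everything else.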
Steps for Implementing Batch Data Processing
Step 1: Add the dependencies
If you are using Maven, add the Spring Batch dependency to pom.xml:
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-batch</artifactId>
    </dependency>
    <dependency>
        <groupId>com.h2database</groupId>
        <artifactId>h2</artifactId>
        <scope>runtime</scope>
    </dependency>
</dependencies>
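This adds the Spring Boot Starter Batch dependency, plus the H2 in-memory database, which Spring Batch uses in this example to store its job metadata (the job repository tables). No versions are given, so they are assumed to be managed by the Spring Boot parent POM. The code in the following steps uses the Spring Batch 4.x API (JobBuilderFactory, StepBuilderFactory, and ItemWriter.write(List)), so it assumes Spring Boot 2.x; in Spring Batch 5 (Spring Boot 3) the builder factories are deprecated and ItemWriter.write takes a Chunk instead of a List.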
Step 2: Define the data model
Create a simple model class to represent the data being processed:
public class Person {
    private String firstName;
    private String lastName;

    public Person() {
    }

    public Person(String firstName, String lastName) {
        this.firstName = firstName;
        this.lastName = lastName;
    }

    public String getFirstName() {
        return firstName;
    }

    public void setFirstName(String firstName) {
        this.firstName = firstName;
    }

    public String getLastName() {
        return lastName;
    }

    public void setLastName(String lastName) {
        this.lastName = lastName;
    }

    @Override
    public String toString() {
        return "Person{" +
                "firstName='" + firstName + '\'' +
                ", lastName='" + lastName + '\'' +
                '}';
    }
}
Step 3: Create the reader, processor, and writer
- Reader (ItemReader): reads data from a data source. The following example reads data from a CSV file:
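import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.core.io.ClassPathResource;

public class PersonItemReader extends FlatFileItemReader<Person> {
    public PersonItemReader() {
        // Read data.csv from the classpath (src/main/resources)
        setResource(new ClassPathResource("data.csv"));
        // Skip the header row "firstName,lastName" so it is not read as a Person
        setLinesToSkip(1);

        // Split each line on commas and name the resulting columns
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        tokenizer.setNames(new String[]{"firstName", "lastName"});

        // Copy the named columns into Person via its setters
        BeanWrapperFieldSetMapper<Person> fieldSetMapper = new BeanWrapperFieldSetMapper<>();
        fieldSetMapper.setTargetType(Person.class);

        DefaultLineMapper<Person> lineMapper = new DefaultLineMapper<>();
        lineMapper.setLineTokenizer(tokenizer);
        lineMapper.setFieldSetMapper(fieldSetMapper);
        setLineMapper(lineMapper);
    }
}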
- Processor (ItemProcessor): transforms the items that were read. The following simple processor converts the names to uppercase:
import org.springframework.batch.item.ItemProcessor;

public class PersonItemProcessor implements ItemProcessor<Person, Person> {
    @Override
    public Person process(Person person) throws Exception {
        // Return a new, transformed item; returning null would filter the item out
        String firstName = person.getFirstName().toUpperCase();
        String lastName = person.getLastName().toUpperCase();
        return new Person(firstName, lastName);
    }
}
- Writer (ItemWriter): writes the processed items to the target. The following example writes them to the console:
import org.springframework.batch.item.ItemWriter;
import java.util.List;

public class PersonItemWriter implements ItemWriter<Person> {
    // Spring Batch 4.x signature: called once per chunk with all of that chunk's items
    @Override
    public void write(List<? extends Person> items) throws Exception {
        for (Person person : items) {
            System.out.println(person);
        }
    }
}
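The processor contract has one detail worth knowing: if process returns null, Spring Batch treats the item as filtered and does not pass it to the writer. The following plain-Java sketch mimics that rule, with a java.util.function.Function standing in for ItemProcessor. The names ProcessorSketch, processAll, and upperCaseOrFilter are hypothetical, and the "drop blank names" rule is an illustrative assumption, not part of the example above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.function.Function;

// Plain-Java illustration of the ItemProcessor contract; not Spring Batch code.
public class ProcessorSketch {

    // Applies the processor to each item; a null result means "filter this item out"
    static <I, O> List<O> processAll(List<I> items, Function<I, O> processor) {
        List<O> output = new ArrayList<>();
        for (I item : items) {
            O result = processor.apply(item);
            if (result != null) {
                output.add(result);
            }
        }
        return output;
    }

    // Hypothetical processor: uppercase a name, or return null for blank input
    static String upperCaseOrFilter(String name) {
        if (name == null || name.isBlank()) {
            return null;
        }
        return name.toUpperCase(Locale.ROOT);
    }

    public static void main(String[] args) {
        List<String> names = List.of("John", " ", "Jane");
        System.out.println(processAll(names, ProcessorSketch::upperCaseOrFilter)); // [JOHN, JANE]
    }
}
```

Locale.ROOT keeps the uppercasing independent of the JVM's default locale (the Turkish dotted and dotless i are the classic surprise); the same consideration applies to the toUpperCase() calls in PersonItemProcessor.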
Step 4: Configure the batch job
Create a configuration class that defines the batch job:
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableBatchProcessing // provides JobRepository, JobLauncher and the builder factories (Spring Batch 4.x)
public class BatchConfig {
    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;

    public BatchConfig(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory) {
        this.jobBuilderFactory = jobBuilderFactory;
        this.stepBuilderFactory = stepBuilderFactory;
    }

    @Bean
    public PersonItemReader personItemReader() {
        return new PersonItemReader();
    }

    @Bean
    public PersonItemProcessor personItemProcessor() {
        return new PersonItemProcessor();
    }

    @Bean
    public PersonItemWriter personItemWriter() {
        return new PersonItemWriter();
    }

    @Bean
    public Step step1() {
        return stepBuilderFactory.get("step1")
                // Chunk-oriented step: read and process 10 items, then write them in one transaction
                .<Person, Person>chunk(10)
                .reader(personItemReader())
                .processor(personItemProcessor())
                .writer(personItemWriter())
                .build();
    }

    @Bean
    public Job importUserJob() {
        return jobBuilderFactory.get("importUserJob")
                .flow(step1()) // the job consists of this single step
                .end()
                .build();
    }
}
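This configuration class registers the reader, processor, and writer as beans and assembles them into one chunk-oriented step (step1, with a chunk size of 10) and one job (importUserJob) that runs that step.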
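chunk(10) makes step1 a chunk-oriented step. The plain-Java sketch below, modeled on the chunk-processing pseudocode in the Spring Batch reference documentation, shows the loop this sets up: read up to 10 items, process each one (dropping null results), then pass the whole chunk to the writer in one call. An Iterator stands in for ItemReader and functional interfaces stand in for ItemProcessor and ItemWriter; transactions, retries, and restart state are deliberately left out, and the class name ChunkLoopSketch is hypothetical.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Plain-Java illustration of chunk-oriented processing; not Spring Batch code.
public class ChunkLoopSketch {

    // Returns the number of writer calls made
    static <I, O> int run(Iterator<I> reader, Function<I, O> processor,
                          Consumer<List<O>> writer, int chunkSize) {
        if (chunkSize < 1) {
            throw new IllegalArgumentException("chunkSize must be at least 1");
        }
        int writes = 0;
        while (reader.hasNext()) {
            // 1. Read up to chunkSize items
            List<I> items = new ArrayList<>();
            while (items.size() < chunkSize && reader.hasNext()) {
                items.add(reader.next());
            }
            // 2. Process each item; a null result filters it out
            List<O> processed = new ArrayList<>();
            for (I item : items) {
                O out = processor.apply(item);
                if (out != null) {
                    processed.add(out);
                }
            }
            // 3. Hand the whole chunk to the writer (Spring Batch would commit after this).
            //    This sketch skips the call when every item in the chunk was filtered.
            if (!processed.isEmpty()) {
                writer.accept(processed);
                writes++;
            }
        }
        return writes;
    }

    public static void main(String[] args) {
        List<Integer> input = new ArrayList<>();
        for (int i = 1; i <= 25; i++) {
            input.add(i);
        }
        List<Integer> chunkSizes = new ArrayList<>();
        run(input.iterator(), n -> n * 2, chunk -> chunkSizes.add(chunk.size()), 10);
        System.out.println(chunkSizes); // [10, 10, 5]
    }
}
```

With 25 input items and a chunk size of 10, the writer is called three times, with 10, 10, and 5 items. A larger chunk size means fewer commits but more work to redo if a chunk fails.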
Step 5: Create the CSV file
Create a data.csv file in the src/main/resources directory and add some sample data:
firstName,lastName
John,Doe
Jane,Smith
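The first line of data.csv is a header. Unless the reader skips it (FlatFileItemReader provides setLinesToSkip for this), it is mapped like any other line and produces a Person whose first name is the literal string "firstName". The plain-Java sketch below imitates the reader's per-line work (split on commas, then copy the named columns into an object) to show the effect; the PersonRow type and the parse method are hypothetical stand-ins, not Spring Batch classes.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration of line tokenizing and field mapping; not Spring Batch code.
public class CsvMappingSketch {

    // Hypothetical stand-in for Person so this file compiles on its own
    static final class PersonRow {
        final String firstName;
        final String lastName;

        PersonRow(String firstName, String lastName) {
            this.firstName = firstName;
            this.lastName = lastName;
        }

        @Override
        public String toString() {
            return firstName + " " + lastName;
        }
    }

    // Skips the first linesToSkip lines, splits the rest on commas,
    // and maps column 0 to firstName and column 1 to lastName
    static List<PersonRow> parse(List<String> lines, int linesToSkip) {
        List<PersonRow> rows = new ArrayList<>();
        for (int i = linesToSkip; i < lines.size(); i++) {
            String[] tokens = lines.get(i).split(",", -1);
            if (tokens.length != 2) {
                throw new IllegalArgumentException("line " + (i + 1) + ": expected 2 columns");
            }
            rows.add(new PersonRow(tokens[0], tokens[1]));
        }
        return rows;
    }

    public static void main(String[] args) {
        List<String> csv = List.of("firstName,lastName", "John,Doe", "Jane,Smith");
        System.out.println(parse(csv, 1));        // [John Doe, Jane Smith]
        System.out.println(parse(csv, 0).get(0)); // firstName lastName (header read as data)
    }
}
```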
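Step 6: Run the batch job
Create a Spring Boot application class that launches the job through JobLauncher. Spring Boot 2.x also runs every Job bean automatically at startup, so set spring.batch.job.enabled=false in application.properties; otherwise the job runs twice (once by Spring Boot and once by this class):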
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class BatchApplication implements CommandLineRunner {
    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private Job importUserJob;

    public static void main(String[] args) {
        SpringApplication.run(BatchApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        // A unique "time" parameter creates a new job instance on every launch,
        // so an earlier completed run does not block this one
        JobParameters jobParameters = new JobParametersBuilder()
                .addLong("time", System.currentTimeMillis())
                .toJobParameters();
        jobLauncher.run(importUserJob, jobParameters);
    }
}
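The time parameter is there because Spring Batch identifies a job instance by the job name plus its identifying job parameters, and an instance that has already completed cannot be launched again (JobLauncher throws JobInstanceAlreadyCompleteException). The sketch below models only that rule in plain Java. JobInstanceSketch is a hypothetical class, and it treats every launch as successful, whereas Spring Batch does allow a failed instance to be restarted with the same parameters.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Plain-Java illustration of job-instance identity; not Spring Batch code.
public class JobInstanceSketch {

    // Identity keys of instances that have already completed
    private final Set<String> completedInstances = new HashSet<>();

    // Job name plus parameters sorted by key, e.g. "importUserJob{time=1}"
    static String instanceKey(String jobName, Map<String, ?> params) {
        return jobName + new TreeMap<String, Object>(params);
    }

    void launch(String jobName, Map<String, ?> params) {
        String key = instanceKey(jobName, params);
        // Set.add returns false when the key is already present
        if (!completedInstances.add(key)) {
            throw new IllegalStateException("job instance already complete: " + key);
        }
        // The job itself would execute here; this sketch assumes it always succeeds
    }

    public static void main(String[] args) {
        JobInstanceSketch launcher = new JobInstanceSketch();
        launcher.launch("importUserJob", Map.of("time", 1L));
        launcher.launch("importUserJob", Map.of("time", 2L)); // new parameters: new instance
        try {
            launcher.launch("importUserJob", Map.of("time", 1L)); // same parameters again
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // job instance already complete: importUserJob{time=1}
        }
    }
}
```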
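With the steps above, you can implement batch data processing with Spring Batch. In real applications, replace the reader, processor, and writer with implementations that fit your data sources and processing logic.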