Overview of the Spring Batch Framework

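Spring Batch is a lightweight, comprehensive batch-processing framework from the Spring ecosystem for building robust batch applications. It is designed for processing large volumes of data and handles reading, processing, and writing that data efficiently. Its main features are: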

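  • Reusability: it ships reusable components such as readers, processors, and writers, which developers combine as needed.
  • Transaction management: steps run inside transactions, which helps keep data consistent and intact during batch processing.
  • Error handling and retry: errors raised during batch processing can be caught and handled, and failed operations can be retried.
  • Monitoring and management: batch jobs can be monitored and managed, including starting, stopping, and restarting (resuming) them.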
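
To make the retry idea concrete, here is a minimal plain-Java sketch of retrying a failing operation a bounded number of times. It is not Spring Batch's implementation: RetrySketch and withRetry are hypothetical names introduced for illustration. In Spring Batch itself, retry is usually configured declaratively on a fault-tolerant step (faultTolerant(), retry(...), and retryLimit(...) on the step builder) rather than written by hand.

```java
import java.util.function.Supplier;

public class RetrySketch {
    /**
     * Runs the task up to maxAttempts times in total and returns the first
     * successful result. If every attempt fails, the last exception is rethrown.
     */
    public static <T> T withRetry(Supplier<T> task, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return task.get();
            } catch (RuntimeException e) {
                last = e; // remember the failure and try again
            }
        }
        throw last;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Hypothetical flaky operation: fails twice, then succeeds.
        String result = withRetry(() -> {
            calls[0]++;
            if (calls[0] < 3) {
                throw new IllegalStateException("transient failure");
            }
            return "ok";
        }, 5);
        System.out.println(result + " after " + calls[0] + " attempts"); // prints "ok after 3 attempts"
    }
}
```

A real retry policy would normally retry only exception types known to be transient and let everything else fail immediately.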

Steps for Implementing Batch Data Processing

Step 1: Add the dependencies

If you are using a Maven project, add the Spring Batch dependencies to pom.xml:

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-batch</artifactId>
    </dependency>
    <dependency>
        <groupId>com.h2database</groupId>
        <artifactId>h2</artifactId>
        <scope>runtime</scope>
    </dependency>
</dependencies>

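This adds the Spring Boot Starter Batch dependency, together with H2 as a sample database. Spring Batch stores its job metadata in this database.

Step 2: Define the data model

Create a simple data model class that represents the records to be processed: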

public class Person {
    private String firstName;
    private String lastName;

    public Person() {
    }

    public Person(String firstName, String lastName) {
        this.firstName = firstName;
        this.lastName = lastName;
    }

    public String getFirstName() {
        return firstName;
    }

    public void setFirstName(String firstName) {
        this.firstName = firstName;
    }

    public String getLastName() {
        return lastName;
    }

    public void setLastName(String lastName) {
        this.lastName = lastName;
    }

    @Override
    public String toString() {
        return "Person{" +
                "firstName='" + firstName + '\'' +
                ", lastName='" + lastName + '\'' +
                '}';
    }
}

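Step 3: Create the reader, processor, and writer

  • Reader (ItemReader): reads data from a data source. The following example reads records from a CSV file and skips its header row:
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.core.io.ClassPathResource;

public class PersonItemReader extends FlatFileItemReader<Person> {
    public PersonItemReader() {
        // Split each comma-delimited line into named fields.
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        tokenizer.setNames(new String[]{"firstName", "lastName"});

        // Copy the named fields onto the matching Person properties.
        BeanWrapperFieldSetMapper<Person> fieldSetMapper = new BeanWrapperFieldSetMapper<>();
        fieldSetMapper.setTargetType(Person.class);

        DefaultLineMapper<Person> lineMapper = new DefaultLineMapper<>();
        lineMapper.setLineTokenizer(tokenizer);
        lineMapper.setFieldSetMapper(fieldSetMapper);

        setResource(new ClassPathResource("data.csv"));
        setLinesToSkip(1); // skip the "firstName,lastName" header row in data.csv
        setLineMapper(lineMapper);
    }
}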
  • Processor (ItemProcessor): transforms each item that was read. The following simple processor converts the names to upper case:
import org.springframework.batch.item.ItemProcessor;

public class PersonItemProcessor implements ItemProcessor<Person, Person> {
    @Override
    public Person process(Person person) throws Exception {
        String firstName = person.getFirstName().toUpperCase();
        String lastName = person.getLastName().toUpperCase();
        return new Person(firstName, lastName);
    }
}
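  • Writer (ItemWriter): writes the processed items to the target data store. The following example writes them to the console. The write(List) signature shown here is the Spring Batch 4.x one; Spring Batch 5 changed the parameter to a Chunk.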
import org.springframework.batch.item.ItemWriter;

import java.util.List;

public class PersonItemWriter implements ItemWriter<Person> {
    @Override
    public void write(List<? extends Person> items) throws Exception {
        for (Person person : items) {
            System.out.println(person);
        }
    }
}
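
As a rough illustration of what the reader and processor above do with each record, the following plain-Java sketch tokenizes comma-delimited lines into named fields, skips a header row, and upper-cases the values. It is a simplification under stated assumptions, not the Spring Batch classes: CsvMappingSketch and its methods are hypothetical names, and quoted fields are not handled.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CsvMappingSketch {
    /** Splits one comma-delimited line and pairs each value with its column name. */
    public static Map<String, String> tokenize(String line, String[] names) {
        String[] values = line.split(",", -1); // -1 keeps trailing empty fields
        if (values.length != names.length) {
            throw new IllegalArgumentException(
                    "Expected " + names.length + " fields but got " + values.length + ": " + line);
        }
        Map<String, String> fields = new LinkedHashMap<>();
        for (int i = 0; i < names.length; i++) {
            fields.put(names[i], values[i].trim());
        }
        return fields;
    }

    /** Maps every line after the first linesToSkip lines (for example a header row). */
    public static List<Map<String, String>> readAll(List<String> lines, String[] names, int linesToSkip) {
        List<Map<String, String>> records = new ArrayList<>();
        for (int i = linesToSkip; i < lines.size(); i++) {
            records.add(tokenize(lines.get(i), names));
        }
        return records;
    }

    /** Mirrors the processing step: returns a copy with every value upper-cased. */
    public static Map<String, String> toUpperCase(Map<String, String> record) {
        Map<String, String> result = new LinkedHashMap<>();
        record.forEach((name, value) -> result.put(name, value.toUpperCase()));
        return result;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("firstName,lastName", "John,Doe", "Jane,Smith");
        String[] names = {"firstName", "lastName"};
        for (Map<String, String> record : readAll(lines, names, 1)) {
            System.out.println(toUpperCase(record));
        }
        // prints {firstName=JOHN, lastName=DOE}
        // then   {firstName=JANE, lastName=SMITH}
    }
}
```

Passing 0 instead of 1 as linesToSkip would turn the header row into a record of its own, which is why a reader over a CSV file with a header needs to skip it.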

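Step 4: Configure the batch job

Create a configuration class that defines the batch job. The JobBuilderFactory and StepBuilderFactory used here belong to the Spring Batch 4.x API and were deprecated in Spring Batch 5.0: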

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableBatchProcessing
public class BatchConfig {
    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;

    public BatchConfig(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory) {
        this.jobBuilderFactory = jobBuilderFactory;
        this.stepBuilderFactory = stepBuilderFactory;
    }

    @Bean
    public PersonItemReader personItemReader() {
        return new PersonItemReader();
    }

    @Bean
    public PersonItemProcessor personItemProcessor() {
        return new PersonItemProcessor();
    }

    @Bean
    public PersonItemWriter personItemWriter() {
        return new PersonItemWriter();
    }

    @Bean
    public Step step1() {
        return stepBuilderFactory.get("step1")
               .<Person, Person>chunk(10)
               .reader(personItemReader())
               .processor(personItemProcessor())
               .writer(personItemWriter())
               .build();
    }

    @Bean
    public Job importUserJob() {
        return jobBuilderFactory.get("importUserJob")
               .flow(step1())
               .end()
               .build();
    }
}

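This configuration class registers the reader, processor, and writer as beans and builds one step and one job from them. The step processes items in chunks of 10.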
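
The chunk(10) setting means that the step reads up to 10 items, processes them, and then hands them to the writer as one group. The sketch below imitates that loop in plain Java. It is only an approximation: ChunkLoopSketch and run are hypothetical names, and the transaction that Spring Batch wraps around each chunk is left out.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

public class ChunkLoopSketch {
    /**
     * Reads up to chunkSize items, processes each one, and passes the processed
     * items to the writer as one list. Repeats until the reader is exhausted.
     * Returns the number of chunks written.
     */
    public static <I, O> int run(Iterator<I> reader, Function<I, O> processor,
                                 Consumer<List<O>> writer, int chunkSize) {
        if (chunkSize < 1) {
            throw new IllegalArgumentException("chunkSize must be at least 1");
        }
        int chunks = 0;
        while (reader.hasNext()) {
            // Read phase: collect at most chunkSize items.
            List<I> inputs = new ArrayList<>();
            while (inputs.size() < chunkSize && reader.hasNext()) {
                inputs.add(reader.next());
            }
            // Process phase: transform the items one by one.
            List<O> outputs = new ArrayList<>();
            for (I item : inputs) {
                outputs.add(processor.apply(item));
            }
            // Write phase: the writer receives the whole chunk at once.
            // (Spring Batch would commit a transaction at this point; omitted here.)
            writer.accept(outputs);
            chunks++;
        }
        return chunks;
    }

    public static void main(String[] args) {
        List<Integer> items = new ArrayList<>();
        for (int i = 1; i <= 25; i++) {
            items.add(i);
        }
        int chunks = ChunkLoopSketch.<Integer, String>run(
                items.iterator(),
                i -> "item-" + i,
                chunk -> System.out.println("wrote " + chunk.size() + " items"),
                10);
        System.out.println("chunks: " + chunks);
        // prints "wrote 10 items" twice, then "wrote 5 items", then "chunks: 3"
    }
}
```

A larger chunk size means fewer writes and commits, but more work is redone when a chunk fails.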

Step 5: Create the CSV file

In the src/main/resources directory, create a data.csv file and add some sample data:

firstName,lastName
John,Doe
Jane,Smith

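Step 6: Run the batch job

Create a launcher class that runs the batch job. The time parameter gives each launch a distinct set of JobParameters, so the job can be launched repeatedly. Note that Spring Boot by default also runs the Job beans it finds at startup; setting spring.batch.job.enabled=false in application.properties leaves the launch to this class alone.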

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class BatchApplication implements CommandLineRunner {
    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private Job importUserJob;

    public static void main(String[] args) {
        SpringApplication.run(BatchApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        JobParameters jobParameters = new JobParametersBuilder()
               .addLong("time", System.currentTimeMillis())
               .toJobParameters();
        jobLauncher.run(importUserJob, jobParameters);
    }
}
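
The launcher adds a time parameter because Spring Batch identifies a job instance by the job name together with its identifying parameters, and it refuses to rerun an instance that has already completed (it raises JobInstanceAlreadyCompleteException). The following plain-Java sketch imitates only that bookkeeping. JobInstanceSketch and launch are hypothetical names, and returning false stands in for the exception.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

public class JobInstanceSketch {
    // Keys of the job instances that have already completed.
    private final Set<String> completed = new HashSet<>();

    /** Returns true if the launch ran, false if that instance had already completed. */
    public boolean launch(String jobName, Map<String, Object> parameters) {
        // Sort the parameters so the key does not depend on insertion order.
        String instanceKey = jobName + new TreeMap<>(parameters);
        return completed.add(instanceKey);
    }

    public static void main(String[] args) {
        JobInstanceSketch repository = new JobInstanceSketch();
        // Same name and same parameters: the second launch is rejected.
        System.out.println(repository.launch("importUserJob", Collections.singletonMap("file", "data.csv"))); // true
        System.out.println(repository.launch("importUserJob", Collections.singletonMap("file", "data.csv"))); // false
        // A changing parameter such as a timestamp creates a new instance each time.
        System.out.println(repository.launch("importUserJob", Collections.singletonMap("time", 1L))); // true
        System.out.println(repository.launch("importUserJob", Collections.singletonMap("time", 2L))); // true
    }
}
```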

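With the steps above, you can use Spring Batch to process data in bulk. In a real application, adapt the reader, processor, and writer to your own data sources and processing logic.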