Spring 的响应式编程支持是基于 Reactive Streams 规范构建的,旨在提供非阻塞、事件驱动的编程模型,特别适合高并发、低延迟的场景。以下是核心概念和 Spring WebFlux 的使用方法:

1. 核心概念

响应式编程基础

  • 非阻塞流处理:通过异步数据流处理请求,避免线程阻塞,提升吞吐量。
  • 背压(Backpressure):消费者控制生产者发送数据的速率,防止过载。
  • Reactive Streams 规范:定义了 Publisher、Subscriber、Subscription 和 Processor 四个核心接口。

Spring 响应式组件

  • Project Reactor:Spring 官方推荐的响应式库,提供 Mono(0-1 个元素)和 Flux(0-N 个元素)两种异步数据流。
  • Spring WebFlux:基于 Reactor 的响应式 Web 框架,支持 WebSocket、服务器推送事件(SSE)等。
  • WebClient:非阻塞的 HTTP 客户端,替代传统的 RestTemplate

2. 使用 Spring WebFlux

步骤1:添加依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

步骤2:定义响应式数据模型

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class User {
    private String id;
    private String name;
    private Integer age;
}

步骤3:创建响应式 Repository

使用 Reactive MongoDB 示例:

import org.springframework.data.mongodb.repository.ReactiveMongoRepository;

public interface UserRepository extends ReactiveMongoRepository<User, String> {
    Flux<User> findByName(String name);
}

步骤4:编写服务层

import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Service
public class UserService {
    private final UserRepository repository;

    public UserService(UserRepository repository) {
        this.repository = repository;
    }

    public Mono<User> save(User user) {
        return repository.save(user);
    }

    public Flux<User> findAll() {
        return repository.findAll();
    }

    public Mono<User> findById(String id) {
        return repository.findById(id);
    }

    public Mono<Void> delete(String id) {
        return repository.deleteById(id);
    }
}

步骤5:定义响应式控制器(两种方式)

方式1:注解式控制器(类似 Spring MVC)
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@RestController
@RequestMapping("/users")
public class UserController {
    private final UserService service;

    public UserController(UserService service) {
        this.service = service;
    }

    @PostMapping
    public Mono<User> create(@RequestBody User user) {
        return service.save(user);
    }

    @GetMapping
    public Flux<User> findAll() {
        return service.findAll();
    }

    @GetMapping("/{id}")
    public Mono<User> findById(@PathVariable String id) {
        return service.findById(id);
    }

    @DeleteMapping("/{id}")
    public Mono<Void> delete(@PathVariable String id) {
        return service.delete(id);
    }
}
方式2:函数式端点(Router + Handler)

定义 Handler:

import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Mono;

@Component
public class UserHandler {
    private final UserService service;

    public UserHandler(UserService service) {
        this.service = service;
    }

    public Mono<ServerResponse> findAll(ServerRequest request) {
        return ServerResponse.ok()
                .contentType(MediaType.APPLICATION_JSON)
                .body(service.findAll(), User.class);
    }

    public Mono<ServerResponse> findById(ServerRequest request) {
        String id = request.pathVariable("id");
        return service.findById(id)
                .flatMap(user -> ServerResponse.ok()
                        .contentType(MediaType.APPLICATION_JSON)
                        .bodyValue(user))
                .switchIfEmpty(ServerResponse.notFound().build());
    }
}

定义 Router:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.ServerResponse;

import static org.springframework.web.reactive.function.server.RequestPredicates.*;
import static org.springframework.web.reactive.function.server.RouterFunctions.route;

@Configuration
public class UserRouter {
    @Bean
    public RouterFunction<ServerResponse> routeUsers(UserHandler handler) {
        return route(GET("/users"), handler::findAll)
                .andRoute(GET("/users/{id}"), handler::findById);
    }
}

步骤6:使用 WebClient 调用响应式 API

import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;

public class WebClientExample {
    private final WebClient client = WebClient.create("http://localhost:8080");

    public Flux<User> getUsers() {
        return client.get()
                .uri("/users")
                .retrieve()
                .bodyToFlux(User.class);
    }
}

3. 响应式特性与技术

异步非阻塞处理

@GetMapping("/delay/{seconds}")
public Mono<String> delayedResponse(@PathVariable int seconds) {
    return Mono.delay(Duration.ofSeconds(seconds))
            .map(t -> "Delayed for " + seconds + " seconds");
}

服务器推送事件(SSE)

@GetMapping(path = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<User> streamUsers() {
    return service.findAll(); // 持续推送数据
}

WebSocket 支持

配置 WebSocket 处理:

@Configuration
public class WebSocketConfig implements WebSocketConfigurer {
    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        registry.addHandler(myHandler(), "/ws")
                .setAllowedOrigins("*");
    }

    @Bean
    public WebSocketHandler myHandler() {
        return new MyWebSocketHandler();
    }
}

4. 测试响应式代码

使用 WebTestClient 测试:

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.web.reactive.WebFluxTest;
import org.springframework.test.web.reactive.server.WebTestClient;

@WebFluxTest(UserController.class)
public class UserControllerTest {
    @Autowired
    private WebTestClient client;

    @Test
    public void testFindAllUsers() {
        client.get().uri("/users")
                .exchange()
                .expectStatus().isOk()
                .expectBodyList(User.class);
    }
}

5. 注意事项

  1. 避免阻塞操作:响应式系统中应避免使用 Thread.sleep() 或同步 I/O,否则会失去非阻塞优势。
  2. 背压处理:使用 Flux.buffer()Flux.limitRate() 等方法控制数据流速率。
  3. 错误处理:使用 onErrorReturn()retry() 等操作符处理异常。
  4. 与传统组件混合:响应式与非响应式代码混用可能导致线程阻塞,需谨慎设计。

6. 适用场景

  • 高并发系统:如微服务网关、实时数据处理平台。
  • I/O 密集型应用:如文件/网络操作频繁的场景。
  • 实时交互应用:如在线游戏、协作工具。

总结

Spring WebFlux 通过 Project Reactor 提供了完整的响应式编程模型,支持注解式和函数式两种开发方式,与传统 Spring MVC 相比更适合高并发、低延迟的场景。使用时需注意避免阻塞操作,并合理利用背压和错误处理机制。