Spring 的响应式编程支持是基于 Reactive Streams 规范构建的,旨在提供非阻塞、事件驱动的编程模型,特别适合高并发、低延迟的场景。以下是核心概念和 Spring WebFlux 的使用方法:
1. 核心概念
响应式编程基础
- 非阻塞流处理:通过异步数据流处理请求,避免线程阻塞,提升吞吐量。
- 背压(Backpressure):消费者控制生产者发送数据的速率,防止过载。
- Reactive Streams 规范:定义了 Publisher、Subscriber、Subscription 和 Processor 四个核心接口。
Spring 响应式组件
- Project Reactor:Spring 官方推荐的响应式库,提供
Mono
(0-1 个元素)和Flux
(0-N 个元素)两种异步数据流。 - Spring WebFlux:基于 Reactor 的响应式 Web 框架,支持 WebSocket、服务器推送事件(SSE)等。
- WebClient:非阻塞的 HTTP 客户端,替代传统的
RestTemplate
。
2. 使用 Spring WebFlux
步骤1:添加依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
步骤2:定义响应式数据模型
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class User {
private String id;
private String name;
private Integer age;
}
步骤3:创建响应式 Repository
使用 Reactive MongoDB 示例:
import org.springframework.data.mongodb.repository.ReactiveMongoRepository;
public interface UserRepository extends ReactiveMongoRepository<User, String> {
Flux<User> findByName(String name);
}
步骤4:编写服务层
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@Service
public class UserService {
private final UserRepository repository;
public UserService(UserRepository repository) {
this.repository = repository;
}
public Mono<User> save(User user) {
return repository.save(user);
}
public Flux<User> findAll() {
return repository.findAll();
}
public Mono<User> findById(String id) {
return repository.findById(id);
}
public Mono<Void> delete(String id) {
return repository.deleteById(id);
}
}
步骤5:定义响应式控制器(两种方式)
方式1:注解式控制器(类似 Spring MVC)
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@RestController
@RequestMapping("/users")
public class UserController {
private final UserService service;
public UserController(UserService service) {
this.service = service;
}
@PostMapping
public Mono<User> create(@RequestBody User user) {
return service.save(user);
}
@GetMapping
public Flux<User> findAll() {
return service.findAll();
}
@GetMapping("/{id}")
public Mono<User> findById(@PathVariable String id) {
return service.findById(id);
}
@DeleteMapping("/{id}")
public Mono<Void> delete(@PathVariable String id) {
return service.delete(id);
}
}
方式2:函数式端点(Router + Handler)
定义 Handler:
import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Mono;
@Component
public class UserHandler {
private final UserService service;
public UserHandler(UserService service) {
this.service = service;
}
public Mono<ServerResponse> findAll(ServerRequest request) {
return ServerResponse.ok()
.contentType(MediaType.APPLICATION_JSON)
.body(service.findAll(), User.class);
}
public Mono<ServerResponse> findById(ServerRequest request) {
String id = request.pathVariable("id");
return service.findById(id)
.flatMap(user -> ServerResponse.ok()
.contentType(MediaType.APPLICATION_JSON)
.bodyValue(user))
.switchIfEmpty(ServerResponse.notFound().build());
}
}
定义 Router:
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.ServerResponse;
import static org.springframework.web.reactive.function.server.RequestPredicates.*;
import static org.springframework.web.reactive.function.server.RouterFunctions.route;
@Configuration
public class UserRouter {
@Bean
public RouterFunction<ServerResponse> routeUsers(UserHandler handler) {
return route(GET("/users"), handler::findAll)
.andRoute(GET("/users/{id}"), handler::findById);
}
}
步骤6:使用 WebClient 调用响应式 API
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
public class WebClientExample {
private final WebClient client = WebClient.create("http://localhost:8080");
public Flux<User> getUsers() {
return client.get()
.uri("/users")
.retrieve()
.bodyToFlux(User.class);
}
}
3. 响应式特性与技术
异步非阻塞处理
@GetMapping("/delay/{seconds}")
public Mono<String> delayedResponse(@PathVariable int seconds) {
return Mono.delay(Duration.ofSeconds(seconds))
.map(t -> "Delayed for " + seconds + " seconds");
}
服务器推送事件(SSE)
@GetMapping(path = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<User> streamUsers() {
return service.findAll(); // 持续推送数据
}
WebSocket 支持
配置 WebSocket 处理:
@Configuration
public class WebSocketConfig implements WebSocketConfigurer {
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(myHandler(), "/ws")
.setAllowedOrigins("*");
}
@Bean
public WebSocketHandler myHandler() {
return new MyWebSocketHandler();
}
}
4. 测试响应式代码
使用 WebTestClient
测试:
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.web.reactive.WebFluxTest;
import org.springframework.test.web.reactive.server.WebTestClient;
@WebFluxTest(UserController.class)
public class UserControllerTest {
@Autowired
private WebTestClient client;
@Test
public void testFindAllUsers() {
client.get().uri("/users")
.exchange()
.expectStatus().isOk()
.expectBodyList(User.class);
}
}
5. 注意事项
- 避免阻塞操作:响应式系统中应避免使用
Thread.sleep()
或同步 I/O,否则会失去非阻塞优势。 - 背压处理:使用
Flux.buffer()
、Flux.limitRate()
等方法控制数据流速率。 - 错误处理:使用
onErrorReturn()
、retry()
等操作符处理异常。 - 与传统组件混合:响应式与非响应式代码混用可能导致线程阻塞,需谨慎设计。
6. 适用场景
- 高并发系统:如微服务网关、实时数据处理平台。
- I/O 密集型应用:如文件/网络操作频繁的场景。
- 实时交互应用:如在线游戏、协作工具。
总结
Spring WebFlux 通过 Project Reactor 提供了完整的响应式编程模型,支持注解式和函数式两种开发方式,与传统 Spring MVC 相比更适合高并发、低延迟的场景。使用时需注意避免阻塞操作,并合理利用背压和错误处理机制。