946 lines
27 KiB
Markdown
946 lines
27 KiB
Markdown
# 响应式编程规范文档
|
||
|
||
> 文档编号: GYM-STD-REACTIVE-001
|
||
> 版本: v1.0
|
||
> 日期: 2026-03-04
|
||
> 作者: 张翔
|
||
> 状态: 初稿
|
||
|
||
---
|
||
|
||
## 文档修订历史
|
||
|
||
| 版本 | 日期 | 作者 | 修订内容 |
|
||
| ---- | ---------- | ---- | ------------------ |
|
||
| v1.0 | 2026-03-04 | 张翔 | 创建响应式编程规范文档 |
|
||
|
||
---
|
||
|
||
## 参考文档
|
||
|
||
- 《健身房管理系统技术架构设计文档》 GYM-HLD-TECH-001
|
||
- Project Reactor 官方文档
|
||
- Spring WebFlux 官方文档
|
||
- R2DBC 官方文档
|
||
|
||
---
|
||
|
||
## 一、概述
|
||
|
||
### 1.1 目的
|
||
|
||
本文档旨在为健身房管理系统项目制定响应式编程规范,确保团队成员正确使用响应式编程技术栈,避免常见的反模式,提高代码质量和系统性能。
|
||
|
||
### 1.2 适用范围
|
||
|
||
本规范适用于所有使用 Spring WebFlux + R2DBC 技术栈的代码开发。
|
||
|
||
### 1.3 核心原则
|
||
|
||
1. **永不阻塞**:禁止在响应式流中使用阻塞操作
|
||
2. **链式调用**:使用操作符链式调用,避免嵌套
|
||
3. **错误处理**:使用响应式错误处理机制,避免 try-catch
|
||
4. **背压处理**:正确处理背压,避免内存溢出
|
||
5. **资源释放**:确保所有资源正确释放,避免资源泄漏
|
||
|
||
---
|
||
|
||
## 二、响应式编程基础
|
||
|
||
### 2.1 核心概念
|
||
|
||
#### 2.1.1 Mono
|
||
|
||
**定义**:表示 0-1 个元素的异步序列,返回单个对象或空。
|
||
|
||
**适用场景**:
|
||
- 查询单个对象
|
||
- 保存单个对象
|
||
- 更新单个对象
|
||
- 删除单个对象
|
||
|
||
**示例**:
|
||
|
||
```java
|
||
// 查询单个会员
|
||
public Mono<Member> getMember(Long id) {
|
||
return memberRepository.findById(id);
|
||
}
|
||
|
||
// 保存单个会员
|
||
public Mono<Member> saveMember(Member member) {
|
||
return memberRepository.save(member);
|
||
}
|
||
```
|
||
|
||
#### 2.1.2 Flux
|
||
|
||
**定义**:表示 0-N 个元素的异步序列,返回多个对象。
|
||
|
||
**适用场景**:
|
||
- 查询列表
|
||
- 批量操作
|
||
- 流式处理
|
||
- 实时数据推送
|
||
|
||
**示例**:
|
||
|
||
```java
|
||
// 查询会员列表
|
||
public Flux<Member> listMembers(Long tenantId) {
|
||
return memberRepository.findByTenantId(tenantId);
|
||
}
|
||
|
||
// 批量保存会员
|
||
public Flux<Member> saveMembers(List<Member> members) {
|
||
return Flux.fromIterable(members)
|
||
.flatMap(memberRepository::save);
|
||
}
|
||
```
|
||
|
||
#### 2.1.3 Scheduler
|
||
|
||
**定义**:控制响应式操作的执行线程。
|
||
|
||
**常用 Scheduler**:
|
||
|
||
| Scheduler | 用途 | 示例 |
|
||
|-----------|------|------|
|
||
| **Schedulers.parallel()** | CPU 密集型操作 | 数据计算、转换 |
|
||
| **Schedulers.boundedElastic()** | 阻塞 I/O 操作 | 文件读写、网络请求 |
|
||
| **Schedulers.single()** | 单线程顺序执行 | 顺序处理任务 |
|
||
| **Schedulers.immediate()** | 当前线程执行 | 简单操作 |
|
||
|
||
**示例**:
|
||
|
||
```java
|
||
// CPU 密集型操作
|
||
public Flux<Member> processMembers(Flux<Member> members) {
|
||
return members.publishOn(Schedulers.parallel())
|
||
.map(this::calculateLevel);
|
||
}
|
||
|
||
// 阻塞 I/O 操作
|
||
public Mono<String> readFile(String path) {
|
||
return Mono.fromCallable(() -> Files.readString(Paths.get(path)))
|
||
.subscribeOn(Schedulers.boundedElastic());
|
||
}
|
||
```
|
||
|
||
### 2.2 常用操作符
|
||
|
||
#### 2.2.1 转换操作符
|
||
|
||
| 操作符 | 功能 | 示例 |
|
||
|-------|------|------|
|
||
| **map** | 一对一转换 | `.map(member -> member.getName())` |
|
||
| **flatMap** | 一对多转换(异步) | `.flatMap(member -> loadCards(member.getId()))` |
|
||
| **flatMapMany** | 一对多转换(返回 Flux) | `.flatMapMany(member -> listBenefits(member.getId()))` |
|
||
| **filter** | 过滤元素 | `.filter(member -> member.getStatus() == 1)` |
|
||
|
||
**示例**:
|
||
|
||
```java
|
||
// map:一对一转换
|
||
public Flux<String> getMemberNames(Long tenantId) {
|
||
return memberRepository.findByTenantId(tenantId)
|
||
.map(Member::getName);
|
||
}
|
||
|
||
// flatMap:一对多转换(异步)
|
||
public Mono<Member> getMemberWithCards(Long id) {
|
||
return memberRepository.findById(id)
|
||
.flatMap(member -> memberCardRepository.findByMemberId(member.getId())
|
||
.collectList()
|
||
.map(cards -> {
|
||
member.setCards(cards);
|
||
return member;
|
||
}));
|
||
}
|
||
|
||
// filter:过滤元素
|
||
public Flux<Member> getActiveMembers(Long tenantId) {
|
||
return memberRepository.findByTenantId(tenantId)
|
||
.filter(member -> member.getStatus() == 1);
|
||
}
|
||
```
|
||
|
||
#### 2.2.2 条件操作符
|
||
|
||
| 操作符 | 功能 | 示例 |
|
||
|-------|------|------|
|
||
| **switchIfEmpty** | 序列为空时返回备选 | `.switchIfEmpty(Mono.error(new BusinessException("会员不存在")))` |
|
||
| **defaultIfEmpty** | 序列为空时返回默认值 | `.defaultIfEmpty(Member.builder().build())` |
|
||
| **take** | 取前 N 个元素 | `.take(10)` |
|
||
| **skip** | 跳过前 N 个元素 | `.skip(10)` |
|
||
|
||
**示例**:
|
||
|
||
```java
|
||
// switchIfEmpty:序列为空时返回备选
|
||
public Mono<Member> getMember(Long id) {
|
||
return memberRepository.findById(id)
|
||
.switchIfEmpty(Mono.error(new BusinessException("会员不存在")));
|
||
}
|
||
|
||
// defaultIfEmpty:序列为空时返回默认值
|
||
public Flux<Member> listMembers(Long tenantId) {
|
||
return memberRepository.findByTenantId(tenantId)
|
||
.defaultIfEmpty(Member.builder().build());
|
||
}
|
||
|
||
// take:取前 10 个元素
|
||
public Flux<Member> listMembers(Long tenantId, int limit) {
|
||
return memberRepository.findByTenantId(tenantId)
|
||
.take(limit);
|
||
}
|
||
```
|
||
|
||
#### 2.2.3 错误处理操作符
|
||
|
||
| 操作符 | 功能 | 示例 |
|
||
|-------|------|------|
|
||
| **onErrorResume** | 捕获错误并返回备选序列 | `.onErrorResume(e -> Mono.empty())` |
|
||
| **onErrorReturn** | 捕获错误并返回默认值 | `.onErrorReturn(Member.builder().build())` |
|
||
| **doOnError** | 错误时执行副作用 | `.doOnError(e -> log.error("查询失败", e))` |
|
||
| **retry** | 重试 | `.retry(3)` |
|
||
| **retryWhen** | 高级重试 | `.retryWhen(Retry.backoff(3, Duration.ofSeconds(1)))` |
|
||
|
||
**示例**:
|
||
|
||
```java
|
||
// onErrorResume:捕获错误并返回备选序列
|
||
public Mono<Member> getMember(Long id) {
|
||
return memberRepository.findById(id)
|
||
.onErrorResume(DataAccessException.class, e -> {
|
||
log.error("数据库查询失败: memberId={}", id, e);
|
||
return Mono.empty();
|
||
});
|
||
}
|
||
|
||
// onErrorReturn:捕获错误并返回默认值
|
||
public Mono<Member> getMember(Long id) {
|
||
return memberRepository.findById(id)
|
||
.onErrorReturn(Member.builder().build());
|
||
}
|
||
|
||
// doOnError:错误时执行副作用
|
||
public Mono<Member> getMember(Long id) {
|
||
return memberRepository.findById(id)
|
||
.doOnError(e -> log.error("查询会员失败: memberId={}", id, e));
|
||
}
|
||
|
||
// retry:重试 3 次
|
||
public Mono<Member> getMember(Long id) {
|
||
return memberRepository.findById(id)
|
||
.retry(3);
|
||
}
|
||
|
||
// retryWhen:高级重试(指数退避)
|
||
public Mono<Member> getMember(Long id) {
|
||
return memberRepository.findById(id)
|
||
.retryWhen(Retry.backoff(3, Duration.ofSeconds(1))
|
||
.filter(throwable -> throwable instanceof TimeoutException)
|
||
.doBeforeRetry(signal -> log.warn("重试: attempt={}", signal.totalRetries())));
|
||
}
|
||
```
|
||
|
||
#### 2.2.4 生命周期操作符
|
||
|
||
| 操作符 | 功能 | 示例 |
|
||
|-------|------|------|
|
||
| **doOnSubscribe** | 订阅时执行 | `.doOnSubscribe(s -> log.debug("开始查询"))` |
|
||
| **doOnNext** | 每个元素到达时执行 | `.doOnNext(member -> log.debug("查询到会员: {}", member.getName()))` |
|
||
| **doOnComplete** | 完成时执行 | `.doOnComplete(() -> log.debug("查询完成"))` |
|
||
| **doOnError** | 错误时执行 | `.doOnError(e -> log.error("查询失败", e))` |
|
||
| **doOnTerminate** | 终止时执行(无论成功或失败) | `.doOnTerminate(() -> log.debug("查询结束"))` |
|
||
|
||
**示例**:
|
||
|
||
```java
|
||
public Mono<Member> getMember(Long id) {
|
||
return memberRepository.findById(id)
|
||
.doOnSubscribe(s -> log.debug("开始查询会员: memberId={}", id))
|
||
.doOnNext(member -> log.debug("查询到会员: memberId={}, name={}", member.getId(), member.getName()))
|
||
.doOnComplete(() -> log.debug("查询会员完成: memberId={}", id))
|
||
.doOnError(e -> log.error("查询会员失败: memberId={}", id, e))
|
||
.doOnTerminate(() -> log.debug("查询会员结束: memberId={}", id));
|
||
}
|
||
```
|
||
|
||
---
|
||
|
||
## 三、编码规范
|
||
|
||
### 3.1 基本原则
|
||
|
||
#### 3.1.1 永不阻塞
|
||
|
||
**规则**:禁止在响应式流中使用阻塞操作。
|
||
|
||
**✅ 正确示例**:
|
||
|
||
```java
|
||
public Mono<Member> getMember(Long id) {
|
||
return memberRepository.findById(id)
|
||
.flatMap(member -> loadMemberCards(member.getId()));
|
||
}
|
||
```
|
||
|
||
**❌ 错误示例**:
|
||
|
||
```java
|
||
public Mono<Member> getMember(Long id) {
|
||
return memberRepository.findById(id)
|
||
.flatMap(member -> {
|
||
// 错误:使用 block() 阻塞
|
||
List<MemberCard> cards = memberCardRepository.findByMemberId(member.getId())
|
||
.collectList().block();
|
||
member.setCards(cards);
|
||
return Mono.just(member);
|
||
});
|
||
}
|
||
```
|
||
|
||
#### 3.1.2 链式调用
|
||
|
||
**规则**:使用操作符链式调用,避免嵌套。
|
||
|
||
**✅ 正确示例**:
|
||
|
||
```java
|
||
public Mono<Member> getMemberWithCardsAndBenefits(Long id) {
|
||
return memberRepository.findById(id)
|
||
.flatMap(member -> loadMemberCards(member.getId())
|
||
.map(cards -> {
|
||
member.setCards(cards);
|
||
return member;
|
||
}))
|
||
.flatMap(member -> loadMemberBenefits(member.getId())
|
||
.map(benefits -> {
|
||
member.setBenefits(benefits);
|
||
return member;
|
||
}));
|
||
}
|
||
```
|
||
|
||
**❌ 错误示例**:
|
||
|
||
```java
|
||
public Mono<Member> getMemberWithCardsAndBenefits(Long id) {
|
||
return memberRepository.findById(id)
|
||
.flatMap(member -> {
|
||
return loadMemberCards(member.getId())
|
||
.map(cards -> {
|
||
member.setCards(cards);
|
||
return member;
|
||
})
|
||
.flatMap(memberWithCards -> {
|
||
return loadMemberBenefits(memberWithCards.getId())
|
||
.map(benefits -> {
|
||
memberWithCards.setBenefits(benefits);
|
||
return memberWithCards;
|
||
});
|
||
});
|
||
});
|
||
}
|
||
```
|
||
|
||
#### 3.1.3 错误处理
|
||
|
||
**规则**:使用响应式错误处理机制,避免 try-catch。
|
||
|
||
**✅ 正确示例**:
|
||
|
||
```java
|
||
public Mono<Member> getMember(Long id) {
|
||
return memberRepository.findById(id)
|
||
.switchIfEmpty(Mono.error(new BusinessException("会员不存在")))
|
||
.onErrorResume(DataAccessException.class, e -> {
|
||
log.error("数据库查询失败: memberId={}", id, e);
|
||
return Mono.error(new SystemException("系统错误"));
|
||
});
|
||
}
|
||
```
|
||
|
||
**❌ 错误示例**:
|
||
|
||
```java
|
||
public Mono<Member> getMember(Long id) {
|
||
try {
|
||
return memberRepository.findById(id)
|
||
.switchIfEmpty(Mono.error(new BusinessException("会员不存在")));
|
||
} catch (Exception e) {
|
||
// 错误:try-catch 无法捕获响应式异常
|
||
log.error("查询失败", e);
|
||
return Mono.error(new SystemException("系统错误"));
|
||
}
|
||
}
|
||
```
|
||
|
||
### 3.2 Service 层规范
|
||
|
||
#### 3.2.1 基本结构
|
||
|
||
```java
|
||
@Service
|
||
@Slf4j
|
||
@RequiredArgsConstructor
|
||
public class MemberService {
|
||
|
||
private final MemberRepository memberRepository;
|
||
private final MemberCardRepository memberCardRepository;
|
||
private final BenefitService benefitService;
|
||
|
||
/**
|
||
* 查询会员
|
||
*/
|
||
public Mono<Member> getMember(Long id) {
|
||
return memberRepository.findById(id)
|
||
.switchIfEmpty(Mono.error(new BusinessException("会员不存在")))
|
||
.doOnSubscribe(s -> log.debug("开始查询会员: memberId={}", id))
|
||
.doOnNext(member -> log.debug("查询到会员: memberId={}, name={}", member.getId(), member.getName()))
|
||
.doOnError(e -> log.error("查询会员失败: memberId={}", id, e))
|
||
.doOnTerminate(() -> log.debug("查询会员结束: memberId={}", id));
|
||
}
|
||
|
||
/**
|
||
* 查询会员列表
|
||
*/
|
||
public Flux<Member> listMembers(Long tenantId, Long storeId) {
|
||
return memberRepository.findByTenantIdAndStoreId(tenantId, storeId)
|
||
.filter(member -> member.getStatus() == 1)
|
||
.sort(Comparator.comparing(Member::getCreatedAt).reversed());
|
||
}
|
||
|
||
/**
|
||
* 创建会员
|
||
*/
|
||
@Transactional
|
||
public Mono<Member> createMember(MemberCreateRequest request) {
|
||
return validateMemberCreateRequest(request)
|
||
.flatMap(v -> buildMember(request))
|
||
.flatMap(memberRepository::save)
|
||
.flatMap(member -> createDefaultMemberCard(member))
|
||
.doOnSuccess(member -> log.info("创建会员成功: memberId={}", member.getId()))
|
||
.doOnError(e -> log.error("创建会员失败: {}", e.getMessage()));
|
||
}
|
||
|
||
private Mono<Void> validateMemberCreateRequest(MemberCreateRequest request) {
|
||
return memberRepository.findByPhoneAndTenantId(request.getPhone(), request.getTenantId())
|
||
.flatMap(existing -> Mono.<Void>error(new BusinessException("手机号已注册")))
|
||
.switchIfEmpty(Mono.empty());
|
||
}
|
||
|
||
private Mono<Member> buildMember(MemberCreateRequest request) {
|
||
Member member = Member.builder()
|
||
.tenantId(request.getTenantId())
|
||
.storeId(request.getStoreId())
|
||
.memberNo(generateMemberNo(request.getTenantId()))
|
||
.name(request.getName())
|
||
.phone(encryptPhone(request.getPhone()))
|
||
.phoneMask(maskPhone(request.getPhone()))
|
||
.gender(request.getGender())
|
||
.birthday(request.getBirthday())
|
||
.status(1)
|
||
.build();
|
||
|
||
return Mono.just(member);
|
||
}
|
||
|
||
private Mono<Member> createDefaultMemberCard(Member member) {
|
||
MemberCard card = MemberCard.builder()
|
||
.tenantId(member.getTenantId())
|
||
.memberId(member.getId())
|
||
.cardNo(generateCardNo(member.getTenantId()))
|
||
.status(1)
|
||
.build();
|
||
|
||
return memberCardRepository.save(card)
|
||
.thenReturn(member);
|
||
}
|
||
}
|
||
```
|
||
|
||
#### 3.2.2 事务管理
|
||
|
||
```java
|
||
@Service
|
||
@Slf4j
|
||
public class BookingService {
|
||
|
||
private final BookingRecordRepository bookingRecordRepository;
|
||
private final BookingSlotRepository bookingSlotRepository;
|
||
private final BenefitService benefitService;
|
||
|
||
/**
|
||
* 预约时段
|
||
*/
|
||
@Transactional
|
||
public Mono<BookingRecord> bookSlot(BookingRequest request) {
|
||
return validateBooking(request)
|
||
.flatMap(v -> checkSlotAvailability(request.getSlotId()))
|
||
.flatMap(slot -> deductBenefit(request.getMemberId(), slot))
|
||
.flatMap(benefit -> createBookingRecord(request, benefit))
|
||
.flatMap(booking -> updateSlotBookedCount(request.getSlotId()))
|
||
.doOnSuccess(booking -> log.info("预约成功: bookingId={}", booking.getId()))
|
||
.doOnError(e -> log.error("预约失败: {}", e.getMessage()));
|
||
}
|
||
|
||
private Mono<BookingSlot> checkSlotAvailability(Long slotId) {
|
||
return bookingSlotRepository.findById(slotId)
|
||
.switchIfEmpty(Mono.error(new BusinessException("时段不存在")))
|
||
.filter(slot -> slot.getStatus() == 1)
|
||
.switchIfEmpty(Mono.error(new BusinessException("时段不可预约")))
|
||
.filter(slot -> slot.getBookedCount() < slot.getCapacity())
|
||
.switchIfEmpty(Mono.error(new BusinessException("时段已满")));
|
||
}
|
||
|
||
private Mono<MemberBenefit> deductBenefit(Long memberId, BookingSlot slot) {
|
||
return benefitService.deductBenefit(memberId,
|
||
slot.getPriceType(), slot.getPriceValue())
|
||
.switchIfEmpty(Mono.error(new BusinessException("权益不足")));
|
||
}
|
||
|
||
private Mono<BookingRecord> createBookingRecord(BookingRequest request,
|
||
MemberBenefit benefit) {
|
||
BookingRecord record = BookingRecord.builder()
|
||
.tenantId(request.getTenantId())
|
||
.storeId(request.getStoreId())
|
||
.memberId(request.getMemberId())
|
||
.slotId(request.getSlotId())
|
||
.bookingNo(generateBookingNo(request.getTenantId()))
|
||
.status(1)
|
||
.benefitId(benefit.getId())
|
||
.build();
|
||
|
||
return bookingRecordRepository.save(record);
|
||
}
|
||
|
||
private Mono<Void> updateSlotBookedCount(Long slotId) {
|
||
return bookingSlotRepository.findById(slotId)
|
||
.flatMap(slot -> {
|
||
slot.setBookedCount(slot.getBookedCount() + 1);
|
||
if (slot.getBookedCount() >= slot.getCapacity()) {
|
||
slot.setStatus(2); // 已满
|
||
}
|
||
return bookingSlotRepository.save(slot);
|
||
})
|
||
.then();
|
||
}
|
||
}
|
||
```
|
||
|
||
### 3.3 Controller 层规范
|
||
|
||
#### 3.3.1 基本结构
|
||
|
||
```java
|
||
@RestController
|
||
@RequestMapping("/members")
|
||
@RequiredArgsConstructor
|
||
@Slf4j
|
||
public class MemberController {
|
||
|
||
private final MemberService memberService;
|
||
|
||
/**
|
||
* 查询会员
|
||
*/
|
||
@GetMapping("/{id}")
|
||
public Mono<ResponseEntity<ApiResponse<Member>>> getMember(@PathVariable Long id) {
|
||
return memberService.getMember(id)
|
||
.map(member -> ResponseEntity.ok(ApiResponse.success(member)))
|
||
.switchIfEmpty(Mono.just(ResponseEntity.notFound().build()))
|
||
.onErrorResume(BusinessException.class, e ->
|
||
Mono.just(ResponseEntity.badRequest()
|
||
.body(ApiResponse.error(e.getMessage()))))
|
||
.onErrorResume(Exception.class, e -> {
|
||
log.error("查询会员失败: memberId={}", id, e);
|
||
return Mono.just(ResponseEntity.internalServerError()
|
||
.body(ApiResponse.error("系统错误")));
|
||
});
|
||
}
|
||
|
||
/**
|
||
* 查询会员列表
|
||
*/
|
||
@GetMapping
|
||
public Mono<ResponseEntity<ApiResponse<Page<Member>>>> listMembers(
|
||
@RequestParam(required = false) Long tenantId,
|
||
@RequestParam(required = false) Long storeId,
|
||
@RequestParam(defaultValue = "0") int page,
|
||
@RequestParam(defaultValue = "20") int size) {
|
||
return memberService.listMembers(tenantId, storeId, page, size)
|
||
.map(members -> ResponseEntity.ok(ApiResponse.success(members)));
|
||
}
|
||
|
||
/**
|
||
* 创建会员
|
||
*/
|
||
@PostMapping
|
||
public Mono<ResponseEntity<ApiResponse<Member>>> createMember(
|
||
@Valid @RequestBody MemberCreateRequest request) {
|
||
return memberService.createMember(request)
|
||
.map(member -> ResponseEntity.ok(ApiResponse.success(member)))
|
||
.onErrorResume(BusinessException.class, e ->
|
||
Mono.just(ResponseEntity.badRequest()
|
||
.body(ApiResponse.error(e.getMessage()))))
|
||
.onErrorResume(Exception.class, e -> {
|
||
log.error("创建会员失败", e);
|
||
return Mono.just(ResponseEntity.internalServerError()
|
||
.body(ApiResponse.error("系统错误")));
|
||
});
|
||
}
|
||
}
|
||
```
|
||
|
||
---
|
||
|
||
## 四、反模式
|
||
|
||
### 4.1 阻塞操作
|
||
|
||
**❌ 反模式**:在响应式流中使用 `block()`、`blockFirst()`、`blockLast()`
|
||
|
||
```java
|
||
// 错误示例
|
||
public Mono<Member> getMember(Long id) {
|
||
return memberRepository.findById(id)
|
||
.flatMap(member -> {
|
||
// 错误:使用 block() 阻塞
|
||
List<MemberCard> cards = memberCardRepository.findByMemberId(member.getId())
|
||
.collectList().block();
|
||
member.setCards(cards);
|
||
return Mono.just(member);
|
||
});
|
||
}
|
||
```
|
||
|
||
**✅ 正确做法**:使用 `flatMap` 链式调用
|
||
|
||
```java
|
||
// 正确示例
|
||
public Mono<Member> getMember(Long id) {
|
||
return memberRepository.findById(id)
|
||
.flatMap(member -> memberCardRepository.findByMemberId(member.getId())
|
||
.collectList()
|
||
.map(cards -> {
|
||
member.setCards(cards);
|
||
return member;
|
||
}));
|
||
}
|
||
```
|
||
|
||
### 4.2 嵌套订阅
|
||
|
||
**❌ 反模式**:在 `flatMap` 中使用 `subscribe`
|
||
|
||
```java
|
||
// 错误示例
|
||
public Mono<Member> getMember(Long id) {
|
||
return memberRepository.findById(id)
|
||
.flatMap(member -> {
|
||
memberCardRepository.findByMemberId(member.getId())
|
||
.collectList()
|
||
.subscribe(cards -> {
|
||
// 错误:在 flatMap 中使用 subscribe
|
||
member.setCards(cards);
|
||
});
|
||
return Mono.just(member);
|
||
});
|
||
}
|
||
```
|
||
|
||
**✅ 正确做法**:使用 `map` 转换数据
|
||
|
||
```java
|
||
// 正确示例
|
||
public Mono<Member> getMember(Long id) {
|
||
return memberRepository.findById(id)
|
||
.zipWith(memberCardRepository.findByMemberId(id).collectList())
|
||
.map(tuple -> {
|
||
Member member = tuple.getT1();
|
||
List<MemberCard> cards = tuple.getT2();
|
||
member.setCards(cards);
|
||
return member;
|
||
});
|
||
}
|
||
```
|
||
|
||
### 4.3 忽略错误
|
||
|
||
**❌ 反模式**:忽略错误,不处理
|
||
|
||
```java
|
||
// 错误示例
|
||
public Mono<Member> getMember(Long id) {
|
||
return memberRepository.findById(id)
|
||
.onErrorResume(e -> Mono.empty()); // 错误:忽略错误
|
||
}
|
||
```
|
||
|
||
**✅ 正确做法**:记录错误并处理
|
||
|
||
```java
|
||
// 正确示例
|
||
public Mono<Member> getMember(Long id) {
|
||
return memberRepository.findById(id)
|
||
.onErrorResume(e -> {
|
||
log.error("查询会员失败: memberId={}", id, e);
|
||
return Mono.error(new SystemException("系统错误"));
|
||
});
|
||
}
|
||
```
|
||
|
||
### 4.4 不处理背压
|
||
|
||
**❌ 反模式**:不处理背压,可能导致内存溢出
|
||
|
||
```java
|
||
// 错误示例
|
||
public Flux<Member> listAllMembers() {
|
||
return memberRepository.findAll(); // 错误:可能返回大量数据
|
||
}
|
||
```
|
||
|
||
**✅ 正确做法**:使用 `take` 限制数据量
|
||
|
||
```java
|
||
// 正确示例
|
||
public Flux<Member> listAllMembers() {
|
||
return memberRepository.findAll()
|
||
.take(1000); // 限制最多返回 1000 条
|
||
}
|
||
```
|
||
|
||
### 4.5 资源泄漏
|
||
|
||
**❌ 反模式**:不释放资源
|
||
|
||
```java
|
||
// 错误示例
|
||
public Mono<String> readFile(String path) {
|
||
return Mono.fromCallable(() -> {
|
||
// 错误:不释放资源
|
||
BufferedReader reader = Files.newBufferedReader(Paths.get(path));
|
||
return reader.readLine();
|
||
});
|
||
}
|
||
```
|
||
|
||
**✅ 正确做法**:使用 `using` 确保资源释放
|
||
|
||
```java
|
||
// 正确示例
|
||
public Mono<String> readFile(String path) {
|
||
return Mono.using(
|
||
() -> Files.newBufferedReader(Paths.get(path)),
|
||
reader -> Mono.fromCallable(reader::readLine),
|
||
reader -> {
|
||
try {
|
||
reader.close();
|
||
} catch (IOException e) {
|
||
log.error("关闭文件失败", e);
|
||
}
|
||
});
|
||
}
|
||
```
|
||
|
||
---
|
||
|
||
## 五、最佳实践
|
||
|
||
### 5.1 日志记录
|
||
|
||
**原则**:使用 `doOnSubscribe`、`doOnNext`、`doOnError`、`doOnTerminate` 记录关键操作。
|
||
|
||
```java
|
||
public Mono<Member> getMember(Long id) {
|
||
return memberRepository.findById(id)
|
||
.doOnSubscribe(s -> log.debug("开始查询会员: memberId={}", id))
|
||
.doOnNext(member -> log.debug("查询到会员: memberId={}, name={}", member.getId(), member.getName()))
|
||
.doOnComplete(() -> log.debug("查询会员完成: memberId={}", id))
|
||
.doOnError(e -> log.error("查询会员失败: memberId={}", id, e))
|
||
.doOnTerminate(() -> log.debug("查询会员结束: memberId={}", id));
|
||
}
|
||
```
|
||
|
||
### 5.2 超时控制
|
||
|
||
**原则**:为所有外部调用设置超时时间。
|
||
|
||
```java
|
||
public Mono<Member> getMember(Long id) {
|
||
return memberRepository.findById(id)
|
||
.timeout(Duration.ofSeconds(3)) // 3 秒超时
|
||
.switchIfEmpty(Mono.error(new BusinessException("会员不存在")));
|
||
}
|
||
```
|
||
|
||
### 5.3 重试机制
|
||
|
||
**原则**:为可重试的操作设置重试机制。
|
||
|
||
```java
|
||
public Mono<Member> getMember(Long id) {
|
||
return memberRepository.findById(id)
|
||
.retryWhen(Retry.backoff(3, Duration.ofSeconds(1)) // 重试 3 次,间隔 1 秒
|
||
.filter(throwable -> throwable instanceof TimeoutException)
|
||
.doBeforeRetry(signal -> log.warn("重试: attempt={}", signal.totalRetries())));
|
||
}
|
||
```
|
||
|
||
### 5.4 缓存策略
|
||
|
||
**原则**:使用 Cache-Aside 模式,先查缓存,缓存未命中再查数据库。
|
||
|
||
```java
|
||
public Mono<Member> getMember(Long id) {
|
||
String cacheKey = "member:" + id;
|
||
|
||
return redisTemplate.opsForValue()
|
||
.get(cacheKey)
|
||
.cast(Member.class)
|
||
.switchIfEmpty(
|
||
memberRepository.findById(id)
|
||
.flatMap(member -> redisTemplate.opsForValue()
|
||
.set(cacheKey, member, Duration.ofMinutes(30))
|
||
.thenReturn(member))
|
||
);
|
||
}
|
||
```
|
||
|
||
### 5.5 性能优化
|
||
|
||
**原则**:使用 `parallel()` 并行处理 CPU 密集型操作。
|
||
|
||
```java
|
||
public Flux<Member> processMembers(Flux<Member> members) {
|
||
return members.publishOn(Schedulers.parallel())
|
||
.map(this::calculateLevel)
|
||
.publishOn(Schedulers.boundedElastic());
|
||
}
|
||
```
|
||
|
||
---
|
||
|
||
## 六、测试规范
|
||
|
||
### 6.1 单元测试
|
||
|
||
**原则**:使用 `StepVerifier` 测试响应式流。
|
||
|
||
```java
|
||
@SpringBootTest
|
||
class MemberServiceTest {
|
||
|
||
@Autowired
|
||
private MemberService memberService;
|
||
|
||
@MockBean
|
||
private MemberRepository memberRepository;
|
||
|
||
@Test
|
||
void testGetMember() {
|
||
Member member = Member.builder()
|
||
.id(1L)
|
||
.name("张三")
|
||
.phone("13800138000")
|
||
.build();
|
||
|
||
when(memberRepository.findById(1L))
|
||
.thenReturn(Mono.just(member));
|
||
|
||
StepVerifier.create(memberService.getMember(1L))
|
||
.expectNextMatches(m -> m.getName().equals("张三"))
|
||
.verifyComplete();
|
||
}
|
||
|
||
@Test
|
||
void testGetMemberNotFound() {
|
||
when(memberRepository.findById(1L))
|
||
.thenReturn(Mono.empty());
|
||
|
||
StepVerifier.create(memberService.getMember(1L))
|
||
.expectErrorMatches(e -> e instanceof BusinessException)
|
||
.verify();
|
||
}
|
||
}
|
||
```
|
||
|
||
### 6.2 集成测试
|
||
|
||
**原则**:使用 `WebTestClient` 测试 Controller。
|
||
|
||
```java
|
||
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
|
||
@AutoConfigureWebTestClient
|
||
class MemberControllerTest {
|
||
|
||
@Autowired
|
||
private WebTestClient webTestClient;
|
||
|
||
@Test
|
||
void testGetMember() {
|
||
webTestClient.get()
|
||
.uri("/api/v1/members/1")
|
||
.exchange()
|
||
.expectStatus().isOk()
|
||
.expectBody(Member.class)
|
||
.value(member -> {
|
||
assertThat(member.getName()).isEqualTo("张三");
|
||
});
|
||
}
|
||
|
||
@Test
|
||
void testGetMemberNotFound() {
|
||
webTestClient.get()
|
||
.uri("/api/v1/members/999")
|
||
.exchange()
|
||
.expectStatus().isNotFound();
|
||
}
|
||
}
|
||
```
|
||
|
||
### 6.3 性能测试
|
||
|
||
**原则**:使用 `StepVerifier.withVirtualTime` 测试性能。
|
||
|
||
```java
|
||
@Test
|
||
void testGetMemberPerformance() {
|
||
StepVerifier.withVirtualTime(() -> memberService.getMember(1L))
|
||
.expectNextCount(1)
|
||
.expectComplete()
|
||
.verify(Duration.ofMillis(100)); // 100ms 内完成
|
||
}
|
||
```
|
||
|
||
---
|
||
|
||
## 七、总结
|
||
|
||
### 7.1 核心原则回顾
|
||
|
||
1. ✅ **永不阻塞**:禁止在响应式流中使用阻塞操作
|
||
2. ✅ **链式调用**:使用操作符链式调用,避免嵌套
|
||
3. ✅ **错误处理**:使用响应式错误处理机制,避免 try-catch
|
||
4. ✅ **背压处理**:正确处理背压,避免内存溢出
|
||
5. ✅ **资源释放**:确保所有资源正确释放,避免资源泄漏
|
||
|
||
### 7.2 关键成功因素
|
||
|
||
1. ✅ 严格遵守响应式编程规范
|
||
2. ✅ 使用 StepVerifier 进行测试
|
||
3. ✅ 完善的日志记录
|
||
4. ✅ 合理的超时和重试机制
|
||
5. ✅ 正确的缓存策略
|
||
|
||
### 7.3 持续改进
|
||
|
||
1. ✅ 定期代码审查
|
||
2. ✅ 性能监控和优化
|
||
3. ✅ 技术分享和培训
|
||
4. ✅ 文档更新和维护
|