# 响应式编程规范文档 > 文档编号: GYM-STD-REACTIVE-001 > 版本: v1.0 > 日期: 2026-03-04 > 作者: 张翔 > 状态: 初稿 --- ## 文档修订历史 | 版本 | 日期 | 作者 | 修订内容 | | ---- | ---------- | ---- | ------------------ | | v1.0 | 2026-03-04 | 张翔 | 创建响应式编程规范文档 | --- ## 参考文档 - 《健身房管理系统技术架构设计文档》 GYM-HLD-TECH-001 - Project Reactor 官方文档 - Spring WebFlux 官方文档 - R2DBC 官方文档 --- ## 一、概述 ### 1.1 目的 本文档旨在为健身房管理系统项目制定响应式编程规范,确保团队成员正确使用响应式编程技术栈,避免常见的反模式,提高代码质量和系统性能。 ### 1.2 适用范围 本规范适用于所有使用 Spring WebFlux + R2DBC 技术栈的代码开发。 ### 1.3 核心原则 1. **永不阻塞**:禁止在响应式流中使用阻塞操作 2. **链式调用**:使用操作符链式调用,避免嵌套 3. **错误处理**:使用响应式错误处理机制,避免 try-catch 4. **背压处理**:正确处理背压,避免内存溢出 5. **资源释放**:确保所有资源正确释放,避免资源泄漏 --- ## 二、响应式编程基础 ### 2.1 核心概念 #### 2.1.1 Mono **定义**:表示 0-1 个元素的异步序列,返回单个对象或空。 **适用场景**: - 查询单个对象 - 保存单个对象 - 更新单个对象 - 删除单个对象 **示例**: ```java // 查询单个会员 public Mono getMember(Long id) { return memberRepository.findById(id); } // 保存单个会员 public Mono saveMember(Member member) { return memberRepository.save(member); } ``` #### 2.1.2 Flux **定义**:表示 0-N 个元素的异步序列,返回多个对象。 **适用场景**: - 查询列表 - 批量操作 - 流式处理 - 实时数据推送 **示例**: ```java // 查询会员列表 public Flux listMembers(Long tenantId) { return memberRepository.findByTenantId(tenantId); } // 批量保存会员 public Flux saveMembers(List members) { return Flux.fromIterable(members) .flatMap(memberRepository::save); } ``` #### 2.1.3 Scheduler **定义**:控制响应式操作的执行线程。 **常用 Scheduler**: | Scheduler | 用途 | 示例 | |-----------|------|------| | **Schedulers.parallel()** | CPU 密集型操作 | 数据计算、转换 | | **Schedulers.boundedElastic()** | 阻塞 I/O 操作 | 文件读写、网络请求 | | **Schedulers.single()** | 单线程顺序执行 | 顺序处理任务 | | **Schedulers.immediate()** | 当前线程执行 | 简单操作 | **示例**: ```java // CPU 密集型操作 public Flux processMembers(Flux members) { return members.publishOn(Schedulers.parallel()) .map(this::calculateLevel); } // 阻塞 I/O 操作 public Mono readFile(String path) { return Mono.fromCallable(() -> Files.readString(Paths.get(path))) .subscribeOn(Schedulers.boundedElastic()); } ``` ### 2.2 常用操作符 #### 2.2.1 转换操作符 | 操作符 | 功能 | 示例 | |-------|------|------| | **map** | 一对一转换 | `.map(member -> member.getName())` | | **flatMap** | 一对多转换(异步) | `.flatMap(member -> loadCards(member.getId()))` | | **flatMapMany** | 一对多转换(返回 Flux) | `.flatMapMany(member -> listBenefits(member.getId()))` | | **filter** | 过滤元素 | `.filter(member -> member.getStatus() == 1)` | **示例**: ```java // map:一对一转换 public Flux getMemberNames(Long tenantId) { return memberRepository.findByTenantId(tenantId) .map(Member::getName); } // flatMap:一对多转换(异步) public Mono getMemberWithCards(Long id) { return memberRepository.findById(id) .flatMap(member -> memberCardRepository.findByMemberId(member.getId()) .collectList() .map(cards -> { member.setCards(cards); return member; })); } // filter:过滤元素 public Flux getActiveMembers(Long tenantId) { return memberRepository.findByTenantId(tenantId) .filter(member -> member.getStatus() == 1); } ``` #### 2.2.2 条件操作符 | 操作符 | 功能 | 示例 | |-------|------|------| | **switchIfEmpty** | 序列为空时返回备选 | `.switchIfEmpty(Mono.error(new BusinessException("会员不存在")))` | | **defaultIfEmpty** | 序列为空时返回默认值 | `.defaultIfEmpty(Member.builder().build())` | | **take** | 取前 N 个元素 | `.take(10)` | | **skip** | 跳过前 N 个元素 | `.skip(10)` | **示例**: ```java // switchIfEmpty:序列为空时返回备选 public Mono getMember(Long id) { return memberRepository.findById(id) .switchIfEmpty(Mono.error(new BusinessException("会员不存在"))); } // defaultIfEmpty:序列为空时返回默认值 public Flux listMembers(Long tenantId) { return memberRepository.findByTenantId(tenantId) .defaultIfEmpty(Member.builder().build()); } // take:取前 10 个元素 public Flux listMembers(Long tenantId, int limit) { return memberRepository.findByTenantId(tenantId) .take(limit); } ``` #### 2.2.3 错误处理操作符 | 操作符 | 功能 | 示例 | |-------|------|------| | **onErrorResume** | 捕获错误并返回备选序列 | `.onErrorResume(e -> Mono.empty())` | | **onErrorReturn** | 捕获错误并返回默认值 | `.onErrorReturn(Member.builder().build())` | | **doOnError** | 错误时执行副作用 | `.doOnError(e -> log.error("查询失败", e))` | | **retry** | 重试 | `.retry(3)` | | **retryWhen** | 高级重试 | `.retryWhen(Retry.backoff(3, Duration.ofSeconds(1)))` | **示例**: ```java // onErrorResume:捕获错误并返回备选序列 public Mono getMember(Long id) { return memberRepository.findById(id) .onErrorResume(DataAccessException.class, e -> { log.error("数据库查询失败: memberId={}", id, e); return Mono.empty(); }); } // onErrorReturn:捕获错误并返回默认值 public Mono getMember(Long id) { return memberRepository.findById(id) .onErrorReturn(Member.builder().build()); } // doOnError:错误时执行副作用 public Mono getMember(Long id) { return memberRepository.findById(id) .doOnError(e -> log.error("查询会员失败: memberId={}", id, e)); } // retry:重试 3 次 public Mono getMember(Long id) { return memberRepository.findById(id) .retry(3); } // retryWhen:高级重试(指数退避) public Mono getMember(Long id) { return memberRepository.findById(id) .retryWhen(Retry.backoff(3, Duration.ofSeconds(1)) .filter(throwable -> throwable instanceof TimeoutException) .doBeforeRetry(signal -> log.warn("重试: attempt={}", signal.totalRetries()))); } ``` #### 2.2.4 生命周期操作符 | 操作符 | 功能 | 示例 | |-------|------|------| | **doOnSubscribe** | 订阅时执行 | `.doOnSubscribe(s -> log.debug("开始查询"))` | | **doOnNext** | 每个元素到达时执行 | `.doOnNext(member -> log.debug("查询到会员: {}", member.getName()))` | | **doOnComplete** | 完成时执行 | `.doOnComplete(() -> log.debug("查询完成"))` | | **doOnError** | 错误时执行 | `.doOnError(e -> log.error("查询失败", e))` | | **doOnTerminate** | 终止时执行(无论成功或失败) | `.doOnTerminate(() -> log.debug("查询结束"))` | **示例**: ```java public Mono getMember(Long id) { return memberRepository.findById(id) .doOnSubscribe(s -> log.debug("开始查询会员: memberId={}", id)) .doOnNext(member -> log.debug("查询到会员: memberId={}, name={}", member.getId(), member.getName())) .doOnComplete(() -> log.debug("查询会员完成: memberId={}", id)) .doOnError(e -> log.error("查询会员失败: memberId={}", id, e)) .doOnTerminate(() -> log.debug("查询会员结束: memberId={}", id)); } ``` --- ## 三、编码规范 ### 3.1 基本原则 #### 3.1.1 永不阻塞 **规则**:禁止在响应式流中使用阻塞操作。 **✅ 正确示例**: ```java public Mono getMember(Long id) { return memberRepository.findById(id) .flatMap(member -> loadMemberCards(member.getId())); } ``` **❌ 错误示例**: ```java public Mono getMember(Long id) { return memberRepository.findById(id) .flatMap(member -> { // 错误:使用 block() 阻塞 List cards = memberCardRepository.findByMemberId(member.getId()) .collectList().block(); member.setCards(cards); return Mono.just(member); }); } ``` #### 3.1.2 链式调用 **规则**:使用操作符链式调用,避免嵌套。 **✅ 正确示例**: ```java public Mono getMemberWithCardsAndBenefits(Long id) { return memberRepository.findById(id) .flatMap(member -> loadMemberCards(member.getId()) .map(cards -> { member.setCards(cards); return member; })) .flatMap(member -> loadMemberBenefits(member.getId()) .map(benefits -> { member.setBenefits(benefits); return member; })); } ``` **❌ 错误示例**: ```java public Mono getMemberWithCardsAndBenefits(Long id) { return memberRepository.findById(id) .flatMap(member -> { return loadMemberCards(member.getId()) .map(cards -> { member.setCards(cards); return member; }) .flatMap(memberWithCards -> { return loadMemberBenefits(memberWithCards.getId()) .map(benefits -> { memberWithCards.setBenefits(benefits); return memberWithCards; }); }); }); } ``` #### 3.1.3 错误处理 **规则**:使用响应式错误处理机制,避免 try-catch。 **✅ 正确示例**: ```java public Mono getMember(Long id) { return memberRepository.findById(id) .switchIfEmpty(Mono.error(new BusinessException("会员不存在"))) .onErrorResume(DataAccessException.class, e -> { log.error("数据库查询失败: memberId={}", id, e); return Mono.error(new SystemException("系统错误")); }); } ``` **❌ 错误示例**: ```java public Mono getMember(Long id) { try { return memberRepository.findById(id) .switchIfEmpty(Mono.error(new BusinessException("会员不存在"))); } catch (Exception e) { // 错误:try-catch 无法捕获响应式异常 log.error("查询失败", e); return Mono.error(new SystemException("系统错误")); } } ``` ### 3.2 Service 层规范 #### 3.2.1 基本结构 ```java @Service @Slf4j @RequiredArgsConstructor public class MemberService { private final MemberRepository memberRepository; private final MemberCardRepository memberCardRepository; private final BenefitService benefitService; /** * 查询会员 */ public Mono getMember(Long id) { return memberRepository.findById(id) .switchIfEmpty(Mono.error(new BusinessException("会员不存在"))) .doOnSubscribe(s -> log.debug("开始查询会员: memberId={}", id)) .doOnNext(member -> log.debug("查询到会员: memberId={}, name={}", member.getId(), member.getName())) .doOnError(e -> log.error("查询会员失败: memberId={}", id, e)) .doOnTerminate(() -> log.debug("查询会员结束: memberId={}", id)); } /** * 查询会员列表 */ public Flux listMembers(Long tenantId, Long storeId) { return memberRepository.findByTenantIdAndStoreId(tenantId, storeId) .filter(member -> member.getStatus() == 1) .sort(Comparator.comparing(Member::getCreatedAt).reversed()); } /** * 创建会员 */ @Transactional public Mono createMember(MemberCreateRequest request) { return validateMemberCreateRequest(request) .flatMap(v -> buildMember(request)) .flatMap(memberRepository::save) .flatMap(member -> createDefaultMemberCard(member)) .doOnSuccess(member -> log.info("创建会员成功: memberId={}", member.getId())) .doOnError(e -> log.error("创建会员失败: {}", e.getMessage())); } private Mono validateMemberCreateRequest(MemberCreateRequest request) { return memberRepository.findByPhoneAndTenantId(request.getPhone(), request.getTenantId()) .flatMap(existing -> Mono.error(new BusinessException("手机号已注册"))) .switchIfEmpty(Mono.empty()); } private Mono buildMember(MemberCreateRequest request) { Member member = Member.builder() .tenantId(request.getTenantId()) .storeId(request.getStoreId()) .memberNo(generateMemberNo(request.getTenantId())) .name(request.getName()) .phone(encryptPhone(request.getPhone())) .phoneMask(maskPhone(request.getPhone())) .gender(request.getGender()) .birthday(request.getBirthday()) .status(1) .build(); return Mono.just(member); } private Mono createDefaultMemberCard(Member member) { MemberCard card = MemberCard.builder() .tenantId(member.getTenantId()) .memberId(member.getId()) .cardNo(generateCardNo(member.getTenantId())) .status(1) .build(); return memberCardRepository.save(card) .thenReturn(member); } } ``` #### 3.2.2 事务管理 ```java @Service @Slf4j public class BookingService { private final BookingRecordRepository bookingRecordRepository; private final BookingSlotRepository bookingSlotRepository; private final BenefitService benefitService; /** * 预约时段 */ @Transactional public Mono bookSlot(BookingRequest request) { return validateBooking(request) .flatMap(v -> checkSlotAvailability(request.getSlotId())) .flatMap(slot -> deductBenefit(request.getMemberId(), slot)) .flatMap(benefit -> createBookingRecord(request, benefit)) .flatMap(booking -> updateSlotBookedCount(request.getSlotId())) .doOnSuccess(booking -> log.info("预约成功: bookingId={}", booking.getId())) .doOnError(e -> log.error("预约失败: {}", e.getMessage())); } private Mono checkSlotAvailability(Long slotId) { return bookingSlotRepository.findById(slotId) .switchIfEmpty(Mono.error(new BusinessException("时段不存在"))) .filter(slot -> slot.getStatus() == 1) .switchIfEmpty(Mono.error(new BusinessException("时段不可预约"))) .filter(slot -> slot.getBookedCount() < slot.getCapacity()) .switchIfEmpty(Mono.error(new BusinessException("时段已满"))); } private Mono deductBenefit(Long memberId, BookingSlot slot) { return benefitService.deductBenefit(memberId, slot.getPriceType(), slot.getPriceValue()) .switchIfEmpty(Mono.error(new BusinessException("权益不足"))); } private Mono createBookingRecord(BookingRequest request, MemberBenefit benefit) { BookingRecord record = BookingRecord.builder() .tenantId(request.getTenantId()) .storeId(request.getStoreId()) .memberId(request.getMemberId()) .slotId(request.getSlotId()) .bookingNo(generateBookingNo(request.getTenantId())) .status(1) .benefitId(benefit.getId()) .build(); return bookingRecordRepository.save(record); } private Mono updateSlotBookedCount(Long slotId) { return bookingSlotRepository.findById(slotId) .flatMap(slot -> { slot.setBookedCount(slot.getBookedCount() + 1); if (slot.getBookedCount() >= slot.getCapacity()) { slot.setStatus(2); // 已满 } return bookingSlotRepository.save(slot); }) .then(); } } ``` ### 3.3 Controller 层规范 #### 3.3.1 基本结构 ```java @RestController @RequestMapping("/members") @RequiredArgsConstructor @Slf4j public class MemberController { private final MemberService memberService; /** * 查询会员 */ @GetMapping("/{id}") public Mono>> getMember(@PathVariable Long id) { return memberService.getMember(id) .map(member -> ResponseEntity.ok(ApiResponse.success(member))) .switchIfEmpty(Mono.just(ResponseEntity.notFound().build())) .onErrorResume(BusinessException.class, e -> Mono.just(ResponseEntity.badRequest() .body(ApiResponse.error(e.getMessage())))) .onErrorResume(Exception.class, e -> { log.error("查询会员失败: memberId={}", id, e); return Mono.just(ResponseEntity.internalServerError() .body(ApiResponse.error("系统错误"))); }); } /** * 查询会员列表 */ @GetMapping public Mono>>> listMembers( @RequestParam(required = false) Long tenantId, @RequestParam(required = false) Long storeId, @RequestParam(defaultValue = "0") int page, @RequestParam(defaultValue = "20") int size) { return memberService.listMembers(tenantId, storeId, page, size) .map(members -> ResponseEntity.ok(ApiResponse.success(members))); } /** * 创建会员 */ @PostMapping public Mono>> createMember( @Valid @RequestBody MemberCreateRequest request) { return memberService.createMember(request) .map(member -> ResponseEntity.ok(ApiResponse.success(member))) .onErrorResume(BusinessException.class, e -> Mono.just(ResponseEntity.badRequest() .body(ApiResponse.error(e.getMessage())))) .onErrorResume(Exception.class, e -> { log.error("创建会员失败", e); return Mono.just(ResponseEntity.internalServerError() .body(ApiResponse.error("系统错误"))); }); } } ``` --- ## 四、反模式 ### 4.1 阻塞操作 **❌ 反模式**:在响应式流中使用 `block()`、`blockFirst()`、`blockLast()` ```java // 错误示例 public Mono getMember(Long id) { return memberRepository.findById(id) .flatMap(member -> { // 错误:使用 block() 阻塞 List cards = memberCardRepository.findByMemberId(member.getId()) .collectList().block(); member.setCards(cards); return Mono.just(member); }); } ``` **✅ 正确做法**:使用 `flatMap` 链式调用 ```java // 正确示例 public Mono getMember(Long id) { return memberRepository.findById(id) .flatMap(member -> memberCardRepository.findByMemberId(member.getId()) .collectList() .map(cards -> { member.setCards(cards); return member; })); } ``` ### 4.2 嵌套订阅 **❌ 反模式**:在 `flatMap` 中使用 `subscribe` ```java // 错误示例 public Mono getMember(Long id) { return memberRepository.findById(id) .flatMap(member -> { memberCardRepository.findByMemberId(member.getId()) .collectList() .subscribe(cards -> { // 错误:在 flatMap 中使用 subscribe member.setCards(cards); }); return Mono.just(member); }); } ``` **✅ 正确做法**:使用 `map` 转换数据 ```java // 正确示例 public Mono getMember(Long id) { return memberRepository.findById(id) .zipWith(memberCardRepository.findByMemberId(id).collectList()) .map(tuple -> { Member member = tuple.getT1(); List cards = tuple.getT2(); member.setCards(cards); return member; }); } ``` ### 4.3 忽略错误 **❌ 反模式**:忽略错误,不处理 ```java // 错误示例 public Mono getMember(Long id) { return memberRepository.findById(id) .onErrorResume(e -> Mono.empty()); // 错误:忽略错误 } ``` **✅ 正确做法**:记录错误并处理 ```java // 正确示例 public Mono getMember(Long id) { return memberRepository.findById(id) .onErrorResume(e -> { log.error("查询会员失败: memberId={}", id, e); return Mono.error(new SystemException("系统错误")); }); } ``` ### 4.4 不处理背压 **❌ 反模式**:不处理背压,可能导致内存溢出 ```java // 错误示例 public Flux listAllMembers() { return memberRepository.findAll(); // 错误:可能返回大量数据 } ``` **✅ 正确做法**:使用 `take` 限制数据量 ```java // 正确示例 public Flux listAllMembers() { return memberRepository.findAll() .take(1000); // 限制最多返回 1000 条 } ``` ### 4.5 资源泄漏 **❌ 反模式**:不释放资源 ```java // 错误示例 public Mono readFile(String path) { return Mono.fromCallable(() -> { // 错误:不释放资源 BufferedReader reader = Files.newBufferedReader(Paths.get(path)); return reader.readLine(); }); } ``` **✅ 正确做法**:使用 `using` 确保资源释放 ```java // 正确示例 public Mono readFile(String path) { return Mono.using( () -> Files.newBufferedReader(Paths.get(path)), reader -> Mono.fromCallable(reader::readLine), reader -> { try { reader.close(); } catch (IOException e) { log.error("关闭文件失败", e); } }); } ``` --- ## 五、最佳实践 ### 5.1 日志记录 **原则**:使用 `doOnSubscribe`、`doOnNext`、`doOnError`、`doOnTerminate` 记录关键操作。 ```java public Mono getMember(Long id) { return memberRepository.findById(id) .doOnSubscribe(s -> log.debug("开始查询会员: memberId={}", id)) .doOnNext(member -> log.debug("查询到会员: memberId={}, name={}", member.getId(), member.getName())) .doOnComplete(() -> log.debug("查询会员完成: memberId={}", id)) .doOnError(e -> log.error("查询会员失败: memberId={}", id, e)) .doOnTerminate(() -> log.debug("查询会员结束: memberId={}", id)); } ``` ### 5.2 超时控制 **原则**:为所有外部调用设置超时时间。 ```java public Mono getMember(Long id) { return memberRepository.findById(id) .timeout(Duration.ofSeconds(3)) // 3 秒超时 .switchIfEmpty(Mono.error(new BusinessException("会员不存在"))); } ``` ### 5.3 重试机制 **原则**:为可重试的操作设置重试机制。 ```java public Mono getMember(Long id) { return memberRepository.findById(id) .retryWhen(Retry.backoff(3, Duration.ofSeconds(1)) // 重试 3 次,间隔 1 秒 .filter(throwable -> throwable instanceof TimeoutException) .doBeforeRetry(signal -> log.warn("重试: attempt={}", signal.totalRetries()))); } ``` ### 5.4 缓存策略 **原则**:使用 Cache-Aside 模式,先查缓存,缓存未命中再查数据库。 ```java public Mono getMember(Long id) { String cacheKey = "member:" + id; return redisTemplate.opsForValue() .get(cacheKey) .cast(Member.class) .switchIfEmpty( memberRepository.findById(id) .flatMap(member -> redisTemplate.opsForValue() .set(cacheKey, member, Duration.ofMinutes(30)) .thenReturn(member)) ); } ``` ### 5.5 性能优化 **原则**:使用 `parallel()` 并行处理 CPU 密集型操作。 ```java public Flux processMembers(Flux members) { return members.publishOn(Schedulers.parallel()) .map(this::calculateLevel) .publishOn(Schedulers.boundedElastic()); } ``` --- ## 六、测试规范 ### 6.1 单元测试 **原则**:使用 `StepVerifier` 测试响应式流。 ```java @SpringBootTest class MemberServiceTest { @Autowired private MemberService memberService; @MockBean private MemberRepository memberRepository; @Test void testGetMember() { Member member = Member.builder() .id(1L) .name("张三") .phone("13800138000") .build(); when(memberRepository.findById(1L)) .thenReturn(Mono.just(member)); StepVerifier.create(memberService.getMember(1L)) .expectNextMatches(m -> m.getName().equals("张三")) .verifyComplete(); } @Test void testGetMemberNotFound() { when(memberRepository.findById(1L)) .thenReturn(Mono.empty()); StepVerifier.create(memberService.getMember(1L)) .expectErrorMatches(e -> e instanceof BusinessException) .verify(); } } ``` ### 6.2 集成测试 **原则**:使用 `WebTestClient` 测试 Controller。 ```java @SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) @AutoConfigureWebTestClient class MemberControllerTest { @Autowired private WebTestClient webTestClient; @Test void testGetMember() { webTestClient.get() .uri("/api/v1/members/1") .exchange() .expectStatus().isOk() .expectBody(Member.class) .value(member -> { assertThat(member.getName()).isEqualTo("张三"); }); } @Test void testGetMemberNotFound() { webTestClient.get() .uri("/api/v1/members/999") .exchange() .expectStatus().isNotFound(); } } ``` ### 6.3 性能测试 **原则**:使用 `StepVerifier.withVirtualTime` 测试性能。 ```java @Test void testGetMemberPerformance() { StepVerifier.withVirtualTime(() -> memberService.getMember(1L)) .expectNextCount(1) .expectComplete() .verify(Duration.ofMillis(100)); // 100ms 内完成 } ``` --- ## 七、总结 ### 7.1 核心原则回顾 1. ✅ **永不阻塞**:禁止在响应式流中使用阻塞操作 2. ✅ **链式调用**:使用操作符链式调用,避免嵌套 3. ✅ **错误处理**:使用响应式错误处理机制,避免 try-catch 4. ✅ **背压处理**:正确处理背压,避免内存溢出 5. ✅ **资源释放**:确保所有资源正确释放,避免资源泄漏 ### 7.2 关键成功因素 1. ✅ 严格遵守响应式编程规范 2. ✅ 使用 StepVerifier 进行测试 3. ✅ 完善的日志记录 4. ✅ 合理的超时和重试机制 5. ✅ 正确的缓存策略 ### 7.3 持续改进 1. ✅ 定期代码审查 2. ✅ 性能监控和优化 3. ✅ 技术分享和培训 4. ✅ 文档更新和维护