消息推送服务主要是处理同步给用户推送短信通知或是异步推送短信通知、微信模板消息通知等。例如,在用户注册时需同步发送短信验证码、在订单发货时需异步推送微信模板消息通知或短信通知。
消费推送接口并发量可能不高,要求同步推送消息的场景不多,但仍需要考虑可能会存在流量突增情况。在促销活动的前后、拉新活动期间等等,都可能需要同步推送大量短信通知,而我们的目标是只要一个POD(一个进程)就能处理整个电商平台的消息推送。
至于可异步的消息推送则通过MQ对接,实现削峰填谷,并通过监听系统的负载情况动态的控制消息消费速度,让系统处在一个稳定的运行状态,这是一个理想的假设。
现在MQ的消费端大多采用主动拉取模式消费消息,只是在拉模式上封装了一层,让我们感觉是服务端主动推送的消息,要实现系统自适应消费最好就是改源码去掉封装。在不修改源码的前提下,最简单的实现方式就是在消费时让线程休眠阻塞住,并在消费超时后返回消费失败,消息消费超时后会重新投递。
由于在消息消费过程中,会有一次落库,在开始消费消息之前创建一条消息推送记录并设置状态为未推送,在推送成功后更新记录状态为成功,失败时更新记录为失败,因此,MQ可以在接收消息后立即响应消费成功,并使用反应式API去消费,应对应用重启情况,可定时扫描数据库未推送的记录重新推送。这些将在项目上线后再完善。
在团队协同定义完消息推送接口后,消息推送服务相当于只做一层代理,与网关非常相似,这也是我们考虑使用WebFlux的原因之一。以此降低消息推送服务的部署成本。
Spring WebFlux与Spring WebMvc同为Web框架,不同的是,WebFlux是完全非阻塞的,能够实现以少量的线程处理并发请求、以更少的硬件资源获取系统更高的吞吐量。
但使用反应式编程可能不适合复杂业务的开发,也不适合采用了DDD领域驱动设计架构的项目,如果要使用,就必须要让响应式API侵入DDD的领域服务类、仓储类。
要使用Spring WebFlux提供完全非阻塞的接口,就必须要确保处理一个请求的整个流程都是非阻塞的,只要有一个步骤导致线程发生阻塞,WebFlux的性能就直线下降,为此你还要给WebFlux配置更多的线程,这与使用WebMvc并无差异,得不到高性能反而还增加项目的复杂性。
例如,处理接口请求阻塞在操作数据库上,那么默认WebFlux配置的几个线程都会被阻塞住,此时,如果想通过增加WebFlux的工作线程数来解决问题,那么不如直接切换回WebMvc。
使用WebFlux获得高性能的同时必然要失去些什么,毕竟是等价替换。所以代码难以调试、项目代码复杂度提升难以阅读、并且会导致一些强依赖ThreadLocal实现特性的框架无法正常工作,我们不得不抛弃这些框架而寻找支持反应式的框架替代。
消息推送服务在处理一次消息推送请求的过程中,可能需要访问Redis、数据库RDS、以及第三方接口。
Redis用于缓存消息模板,但这块可以使用内存缓存替代以获取更快的响应速度,后期如果需要访问Redis,可以使用Lettuce替代Jedis,前提是放弃现有项目在使用的Reids的多DB特性。
请求第三方接口则可以使用WebFlux提供的WebClient实现,用于替代诸如httpclient、okhttp这类http客户端框架,实现可以使用单一长连接的非阻塞发送http请求。
最后可能需要持久化推送记录以便于后续报表的统计或其它,所以需要使用R2DBC替换JDBC实现非阻塞操作数据库。
R2DBC与jdbc的关系类似于WebFlux与WebMvc的关系,R2DBC是实现非阻塞操作数据库的规范,提供反应式编程API,目前已有多种实现该规范的数据库驱动程序包,如r2dbc-mysql,spring data r2dbc则是我们用来替代mybatis的orm框架。
- webflux的异常处理与全局异常处理
webflux兼容webmvc的全局异常处理机制,如果不嫌麻烦,也可以每个接口自行处理异常,例如:
@PostMapping("push/sms")
public Mono<GenericResponse> genericSendSmsMsg(
// webflux也支持参数检验
@Validated @RequestBody Mono command) {
return xxxxService.pushMessage(command)
.flatMap(messagePushResultDto -> Mono.just(GenericResponse.success(messagePushResultDto)))
// 处理异常,不处理则走全局异常处理
.onErrorResume(throwable -> Mono.just(GenericResponse.fail(throwable.getMessage())));
}
- 让自定义的JsonUtils接替webflux解析json的工作
我们将Json解析封装成独立的组件,目的是适配多个json解析框架,让切换json解析框架只需要切换依赖jar包即可。为此,我们依然需要让JsonUtils替代WebFlux的json解析工作。代码实现如下。
- 使用WebClient发送Post请求示例
private Mono sendTemplateMessage(WxmbMessageCommand command, String token) {
return webClient.post().uri("/cgi-bin/message/template/send?access_token=" token)
.accept(MediaType.APPLICATION_JSON).acceptCharset(Charset.defaultCharset())
.contentType(MediaType.APPLICATION_JSON)
.bodyValue(command)
.retrieve()
.bodyToMono(WxmbMessageResponse.class);
}
- 使用Spring Data R2DBC实现增删改查
项目需要添加mysql的r2dbc驱动包,以及spring-data-r2dbc,同时spring-data-r2dbc依赖的r2dbc包也会被导入。
<dependency>
<groupId>dev.mikugroupId>
<artifactId>r2dbc-mysqlartifactId>
dependency>
<dependency>
<groupId>org.springframework.bootgroupId>
<artifactId>spring-boot-starter-data-r2dbcartifactId>
dependency>
spring-data-r2dbc与spring-data-jpa的使用非常相似,两者都是实现spirng-data-commons下的repository的API,spring-data-r2dbc实现的是反应式API。简单的CRUD可通过继承R2dbcRepository<T, ID>接口实现,例如:
public interface MessageDao extends R2dbcRepository<MessagePushPO, Long> {
}
使用@Query自定义查询实现如下:
public interface MessageDao extends R2dbcRepository<MessagePushPO, Long> {
@Query("select * from message where id = ?")
Mono selectByMsgId(Long msgId);
}
@Query注解不等于Mybatis的@Select注解,@Query可以编写增删改查SQL,如果需要执行写操作,需要配合@Modifying注解使用。例如
public interface MessageDao extends R2dbcRepository<MessagePushPO, Long> {
@Modifying
@Query("delete from message where id = :msgId")
Mono deleteByMsgId(Long msgId);
}
- 使用spring-data-r2dbc实现复杂查询
我们也可以直接使用spring-data-r2dbc的API实现复杂查询,例如:
DatabaseClient类似Mybatis中的SqlSession概念。
关于spring-data-r2dbc的使用,推荐阅读spring官方文档,虽然是英文,但阅读起来并不难理解,想要学习冷门技术,就必须要啃英文文档,因为你会发现,这方面的博客文章少之又少,还避免不了一些博客文章使用的spring-data-r2dbc版本与自己使用的版本不同存在API差异导致“copy”的代码画红线问题。
spring-data-r2dbc 1.1.0版本官方文档链接:https://docs.spring.io/spring-data/r2dbc/docs/1.1.0.RELEASE/reference/html/#reference,也可到spring.io官网搜索。
内容出处:,
声明:本网站所收集的部分公开资料来源于互联网,转载的目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。如果您发现网站上有侵犯您的知识产权的作品,请与我们取得联系,我们会及时修改或删除。文章链接:http://www.yixao.com/share/13677.html
评论列表(1条)
请教一下,criteria这里是怎么创建的,可以看看代码吗