加载中...
Nestjs+RabbitMQ+支付宝沙盒 实现电商支付功能
发表于:2025-03-14 | 分类: 后端

1. 前言

在电商系统中,支付功能是整个交易流程中最为关键的环节之一。一个完善的支付系统不仅要保证交易的安全性和可靠性,还需要具备良好的用户体验和高效的系统性能。本文将详细介绍如何在商城系统中集成支付宝支付功能,并通过RabbitMQ消息队列来优化支付流程。

在实现过程中,我们将重点关注以下几个方面:首先,通过RabbitMQ实现支付流程的异步处理,提高系统的并发处理能力;其次,利用死信队列来优雅处理支付超时等异常情况;最后,通过同步和异步通知机制,确保支付状态的及时更新和订单流程的顺利进行。

通过本文的学习,你将深入理解RabbitMQ在实际业务场景中的应用,以及如何构建一个可靠的支付系统。

2. 安装RabbitMQ

2.1. 使用docker安装(如果你看了前面使用docker-compose管理所有容器,这一步就可以省略)

2.1.1. 拉取镜像

docker pull rabbitmq::4.0

2.1.2. 运行容器

docker run -it --d --name rabbitmq -p 5552:5552 -p 15672:15672 -p 5672:5672  \
    -e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS='-rabbitmq_stream advertised_host localhost' \
    rabbitmq:4.0

2.1.3. 等待服务器启动,然后启用流和流管理插件:

docker exec rabbitmq rabbitmq-plugins enable rabbitmq_stream rabbitmq_stream_management

2.1.4. 以root权限进入docker

docker exec -u root -it rabbitmq /bin/bash

2.1.5. 添加一个用户

rabbitmqctl add_user [username] [password]

2.1.6. 创建一个Virtualhost

rabbitmqctl add_vhost /ibuy

2.1.7. 给用户访问权限

rabbitmqctl set_permissions -p /ibuy [username] ".*" ".*" ".*"

2.1.7.1. 检查权限

如果你已经有用户,但提示没有管理权限,你可以使用以下命令查看用户权限:

rabbitmqctl list_users

**该命令会列出所有用户及其角色。确保你登录的用户拥有 **administrator 标签。如果没有,则可以使用以下命令为用户分配管理权限

rabbitmqctl set_user_tags [username] administrator

2.1.7.2. 访问rabbitmq界面

接着,就可以通过http://localhost:15672/#/queues管理你的rabbitmq

3. RabbitMQ介绍

3.1. 什么是MQ

MQ全称为Message Queue,消息队列是应用程序和应用程序之间的通信方法。

3.1.1. 为什么使用MQ

在项目中,可将一些无需即时返回且耗时的操作提取出来,进行​**异步处理**​**,而这种异步处理的方式大大的节省了服务器的请求响应时间,从而****提高**了**系统**的​**吞吐量**​**。 **

3.1.2. MQ的优势:

1、任务****异步处理
将不需要同步处理的并且耗时长的操作由消息队列通知消息接收方进行异步处理。提高了应用程序的响应时间。
2、应用程序解耦合
MQ相当于一个中介,生产方通过MQ与消费方交互,它将应用程序进行解耦合。
3、削峰填谷
如订单系统,在下单的时候就会往数据库写数据。但是数据库只能支撑每秒1000左右的并发写入,并发量再高就容易宕机。低峰期的时候并发也就100多个,但是在高峰期时候,并发量会突然激增到5000以上,这个时候数据库肯定卡死了。

消息被MQ保存起来了,然后系统就可以按照自己的消费能力来消费,比如每秒1000个数据,这样慢慢写入数据库,这样就不会卡死数据库了。

但是使用了MQ之后,限制消费消息的速度为1000,但是这样一来,高峰期产生的数据势必会被积压在MQ中,高峰就被“削”掉了。但是因为消息积压,在高峰期过后的一段时间内,消费消息的速度还是会维持在1000QPS,直到消费完积压的消息,这就叫做“填谷”

3.1.3. MQ的劣势

** **1、 可用性降低

系统引入的外部依赖越多没系统稳定性越差。一旦MQ宕机,就会对业务造成影响。如何保证MQ的高可用,就是这个系统设计的关键

2、系统复杂度提高

** MQ的加入大大增加了系统的复杂度,以前系统间是同步的远程调用,现在是通过MQ进行异步调用。如何保证消息没有被重复消费?怎么处理消息丢失情况?怎么保证消息传递的顺序性?**

3、 一致性问题

A系统处理完业务,通过MQ给 B、C、D三个系统发送消息数据,如果B系统、C系统处理成功,D系统处理失败。如何保证消息数据处理的一致性?

3.2. AMQP 和 JMS

MQ是消息通信的模型;实现MQ的大致有两种主流方式:AMQP、JMS。

3.2.1. AMQP

AMQP是一种协议,更准确的说是一种binary wire-level protocol(链接协议)。这是其和JMS的本质差别,AMQP不从API层进行限定,而是直接定义网络交换的数据格式。

3.2.2. JMS

JMS即Java消息服务(JavaMessage Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。

3.2.3. AMQP 与 JMS 区别

  • JMS是定义了统一的接口,来对消息操作进行统一;AMQP是通过规定协议来统一数据交互的格式
  • JMS限定了必须使用Java语言;AMQP只是协议,不规定实现方式,因此是跨语言的。
  • JMS规定了两种消息模式;而AMQP的消息模式更加丰富

3.3. 消息队列产品

市场上常见的消息队列有如下:

  • ActiveMQ:基于JMS
  • ZeroMQ:基于C语言开发
  • RabbitMQ:基于AMQP协议,erlang语言开发,稳定性好
  • RocketMQ:基于JMS,阿里巴巴产品
  • Kafka:类似MQ的产品;分布式消息系统,高吞吐量

3.4. RabbitMQ

RabbitMQ是由erlang语言开发,基于AMQP(Advanced Message Queue 高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,它在分布式系统中用于解耦生产者(发送消息的程序)和消费者(接收消息的程序),提供可靠的消息传输和灵活的路由机制。

RabbitMQ官方地址:http://www.rabbitmq.com/

3.4.1. rabbitmq几个核心组件

3.4.1.1. 生产者(Producer)

负责发送消息到 RabbitMQ。

3.4.1.2. 消费者(Consumer)

从 RabbitMQ 队列中接收消息并进行处理。

3.4.1.3. 交换器(Exchange)

RabbitMQ 消息传递模型的核心思想是生产者从不直接向队列发送任何消息。生产者只将消息发送到 Exchange 交换器中,并不知道消息是否会被传送到队列。交换器负责接收生产者生产的消息,并通过一定路由规则将消息发送到指定的队列,起到一个传递的作用

它有多种类型的交换机

  • Direct​**:按照路由键精确匹配将消息发送到指定队列。**
  • Fanout​**:广播消息给所有绑定到该交换机的队列。**
  • Topic​**:通过模式匹配路由键来转发消息。**
  • Headers​**:通过消息头的值来路由消息。**

3.4.1.4. 队列(Queue)

消息的存储位置,消费者从队列中消费消息

3.4.1.5. 绑定(Binding)

用于将队列与交换机连接,并根据路由键来定义消息的转发规则。

3.4.2. RabbitMQ消息模式

3.4.2.1. 简单队列模式(Simple Queue Model)

这是最基本的模式,生产者发送消息到队列,消费者从队列中接收并处理消息。消息在队列中是 FIFO(先进先出)顺序处理的。
特点:只涉及一个生产者、一个消费者和一个队列,适合简单的任务处理场景。

3.4.2.2. 工作队列模式(Work Queue Model)

在此模式下,一个生产者发送消息到一个队列,多个消费者可以同时从同一个队列中消费消息。这种模式常用于分布式任务处理,旨在平衡消费者之间的工作负载。
特点:消费者之间竞争接收消息,负载可以自动分配,适合高并发环境。

3.4.2.3. 发布/订阅模式(Publish/Subscribe Model)

生产者将消息发送到交换机(Exchange),交换机会将消息广播到多个绑定的队列,消费者订阅这些队列以接收消息。常用的交换机类型是 fanout,可以将消息广播到所有绑定的队列。
特点:消息可以同时传递给多个消费者,适合广播类通知,如日志、事件推送。

3.4.2.4. 路由模式(Routing Model)

此模式依赖于 direct 类型的交换机,生产者在发送消息时会指定一个路由键(Routing Key),交换机会根据该路由键将消息发送到对应的队列。不同的队列可以绑定到同一个交换机并指定不同的路由键。
特点:消息路由更灵活,适合有条件过滤的消息投递,如根据消息类型或优先级分发消息。

3.4.2.5. 主题模式(Topic Model)

这是基于 topic 交换机的模式。生产者发送带有主题(Topic)的消息,交换机会根据消息主题匹配绑定的队列,支持模糊匹配。路由键支持通配符(如 * 和 #)用于匹配不同类型的队列。
特点:支持复杂的路由规则,适合按主题分类的消息投递,如新闻系统、日志系统等。

3.4.2.6. 头交换模式(Headers Exchange Model)

这种模式下,交换机会根据消息的头部属性来决定将消息发送到哪个队列,而不是依赖于路由键。消费者通过消息头部属性的值来接收特定消息。
特点:基于消息头进行路由的方式更加灵活,适合需要多条件匹配的场景。

3.4.2.7. RPC 模式(Remote Procedure Call Model)

在此模式下,RabbitMQ 被用作远程过程调用的中介。客户端发送一个请求到队列,服务器从队列中取出请求并处理后,返回结果到另一个队列,客户端从该队列中获取结果。
特点:适用于分布式系统中的远程服务调用,适合高并发下的请求-响应场景。
每种模式都有其特定的应用场景,RabbitMQ 通过灵活的交换机和队列绑定,支持多种消息路由和处理机制,满足不同业务需求。这些模式不仅提高了系统的可扩展性和解耦性,也为高并发场景下的消息处理提供了保障。

3.4.3. 有关死信队列和延迟队列

3.4.3.1. 死信队列(Dead Letter Queue)

**是一种用于存储和管理无法正常处理的消息的特殊队列。**这些无法处理的消息可能是由于各种原因,例如消息过期、队列已满、消息无法路由等。

通常是在正常队列上添加x-dead-letter-exchangex-dead-letter-routing-key 参数,将死信消息路由到指定的死信交换机和队列。

3.4.3.2. 延迟队列(Delay Queue)

延迟队列用于在指定的时间之后再处理消息。在发送消息时,生产者可以为消息设置一个延迟时间(TTL,Time To Live),消息在队列中到达过期时间后才会变为可消费状态。

有两种实现方式:

  • TTL + 死信队列​**:通过设置消息的TTL(过期时间)和绑定到死信队列实现的。当消息在原队列中过期后,自动进入死信队列,死信队列再路由到目标队列进行处理。**
  • 专门的插件​**:RabbitMQ 提供了 **rabbitmq-delayed-message-exchange 插件,可以直接支持延迟队列功能。

3.4.3.3. 区别

  • 目的不同​**:死信队列的主要目的是处理无法正常消费的消息,而延迟队列的目的是让消息在特定的延迟时间之后才被处理。**
  • 实现方式不同​**:死信队列通过设置 x-dead-letter-exchangex-dead-letter-routing-key 实现,而延迟队列通常通过TTL(消息或队列的过期时间)和死信机制或者延迟插件**来实现。
  • 触发条件不同​**:**
    • 死信队列​**:当消息被拒绝、超时或队列满时触发。**
    • 延迟队列​**:消息在特定的延迟时间后自动变为可消费状态。**

4. 创建延迟队列 和 支付队列

4.1.1.1. 基于TTL + 死信队列实现延迟队列

在完成上述配置后,就可以通过http://localhost:15672/#/queues管理你的rabbitmq。

我们的延迟队列用于处理用户提交订单后,超时未支付的情况。

我们先通过一张图来了解一下rabbitqm的延迟队列的工作流程, 具体逻辑请看下图

4.1.1.1.1. 创建queue.order.check消费队列

下图中已有的队列是所有队列创建完后的样子

4.1.1.1.2. 创建死信交换机exchange.order.delay

切换到Exchanges的tab创建一个死信交换机exchange.order.delay,type选择direct

然后点进exchange.order.delay交换机,并绑定消费队列queue.order.check,并设置routing key 为queue.order.check

4.1.1.1.3. 创建死信队列queue.order.delay


这里,我们将死信队列的消息通过配置参数发送到了queue.order.check消费队列中。

<strong>x-message-ttl=10000</strong>​**,这个参数最好是在代码中配置。**

最后就是在死信交换机exchange.order.delay中绑定queue.order.delay

4.1.1.1.4. 验证

**然后,我们可以回到exchange中,点进我们的 **exchange.order.delay交换机,然后向 queu.order.delay这个死信队列发送一条消息

在发送完成后,赶快点到queue.order.check这个消费队列去查看,你会发现是查不到信息的。等过了10秒后就会接收到死信队列传过来的数据。到这里,我们基于rabbitmq的延迟队列就实现了。

4.1.1.2. 创建支付队列

这里的队列使用主要有以下两点原因

  1. 异步处理支付结果、解耦支付处理流程,以及通过消息队列保证消息的可靠传递,防止消息丢失。
  2. 削峰填谷​**:在订单系统中,下单的时候直接往数据库中写数据时,只能支撑每秒1000左右的并发写入,并发量再高就容易宕机。如果在高峰期时候,并发量突然激增到1000以上,或者更多,这个时候数据库就会可能卡死了,所以通过增加mq这个中间件来对写入数据进行缓存。**
4.1.1.2.1. 创建一个支付队列queue.order.pay

不用加任何arguments

4.1.1.2.2. 创建支付队列交换机exchange.order.pay

点进去再绑定到支付队列queue.order.pay

5. **使用 **cpolar 实现内网穿透

因为我们在开发支付功能时,需要接入支付宝。在对应的沙箱应用接入时,需要提供用于接收支付宝沙箱异步通知消息,需要传入http(s)公网可访问的网页地址。若不设置,则无法接收相应的异步通知消息。所以,我们在开发阶段需要将本机地址映射到公网

5.1. 安装homebrew(选填)

如果已经安装,则跳过

/bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/HEAD/install.sh)"

5.2. 安装cpolar

brew tap probezy/core && brew install cpolar

5.3. token验证

需要在cpolar注册账号

登录cpolar官网,点击左侧的验证,查看自己的认证token,之后将token贴在命令行里

5.4. 4.安装服务

sudo cpolar service install

5.4.1. 修改默认端口号

因为我们本机上还有elasticsearch的服务已经占用了9200端口,所以需要修改一下cpolar的默认端口号

  1. 找到cpolar配置文件
  • **windows系统: **c:\Users\用户名.cpolar\cpolar.yml,右键点击“打开方式”——“记事本”,选择使用记事本打开
  • linux系统:执行命令nano /usr/local/etc/cpolar/cpolar.yml
  • mac:如果没有配置指定的配置文件,则在~/.cpolar/cpolar.yml
  1. 在配置文件中增加一行参数:client_dashboard_addr: 127.0.0.1:9300。这一行与authtoken是同一级别
  2. 保存cpolar配置文件
  3. 如果你已经使用sudo cpolar service start启动了服务,则再运行一下sudo cpolar service restart重启一下。然后就可以在9300上访问了

5.4.2. 启动服务

sudo cpolar service start

5.4.3. 访问

在浏览器上访问本地9300端口 127.0.0.1:9300,使用cpolar邮箱账号登录cpolar web UI管理界面,即可开始使用cpolar。

5.4.4. 创建隧道

点击左侧仪表盘的隧道管理——创建隧道,我们来创建一条隧道,将在本地15672端口下的rabbitmq服务映射到公网:

提示隧道创建成功后,页面自动跳转至隧道列表,可以看到刚刚创建成功的“本机rabbitmq”,状态active,表示为正常在线,注意无需再次点击启动。

点击左侧仪表盘的状态——在线隧道列表,可以看到本机rabbitmq隧道已经有生成了相应的公网地址,一个http协议,一个https协议(免去配置ssl证书的繁琐步骤),均可以访问到本地web服务,复制公网地址。

5.4.5. 测试访问公网地址

5.4.6. 配置固定二级域名

**使用免费的cpolar生成的公网地址为随机临时地址,**24小时内会发生变化,对于需要长期访问的用户不是很方便。为此,我们可以为其配置一个容易记忆的固定二级子域名,同时提高带宽,实现更为流畅的访问。

如果需要升级套餐,可以在官网升级套餐

5.4.7. 保留一个二级子域名

登录cpolar官网后台,点击左侧的预留,找到保留二级子域名:

  • 地区:选择China VIP
  • 二级域名:可自定义填写
  • 描述:即备注,可自定义填写

点击保留

提示子域名保留成功,复制所保留的二级子域名

5.4.8. 配置二级子域名

访问http://127.0.0.1:9200/登录cpolar web UI管理界面,点击左侧仪表盘的隧道管理——隧道列表,找到所要配置的隧道,点击右侧的编辑

修改隧道信息,将保留成功的二级子域名配置到隧道中

  • 域名类型:选择二级子域名
  • Sub Domain:填写保留成功的二级子域名,本例为test01

点击更新

提示更新隧道成功,点击左侧仪表盘的状态——在线隧道列表,可以看到公网地址已经更新为保留成功的二级子域名,将其复制下来。

5.4.9. 测试访问公网固定二级子域名

在浏览器上访问固定二级子域名,测试访问成功,现在该公网地址不会随机变化了。

如果您想要使用自己的域名来访问本地web服务,cpolar也支持该项功能,详细可以参考下一篇文章教程:

  • 为本地web服务配置自己的域名

6. 使用Nestjs接入支付宝支付

6.1. 登录支付宝开放平台,创建应用

因为我使用的是支付宝的沙箱支付,所以先跳过这一步

6.2. 沙箱接入

登入控制台,找到开发工具推荐-> 沙箱

开发信息选择自定义秘钥**,开启公钥模式。然后通过支付宝开放平台助手,生成应用公私钥。再通过应用公钥生成支付宝公钥**

https://opendocs.alipay.com/open/02np9g?pathHash=b87f0c98

6.3. 梳理实现流程

下图是支付宝的支付调用流程

但是,在项目中,我们还有别的业务,不能直接在某个业务逻辑中接入支付宝的支付功能,我们新建一个支付服务,并且通过mq来管信息。

使用mq的好处就是,增加一个消息中间件,解除多个服务之间的耦合度。因为,在这个电商项目中,每卖掉一个商品,就需要减库存,****​而且一旦用户支付后,需要修改订单状态,并且通知物流进行发货,有的还需要短信通知等等。​如果不添加消息中间件,​那么就需要在支付服务或者订单服务中统一进行这一系列的操作。那么这些服务之间的耦合度会很高,当业务进行横向扩展的时候,新接入一个服务,都需要对支付/订单服务进行修改,这就增加了出错的可能​**,且负责这两个服务的工作人员也会很累。那么通过添加消息中间件,当用户支付完成后,直接将第三方的支付信息添加到mq队列中,哪一个服务需要用到这个信息,就去mq中监听即可。这样,程序更健壮,更易于维护**

所以,改进后的流程如下​**(PS: 暂时未用到微服务,还是先按照模块拆分的方式来实该时序图)**

6.4. 实现步骤

6.4.1. 支付模块配置

  • 配置支付宝SDK,包括应用ID、商户私钥、支付宝公钥等
  • 设置RabbitMQ消息队列,定义交换机和队列
  • 配置支付回调接口,处理支付结果通知

6.4.1.1. 支付宝SDK环境变量配置

# 支付相关
# 商户id
ALIPAY_APP_ID: xxx
# 商户秘钥
ALIPAY_MERCHANT_PRIVATE_KEY: xxx
# 支付宝公钥
ALIPAY_PUBLIC_KEY: xxx

#支付状态异步通知
ALIPAY_NOTIFY_URL: xxx/alipay/alipayNotifyNotice
#支付状态同步通知
ALIPAY_RETURN_URL: xxx/alipay/alipayReturnNotice
#支付宝开发环境网关 用的沙箱的网关
ALIPAY_GATEWAY_URL: https://openapi-sandbox.dl.alipaydev.com/gateway.do

#签名方式
ALIPAY_SIGN_TYPE: RSA2
#ALIPAY_CHARSET: utf-8
#ALIPAY_FORMAT: json

6.4.1.2. RabbitMQ环境变量配置

# 开发环境配置
# rabbitmq
RMQ_HOST=localhost
RMQ_PORT=5672
RMQ_USERNAME=xxx
RMQ_PASSWORD=xxx
RMQ_VIRTUAL_HOST=/ibuy

6.4.2. 订单创建流程

  • 用户从购物车下单,系统生成订单号
  • 创建订单主表记录和订单明细记录
  • 扣减商品库存
  • 清空用户购物车缓存
  • 发送延时消息到RabbitMQ,用于后续订单状态检查

6.4.2.1. 订单实体定义

// src/mall-service/mall-service-order/entities/order.entity.ts
@Entity('tb_order')
export class OrderEntity {
  @PrimaryColumn()
  id: string;

  @Column()
  username: string;

  @Column()
  totalNum: number;

  @Column()
  totalMoney: number;

  @Column()
  payMoney: number;

  @Column()
  createTime: Date;

  @Column()
  updateTime: Date;

  @Column()
  payTime: Date;

  @Column()
  consignTime: Date;

  @Column()
  endTime: Date;

  @Column()
  closeTime: Date;

  // 订单状态:0-未完成,1-已完成,2-已退货
  @Column()
  orderStatus: string;

  // 支付状态:0-未支付,1-已支付,2-支付失败
  @Column()
  payStatus: string;

  // 支付类型:1-在线支付,2-货到付款
  @Column()
  payType: string;

  // 订单来源:1-PC端,2-移动端
  @Column()
  sourceType: string;

  @Column()
  transactionId: string;
}

6.4.2.2. 订单创建服务

// src/mall-service/mall-service-order/services/order.service.ts
@Injectable()
export class OrderService {
  constructor(
    @InjectRepository(OrderEntity)
    private readonly orderRepository: Repository<OrderEntity>,
    private readonly cartService: CartService,
    private readonly goodsService: GoodsService,
    private readonly amqpConnection: AmqpConnection,
    private readonly dataSource: DataSource
  ) {}

  async createOrder(orderDto: CreateOrderDto, user: any): Promise<Result<any>> {
    const queryRunner = this.dataSource.createQueryRunner();
    await queryRunner.connect();
    await queryRunner.startTransaction();

    try {
      // 1. 获取购物车数据
      const cartItems = await this.cartService.list(user.username);
      if (!cartItems || cartItems.length === 0) {
        throw new BadRequestException('购物车为空');
      }

      // 2. 计算订单金额
      let totalMoney = 0;
      let totalNum = 0;
      for (const item of cartItems) {
        totalMoney += item.price * item.num;
        totalNum += item.num;
      }

      // 3. 创建订单
      const order = new OrderEntity();
      order.id = `ORDER_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
      order.username = user.username;
      order.totalMoney = totalMoney;
      order.totalNum = totalNum;
      order.createTime = new Date();
      order.orderStatus = '0';
      order.payStatus = '0';
      order.payType = '1';
      order.sourceType = orderDto.sourceType;

      await queryRunner.manager.save(OrderEntity, order);

      // 4. 创建订单明细
      for (const item of cartItems) {
        const orderItem = new OrderItemEntity();
        orderItem.orderId = order.id;
        orderItem.goodsId = item.goodsId;
        orderItem.num = item.num;
        orderItem.price = item.price;
        await queryRunner.manager.save(OrderItemEntity, orderItem);

        // 5. 扣减库存
        await this.goodsService.decreaseStock(item.goodsId, item.num);
      }

      // 6. 清空购物车
      await this.cartService.clear(user.username);

      // 7. 发送延时消息
      await this.sendDelayMessage(order.id);

      await queryRunner.commitTransaction();
      return new Result(order);
    } catch (error) {
      await queryRunner.rollbackTransaction();
      throw error;
    } finally {
      await queryRunner.release();
    }
  }

  private async sendDelayMessage(orderId: string) {
    await this.amqpConnection.publish(
      RabbitMQConstants.EXCHANGE_ORDER_DELAY,
      RabbitMQConstants.QUEUE_ORDER_DELAY,
      { orderId }
    );
  }
}

6.4.3. 支付流程

  • 前端调用支付接口,获取支付宝支付链接
  • 用户跳转到支付宝完成支付
  • 支付宝异步通知商城支付结果
  • 系统接收通知,验证签名,发送消息到支付队列
  • 支付监听器消费消息,更新订单状态

6.4.3.1. 支付服务实现

// src/mall-service/alipay/services/alipay.service.ts
@Injectable()
export class AlipayService {
  private readonly alipaySdk: AlipaySdk;

  constructor(
    private readonly configService: ConfigService,
    private readonly orderService: OrderService,
    private readonly amqpConnection: AmqpConnection
  ) {
    this.alipaySdk = new AlipaySdk({
      appId: this.configService.get('ALIPAY_APP_ID'),
      privateKey: this.configService.get('ALIPAY_MERCHANT_PRIVATE_KEY'),
      alipayPublicKey: this.configService.get('ALIPAY_PUBLIC_KEY'),
      gateway: this.configService.get('ALIPAY_GATEWAY_URL'),
      signType: 'RSA2'
    });
  }

  async createPayment(orderId: string): Promise<Result<string>> {
    const order = await this.orderService.findById(orderId);
    if (!order) {
      throw new NotFoundException('订单不存在');
    }

    if (order.payStatus === '1') {
      throw new BadRequestException('订单已支付');
    }

    const paymentUrl = await this.alipaySdk.pageExecute('alipay.trade.page.pay', {
      notifyUrl: this.configService.get('ALIPAY_NOTIFY_URL'),
      returnUrl: this.configService.get('ALIPAY_RETURN_URL'),
      bizContent: {
        outTradeNo: order.id,
        totalAmount: order.totalMoney.toFixed(2),
        subject: `订单${order.id}`,
        productCode: 'FAST_INSTANT_TRADE_PAY'
      }
    });

    return new Result(paymentUrl);
  }

  async handlePaymentNotify(params: any): Promise<string> {
    const signVerified = this.alipaySdk.checkNotifySign(params);
    if (!signVerified) {
      return 'failure';
    }

    if (params.trade_status === 'TRADE_SUCCESS') {
      await this.amqpConnection.publish(
        RabbitMQConstants.EXCHANGE_ORDER_PAY,
        RabbitMQConstants.QUEUE_ORDER_PAY,
        {
          orderId: params.out_trade_no,
          tradeNo: params.trade_no,
          payTime: params.gmt_payment
        }
      );
    }

    return 'success';
  }
}

6.4.4. 订单状态管理

  • 使用延时队列检查订单支付状态
  • 对于超时未支付的订单,自动关闭并回滚库存
  • 支付成功的订单,更新状态为待发货

6.4.4.1. 订单支付消息监听器

// src/mall-service/mall-service-order/listeners/order-pay.listener.ts
@Injectable()
export class OrderPayListener {
  constructor(
    private readonly orderService: OrderService,
    private readonly goodsService: GoodsService
  ) {}

  @RabbitSubscribe({
    exchange: RabbitMQConstants.EXCHANGE_ORDER_PAY,
    routingKey: RabbitMQConstants.QUEUE_ORDER_PAY,
    queue: RabbitMQConstants.QUEUE_ORDER_PAY
  })
  async handlePaymentSuccess(msg: any) {
    const { orderId, tradeNo, payTime } = msg;

    await this.orderService.update(orderId, {
      payStatus: '1',
      payTime: new Date(payTime),
      transactionId: tradeNo,
      orderStatus: '1'
    });
  }
}

6.4.4.2. 订单超时检查监听器

// src/mall-service/mall-service-order/listeners/order-check.listener.ts
@Injectable()
export class OrderCheckListener {
  constructor(
    private readonly orderService: OrderService,
    private readonly goodsService: GoodsService,
    private readonly alipayService: AlipayService
  ) {}

  @RabbitSubscribe({
    exchange: RabbitMQConstants.EXCHANGE_ORDER_DLX,
    routingKey: RabbitMQConstants.QUEUE_ORDER_CHECK,
    queue: RabbitMQConstants.QUEUE_ORDER_CHECK
  })
  async handleOrderCheck(msg: any) {
    const { orderId } = msg;
    const order = await this.orderService.findById(orderId);

    if (!order || order.payStatus === '1') {
      return;
    }

    // 关闭支付宝交易
    await this.alipayService.closePayment(orderId);

    // 更新订单状态
    await this.orderService.update(orderId, {
      orderStatus: '2',
      payStatus: '2',
      closeTime: new Date()
    });

    // 回滚库存
    const orderItems = await this.orderService.findOrderItems(orderId);
    for (const item of orderItems) {
      await this.goodsService.increaseStock(item.goodsId, item.num);
    }
  }
}

6.5. 流程描述

6.5.1. 下单支付流程

  1. 用户在购物车中选择商品并下单
  2. 系统创建订单记录,生成唯一订单号
  3. 系统扣减相应商品库存,清空用户购物车
  4. 系统向RabbitMQ发送延时消息,设置35分钟后检查订单状态
  5. 用户点击支付按钮,系统调用支付宝SDK生成支付链接
  6. 用户跳转到支付宝页面,完成支付操作
  7. 支付宝通过异步通知接口通知商城支付结果
  8. 系统验证支付宝通知的签名,确认支付成功
  9. 系统将支付结果消息发送到支付队列
  10. 支付监听器消费消息,更新订单状态为已支付、待发货

6.5.2. 订单超时处理流程

  1. 用户下单后,系统发送延时消息到RabbitMQ
  2. 35分钟后,延时消息过期,转发到订单检查队列
  3. 订单检查监听器消费消息,检查订单支付状态
  4. 如果订单未支付,系统自动关闭订单
  5. 系统回滚商品库存,恢复库存数量
  6. 系统调用支付宝接口关闭交易

6.5.3. 技术实现细节

  1. 延时队列实现​**:**
  • 使用RabbitMQ的消息过期机制实现延时队列
  • 消息发送到延时交换机,设置过期时间
  • 消息过期后,转发到实际消费队列
  1. 事务管理​**:**
  • 使用TypeORM的事务功能确保订单创建的原子性
  • 订单创建、订单明细添加、库存扣减在同一事务中完成
  • 任何步骤失败,整个事务回滚
  1. 支付宝集成​**:**
  • 使用支付宝SDK生成支付链接
  • 配置异步通知接口,处理支付结果
  • 验证支付宝通知签名,确保通知真实性
  1. 消息队列应用​**:**
  • 使用RabbitMQ实现系统间解耦
  • 定义不同的交换机和队列处理不同类型的消息
  • 支持普通订单和秒杀订单的不同处理流程

通过以上技术和流程,我们实现了一个商城系统的支付功能。

上一篇:
使用Nestjs实现基于JWT + RBAC的认证授权系统
下一篇:
使用Nestjs+ElasticSearch实现电商搜索功能
本文目录
本文目录