第13课:分布式事务

第13课:分布式事务

近几年,随着软件业务越来越复杂,传统单体架构模式已无法满足各大公司自身业务演变的要求,于是各大企业(尤其是互联网公司)纷纷开始采用分布式、微服务等架构模式重构或开发自身软件系统。

在分布式、微服务架构模式下,服务被分布在不同的机器上,服务间采用 RESTful 风格 API 或 RPC 接口进行通信。

相比传统架构模式,这种架构模式的确非常灵活,尤其可根据服务的请求流量、性能等关键指标对该服务自身实例进行扩、缩容,从而充分利用企业资源。

采用分布式、微服务架构的系统往往会将服务部署在不同服务器节点或不同容器中,因网络、服务进程自身的不稳定性很容易造成数据的不一致,而分布式事务主要用来解决该问题。

为了帮助大家更好地理解分布式事务,我们看下面这个场景案例。

轩轩父亲通过支付宝向轩轩的中国银行账户转账 2000 元,成功后,支付宝提示转账成功。假如因为网络原因,中国银行系统无法收到这 2000 元,又或者中国银行系统收到了这 2000 元的请求,但由于银行系统自身问题,导致转账处理失败。这两种情况,都会导致轩轩父亲账户金额减少 2000 元,而轩轩银行账户金额不增加,即造成了数据的不一致。为了让分布式系统的数据在有限的时间内达到一致状态,在开发过程中引入分布式事务解决方案即可。

除了上面的应用场景,实际开发过程中还有很多需要引入分布式事务的场景,比如下订单成功,而商品库存扣减失败等。

分布式事务常用方案

2PC(Two Phase Commit)

2PC 是一种基于 XA 协议实现的、用于解决分布式事务的常用方案,该方案包含两种角色,分别为事务管理器(Transaction Manager,以下简称 TM)和资源管理器(Resource Manager,以下简称 RM)。我们可以将资源管理器理解为常用的关系型数据库 Oracle、MySQL 等。为了支持 2PC,Oracle、MySQL 也都实现了 XA 接口。TM 作为解决分布式事务的核心组件,主要负责协调资源管理器的预处理、提交及回滚。基于 XA 协议实现的 2PC,其原理图如下所示:

enter image description here

从上图可以看出,事务管理器首先向资源管理器发送“准备”命令,事务管理器收到来自所有资源管理器的“就绪”响应后,事务管理器最终向所有资源管理器下发“提交”指令,所有资源管理器成功执行提交指令,整个事务提交结束。

enter image description here

从上图可以看出,事务管理器首先向资源管理器发送“准备”命令,事务管理器收到来自部分资源管理器的“未就绪”响应后,事务管理器最终向所有资源管理器下发“回滚”指令,所有资源管理器成功执行回滚指令,整个事务回滚结束。

TCC(Try-Confirm-Cancel)

TCC 可以说是 2PC 的变种,它专注于系统业务层的实现,其实现思路如下:

  1. 业务检测、资源预留(Try 阶段);
  2. 真正执行业务(Confirm 阶段);
  3. 释放 Try 阶段预留资源(Cancel 阶段)。

以上实现思路可以用下图表示:

enter image description here

想进一步了解如何利用 TCC 实现分布式事务,可以参照开源 ByteTCC

MQ 实现最终一致性

以上两种解决方案的实现比较简单,能够解决分布式系统中数据的强一致性问题,但在实际应用中性能也是需要我们考虑的,尤其在互联网场景中,高并发更难得以保证。下面我为大家讲解如何利用 MQ 实现最终一致性,及如何利用解耦的思路解决互联网场景下高并发的硬性指标。

下图是利用 MQ 解决最终一致性问题的实现思路:

enter image description here

基于 RabbitMQ 实现最终一致性

目前常用的 MQ 有 RabbitMQ、Redis、ZeroMQ、ActiveMQ、Kafka,RocketMQ 等。相比之下,RabbitMQ 具有如下特点:

  1. 基于 AMQP 协议开发;
  2. 安装部署、入门更为容易;
  3. 搭建高可用集群更为简单;
  4. 有强大的 UI 管理界面。

本课中,我们选择使用 RabbitMQ 实现最终一致性。

便于大家更好地理解如何利用 RabbitMQ 解决分布式事务问题,我们先假想这样一个场景:下订单,然后扣减库存,当订单生成成功,库存扣减失败时,取消生成的订单。

实现该功能,我们可以按如下步骤来做。

1. 首先新建两个库,库名分别为 db_orderdb_stock

2. 新建项目 transOrderService、transStockService,并在它们的 Pom 文件中加入如下主要依赖:

<dependency>
<groupId>org.springframework.boot</groupId> 
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>

<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

3. 在项目 transOrderService 的 resources 目录中添加 application.yml 配置文件,并在配置文件中添加如下配置信息:

server:
  contextpath: /
  port: 8080

spring:
  application:
    name: trans-order-service

  datasource:
    driver-class-name: com.mysql.jdbc.Driver
    url: jdbc:mysql://localhost:3306/db_order?useSSL=false
    username: root
    password: root
    ##初始化连接数
    initialSize: 5
    ##最小连接数
    minIdle: 5
    ##最大连接数
    maxActive: 20

  jpa:
    hibernate:
      ddl-auto: update
    show-sql: true

  ##rabbitmq配置信息
  rabbitmq:
    host: 192.168.1.120
    port: 5672
    username: root
    password: hcb13579

注意:数据库配置信息、RabbitMQ 配置信息要确保配置正确,否则程序可能无法启动。

4. 在项目 transStockService 的 resources 目录中添加 application.yml 配置文件,并在配置文件中添加如下配置信息:

server:
  contextpath: /
  port: 8081

spring:
  application:
    name: trans-stock-service

  datasource:
    driver-class-name: com.mysql.jdbc.Driver
    url: jdbc:mysql://localhost:3306/db_stock?useSSL=false
    username: root
    password: root
    ##初始化连接数
    initialSize: 5
    ##最小连接数
    minIdle: 5
    ##最大连接数
    maxActive: 20

  jpa:
    hibernate:
      ddl-auto: update
    show-sql: true

  ##rabbitmq配置信息
  rabbitmq:
    host: 192.168.1.120
    port: 5672
    username: root
    password: hcb13579

注意:数据库配置信息、RabbitMQ 配置信息确保配置正确,否则程序可能无法启动。

5. 在项目 transOrderService 中添加订单模型类 Order,其代码如下:

@Entity
@Table(name = "tb_order")
public class Order {
    @Id
    @GeneratedValue(strategy = GenerationType.AUTO)
    @Column(name = "order_id")
    //主键
    private Integer orderId;

    //产品名称
    @Column(name = "product_name")
    private String productName;

    //产品id
    @Column(name = "product_id")
    private Integer productId;

    //购买产品数量
    @Column(name = "count")
    private Integer count;

    //订单创建时间
    @Column(name = "created_date")
    @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    private Date createdDate;

    @Column(name = "updated_date")
    @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    private Date updatedDate;

    ...此处省略get set方法
}

注意:类名 Order 上要记得添加注解 @Entity,@Table(name = "tb_order"),否则可能无法在数据库中自动生成订单表 tb_order

6. 在项目 transOrderService 中添加事件模型类 Event,其代码如下:

@Entity
@Table(name = "tb_event")
public class Event implements Serializable {
    @Id
    @GeneratedValue(strategy = GenerationType.AUTO)
    @Column(name = "event_id")
    //事件id
    private Integer eventId;

    //时间类型
    @Column(name = "event_type")
    private Integer eventType;

    @Column(name = "model_name")
    //模型分类
    private String modelName;

    //模型id
    @Column(name = "model_id")
    private Integer modelId;

    //产品id
    @Column(name = "product_id")
    private Integer productId;

    //产品购买数量
    @Column(name = "order_count")
    private Integer orderCount;

    @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    @Column(name = "created_date")
    private Date createdDate;

    ...此处省略get set方法
}

注意:类名 Event 上要记得添加注解 @Entity,@Table(name = "tb_order"),否则可能无法在数据库中自动生成事件表 tb_event

7. 在项目 transOrderService 中分别添加订单、事件持久化接口 OrderRepository、EventRepository,此处就不粘贴代码了,读者可以从文末的 GitHub 地址中下载源码。

8. 在项目 transOrderService 中新建配置类 RabbitmqConfig 生成队列 success-queuefailure-queue,源码如下:

@Configuration
public class RabbitmqConfig {
    @Bean
    public Queue successQueue(){
        return new Queue(CommonConstant.SUCCESS_QUEUE);
    }

    @Bean
    public Queue failureQueue(){
        return new Queue(CommonConstant.FAILURE_QUEUE);
    }
}

9. 在项目 transOrderService 中新建订单服务类 OrderServiceImpl,并实现订单创建接口 createOrder,实现如下:

@Override
@Transactional
public Order createOrder(Integer productId, Integer count) {
        Order order = new Order();
        try{
            order.setProductName("安踏运动鞋");
            order.setProductId(productId);
            order.setCreatedDate(new Date());
            order.setCount(count);

            //保存订单
            order = orderDao.save(order);
        }
        finally {
            Event event = new Event();

            event = createEvent(productId, count, order, event);

            //向消息队列发送事件消息
         rabbitTemplate.convertAndSend(CommonConstant.SUCCESS_QUEUE, event);
        }

        return order;
    }

注意:此处 convertAndSend 方法中的队列为订单创建成功的通知消息队列。

10. 在类 OrderServiceImpl 中添加 handleStockFailureMsg 接口,负责处理库存扣减失败后回滚订单信息 ,源码如下:

@RabbitListener(queues = CommonConstant.FAILURE_QUEUE)
@Override
@Transactional
public void handleStockFailureMsg(Event event) {
        Integer eventId = event.getEventId();

        Integer orderId = eventDao.findOne(eventId).getModelId();

        //删除订单信息
        orderDao.delete(orderId);
    }

注意:handleStockFailureMsg 方法上的注解 @RabbitListener 用于监听扣减库存消息。

11. 在项目 transOrderService 中添加控制器类 OrderController,并在该类中添加订单创建接口 createOrder,具体实现代码如下:

@RequestMapping("/createOrder")
public Order createOrder(Integer productId,Integer count){
        if(!StringUtils.isEmpty(productId) &&
                (count != null && count > 0)){
            return orderService.createOrder(productId,count);
        }

        return null;
    }

注意:该代码包含了订单项目的主要实现逻辑。为了课程简洁,这里省略了部分代码,详情代码请大家访问文末的 GitHub 地址自行下载。

12. 在项目 transStockService 中添加事件类 Event,主要代码如下:

public class Event implements Serializable {

    private Integer eventId;

    private Integer eventType;

    private String modelName;

    private Integer modelId;

    private Integer productId;

    private Integer orderCount;

    private Date createdDate;

    public Integer getProductId() {
        return productId;
    }

   ...测试省略get set方法
}

注意:Event 类的包结构必须要和项目 transOrderService 中的 Event 类包结构完全一致,否则会出现反序列化错误。

13. 在项目 transStockService 中添加产品模型类 Product,源代码如下:

@Entity
@Table(name = "product")
@DynamicUpdate
public class Product {
    @Id
    @GeneratedValue(strategy = GenerationType.AUTO)
    @Column(name = "product_id")
    //产品id
    private Integer productId;

    @Column(name = "product_name")
    //产品名称
    private String productName;

    @Column(name = "stock_count")
    //产品库存数量
    private Integer stockCount;

    @Column(name = "price")
    private BigInteger price;

    @Column(name = "created_date")
    @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    private Date createdDate;

    @Column(name = "updated_date")
    @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    private Date updatedDate;

    ...测试省略get set方法
}

注意:类 Product 上一定要记得添加注解 @Entity,@Table(name = "tb_order"),否则可能无法在数据库中自动生成商品表 product。

14. 在项目 transStockService 中添加产品持久化接口 ProductRepository。

15. 在项目 transStockService 中添加产品服务类 ProductServiceImpl,并在该类中添加接口handleOrderSuccessMsg,用于监听订单是否成功生成。其主要实现代码如下:

@Override
@RabbitListener(queues = CommonConstant.SUCCESS_QUEUE)
public void handleOrderSuccessMsg(Event event) {
        Integer productId = event.getProductId();

        Product product = productDao.findOne(productId);

        if (product.getStockCount() >= event.getOrderCount()) {
            //正常扣减库存
            product.setStockCount(product.getStockCount() - event.getOrderCount());

            productDao.saveAndFlush(product);
        } else {
            //回滚生产订单信息
            rabbitTemplate.convertAndSend(CommonConstant.FAILURE_QUEUE,event);
        }
    }

注意:handleOrderSuccessMsg 方法上的注解 @RabbitListener 用于监听订单成功生成的消息。

16. 为了演示回滚生成的订单信息,我们首先在商品表中添加一条商品数据,该商品库存为 1,如下图所示:

enter image description here

17. 按顺序分别启动 RabbitMQ、transOrderService、transStockService,然后利用 Postman 发送请求:http://localhost:8080/order/createOrder?productId=1&count=2,观察订单表中是否有订单信息生成,测试结果如下:

enter image description here

当把产品库存量修改为 3 时,再调用接口:http://localhost:8080/order/createOrder?productId=1&count=2,观察订单表中是否有订单信息生成,测试结果如下:

enter image description here

分布式事务的相关知识,本文就介绍到这里。如果遇到分布式事务相关问题,大家可在读者圈向我提问,或者直接微信联系我。

本课代码下载地址:

GitHub

上一篇
下一篇
目录