Seata分布式事务

2023-05-20,,

使用Seata版本:1.6.1(2023/2/6最新版)该版本存在很多坑,相较于1.0版本,配置上存在很多差别,如果你的版本不同,请不要参考本文。

1.6.1配置存在许多问题,比较难找,如果你使用1.6.1可以参考本工程,本文工程功能已正常跑通。

本文章的代码工程:zko0/cloudStudy: Springcloud学习工程 (github.com)

1、介绍

①问题

一个事务需要跨多个数据源或系统进行远程调用,就会产生分布式事务问题

微服务应用的架构图:

单体应用被拆分为微服务应用,原来三个模块就被分为了三个独立的应用,分别使用三个独立的数据源。

业务操作需要调用三个服务完成,此时每个服务内部的数据一致性由本地事务来保证,但是全局的数据一致性没有办法保证

②Seata介绍

文档:Seata文档

一.分布式事务的过程

分布式事务处理过程唯一ID+三组件模型

Transaction ID(XID)全局唯一事务ID
三个组件:

    Transaction Coordinate(TC):事务协调器,维护全局事务的运行状态,负责协调并驱动全局事务提交和回滚
    Transaction Manager(TM):控制全局事务的边界,负责开启一个全局事务,并最终发起全局提交或全局回滚的决议
    Resource Manager(RM):管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚

二.处理过程

    TM向TC申请开启全局事务,全局事务创建成功并且生成一个全局唯一的XID
    XID在微服务调用链路上下文中传播
    RM向TC注册分支事务,并将其纳入XID对全局事务的管辖
    TM向TC发送对于XID的全局提交/回滚决议
    TC调度XID下管辖的全部分支事务完成提交或回滚请求

2、分布式全局事务

①Seata安装启动

下载Seata,解压:

Release v1.6.1 · seata/seata · GitHub

Seata配置Nacos注册中心:

Seata配置存储:

数据库建表

sql存放路径:\seata\script\server\db

测试:

启动Nacos,启动Seata,Seata被注册进Nacos中:

②业务数据库准备

Sql文件:

#order
create database seata_order; use seata_order; CREATE TABLE t_order(
`id` BIGINT(11) NOT NULL AUTO_INCREMENT PRIMARY KEY,
`user_id` BIGINT(11) DEFAULT NULL COMMENT '用户id',
`product_id` BIGINT(11)DEFAULT NULL COMMENT '产品id',
`count` INT(11) DEFAULT NULL COMMENT '数量',
`money` DECIMAL(11,0) DEFAULT NULL COMMENT '金额',
`status` INT(1) DEFAULT NULL COMMENT '订单状态: 0:创建中; 1:已完结'
)ENGINE=INNODB AUTO_INCREMENT=7 DEFAULT CHARSET=utf8; select * from t_order; CREATE TABLE IF NOT EXISTS `undo_log`
(
`branch_id` BIGINT(20) NOT NULL COMMENT 'branch transaction id',
`xid` VARCHAR(100) NOT NULL COMMENT 'global transaction id',
`context` VARCHAR(128) NOT NULL COMMENT 'undo_log context,such as serialization',
`rollback_info` LONGBLOB NOT NULL COMMENT 'rollback info',
`log_status` INT(11) NOT NULL COMMENT '0:normal status,1:defense status',
`log_created` DATETIME(6) NOT NULL COMMENT 'create datetime',
`log_modified` DATETIME(6) NOT NULL COMMENT 'modify datetime',
UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)
) ENGINE = InnoDB
AUTO_INCREMENT = 1
DEFAULT CHARSET = utf8 COMMENT ='AT transaction mode undo table'; #storage
create database seata_storage; use seata_storage; CREATE TABLE t_storage(
`id` BIGINT(11) NOT NULL AUTO_INCREMENT PRIMARY KEY,
`product_id` BIGINT(11) DEFAULT NULL COMMENT '产品id',
`total` INT(11) DEFAULT NULL COMMENT '总库存',
`used` INT(11) DEFAULT NULL COMMENT '已用库存',
`residue` INT(11) DEFAULT NULL COMMENT '剩余库存'
)ENGINE=INNODB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8; INSERT INTO t_storage(`id`,`product_id`,`total`,`used`,`residue`)VALUES('1','1','100','0','100'); SELECT * FROM t_storage; CREATE TABLE IF NOT EXISTS `undo_log`
(
`branch_id` BIGINT(20) NOT NULL COMMENT 'branch transaction id',
`xid` VARCHAR(100) NOT NULL COMMENT 'global transaction id',
`context` VARCHAR(128) NOT NULL COMMENT 'undo_log context,such as serialization',
`rollback_info` LONGBLOB NOT NULL COMMENT 'rollback info',
`log_status` INT(11) NOT NULL COMMENT '0:normal status,1:defense status',
`log_created` DATETIME(6) NOT NULL COMMENT 'create datetime',
`log_modified` DATETIME(6) NOT NULL COMMENT 'modify datetime',
UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)
) ENGINE = InnoDB
AUTO_INCREMENT = 1
DEFAULT CHARSET = utf8 COMMENT ='AT transaction mode undo table'; #account
create database seata_account; use seata_account; CREATE TABLE t_account(
`id` BIGINT(11) NOT NULL AUTO_INCREMENT PRIMARY KEY COMMENT 'id',
`user_id` BIGINT(11) DEFAULT NULL COMMENT '用户id',
`total` DECIMAL(10,0) DEFAULT NULL COMMENT '总额度',
`used` DECIMAL(10,0) DEFAULT NULL COMMENT '已用余额',
`residue` DECIMAL(10,0) DEFAULT '0' COMMENT '剩余可用额度'
)ENGINE=INNODB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8; INSERT INTO t_account(`id`,`user_id`,`total`,`used`,`residue`)VALUES('1','1','1000','0','1000'); SELECT * FROM t_account; CREATE TABLE IF NOT EXISTS `undo_log`
(
`branch_id` BIGINT(20) NOT NULL COMMENT 'branch transaction id',
`xid` VARCHAR(100) NOT NULL COMMENT 'global transaction id',
`context` VARCHAR(128) NOT NULL COMMENT 'undo_log context,such as serialization',
`rollback_info` LONGBLOB NOT NULL COMMENT 'rollback info',
`log_status` INT(11) NOT NULL COMMENT '0:normal status,1:defense status',
`log_created` DATETIME(6) NOT NULL COMMENT 'create datetime',
`log_modified` DATETIME(6) NOT NULL COMMENT 'modify datetime',
UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)
) ENGINE = InnoDB
AUTO_INCREMENT = 1
DEFAULT CHARSET = utf8 COMMENT ='AT transaction mode undo table';

③业务微服务创建

一.订单模块

1.新建模块seata-order-service2001

2.依赖:

<dependencies>
<!-- nacos -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<!-- seata-->
<!-- https://mvnrepository.com/artifact/io.seata/seata-spring-boot-starter -->
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-spring-boot-starter</artifactId>
<version>1.6.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/io.seata/seata-all -->
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-all</artifactId>
<version>1.6.1</version>
</dependency> <dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
</dependency>
<!-- &lt;!&ndash; https://mvnrepository.com/artifact/com.alibaba/druid-spring-boot-starter &ndash;&gt;-->
<!-- <dependency>-->
<!-- <groupId>com.alibaba</groupId>-->
<!-- <artifactId>druid-spring-boot-starter</artifactId>-->
<!-- <version>1.2.15</version>-->
<!-- </dependency>-->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<!--jdbc-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

3.配置:

server:
port: 2001 spring:
application:
name: seata-order-service
cloud:
alibaba:
seata:
registry:
type: nacos
nacos:
application: seata-server
server-addr: 127.0.0.1:8848
group : "SEATA_GROUP"
namespace: ""
username: "nacos"
password: "nacos" nacos:
discovery:
server-addr: 127.0.0.1:8848 #nacos
datasource:
# mysql驱动类
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://127.0.0.1:3306/seata_storage?useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=GMT%2B8
username: root
password: 123456
feign:
hystrix:
enabled: false
logging:
level:
io:
seata: info mybatis:
mapperLocations: classpath*:mapper/*.xml

4.工程业务代码较多,详细代码可以参考Github上的工程:

个人根据官网整合,可能存在部分问题或不合理的地方,还请多多指教

测试:

二.库存模块

1.新建模块seata-storage-service2002

2.依赖同订单模块

3.配置同订单模块

4.更多见项目代码

测试:

三.账号模块

1.新建模块seata-account-service2003

2.以下同上,代码在工程中有

测试:

④ 下单模拟

一.正常下单

启动2001,2002,2003项目

调用Order接口:http://localhost:2001/order/create?userId=1&productId=1&count=10&money=10

正常调用成功

二.超时异常

修改2003的AccountServiceImpl,模拟账户服务的错误,休眠20s:

try {
TimeUnit.SECONDS.sleep(20);
} catch (InterruptedException e) {
e.printStackTrace();
}

调用Order接口:http://localhost:2001/order/create?userId=1&productId=1&count=10&money=10

结果:

超时异常后,order添加了订单,而且storage的库存和account的余额都发生了变化。但是order显示为未支付的状态,因为在order修改订单状态的之前,account远程调用出错了,此时抛出异常。

此时用户看到的是错误界面,但是account,order和storage都已经正常处理了。

如果我把超时异常修改为其他的异常,比如10/0,那么account可能存在局部的回滚,或者但是其他服务没有回滚的问题

而且feign有超时重试机制,所以可能会多次扣款。

三.@GlobalTransactional

在需要全局回滚的方法上添加@GlobalTransactional,就能实现多个服务的整体回滚。

在2001的OderServiceImpl里的create方法上加上 @GlobalTransactional(name = "fsp-create-order", rollbackFor = Exception.class)

问题:

在1.6.1,被调用服务如果无法获取xid,全局事务就无法生效。如果你出现全局事务未生效的情况,可以尝试并参考下面的解决方案:

    查看xid是否为空,验证是否获取到xid

    //xid检查
    String xid = RootContext.getXID();
    System.out.println("xid_order:" +xid );

    如果xid为null,说明服务调用者在Feign调用时没有发送xid的信息。因为1.6.1为最新版本,使用spring-cloud-starter-alibaba-seata启动会报错。所以我们任然使用seata-spring-boot-starter,通过request拦截器,添加xid的信息:

    public class MultipartSupportConfig implements RequestInterceptor {
    
        /**
    * 解决服务直接调用请求头不传递的问题
    * @param template
    */
    @Override
    public void apply(RequestTemplate template) {
    //解决不传递请求头中的token
    ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
    if (attributes != null){
    HttpServletRequest request = attributes.getRequest();
    Enumeration<String> headerNames = request.getHeaderNames();
    //可以在这里将自定义请求头传递进去, key 请求, value 值
    //处理上游请求头信息,传递时继续携带
    while (headerNames.hasMoreElements()) {
    String name = headerNames.nextElement();
    String values = request.getHeader(name);
    template.header(name, values);
    }
    }
    // 解决seata的xid未传递
    String xid = RootContext.getXID();
    template.header(RootContext.KEY_XID, xid);
    }
    }

    在Feign服务上添加请求拦截器

调用Order接口:http://localhost:2001/order/create?userId=1&productId=1&count=10&money=10

此时,三张表都进行了回滚操作,seata全局事务控制成功:

3、Seata原理

①AT模式

AT模式:无侵入自动补偿的事务模式

一.前提

基于本地ACID的关系型数据库
Java应用,通过jdbc访问数据库

二.整体机制

一阶段:加载,拦截业务sql

    解析sql,找到更新的业务数据,在业务数据更新之前,将其保存为before image
    执行sql,更新业务数据
    业务数据更新后,保存after image,生成行锁

二阶段提交:

如果提交顺利,Seata会将一阶段保存的数据快照和行锁删除,完成数据清理。

二阶段回滚:

如果二阶段出错,Seata需要回滚,一阶段已经执行的sql。还原业务数据。

回滚使用before image还原业务数据,但是在还原之前还需要校验脏写,对比数据库当前业务数据和after image,如果完全一致说明没有脏写,可以还原。如果不一致说明有脏写,需要人工处理。

三.隔离级别

根据Seata机制,AT模式的默认隔离级别为读未提交

如果需要将隔离级别设置为读已提交,则需要让select操作获取Seata的全局锁,目前 Seata 的方式是通过 SELECT FOR UPDATE 语句的代理。

SELECT FOR UPDATE 语句的执行会申请 全局锁 ,如果 全局锁 被其他事务持有,则释放本地锁(回滚 SELECT FOR UPDATE 语句的本地执行)并重试。这个过程中,查询是被 block 住的,直到 全局锁 拿到,即读取的相关数据是 已提交 的,才返回。

出于总体性能上的考虑,Seata 目前的方案并没有对所有 SELECT 语句都进行代理,仅针对 FOR UPDATE 的 SELECT 语句。

Seata分布式事务的相关教程结束。

《Seata分布式事务.doc》

下载本文的Word格式文档,以方便收藏与打印。