Seata是一个开源的分布式事务解决方案,致力于在微服务架构下提供高性能和简单易用的分布式事务服务,官网:http://seata.io/zh-cn/
一、三组概念
TC 事务协调者:维护全局和分支事务的状态,驱动全局事务提交或回滚
TM 事务管理器:定义全局事务的范围:开始、提交、回滚
RM 资源管理器:管理分支事务处理的资源,与TC通信以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚
二、安装
GitHub地址:https://github.com/seata/seata(以Nacos 2.0、Seata 1.4.2为例)
修改配置文件
1、/conf/file.conf (注意与1.0之前的版本区分,1.0之后可以在yml文件中指定service配置)
修改数据库模式(使用数据库存储事务信息)和数据源
2、/conf/registry.conf
3、mysql建表
1、新建 seata 数据库
2、运行mysql.sql
注意:建表sql的位置记录在/conf/README.md文件中,点击其中的server链接即可跳转到存放sql的github地址:https://github.com/seata/seata/tree/develop/script/server
4、启动
先启动Nacos再启动Seata(bin/seata-server.bat)
三、业务测试
1、业务说明
创建三个微服务:订单---库存---账户
大致流程:当用户下单时,订单服务创建订单,远程调用库存服务扣减库存,再通过远程调用账户服务来扣减账户余额,最后在订单服务中修改订单状态为已完成。
2、数据库建表
- 创建三个数据库:seata_order(订单)、seata_storage(库存)、seata_account(账户)
- 创建对应的表
-- t_order: one row per order; `status` moves from 0 (creating) to 1 (finished).
DROP TABLE IF EXISTS `t_order`;
CREATE TABLE `t_order` (
  `id`         bigint(11)     NOT NULL AUTO_INCREMENT,
  `user_id`    bigint(11)     NULL DEFAULT NULL COMMENT '用户id',
  `product_id` bigint(11)     NULL DEFAULT NULL COMMENT '产品id',
  `count`      int(11)        NULL DEFAULT NULL COMMENT '数量',
  `money`      decimal(11, 0) NULL DEFAULT NULL COMMENT '金额',
  `status`     int(1)         NULL DEFAULT NULL COMMENT '订单状态:0:创建中; 1:已完结',
  PRIMARY KEY (`id`) USING BTREE
)
  ENGINE = InnoDB
  AUTO_INCREMENT = 14
  CHARACTER SET = utf8
  COLLATE = utf8_general_ci
  ROW_FORMAT = Dynamic;
-- t_storage: stock per product (total / used / residue), seeded with one product.
DROP TABLE IF EXISTS `t_storage`;
CREATE TABLE `t_storage` (
  `id`         bigint(11) NOT NULL AUTO_INCREMENT,
  `product_id` bigint(11) NULL DEFAULT NULL COMMENT '产品id',
  `total`      int(11)    NULL DEFAULT NULL COMMENT '总库存',
  `used`       int(11)    NULL DEFAULT NULL COMMENT '已用库存',
  `residue`    int(11)    NULL DEFAULT NULL COMMENT '剩余库存',
  PRIMARY KEY (`id`) USING BTREE
)
  ENGINE = InnoDB
  AUTO_INCREMENT = 2
  CHARACTER SET = utf8
  COLLATE = utf8_general_ci
  ROW_FORMAT = Dynamic;

-- Seed row: product 1 with 100 units, none used yet.
INSERT INTO `t_storage` (`id`, `product_id`, `total`, `used`, `residue`)
VALUES (1, 1, 100, 0, 100);
-- t_account: credit per user (total / used / residue), seeded with one user.
DROP TABLE IF EXISTS `t_account`;
CREATE TABLE `t_account` (
  `id`      bigint(11)     NOT NULL AUTO_INCREMENT COMMENT 'id',
  `user_id` bigint(11)     NULL DEFAULT NULL COMMENT '用户id',
  `total`   decimal(10, 0) NULL DEFAULT NULL COMMENT '总额度',
  `used`    decimal(10, 0) NULL DEFAULT NULL COMMENT '已用余额',
  `residue` decimal(10, 0) NULL DEFAULT 0 COMMENT '剩余可用额度',
  PRIMARY KEY (`id`) USING BTREE
)
  ENGINE = InnoDB
  AUTO_INCREMENT = 2
  CHARACTER SET = utf8
  COLLATE = utf8_general_ci
  ROW_FORMAT = Dynamic;

-- Seed row: user 1 with a total of 1000, nothing used yet.
INSERT INTO `t_account` (`id`, `user_id`, `total`, `used`, `residue`)
VALUES (1, 1, 1000, 0, 1000);
undo_log 回滚日志表:seata_order、seata_storage、seata_account 三个库中都需要各创建一张,即下面的建表语句要在每个库中分别执行一次(共三次)
-- undo_log: rollback log table; (xid, branch_id) is unique per row.
DROP TABLE IF EXISTS `undo_log`;
CREATE TABLE `undo_log` (
  `id`            bigint(20)   NOT NULL AUTO_INCREMENT,
  `branch_id`     bigint(20)   NOT NULL,
  `xid`           varchar(100) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
  `context`       varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
  `rollback_info` longblob     NOT NULL,
  `log_status`    int(11)      NOT NULL,
  `log_created`   datetime(0)  NOT NULL,
  `log_modified`  datetime(0)  NOT NULL,
  `ext`           varchar(100) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  PRIMARY KEY (`id`) USING BTREE,
  UNIQUE INDEX `ux_undo_log`(`xid`, `branch_id`) USING BTREE
)
  ENGINE = InnoDB
  AUTO_INCREMENT = 3
  CHARACTER SET = utf8
  COLLATE = utf8_general_ci
  ROW_FORMAT = Dynamic;
3、seata-order-service
pom
<dependencies>
    <!-- Project-local shared API module -->
    <dependency>
        <groupId>com.sw</groupId>
        <artifactId>cloud-api-commons</artifactId>
        <version>1.0-SNAPSHOT</version>
    </dependency>

    <!-- Nacos service discovery -->
    <dependency>
        <groupId>com.alibaba.cloud</groupId>
        <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
    </dependency>

    <!--
        Seata: exclude the seata-all pulled in by the Spring Cloud Alibaba starter
        and pin the Seata starter explicitly. The version is kept equal to the
        Seata server version used in this guide (1.4.2) so client and server match.
    -->
    <dependency>
        <groupId>com.alibaba.cloud</groupId>
        <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
        <exclusions>
            <exclusion>
                <artifactId>seata-all</artifactId>
                <groupId>io.seata</groupId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>io.seata</groupId>
        <artifactId>seata-spring-boot-starter</artifactId>
        <version>1.4.2</version>
    </dependency>

    <!-- OpenFeign for the remote calls between services -->
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-openfeign</artifactId>
    </dependency>

    <!-- Web + Actuator -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>

    <!-- MySQL driver, Druid connection pool, MyBatis -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>druid-spring-boot-starter</artifactId>
        <version>1.1.16</version>
    </dependency>
    <dependency>
        <groupId>org.mybatis.spring.boot</groupId>
        <artifactId>mybatis-spring-boot-starter</artifactId>
    </dependency>

    <!-- Test support -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>

    <!-- Lombok (compile-time only) -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
</dependencies>
yml
# application.yml for seata-order-service (nesting restored; YAML is indentation-sensitive).
server:
  port: 2001

spring:
  application:
    name: seata-order-service
  cloud:
    nacos:
      discovery:
        server-addr: localhost:8848
  datasource:
    driver-class-name: com.mysql.jdbc.Driver
    url: jdbc:mysql://127.0.0.1:3306/seata_order?useSSL=true&serverTimezone=GMT%2B8&useUnicode=true&characterEncoding=utf-8
    username: root
    password: 123456

seata:
  enabled: true
  application-id: ${spring.application.name}
  # Custom transaction group name; must match the key under service.vgroup-mapping below.
  tx-service-group: test_tx_group
  service:
    vgroup-mapping:
      test_tx_group: default
  config:
    nacos:
      # Left empty on purpose (presumably selects the default Nacos namespace; verify).
      namespace:
      server-addr: 127.0.0.1:8848
      group: SEATA_GROUP
      userName: "nacos"
      password: "nacos"
  registry:
    type: nacos
    nacos:
      application: seata-server
      server-addr: 127.0.0.1:8848
      namespace:
      userName: "nacos"
      password: "nacos"

feign:
  hystrix:
    enabled: false

logging:
  level:
    io:
      seata: info
启动类
/**
 * Entry point of the order service.
 *
 * <p>{@code DataSourceAutoConfiguration} is excluded so that Spring Boot does not
 * create a data source automatically.
 */
@EnableFeignClients
@EnableDiscoveryClient
@MapperScan("com.sw.mapper")
@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class})
public class SeataOrderMain2001 {

    /**
     * Boots the Spring application.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(SeataOrderMain2001.class, args);
    }
}
pojo
/**
 * Order entity; its properties are referenced by the {@code #{...}} placeholders
 * of the order insert statement.
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Order implements Serializable {

    /** Order id. */
    private Long id;

    /** Id of the user placing the order. */
    private Long userId;

    /** Id of the ordered product. */
    private Long productId;

    /** Ordered quantity. */
    private Integer count;

    /** Order amount. */
    private BigDecimal money;

    /** Order status: 0 = creating, 1 = finished. */
    private Integer status;
}
mapper
/**
 * MyBatis mapper for the {@code t_order} table.
 */
@Repository
public interface OrderMapper {

    /**
     * Inserts a new order with status 0; the id is passed as {@code null} in the
     * statement.
     *
     * @param order source of userId, productId, count and money
     */
    @Insert("insert into t_order(id,user_id,product_id,count,money,status) values(null,#{userId},#{productId},#{count},#{money},0)")
    void create(Order order);

    /**
     * Sets status to 1 on the rows matching the given user and status.
     *
     * <p>Note: the WHERE clause filters on user id and status only, so every order
     * of that user currently in {@code status} is updated, not a single order.
     *
     * @param userId user whose orders are updated
     * @param status status the orders must currently have
     */
    @Update("update t_order set status = 1 where user_id=#{userId} and status = #{status}")
    void update(@Param("userId") Long userId, @Param("status") Integer status);
}
service
OrderService
/**
 * Order business operations.
 */
public interface OrderService {

    /**
     * Creates an order.
     *
     * @param order the order to create
     */
    void create(Order order);
}
StorageService
/**
 * Feign client for the {@code seata-storage-service} application.
 */
@FeignClient(value = "seata-storage-service")
public interface StorageService {

    /**
     * Calls {@code POST /storage/decrease} on the storage service.
     *
     * @param productId sent as request parameter {@code productId}
     * @param count     sent as request parameter {@code count}
     * @return the result returned by the storage service
     */
    @PostMapping(value = "/storage/decrease")
    CommonResult decrease(@RequestParam("productId") Long productId,
                          @RequestParam("count") Integer count);
}
AccountService
/**
 * Feign client for the {@code seata-account-service} application.
 */
@FeignClient(value = "seata-account-service")
public interface AccountService {

    /**
     * Calls {@code POST /account/decrease} on the account service.
     *
     * @param userId sent as request parameter {@code userId}
     * @param money  sent as request parameter {@code money}
     * @return the result returned by the account service
     */
    @PostMapping(value = "/account/decrease")
    CommonResult decrease(@RequestParam("userId") Long userId,
                          @RequestParam("money") BigDecimal money);
}
service实现
/**
 * Order workflow: insert the order, deduct stock, debit the account, then flip
 * the order status.
 */
@Slf4j
@Service
public class OrderServiceImpl implements OrderService {

    @Autowired
    private OrderMapper orderMapper;

    @Autowired
    private StorageService storageService;

    @Autowired
    private AccountService accountService;

    /**
     * Runs the four steps in order; an exception thrown by any step propagates
     * to the caller and skips the remaining steps.
     *
     * @param order the order to create
     */
    @Override
    public void create(Order order) {
        insertOrder(order);
        deductStock(order);
        debitAccount(order);
        markFinished(order);
        log.info("订单操作完成(*^_^*)");
    }

    /** Step 1: persist the new order. */
    private void insertOrder(Order order) {
        log.info("===开始新建订单===");
        orderMapper.create(order);
    }

    /** Step 2: remote call to the storage service. */
    private void deductStock(Order order) {
        log.info("===订单微服务调用库存===");
        storageService.decrease(order.getProductId(), order.getCount());
        log.info("===订单微服务调用库存,库存数量操作结束===");
    }

    /** Step 3: remote call to the account service. */
    private void debitAccount(Order order) {
        log.info("===订单微服务调用账户余额===");
        accountService.decrease(order.getUserId(), order.getMoney());
        log.info("===订单微服务调用账户余额,账户余额操作结束===");
    }

    /** Step 4: move the user's orders from status 0 to status 1 (finished). */
    private void markFinished(Order order) {
        log.info("===修改订单状态,开始===");
        orderMapper.update(order.getUserId(), 0);
        log.info("===修改订单状态,操作结束===");
    }
}
controller
/**
 * HTTP endpoints of the order service, mounted under {@code /order}.
 */
@RestController
@RequestMapping("/order")
public class OrderController {

    @Autowired
    private OrderService orderService;

    /**
     * Handles {@code GET /order/create}: binds the request parameters to an
     * {@link Order} and delegates to the order service.
     *
     * @param order order populated from the request parameters
     * @return a result with code 200 once the service call returns normally
     */
    @GetMapping("/create")
    public CommonResult create(Order order) {
        orderService.create(order);
        CommonResult success = new CommonResult(200, "订单创建成功!");
        return success;
    }
}
config
主启动类排除了数据源配置,此处需指定数据源代理
/**
 * Manual data source wiring: a Druid data source bound to
 * {@code spring.datasource.*}, wrapped in a {@code DataSourceProxy} that is
 * exposed as the primary {@code dataSource} bean.
 */
@Configuration
public class DataSourceProxyConfig {

    /**
     * Creates the Druid data source; its properties are bound from the
     * {@code spring.datasource} prefix.
     *
     * @return a new, not yet proxied, Druid data source
     */
    @Bean
    @ConfigurationProperties(prefix = "spring.datasource")
    public DataSource druidDataSource() {
        DruidDataSource druid = new DruidDataSource();
        return druid;
    }

    /**
     * Wraps the injected data source in a {@code DataSourceProxy} and registers
     * the wrapper as the primary bean named {@code dataSource}.
     *
     * @param dataSource the data source to wrap (presumably the Druid bean above;
     *                   verify how Spring resolves it in your setup)
     * @return the proxying data source
     */
    @Primary
    @Bean(name = "dataSource")
    public DataSourceProxy dataSourceProxy(DataSource dataSource) {
        DataSourceProxy proxy = new DataSourceProxy(dataSource);
        return proxy;
    }
}
4、seata-storage-service
yml
# application.yml for seata-storage-service (nesting restored; YAML is indentation-sensitive).
server:
  port: 2002

spring:
  application:
    name: seata-storage-service
  cloud:
    nacos:
      discovery:
        server-addr: localhost:8848
  datasource:
    driver-class-name: com.mysql.jdbc.Driver
    url: jdbc:mysql://127.0.0.1:3306/seata_storage?useSSL=true&serverTimezone=GMT%2B8&useUnicode=true&characterEncoding=utf-8
    username: root
    password: 123456

seata:
  enabled: true
  application-id: ${spring.application.name}
  # Must match the key under service.vgroup-mapping below.
  tx-service-group: test_tx_group
  service:
    vgroup-mapping:
      test_tx_group: default
  config:
    nacos:
      # Left empty on purpose (presumably selects the default Nacos namespace; verify).
      namespace:
      server-addr: 127.0.0.1:8848
      group: SEATA_GROUP
      userName: "nacos"
      password: "nacos"
  registry:
    type: nacos
    nacos:
      application: seata-server
      server-addr: 127.0.0.1:8848
      namespace:
      userName: "nacos"
      password: "nacos"

feign:
  hystrix:
    enabled: false

logging:
  level:
    io:
      seata: info
mapper
/**
 * MyBatis mapper for the {@code t_storage} table.
 */
@Repository
public interface StorageMapper {

    /**
     * Moves {@code count} units from residue to used for the given product.
     *
     * <p>Note: the statement has no check that residue stays non-negative.
     *
     * @param productId product whose stock row is updated
     * @param count     number of units to deduct
     */
    @Update("update t_storage set used = used + #{count},residue = residue - #{count} where product_id = #{productId}")
    void decrease(@Param("productId") Long productId, @Param("count") Integer count);
}
service
/**
 * Stock operations of the storage service.
 */
public interface StorageService {

    /**
     * Deducts stock for a product.
     *
     * @param productId product to deduct from
     * @param count     number of units to deduct
     */
    void decrease(Long productId, Integer count);
}
service实现
/**
 * Stock deduction backed by {@link StorageMapper}.
 */
@Service
@Slf4j
public class StorageServiceImpl implements StorageService {

    @Autowired
    private StorageMapper storageMapper;

    /**
     * Logs, delegates the deduction to the mapper, then logs again.
     *
     * @param productId product to deduct from
     * @param count     number of units to deduct
     */
    @Override
    public void decrease(Long productId, Integer count) {
        log.info("===storage-service 开始扣减库存===");

        storageMapper.decrease(productId, count);

        log.info("===storage-service 扣减库存操作结束===");
    }
}
其他配置与order微服务模块同理
5、seata-account-service
mapper
/**
 * MyBatis mapper for the {@code t_account} table.
 */
@Repository
public interface AccountMapper {

    /**
     * Moves {@code money} from residue to used for the given user.
     *
     * <p>Note: the statement has no check that residue stays non-negative.
     *
     * @param userId user whose account row is updated
     * @param money  amount to deduct
     */
    @Update("update t_account set used = used + #{money},residue = residue - #{money} where user_id = #{userId}")
    void decrease(@Param("userId") Long userId, @Param("money") BigDecimal money);
}
其他配置与storage微服务模块同理
6、不添加全局事务管理测试
seata-account-service模块
1、手动模拟超时
/**
 * Account deduction backed by {@link AccountMapper}, with an artificial delay
 * used to provoke a timeout on the calling side.
 */
@Slf4j
@Service
public class AccountServiceImpl implements AccountService {

    @Autowired
    private AccountMapper accountMapper;

    /**
     * Sleeps for 20 seconds (timeout simulation), then deducts the amount.
     *
     * <p>The deduction still runs after the sleep, including when the sleep is
     * interrupted; in that case the thread's interrupt flag is restored.
     *
     * @param userId user whose balance is deducted
     * @param money  amount to deduct
     */
    @Override
    public void decrease(Long userId, BigDecimal money) {
        log.info("===account-service 开始扣减账户余额===");
        // Simulated slow call so that the caller hits its read timeout.
        try {
            TimeUnit.SECONDS.sleep(20);
        } catch (InterruptedException e) {
            // Do not swallow the interruption: restore the flag for code further
            // up the stack and record it through the logger instead of stderr.
            Thread.currentThread().interrupt();
            log.warn("===account-service 模拟超时的休眠被中断===", e);
        }
        accountMapper.decrease(userId, money);
        log.info("===account-service 扣减账户余额操作结束===");
    }
}
浏览器输入http://localhost:2001/order/create?userId=1&productId=1&count=10&money=100
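报错Read TimeOut异常(账户服务休眠20秒,订单服务对它的远程调用读超时)。
此时数据库的状态:订单已插入但状态仍为0(创建中),库存已被扣减;账户服务在休眠结束后仍会继续执行扣款。由于没有全局事务,这些已提交的修改都不会回滚,数据出现不一致。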
7、开启全局事务
在order模块的OrderServiceImpl.create方法上添加全局事务注解@GlobalTransactional,例如:@GlobalTransactional(rollbackFor = Exception.class)
使用同样的方法测试,发生调用超时异常后,数据库中被修改的数据回滚到了操作之前的状态
评论 (0)