Skip to main content

分布式事务一致性-本地消息表


分布式事务一致性-本地消息表

分布式一致性

在我们的分布式系统中,可能设计消息队列(RabbitMQ、RocketMQ等),RPC调用(Dubbo、gRpc等)、缓存(Redis等)

可能在一个业务中,我们既要操作消息队列,又要操作RPC调用,又要操作缓存。

一旦某个环节出了问题,如网络波动,超时,失败等情况,那么所有的事务都要进行回滚。

对于这种网络不确定性的问题,业界有一个基本共识。就是结果的返回可能延迟,可能丢失,但是不可能出错

由于分布式的不确定性,会出现返回超时,也就是中间态的结果。针对这种情况,我们不能随意的做动作,必须不断重试,查到最终确定的结果,成功或者失败。所以重试在分布式一致性中必不可少。有了重试,就一定要保证幂等。确保多次的扣减库存操作,最终不会扣了多次。

要是对于第三方的操作,本地事务就无法保证这种分布式下的一致性问题。

本地消息表

我们使用本地消息表,目的就是确保本地的操作和第三方操作保证一致。要么都成功,要么都失败。

由于网络的不可靠性,会出现本地成功了,但是第三方可能网络波动,没有执行成功。

事务消息保证的就是第三方一定要执行成功,达到最终一致性

本地消息表介绍:

分布式事务是指在分布式系统中,涉及多个服务或数据库的事务操作,需要保证这些操作要么全部成功,要么全部失败,以确保数据的一致性。实现分布式事务的方法有很多种,其中一种常见的方法是使用本地消息表(Local Message Table)模式。

本地消息表实现原理

本地消息表模式的核心思想是将消息的发送和业务操作放在同一个本地事务中,以确保它们的一致性。具体步骤如下:

  1. 本地事务中记录消息:在本地事务中,除了执行业务操作外,还将待发送的消息记录到本地消息表中。
  2. 定时任务扫描消息表:一个定时任务不断扫描本地消息表中的未发送消息,并尝试发送这些消息。
  3. 消息发送确认:消息发送成功后,更新本地消息表中的消息状态,标记为已发送。

实现步骤

  1. 定义本地消息表

首先,在数据库中创建一个本地消息表,用于存储待发送的消息。表结构可能如下:

CREATE TABLE local_message (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    message_body TEXT NOT NULL,
    status VARCHAR(20) NOT NULL,
    create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    update_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);
  • id:消息的唯一标识。
  • message_body:消息的内容。
  • status:消息的状态(如 "PENDING", "SENT")。
  • create_timeupdate_time:记录消息的创建和更新时间。
  1. 在本地事务中记录消息

在业务操作中,将消息记录到本地消息表中。例如:

@Transactional
public void processOrder(Order order) {
    // 业务操作
    orderRepository.save(order);
    
    // 记录消息
    LocalMessage message = new LocalMessage();
    message.setMessageBody(createMessageBody(order));
    message.setStatus("PENDING");
    localMessageRepository.save(message);
}
  1. 定时任务扫描消息表

创建一个定时任务,扫描本地消息表中的未发送消息,并尝试发送这些消息:

@Scheduled(fixedRate = 5000)
public void sendPendingMessages() {
    List<LocalMessage> pendingMessages = localMessageRepository.findByStatus("PENDING");
    for (LocalMessage message : pendingMessages) {
        try {
            // 发送消息
            messageSender.send(message.getMessageBody());
            
            // 更新消息状态
            message.setStatus("SENT");
            localMessageRepository.save(message);
        } catch (Exception e) {
            // 处理发送失败的情况,可以记录日志或采取其他措施
            e.printStackTrace();
        }
    }
}
  1. 消息发送确认

在消息发送成功后,更新本地消息表中的消息状态,标记为已发送:

public class MessageSender {
    public void send(String messageBody) {
        // 实现消息发送逻辑,例如通过Kafka、RabbitMQ等
    }
}

本地消息表模式通过将消息的发送和业务操作放在同一个本地事务中,确保了它们的一致性。定时任务不断扫描本地消息表,确保消息最终被发送出去。这种方式有效地解决了分布式事务中的一致性问题,同时避免了复杂的分布式事务管理器的使用。

这种模式的优点包括:

  • 简单易实现,不需要引入复杂的分布式事务协调器。
  • 通过定时任务和消息表,可以确保消息最终被发送。

当然,这种模式也有其局限性,例如需要处理定时任务的性能和可靠性问题,但对于很多场景来说,它是一种有效且实用的解决方案。

本地消息表需要记录哪些信息?

类型:需要执行哪个中间件:Dubbo?Redis?RocketMQ?

参数:操作方法的方法名,入参等

但是这样每接入一个操作类型,就要去兼容对应的存储,有没有更简单的方法?

我们存的本地消息表,可以高度抽象成对方法的执行,确保某个方法执行成功。

本地消息表SQL:

CREATE TABLE `secure_invoke_record` (
  `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT 'id',
  `secure_invoke_json` json NOT NULL COMMENT '请求快照参数json',
  `status` tinyint(8) NOT NULL COMMENT '状态 1待执行 2已失败',
  `next_retry_time` datetime(3) NOT NULL COMMENT '下一次重试的时间',
  `retry_times` int(11) NOT NULL COMMENT '已经重试的次数',
  `max_retry_times` int(11) NOT NULL COMMENT '最大重试次数',
  `fail_reason` text COMMENT '执行失败的堆栈',
  `create_time` datetime(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) COMMENT '创建时间',
  `update_time` datetime(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) ON UPDATE CURRENT_TIMESTAMP(3) COMMENT '修改时间',
  PRIMARY KEY (`id`) USING BTREE,
  KEY `idx_next_retry_time` (`next_retry_time`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='本地消息表';

具体实现

SecureInvoke注解

所有被加了SecureInvoke注解的方法,表示这个方法是要被安全执行的,确保一定可以安全执行成功

/**
 * 保证方法成功执行。如果在事务内的方法,会将操作记录入库,保证执行。
 */
@Retention(RetentionPolicy.RUNTIME)//运行时生效
@Target(ElementType.METHOD)//作用在方法上
public @interface SecureInvoke {

    /**
     * 默认3次
     *
     * @return 最大重试次数(包括第一次正常执行)
     */
    int maxRetryTimes() default 3;

    /**
     * 默认异步执行,先入库,后续异步执行,不影响主线程快速返回结果,毕竟失败了有重试,而且主线程的事务已经提交了,串行执行没啥意义。
     * 同步执行适合mq消费场景等对耗时不关心,但是希望链路追踪不被异步影响的场景。
     *
     * @return 是否异步执行
     */
    boolean async() default true;
}

使用SecureInvoke注解

我们把这个注解加载了发送消息的注解上,在执行的时候,不会执行具体的方法,而是转成记录存到数据库中

public class MQProducer {

    @Resource
    private RocketMQTemplate rocketMQTemplate;

    public void sendMsg(String topic, Object body) {
        Message<Object> build = MessageBuilder.withPayload(body).build();
        rocketMQTemplate.send(topic, build);
    }

    /**
     * 发送可靠消息,在事务提交后保证发送成功
     *
     * @param topic
     * @param body
     */
    @SecureInvoke
    public void sendSecureMsg(String topic, Object body, Object key) {
        Message<Object> build = MessageBuilder
                .withPayload(body)
                .setHeader("KEYS", key)
                .build();
        rocketMQTemplate.send(topic, build);
    }
}

SecureInvoke切面实现

@Slf4j
@Aspect
@Order(Ordered.HIGHEST_PRECEDENCE + 1)//确保最先执行
@Component
public class SecureInvokeAspect {
    @Autowired
    private SecureInvokeService secureInvokeService;

    @Around("@annotation(secureInvoke)")
    public Object around(ProceedingJoinPoint joinPoint, SecureInvoke secureInvoke) throws Throwable {
        boolean async = secureInvoke.async();
        boolean inTransaction = TransactionSynchronizationManager.isActualTransactionActive();
        //非事务状态,直接执行,不做任何保证。
        if (SecureInvokeHolder.isInvoking() || !inTransaction) {
            return joinPoint.proceed();
        }
        Method method = ((MethodSignature) joinPoint.getSignature()).getMethod();
        List<String> parameters = Stream.of(method.getParameterTypes()).map(Class::getName).collect(Collectors.toList());
        SecureInvokeDTO dto = SecureInvokeDTO.builder()
                .args(JsonUtils.toStr(joinPoint.getArgs()))
                .className(method.getDeclaringClass().getName())
                .methodName(method.getName())
                .parameterTypes(JsonUtils.toStr(parameters))
                .build();
        SecureInvokeRecord record = SecureInvokeRecord.builder()
                .secureInvokeDTO(dto)
                .maxRetryTimes(secureInvoke.maxRetryTimes())
                .nextRetryTime(DateUtil.offsetMinute(new Date(), (int) SecureInvokeService.RETRY_INTERVAL_MINUTES))
                .build();
        secureInvokeService.invoke(record, async);
        return null;
    }
}

这段代码主要做了以下几件事:

  1. 拦截带有 @SecureInvoke 注解的方法:使用 @Around 注解拦截所有标记了 @SecureInvoke 注解的方法。
  2. 检查事务状态:如果当前没有活动的事务或者已经在进行安全调用,则直接执行目标方法,不进行额外处理。
  3. 构建调用信息:收集被拦截方法的详细信息(如参数、类名、方法名、参数类型),并构建一个 SecureInvokeDTO 对象。
  4. 创建重试记录:根据注解中的配置,创建一个包含最大重试次数和下次重试时间的 SecureInvokeRecord 对象。
  5. 调用安全服务:使用 SecureInvokeService 执行安全调用逻辑,并根据注解配置决定是否异步执行。

一个有用的方法主要有四个信息:类名、方法名、参数类型、参数

public class SecureInvokeDTO {
    private String className;
    private String methodName;
    private String parameterTypes;
    private String args;
}

SecureInvokeService逻辑

接着就走到SecureInvokeService中,如果不在事务中,那么就直接结束,否则,就把记录进行保存,用于重放。

这个时候记录是保存到数据库中,就有一个定时任务来不断扫描哪些任务需要执行,但是这样太慢了,所以下面还有几句话就是来干这个的。

更快的调用:在事务结束后,我们可以选择异步或者同步执行原来的方法。

public void invoke(SecureInvokeRecord record, boolean async) {
    boolean inTransaction = TransactionSynchronizationManager.isActualTransactionActive();
    //非事务状态,直接执行,不做任何保证。
    if (!inTransaction) {
        return;
    }
    //保存执行数据
    save(record);
    TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
        @SneakyThrows
        @Override
        public void afterCommit() {
            //事务后执行
            if (async) {
                doAsyncInvoke(record);
            } else {
                doInvoke(record);
            }
        }
    });
}

SecureInvokeService具体调用逻辑

public void doAsyncInvoke(SecureInvokeRecord record) {
    executor.execute(() -> {
        System.out.println(Thread.currentThread().getName());
        doInvoke(record);
    });
}

public void doInvoke(SecureInvokeRecord record) {
    SecureInvokeDTO secureInvokeDTO = record.getSecureInvokeDTO();
    try {
        SecureInvokeHolder.setInvoking();
        Class<?> beanClass = Class.forName(secureInvokeDTO.getClassName());
        Object bean = SpringUtil.getBean(beanClass);
        List<String> parameterStrings = JsonUtils.toList(secureInvokeDTO.getParameterTypes(), String.class);
        List<Class<?>> parameterClasses = getParameters(parameterStrings);
        Method method = ReflectUtil.getMethod(beanClass, secureInvokeDTO.getMethodName(), parameterClasses.toArray(new Class[]{}));
        Object[] args = getArgs(secureInvokeDTO, parameterClasses);
        //执行方法
        method.invoke(bean, args);
        //执行成功更新状态
        removeRecord(record.getId());
    } catch (Throwable e) {
        log.error("SecureInvokeService invoke fail", e);
        //执行失败,等待下次执行
        retryRecord(record, e.getMessage());
    } finally {
        SecureInvokeHolder.invoked();
    }
}

这段代码的主要功能是执行一个安全调用(SecureInvoke),并处理调用成功或失败的情况。具体步骤如下:

  1. 获取调用信息:从 SecureInvokeRecord 对象中获取 SecureInvokeDTO,它包含了被调用方法的详细信息。
  2. 设置调用状态:使用 SecureInvokeHolder.setInvoking() 标记当前正在进行安全调用。
  3. 反射获取目标方法
    • 根据类名获取目标类的 Class 对象。
    • 使用 SpringUtil.getBean 获取目标类的实例。
    • 解析方法参数类型并转换为 Class 对象列表。
    • 使用反射获取目标方法的 Method 对象。
  4. 获取方法参数:根据参数类型和 SecureInvokeDTO 中的参数信息,获取实际的参数值。
  5. 执行目标方法
    • 使用反射调用目标方法,并传入参数。
    • 如果方法调用成功,调用 removeRecord(record.getId()) 更新状态,表示调用成功并移除记录。
  6. 处理调用失败
    • 如果方法调用过程中抛出异常,记录错误日志。
    • 调用 retryRecord(record, e.getMessage()) 记录重试信息,等待下次重试。
  7. 恢复调用状态:在 finally 块中调用 SecureInvokeHolder.invoked(),标记安全调用结束。

总结:

  • 反射调用目标方法:通过反射机制调用指定的目标方法。
  • 处理调用结果:根据调用结果更新状态或记录重试信息。
  • 管理调用状态:在调用前后设置和恢复调用状态,确保调用过程的完整性和一致性。

失败退避算法

private void retryRecord(SecureInvokeRecord record, String errorMsg) {
    Integer retryTimes = record.getRetryTimes() + 1;
    SecureInvokeRecord update = new SecureInvokeRecord();
    update.setId(record.getId());
    update.setFailReason(errorMsg);
    update.setNextRetryTime(getNextRetryTime(retryTimes));
    if (retryTimes > record.getMaxRetryTimes()) {
        update.setStatus(SecureInvokeRecord.STATUS_FAIL);
    } else {
        update.setRetryTimes(retryTimes);
    }
    secureInvokeRecordDao.updateById(update);
}

private Date getNextRetryTime(Integer retryTimes) {//或者可以采用退避算法
    double waitMinutes = Math.pow(RETRY_INTERVAL_MINUTES, retryTimes);//重试时间指数上升 2m 4m 8m 16m
    return DateUtil.offsetMinute(new Date(), (int) waitMinutes);
}

这段代码的主要功能是处理方法调用失败后的重试逻辑。

retryRecord 方法

  1. 更新重试次数:将当前重试次数加一。
  2. 创建更新对象:创建一个新的 SecureInvokeRecord 对象,用于更新数据库中的记录。
  3. 设置失败原因:将错误信息设置到更新对象中。
  4. 计算下次重试时间:调用 getNextRetryTime 方法,根据重试次数计算下次重试的时间。
  5. 检查重试次数:如果重试次数超过最大重试次数,设置状态为失败;否则,更新重试次数。
  6. 更新数据库记录:调用 secureInvokeRecordDao.updateById 方法,将更新对象保存到数据库中。

getNextRetryTime 方法

  1. 计算等待时间:使用指数退避算法,根据重试次数计算等待时间。等待时间以分钟为单位,随着重试次数的增加呈指数增长(例如,2 分钟,4 分钟,8 分钟等)。
  2. 计算下次重试时间:使用 DateUtil.offsetMinute 方法,将当前时间偏移计算出的分钟数,得到下次重试的时间。

总结:

  • 更新重试记录:在方法调用失败后,更新重试记录,包括增加重试次数、设置失败原因和计算下次重试时间。
  • 计算指数退避时间:根据重试次数,使用指数退避算法计算下次重试的时间间隔。
  • 更新数据库:将更新后的重试记录保存到数据库中,以便后续重试。

定时任务查询数据库

@Scheduled(cron = "*/5 * * * * ?")
public void retry() {
    List<SecureInvokeRecord> secureInvokeRecords = secureInvokeRecordDao.getWaitRetryRecords();
    for (SecureInvokeRecord secureInvokeRecord : secureInvokeRecords) {
        doAsyncInvoke(secureInvokeRecord);
    }
}

public List<SecureInvokeRecord> getWaitRetryRecords() {
    Date now = new Date();
    //查2分钟前的失败数据。避免刚入库的数据被查出来
    DateTime afterTime = DateUtil.offsetMinute(now, -(int) SecureInvokeService.RETRY_INTERVAL_MINUTES);
    return lambdaQuery()
            .eq(SecureInvokeRecord::getStatus, SecureInvokeRecord.STATUS_WAIT)
            .lt(SecureInvokeRecord::getNextRetryTime, new Date())
            .lt(SecureInvokeRecord::getCreateTime, afterTime)
            .list();
}
  • 定时任务:使用 @Scheduled(cron = "*/5 * * * * ?") 注解,表示这个方法每隔5秒执行一次。
  • 获取待重试记录:调用 secureInvokeRecordDao.getWaitRetryRecords() 方法,从数据库中获取所有待重试的 SecureInvokeRecord 记录。
  • 异步执行重试:遍历获取到的待重试记录,调用 doAsyncInvoke(secureInvokeRecord) 方法,异步执行重试逻辑。
  • 查询的时候,我们查询2分钟之前的,防止刚入库就被查询出来

线程池如何加载的

@Slf4j
@AllArgsConstructor
public class SecureInvokeService {

    public static final double RETRY_INTERVAL_MINUTES = 2D;

    private final SecureInvokeRecordDao secureInvokeRecordDao;

    private final Executor executor;
}

我们的SecureInvokeService中有一个Executor,显然是通过全参构造器来传入的AllArgsConstructor

在TransactionAutoConfiguration配置类中,我们找到了这个构造器传入参数

@Configuration
@EnableScheduling
@MapperScan(basePackageClasses = SecureInvokeRecordMapper.class)
@Import({SecureInvokeAspect.class, SecureInvokeRecordDao.class})
public class TransactionAutoConfiguration {

    @Nullable
    protected Executor executor;

    /**
     * Collect any {@link AsyncConfigurer} beans through autowiring.
     */
    @Autowired
    void setConfigurers(ObjectProvider<SecureInvokeConfigurer> configurers) {
        Supplier<SecureInvokeConfigurer> configurer = SingletonSupplier.of(() -> {
            List<SecureInvokeConfigurer> candidates = configurers.stream().collect(Collectors.toList());
            if (CollectionUtils.isEmpty(candidates)) {
                return null;
            }
            if (candidates.size() > 1) {
                throw new IllegalStateException("Only one SecureInvokeConfigurer may exist");
            }
            return candidates.get(0);
        });
        executor = Optional.ofNullable(configurer.get()).map(SecureInvokeConfigurer::getSecureInvokeExecutor).orElse(ForkJoinPool.commonPool());
    }

    @Bean
    public SecureInvokeService getSecureInvokeService(SecureInvokeRecordDao dao) {
        return new SecureInvokeService(dao, executor);
    }

    @Bean
    public MQProducer getMQProducer() {
        return new MQProducer();
    }
}

Executor是如何拿到的?

public interface SecureInvokeConfigurer {

    /**
     * 返回一个线程池
     */
    @Nullable
    default Executor getSecureInvokeExecutor() {
        return null;
    }

}

业务(SecureInvokeService)只需要去继承这个方法,然后实现getSecureInvokeService功能

一般的starter都会有一个Configuration来做自动装配

image-20240528232441304