Skip to main content

项目线程池统一管理


项目线程池统一管理

为什么需要线程池?

使用线程池可以有效地提高资源利用率、控制并发数量、简化线程管理,并提高系统的响应速度,这对于需要高并发、高吞吐量的项目非常重要

如何使用

代码

下面是项目线程池的配置

@Configuration
@EnableAsync
public class ThreadPoolConfig implements AsyncConfigurer {
    /**
     * 项目共用线程池
     */
    public static final String YUNFEICHAT_EXECUTOR = "yunfeichatExecutor";
    /**
     * websocket通信线程池
     */
    public static final String WS_EXECUTOR = "websocketExecutor";

    @Override
    public Executor getAsyncExecutor() {
        return yunfeichatExecutor();
    }

    @Bean(YUNFEICHAT_EXECUTOR)
    @Primary
    public ThreadPoolTaskExecutor yunfeichatExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setWaitForTasksToCompleteOnShutdown(true);// 等待任务执行完成后关闭线程池 优雅关闭
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(10);
        executor.setQueueCapacity(200);
        executor.setThreadNamePrefix("yunfeichat-executor-");
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());// 满了调用线程执行,认为重要任务
        executor.setThreadFactory(new MyThreadFactory(executor));
        executor.initialize();
        return executor;
    }
}

代码解释:

  1. @EnableAsync 注解会启用对异步方法调用的支持

    使用 @Async 注解的方法将在独立的线程中执行,而不是在当前线程中执行。

    通常情况下,我们会配置一个自定义的 ThreadPoolTaskExecutor 作为异步执行器,以提供更细粒度的控制和配置。
    在这种情况下,@EnableAsync 注解会自动检测并使用我们配置的自定义执行器,而不是使用默认的 SimpleAsyncTaskExecutor

  2. 实现了 AsyncConfigurer 接口的 getAsyncExecutor 方法,返回了 yunfeichatExecutor 方法创建的线程池实例,作为项目默认的异步执行线程池。

  3. yunfeichatExecutor 方法创建并配置了一个 ThreadPoolTaskExecutor 线程池实例:

    • setWaitForTasksToCompleteOnShutdown(true): 在关闭线程池时等待任务执行完成,实现优雅关闭。
    • setCorePoolSize(10)setMaxPoolSize(10): 设置线程池的核心线程数和最大线程数为 10。
    • setQueueCapacity(200): 设置任务队列容量为 200。
    • setThreadNamePrefix("yunfeichat-executor-"): 设置线程名称前缀。
    • setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()): 当线程池处于饱和状态时,将任务交给调用线程来执行,认为这些任务是重要的。
    • setThreadFactory(new MyThreadFactory(executor)): 使用自定义的线程工厂创建线程。
    • initialize(): 初始化线程池。
  4. Primary

    @Primary 注解的作用是标记一个 Bean 为主要 Bean,当存在多个相同类型的 Bean 时,@Primary 标记的 Bean 将被优先使用。

    在 Spring 中,当一个接口或抽象类有多个实现时,Spring 需要确定应该使用哪个实现。通常情况下,这是通过@Qualifier注解或 Bean 名称来确定的。但是,如果没有使用@Qualifier注解,并且 Bean 名称也不能明确区分,那么 Spring 会选择标有@Primary注解的 Bean。

    一个常见的例子是:当一个接口有多个实现类时,我们可以使用@Primary注解来标记其中一个实现类为主要实现,这样在自动注入时就会优先使用这个实现类。

  5. 拒绝策略

    ThreadPoolExecutor.CallerRunsPolicyRejectedExecutionHandler 的一种实现,它用于在线程池饱和时的任务拒绝处理策略。

    当线程池中的线程数量达到最大值,并且任务队列也已满时,新的任务将被拒绝执行。CallerRunsPolicy 的行为如下:

    1. 当一个任务被拒绝执行时,它将由调用者的线程来直接执行这个任务。
    2. 这样做的目的是:
      • 降低系统的整体吞吐量,使系统处于稳定状态,不会因为过多的任务进入而崩溃。
      • 给调用者一定的反馈,让它知道自己的任务没有被成功地异步执行。
    3. 这种策略适用于以下场景:
      • 对于关键任务,我们希望它们能够尽快得到执行,即使会降低整体吞吐量。
      • 对于不太重要的任务,我们可以让它们排队等待,而不是被直接拒绝。

使用

@Async
@Override
public void renewalTokenIfNecessary(String token) {
    Long uid = getValidUid(token);
    String key = getUidKey(uid);
    Long expireDays = RedisUtils.getExpire(key, TimeUnit.DAYS);
    if (expireDays == -2) {
        return;
    }
    if (expireDays < TOKEN_EXPIRE_DAYS) {
        RedisUtils.expire(key, TOKEN_EXPIRE_DAYS, TimeUnit.DAYS);
    }
}

我们使用Async注解,就可以让这个方法异步执行了

优雅停机

项目关闭的时候,会调用JVM的shutdownHook回调线程池,等队列里任务执行完再停机。保证任务不丢失。

shutdownHook会回调spring容器,所以我们实现spring的DisposableBeandestroy方法也可以达到一样的效果,在里面调用executor.shutdown()并等待线程池执行完毕。

我们现在使用的线程池是Spring的线程池,而不是JUC包自带的线程池,Spring的线程池自带了优雅停机的功能。

追踪ThreadPoolTaskExecutor源码:

public class ThreadPoolTaskExecutor extends ExecutorConfigurationSupport
       implements AsyncListenableTaskExecutor, SchedulingTaskExecutor {

然后进入ExecutorConfigurationSupport:

public abstract class ExecutorConfigurationSupport extends CustomizableThreadFactory
       implements BeanNameAware, InitializingBean, DisposableBean {

这里看到了继承Spring的DisposableBean,里面便是destory方法:

public interface DisposableBean {
    void destroy() throws Exception;
}

因此我们的ExecutorConfigurationSupport会实现这个方法:

	private boolean waitForTasksToCompleteOnShutdown = false;

    public void destroy() {
        shutdown();
    }
	public void shutdown() {
		if (logger.isDebugEnabled()) {
			logger.debug("Shutting down ExecutorService" + (this.beanName != null ? " '" + this.beanName + "'" : ""));
		}
		if (this.executor != null) {
			if (this.waitForTasksToCompleteOnShutdown) {
				this.executor.shutdown();
			}
			else {
				for (Runnable remainingTask : this.executor.shutdownNow()) {
					cancelRemainingTask(remainingTask);
				}
			}
			awaitTerminationIfNecessary(this.executor);
		}
	}

解释:这里线程池会根据waitForTasksToCompleteOnShutdown的值来判断是否要等待所有的任务执行完,默认是false,只有为true的时候,才会调用线程池的shutdown。this.executor.shutdown();

线程池的shutdown:

shutdown() 方法用于有序地关闭执行者服务(ExecutorService)。其主要作用如下:

  1. 有序关闭:当调用 shutdown() 方法时,执行者服务将停止接受新任务,但仍会完成之前提交的任务。

  2. 拒绝新任务:在调用 shutdown() 方法之后,执行者服务将不再接受新的任务。任何尝试提交新任务的操作都会抛出 RejectedExecutionException 异常。

  3. 不等待任务完成:shutdown() 方法并不会等待之前提交的任务全部完成。如果需要等待任务完成,应该使用 awaitTermination() 方法。

  4. 安全异常:如果存在安全管理器,并且调用者没有足够的权限修改执行者服务管理的线程,shutdown() 方法可能会抛出 SecurityException 异常。

异常捕获

线程池默认不会抛异常,而仅仅是打印错误信息

@Test
void main() {
    threadPoolTaskExecutor.execute(() -> {
        if (1 == 1) {
            log.error("出错了");
            throw new RuntimeException("test");
        }
    });
}

运行结果:

image-20240517205519031

我们想要的结果是向上面一样可以有日志,而不是在控制台打印。如果出了问题,却不打印error日志,那问题就被隐藏了,非常危险

原因,在ThreadGroup.java文件中,我们找到了对应的源码,可以看到这里仅仅是控制台打印:

public void uncaughtException(Thread t, Throwable e) {
    if (parent != null) {
        parent.uncaughtException(t, e);
    } else {
        Thread.UncaughtExceptionHandler ueh =
            Thread.getDefaultUncaughtExceptionHandler();
        if (ueh != null) {
            ueh.uncaughtException(t, e);
        } else if (!(e instanceof ThreadDeath)) {
            System.err.print("Exception in thread \""
                             + t.getName() + "\" ");
            e.printStackTrace(System.err);
        }
    }
}

现在我们想要重写这个方法,能够以日志的方式进行捕获:

重写,使得可以以日志的方式打印错误信息:

@Slf4j
public class GlobalUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {

    private static final GlobalUncaughtExceptionHandler instance = new GlobalUncaughtExceptionHandler();

    private GlobalUncaughtExceptionHandler() {
    }

    @Override
    public void uncaughtException(Thread t, Throwable e) {
        log.error("Exception in thread {} ", t.getName(), e);
    }

    public static GlobalUncaughtExceptionHandler getInstance() {
        return instance;
    }

}

接下来就是使用这个了:

@Bean(YUNFEICHAT_EXECUTOR)
@Primary
public ThreadPoolTaskExecutor yunfeichatExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setWaitForTasksToCompleteOnShutdown(true);// 等待任务执行完成后关闭线程池 优雅关闭
    executor.setCorePoolSize(10);
    executor.setMaxPoolSize(10);
    executor.setQueueCapacity(200);
    executor.setThreadNamePrefix("yunfeichat-executor-");
    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());// 满了调用线程执行,认为重要任务
    executor.setThreadFactory(new MyThreadFactory(executor));
    executor.initialize();
    return executor;
}

我们发现ThreadPoolTaskExecutor这个类已经实现了ThreadFactory接口

image-20240517210435462

如果我们把线程工厂换了,那么里面的设置都要重新写,现在我们想要使用这个线程工厂,并在此基础上可以增加新的功能,也就是加上我们的GlobalUncaughtExceptionHandler,因此,我们很自然的可以想到装饰器模式。

于是便有了我们的MyThreadFactory

@Slf4j
@AllArgsConstructor
public class MyThreadFactory implements ThreadFactory {

    private final ThreadFactory factory;

    @Override
    public Thread newThread(Runnable r) {
        Thread thread = factory.newThread(r);
        thread.setUncaughtExceptionHandler(GlobalUncaughtExceptionHandler.getInstance());
        return thread;
    }
}

这里是组合关系,我们保存组合的成员属性,通过构造器传入

如果是继承,那么可以使用super