项目线程池统一管理
项目线程池统一管理
为什么需要线程池?
使用线程池可以有效地提高资源利用率、控制并发数量、简化线程管理,并提高系统的响应速度,这对于需要高并发、高吞吐量的项目非常重要
如何使用
代码
下面是项目线程池的配置
@Configuration
@EnableAsync
public class ThreadPoolConfig implements AsyncConfigurer {
/**
* 项目共用线程池
*/
public static final String YUNFEICHAT_EXECUTOR = "yunfeichatExecutor";
/**
* websocket通信线程池
*/
public static final String WS_EXECUTOR = "websocketExecutor";
@Override
public Executor getAsyncExecutor() {
return yunfeichatExecutor();
}
@Bean(YUNFEICHAT_EXECUTOR)
@Primary
public ThreadPoolTaskExecutor yunfeichatExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setWaitForTasksToCompleteOnShutdown(true);// 等待任务执行完成后关闭线程池 优雅关闭
executor.setCorePoolSize(10);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(200);
executor.setThreadNamePrefix("yunfeichat-executor-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());// 满了调用线程执行,认为重要任务
executor.setThreadFactory(new MyThreadFactory(executor));
executor.initialize();
return executor;
}
}
代码解释:
@EnableAsync
注解会启用对异步方法调用的支持使用
@Async
注解的方法将在独立的线程中执行,而不是在当前线程中执行。通常情况下,我们会配置一个自定义的
ThreadPoolTaskExecutor
作为异步执行器,以提供更细粒度的控制和配置。
在这种情况下,@EnableAsync
注解会自动检测并使用我们配置的自定义执行器,而不是使用默认的SimpleAsyncTaskExecutor
。实现了
AsyncConfigurer
接口的getAsyncExecutor
方法,返回了yunfeichatExecutor
方法创建的线程池实例,作为项目默认的异步执行线程池。yunfeichatExecutor
方法创建并配置了一个ThreadPoolTaskExecutor
线程池实例:setWaitForTasksToCompleteOnShutdown(true)
: 在关闭线程池时等待任务执行完成,实现优雅关闭。setCorePoolSize(10)
和setMaxPoolSize(10)
: 设置线程池的核心线程数和最大线程数为 10。setQueueCapacity(200)
: 设置任务队列容量为 200。setThreadNamePrefix("yunfeichat-executor-")
: 设置线程名称前缀。setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy())
: 当线程池处于饱和状态时,将任务交给调用线程来执行,认为这些任务是重要的。setThreadFactory(new MyThreadFactory(executor))
: 使用自定义的线程工厂创建线程。initialize()
: 初始化线程池。
Primary
@Primary
注解的作用是标记一个 Bean 为主要 Bean,当存在多个相同类型的 Bean 时,@Primary
标记的 Bean 将被优先使用。在 Spring 中,当一个接口或抽象类有多个实现时,Spring 需要确定应该使用哪个实现。通常情况下,这是通过
@Qualifier
注解或 Bean 名称来确定的。但是,如果没有使用@Qualifier
注解,并且 Bean 名称也不能明确区分,那么 Spring 会选择标有@Primary
注解的 Bean。一个常见的例子是:当一个接口有多个实现类时,我们可以使用
@Primary
注解来标记其中一个实现类为主要实现,这样在自动注入时就会优先使用这个实现类。拒绝策略
ThreadPoolExecutor.CallerRunsPolicy
是RejectedExecutionHandler
的一种实现,它用于在线程池饱和时的任务拒绝处理策略。当线程池中的线程数量达到最大值,并且任务队列也已满时,新的任务将被拒绝执行。
CallerRunsPolicy
的行为如下:- 当一个任务被拒绝执行时,它将由调用者的线程来直接执行这个任务。
- 这样做的目的是:
- 降低系统的整体吞吐量,使系统处于稳定状态,不会因为过多的任务进入而崩溃。
- 给调用者一定的反馈,让它知道自己的任务没有被成功地异步执行。
- 这种策略适用于以下场景:
- 对于关键任务,我们希望它们能够尽快得到执行,即使会降低整体吞吐量。
- 对于不太重要的任务,我们可以让它们排队等待,而不是被直接拒绝。
使用
@Async
@Override
public void renewalTokenIfNecessary(String token) {
Long uid = getValidUid(token);
String key = getUidKey(uid);
Long expireDays = RedisUtils.getExpire(key, TimeUnit.DAYS);
if (expireDays == -2) {
return;
}
if (expireDays < TOKEN_EXPIRE_DAYS) {
RedisUtils.expire(key, TOKEN_EXPIRE_DAYS, TimeUnit.DAYS);
}
}
我们使用Async注解,就可以让这个方法异步执行了
优雅停机
项目关闭的时候,会调用JVM的shutdownHook回调线程池,等队列里任务执行完再停机。保证任务不丢失。
shutdownHook会回调spring容器,所以我们实现spring的DisposableBean
的destroy
方法也可以达到一样的效果,在里面调用executor.shutdown()
并等待线程池执行完毕。
我们现在使用的线程池是Spring的线程池,而不是JUC包自带的线程池,Spring的线程池自带了优雅停机的功能。
追踪ThreadPoolTaskExecutor
源码:
public class ThreadPoolTaskExecutor extends ExecutorConfigurationSupport
implements AsyncListenableTaskExecutor, SchedulingTaskExecutor {
然后进入ExecutorConfigurationSupport
:
public abstract class ExecutorConfigurationSupport extends CustomizableThreadFactory
implements BeanNameAware, InitializingBean, DisposableBean {
这里看到了继承Spring的DisposableBean
,里面便是destory方法:
public interface DisposableBean {
void destroy() throws Exception;
}
因此我们的ExecutorConfigurationSupport
会实现这个方法:
private boolean waitForTasksToCompleteOnShutdown = false;
public void destroy() {
shutdown();
}
public void shutdown() {
if (logger.isDebugEnabled()) {
logger.debug("Shutting down ExecutorService" + (this.beanName != null ? " '" + this.beanName + "'" : ""));
}
if (this.executor != null) {
if (this.waitForTasksToCompleteOnShutdown) {
this.executor.shutdown();
}
else {
for (Runnable remainingTask : this.executor.shutdownNow()) {
cancelRemainingTask(remainingTask);
}
}
awaitTerminationIfNecessary(this.executor);
}
}
解释:这里线程池会根据waitForTasksToCompleteOnShutdown
的值来判断是否要等待所有的任务执行完,默认是false,只有为true的时候,才会调用线程池的shutdown。this.executor.shutdown();
线程池的shutdown:
shutdown()
方法用于有序地关闭执行者服务(ExecutorService)。其主要作用如下:
有序关闭:当调用
shutdown()
方法时,执行者服务将停止接受新任务,但仍会完成之前提交的任务。拒绝新任务:在调用
shutdown()
方法之后,执行者服务将不再接受新的任务。任何尝试提交新任务的操作都会抛出RejectedExecutionException
异常。不等待任务完成:
shutdown()
方法并不会等待之前提交的任务全部完成。如果需要等待任务完成,应该使用awaitTermination()
方法。安全异常:如果存在安全管理器,并且调用者没有足够的权限修改执行者服务管理的线程,
shutdown()
方法可能会抛出SecurityException
异常。
异常捕获
线程池默认不会抛异常,而仅仅是打印错误信息
@Test
void main() {
threadPoolTaskExecutor.execute(() -> {
if (1 == 1) {
log.error("出错了");
throw new RuntimeException("test");
}
});
}
运行结果:
我们想要的结果是向上面一样可以有日志,而不是在控制台打印。如果出了问题,却不打印error日志,那问题就被隐藏了,非常危险
原因,在ThreadGroup.java文件中,我们找到了对应的源码,可以看到这里仅仅是控制台打印:
public void uncaughtException(Thread t, Throwable e) { if (parent != null) { parent.uncaughtException(t, e); } else { Thread.UncaughtExceptionHandler ueh = Thread.getDefaultUncaughtExceptionHandler(); if (ueh != null) { ueh.uncaughtException(t, e); } else if (!(e instanceof ThreadDeath)) { System.err.print("Exception in thread \"" + t.getName() + "\" "); e.printStackTrace(System.err); } } }
现在我们想要重写这个方法,能够以日志的方式进行捕获:
重写,使得可以以日志的方式打印错误信息:
@Slf4j
public class GlobalUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
private static final GlobalUncaughtExceptionHandler instance = new GlobalUncaughtExceptionHandler();
private GlobalUncaughtExceptionHandler() {
}
@Override
public void uncaughtException(Thread t, Throwable e) {
log.error("Exception in thread {} ", t.getName(), e);
}
public static GlobalUncaughtExceptionHandler getInstance() {
return instance;
}
}
接下来就是使用这个了:
@Bean(YUNFEICHAT_EXECUTOR)
@Primary
public ThreadPoolTaskExecutor yunfeichatExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setWaitForTasksToCompleteOnShutdown(true);// 等待任务执行完成后关闭线程池 优雅关闭
executor.setCorePoolSize(10);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(200);
executor.setThreadNamePrefix("yunfeichat-executor-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());// 满了调用线程执行,认为重要任务
executor.setThreadFactory(new MyThreadFactory(executor));
executor.initialize();
return executor;
}
我们发现ThreadPoolTaskExecutor
这个类已经实现了ThreadFactory
接口
如果我们把线程工厂换了,那么里面的设置都要重新写,现在我们想要使用这个线程工厂,并在此基础上可以增加新的功能,也就是加上我们的GlobalUncaughtExceptionHandler
,因此,我们很自然的可以想到装饰器模式。
于是便有了我们的MyThreadFactory
类
@Slf4j
@AllArgsConstructor
public class MyThreadFactory implements ThreadFactory {
private final ThreadFactory factory;
@Override
public Thread newThread(Runnable r) {
Thread thread = factory.newThread(r);
thread.setUncaughtExceptionHandler(GlobalUncaughtExceptionHandler.getInstance());
return thread;
}
}
这里是组合关系,我们保存组合的成员属性,通过构造器传入
如果是继承,那么可以使用super