SpringBoot 自定义线程池
约 1980 字大约 7 分钟
SpringBoot 自定义线程池
自定义线程池有2种方法:
第一种:是自定义线程池,使用时指定线程池名称,执行异步任务。
第二种:重写spring默认线程池,然后使用自己重写后的线程池,执行异步任务。
一、自定义线程池
1.yml配置
# 线程池配置参数
task:
pool:
corePoolSize: 10 # 设置核心线程数
maxPoolSize: 20 # 设置最大线程数
keepAliveTime: 300 # 设置空闲线程存活时间(秒)
queueCapacity: 100 # 设置队列容量
threadNamePrefix: "-signpolicy-asynnotify-" # 设置线程名称前缀
awaitTerminationSeconds: 60 # 设置线程池等待终止时间(秒)
spring:
main:
allow-bean-definition-overriding: true
2.线程池配置属性类
package com..config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
@Data
@ConfigurationProperties(prefix = "task.pool")
public class TaskThreadPoolConfig {
private int corePoolSize;
private int maxPoolSize;
private int keepAliveSeconds;
private int queueCapacity;
private int awaitTerminationSeconds;
private String threadNamePrefix;
}
3. 开启异步线程支持
在启动类上/开启异步线程支持+开启配置属性支持
package com.;
import com..config.TaskThreadPoolConfig;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.scheduling.annotation.EnableAsync;
@EnableAsync//开启异步线程支持
@EnableConfigurationProperties({TaskThreadPoolConfig.class})//开启配置属性支持
@SpringBootApplication
public class AsyncThreadApplication {
public static void main(String[] args) {
SpringApplication.run(AsyncThreadApplication.class, args);
}
}
4.创建自定义线程池配置类
package com..config;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
/**
* 创建自定义线程池配置类
*
* @Author
* @Date 2022-04-05 17:26
**/
@Configuration
public class AsyncScheduledTaskConfig {
@Autowired
private TaskThreadPoolConfig config;
/**
* 1.这种形式的线程池配置是需要在使用的方法上面添加@Async("customAsyncThreadPool")注解的
* 2。如果在使用的方法上不添加该注解,那么spring就会使用默认的线程池
* 3.所以如果添加@Async注解但是不指定使用的线程池,又想自己自定义线程池,那么就可以重写spring默认的线程池
* 4.所以第二个方法就是重写spring默认的线程池
*
* @return
*/
@Bean("customAsyncThreadPool")
public Executor customAsyncThreadPool() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//最大线程数
executor.setMaxPoolSize(config.getMaxPoolSize());
//核心线程数
executor.setCorePoolSize(config.getCorePoolSize());
//任务队列的大小
executor.setQueueCapacity(config.getQueueCapacity());
//线程池名的前缀
executor.setThreadNamePrefix(config.getThreadNamePrefix());
//允许线程的空闲时间30秒
executor.setKeepAliveSeconds(config.getKeepAliveSeconds());
//设置线程池关闭的时候等待所有任务都完成再继续销毁其他的Bean
executor.setWaitForTasksToCompleteOnShutdown(true);
//设置线程池中任务的等待时间,如果超过这个时候还没有销毁就强制销毁,以确保应用最后能够被关闭,而不是阻塞住
executor.setAwaitTerminationSeconds(config.getAwaitTerminationSeconds());
/**
* 拒绝处理策略
* CallerRunsPolicy():交由调用方线程运行,比如 main 线程。
* AbortPolicy():直接抛出异常。
* DiscardPolicy():直接丢弃。
* DiscardOldestPolicy():丢弃队列中最老的任务。
*/
/**
* 特殊说明:
* 1. 这里演示环境,拒绝策略咱们采用抛出异常
* 2.真实业务场景会把缓存队列的大小会设置大一些,
* 如果,提交的任务数量超过最大线程数量或将任务环缓存到本地、redis、mysql中,保证消息不丢失
* 3.如果项目比较大的话,异步通知种类很多的话,建议采用MQ做异步通知方案
*/
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
//线程初始化
executor.initialize();
return executor;
}
}
5.service逻辑层
package com..service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
/**
* 自定义线程池测试1
*
* @Author
* @Date 2022-04-05 17:55
**/
@Service
public class ThreadPoolService {
private static final Logger LOGGER = LoggerFactory.getLogger(ThreadPoolService.class);
@Async("customAsyncThreadPool")//指定使用哪个线程池配置,不然会使用spring默认的线程池
public void executeAsync() {
System.out.println("executeAsync");
LOGGER.info("当前运行线程名称:{}", Thread.currentThread().getName());
}
}
package com..service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
/**
* 自定义线程池测试2
*
* @Author
* @Date 2022-04-05 17:55
**/
@Service
public class ThreadPoolService2 {
private static final Logger LOGGER = LoggerFactory.getLogger(ThreadPoolService2.class);
@Async("customAsyncThreadPool")//指定使用哪个线程池配置,不然会使用spring默认的线程池
public void executeAsync() {
System.out.println("executeAsync2");
LOGGER.info("当前运行线程名称:{}", Thread.currentThread().getName());
}
}
6.controller控制层
package com..controller;
import com..service.ThreadPoolService;
import com..service.ThreadPoolService2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* 自定义线程池侧是
*
* @Author
* @Date 2022-04-05 17:54
**/
@RestController
public class ThreadPoolController {
@Autowired
private ThreadPoolService threadPoolService;
@Autowired
private ThreadPoolService2 threadPoolService2;
@GetMapping("/threadPoolTest")
public void threadPoolTest() {
threadPoolService.executeAsync();
threadPoolService2.executeAsync();
}
}
7. 效果图
浏览器请求:http://localhost:8080/threadPoolTest

二、配置默认线程池
第一种线程池配置是需要在使用的方法上面添加@Async(“customAsyncThreadPool”)注解的。
而这种方式是重写spring默认的线程池,使用的方法上面添加@Async注解,不用去声明线程池类
2.1. yml
# 线程池配置参数
task:
pool:
corePoolSize: 10 # 设置核心线程数
maxPoolSize: 20 # 设置最大线程数
keepAliveTime: 300 # 设置空闲线程存活时间(秒)
queueCapacity: 100 # 设置队列容量
threadNamePrefix: "-signpolicy-asynnotify-" # 设置线程名称前缀
awaitTerminationSeconds: 60 # 设置线程池等待终止时间(秒)
spring:
main:
allow-bean-definition-overriding: true
2.2.线程池配置属性类
package com..config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
/**
* 线程配置属性类
*
* @Author
* @Date 2022-04-05 17:20
**/
@Data
@ConfigurationProperties(prefix = "task.pool")
public class TaskThreadPoolConfig {
private int corePoolSize;
private int maxPoolSize;
private int keepAliveSeconds;
private int queueCapacity;
private int awaitTerminationSeconds;
private String threadNamePrefix;
}
2.3. 开启异步线程支持
在启动类上/开启异步线程支持+开启配置属性支持
package com.;
import com..config.TaskThreadPoolConfig;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.scheduling.annotation.EnableAsync;
@EnableAsync//开启异步线程支持
@EnableConfigurationProperties({TaskThreadPoolConfig.class})//开启配置属性支持
@SpringBootApplication
public class AsyncThreadApplication {
public static void main(String[] args) {
SpringApplication.run(AsyncThreadApplication.class, args);
}
}
2.4. 装配线程池
package com..config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
/**
* 原生(Spring)异步线程池装配类
* 实现AsyncConfigurer接口,重写getAsyncExecutor和getAsyncUncaughtExceptionHandler方法,
* 这样使用默认线程池时就会使用自己重写之后的线程池
*
* @Author
* @Date 2022-04-05 17:26
**/
@Configuration
public class NativeAsyncScheduledTaskConfig implements AsyncConfigurer {
private static final Logger LOGGER = LoggerFactory.getLogger(NativeAsyncScheduledTaskConfig.class);
@Autowired
private TaskThreadPoolConfig config;
@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//最大线程数
executor.setMaxPoolSize(config.getMaxPoolSize());
//核心线程数
executor.setCorePoolSize(config.getCorePoolSize());
//任务队列的大小
executor.setQueueCapacity(config.getQueueCapacity());
//线程池名的前缀
executor.setThreadNamePrefix(config.getThreadNamePrefix());
//允许线程的空闲时间30秒
executor.setKeepAliveSeconds(config.getKeepAliveSeconds());
//设置线程池关闭的时候等待所有任务都完成再继续销毁其他的Bean
executor.setWaitForTasksToCompleteOnShutdown(true);
//设置线程池中任务的等待时间,如果超过这个时候还没有销毁就强制销毁,以确保应用最后能够被关闭,而不是阻塞住
executor.setAwaitTerminationSeconds(config.getAwaitTerminationSeconds());
/**
* 拒绝处理策略
* CallerRunsPolicy():交由调用方线程运行,比如 main 线程。
* AbortPolicy():直接抛出异常。
* DiscardPolicy():直接丢弃。
* DiscardOldestPolicy():丢弃队列中最老的任务。
*/
/**
* 特殊说明:
* 1. 这里演示环境,拒绝策略咱们采用抛出异常
* 2.真实业务场景会把缓存队列的大小会设置大一些,
* 如果,提交的任务数量超过最大线程数量或将任务环缓存到本地、redis、mysql中,保证消息不丢失
* 3.如果项目比较大的话,异步通知种类很多的话,建议采用MQ做异步通知方案
*/
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
//线程初始化
executor.initialize();
return executor;
}
/**
* 异步任务重异常处理
*
* @return
*/
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return (ex, method, params) -> {
LOGGER.error(ex.getMessage(), ex);
LOGGER.error("excetion method:{}", method.getName());
};
}
// @Override
// public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
// return new AsyncUncaughtExceptionHandler() {
// @Override
// public void handleUncaughtException(Throwable ex, Method method, Object... params) {
// LOGGER.error(ex.getMessage(),ex);
// LOGGER.error("excetion method:{}",method.getName());
// }
// };
// }
}
2.5. service逻辑层
package com..service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
/**
* 重写Spring默认线程池测试1
*
* @Author
* @Date 2022-04-05 17:55
**/
@Service
public class ThreadPoolService3 {
private static final Logger LOGGER = LoggerFactory.getLogger(ThreadPoolService3.class);
@Async
public void executeAsync() {
System.out.println("executeAsync3");
LOGGER.info("当前运行线程名称:{}", Thread.currentThread().getName());
}
}
package com..service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
/**
* 重写Spring默认线程池测试2
*
* @Author
* @Date 2022-04-05 17:55
**/
@Service
public class ThreadPoolService4 {
private static final Logger LOGGER = LoggerFactory.getLogger(ThreadPoolService4.class);
@Async
public void executeAsync() {
System.out.println("executeAsync4");
LOGGER.info("当前运行线程名称:{}", Thread.currentThread().getName());
}
}
2.6. controller控制层
package com..controller;
import com..service.ThreadPoolService3;
import com..service.ThreadPoolService4;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* 自定义线程池侧是
*
* @Author
* @Date 2022-04-05 17:54
**/
@RestController
public class ThreadPoolController2 {
@Autowired
private ThreadPoolService3 threadPoolService3;
@Autowired
private ThreadPoolService4 threadPoolService4;
@GetMapping("/threadPoolTest2")
public void threadPoolTest2() {
threadPoolService3.executeAsync();
threadPoolService4.executeAsync();
}
}
2.7. 效果图
浏览器请求:http://localhost:8080/threadPoolTest2
