跳至主要內容

SpringBoot 自定义线程池

Mr.Dabao约 1980 字大约 7 分钟

SpringBoot 自定义线程池

自定义线程池有2种方法:
第一种:是自定义线程池,使用时指定线程池名称,执行异步任务。
第二种:重写spring默认线程池,然后使用自己重写后的线程池,执行异步任务。

一、自定义线程池

1.yml配置

# 线程池配置参数
task:
  pool:
    corePoolSize: 10 # 设置核心线程数
    maxPoolSize: 20  # 设置最大线程数
    keepAliveTime: 300 # 设置空闲线程存活时间(秒)
    queueCapacity: 100 # 设置队列容量
    threadNamePrefix: "-signpolicy-asynnotify-" # 设置线程名称前缀
    awaitTerminationSeconds: 60 #  设置线程池等待终止时间(秒)
spring:
  main:
    allow-bean-definition-overriding: true

2.线程池配置属性类

package com..config;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;

@Data
@ConfigurationProperties(prefix = "task.pool")
public class TaskThreadPoolConfig {
    private int corePoolSize;
    private int maxPoolSize;
    private int keepAliveSeconds;
    private int queueCapacity;
    private int awaitTerminationSeconds;
    private String threadNamePrefix;
}

3. 开启异步线程支持

在启动类上/开启异步线程支持+开启配置属性支持

package com.;

import com..config.TaskThreadPoolConfig;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.scheduling.annotation.EnableAsync;

@EnableAsync//开启异步线程支持
@EnableConfigurationProperties({TaskThreadPoolConfig.class})//开启配置属性支持
@SpringBootApplication
public class AsyncThreadApplication {

    public static void main(String[] args) {
        SpringApplication.run(AsyncThreadApplication.class, args);
    }

}

4.创建自定义线程池配置类

package com..config;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * 创建自定义线程池配置类
 *
 * @Author 
 * @Date 2022-04-05 17:26
 **/
@Configuration
public class AsyncScheduledTaskConfig {

    @Autowired
    private TaskThreadPoolConfig config;

    /**
     * 1.这种形式的线程池配置是需要在使用的方法上面添加@Async("customAsyncThreadPool")注解的
     * 2。如果在使用的方法上不添加该注解,那么spring就会使用默认的线程池
     * 3.所以如果添加@Async注解但是不指定使用的线程池,又想自己自定义线程池,那么就可以重写spring默认的线程池
     * 4.所以第二个方法就是重写spring默认的线程池
     *
     * @return
     */
    @Bean("customAsyncThreadPool")
    public Executor customAsyncThreadPool() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        //最大线程数
        executor.setMaxPoolSize(config.getMaxPoolSize());
        //核心线程数
        executor.setCorePoolSize(config.getCorePoolSize());
        //任务队列的大小
        executor.setQueueCapacity(config.getQueueCapacity());
        //线程池名的前缀
        executor.setThreadNamePrefix(config.getThreadNamePrefix());
        //允许线程的空闲时间30秒
        executor.setKeepAliveSeconds(config.getKeepAliveSeconds());
        //设置线程池关闭的时候等待所有任务都完成再继续销毁其他的Bean
        executor.setWaitForTasksToCompleteOnShutdown(true);
        //设置线程池中任务的等待时间,如果超过这个时候还没有销毁就强制销毁,以确保应用最后能够被关闭,而不是阻塞住
        executor.setAwaitTerminationSeconds(config.getAwaitTerminationSeconds());

        /**
         * 拒绝处理策略
         * CallerRunsPolicy():交由调用方线程运行,比如 main 线程。
         * AbortPolicy():直接抛出异常。
         * DiscardPolicy():直接丢弃。
         * DiscardOldestPolicy():丢弃队列中最老的任务。
         */
        /**
         * 特殊说明:
         * 1. 这里演示环境,拒绝策略咱们采用抛出异常
         * 2.真实业务场景会把缓存队列的大小会设置大一些,
         * 如果,提交的任务数量超过最大线程数量或将任务环缓存到本地、redis、mysql中,保证消息不丢失
         * 3.如果项目比较大的话,异步通知种类很多的话,建议采用MQ做异步通知方案
         */
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
        //线程初始化
        executor.initialize();
        return executor;
    }
}

5.service逻辑层

package com..service;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;


/**
 * 自定义线程池测试1
 *
 * @Author 
 * @Date 2022-04-05 17:55
 **/
@Service
public class ThreadPoolService {
    private static final Logger LOGGER = LoggerFactory.getLogger(ThreadPoolService.class);

    @Async("customAsyncThreadPool")//指定使用哪个线程池配置,不然会使用spring默认的线程池
    public void executeAsync() {
        System.out.println("executeAsync");
        LOGGER.info("当前运行线程名称:{}", Thread.currentThread().getName());
    }
}



package com..service;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;


/**
 * 自定义线程池测试2
 *
 * @Author 
 * @Date 2022-04-05 17:55
 **/
@Service
public class ThreadPoolService2 {
    private static final Logger LOGGER = LoggerFactory.getLogger(ThreadPoolService2.class);

    @Async("customAsyncThreadPool")//指定使用哪个线程池配置,不然会使用spring默认的线程池
    public void executeAsync() {
        System.out.println("executeAsync2");
        LOGGER.info("当前运行线程名称:{}", Thread.currentThread().getName());
    }
}

6.controller控制层

package com..controller;

import com..service.ThreadPoolService;
import com..service.ThreadPoolService2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * 自定义线程池侧是
 *
 * @Author 
 * @Date 2022-04-05 17:54
 **/
@RestController
public class ThreadPoolController {

    @Autowired
    private ThreadPoolService threadPoolService;
    @Autowired
    private ThreadPoolService2 threadPoolService2;

    @GetMapping("/threadPoolTest")
    public void threadPoolTest() {
        threadPoolService.executeAsync();
        threadPoolService2.executeAsync();
    }
}

7. 效果图

浏览器请求:http://localhost:8080/threadPoolTestopen in new window

image-20230628193834575
image-20230628193834575

二、配置默认线程池

第一种线程池配置是需要在使用的方法上面添加@Async(“customAsyncThreadPool”)注解的。
而这种方式是重写spring默认的线程池,使用的方法上面添加@Async注解,不用去声明线程池类

2.1. yml

# 线程池配置参数
task:
  pool:
    corePoolSize: 10 # 设置核心线程数
    maxPoolSize: 20  # 设置最大线程数
    keepAliveTime: 300 # 设置空闲线程存活时间(秒)
    queueCapacity: 100 # 设置队列容量
    threadNamePrefix: "-signpolicy-asynnotify-" # 设置线程名称前缀
    awaitTerminationSeconds: 60 #  设置线程池等待终止时间(秒)
spring:
  main:
    allow-bean-definition-overriding: true

2.2.线程池配置属性类

package com..config;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;

/**
 * 线程配置属性类
 *
 * @Author 
 * @Date 2022-04-05 17:20
 **/
@Data
@ConfigurationProperties(prefix = "task.pool")
public class TaskThreadPoolConfig {
    private int corePoolSize;
    private int maxPoolSize;
    private int keepAliveSeconds;
    private int queueCapacity;
    private int awaitTerminationSeconds;
    private String threadNamePrefix;
}

2.3. 开启异步线程支持

在启动类上/开启异步线程支持+开启配置属性支持

package com.;

import com..config.TaskThreadPoolConfig;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.scheduling.annotation.EnableAsync;

@EnableAsync//开启异步线程支持
@EnableConfigurationProperties({TaskThreadPoolConfig.class})//开启配置属性支持
@SpringBootApplication
public class AsyncThreadApplication {

    public static void main(String[] args) {
        SpringApplication.run(AsyncThreadApplication.class, args);
    }

}

2.4. 装配线程池

package com..config;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * 原生(Spring)异步线程池装配类
 * 实现AsyncConfigurer接口,重写getAsyncExecutor和getAsyncUncaughtExceptionHandler方法,
 * 这样使用默认线程池时就会使用自己重写之后的线程池
 *
 * @Author 
 * @Date 2022-04-05 17:26
 **/
@Configuration
public class NativeAsyncScheduledTaskConfig implements AsyncConfigurer {
    private static final Logger LOGGER = LoggerFactory.getLogger(NativeAsyncScheduledTaskConfig.class);


    @Autowired
    private TaskThreadPoolConfig config;

    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        //最大线程数
        executor.setMaxPoolSize(config.getMaxPoolSize());
        //核心线程数
        executor.setCorePoolSize(config.getCorePoolSize());
        //任务队列的大小
        executor.setQueueCapacity(config.getQueueCapacity());
        //线程池名的前缀
        executor.setThreadNamePrefix(config.getThreadNamePrefix());
        //允许线程的空闲时间30秒
        executor.setKeepAliveSeconds(config.getKeepAliveSeconds());
        //设置线程池关闭的时候等待所有任务都完成再继续销毁其他的Bean
        executor.setWaitForTasksToCompleteOnShutdown(true);
        //设置线程池中任务的等待时间,如果超过这个时候还没有销毁就强制销毁,以确保应用最后能够被关闭,而不是阻塞住
        executor.setAwaitTerminationSeconds(config.getAwaitTerminationSeconds());

        /**
         * 拒绝处理策略
         * CallerRunsPolicy():交由调用方线程运行,比如 main 线程。
         * AbortPolicy():直接抛出异常。
         * DiscardPolicy():直接丢弃。
         * DiscardOldestPolicy():丢弃队列中最老的任务。
         */
        /**
         * 特殊说明:
         * 1. 这里演示环境,拒绝策略咱们采用抛出异常
         * 2.真实业务场景会把缓存队列的大小会设置大一些,
         * 如果,提交的任务数量超过最大线程数量或将任务环缓存到本地、redis、mysql中,保证消息不丢失
         * 3.如果项目比较大的话,异步通知种类很多的话,建议采用MQ做异步通知方案
         */
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
        //线程初始化
        executor.initialize();
        return executor;
    }

    /**
     * 异步任务重异常处理
     *
     * @return
     */
    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return (ex, method, params) -> {
            LOGGER.error(ex.getMessage(), ex);
            LOGGER.error("excetion method:{}", method.getName());
        };
    }
    // @Override
    // public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
    //     return new AsyncUncaughtExceptionHandler() {
    //         @Override
    //         public void handleUncaughtException(Throwable ex, Method method, Object... params) {
    //             LOGGER.error(ex.getMessage(),ex);
    //             LOGGER.error("excetion method:{}",method.getName());
    //         }
    //     };
    // }
}

2.5. service逻辑层

package com..service;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;


/**
 * 重写Spring默认线程池测试1
 *
 * @Author 
 * @Date 2022-04-05 17:55
 **/
@Service
public class ThreadPoolService3 {
    private static final Logger LOGGER = LoggerFactory.getLogger(ThreadPoolService3.class);

    @Async
    public void executeAsync() {
        System.out.println("executeAsync3");
        LOGGER.info("当前运行线程名称:{}", Thread.currentThread().getName());
    }
}



package com..service;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;


/**
 * 重写Spring默认线程池测试2
 *
 * @Author 
 * @Date 2022-04-05 17:55
 **/
@Service
public class ThreadPoolService4 {
    private static final Logger LOGGER = LoggerFactory.getLogger(ThreadPoolService4.class);

    @Async
    public void executeAsync() {
        System.out.println("executeAsync4");
        LOGGER.info("当前运行线程名称:{}", Thread.currentThread().getName());
    }
}

2.6. controller控制层

package com..controller;

import com..service.ThreadPoolService3;
import com..service.ThreadPoolService4;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * 自定义线程池侧是
 *
 * @Author 
 * @Date 2022-04-05 17:54
 **/
@RestController
public class ThreadPoolController2 {

    @Autowired
    private ThreadPoolService3 threadPoolService3;
    @Autowired
    private ThreadPoolService4 threadPoolService4;

    @GetMapping("/threadPoolTest2")
    public void threadPoolTest2() {
        threadPoolService3.executeAsync();
        threadPoolService4.executeAsync();
    }
}

2.7. 效果图

浏览器请求:http://localhost:8080/threadPoolTest2open in new window

image-20230628194335274
image-20230628194335274