之前出现过这样的一个状况: 使用 java.util.concurrent.CompletableFuture 类中 runAsync 提交任务,由于没有指定线程池,使用的是ForkJoinPool.commonPool()作为线程池执行异步代码。 线上任务提交得太快,线程池中线程执行任务太慢,导致创建太多线程,以及提交到任务队列,任务队列不断增加,耗光内存,线上大量的GC。
首先这边先使用自定义线程池,还是发现队列过长,以及创建线程数过多的情况
{
"threadInfo": {
"active": 768,
"finished": 240087,
"queue": 401621
}
}
其实关键是maximumPoolSize和workQueue,控制好最大线程数以及任务队列(最好为阻塞的),以及拒接策略。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
}
int corePoolSize 线程池中保留的线程数量
int maximumPoolSize 线程池的最大大小
long keepAliveTime 当前线程数超过corePoolSize时,空闲线程的最长保留时间
TimeUnit unit 时间单位
BlockingQueue
当提交任务当线程中时,由corePoolSize数量的线程去执行(假设线程执行的时间无限长),并且达到corePoolSize, 再次提交任务时,任务会被放置于workQueue,并且达到workQueue size, 再次提交任务时,线程池会重新创建线程,来执行任务,并且达到maximumPoolSize, 再次提交任务时,由于此时已经达到最大线程数并且达到最大队列容量,所以执行拒绝策略。
拒绝该任务,并且把该任务放置于调用者线程中去执行,当线程池中的线程被shutdown之后,任务会被丢弃。
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public CallerRunsPolicy() { }
/**
* Executes task r in the caller's thread, unless the executor
* has been shut down, in which case the task is discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
拒绝任务,并抛出异常
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { }
/**
* Always throws RejectedExecutionException.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
* @throws RejectedExecutionException always
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
抛弃任务
public static class DiscardPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardPolicy}.
*/
public DiscardPolicy() { }
/**
* Does nothing, which has the effect of discarding task r.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
丢弃队列中第一个任务,并将任务加入队列
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
public DiscardOldestPolicy() { }
/**
* Obtains and ignores the next task that the executor
* would otherwise execute, if one is immediately available,
* and then retries execution of task r, unless the executor
* is shut down, in which case task r is instead discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
public class ThreadTest {
public static class MyRunnable implements Runnable {
@Override
public void run() {
System.out.println("hello");
}
}
public static class MyThread extends Thread {
@Override
public void run() {
System.out.println("myThread hello");
}
}
@Test
public void test() {
Thread thread = new Thread(new MyRunnable());
thread.start();
Thread thread1 = new MyThread();
thread1.start();
}
}
参考: