線程池就是提前創建若干個線程,如果有任務需要處理,線程池里的線程就會處理任務,處理完之后線程并不會被銷毀,而是等待下一個任務。由于創建和銷毀線程都是消耗系統資源的,所以當你想要頻繁的創建和銷毀線程的時候就可以考慮使用線程池來提升系統的性能。
java中經常需要用到多線程來處理一些業務,我們非常不建議單純使用繼承Thread或者實現Runnable接口的方式來創建線程,那樣勢必有創建及銷毀線程耗費資源、線程上下文切換問題。同時創建過多的線程也可能引發資源耗盡的風險,這個時候引入線程池比較合理,方便線程任務的管理。java中涉及到線程池的相關類均在jdk1.5開始的java.util.concurrent包中,涉及到的幾個核心類及接口包括:Executor、Executors、ExecutorService、ThreadPoolExecutor、FutureTask、Callable、Runnable等。
線程池提供了一種限制和管理資源(包括執行一個任務)。 每個線程池還維護一些基本統計信息,例如已完成任務的數量。
線程池的好處如下:
1.降低資源消耗。通過重復利用已創建的線程降低線程創建和銷毀造成的消耗。
2.可有效的控制最大并發線程數,提高系統資源的使用率,同時避免過多資源競爭,避免堵塞。
3.提供定時執行、定期執行、單線程、并發數控制等功能。
1)線程提交到線程池
2)判斷核心線程池是否已經達到設定的數量,如果沒有達到,則直接創建線程執行任務
3)如果達到了,則放在隊列中,等待執行
4)如果隊列已經滿了,則判斷線程的數量是否已經達到設定的最大值,如果達到了,則直接執行拒絕策略
5)如果沒有達到,則創建線程執行任務。
RUNNING :能接受新提交的任務,并且也能處理阻塞隊列中的任務;
SHUTDOWN:關閉狀態,不再接受新提交的任務,但卻可以繼續處理阻塞隊列中已保存的任務。在線程池處于 RUNNING 狀態時,調用 shutdown()方法會使線程池進入到該狀態。(finalize() 方法在執行過程中也會調用shutdown()方法進入該狀態);
STOP:不能接受新任務,也不處理隊列中的任務,會中斷正在處理任務的線程。在線程池處于 RUNNING 或 SHUTDOWN 狀態時,調用 shutdownNow() 方法會使線程池進入到該狀態;
TIDYING:如果所有的任務都已終止了,workerCount (有效線程數) 為0,線程池進入該狀態后會調用 terminated() 方法進入TERMINATED 狀態。
TERMINATED:在terminated() 方法執行完后進入該狀態,默認terminated()方法中什么也沒有做。
Executor就是一個線程池框架,Executor 位于java.util.concurrent.Executors ,提供了用于創建工作線程的線程池的工廠方法。它包含一組用于有效管理工作線程的組件。Executor API 通過 Executors 將任務的執行與要執行的實際任務解耦。
Executor 接口對象能執行我們的線程任務;
Executors 工具類的不同方法按照我們的需求創建了不同的線程池,來滿足業務的需求。
ExecutorService 接口繼承了Executor接口并進行了擴展,提供了更多的方法,我們能夠獲得任務執行的狀態并且可以獲取任務的返回值。
1.newSingleThreadExecutor
創建一個單線程化的線程池,它只會用唯一的工作線程來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先級)執行。
2.newFixedThreadPoo
創建一個定長線程池,可控制線程最大并發數,超出的線程會在隊列中等待。
3.newCachedThreadPool
創建一個可緩存線程池,如果線程池長度超過處理需要,可靈活回收空閑線程,若無可回收,則新建線程。
4.newScheduledThreadPool
創建一個定長線程池,支持定時及周期性任務執行。
1)newFixedThreadPool和newSingleThreadExecutor: 主要問題是堆積的請求處理隊列可能會耗費非常大的內存,甚至OOM。
2)newCachedThreadPool和newScheduledThreadPool: 主要問題是線程數最大數是Integer.MAX_VALUE,可能會創建數量非常多的線程,甚至OOM。
3)線程池不允許使用Executors去創建,而是通過ThreadPoolExecutor的方式,這樣的處理方式讓程序員更加明確線程池的運行規則,規避資源耗盡的風險。
ThreadPoolExecutor是線程池的核心實現類,在JDK1.5引入,位于java.util.concurrent包。
通過下面的demo來了解ThreadPoolExecutor創建線程的過程。
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* 測試ThreadPoolExecutor對線程的執行順序
**/
public class ThreadPoolSerialTest {
public static void main(String[] args) {
//核心線程數
int corePoolSize = 3;
//最大線程數
int maximumPoolSize = 6;
//超過 corePoolSize 線程數量的線程最大空閑時間
long keepAliveTime = 2;
//以秒為時間單位
TimeUnit unit = TimeUnit.SECONDS;
//創建工作隊列,用于存放提交的等待執行任務
BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<Runnable>(2);
ThreadPoolExecutor threadPoolExecutor = null;
try {
//創建線程池
threadPoolExecutor = new ThreadPoolExecutor(corePoolSize,
maximumPoolSize,
keepAliveTime,
unit,
workQueue,
new ThreadPoolExecutor.AbortPolicy());
//循環提交任務
for (int i = 0; i < 8; i++) {
//提交任務的索引
final int index = (i + 1);
threadPoolExecutor.submit(() -> {
//線程打印輸出
System.out.println("大家好,我是線程:" + index);
try {
//模擬線程執行時間,10s
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
//每個任務提交后休眠500ms再提交下一個任務,用于保證提交順序
Thread.sleep(500);
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
threadPoolExecutor.shutdown();
}
}
}
當一個新任務被提交時:
1. 當前活躍線程數<corePoolSize,則創建一個新線程執行新任務;
2. 當前活躍線程數>corePoolSize,且隊列(workQueue)未滿時,則將新任務放入隊列中;
3. 當前活躍線程數>corePoolSize,且隊列(workQueue)已滿,且當前活躍線程數<maximumPoolSize,則繼續創建一個新線程執行新任務;
4. 當前活躍線程數>corePoolSize,且隊列(workQueue)已滿,且當前活躍線程數=maximumPoolSize,則執行拒絕策略(handler);
當任務執行完成后:
1. 超出corePoolSize的空閑線程,在等待新任務時,如果超出了keepAliveTime,則線程會被銷毀;
2. 如果allowCoreThreadTimeOut被設置為true,那么corePoolSize以內的空閑線程,如果超出了keepAliveTime,則同樣會被銷毀。
corePoolSize就是線程池中的核心線程數量,這幾個核心線程在沒有用的時候,也不會被回收
maximumPoolSize就是線程池中可以容納的最大線程的數量
keepAliveTime就是線程池中除了核心線程之外的其他的最長可以保留的時間,因為在線程池中,除了核心線程即使在無任務的情況下也不能被清 除,其余的都是有存活時間的,意思就是非核心線程可以保留的最長的空閑時間
util就是計算這個時間的一個單位。
workQueue就是等待隊列,任務可以儲存在任務隊列中等待被執行,執行的是FIFIO原則(先進先出)。
threadFactory就是創建線程的線程工廠。
handler是一種拒絕策略,我們可以在任務滿了之后,拒絕執行某些任務。
當線程充滿了ThreadPool的有界隊列時,飽和策略開始起作用。飽和策略可以理解為隊列飽和后,處理后續無法入隊的任務的策略。ThreadPoolExecutor可以通過調用setRejectedExecutionHandler來修改飽和策略。
當請求任務不斷的過來,而系統此時又處理不過來的時候,我們需要采取的策略是拒絕服務。RejectedExecutionHandler接口提供了拒絕任務處理的自定義方法的機會。在ThreadPoolExecutor中已經包含四種處理策略。
AbortPolicy策略:該策略會直接拋出異常,阻止系統正常工作。
CallerRunsPolicy 策略:只要線程池未關閉,該策略直接在調用者線程中,運行當前的被丟棄的任務。
DiscardOleddestPolicy策略:該策略將丟棄最老的一個請求,也就是即將被執行的任務,并嘗試再次提交當前任務。
DiscardPolicy策略:該策略默默的丟棄無法處理的任務,不予任何處理。
除了JDK默認提供的四種拒絕策略,我們可以根據自己的業務需求去自定義拒絕策略,自定義的方式很簡單,直接實現RejectedExecutionHandler接口即可。
1.execute()方法用于提交不需要返回值的任務,所以無法判斷任務是否被線程池執行成功與否;
2.submit()方法用于提交需要返回值的任務。線程池會返回一個 Future 類型的對象,通過這個 Future 對象可以判斷任務是否執行成功,并且可以通過 Future 的 get()方法來獲取返回值,get()方法會阻塞當前線程直到任務完成,而使用 get(long timeout,TimeUnit unit)方法則會阻塞當前線程一段時間后立即返回,這時候有可能任務沒有執行完。
線程組ThreadGroup對象中的stop,resume,suspend會導致安全問題,主要是死鎖問題,已經被官方廢棄,多以價值已經大不如以前。
線程組ThreadGroup不是線程安全的,在使用過程中不能及時獲取安全的信息。
shutdownNow():立即關閉線程池(暴力),正在執行中的及隊列中的任務會被中斷,同時該方法會返回被中斷的隊列中的任務列表;
shutdown():平滑關閉線程池,正在執行中的及隊列中的任務能執行完成,后續進來的任務會被執行拒絕策略;
isTerminated():當正在執行的任務及對列中的任務全部都執行(清空)完就會返回true;
線程池將線程和任務進行解耦,線程是線程,任務是任務,擺脫了之前通過 Thread 創建線程時的一個線程必須對應一個任務的限制。在線程池中,同一個線程可以從阻塞隊列中不斷獲取新任務來執行,其核心原理在于線程池對 Thread 進行了封裝,并不是每次執行任務都會調用 Thread.start() 來創建新線程,而是讓每個線程去執行一個“循環任務”,在這個“循環任務”中不停的檢查是否有任務需要被執行,如果有則直接執行,也就是調用任務中的 run 方法,將 run 方法當成一個普通的方法執行,通過這種方式將只使用固定的線程就將所有任務的 run 方法串聯起來。
1.ArrayBlockingQueue是一個基于數組結構的有界阻塞隊列,此隊列按 FIFO(先進先出)原則對元素進行排序。
2.LinkedBlockingQueue一個基于鏈表結構的阻塞隊列,此隊列按FIFO (先進先出) 排序元素,吞吐量通常要高于ArrayBlockingQueue。靜態工廠方法Executors.newFixedThreadPool()使用了這個隊列
3.SynchronousQueue 一個不存儲元素的阻塞隊列。每個插入操作必須等到另一個線程調用移除操作,否則插入操作一直處于阻塞狀態,吞吐量通常要高于LinkedBlockingQueue,靜態工廠方法Executors.newCachedThreadPool()使用了這個隊列。
4.PriorityBlockingQueue 一個具有優先級的無限阻塞隊列。
首先是利用好SpringBoot的自動裝配功能,配置好線程池的一些基本參數。
@Configuration
@EnableAsync
public class ThreadPoolTaskConfig {
/*
* 線程池名前綴
*/
private static final String threadNamePrefix = "Api-Async-";
/**
* bean的名稱, 默認為首字母小寫的方法名
* @return
*/
@Bean("taskExecutor")
public ThreadPoolTaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
/**
* 默認情況下,在創建了線程池后,線程池中的線程數為0,當有任務來之后,就會創建一個線程去執行任務,
* 當線程池中的線程數目達到corePoolSize后,就會把到達的任務放到緩存隊列當中;
* 當隊列滿了,就繼續創建線程,當線程數量大于等于maxPoolSize后,開始使用拒絕策略拒絕
*/
/*
* 核心線程數(默認線程數)
*/
executor.setCorePoolSize(corePoolSize);
//最大線程數
executor.setMaxPoolSize(maxPoolSize);
//緩沖隊列數
executor.setQueueCapacity(queueCapacity);
//允許線程空閑時間(單位是秒)
executor.setKeepAliveSeconds(keepAliveTime);
executor.setThreadNamePrefix(threadNamePrefix);
//用來設置線程池關閉時候等待所有任務都完成再繼續銷毀其他的Bean
executor.setWaitForTasksToCompleteOnShutdown(true);
//線程池對拒絕任務的處理策略,CallerRunsPolicy:由調用線程(提交任務的線程)處理該任務
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//初始化
executor.initialize();
return executor;
}
}
配置好線程池的基本參數時候,我們就可以使用線程池了, 只要在一個限定域為public的方法頭部加上@Async注解即可。
@Async
public void createOrder() {
System.out.println("執行任務");
}
1)分析任務的特性
任務的性質:CPU 密集型任務、IO 密集型任務和混合型任務。
任務的優先級:高、中、低。
任務的執行時間:長、中、短。
任務的依賴性:是否依賴其他系統資源,如數據庫連接。
2)具體策略
[1]CPU 密集型任務配置盡可能小的線程,如配置N(CPU核心數)+1個線程的線程池。
[2]IO 密集型任務則由于線程并不是一直在執行任務,則配置盡可能多的線程,如2*N(CPU核心數)。
[3]混合型任務如果可以拆分,則將其拆分成一個 CPU 密集型任務和一個 IO 密集型任務。只要這兩個任務執行的時間相差不是太大,那么分解后執行的吞吐率要高于串行執行的吞吐率;如果這兩個任務執行時間相差太大,則沒必要進行分解。
[4]優先級不同的任務可以使用優先級隊列 PriorityBlockingQueue 來處理,它可以讓優先級高的任務先得到執行。但是,如果一直有高優先級的任務加入到阻塞隊列中,那么低優先級的任務可能永遠不能執行。
[5]執行時間不同的任務可以交給不同規模的線程池來處理,或者也可以使用優先級隊列,讓執行時間短的任務先執行。
[6]依賴數據庫連接池的任務,因為線程提交 SQL 后需要等待數據庫返回結果,線程數應該設置得較大,這樣才能更好的利用 CPU。
[7]建議使用有界隊列,有界隊列能增加系統的穩定性和預警能力。可以根據需要設大一點,比如幾千。使用無界隊列,線程池的隊列就會越來越大,有可能會撐滿內存,導致整個系統不可用。