更新時(shí)間:2022-11-09 10:18:51 來(lái)源:動(dòng)力節(jié)點(diǎn) 瀏覽1638次
其實(shí)java線程池的實(shí)現(xiàn)原理很簡(jiǎn)單,說白了就是一個(gè)線程集合workerSet和一個(gè)Java阻塞隊(duì)列workQueue。當(dāng)用戶向線程池提交一個(gè)任務(wù)(也就是線程)時(shí),線程池會(huì)先將任務(wù)放入workQueue中。workerSet中的線程會(huì)不斷的從workQueue中獲取線程然后執(zhí)行。當(dāng)workQueue中沒有任務(wù)的時(shí)候,worker就會(huì)阻塞,直到隊(duì)列中有任務(wù)了就取出來(lái)繼續(xù)執(zhí)行。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
corePoolSize: 規(guī)定線程池有幾個(gè)線程(worker)在運(yùn)行。
maximumPoolSize: 當(dāng)workQueue滿了,不能添加任務(wù)的時(shí)候,這個(gè)參數(shù)才會(huì)生效。規(guī)定線程池最多只能有多少個(gè)線程(worker)在執(zhí)行。
keepAliveTime: 超出corePoolSize大小的那些線程的生存時(shí)間,這些線程如果長(zhǎng)時(shí)間沒有執(zhí)行任務(wù)并且超過了keepAliveTime設(shè)定的時(shí)間,就會(huì)消亡。
unit: 生存時(shí)間對(duì)于的單位
workQueue: 存放任務(wù)的隊(duì)列
threadFactory: 創(chuàng)建線程的工廠
handler: 當(dāng)workQueue已經(jīng)滿了,并且線程池線程數(shù)已經(jīng)達(dá)到maximumPoolSize,將執(zhí)行拒絕策略。
用戶通過submit提交一個(gè)任務(wù)。線程池會(huì)執(zhí)行如下流程:
判斷當(dāng)前運(yùn)行的worker數(shù)量是否超過corePoolSize,如果不超過corePoolSize。就創(chuàng)建一個(gè)worker直接執(zhí)行該任務(wù)。—— 線程池最開始是沒有worker在運(yùn)行的
如果正在運(yùn)行的worker數(shù)量超過或者等于corePoolSize,那么就將該任務(wù)加入到workQueue隊(duì)列中去。
如果workQueue隊(duì)列滿了,也就是offer方法返回false的話,就檢查當(dāng)前運(yùn)行的worker數(shù)量是否小于maximumPoolSize,如果小于就創(chuàng)建一個(gè)worker直接執(zhí)行該任務(wù)。
如果當(dāng)前運(yùn)行的worker數(shù)量是否大于等于maximumPoolSize,那么就執(zhí)行RejectedExecutionHandler來(lái)拒絕這個(gè)任務(wù)的提交。
我們先來(lái)看一下ThreadPoolExecutor中的幾個(gè)關(guān)鍵屬性。
//這個(gè)屬性是用來(lái)存放 當(dāng)前運(yùn)行的worker數(shù)量以及線程池狀態(tài)的
//int是32位的,這里把int的高3位拿來(lái)充當(dāng)線程池狀態(tài)的標(biāo)志位,后29位拿來(lái)充當(dāng)當(dāng)前運(yùn)行worker的數(shù)量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//存放任務(wù)的阻塞隊(duì)列
private final BlockingQueue<Runnable> workQueue;
//worker的集合,用set來(lái)存放
private final HashSet<Worker> workers = new HashSet<Worker>();
//歷史達(dá)到的worker數(shù)最大值
private int largestPoolSize;
//當(dāng)隊(duì)列滿了并且worker的數(shù)量達(dá)到maxSize的時(shí)候,執(zhí)行具體的拒絕策略
private volatile RejectedExecutionHandler handler;
//超出coreSize的worker的生存時(shí)間
private volatile long keepAliveTime;
//常駐worker的數(shù)量
private volatile int corePoolSize;
//最大worker的數(shù)量,一般當(dāng)workQueue滿了才會(huì)用到這個(gè)參數(shù)
private volatile int maximumPoolSize;
1. 提交任務(wù)相關(guān)源碼
下面是execute方法的源碼
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
//workerCountOf(c)會(huì)獲取當(dāng)前正在運(yùn)行的worker數(shù)量
if (workerCountOf(c) < corePoolSize) {
//如果workerCount小于corePoolSize,就創(chuàng)建一個(gè)worker然后直接執(zhí)行該任務(wù)
if (addWorker(command, true))
return;
c = ctl.get();
}
//isRunning(c)是判斷線程池是否在運(yùn)行中,如果線程池被關(guān)閉了就不會(huì)再接受任務(wù)
//后面將任務(wù)加入到隊(duì)列中
if (isRunning(c) && workQueue.offer(command)) {
//如果添加到隊(duì)列成功了,會(huì)再檢查一次線程池的狀態(tài)
int recheck = ctl.get();
//如果線程池關(guān)閉了,就將剛才添加的任務(wù)從隊(duì)列中移除
if (! isRunning(recheck) && remove(command))
//執(zhí)行拒絕策略
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//如果加入隊(duì)列失敗,就嘗試直接創(chuàng)建worker來(lái)執(zhí)行任務(wù)
else if (!addWorker(command, false))
//如果創(chuàng)建worker失敗,就執(zhí)行拒絕策略
reject(command);
}
添加worker的方法addWorker源碼
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
//使用自旋+cas失敗重試來(lái)保證線程競(jìng)爭(zhēng)問題
for (;;) {
//先獲取線程池的狀態(tài)
int c = ctl.get();
int rs = runStateOf(c);
// 如果線程池是關(guān)閉的,或者workQueue隊(duì)列非空,就直接返回false,不做任何處理
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
//根據(jù)入?yún)ore 來(lái)判斷可以創(chuàng)建的worker數(shù)量是否達(dá)到上限,如果達(dá)到上限了就拒絕創(chuàng)建worker
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//沒有的話就嘗試修改ctl添加workerCount的值。這里用了cas操作,如果失敗了下一個(gè)循環(huán)會(huì)繼續(xù)重試,直到設(shè)置成功
if (compareAndIncrementWorkerCount(c))
//如果設(shè)置成功了就跳出外層的那個(gè)for循環(huán)
break retry;
//重讀一次ctl,判斷如果線程池的狀態(tài)改變了,會(huì)再重新循環(huán)一次
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
final ReentrantLock mainLock = this.mainLock;
//創(chuàng)建一個(gè)worker,將提交上來(lái)的任務(wù)直接交給worker
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
//加鎖,防止競(jìng)爭(zhēng)
mainLock.lock();
try {
int c = ctl.get();
int rs = runStateOf(c);
//還是判斷線程池的狀態(tài)
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
//如果worker的線程已經(jīng)啟動(dòng)了,會(huì)拋出異常
if (t.isAlive())
throw new IllegalThreadStateException();
//添加新建的worker到線程池中
workers.add(w);
int s = workers.size();
//更新歷史worker數(shù)量的最大值
if (s > largestPoolSize)
largestPoolSize = s;
//設(shè)置新增標(biāo)志位
workerAdded = true;
}
} finally {
mainLock.unlock();
}
//如果worker是新增的,就啟動(dòng)該線程
if (workerAdded) {
t.start();
//成功啟動(dòng)了線程,設(shè)置對(duì)應(yīng)的標(biāo)志位
workerStarted = true;
}
}
} finally {
//如果啟動(dòng)失敗了,會(huì)觸發(fā)執(zhí)行相應(yīng)的方法
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
2. Worker的結(jié)構(gòu)
Worker是ThreadPoolExecutor內(nèi)部定義的一個(gè)Java內(nèi)部類。我們先看一下Worker的繼承關(guān)系
private final class Worker extends AbstractQueuedSynchronizer implements Runnable
它實(shí)現(xiàn)了Runnable接口,所以可以拿來(lái)當(dāng)線程用。同時(shí)它還繼承了AbstractQueuedSynchronizer同步器類,主要用來(lái)實(shí)現(xiàn)一個(gè)不可重入的鎖。
一些屬性還有構(gòu)造方法:
//運(yùn)行的線程,前面addWorker方法中就是直接通過啟動(dòng)這個(gè)線程來(lái)啟動(dòng)這個(gè)worker
final Thread thread;
//當(dāng)一個(gè)worker剛創(chuàng)建的時(shí)候,就先嘗試執(zhí)行這個(gè)任務(wù)
Runnable firstTask;
//記錄完成任務(wù)的數(shù)量
volatile long completedTasks;
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
//創(chuàng)建一個(gè)Thread,將自己設(shè)置給他,后面這個(gè)thread啟動(dòng)的時(shí)候,也就是執(zhí)行worker的run方法
this.thread = getThreadFactory().newThread(this);
}
worker的run方法
public void run() {
//這里調(diào)用了ThreadPoolExecutor的runWorker方法
runWorker(this);
}
ThreadPoolExecutor的runWorker方法
final void runWorker(Worker w) {
//獲取當(dāng)前線程
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
//執(zhí)行unlock方法,允許其他線程來(lái)中斷自己
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//如果前面的firstTask有值,就直接執(zhí)行這個(gè)任務(wù)
//如果沒有具體的任務(wù),就執(zhí)行g(shù)etTask()方法從隊(duì)列中獲取任務(wù)
//這里會(huì)不斷執(zhí)行循環(huán)體,除非線程中斷或者getTask()返回null才會(huì)跳出這個(gè)循環(huán)
while (task != null || (task = getTask()) != null) {
//執(zhí)行任務(wù)前先鎖住,這里主要的作用就是給shutdown方法判斷worker是否在執(zhí)行中的
//shutdown方法里面會(huì)嘗試給這個(gè)線程加鎖,如果這個(gè)線程在執(zhí)行,就不會(huì)中斷它
w.lock();
//判斷線程池狀態(tài),如果線程池被強(qiáng)制關(guān)閉了,就馬上退出
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
//執(zhí)行任務(wù)前調(diào)用。預(yù)留的方法,可擴(kuò)展
beforeExecute(wt, task);
Throwable thrown = null;
try {
//真正的執(zhí)行任務(wù)
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//執(zhí)行任務(wù)后調(diào)用。預(yù)留的方法,可擴(kuò)展
afterExecute(task, thrown);
}
} finally {
task = null;
//記錄完成的任務(wù)數(shù)量
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
下面來(lái)看一下getTask()方法,這里面涉及到keepAliveTime的使用,從這個(gè)方法我們可以看出先吃池是怎么讓超過corePoolSize的那部分worker銷毀的。
private Runnable getTask() {
boolean timedOut = false;
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 如果線程池已經(jīng)關(guān)閉了,就直接返回null,
//如果這里返回null,調(diào)用的那個(gè)worker就會(huì)跳出while循環(huán),然后執(zhí)行完銷毀線程
//SHUTDOWN狀態(tài)表示執(zhí)行了shutdown()方法
//STOP表示執(zhí)行了shutdownNow()方法
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
//獲取當(dāng)前正在運(yùn)行中的worker數(shù)量
int wc = workerCountOf(c);
// 如果設(shè)置了核心worker也會(huì)超時(shí)或者當(dāng)前正在運(yùn)行的worker數(shù)量超過了corePoolSize,就要根據(jù)時(shí)間判斷是否要銷毀線程了
//其實(shí)就是從隊(duì)列獲取任務(wù)的時(shí)候要不要設(shè)置超時(shí)間時(shí)間,如果超過這個(gè)時(shí)間隊(duì)列還沒有任務(wù)進(jìn)來(lái),就會(huì)返回null
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//如果上一次循環(huán)從隊(duì)列獲取到的未null,這時(shí)候timedOut就會(huì)為true了
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
//通過cas來(lái)設(shè)置WorkerCount,如果多個(gè)線程競(jìng)爭(zhēng),只有一個(gè)可以設(shè)置成功
//最后如果沒設(shè)置成功,就進(jìn)入下一次循環(huán),說不定下一次worker的數(shù)量就沒有超過corePoolSize了,也就不用銷毀worker了
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
//如果要設(shè)置超時(shí)時(shí)間,就設(shè)置一下咯
//過了這個(gè)keepAliveTime時(shí)間還沒有任務(wù)進(jìn)隊(duì)列就會(huì)返回null,那worker就會(huì)銷毀
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
//如果r為null,就設(shè)置timedOut為true
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
3. 添加Callable任務(wù)的實(shí)現(xiàn)源碼
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
要添加一個(gè)有返回值的任務(wù)的實(shí)現(xiàn)也很簡(jiǎn)單。其實(shí)就是對(duì)任務(wù)做了一層封裝,將其封裝成Future,然后提交給線程池執(zhí)行,最后返回這個(gè)future。
這里的 newTaskFor(task) 方法會(huì)將其封裝成一個(gè)FutureTask類。
外部的線程拿到這個(gè)future,執(zhí)行g(shù)et()方法的時(shí)候,如果任務(wù)本身沒有執(zhí)行完,執(zhí)行線程就會(huì)被阻塞,直到任務(wù)執(zhí)行完。
下面是FutureTask的get方法
public V get() throws InterruptedException, ExecutionException {
int s = state;
//判斷狀態(tài),如果任務(wù)還沒執(zhí)行完,就進(jìn)入休眠,等待喚醒
if (s <= COMPLETING)
s = awaitDone(false, 0L);
//返回值
return report(s);
}
FutureTask中通過一個(gè)state狀態(tài)來(lái)判斷任務(wù)是否完成。當(dāng)run方法執(zhí)行完后,會(huì)將state狀態(tài)置為完成,同時(shí)喚醒所有正在等待的線程。我們可以看一下FutureTask的run方法
public void run() {
//判斷線程的狀態(tài)
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
//執(zhí)行call方法
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
//這個(gè)方法里面會(huì)設(shè)置返回內(nèi)容,并且喚醒所以等待中的線程
set(result);
}
} finally {
runner = null;
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
4. shutdown和shutdownNow方法的實(shí)現(xiàn)
shutdown方法會(huì)將線程池的狀態(tài)設(shè)置為SHUTDOWN,線程池進(jìn)入這個(gè)狀態(tài)后,就拒絕再接受任務(wù),然后會(huì)將剩余的任務(wù)全部執(zhí)行完
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//檢查是否可以關(guān)閉線程
checkShutdownAccess();
//設(shè)置線程池狀態(tài)
advanceRunState(SHUTDOWN);
//嘗試中斷worker
interruptIdleWorkers();
//預(yù)留方法,留給子類實(shí)現(xiàn)
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//遍歷所有的worker
for (Worker w : workers) {
Thread t = w.thread;
//先嘗試調(diào)用w.tryLock(),如果獲取到鎖,就說明worker是空閑的,就可以直接中斷它
//注意的是,worker自己本身實(shí)現(xiàn)了AQS同步框架,然后實(shí)現(xiàn)的類似鎖的功能
//它實(shí)現(xiàn)的鎖是不可重入的,所以如果worker在執(zhí)行任務(wù)的時(shí)候,會(huì)先進(jìn)行加鎖,這里tryLock()就會(huì)返回false
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
shutdownNow做的比較絕,它先將線程池狀態(tài)設(shè)置為STOP,然后拒絕所有提交的任務(wù)。最后中斷左右正在運(yùn)行中的worker,然后清空任務(wù)隊(duì)列。
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
//檢測(cè)權(quán)限
advanceRunState(STOP);
//中斷所有的worker
interruptWorkers();
//清空任務(wù)隊(duì)列
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//遍歷所有worker,然后調(diào)用中斷方法
for (Worker w : workers)
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}
以上就是關(guān)于“Java線程池原理解析”的介紹,大家如果想了解更多相關(guān)知識(shí),不妨來(lái)關(guān)注一下本站的Java在線學(xué)習(xí),里面的課程內(nèi)容從入門到精通,細(xì)致全面,很適合沒有基礎(chǔ)的小伙伴學(xué)習(xí),希望對(duì)大家能夠有所幫助哦。
相關(guān)閱讀
0基礎(chǔ) 0學(xué)費(fèi) 15天面授
有基礎(chǔ) 直達(dá)就業(yè)
業(yè)余時(shí)間 高薪轉(zhuǎn)行
工作1~3年,加薪神器
工作3~5年,晉升架構(gòu)
提交申請(qǐng)后,顧問老師會(huì)電話與您溝通安排學(xué)習(xí)