您好,登錄后才能下訂單哦!
????在Java中比較常見的兩種創建線程的方法:繼承Thread類和實現Runnable接口。但是這兩種方法有個缺點就是無法獲取線程執行后的結果。所以Java之后提供了Future和Runnable接口,用于實現獲取線程執行結果。下面開始源碼分析:
1、Callable接口
public interface Callable<V> {
//返回接口,或者拋出異常
V call() throws Exception;
}
2、Future接口
public interface Future<V> {
/***嘗試取消任務,如果任務已經完成、已取消或其他原因無法取消,則失敗。
** 1、如果任務還沒開始執行,則該任務不應該運行
** 2、如果任務已經開始執行,由參數mayInterruptIfRunning來決定執行該任務的線程是否應該被中斷,這只是終止任務的一種嘗試。若mayInterruptIfRunning為true,則會立即中斷執行任務的線程并返回true,若mayInterruptIfRunning為false,則會返回true且不會中斷任務執行線程。
** 3、調用這個方法后,以后對isDone方法調用都返回true。
** 4、如果這個方法返回true,以后對isCancelled返回true。
***/
boolean cancel(boolean mayInterruptIfRunning);
/**
* 判斷任務是否被取消了,如果調用了cance()則返回true
*/
boolean isCancelled();
/**
*如果任務完成,則返回ture
*任務完成包含正常終止、異常、取消任務。在這些情況下都返回true
*/
boolean isDone();
/**
* 線程阻塞,直到任務完成,返回結果
* 如果任務被取消,則引發CancellationException
* 如果當前線程被中斷,則引發InterruptedException
* 當任務在執行的過程中出現異常,則拋出ExecutionException
*/
V get() throws InterruptedException, ExecutionException;
/**
* 線程阻塞一定時間等待任務完成,并返回任務執行結果,如果則超時則拋出TimeoutException
*/
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
3、FutureTask
????Future只是一個接口,不能直接用來創建對象,其實現類是FutureTask,JDK1.8修改了FutureTask的實現,JKD1.8不再依賴AQS來實現,而是通過一個volatile變量state以及CAS操作來實現。FutureTask結構如下所示:
public class FutureTask<V> implements RunnableFuture<V> {
/*
* 當前任務運行狀態
* NEW -> COMPLETING -> NORMAL(正常結束,返回結果)
* NEW -> COMPLETING -> EXCEPTIONAL(返回異常結果)
* NEW -> CANCELLED(任務被取消,無結果)
* NEW -> INTERRUPTING -> INTERRUPTED(任務被打斷,無結果)
*/
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
/** 將要被執行的任務 */
private Callable<V> callable;
/** 存放執行結果,用于get()方法獲取結果,也可能用于get()方法拋出異常 */
private Object outcome; // non-volatile, protected by state reads/writes
/** 執行任務Callable的線程; */
private volatile Thread runner;
/** 棧結構的等待隊列,該節點是棧中最頂層的節點 */
private volatile WaitNode waiters;
為了后面更好的分析FutureTask的實現,這里有必要解釋下各個狀態。
NEW:表示是個新的任務或者還沒被執行完的任務。這是初始狀態。
COMPLETING:任務已經執行完成或者執行任務的時候發生異常,但是任務執行結果或者異常原因還沒有保存到outcome字段(outcome字段用來保存任務執行結果,如果發生異常,則用來保存異常原因)的時候,狀態會從NEW變更到COMPLETING。但是這個狀態會時間會比較短,屬于中間狀態。
NORMAL:任務已經執行完成并且任務執行結果已經保存到outcome字段,狀態會從COMPLETING轉換到NORMAL。這是一個最終態。
EXCEPTIONAL:任務執行發生異常并且異常原因已經保存到outcome字段中后,狀態會從COMPLETING轉換到EXCEPTIONAL。這是一個最終態。
CANCELLED:任務還沒開始執行或者已經開始執行但是還沒有執行完成的時候,用戶調用了cancel(false)方法取消任務且不中斷任務執行線程,這個時候狀態會從NEW轉化為CANCELLED狀態。這是一個最終態。
INTERRUPTING: 任務還沒開始執行或者已經執行但是還沒有執行完成的時候,用戶調用了cancel(true)方法取消任務并且要中斷任務執行線程但是還沒有中斷任務執行線程之前,狀態會從NEW轉化為INTERRUPTING。這是一個中間狀態。
INTERRUPTED:調用interrupt()中斷任務執行線程之后狀態會從INTERRUPTING轉換到INTERRUPTED。這是一個最終態。
有一點需要注意的是,所有值大于COMPLETING的狀態都表示任務已經執行完成(任務正常執行完成,任務執行異常或者任務被取消)。
3.1、FutureTask構造方法
// Callable 構造方法
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
/**
* runnable 構造函數
*/
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
3.2、get()方法阻塞隊列
static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}
3.3、run方法解析
//Executor調用執行任務
//
public void run() {
//狀態如果不是NEW,說明任務或者已經執行過,或者已經被取消,直接返返回,當然如果執行任務的線程runner不為null,說明任務正在執行
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
//執行任務
try {
Callable<V> c = callable;
//判斷任務是否為null,狀態是否為NEW
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call(); //暫時存放任務執行結果
ran = true;
} catch (Throwable ex) {
result = null;
ran = false; //執行失敗
//通過CAS算法設置返回值(COMPLETING)和狀態值(EXCEPTIONAL)
setException(ex);
}
//執行成功通過CAS(UNSAFE)設置返回值(COMPLETING)和狀態值(NORMAL)
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
//將任務runner設置為null,避免發生并發調用run()方法
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
//須重新讀取任務狀態,避免不可達(泄漏)的中斷
int s = state;
//確保cancle(ture)操作時,運行中的任務能接收到中斷指令
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
3.4、setException(Throwable t)解析
//發生異常時,將返回值設置到outcome(=COMPLETING)中,并更新任務狀態(EXCEPTIONAL)
protected void setException(Throwable t) {
//調用UNSAFE類封裝的CAS算法,設置值
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = t;
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
//喚醒因等待返回值而阻塞的線程
finishCompletion();
}
}
3.5、set(V v)方法解析
//任務正常完成,將返回值設置到outcome(=COMPLETING)中,并更新任務狀態(=NORMAL)
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
3.6、 finishCompletion解析
//移除所有等待線程并發出信號,調用done(),以及將任務callable清空
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
//循環喚醒阻塞線程,直到阻塞隊列為空
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
//一直到阻塞隊列為空,跳出循環
if (next == null)
break;
q.next = null; // unlink to help gc 方便gc在適當的時候回收
q = next;
}
break;
}
}
done();
callable = null; // to reduce footprint
}
3.7、handlePossibleCancellationInterrupt 方法解析
private void handlePossibleCancellationInterrupt(int s) {
// It is possible for our interrupter to stall before getting a
// chance to interrupt us. Let's spin-wait patiently.
//自旋等待cancle(true)結束(中斷結束)
if (s == INTERRUPTING)
while (state == INTERRUPTING)
Thread.yield(); // wait out pending interrupt
// assert state == INTERRUPTED;
// We want to clear any interrupt we may have received from
// cancel(true). However, it is permissible to use interrupts
// as an independent mechanism for a task to communicate with
// its caller, and there is no way to clear only the
// cancellation interrupt.
//
// Thread.interrupted();
}
3.8、cancle方法解析
//取消任務執行
public boolean cancel(boolean mayInterruptIfRunning) {
//對NEW狀態的任務進行中斷,并根據參數設置state
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
//任務已完成(已發出中斷或已取消)
return false;
//中斷線程
try { // in case call to interrupt throws exception
if (mayInterruptIfRunning) {//cancel(true)
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
//通過CAS算法,更新狀態
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
//喚醒阻塞線程
finishCompletion();
}
return true;
}
3.9 get方法解析
/**
* 獲取執行結果
* @throws CancellationException {@inheritDoc}
*/
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
//假如任務還沒有執行完,則則塞線程,直至任務執行完成(結果已經存放到對應變量中)
s = awaitDone(false, 0L);
//返回結果
return report(s);
}
/**
* 獲取任務執行結果,指定時間結束,則超時返回,不再阻塞
* @throws CancellationException {@inheritDoc}
*/
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
if (s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
return report(s);
}
3.10 awaitDone解析
/**
* Awaits completion or aborts on interrupt or timeout.
* 如英文注釋:等待任務執行完畢或任務中斷或任務超時
*
* @param timed true if use timed waits
* @param nanos time to wait, if timed
* @return state upon completion
*/
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
//循環等待
for (;;) {
//線程已經中斷,則移除等待任務
if (Thread.interrupted()) {
removeWaiter(q);
//移除當前任務后,拋出中斷異常
throw new InterruptedException();
}
//任務已經完成,則返回任務狀態,并對當前任務清場處理
int s = state;
if (s > COMPLETING) {
if (q != null) //任務不為空,則將執行線程設為null,避免并發下重復執行
q.thread = null;
return s;
}
//設置結果,很快就能完成,自旋等待
else if (s == COMPLETING) // cannot time out yet
Thread.yield(); //任務提前退出
//正在執行或者還沒開始,則構建新的節點
else if (q == null)
q = new WaitNode();
//判斷是否入隊,新節點一般在下一次循環入隊列阻塞
else if (!queued)
//沒有入隊列,設置q.next=waiters,并將waiters設為q
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
//假如有超時限制,則判斷是否超時
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
//超時則將任務節點從阻塞隊列中移除,并返回狀態
removeWaiter(q);
return state;
}
//阻塞調用get方法的線程,有超時限制
LockSupport.parkNanos(this, nanos);
}
else
//阻塞調用get方法的線程,無超時限制
LockSupport.park(this);
}
}
3.11 removeWaiter方法解析
//移除任務節點
private void removeWaiter(WaitNode node) {
if (node != null) {
node.thread = null;
retry:
for (;;) { // restart on removeWaiter race
for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
s = q.next;
if (q.thread != null)
pred = q;
else if (pred != null) {
pred.next = s;
if (pred.thread == null) // check for race
continue retry;
}
else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
q, s))
continue retry;
}
break;
}
}
}
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。