您好,登錄后才能下訂單哦!
如果一個任務需要返回執行結果,一般我們會實現一個Callable任務,并創建一個線程來執行任務。對于執行時間比較長的任務,顯然我們同步的等待結果再去執行后續的業務是不現實的,那么,Future模式是怎樣解決這個問題的呢?
Future模式,可以讓調用方立即返回,然后它自己會在后面慢慢處理,此時調用者拿到的僅僅是一個憑證,調用者可以先去處理其它任務,在真正需要用到調用結果的場合,再使用憑證去獲取調用結果。這個憑證就是這里的Future。
Future接口的定義:
public interface Future<V> {
// 取消任務
boolean cancel(boolean mayInterruptIfRunning);
// 任務是否取消
boolean isCancelled();
// 標記任務是否執行完成
boolean isDone();
// 阻塞獲取任務結果
V get() throws InterruptedException, ExecutionException;
// 超時獲取任務結果
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
1、NEW:表示任務的初始化狀態;
2、COMPLETING:表示任務已執行完成(正常完成或異常完成),但任務結果或異常原因還未設置完成,屬于中間狀態;
3、NORMAL:表示任務已經執行完成(正常完成),且任務結果已設置完成,屬于最終狀態;
4、EXCEPTIONAL:表示任務已經執行完成(異常完成),且任務異常已設置完成,屬于最終狀態;
5、CANCELLED:表示任務還沒開始執行就被取消(非中斷方式),屬于最終狀態;
6、INTERRUPTING:表示任務還沒開始執行就被取消(中斷方式),正式被中斷前的過渡狀態,屬于中間狀態;
7、INTERRUPTED:表示任務還沒開始執行就被取消(中斷方式),且已被中斷,屬于最終狀態。
各個狀態之間的流轉:
FutureTask在構造時可以接受Runnable或Callable任務,如果是Runnable,則最終包裝成Callable:
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
public FutureTask(Runnable runnable, V result) {
// 包裝Runnable成為Callable
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
private volatile int state;//任務狀態
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
private Callable<V> callable; // 真正的任務
private volatile Thread runner; // 保存正在執行任務的線程
/**
* 記錄結果或異常
*/
private Object outcome;
/**
* 無鎖棧(Treiber stack)
* 保存等待線程
*/
private volatile WaitNode waiters;
當調用FutureTask的get方法時,如果任務沒有完成,則調用線程會被阻塞,其實就是將線程包裝成WaitNode結點保存到waiters指向的棧中。
static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}
public void run() {
// 僅當任務為NEW狀態時, 才能執行任務
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
//執行任務
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
//設置異常
setException(ex);
}
if (ran)
//設置任務執行結果outcome
set(result);
}
} finally {
runner = null;
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
set方法:
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;//存儲結果值
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
public boolean cancel(boolean mayInterruptIfRunning) {
// 僅NEW狀態下可以取消任務
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try {
if (mayInterruptIfRunning) { // 中斷任務
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
//釋放所有在棧上等待的線程
finishCompletion();
}
return true;
}
任務取消后,最終調用finishCompletion方法,釋放所有在棧上等待的線程
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) { //自旋釋放所有等待線程
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);//喚醒線程
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
done();
callable = null; // to reduce footprint
}
FutureTask可以通過get方法獲取任務結果,如果需要限時等待,可以調用get(long timeout, TimeUnit unit)
public V get() throws InterruptedException, ExecutionException {
int s = state;
//當前任務的狀態是NEW或COMPLETING,會調用awaitDone阻塞線程
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s); // 任務執行結果
}
/**
* 返回執行結果.
*/
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V) x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable) x);
}
1、ScheduledFutureTask在普通FutureTask的基礎上增加了周期執行/延遲執行的功能
2、ScheduledFutureTask是ScheduledThreadPoolExecutor這個線程池的默認調度任務類,通過繼承FutureTask和Delayed接口來實現周期/延遲功能的。
public void run() {
// 是否是周期任務
boolean periodic = isPeriodic();
//// 能否運行任務
if (!canRunInCurrentRunState(periodic))
cancel(false);
else if (!periodic) // 非周期任務:調用FutureTask的run方法運行
ScheduledFutureTask.super.run();
// 周期任務:調用FutureTask的runAndReset方法運行
else if (ScheduledFutureTask.super.runAndReset()) {
setNextRunTime();
reExecutePeriodic(outerTask);
}
}
FutureTask的runAndReset方法與run方法的區別就是當任務正常執行完成后,不會設置任務的最終狀態(即保持NEW狀態),以便任務重復執行:
protected boolean runAndReset() {
// 僅NEW狀態的任務可以執行
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return false;
boolean ran = false;
int s = state;
try {
Callable<V> c = callable;
if (c != null && s == NEW) {
try {
c.call(); //不設置執行結果
ran = true;
} catch (Throwable ex) {
setException(ex);
}
}
} finally {
runner = null;
s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
return ran && s == NEW;//重新設置任務狀態為NEW,繼續重復執行
}
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。