您好,登錄后才能下訂單哦!
本篇內容介紹了“Java中J.U.C并發包誕生分析”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!
JCP是Java Community Process的簡寫,是一種開發和修訂Java技術規范的流程,同時,我們提到JCP一般是指維護管理這套流程的組織,這個組織主要由Java開發者以及被授權的非盈利組織組成,他們掌管著Java的發展方向。JCP是Sun公司1998年12月8日提出的,旨在通過java社區的力量推進Java的發展。截止目前,已經從1.0版本發展到了最新的2019年7月21日頒布的2.11版本。JCP流程中有四個主要階段,分別是啟動、發布草案、最終版本、維護,一個Java的新功能從啟動階段提出,到順利走完整套流程后,就會出現在下個版本的JDK中了。
JSR是Java Specification Requests的簡寫,是服務JCP啟動階段提出草案的規范,任何人注冊成為JCP的會員后,都可以向JCP提交JSR。比如,你覺得JDK中String的操作方法沒有guava中的實用,你提個JSR增強String中的方法,只要能夠通過JCP的審核,就可以在下個版本的JDK中看到了。我們熟知的提案有,Java緩存api的JSR-107、Bean屬性校驗JSR-303等,當然還有本篇要講的Java并發包JSR-166.
JCP官網:https://jcp.org
Doug Lea,中文名為道格·利。是美國的一個大學教師,大神級的人物,J.U.C就是出自他之手。JDK1.5之前,我們控制程序并發訪問同步代碼只能使用synchronized,那個時候synchronized的性能還沒優化好,性能并不好,控制線程也只能使用Object的wait和notify方法。這個時候Doug Lea給JCP提交了JSR-166的提案,在提交JSR-166之前,Doug Lea已經使用了類似J.U.C包功能的代碼已經三年多了,這些代碼就是J.U.C的原型,下面簡單看下這些具有歷史味道的代碼,同時也能引發我們的一些思考,如果JDK中沒有,那么就自己造呀!
public class Mutex implements Sync { /** The lock status **/ protected boolean inuse_ = false; @Override public void acquire() throws InterruptedException { if (Thread.interrupted()) { throw new InterruptedException(); } synchronized(this) { try { //如果inuse_為true就wait住線程 while (inuse_) { wait(); } inuse_ = true; } catch (InterruptedException ex) { notify(); throw ex; } } } /** * 釋放鎖,通知線程繼續執行 */ @Override public synchronized void release() { inuse_ = false; notify(); } @Override public boolean attempt(long msecs) throws InterruptedException { if (Thread.interrupted()) { throw new InterruptedException(); } synchronized(this) { if (!inuse_) { inuse_ = true; return true; } else if (msecs <= 0) { return false; } else { long waitTime = msecs; long start = System.currentTimeMillis(); try { for (;;) { wait(waitTime); if (!inuse_) { inuse_ = true; return true; } else { waitTime = msecs - (System.currentTimeMillis() - start); if (waitTime <= 0) return false; } } } catch (InterruptedException ex) { notify(); throw ex; } } } } }
public class CountDown implements Sync { protected final int initialCount_; protected int count_; /** * Create a new CountDown with given count value **/ public CountDown(int count) { count_ = initialCount_ = count; } @Override public void acquire() throws InterruptedException { if (Thread.interrupted()) { throw new InterruptedException(); } synchronized (this) { while (count_ > 0) { wait(); } } } @Override public boolean attempt(long msecs) throws InterruptedException { if (Thread.interrupted()) { throw new InterruptedException(); } synchronized (this) { if (count_ <= 0) { return true; } else if (msecs <= 0) { return false; } else { long waitTime = msecs; long start = System.currentTimeMillis(); for (; ; ) { wait(waitTime); if (count_ <= 0) { return true; } else { waitTime = msecs - (System.currentTimeMillis() - start); if (waitTime <= 0) { return false; } } } } } } /** * Decrement the count. * After the initialCount'th release, all current and future * acquires will pass **/ @Override public synchronized void release() { if (--count_ == 0) { notifyAll(); } } /** * Return the initial count value **/ public int initialCount() { return initialCount_; } public synchronized int currentCount() { return count_; } }
了解J.U.C的都知道,在這個包里面AbstractQueuedSynchronizer是精髓所在,這就是我們在聊并發包時俗稱的AQS,這個框架設計為同步狀態的原子性管理、線程的阻塞和解除阻塞以及排隊提供一種通用的機制。并發包下ReentrantLock、CountDownLatch等都是基于AQS來實現的,在看下上面的原型實現都是實現的Sync接口,是不是似曾相識,下面的Sync就是AbstractQueuedSynchronizer的原型了
AQS設計論文:http://gee.cs.oswego.edu/dl/papers/aqs.pdf
中文譯文:https://www.cnblogs.com/dennyzhangdd/p/7218510.html
public interface Sync { public void acquire() throws InterruptedException; public boolean attempt(long msecs) throws InterruptedException; public void release(); /** One second, in milliseconds; convenient as a time-out value **/ public static final long ONE_SECOND = 1000; /** One minute, in milliseconds; convenient as a time-out value **/ public static final long ONE_MINUTE = 60 * ONE_SECOND; /** One hour, in milliseconds; convenient as a time-out value **/ public static final long ONE_HOUR = 60 * ONE_MINUTE; /** One day, in milliseconds; convenient as a time-out value **/ public static final long ONE_DAY = 24 * ONE_HOUR; /** One week, in milliseconds; convenient as a time-out value **/ public static final long ONE_WEEK = 7 * ONE_DAY; /** One year in milliseconds; convenient as a time-out value **/ public static final long ONE_YEAR = (long)(365.2425 * ONE_DAY); /** One century in milliseconds; convenient as a time-out value **/ public static final long ONE_CENTURY = 100 * ONE_YEAR; }
這個JSR的目標類似于JDK1.2 Collections包的目標:
1.標準化一個簡單,可擴展的框架,該框架將常用的實用程序組織成一個足夠小的包,以便用戶可以輕松學習并由開發人員維護。
2.提供一些高質量的實現。
該包將包含接口和類,這些接口和類在各種編程樣式和應用程序中都很有用。這些類包括:
原子變量。
專用鎖,屏障,信號量和條件變量。
為多線程使用而設計的隊列和相關集合。
線程池和自定義執行框架。
我們還將研究核心語言和庫中的相關支持。 請注意,這些與J2EE中使用的事務并發控制框架完全不同。(但是,對于那些創建此類框架的人來說,它們會很有用。)
J2SE
底層線程原語(例如synchronized塊,Object.wait和Object.notify)不足以用于許多編程任務。因此,應用程序員經常被迫實現自己的更高級別的并發工具。這導致了巨大的重復工作。此外,眾所周知,這些設施難以正確,甚至更難以優化。應用程序員編寫的并發工具通常不正確或效率低下。提供一組標準的并發實用程序將簡化編寫各種多線程應用程序的任務,并通常可以提高使用它們的應用程序的質量。
目前,開發人員只能使用Java語言本身提供的并發控制結構。對于某些應用程序來說,這些級別太低,而對其他應用程序則不完整
絕大多數軟件包將在低級Java構造之上實現。但是,有一些關于原子性和監視器的關鍵JVM /語言增強功能是獲得高效和正確語義所必需的。
java.util.concurrent中
只是間接地,因為在不同平臺上運行的JVM可能能夠以不同方式優化某些構造。
沒有
沒有
沒有
目標是將此規范包含在J2SE 1.5(Tiger)的JSR中。
電子郵件,電話會議和不常見的會議。我們還將使用或創建一個開放的郵件列表,供專家組以外的其他感興趣的人討論。
前面說到AQS是并發包下的精髓所在,那么LockSupport和Unsafe就是整個JSR-166并發包的所有功能實現的靈魂,縱觀整個并發包下的代碼,無處不見LockSupport和Unsafe的身影。LockSupport提供了兩個關鍵方法,park和unpark,用來操作線程的阻塞和放行,功能可以類比Object的wait和notify,但是比這兩個api更靈活。下面是博主簡化了的實現(JDK中不是這樣的)
/** 用于創建鎖和其他同步類的基本線程阻塞基礎類,提供基礎的線程控制功能。 */ public class LockSupport { private LockSupport() {} /** 解除park的阻塞,如果還沒阻塞,它對{@code park}的下一次調用將保證不會阻塞 */ public static void unpark(Thread thread) { if (thread != null) { UNSAFE.unpark(thread); } } /** 阻塞當前線程,除非先調用了unpark()方法。 */ public static void park() { UNSAFE.park(false, 0L); } //Hotspot implementation via intrinsics API private static final Unsafe UNSAFE; static { try { try { final PrivilegedExceptionAction<Unsafe> action = new PrivilegedExceptionAction<Unsafe>() { @Override public Unsafe run() throws Exception { Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe"); theUnsafe.setAccessible(true); return (Unsafe) theUnsafe.get(null); } }; UNSAFE = AccessController.doPrivileged(action); } catch (Exception e) { throw new RuntimeException("Unable to load unsafe", e); } } catch (Exception ex) { throw new Error(ex); } } }
有了park和unpark后,我們也可以這樣來實現Lock的功能,代碼如下,有了ConcurrentLinkedQueue加持后,就可以在基本的鎖的功能上,實現公平鎖的語義了。
/** * 簡版先進先出的公平鎖實現 * @author: kl @kailing.pub * @date: 2019/9/2 */ public class FIFOMutex { private final AtomicBoolean locked = new AtomicBoolean(false); private final Queue<Thread> waiters = new ConcurrentLinkedQueue<>(); public void lock() { boolean wasInterrupted = false; Thread current = Thread.currentThread(); waiters.add(current); // 在隊列中不是第一個或無法獲取鎖時阻塞 while (waiters.peek() != current || !locked.compareAndSet(false, true)) { LockSupport.park(this); if (Thread.interrupted()) { wasInterrupted = true; } } waiters.remove(); if (wasInterrupted) { current.interrupt(); } } public void unlock() { locked.set(false); LockSupport.unpark(waiters.peek()); } }
心細的你可能發現了LockSupport最終還是基于Unsafe的park和unpark來實現的,Unsafe在JDK1.5之前就存在的,那JSR166后增加了哪些內容呢?先來看下Unsafe是什么來頭。JDK源碼中是這樣描述的:一組用于執行低層、不安全操作的方法。盡管該類和所有方法都是公共的,但是該類的使用受到限制,因為只有受信任的代碼才能獲得該類的實例。如其名,不安全的,所以在JDK1.8后直接不提供源碼了,JDK中其他的代碼都可以在IDE中直接看到.java的文件,而Unsafe只有.class編譯后的代碼。因為Unsafe是真的有黑魔法,可以直接操作系統級的資源,比如系統內存、線程等。JDK不直接對外暴露Unsafe的api,如果直接在自己的應用程序中像JDK中那么獲取Unsafe的實例,如:
private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
會直接拋異常SecurityException("Unsafe"),正確的獲取方式如下:
private static final Unsafe UNSAFE; static { try { try { final PrivilegedExceptionAction<Unsafe> action = new PrivilegedExceptionAction<Unsafe>() { @Override public Unsafe run() throws Exception { Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe"); theUnsafe.setAccessible(true); return (Unsafe) theUnsafe.get(null); } }; UNSAFE = AccessController.doPrivileged(action); } catch (Exception e) { throw new RuntimeException("Unable to load unsafe", e); } } catch (Exception ex) { throw new Error(ex); } }
其實,深入到Unsafe后,會發現深入不下去了,Unsafe中的方法是都是native標記的本地方法,沒有實現,如:
public native void unpark(Object var1); public native void park(boolean var1, long var2);
如果是在Windows下,最終調用的就是使用C++開發的最終編譯成.dll的包,所以只要看到C++相關的代碼就知道怎么回事了
JDK源碼下載地址:http://www.java.net/download/openjdk/jdk8/promoted/b132/openjdk-8-src-b132-03_mar_2014.zip
JDK源碼倉庫:http://hg.openjdk.java.net/
首先定位到Unsafe.cpp,文件位置在:openjdk\hotspot\src\share\vm\prims\Unsafe.cpp,會發現和JSR166相關的都有注釋,如:// These are the methods prior to the JSR 166 changes in 1.6.0。根據這些信息,得知JSR166在Unsafe中新增了五個方法,分別是compareAndSwapObject、compareAndSwapInt、compareAndSwapLong、park、unpark,這就是并發包中CAS原子操作和線程控制的核心所在了,并發包中的大部分功能都是基于他們來實現的。最后我們看下park和unpark的具體實現,在學校學的C語言丟的差不好多了,但是下面的代碼還語義還是很清晰的
// JSR166 // ------------------------------------------------------- /* * The Windows implementation of Park is very straightforward: Basic * operations on Win32 Events turn out to have the right semantics to * use them directly. We opportunistically resuse the event inherited * from Monitor. * void Parker::park(bool isAbsolute, jlong time) { guarantee (_ParkEvent != NULL, "invariant") ; // First, demultiplex/decode time arguments if (time < 0) { // don't wait return; } else if (time == 0 && !isAbsolute) { time = INFINITE; } else if (isAbsolute) { time -= os::javaTimeMillis(); // convert to relative time if (time <= 0) // already elapsed return; } else { // relative time /= 1000000; // Must coarsen from nanos to millis if (time == 0) // Wait for the minimal time unit if zero time = 1; } JavaThread* thread = (JavaThread*)(Thread::current()); assert(thread->is_Java_thread(), "Must be JavaThread"); JavaThread *jt = (JavaThread *)thread; // Don't wait if interrupted or already triggered if (Thread::is_interrupted(thread, false) || WaitForSingleObject(_ParkEvent, 0) == WAIT_OBJECT_0) { ResetEvent(_ParkEvent); return; } else { ThreadBlockInVM tbivm(jt); OSThreadWaitState osts(thread->osthread(), false /* not Object.wait() */); jt->set_suspend_equivalent(); WaitForSingleObject(_ParkEvent, time); ResetEvent(_ParkEvent); // If externally suspended while waiting, re-suspend if (jt->handle_special_suspend_equivalent_condition()) { jt->java_suspend_self(); } } } void Parker::unpark() { guarantee (_ParkEvent != NULL, "invariant") ; SetEvent(_ParkEvent); }
“Java中J.U.C并發包誕生分析”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。