您好,登錄后才能下訂單哦!
本文轉載 https://www.javadoop.com
本系列文章將整理到我在GitHub上的《Java面試指南》倉庫,更多精彩內容請到我的倉庫里查看
https://github.com/h3pl/Java-Tutorial
喜歡的話麻煩點下Star哈
文章將同步到我的個人博客:
www.how2playlife.com
本文是微信公眾號【Java技術江湖】的《不可輕視的Java網絡編程》其中一篇,本文部分內容來源于網絡,為了把本文主題講得清晰透徹,也整合了很多我認為不錯的技術博客內容,引用其中了一些比較好的博客文章,如有侵權,請聯系作者。
該系列博文會告訴你如何從計算機網絡的基礎知識入手,一步步地學習Java網絡基礎,從socket到nio、bio、aio和netty等網絡編程知識,并且進行實戰,網絡編程是每一個Java后端工程師必須要學習和理解的知識點,進一步來說,你還需要掌握Linux中的網絡編程原理,包括IO模型、網絡編程框架netty的進階原理,才能更完整地了解整個Java網絡編程的知識體系,形成自己的知識框架。
為了更好地總結和檢驗你的學習成果,本系列文章也會提供部分知識點對應的面試題以及參考答案。
如果對本系列文章有什么建議,或者是有什么疑問的話,也可以關注公眾號【Java技術江湖】聯系作者,歡迎你參與本系列博文的創作和修訂。
之前寫了兩篇關于 NIO 的文章,第一篇介紹了 NIO 的 Channel、Buffer、Selector 使用,第二篇介紹了非阻塞 IO 和異步 IO,并展示了簡單的用例。
本文將介紹 Tomcat 中的 NIO 使用,使大家對 Java NIO 的生產使用有更加直觀的認識。
雖然本文的源碼篇幅也不短,但是 Tomcat 的源碼畢竟不像 Doug Lea 的并發源碼那么“變態”,對于大部分讀者來說,閱讀難度比之前介紹的其他并發源碼要簡單一些,所以讀者不要覺得有什么壓力。
本文基于 Tomcat 當前(2018-03-20) 最新版本 9.0.6。
先簡單畫一張圖示意一下本文的主要內容:
目錄
Tomcat 9.0.6 下載地址: https://tomcat.apache.org/download-90.cgi
由于上面下載的 tomcat 的源碼并沒有使用 maven 進行組織,不方便我們看源碼,也不方便我們進行調試。這里我們將使用 maven 倉庫中的 tomcat-embed-core,自己編寫代碼進行啟動的方式來進行調試。
首先,創建一個空的 maven 工程,然后添加以下依賴。
<dependency>
<groupId>org.apache.tomcat.embed</groupId>
tomcat-embed-core
<version>9.0.6</version>
</dependency>
上面的依賴,只會將 tomcat-embed-core-9.0.6.jar 和 tomcat-annotations-api-9.0.6.jar 兩個包引進來,對于本文來說,已經足夠了,如果你需要其他功能,需要額外引用其他的依賴,如 Jasper。
然后,使用以下啟動方法:
public static void main(String[] args) throws LifecycleException {
Tomcat tomcat = new Tomcat();
Connector connector = new Connector("HTTP/1.1");
connector.setPort(8080);
tomcat.setConnector(connector);
tomcat.start();
tomcat.getServer().await();
}
經過以上的代碼,我們的 Tomcat 就啟動起來了。
Tomcat 中的其他接口感興趣的讀者請自行探索,如設置 webapp 目錄,設置 resources 等
這里,介紹第一個重要的概念: Connector。在 Tomcat 中,使用 Connector 來處理連接,一個 Tomcat 可以配置多個 Connector,分別用于監聽不同端口,或處理不同協議。
在 Connector 的構造方法中,我們可以傳
HTTP/1.1
或
AJP/1.3
用于指定協議,也可以傳入相應的協議處理類,畢竟協議不是重點,將不同端口進來的連接對應不同處理類才是正道。典型地,我們可以指定以下幾個協議處理類:
本文的重點當然是非阻塞 IO 了,之前已經介紹過
異步 IO
的基礎知識了,讀者看完本文后,如果對異步 IO 的處理流程感興趣,可以自行去分析一遍。
如果你使用 9.0 以前的版本,Tomcat 在啟動的時候是會自動配置一個 connector 的,我們可以不用顯示配置。
9.0 版本的 Tomcat#start() 方法:
public void start() throws LifecycleException { getServer(); server.start(); }
8.5 及之前版本的 Tomcat#start() 方法:
public void start() throws LifecycleException { getServer(); // 自動配置一個使用非阻塞 IO 的 connector getConnector(); server.start(); }
前面我們說過一個 Connector 對應一個協議,當然這描述也不太對,NIO 和 NIO2 就都是處理 HTTP/1.1 的,只不過一個使用非阻塞,一個使用異步。進到指定 protocol 代碼,我們就會發現,它們的代碼及其簡單,只不過是指定了特定的 endpoint。
打開
Http11NioProtocol
和
Http11Nio2Protocol
源碼,我們可以看到,在構造方法中,它們分別指定了 NioEndpoint 和 Nio2Endpoint。
// 非阻塞模式
public class Http11NioProtocol extends AbstractHttp11JsseProtocol<NioChannel> {
public Http11NioProtocol() {
// NioEndpoint
super(new NioEndpoint());
}
...
}
// 異步模式
public class Http11Nio2Protocol extends AbstractHttp11JsseProtocol<Nio2Channel> {
public Http11Nio2Protocol() {
// Nio2Endpoint
super(new Nio2Endpoint());
}
...
}
這里介紹第二個重要的概念: endpoint。Tomcat 使用不同的 endpoint 來處理不同的協議請求,今天我們的重點是 NioEndpoint,其使用 非阻塞 IO 來進行處理 HTTP/1.1 協議的請求。
NioEndpoint 繼承 =>
AbstractJsseEndpoint 繼承 =>
AbstractEndpoint。中間的 AbstractJsseEndpoint 主要是提供了一些關于
HTTPS
的方法,這塊我們暫時忽略它,后面所有關于 HTTPS 的我們都直接忽略,感興趣的讀者請自行分析。
下面,我們看看從 tomcat.start() 一直到 NioEndpoint 的過程。
1. AbstractProtocol # init
@Override
public void init() throws Exception {
...
String endpointName = getName();
endpoint.setName(endpointName.substring(1, endpointName.length()-1));
endpoint.setDomain(domain);
// endpoint 的 name=http-nio-8089,domain=Tomcat
endpoint.init();
}
2. AbstractEndpoint # init
public final void init() throws Exception {
if (bindOnInit) {
bind(); // 這里對應的當然是子類 NioEndpoint 的 bind() 方法
bindState = BindState.BOUND_ON_INIT;
}
...
}
3. NioEndpoint # bind
這里就到我們的 NioEndpoint 了,要使用到我們之前學習的 NIO 的知識了。
@Override
public void bind() throws Exception {
// initServerSocket(); 原代碼是這行,我們 “內聯” 過來一起說
// 開啟 ServerSocketChannel
serverSock = ServerSocketChannel.open();
socketProperties.setProperties(serverSock.socket());
// getPort() 會返回我們最開始設置的 8080,得到我們的 address 是 0.0.0.0:8080
InetSocketAddress addr = (getAddress()!=null?new InetSocketAddress(getAddress(),getPort()):new InetSocketAddress(getPort()));
// ServerSocketChannel 綁定地址、端口,
// 第二個參數 backlog 默認為 100,超過 100 的時候,新連接會被拒絕(不過源碼注釋也說了,這個值的真實語義取決于具體實現)
serverSock.socket().bind(addr,getAcceptCount());
// ※※※ 設置 ServerSocketChannel 為阻塞模式 ※※※
serverSock.configureBlocking(true);
// 設置 acceptor 和 poller 的數量,至于它們是什么角色,待會說
// acceptorThreadCount 默認為 1
if (acceptorThreadCount == 0) {
// FIXME: Doesn't seem to work that well with multiple accept threads
// 作者想表達的意思應該是:使用多個 acceptor 線程并不見得性能會更好
acceptorThreadCount = 1;
}
// poller 線程數,默認值定義如下,所以在多核模式下,默認為 2
// pollerThreadCount = Math.min(2,Runtime.getRuntime().availableProcessors());
if (pollerThreadCount <= 0) {
pollerThreadCount = 1;
}
//
setStopLatch(new CountDownLatch(pollerThreadCount));
// 初始化 ssl,我們忽略 ssl
initialiseSsl();
// 打開 NioSelectorPool,先忽略它
selectorPool.open();
}
到這里,我們還不知道 Acceptor 和 Poller 是什么東西,我們只是設置了它們的數量,我們先來看看最后面提到的 SelectorPool。
剛剛我們分析完了 init() 過程,下面是啟動過程 start() 分析。
AbstractProtocol # start
@Override
public void start() throws Exception {
...
// 調用 endpoint 的 start 方法
endpoint.start();
// Start async timeout thread
asyncTimeout = new AsyncTimeout();
Thread timeoutThread = new Thread(asyncTimeout, getNameInternal() + "-AsyncTimeout");
int priority = endpoint.getThreadPriority();
if (priority < Thread.MIN_PRIORITY || priority > Thread.MAX_PRIORITY) {
priority = Thread.NORM_PRIORITY;
}
timeoutThread.setPriority(priority);
timeoutThread.setDaemon(true);
timeoutThread.start();
}
AbstractEndpoint # start
public final void start() throws Exception {
// 按照我們的流程,剛剛 init 的時候,已經把 bindState 改為 BindState.BOUND_ON_INIT 了,
// 所以下面的 if 分支我們就不進去了
if (bindState == BindState.UNBOUND) {
bind();
bindState = BindState.BOUND_ON_START;
}
// 往里看 NioEndpoint 的實現
startInternal();
}
下面這個方法還是比較重要的,這里會創建前面說過的 acceptor 和 poller。
NioEndpoint # startInternal
@Override
public void startInternal() throws Exception {
if (!running) {
running = true;
paused = false;
// 以下幾個是緩存用的,之后我們也會看到很多這樣的代碼,為了減少 new 很多對象出來
processorCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
socketProperties.getProcessorCache());
eventCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
socketProperties.getEventCache());
nioChannels = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
socketProperties.getBufferPool());
// 創建【工作線程池】,Tomcat 自己包裝了一下 ThreadPoolExecutor,
// 1\. 為了在創建線程池以后,先啟動 corePoolSize 個線程(這個屬于線程池的知識了,不熟悉的讀者可以看看我之前的文章)
// 2\. 自己管理線程池的增長方式(默認 corePoolSize 10, maxPoolSize 200),不是本文重點,不分析
if ( getExecutor() == null ) {
createExecutor();
}
// 設置一個柵欄(tomcat 自定義了類 LimitLatch),控制最大的連接數,默認是 10000
initializeConnectionLatch();
// 開啟 poller 線程
// 還記得之前 init 的時候,默認地設置了 poller 的數量為 2,所以這里啟動 2 個 poller 線程
pollers = new Poller[getPollerThreadCount()];
for (int i=0; i<pollers.length; i++) {
pollers[i] = new Poller();
Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-"+i);
pollerThread.setPriority(threadPriority);
pollerThread.setDaemon(true);
pollerThread.start();
}
// 開啟 acceptor 線程,和開啟 poller 線程組差不多。
// init 的時候,默認地,acceptor 的線程數是 1
startAcceptorThreads();
}
}
到這里,我們啟動了 工作線程池、 poller 線程組、 acceptor 線程組。同時,工作線程池初始就已經啟動了 10 個線程。我們用 jconsole 來看看此時的線程,請看下圖:
從 jconsole 中,我們可以看到,此時啟動了 BlockPoller、worker、poller、acceptor、AsyncTimeout,大家應該都已經清楚了每個線程是哪里啟動的吧。
Tomcat 中并沒有 Worker 這個類,此名字是我瞎編。
此時,我們還是不知道 acceptor、poller 甚至 worker 到底是干嘛的,下面,我們從 acceptor 線程開始看起。
它的結構非常簡單,在構造函數中,已經把 endpoint 傳進來了,此外就只有 threadName 和 state 兩個簡單的屬性。
private final AbstractEndpoint<?,U> endpoint;
private String threadName;
protected volatile AcceptorState state = AcceptorState.NEW;
public Acceptor(AbstractEndpoint<?,U> endpoint) {
this.endpoint = endpoint;
}
threadName 就是一個線程名字而已,Acceptor 的狀態 state 主要是隨著 endpoint 來的。
public enum AcceptorState {
NEW, RUNNING, PAUSED, ENDED
}
我們直接來看 acceptor 的 run 方法吧:
Acceptor # run
@Override
public void run() {
int errorDelay = 0;
// 只要 endpoint 處于 running,這里就一直循環
while (endpoint.isRunning()) {
// 如果 endpoint 處于 pause 狀態,這邊 Acceptor 用一個 while 循環將自己也掛起
while (endpoint.isPaused() && endpoint.isRunning()) {
state = AcceptorState.PAUSED;
try {
Thread.sleep(50);
} catch (InterruptedException e) {
// Ignore
}
}
// endpoint 結束了,Acceptor 自然也要結束嘛
if (!endpoint.isRunning()) {
break;
}
state = AcceptorState.RUNNING;
try {
// 如果此時達到了最大連接數(之前我們說過,默認是10000),就等待
endpoint.countUpOrAwaitConnection();
// Endpoint might have been paused while waiting for latch
// If that is the case, don't accept new connections
if (endpoint.isPaused()) {
continue;
}
U socket = null;
try {
// 這里就是接收下一個進來的 SocketChannel
// 之前我們設置了 ServerSocketChannel 為阻塞模式,所以這邊的 accept 是阻塞的
socket = endpoint.serverSocketAccept();
} catch (Exception ioe) {
// We didn't get a socket
endpoint.countDownConnection();
if (endpoint.isRunning()) {
// Introduce delay if necessary
errorDelay = handleExceptionWithDelay(errorDelay);
// re-throw
throw ioe;
} else {
break;
}
}
// accept 成功,將 errorDelay 設置為 0
errorDelay = 0;
if (endpoint.isRunning() && !endpoint.isPaused()) {
// setSocketOptions() 是這里的關鍵方法,也就是說前面千辛萬苦都是為了能到這里進行處理
if (!endpoint.setSocketOptions(socket)) {
// 如果上面的方法返回 false,關閉 SocketChannel
endpoint.closeSocket(socket);
}
} else {
// 由于 endpoint 不 running 了,或者處于 pause 了,將此 SocketChannel 關閉
endpoint.destroySocket(socket);
}
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
String msg = sm.getString("endpoint.accept.fail");
// APR specific.
// Could push this down but not sure it is worth the trouble.
if (t instanceof Error) {
Error e = (Error) t;
if (e.getError() == 233) {
// Not an error on HP-UX so log as a warning
// so it can be filtered out on that platform
// See bug 50273
log.warn(msg, t);
} else {
log.error(msg, t);
}
} else {
log.error(msg, t);
}
}
}
state = AcceptorState.ENDED;
}
大家應該發現了,Acceptor 繞來繞去,都是在調用 NioEndpoint 的方法,我們簡單分析一下這個。
在 NioEndpoint init 的時候,我們開啟了一個 ServerSocketChannel,后來 start 的時候,我們開啟多個 acceptor(實際上,默認是 1 個),每個 acceptor 啟動以后就開始循環調用 ServerSocketChannel 的 accept() 方法獲取新的連接,然后調用 endpoint.setSocketOptions(socket) 處理新的連接,之后再進入循環 accept 下一個連接。
到這里,大家應該也就知道了,為什么這個叫 acceptor 了吧?接下來,我們來看看 setSocketOptions 方法到底做了什么。
NioEndpoint # setSocketOptions
@Override
protected boolean setSocketOptions(SocketChannel socket) {
try {
// 設置該 SocketChannel 為非阻塞模式
socket.configureBlocking(false);
Socket sock = socket.socket();
// 設置 socket 的一些屬性
socketProperties.setProperties(sock);
// 還記得 startInternal 的時候,說過了 nioChannels 是緩存用的。
// 限于篇幅,這里的 NioChannel 就不展開了,它包括了 socket 和 buffer
NioChannel channel = nioChannels.pop();
if (channel == null) {
// 主要是創建讀和寫的兩個 buffer,默認地,讀和寫 buffer 都是 8192 字節,8k
SocketBufferHandler bufhandler = new SocketBufferHandler(
socketProperties.getAppReadBufSize(),
socketProperties.getAppWriteBufSize(),
socketProperties.getDirectBuffer());
if (isSSLEnabled()) {
channel = new SecureNioChannel(socket, bufhandler, selectorPool, this);
} else {
channel = new NioChannel(socket, bufhandler);
}
} else {
channel.setIOChannel(socket);
channel.reset();
}
// getPoller0() 會選取所有 poller 中的一個 poller
getPoller0().register(channel);
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
try {
log.error("",t);
} catch (Throwable tt) {
ExceptionUtils.handleThrowable(tt);
}
// Tell to close the socket
return false;
}
return true;
}
我們看到,這里又沒有進行實際的處理,而是將這個 SocketChannel 注冊到了其中一個 poller 上。因為我們知道,acceptor 應該盡可能的簡單,只做 accept 的工作,簡單處理下就往后面扔。acceptor 還得回到之前的循環去 accept 新的連接呢。
我們只需要明白,此時,往 poller 中注冊了一個 NioChannel 實例,此實例包含客戶端過來的 SocketChannel 和一個 SocketBufferHandler 實例。
之前我們看到 acceptor 將一個 NioChannel 實例 register 到了一個 poller 中。在看 register 方法之前,我們需要先對 poller 要有個簡單的認識。
public class Poller implements Runnable {
public Poller() throws IOException {
// 每個 poller 開啟一個 Selector
this.selector = Selector.open();
}
private Selector selector;
// events 隊列,此類的核心
private final SynchronizedQueue<PollerEvent> events =
new SynchronizedQueue<>();
private volatile boolean close = false;
private long nextExpiration = 0;//optimize expiration handling
// 這個值后面有用,記住它的初始值為 0
private AtomicLong wakeupCounter = new AtomicLong(0);
private volatile int keyCount = 0;
...
}
敲重點:每個 poller 關聯了一個 Selector。
Poller 內部圍著一個 events 隊列轉,來看看其 events() 方法:
public boolean events() {
boolean result = false;
PollerEvent pe = null;
for (int i = 0, size = events.size(); i < size && (pe = events.poll()) != null; i++ ) {
result = true;
try {
// 逐個執行 event.run()
pe.run();
// 該 PollerEvent 還得給以后用,這里 reset 一下(還是之前說過的緩存)
pe.reset();
if (running && !paused) {
eventCache.push(pe);
}
} catch ( Throwable x ) {
log.error("",x);
}
}
return result;
}
events() 方法比較簡單,就是取出當前隊列中的 PollerEvent 對象,逐個執行 event.run() 方法。
然后,現在來看 Poller 的 run() 方法,該方法會一直循環,直到 poller.destroy() 被調用。
Poller # run
public void run() {
while (true) {
boolean hasEvents = false;
try {
if (!close) {
// 執行 events 隊列中每個 event 的 run() 方法
hasEvents = events();
// wakeupCounter 的初始值為 0,這里設置為 -1
if (wakeupCounter.getAndSet(-1) > 0) {
//if we are here, means we have other stuff to do
//do a non blocking select
keyCount = selector.selectNow();
} else {
// timeout 默認值 1 秒
keyCount = selector.select(selectorTimeout);
}
wakeupCounter.set(0);
}
// 篇幅所限,我們就不說 close 的情況了
if (close) {
events();
timeout(0, false);
try {
selector.close();
} catch (IOException ioe) {
log.error(sm.getString("endpoint.nio.selectorCloseFail"), ioe);
}
break;
}
} catch (Throwable x) {
ExceptionUtils.handleThrowable(x);
log.error("",x);
continue;
}
//either we timed out or we woke up, process events first
// 這里沒什么好說的,頂多就再執行一次 events() 方法
if ( keyCount == 0 ) hasEvents = (hasEvents | events());
// 如果剛剛 select 有返回 ready keys,進行處理
Iterator<SelectionKey> iterator =
keyCount > 0 ? selector.selectedKeys().iterator() : null;
// Walk through the collection of ready keys and dispatch
// any active event.
while (iterator != null && iterator.hasNext()) {
SelectionKey sk = iterator.next();
NioSocketWrapper attachment = (NioSocketWrapper)sk.attachment();
// Attachment may be null if another thread has called
// cancelledKey()
if (attachment == null) {
iterator.remove();
} else {
iterator.remove();
// ※※※※※ 處理 ready key ※※※※※
processKey(sk, attachment);
}
}//while
//process timeouts
timeout(keyCount,hasEvents);
}//while
getStopLatch().countDown();
}
poller 的 run() 方法主要做了調用 events() 方法和處理注冊到 Selector 上的 ready key,這里我們暫時不展開 processKey 方法,因為此方法必定是及其復雜的。
我們回過頭來看之前從 acceptor 線程中調用的 register 方法。
Poller # register
public void register(final NioChannel socket) {
socket.setPoller(this);
NioSocketWrapper ka = new NioSocketWrapper(socket, NioEndpoint.this);
socket.setSocketWrapper(ka);
ka.setPoller(this);
ka.setReadTimeout(getConnectionTimeout());
ka.setWriteTimeout(getConnectionTimeout());
ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
ka.setSecure(isSSLEnabled());
PollerEvent r = eventCache.pop();
ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.
// 注意第三個參數值 OP_REGISTER
if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER);
else r.reset(socket,ka,OP_REGISTER);
// 添加 event 到 poller 中
addEvent(r);
}
這里將這個 socket(包含 socket 和 buffer 的 NioChannel 實例) 包裝為一個 PollerEvent,然后添加到 events 中,此時調用此方法的 acceptor 結束返回,去處理新的 accepted 連接了。
接下來,我們已經知道了,poller 線程在循環過程中會不斷調用 events() 方法,那么 PollerEvent 的 run() 方法很快就會被執行,我們就來看看剛剛這個新的連接被 注冊到這個 poller 后,會發生什么。
PollerEvent # run
@Override
public void run() {
// 對于新來的連接,前面我們說過,interestOps == OP_REGISTER
if (interestOps == OP_REGISTER) {
try {
// 這步很關鍵!!!
// 將這個新連接 SocketChannel 注冊到該 poller 的 Selector 中,
// 設置監聽 OP_READ 事件,
// 將 socketWrapper 設置為 attachment 進行傳遞(這個對象可是什么鬼都有,往上看就知道了)
socket.getIOChannel().register(
socket.getPoller().getSelector(), SelectionKey.OP_READ, socketWrapper);
} catch (Exception x) {
log.error(sm.getString("endpoint.nio.registerFail"), x);
}
} else {
/* else 這塊不介紹,省得大家頭大 */
final SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
try {
if (key == null) {
// The key was cancelled (e.g. due to socket closure)
// and removed from the selector while it was being
// processed. Count down the connections at this point
// since it won't have been counted down when the socket
// closed.
socket.socketWrapper.getEndpoint().countDownConnection();
} else {
final NioSocketWrapper socketWrapper = (NioSocketWrapper) key.attachment();
if (socketWrapper != null) {
//we are registering the key to start with, reset the fairness counter.
int ops = key.interestOps() | interestOps;
socketWrapper.interestOps(ops);
key.interestOps(ops);
} else {
socket.getPoller().cancelledKey(key);
}
}
} catch (CancelledKeyException ckx) {
try {
socket.getPoller().cancelledKey(key);
} catch (Exception ignore) {}
}
}
}
到這里,我們再回顧一下:剛剛在 PollerEvent 的 run() 方法中,我們看到,新的 SocketChannel 注冊到了 Poller 內部的 Selector 中,監聽 OP_READ 事件,然后我們再回到 Poller 的 run() 看下,一旦該 SocketChannel 是 readable 的狀態,那么就會進入到 poller 的 processKey 方法。
Poller # processKey
protected void processKey(SelectionKey sk, NioSocketWrapper attachment) {
try {
if ( close ) {
cancelledKey(sk);
} else if ( sk.isValid() && attachment != null ) {
if (sk.isReadable() || sk.isWritable() ) {
// 忽略 sendfile
if ( attachment.getSendfileData() != null ) {
processSendfile(sk,attachment, false);
} else {
// unregister 相應的 interest set,
// 如接下來是處理 SocketChannel 進來的數據,那么就不再監聽該 channel 的 OP_READ 事件
unreg(sk, attachment, sk.readyOps());
boolean closeSocket = false;
// Read goes before write
if (sk.isReadable()) {
// 處理讀
if (!processSocket(attachment, SocketEvent.OPEN_READ, true)) {
closeSocket = true;
}
}
if (!closeSocket && sk.isWritable()) {
// 處理寫
if (!processSocket(attachment, SocketEvent.OPEN_WRITE, true)) {
closeSocket = true;
}
}
if (closeSocket) {
cancelledKey(sk);
}
}
}
} else {
//invalid key
cancelledKey(sk);
}
} catch ( CancelledKeyException ckx ) {
cancelledKey(sk);
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
log.error("",t);
}
}
接下來是 processSocket 方法,注意第三個參數,上面進來的時候是 true。
AbstractEndpoint # processSocket
public boolean processSocket(SocketWrapperBase<S> socketWrapper,
SocketEvent event, boolean dispatch) {
try {
if (socketWrapper == null) {
return false;
}
SocketProcessorBase<S> sc = processorCache.pop();
if (sc == null) {
// 創建一個 SocketProcessor 的實例
sc = createSocketProcessor(socketWrapper, event);
} else {
sc.reset(socketWrapper, event);
}
Executor executor = getExecutor();
if (dispatch && executor != null) {
// 將任務放到之前建立的 worker 線程池中執行
executor.execute(sc);
} else {
sc.run(); // ps: 如果 dispatch 為 false,那么就當前線程自己執行
}
} catch (RejectedExecutionException ree) {
getLog().warn(sm.getString("endpoint.executor.fail", socketWrapper) , ree);
return false;
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
// This means we got an OOM or similar creating a thread, or that
// the pool and its queue are full
getLog().error(sm.getString("endpoint.process.fail"), t);
return false;
}
return true;
}
NioEndpoint # createSocketProcessor
@Override
protected SocketProcessorBase<NioChannel> createSocketProcessor(
SocketWrapperBase<NioChannel> socketWrapper, SocketEvent event) {
return new SocketProcessor(socketWrapper, event);
}
我們看到,提交到 worker 線程池中的是 NioEndpoint.SocketProcessor 的實例,至于它的 run() 方法之后的邏輯,我們就不再繼續往里分析了。
最后,再祭出文章開始的那張圖來總結一下:
這里簡單梳理下前面我們說的流程,幫大家回憶一下:
后續的流程,感興趣的讀者請自行分析,本文就說到這里了。
(全文完)
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。