您好,登錄后才能下訂單哦!
這篇文章給大家介紹MINA原理是怎樣的,內容非常詳細,感興趣的小伙伴們可以參考借鑒,希望對大家能有所幫助。
1. 通過SocketConnector同服務器端建立連接
2. 鏈接建立之后I/O的讀寫交給了I/O Processor線程,I/O Processor是多線程的
3. 通過I/O Processor讀取的數據經過IoFilterChain里所有配置的IoFilter,IoFilter進行消息的過濾,格式的轉換,在這個層面可以制定一些自定義的協議
4. 最后IoFilter將數據交給Handler進行業務處理,完成了整個讀取的過程
5. 寫入過程也是類似,只是剛好倒過來,通過IoSession.write寫出數據,然后Handler進行寫入的業務處理,處理完成后交給IoFilterChain,進行消息過濾和協議的轉換,最后通過I/O Processor將數據寫出到socket通道
IoFilterChain作為消息過濾鏈
1. 讀取的時候是從低級協議到高級協議的過程,一般來說從byte字節逐漸轉換成業務對象的過程
2. 寫入的時候一般是從業務對象到字節byte的過程
IoSession貫穿整個通信過程的始終
整個過程可以用一個圖來表現
消息箭頭都是有NioProcessor-N線程發起調用,默認情況下也在NioProcessor-N線程中執行
Connector : 作為連接客戶端,SocketConector用來和服務器端建立連接,連接成功,創建IoProcessor Thread(不能超過指定的processorCount),Thread由指定的線程池進行管理,IoProcessor 利用NIO框架對IO進行處理,同時創建IoSession。連接的建立是通過Nio的SocketChannel進行。
NioSocketConnector connector = new NioSocketConnector(processorCount);
ConnectFuture future = connector.connect(new InetSocketAddress(HOSTNAME, PORT));建立一個I/O通道
Acceptor :作為服務器端的連接接受者,SocketAcceptor用來監聽端口,同客戶端建立連接,連接建立之后的I/O操作全部交給IoProcessor進行處理
IoAcceptor acceptor = new NioSocketAcceptor();
acceptor.bind( new InetSocketAddress(PORT) );
Protocol : 利用IoFilter,對消息進行解碼和編碼,如以下代碼通過 MyProtocolEncoder 將java對象轉成byte串,通過MyProtocalDecoder 將byte串恢復成java對象
connector.getFilterChain().addLast("codec";, new ProtocolCodecFilter( new MyProtocalFactory()));
......
public class MyProtocalFactory implements ProtocolCodecFactory {
ProtocolEncoderAdapter encoder = new MyProtocolEncoder();
ProtocolDecoder decoder = new MyProtocalDecoder() ;
public ProtocolDecoder getDecoder(IoSession session) throws Exception {
return decoder;
}
public ProtocolEncoder getEncoder(IoSession session) throws Exception {
return encoder;
}
}
......
public class MyProtocalDecoder extends ProtocolDecoderAdapter {
public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out)
throws Exception {
int id = in.getInt();
int len = in.getInt();
byte [] dst = new byte [len];
in.get(dst);
String name = new String(dst,"GBK");
Item item = new Item();
item.setId(id);
item.setName(name);
out.write(item);
}
}
......
public class MyProtocolEncoder extends ProtocolEncoderAdapter {
public void encode(IoSession session, Object message,
ProtocolEncoderOutput out) throws Exception {
Item item = (Item)message;
int byteLen = 8 + item.getName().getBytes("GBK").length ;
IoBuffer buf = IoBuffer.allocate(byteLen);
buf.putInt(item.getId());
buf.putInt(item.getName().getBytes("GBK").length);
buf.put(item.getName().getBytes("GBK";));
buf.flip();
out.write(buf);
}
}
handler : 具體處理事件,事件包括:sessionCreated、sessionOpened、sessionClosed、sessionIdle、exceptionCaught、messageReceived、messageSent。
connector.setHandler(new MyHandler());MyHandler繼承IoHandlerAdapter類或者實現IoHandler接口.事件最終由IoProcessor線程發動調用。
Processor : I/O處理器、允許多線程讀寫,開發過程中只需要指定線程數量,Processor通過Nio框架進行I/O的續寫操作,Processor包含了Nio的Selector的引用。這點也正是mina的優勢,如果直接用Nio編寫,則需要自己編寫代碼來實現類似Processor的功能。正因為 I/O Processor是異步處理讀寫的,所以我們有時候需要識別同一個任務的消息,比如一個任務包括發送消息,接收消息,反饋消息,那么我們需要在制定消息格式的時候,消息頭里能包含一個能識別是同一個任務的id。
I/O Porcessor線程數的設置 :如果是SocketConnector,則可以在構造方法中指定,如:new SocketConnector(processorCount, Executors.newCachedThreadPool());如果是SocketAcceptor,也是一樣的:SocketAcceptor acceptor = new SocketAcceptor(ProcessorCount, Executors.newCachedThreadPool());
processorCount為最大Porcessor線程數,這個值可以通過性能測試進行調優,默認值是cpu核數量+1(Runtime.getRuntime().availableProcessors() + 1)。
比較奇怪的是,每個IoProcessor在創建的時候會本地自己和自己建立一個連接?
IoSession : IoSession是用來保持IoService的上下文,一個IoService在建立Connect之后建立一個IoSession(一個連接一個session),IoSession的生命周期從Connection建立到斷開為止
IoSession做兩件事情:
1.通過IoSession可以獲取IoService的所有相關配置對象(持有對IoService,Processor池,SocketChannel,SessionConfig和IoService.IoHandler的引用)
2.通過IoSession.write 是數據寫出的入口
關于線程
ThreadModel 1.x版本的mina還有線程模式選項在2.x之后就沒有了
1.x版本指定線程模式
SocketConnectorConfig cfg = new SocketConnectorConfig();
cfg.setThreadModel(ThreadModel.MANUAL);
MINA有3種worker線程
Acceptor、Connector、I/O processor 線程
Acceptor Thread: 一般作為服務器端鏈接的接收線程,實現了接口IoService,線程的數量就是創建SocketAcceptor 的數量
Connector Thread :一般作為客戶端的請求建立鏈接線程,實現了接口IoService,維持了一個和服務器端Acceptor的一個鏈接,線程數量就是創建SocketConnector 的數量
Mina的SocketAcceptor和SocketConnector均是繼承了BaseIoService,是對IoService的兩種不同的實現
I/O processor Thread :作為I/O真正處理的線程,存在于服務器端和客戶端,用來處理I/O的讀寫操作,線程的數量是可以配置的,默認最大數量是CPU個數+1
服務器端:在創建SocketAcceptor的時候指定ProcessorCount
SocketAcceptor acceptor = new SocketAcceptor(Runtime.getRuntime().availableProcessors() + 1, Executors.newCachedThreadPool());
客戶端:在創建SocketConnector 的時候指定ProcessorCount
SocketConnector connector = new SocketConnector(Runtime.getRuntime().availableProcessors() + 1, Executors.newCachedThreadPool());
I/O Processor Thread,是依附于IoService,類似上面的例子SocketConnector connector = new SocketConnector(Runtime.getRuntime().availableProcessors() + 1, Executors.newCachedThreadPool());是指SocketConnector這個線程允許CPU+1個I/O Processor Thread
NioProcessor雖然是多線程,但是對與一個連接的時候業務處理只會使用一個線程進行處理(Processor線程對于一個客戶端連接只使用一個線程NioProcessor-n)如果handler的業務比較耗時,會導致NioProcessor線程堵塞 ,在2個客戶端同時連接上來的時候會創建第2個(前提是第1個NioProcessor正在忙),創建的最大數量由Acceptor構造方法的時候指定。如果:一個客戶端連接同服務器端有很多通信,并且I/O的開銷不大,但是Handler處理的業務時間比較長,那么需要采用獨立的線程模式,在 FilterChain的最后增加一個ExecutorFitler :
acceptor.getFilterChain().addLast("threadPool", new ExecutorFilter(Executors.newCachedThreadPool()));
這樣可以保證processor和handler的線程是分開的,否則:客戶端發送3個消息,而服務器對于每個消息要處理10s左右,那么這3個消息是被串行處理,在處理第一個消息的時候,后面的消息將被堵塞,同樣反過來客戶端也有同樣的問題。
客戶端Porcessor堵塞測試情況:
1. 以下代碼在建立連接后連續發送了5個消息(item)
ConnectFuture future = connector.connect( new InetSocketAddress(HOSTNAME, PORT));
future.awaitUninterruptibly();
session = future.getSession();
Item item = new Item();
item.setId(12345 );
item.setName("hi");
session.write(item);
session.write(item);
session.write(item);
session.write(item);
session.write(item);
2. 在handle的messageSent方法進行了延時處理,延時3秒
public void messageSent(IoSession session, Object message) throws Exception {
Thread.sleep(3000 );
System.out.println(message);
}
3. 測試結果
5個消息是串行發送,都由同一個IoPorcessor線程處理
session.write(item);
session.write(item);
session.write(item);
session.write(item);
session.write(item);
服務器端每隔3秒收到一個消息。因為調用是由IoProcessor觸發,而一個connector只會使用一個IoProcessor線程
4. 增加ExecutorFilter,ExecutorFilter保證在處理handler的時候是獨立線程
connector.getFilterChain().addLast("threadPool", new ExecutorFilter(Executors.newCachedThreadPool()));
5. 測試結果
4個session.wirte變成了并行處理,服務器端同時收到了5條消息
關于MINA原理是怎樣的就分享到這里了,希望以上內容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。