您好,登錄后才能下訂單哦!
本篇內容主要講解“Java怎么創建多線程服務器”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“Java怎么創建多線程服務器”吧!
一個典型的單線程服務器示例如下:
while (true) { Socket socket = null; try { // 接收客戶連接 socket = serverSocket.accept(); // 從socket中獲得輸入流與輸出流,與客戶通信 ... } catch(IOException e) { e.printStackTrace() } finally { try { if(socket != null) { // 斷開連接 socket.close(); } catch (IOException e) { e.printStackTrace(); } } } }
服務端接收到一個客戶連接,就與客戶進行通信,通信完畢后斷開連接,然后接收下一個客戶連接,假如同時有多個客戶連接請求這些客戶就必須排隊等候。如果長時間讓客戶等待,就會使網站失去信譽,從而降低訪問量。
一般用并發性能來衡量一個服務器同時響應多個客戶的能力,一個具有好的并發性能的服務器,必須符合兩個條件:
能同時接收并處理多個客戶連接
對于每個客戶,都會迅速給予響應
用多個線程來同時為多個客戶提供服務,這是提高服務器并發性能的最常用的手段,一般有三種方式:
為每個客戶分配一個工作線程
創建一個線程池,由其中的工作線程來為客戶服務
利用 Java 類庫中現成的線程池,由它的工作線程來為客戶服務
服務器的主線程負責接收客戶的連接,每次接收到一個客戶連接,都會創建一個工作線程,由它負責與客戶的通信
public class EchoServer { private int port = 8000; private ServerSocket serverSocket; public EchoServer() throws IOException { serverSocket = new ServerSocket(port); System.out.println("服務器啟動"); } public void service() { while(true) { Socket socket = null; try { // 接教客戶連接 socket = serverSocket.accept(); // 創建一個工作線程 Thread workThread = new Thread(new Handler(socket)); // 啟動工作線程 workThread.start(); } catch (IOException e) { e.printStackTrace(); } } } public static void main(String args[])throws TOException { new EchoServer().service(); } // 負責與單個客戶的通信 class Handler implements Runnable { private Socket socket; pub1ic Handler(Socket socket) { this.socket = socket; } private PrintWriter getWriter(Socket socket) throws IOException {...} private BufferedReader getReader(Socket socket) throws IOException {...} public String echo(String msg) {...} public void run() { try { System.out.println("New connection accepted" + socket.getInetAddress() + ":" + socket.getPort()); BufferedReader br = getReader(socket); PrintWriter pw = getWriter(socket); String msg = null; // 接收和發送數據,直到通信結束 while ((msg = br.readLine()) != null) { System.out.println("from "+ socket.getInetAddress() + ":" + socket.getPort() + ">" + msg); pw.println(echo(msg)); if (msg.equals("bye")) break; } } catch (IOException e) { e.printStackTrace(); } finally { try { // 斷開連接 if(socket != nulll) socket.close(); } catch (IOException e) { e,printStackTrace(); } } } } }
上一種實現方式有以下不足之處:
服務器創建和銷毀工作線程的開銷很大,如果服務器需要與許多客戶通信,并且與每個客戶的通信時間都很短,那么有可能服務器為客戶創建新線程的開銷比實際與客戶通信的開銷還要大
除了創建和銷毀線程的開銷,活動的線程也消耗系統資源。每個線程都會占用一定的內存,如果同時有大量客戶連接服務器,就必須創建大量工作線程,它們消耗了大量內存,可能會導致系統的內存空間不足
線程池中預先創建了一些工作線程,它們不斷地從工作隊列中取出任務,然后執行該任務。當工作線程執行完一個任務,就會繼續執行工作隊列中的下一個任務
線程池具有以下優點:
減少了創建和銷毀線程的次數,每個工作線程都可以一直被重用,能執行多個任務
可以根據系統的承載能力,方便調整線程池中線程的數目,防止因為消耗過量系統資源而導致系統崩潰
public class ThreadPool extends ThreadGroup { // 線程池是否關閉 private boolean isClosed = false; // 表示工作隊列 private LinkedList<Runnable> workQueue; // 表示線程池ID private static int threadPoolID; // 表示工作線程ID // poolSize 指定線程池中的工作線程數目 public ThreadPool(int poolSize) { super("ThreadPool-"+ (threadPoolID++)); setDaemon(true); // 創建工作隊列 workQueue = new LinkedList<Runnable>(); for (int i = 0; i < poolSize; i++) { // 創建并啟動工作線程 new WorkThread().start(); } } /** * 向工作隊列中加入一個新任務,由工作線程去執行任務 */ public synchronized void execute(Runnable tank) { // 線程池被關則拋出IllegalStateException異常 if(isClosed) { throw new IllegalStateException(); } if(task != null) { workQueue.add(task); // 喚醒正在getTask()方法中等待任務的工作線限 notify(); } } /** * 從工作隊列中取出一個任務,工作線程會調用此方法 */ protected synchronized Runnable getTask() throws InterruptedException { while(workQueue,size() == 0) { if (isClosed) return null; wait(); // 如果工作隊列中沒有任務,就等待任務 } return workQueue.removeFirst(); } /** * 關閉線程池 */ public synchronized void close() { if(!isClosed) { isClosed = true; // 清空工作隊列 workQueue.clear(); // 中斷所有的工作線程,該方法繼承自ThreadGroup類 interrupt(); } } /** * 等待工作線程把所有任務執行完 */ public void join() { synchronized (this) { isClosed = true; // 喚醒還在getTask()方法中等待任務的工作線程 notifyAll(); } Thread[] threads = new Thread[activeCount()]; // enumerate()方法繼承自ThreadGroup類獲得線程組中當前所有活著的工作線程 int count = enumerate(threads); // 等待所有工作線程運行結束 for(int i = 0; i < count; i++) { try { // 等待工作線程運行結束 threads[i].join(); } catch((InterruptedException ex) {} } } /** * 內部類:工作線程 */ private class WorkThread extends Thread { public WorkThread() { // 加入當前 ThreadPool 線程組 super(ThreadPool.this, "WorkThread-" + (threadID++)); } public void run() { // isInterrupted()方法承自Thread類,判斷線程是否被中斷 while (!isInterrupted()) { Runnable task = null; try { // 取出任務 task = getTask(); } catch(InterruptedException ex) {} // 如果 getTask() 返回 nu11 或者線程執行 getTask() 時被中斷,則結束此線程 if(task != null) return; // 運行任務,異常在catch代碼塊中被捕獲 try { task.run(); } catch(Throwable t) { t.printStackTrace(); } } } } }
使用線程池實現的服務器如下:
publlc class EchoServer { private int port = 8000; private ServerSocket serverSocket; private ThreadPool threadPool; // 線程港 private final int POOL_SIZE = 4; // 單個CPU時線程池中工作線程的數目 public EchoServer() throws IOException { serverSocket = new ServerSocket(port); // 創建線程池 // Runtime 的 availableProcessors() 方法返回當前系統的CPU的數目 // 系統的CPU越多,線程池中工作線程的數目也越多 threadPool= new ThreadPool( Runtime.getRuntime().availableProcessors() * POOL_SIZE); System.out.println("服務器啟動"); } public void service() { while (true) { Socket socket = null; try { socket = serverSocket.accept(); // 把與客戶通信的任務交給線程池 threadPool.execute(new Handler(socket)); } catch(IOException e) { e.printStackTrace(); } } } public static void main(String args[])throws TOException { new EchoServer().service(); } // 負責與單個客戶的通信,與上例類似 class Handler implements Runnable {...} }
java.util.concurrent
包提供了現成的線程池的實現,更加健壯,功能也更強大,更多關于線程池的介紹可以這篇文章
public class Echoserver { private int port = 8000; private ServerSocket serverSocket; // 線程池 private ExecutorService executorService; // 單個CPU時線程池中工作線程的數目 private final int POOL_SIZE = 4; public EchoServer() throws IOException { serverSocket = new ServerSocket(port); // 創建線程池 // Runtime 的 availableProcessors() 方法返回當前系統的CPU的數目 // 系統的CPU越多,線程池中工作線程的數目也越多 executorService = Executors.newFixedThreadPool( Runtime.getRuntime().availableProcessors() * POOL_SIZE); System.out.println("服務器啟動"); } public void service() { while(true) { Socket socket = null; try { socket = serverSocket.accept(); executorService.execute(new Handler(socket)); } catch(IOException e) { e.printStackTrace(); } } } public static void main(String args[])throws TOException { new EchoServer().service(); } // 負責與單個客戶的通信,與上例類似 class Handler implements Runnable {...} }
雖然線程池能大大提高服務器的并發性能,但使用它也存在一定風險,容易引發下面的問題:
死鎖
任何多線程應用程序都有死鎖風險。造成死鎖的最簡單的情形是:線程 A 持有對象 X 的鎖,并且在等待對象 Y 的鎖,而線程 B 持有對象 Y 的鎖,并且在等待對象 X 的鎖,線程 A 與線程 B 都不釋放自己持有的鎖,并且等待對方的鎖,這就導致兩個線程永遠等待下去,死鎖就這樣產生了
任何多線程程序都有死鎖的風險,但線程池還會導致另外一種死鎖:假定線程池中的所有工作線程都在執行各自任務時被阻塞,它們都在等待某個任務 A 的執行結果。而任務 A 依然在工作隊列中,由于沒有空閑線程,使得任務 A 一直不能被執行。這使得線程池中的所有工作線程都永遠阻塞下去,死鎖就這樣產生了
系統資源不足
如果線程池中的線程數目非常多,這些線程就會消耗包括內存和其他系統資源在內的大量資源,從而嚴重影響系統性能
并發錯誤
線程池的工作隊列依靠 wait()
和 notify()
方法來使工作線程及時取得任務,但這兩個方法都難以使用。如果編碼不正確,就可能會丟失通知,導致工作線程一直保持空閑狀態,無視工作隊列中需要處理的任務
線程泄漏
對于工作線程數目固定的線程池,如果工作線程在執行任務時拋出 RuntimeException 或 Error,并且這些異常或錯誤沒有被捕獲,那么這個工作線程就會異常終止,使得線程池永久地失去了一個工作線程。如果所有的工作線程都異常終止,線程池變為空,沒有任何可用的工作線程來處理任務
導致線程泄漏的另一種情形是,工作線程在執行一個任務時被阻塞,比如等待用戶的輸入數據,但是由于用戶一直不輸入數據(可能是因為用戶走開了),導致這個工作線程一直被阻塞。這樣的工作線程名存實亡,它實際上不執行任何任務了。假如線程池中所有的工作線程都處于這樣的阻塞狀態,那么線程池就無法處理新加入的任務了
任務過載
當工作隊列中有大量排隊等候執行的任務,這些任務本身可能會消耗太多的系統資源而引起系統資源缺乏
綜上所述,線程池可能會帶來種種風險,為了盡可能避免它們,使用線程池時需要遵循以下原則:
如果任務 A 在執行過程中需要同步等待任務 B 的執行結果,那么任務 A 不適合加入線程池的工作隊列中。如集把像任務 A 一樣的需要等待其他任務執行結果的任務加入工作隊列中,就可能會導致線程池的死鎖
如果執行某個任務時可能會阻塞,并且是長時間的阻塞,則應該設定超時時間避免工作線程永久地阻塞下去而導致線程泄漏
了解任務的特點,分析任務是執行經常會阻塞的 IO 操作,還是執行一直不會阻塞的運算操作。前者時斷時續地占用 CPU,而后者對 CPU 具有更高的利用率。根據任務的特點,對任務進行分類,然后把不同類型的任務分別加入不同線程池的工作隊列中,這樣可以根據任務的特點分別調整每個線程池
調整線程池的大小,線程池的最佳大小主要取決于系統的可用 CPU 的數目以及工作隊列中任務的特點。假如在一個具有 N 個 CPU 的系統上只有一個工作隊列并且其中全部是運算性質的任務,那么當線程池具有 N 或 N+1 個工作線程時,一般會獲得最大的 CPU 利用率
如果工作隊列中包含會執行 IO 操作并經常阻塞的任務,則要讓線程池的大小超過可用 CPU 的數目,因為并不是所有工作線程都一直在工作。選擇一個典型的任務,然后估計在執行這個任務的過程中,等待時間(WT)與實際占用 CPU 進行運算的時間(ST)之間的比:WT/ST。對于一個具有 N 個 CPU 的系統,需要設置大約 N(1+WT/ST) 個線程來保證 CPU 得到充分利用
避免任務過載,服務器應根據系統的承受能力,限制客戶的并發連接的數目。當客戶的并發連接的數目超過了限制值,服務器可以拒絕連接請求,并給予客戶友好提示。
到此,相信大家對“Java怎么創建多線程服務器”有了更深的了解,不妨來實際操作一番吧!這里是億速云網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。