您好,登錄后才能下訂單哦!
一、多線程介紹
在編程中,我們不可逃避的會遇到多線程的編程問題,因為在大多數的業務系統中需要并發處理,如果是在并發的場景中,多線程就非常重要了。另外,我們在面試的時候,面試官通常也會問到我們關于多線程的問題,如:如何創建一個線程?我們通常會這么回答,主要有兩種方法,第一種:繼承Thread類,重寫run方法;第二種:實現Runnable接口,重寫run方法。那么面試官一定會問這兩種方法各自的優缺點在哪,不管怎么樣,我們會得出一個結論,那就是使用方式二,因為面向對象提倡少繼承,盡量多用組合。
這個時候,我們還可能想到,如果想得到多線程的返回值怎么辦呢?根據我們多學到的知識,我們會想到實現Callable接口,重寫call方法。那么多線程到底在實際項目中怎么使用呢,他有多少種方式呢?
首先,我們來看一個例子:
這是一種創建多線程的簡單方法,很容易理解,在例子中,根據不同的業務場景,我們可以在Thread()里邊傳入不同的參數實現不同的業務邏輯,但是,這個方法創建多線程暴漏出來的問題就是反復創建線程,而且創建線程后還得銷毀,如果對并發場景要求低的情況下,這種方式貌似也可以,但是高并發的場景中,這種方式就不行了,因為創建線程銷毀線程是非常耗資源的。所以根據經驗,正確的做法是我們使用線程池技術,JDK提供了多種線程池類型供我們選擇,具體方式可以查閱jdk的文檔。
這里代碼我們需要注意的是,傳入的參數代表我們配置的線程數,是不是越多越好呢?肯定不是。因為我們在配置線程數的時候要充分考慮服務器的性能,線程配置的多,服務器的性能未必就優。通常,機器完成的計算是由線程數決定的,當線程數到達峰值,就無法在進行計算了。如果是耗CPU的業務邏輯(計算較多),線程數和核數一樣就到達峰值了,如果是耗I/O的業務邏輯(操作數據庫,文件上傳、下載等),線程數越多一定意義上有助于提升性能。
線程數大小的設定又一個公式決定:
Y=N*((a+b)/a),其中,N:CPU核數,a:線程執行時程序的計算時間,b:線程執行時,程序的阻塞時間。有了這個公式后,線程池的線程數配置就會有約束了,我們可以根據機器的實際情況靈活配置。
二、多線程優化及性能比較
最近的項目中用到了所線程技術,在使用過程中遇到了很多的麻煩,趁著熱度,整理一下幾種多線程框架的性能比較。目前所掌握的大致分三種,第一種:ThreadPool(線程池)+CountDownLatch(程序計數器),第二種:Fork/Join框架,第三種JDK8并行流,下面對這幾種方式的多線程處理性能做一下比較總結。
首先,假設一種業務場景,在內存中生成多個文件對象,這里暫定30000,(Thread.sleep(時間))線程睡眠模擬業務處理業務邏輯,來比較這幾種方式的多線程處理性能。
1) 單線程
這種方式非常簡單,但是程序在處理的過程中非常的耗時,使用的時間會很長,因為每個線程都在等待當前線程執行完才會執行,和多線程沒有多少關系,所以效率非常低。
首先創建文件對象,代碼如下:
public class FileInfo { private String fileName;//文件名 private String fileType;//文件類型 private String fileSize;//文件大小 private String fileMD5;//MD5碼 private String fileVersionNO;//文件版本號 public FileInfo() { super(); } public FileInfo(String fileName, String fileType, String fileSize, String fileMD5, String fileVersionNO) { super(); this.fileName = fileName; this.fileType = fileType; this.fileSize = fileSize; this.fileMD5 = fileMD5; this.fileVersionNO = fileVersionNO; } public String getFileName() { return fileName; } public void setFileName(String fileName) { this.fileName = fileName; } public String getFileType() { return fileType; } public void setFileType(String fileType) { this.fileType = fileType; } public String getFileSize() { return fileSize; } public void setFileSize(String fileSize) { this.fileSize = fileSize; } public String getFileMD5() { return fileMD5; } public void setFileMD5(String fileMD5) { this.fileMD5 = fileMD5; } public String getFileVersionNO() { return fileVersionNO; } public void setFileVersionNO(String fileVersionNO) { this.fileVersionNO = fileVersionNO; }
接著,模擬業務處理,創建30000個文件對象,線程睡眠1ms,之前設置的1000ms,發現時間很長,整個Eclipse卡掉了,所以將時間改為了1ms。
public class Test { private static List<FileInfo> fileList= new ArrayList<FileInfo>(); public static void main(String[] args) throws InterruptedException { createFileInfo(); long startTime=System.currentTimeMillis(); for(FileInfo fi:fileList){ Thread.sleep(1); } long endTime=System.currentTimeMillis(); System.out.println("單線程耗時:"+(endTime-startTime)+"ms"); } private static void createFileInfo(){ for(int i=0;i<30000;i++){ fileList.add(new FileInfo("身份證正面照","jpg","101522","md5"+i,"1")); } } }
測試結果如下:
可以看到,生成30000個文件對象消耗的時間比較長,接近1分鐘,效率比較低。
2) ThreadPool (線程池) +CountDownLatch (程序計數器)
顧名思義,CountDownLatch為線程計數器,他的執行過程如下:首先,在主線程中調用await()方法,主線程阻塞,然后,將程序計數器作為參數傳遞給線程對象,最后,每個線程執行完任務后,調用countDown()方法表示完成任務。countDown()被執行多次后,主線程的await()會失效。實現過程如下:
public class Test2 { private static ExecutorService executor=Executors.newFixedThreadPool(100); private static CountDownLatch countDownLatch=new CountDownLatch(100); private static List<FileInfo> fileList= new ArrayList<FileInfo>(); private static List<List<FileInfo>> list=new ArrayList<>(); public static void main(String[] args) throws InterruptedException { createFileInfo(); addList(); long startTime=System.currentTimeMillis(); int i=0; for(List<FileInfo> fi:list){ executor.submit(new FileRunnable(countDownLatch,fi,i)); i++; } countDownLatch.await(); long endTime=System.currentTimeMillis(); executor.shutdown(); System.out.println(i+"個線程耗時:"+(endTime-startTime)+"ms"); } private static void createFileInfo(){ for(int i=0;i<30000;i++){ fileList.add(new FileInfo("身份證正面照","jpg","101522","md5"+i,"1")); } } private static void addList(){ for(int i=0;i<100;i++){ list.add(fileList); } } }
FileRunnable類:
/** * 多線程處理 * @author wangsj * * @param <T> */ public class FileRunnable<T> implements Runnable { private CountDownLatch countDownLatch; private List<T> list; private int i; public FileRunnable(CountDownLatch countDownLatch, List<T> list, int i) { super(); this.countDownLatch = countDownLatch; this.list = list; this.i = i; } @Override public void run() { for(T t:list){ try { Thread.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } countDownLatch.countDown(); } } }
測試結果如下:
3) Fork/Join 框架
Jdk從版本7開始,出現了Fork/join框架,從字面來理解,fork就是拆分,join就是合并,所以,該框架的思想就是。通過fork拆分任務,然后join來合并拆分后各個人物執行完畢后的結果并匯總。比如,我們要計算連續相加的幾個數,2+4+5+7=?,我們利用Fork/join框架來怎么完成呢,思想就是拆分子任務,我們可以把這個運算拆分為兩個子任務,一個計算2+4,另一個計算5+7,這是Fork的過程,計算完成后,把這兩個子任務計算的結果匯總,得到總和,這是join的過程。
Fork/Join框架執行思想:首先,分割任務,使用fork類將大任務分割為若干子任務,這個分割過程需要按照實際情況來定,直到分割出的任務足夠小。然后,join類執行任務,分割的子任務在不同的隊列里,幾個線程分別從隊列里獲取任務并執行,執行完的結果放到一個單獨的隊列里,最后,啟動線程,隊列里拿取結果并合并結果。
使用Fork/Join框架要用到幾個類,關于類的使用方式可以參考JDK的API,使用該框架,首先需要繼承ForkJoinTask類,通常,只需要繼承他的子類RecursiveTask或RecursiveAction即可,RecursiveTask,用于有返回結果的場景,RecursiveAction用于沒有返回結果的場景。ForkJoinTask的執行需要用到ForkJoinPool來執行,該類用于維護分割出的子任務添加到不同的任務隊列。
下面是實現代碼:
public class Test3 { private static List<FileInfo> fileList= new ArrayList<FileInfo>(); // private static ForkJoinPool forkJoinPool=new ForkJoinPool(100); // private static Job<FileInfo> job=new Job<>(fileList.size()/100, fileList); public static void main(String[] args) { createFileInfo(); long startTime=System.currentTimeMillis(); ForkJoinPool forkJoinPool=new ForkJoinPool(100); //分割任務 Job<FileInfo> job=new Job<>(fileList.size()/100, fileList); //提交任務返回結果 ForkJoinTask<Integer> fjtResult=forkJoinPool.submit(job); //阻塞 while(!job.isDone()){ System.out.println("任務完成!"); } long endTime=System.currentTimeMillis(); System.out.println("fork/join框架耗時:"+(endTime-startTime)+"ms"); } private static void createFileInfo(){ for(int i=0;i<30000;i++){ fileList.add(new FileInfo("身份證正面照","jpg","101522","md5"+i,"1")); } } } /** * 執行任務類 * @author wangsj * */ public class Job<T> extends RecursiveTask<Integer> { private static final long serialVersionUID = 1L; private int count; private List<T> jobList; public Job(int count, List<T> jobList) { super(); this.count = count; this.jobList = jobList; } /** * 執行任務,類似于實現Runnable接口的run方法 */ @Override protected Integer compute() { //拆分任務 if(jobList.size()<=count){ executeJob(); return jobList.size(); }else{ //繼續創建任務,直到能夠分解執行 List<RecursiveTask<Long>> fork = new LinkedList<RecursiveTask<Long>>(); //拆分子任務,這里采用二分法 int countJob=jobList.size()/2; List<T> leftList=jobList.subList(0, countJob); List<T> rightList=jobList.subList(countJob, jobList.size()); //分配任務 Job leftJob=new Job<>(count,leftList); Job rightJob=new Job<>(count,rightList); //執行任務 leftJob.fork(); rightJob.fork(); return Integer.parseInt(leftJob.join().toString()) +Integer.parseInt(rightJob.join().toString()); } } /** * 執行任務方法 */ private void executeJob() { for(T job:jobList){ try { Thread.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } } }
測試結果如下:
4) JDK8 并行流
并行流是jdk8的新特性之一,思想就是將一個順序執行的流變為一個并發的流,通過調用parallel()方法來實現。并行流將一個流分成多個數據塊,用不同的線程來處理不同的數據塊的流,最后合并每個塊數據流的處理結果,類似于Fork/Join框架。
并行流默認使用的是公共線程池ForkJoinPool,他的線程數是使用的默認值,根據機器的核數,我們可以適當調整線程數的大小。線程數的調整通過以下方式來實現。
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "100");
以下是代碼的實現過程,非常簡單:
public class Test4 { private static List<FileInfo> fileList= new ArrayList<FileInfo>(); public static void main(String[] args) { // System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "100"); createFileInfo(); long startTime=System.currentTimeMillis(); fileList.parallelStream().forEach(e ->{ try { Thread.sleep(1); } catch (InterruptedException f) { f.printStackTrace(); } }); long endTime=System.currentTimeMillis(); System.out.println("jdk8并行流耗時:"+(endTime-startTime)+"ms"); } private static void createFileInfo(){ for(int i=0;i<30000;i++){ fileList.add(new FileInfo("身份證正面照","jpg","101522","md5"+i,"1")); } } }
下面是測試,第一次沒有設置線程池的數量,采用默認,測試結果如下:
我們看到,結果并不是很理想,耗時較長,接下來設置線程池的數量大小,即添加如下代碼:
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "100");
接著進行測試,結果如下:
這次耗時較小,比較理想。
三、總結
綜上幾種情況來看,以單線程作為參考,耗時最長的還是原生的Fork/Join框架,這里邊盡管配置了線程池的數量,但效果較精確配置了線程池數量的JDK8并行流較差。并行流實現代碼簡單易懂,不需要我們寫多余的for循環,一個parallelStream方法全部搞定,代碼量大大的減少了,其實,并行流的底層還是使用的Fork/Join框架,這就要求我們在開發的過程中靈活使用各種技術,分清各種技術的優缺點,從而能夠更好的為我們服務。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。