您好,登錄后才能下訂單哦!
這篇文章主要介紹“JAVA多線程怎么實現用戶任務排隊并預估排隊時長”,在日常操作中,相信很多人在JAVA多線程怎么實現用戶任務排隊并預估排隊時長問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”JAVA多線程怎么實現用戶任務排隊并預估排隊時長”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!
初始化一定數量的任務處理線程和緩存線程池,用戶每次調用接口,開啟一個線程處理。
假設初始化5個處理器,代碼執行 BlockingQueue.take 時候,每次take都會處理器隊列就會減少一個,當處理器隊列為空時,take就是阻塞線程,當用戶處理某某任務完成時候,調用資源釋放接口,在處理器隊列put 一個處理器對象,原來阻塞的take ,就繼續執行。
排隊論是研究系統隨機聚散現象和隨機系統工作工程的數學理論和方法,又稱隨機服務系統理論,為運籌學的一個分支。我們下面對排隊論做下簡化處理,先看下圖:
任務隊列初始化 TaskQueue
import com.baomidou.mybatisplus.core.toolkit.CollectionUtils; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import java.util.Optional; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; /** * 初始化隊列及線程池 * @author tarzan * */ @Component public class TaskQueue { //處理器隊列 public static BlockingQueue<TaskProcessor> taskProcessors; //等待任務隊列 public static BlockingQueue<CompileTask> waitTasks; //處理任務隊列 public static BlockingQueue<CompileTask> executeTasks; //線程池 public static ExecutorService exec; //初始處理器數(計算機cpu可用線程數) public static Integer processorNum=Runtime.getRuntime().availableProcessors(); /** * 初始化處理器、等待任務、處理任務隊列及線程池 */ @PostConstruct public static void initEquipmentAndUsersQueue(){ exec = Executors.newCachedThreadPool(); taskProcessors =new LinkedBlockingQueue<TaskProcessor>(processorNum); //將空閑的設備放入設備隊列中 setFreeDevices(processorNum); waitTasks =new LinkedBlockingQueue<CompileTask>(); executeTasks=new LinkedBlockingQueue<CompileTask>(processorNum); } /** * 將空閑的處理器放入處理器隊列中 */ private static void setFreeDevices(int num) { //獲取可用的設備 for (int i = 0; i < num; i++) { TaskProcessor dc=new TaskProcessor(); try { taskProcessors.put(dc); } catch (InterruptedException e) { e.printStackTrace(); } } } public static CompileTask getWaitTask(Long clazzId) { return get(TaskQueue.waitTasks,clazzId); } public static CompileTask getExecuteTask(Long clazzId) { return get(TaskQueue.executeTasks,clazzId); } private static CompileTask get(BlockingQueue<CompileTask> users, Long clazzId) { CompileTask compileTask =null; if (CollectionUtils.isNotEmpty(users)){ Optional<CompileTask> optional=users.stream().filter(e->e.getClazzId().longValue()==clazzId.longValue()).findFirst(); if(optional.isPresent()){ compileTask = optional.get(); } } return compileTask; } public static Integer getSort(Long clazzId) { AtomicInteger index = new AtomicInteger(-1); BlockingQueue<CompileTask> compileTasks = TaskQueue.waitTasks; if (CollectionUtils.isNotEmpty(compileTasks)){ compileTasks.stream() .filter(e -> { index.getAndIncrement(); return e.getClazzId().longValue() == clazzId.longValue(); }) .findFirst(); } return index.get(); } //單位秒 public static int estimatedTime(Long clazzId){ return estimatedTime(60,getSort(clazzId)+1); } //單位秒 public static int estimatedTime(int cellMs,int num){ int a= (num-1)/processorNum; int b= cellMs*(a+1); return b; }
編譯任務類 CompileTask
import lombok.Data; import org.springblade.core.tool.utils.SpringUtil; import org.springblade.gis.common.enums.DataScheduleEnum; import org.springblade.gis.dynamicds.service.DynamicDataSourceService; import org.springblade.gis.modules.feature.schedule.service.DataScheduleService; import java.util.Date; @Data public class CompileTask implements Runnable { //當前請求的線程對象 private Long clazzId; //用戶id private Long userId; //當前請求的線程對象 private Thread thread; //綁定處理器 private TaskProcessor taskProcessor; //任務狀態 private Integer status; //開始時間 private Date startTime; //結束時間 private Date endTime; private DataScheduleService dataScheduleService= SpringUtil.getBean(DataScheduleService.class); private DynamicDataSourceService dataSourceService= SpringUtil.getBean(DynamicDataSourceService.class); @Override public void run() { compile(); } /** * 編譯 */ public void compile() { try { //取出一個設備 TaskProcessor taskProcessor = TaskQueue.taskProcessors.take(); //取出一個任務 CompileTask compileTask = TaskQueue.waitTasks.take(); //任務和設備綁定 compileTask.setTaskProcessor(taskProcessor); //放入 TaskQueue.executeTasks.put(compileTask); System.out.println(DataScheduleEnum.DEAL_WITH.getName()+" "+userId); //切換用戶數據源 dataSourceService.switchDataSource(userId); //添加進度 dataScheduleService.addSchedule(clazzId, DataScheduleEnum.DEAL_WITH.getState()); } catch (InterruptedException e) { System.err.println( e.getMessage()); } } }
任務處理器 TaskProcessor
import lombok.Data; import java.util.Date; @Data public class TaskProcessor { /** * 釋放 */ public static Boolean release(CompileTask task) { Boolean flag=false; Thread thread=task.getThread(); synchronized (thread) { try { if(null!=task.getTaskProcessor()){ TaskQueue.taskProcessors.put(task.getTaskProcessor()); TaskQueue.executeTasks.remove(task); task.setEndTime(new Date()); long intervalMilli = task.getEndTime().getTime() - task.getStartTime().getTime(); flag=true; System.out.println("用戶"+task.getClazzId()+"耗時"+intervalMilli+"ms"); } } catch (InterruptedException e) { e.printStackTrace(); } return flag; } } }
Controller控制器接口實現
import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import org.springblade.core.tool.api.R; import org.springblade.gis.multithread.TaskProcessor; import org.springblade.gis.multithread.TaskQueue; import org.springblade.gis.multithread.CompileTask; import org.springframework.web.bind.annotation.*; import java.util.Date; @RestController @RequestMapping("task") @Api(value = "數據編譯任務", tags = "數據編譯任務") public class CompileTaskController { @ApiOperation(value = "添加等待請求 @author Tarzan Liu") @PostMapping("compile/{clazzId}") public R<Integer> compile(@PathVariable("clazzId") Long clazzId) { CompileTask checkUser=TaskQueue.getWaitTask(clazzId); if(checkUser!=null){ return R.fail("已經正在排隊!"); } checkUser=TaskQueue.getExecuteTask(clazzId); if(checkUser!=null){ return R.fail("正在執行編譯!"); } //獲取當前的線程 Thread thread=Thread.currentThread(); //創建當前的用戶請求對象 CompileTask compileTask =new CompileTask(); compileTask.setThread(thread); compileTask.setClazzId(clazzId); compileTask.setStartTime(new Date()); //將當前用戶請求對象放入隊列中 try { TaskQueue.waitTasks.put(compileTask); } catch (InterruptedException e) { e.printStackTrace(); } TaskQueue.exec.execute(compileTask); return R.data(TaskQueue.waitTasks.size()-1); } @ApiOperation(value = "查詢當前任務前還有多少任務等待 @author Tarzan Liu") @PostMapping("sort/{clazzId}") public R<Integer> sort(@PathVariable("clazzId") Long clazzId) { return R.data(TaskQueue.getSort(clazzId)); } @ApiOperation(value = "查詢當前任務預估時長 @author Tarzan Liu") @PostMapping("estimate/time/{clazzId}") public R<Integer> estimatedTime(@PathVariable("clazzId") Long clazzId) { return R.data(TaskQueue.estimatedTime(clazzId)); } @ApiOperation(value = "任務釋放 @author Tarzan Liu") @PostMapping("release/{clazzId}") public R<Boolean> release(@PathVariable("clazzId") Long clazzId) { CompileTask task=TaskQueue.getExecuteTask(clazzId); if(task==null){ return R.fail("資源釋放異常"); } return R.status(TaskProcessor.release(task)); } @ApiOperation(value = "執行 @author Tarzan Liu") @PostMapping("exec") public R exec() { Long start=System.currentTimeMillis(); for (Long i = 1L; i < 100; i++) { compile(i); } System.out.println("消耗時間:"+(System.currentTimeMillis()-start)+"ms"); return R.status(true); } }
根據任務id查詢該任務前還有多少個任務待執行
根據任務id查詢該任務預估執行完成的剩余時間,單位秒
BlockingQueue即阻塞隊列,它是基于ReentrantLock,依據它的基本原理,我們可以實現Web中的長連接聊天功能,當然其最常用的還是用于實現生產者與消費者模式,大致如下圖所示:
在Java中,BlockingQueue是一個接口,它的實現類有ArrayBlockingQueue、DelayQueue、 LinkedBlockingDeque、LinkedBlockingQueue、PriorityBlockingQueue、SynchronousQueue等,它們的區別主要體現在存儲結構上或對元素操作上的不同,但是對于take與put操作的原理,卻是類似的。
入隊
offer(E e):如果隊列沒滿,立即返回true; 如果隊列滿了,立即返回false-->不阻塞
put(E e):如果隊列滿了,一直阻塞,直到隊列不滿了或者線程被中斷-->阻塞
offer(E e, long timeout, TimeUnit unit):在隊尾插入一個元素,,如果隊列已滿,則進入等待,直到出現以下三種情況:-->阻塞
被喚醒
等待時間超時
當前線程被中斷
出隊
poll():如果沒有元素,直接返回null;如果有元素,出隊
take():如果隊列空了,一直阻塞,直到隊列不為空或者線程被中斷-->阻塞
poll(long timeout, TimeUnit unit):如果隊列不空,出隊;如果隊列已空且已經超時,返回null;如果隊列已空且時間未超時,則進入等待,直到出現以下三種情況:
被喚醒
等待時間超時
當前線程被中斷
到此,關于“JAVA多線程怎么實現用戶任務排隊并預估排隊時長”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注億速云網站,小編會繼續努力為大家帶來更多實用的文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。