您好,登錄后才能下訂單哦!
今天小編給大家分享一下怎么使用Java多線程實現第三方數據同步的相關知識點,內容詳細,邏輯清晰,相信大部分人都還太了解這方面的知識,所以分享這篇文章給大家參考一下,希望大家閱讀完這篇文章后有所收獲,下面我們一起來了解一下吧。
最近的一項開發任務是同步第三方數據,而第三方數據一般有存量數據和增量數據,存量數據有100w+。在得知此需求時,進行了一定的信息檢索和工具學習,提前獲取存量數據到目標庫,再使用kettle進行存量數據轉換;增量數據則根據業務方規定的請求時間,通過定時任務去獲取增量數據并進行數據轉換。在數據獲取和轉換時,我們應該要記錄每一次的請求信息,便于溯源和數據對賬!!!
2.1 遞歸方式
使用遞歸方式時,要求數據量少,否則會出現棧溢出或堆溢出!!!并且遞歸方式是單線程,所以會導致同步速度很慢!!!
/** * 數據同步 - 遞歸方式 * 此處存量數據只需要請求到數據并保存數據庫即可,后期通過kettle進行轉換。 * Data為自定義實體類,這里僅做示例!!! */ private void fetchAndSaveDB(int pageIndex, int pageSize) throws Exception { log.info("【數據同步 - 存量】,第{}次同步,", pageIndex); List<Data> datas= getDataByPage(pageIndex,pageSize); if (CollectionUtils.isNotEmpty(datas)) { dataService.saveOrUpdateBatch(datas); log.info("【數據同步 - 存量】,第{}次同步,同步成功", pageIndex); if (datas.size() < pageSize) { log.info("【數據同步 - 存量】,第{}次同步,獲取數據小于每頁獲取條數,證明已全部同步完畢!!!", pageIndex); return; } // 遞歸操作-直到數據同步完畢 fetchAndSaveDB(pageIndex + 1, pageSize); } else { log.info("【數據同步 - 存量】,第{}次同步,獲取數據為空,證明已全部同步完畢!!!", pageIndex); return; } } /** * 獲取分頁數據,Data為自定義實體類,這里僅做示例!!! */ private List<Data> getDataByPage(int pageIndex, int pageSize) throws Exception { //通過feign調用第三方接口獲取數據 String data = dataFeignService.fetchAllData(pageSize, pageIndex); JSONObject jsonObject = JSONObject.parseObject(data); JSONArray datalist = jsonObject.getJSONArray("datalist"); List<Data> datas = datalist.toJavaList(Data.class); return datas; }
2.2 多線程方式
由于遞歸方式是單線程,考慮到數據的龐大,且易造成內存溢出,因此將遞歸更換成多線程方式,不僅避免了內存溢出的情況,且速度大大的提升!!!
public void synAllData() { // 定義原子變量 - 頁數 AtomicInteger pageIndex = new AtomicInteger(0); // 創建線程池 ExecutorService fixedThreadPool = Executors.newFixedThreadPool(10); // 100萬數據 int total = 1000000;//數據總量 int times = total / 1000; if (total % 1000!= 0) { times = times + 1; } LocalDateTime beginLocalDateTime = LocalDateTime.now(); log.info("【數據同步 - 存量】開始同步時間:{}", beginLocalDateTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))); for (int index = 1; index <= times; index++) { fixedThreadPool.submit(new Runnable() { @Override public void run() { try { multiFetchAndSaveDB(pageIndex.incrementAndGet(), 1000); } catch (Exception e) { log.error("并發獲取并保存數據異常:{}", e); } } }); } LocalDateTime endLocalDateTime = LocalDateTime.now(); log.info("【數據同步 - 存量】同步結束時間:{},總共耗時:{}分鐘", endLocalDateTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")), Duration.between(beginLocalDateTime, endLocalDateTime).toMinutes()); } /** * 數據同步 - 【多線程方式】 * * @throws Exception */ private void multiFetchAndSaveDB(int pageIndex, int pageSize) throws Exception { log.info("【數據同步 - 存量】,第{}次同步,", pageIndex); List<Data> datas= getDataByPage(pageIndex, pageSize);//getDataByPage()同上2.1 if (CollectionUtils.isNotEmpty(datas)) { log.info("【數據同步 - 存量】,第{}次同步,同步成功", pageIndex); if (datas.size() < pageSize) { log.info("【數據同步 - 存量】,第{}次同步,獲取數據小于每頁獲取條數,證明已全部同步完畢!!!", pageIndex); return; } } else { log.info("【數據同步 - 存量】,第{}次同步,獲取數據為空,證明已全部同步完畢!!!", pageIndex); return; } }
增量數據需要寫定時任務,可使用Scheduled注解,并需要將增量數據存放到目標庫中且進行數據轉換!
以上就是“怎么使用Java多線程實現第三方數據同步”這篇文章的所有內容,感謝各位的閱讀!相信大家閱讀完這篇文章都有很大的收獲,小編每天都會為大家更新不同的知識,如果還想學習更多的知識,請關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。