您好,登錄后才能下訂單哦!
這篇文章主要講解了“Spring Batch的概念和作用是什么”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“Spring Batch的概念和作用是什么”吧!
一、Spring Batch的概念知識
1.1、分層架構
1.2、關鍵概念
1.2.1、JobRepository
1.2.2、任務啟動器JobLauncher
1.2.3、任務Job
1.2.4、步驟Step
1.2.5、輸入——處理——輸出
二、代碼實例
2.1、基本框架
2.2、輸入——處理——輸出
2.2.1、讀取ItemReader
2.2.2、處理ItemProcessor
2.2.3、輸出ItremWriter
2.3、Step
2.4、Job
2.5、運行
三、監聽Listener
Spring Batch
的分層架構圖如下:
可以看到它分為三層,分別是:
Application
應用層:包含了所有任務batch jobs
和開發人員自定義的代碼,主要是根據項目需要開發的業務流程等。
Batch Core
核心層:包含啟動和管理任務的運行環境類,如JobLauncher
等。
Batch Infrastructure
基礎層:上面兩層是建立在基礎層之上的,包含基礎的讀入reader
和寫出writer
、重試框架等。
理解下圖所涉及的概念至關重要,不然很難進行后續開發和問題分析。
專門負責與數據庫打交道,對整個批處理的新增、更新、執行進行記錄。所以Spring Batch
是需要依賴數據庫來管理的。
負責啟動任務Job
。
Job
是封裝整個批處理過程的單位,跑一個批處理任務,就是跑一個Job
所定義的內容。
上圖介紹了Job
的一些相關概念:
Job
:封裝處理實體,定義過程邏輯。
JobInstance
:Job
的運行實例,不同的實例,參數不同,所以定義好一個Job
后可以通過不同參數運行多次。
JobParameters
:與JobInstance
相關聯的參數。
JobExecution
:代表Job
的一次實際執行,可能成功、可能失敗。
所以,開發人員要做的事情,就是定義Job
。
Step
是對Job
某個過程的封裝,一個Job
可以包含一個或多個Step
,一步步的Step
按特定邏輯執行,才代表Job
執行完成。
通過定義Step
來組裝Job
可以更靈活地實現復雜的業務邏輯。
所以,定義一個Job
關鍵是定義好一個或多個Step
,然后把它們組裝好即可。而定義Step
有多種方法,但有一種常用的模型就是輸入——處理——輸出
,即Item Reader
、Item Processor
和Item Writer
。比如通過Item Reader
從文件輸入數據,然后通過Item Processor
進行業務處理和數據轉換,最后通過Item Writer
寫到數據庫中去。
Spring Batch
為我們提供了許多開箱即用的Reader
和Writer
,非常方便。
理解了基本概念后,就直接通過代碼來感受一下吧。整個項目的功能是從多個csv
文件中讀數據,處理后輸出到一個csv
文件。
添加依賴:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-batch</artifactId> </dependency> <dependency> <groupId>com.h3database</groupId> <artifactId>h3</artifactId> <scope>runtime</scope> </dependency>
需要添加Spring Batch
的依賴,同時使用H2
作為內存數據庫比較方便,實際生產肯定是要使用外部的數據庫,如Oracle
、PostgreSQL
。
入口主類:
@SpringBootApplication @EnableBatchProcessing public class PkslowBatchJobMain { public static void main(String[] args) { SpringApplication.run(PkslowBatchJobMain.class, args); } }
也很簡單,只是在Springboot
的基礎上添加注解@EnableBatchProcessing
。
領域實體類Employee
:
package com.pkslow.batch.entity; public class Employee { String id; String firstName; String lastName; }
對應的csv
文件內容如下:
id,firstName,lastName
1,Lokesh,Gupta
2,Amit,Mishra
3,Pankaj,Kumar
4,David,Miller
因為有多個輸入文件,所以定義如下:
@Value("input/inputData*.csv") private Resource[] inputResources; @Bean public MultiResourceItemReader<Employee> multiResourceItemReader() { MultiResourceItemReader<Employee> resourceItemReader = new MultiResourceItemReader<Employee>(); resourceItemReader.setResources(inputResources); resourceItemReader.setDelegate(reader()); return resourceItemReader; } @Bean public FlatFileItemReader<Employee> reader() { FlatFileItemReader<Employee> reader = new FlatFileItemReader<Employee>(); //跳過csv文件第一行,為表頭 reader.setLinesToSkip(1); reader.setLineMapper(new DefaultLineMapper() { { setLineTokenizer(new DelimitedLineTokenizer() { { //字段名 setNames(new String[] { "id", "firstName", "lastName" }); } }); setFieldSetMapper(new BeanWrapperFieldSetMapper<Employee>() { { //轉換化后的目標類 setTargetType(Employee.class); } }); } }); return reader; }
這里使用了FlatFileItemReader
,方便我們從文件讀取數據。
為了簡單演示,處理很簡單,就是把最后一列轉為大寫:
public ItemProcessor<Employee, Employee> itemProcessor() { return employee -> { employee.setLastName(employee.getLastName().toUpperCase()); return employee; }; }
比較簡單,代碼及注釋如下:
private Resource outputResource = new FileSystemResource("output/outputData.csv"); @Bean public FlatFileItemWriter<Employee> writer() { FlatFileItemWriter<Employee> writer = new FlatFileItemWriter<>(); writer.setResource(outputResource); //是否為追加模式 writer.setAppendAllowed(true); writer.setLineAggregator(new DelimitedLineAggregator<Employee>() { { //設置分割符 setDelimiter(","); setFieldExtractor(new BeanWrapperFieldExtractor<Employee>() { { //設置字段 setNames(new String[] { "id", "firstName", "lastName" }); } }); } }); return writer; }
有了Reader-Processor-Writer
后,就可以定義Step
了:
@Bean public Step csvStep() { return stepBuilderFactory.get("csvStep").<Employee, Employee>chunk(5) .reader(multiResourceItemReader()) .processor(itemProcessor()) .writer(writer()) .build(); }
這里有一個chunk
的設置,值為5
,意思是5條記錄后再提交輸出,可以根據自己需求定義。
完成了Step
的編碼,定義Job
就容易了:
@Bean public Job pkslowCsvJob() { return jobBuilderFactory .get("pkslowCsvJob") .incrementer(new RunIdIncrementer()) .start(csvStep()) .build(); }
完成以上編碼后,執行程序,結果如下:
成功讀取數據,并將最后字段轉為大寫,并輸出到outputData.csv
文件。
可以通過Listener
接口對特定事件進行監聽,以實現更多業務功能。比如如果處理失敗,就記錄一條失敗日志;處理完成,就通知下游拿數據等。
我們分別對Read
、Process
和Write
事件進行監聽,對應分別要實現ItemReadListener
接口、ItemProcessListener
接口和ItemWriteListener
接口。因為代碼比較簡單,就是打印一下日志,這里只貼出ItemWriteListener
的實現代碼:
public class PkslowWriteListener implements ItemWriteListener<Employee> { private static final Log logger = LogFactory.getLog(PkslowWriteListener.class); @Override public void beforeWrite(List<? extends Employee> list) { logger.info("beforeWrite: " + list); } @Override public void afterWrite(List<? extends Employee> list) { logger.info("afterWrite: " + list); } @Override public void onWriteError(Exception e, List<? extends Employee> list) { logger.info("onWriteError: " + list); } }
把實現的監聽器listener
整合到Step
中去:
@Bean public Step csvStep() { return stepBuilderFactory.get("csvStep").<Employee, Employee>chunk(5) .reader(multiResourceItemReader()) .listener(new PkslowReadListener()) .processor(itemProcessor()) .listener(new PkslowProcessListener()) .writer(writer()) .listener(new PkslowWriteListener()) .build(); }
執行后看一下日志:
這里就能明顯看到之前設置的chunk
的作用了。Writer
每次是處理5條記錄,如果一條輸出一次,會對IO
造成壓力。
感謝各位的閱讀,以上就是“Spring Batch的概念和作用是什么”的內容了,經過本文的學習后,相信大家對Spring Batch的概念和作用是什么這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。