您好,登錄后才能下訂單哦!
這篇文章主要介紹Spring Batch輕量級批處理框架的示例分析,文中介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們一定要看完!
Spring Batch 是一個輕量級、全面的批處理框架,旨在支持開發對企業系統日常運營至關重要的強大的批處理應用程序。同時使開發人員在必要時可以輕松訪問和利用更先進的企業服務。Spring Batch 不是調度框架,它旨在與調度程序一起工作,而不是取代調度程序。
自動化、復雜的大量信息處理,無需用戶交互即可最有效地處理。這些操作通常包括基于時間的事件(例如月末計算、通知或通信)。
定期應用在非常大的數據集上重復處理的復雜業務規則(例如,保險福利確定或費率調整)。
將從內部和外部系統接收的信息集成到記錄系統中,這些信息通常需要以事務方式進行格式化、驗證和處理。批處理用于每天為企業處理數十億筆交易。
業務場景:
定期提交批處理
并發批處理:作業的并行處理
分階段的、企業消息驅動的處理
大規模并行批處理
失敗后手動或計劃重啟
依賴步驟的順序處理(擴展到工作流驅動的批處理)
部分處理:跳過記錄(例如,在回滾時)
整批事務,適用于小批量或現有存儲過程/腳本的情況
總之Spring batch可以做的:
從數據庫、文件或隊列中讀取大量記錄。
以某種方式處理數據。
以修改后的形式寫回數據。
核心概念:一個 Job 有一對多的Step,每個步驟都正好有一個 ItemReader
、一個ItemProcessor
和 一個ItemWriter
。需要啟動作業(使用 JobLauncher
),并且需要存儲有關當前運行進程的元數據(在 中 JobRepository
)。
Job
是封裝了整個批處理過程的實體。與其他 Spring 項目一樣,一個Job
與 XML 配置文件或基于 Java 的配置連接在一起。這種配置可以被稱為“作業配置”。
可配置項:
作業的簡單名稱。
Step
實例的定義和排序。
作業是否可重新啟動。
一個Step
是一個域對象,它封裝了批處理作業的一個獨立的、連續的階段。因此,每個 Job 完全由一個或多個步驟組成。一個Step
包含定義和控制實際批處理所需的所有信息。
一個StepExecution
代表一次嘗試執行一個Step
。StepExecution
每次Step
運行時都會創建一個新的,類似于JobExecution
。
一個ExecutionContext
表示由框架持久化和控制的鍵/值對的集合,以允許開發人員有一個地方來存儲范圍為StepExecution
對象或JobExecution
對象的持久狀態。
JobRepository
是上述所有 Stereotypes 的持久性機制。它提供了CRUD操作JobLauncher
,Job
以及Step
實現。當 Job
第一次啟動,一個JobExecution
被從庫中獲得,并且,執行的過程中,StepExecution
和JobExecution
實施方式是通過將它們傳遞到存儲庫持續。
使用 Java 配置時,@EnableBatchProcessing
注解提供了一個 JobRepository
作為開箱即用自動配置的組件之一。
JobLauncher
表示一個簡單的接口,用于Job
使用給定的 集合 啟動JobParameters
,如以下示例所示:
public interface JobLauncher { public JobExecution run(Job job, JobParameters jobParameters) throws JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException, JobParametersInvalidException; }
期望實現JobExecution
從 中 獲得有效JobRepository
并執行Job
。
ItemReader
是一種抽象,表示一次檢索Step
一個項目的輸入。當ItemReader
用完它可以提供的項目時,它通過返回來表明這一點null
。
ItemWriter
是一種抽象,表示一次一個Step
、一批或一大塊項目的輸出。通常, anItemWriter
不知道它接下來應該接收的輸入,并且只知道在其當前調用中傳遞的項目。
ItemProcessor
是表示項目的業務處理的抽象。當ItemReader
讀取一個項目并ItemWriter
寫入它們時,它 ItemProcessor
提供了一個訪問點來轉換或應用其他業務處理。如果在處理該項目時確定該項目無效,則返回 null
表示不應寫出該項目。
下面就利用我們所學的理論實現一個最簡單的Spring Batch批處理項目
依賴
<!--Spring batch--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-batch</artifactId> </dependency> <!-- web依賴--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- lombok--> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.20</version> </dependency> <!-- mysql--> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.47</version> </dependency> <!-- mybatis--> <dependency> <groupId>com.baomidou</groupId> <artifactId>mybatis-plus-boot-starter</artifactId> <version>3.2.0</version> </dependency>
項目結構
配置文件
server.port=9000 spring.datasource.url=jdbc:mysql://localhost:3306/test spring.datasource.username=root spring.datasource.password=12345 spring.datasource.driver-class-name=com.mysql.jdbc.Driver
數據表
CREATE TABLE `student` ( `id` int(100) NOT NULL AUTO_INCREMENT, `name` varchar(45) DEFAULT NULL, `age` int(2) DEFAULT NULL, `address` varchar(45) DEFAULT NULL, PRIMARY KEY (`id`), UNIQUE KEY `id_UNIQUE` (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=203579 DEFAULT CHARSET=utf8 ROW_FORMAT=REDUNDANT
Student實體類
/** * @desc: Student實體類 * @author: YanMingXin * @create: 2021/10/15-12:17 **/ @Data @Accessors(chain = true) @NoArgsConstructor @AllArgsConstructor @ToString @TableName("student") public class Student { @TableId(value = "id", type = IdType.AUTO) private Long sId; @TableField("name") private String sName; @TableField("age") private Integer sAge; @TableField("address") private String sAddress; }
Mapper層
/** * @desc: Mapper層 * @author: YanMingXin * @create: 2021/10/15-12:17 **/ @Mapper @Repository public interface StudentDao extends BaseMapper<Student> { }
模擬數據庫(文件)中讀取類
/** * @desc: 模擬數據庫中讀取 * @author: YanMingXin * @create: 2021/10/16-10:13 **/ public class StudentVirtualDao { /** * 模擬從數據庫中讀取 * * @return */ public List<Student> getStudents() { ArrayList<Student> students = new ArrayList<>(); students.add(new Student(1L, "zs", 23, "Beijing")); students.add(new Student(2L, "ls", 23, "Beijing")); students.add(new Student(3L, "ww", 23, "Beijing")); students.add(new Student(4L, "zl", 23, "Beijing")); students.add(new Student(5L, "mq", 23, "Beijing")); students.add(new Student(6L, "gb", 23, "Beijing")); students.add(new Student(7L, "lj", 23, "Beijing")); students.add(new Student(8L, "ss", 23, "Beijing")); students.add(new Student(9L, "zsdd", 23, "Beijing")); students.add(new Student(10L, "zss", 23, "Beijing")); return students; } }
Service層接口
/** * @desc: * @author: YanMingXin * @create: 2021/10/15-12:16 **/ public interface StudentService { List<Student> selectStudentsFromDB(); void insertStudent(Student student); }
Service層實現類
/** * @desc: Service層實現類 * @author: YanMingXin * @create: 2021/10/15-12:16 **/ @Service public class StudentServiceImpl implements StudentService { @Autowired private StudentDao studentDao; @Override public List<Student> selectStudentsFromDB() { return studentDao.selectList(null); } @Override public void insertStudent(Student student) { studentDao.insert(student); } }
最核心的配置類BatchConfiguration
/** * @desc: BatchConfiguration * @author: YanMingXin * @create: 2021/10/15-12:25 **/ @Configuration @EnableBatchProcessing @SuppressWarnings("all") public class BatchConfiguration { /** * 注入JobBuilderFactory */ @Autowired public JobBuilderFactory jobBuilderFactory; /** * 注入StepBuilderFactory */ @Autowired public StepBuilderFactory stepBuilderFactory; /** * 注入JobRepository */ @Autowired public JobRepository jobRepository; /** * 注入JobLauncher */ @Autowired private JobLauncher jobLauncher; /** * 注入自定義StudentService */ @Autowired private StudentService studentService; /** * 注入自定義job */ @Autowired private Job studentJob; /** * 封裝writer bean * * @return */ @Bean public ItemWriter<Student> writer() { ItemWriter<Student> writer = new ItemWriter() { @Override public void write(List list) throws Exception { //debug發現是嵌套的List reader的線程List嵌套真正的List list.forEach((stu) -> { for (Student student : (ArrayList<Student>) stu) { studentService.insertStudent(student); } }); } }; return writer; } /** * 封裝reader bean * * @return */ @Bean public ItemReader<Student> reader() { ItemReader<Student> reader = new ItemReader() { @Override public Object read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException { //模擬數據獲取 StudentVirtualDao virtualDao = new StudentVirtualDao(); return virtualDao.getStudents(); } }; return reader; } /** * 封裝processor bean * * @return */ @Bean public ItemProcessor processor() { ItemProcessor processor = new ItemProcessor() { @Override public Object process(Object o) throws Exception { //debug發現o就是reader單次單線程讀取的數據 return o; } }; return processor; } /** * 封裝自定義step * * @return */ @Bean public Step studentStepOne() { return stepBuilderFactory.get("studentStepOne") .chunk(1) .reader(reader()) //加入reader .processor(processor()) //加入processor .writer(writer())//加入writer .build(); } /** * 封裝自定義job * * @return */ @Bean public Job studentJob() { return jobBuilderFactory.get("studentJob") .flow(studentStepOne())//加入step .end() .build(); } /** * 使用spring 定時任務執行 */ @Scheduled(fixedRate = 5000) public void printMessage() { try { JobParameters jobParameters = new JobParametersBuilder() .addLong("time", System.currentTimeMillis()) .toJobParameters(); jobLauncher.run(studentJob, jobParameters); } catch (Exception e) { e.printStackTrace(); } } }
項目啟動1s之后
看數據庫,除了我們實體類定義的表以外多出來這么多表,這些表都是spring batch自帶的記錄日志和錯誤的表,具體的字段含義的有待研究
Spring Batch有非常快的寫入和讀取速度,但是帶來的影響就是非常耗費內存和數據庫連接池的資源如果使用不好的話還會發生異常,因此我們要進行正確的配置,接下來我們進行簡單的源碼探究:
job的獲取使用了簡單工廠模式和建造者模式JobBuilderFactory獲取JobBuilder在經過配置返回一個job對象的實例,該實例就是Spring Batch中最頂級的組件,包含了n和step
public class JobBuilderFactory { private JobRepository jobRepository; public JobBuilderFactory(JobRepository jobRepository) { this.jobRepository = jobRepository; } //返回JobBuilder public JobBuilder get(String name) { JobBuilder builder = new JobBuilder(name).repository(jobRepository); return builder; } }
jobBuilder類
public class JobBuilder extends JobBuilderHelper<JobBuilder> { /** * 為指定名稱的作業創建一個新的構建器 */ public JobBuilder(String name) { super(name); } /** * 創建將執行步驟或步驟序列的新作業構建器。 */ public SimpleJobBuilder start(Step step) { return new SimpleJobBuilder(this).start(step); } /** * 創建將執行流的新作業構建器。 */ public JobFlowBuilder start(Flow flow) { return new FlowJobBuilder(this).start(flow); } /** * 創建將執行步驟或步驟序列的新作業構建器 */ public JobFlowBuilder flow(Step step) { return new FlowJobBuilder(this).start(step); } }
直接看StepBuilder類
public class StepBuilder extends StepBuilderHelper<StepBuilder> { public StepBuilder(String name) { super(name); } /** * 用自定義微線程構建步驟,不一定是項處理。 */ public TaskletStepBuilder tasklet(Tasklet tasklet) { return new TaskletStepBuilder(this).tasklet(tasklet); } /** * 構建一個步驟,按照提供的大小以塊的形式處理項。為了將這一步擴展到容錯, * 在構建器上調用SimpleStepBuilder的 faultolerant()方法。 * @param <I> 輸入類型 * @param <O> 輸出類型 */ public <I, O> SimpleStepBuilder<I, O> chunk(int chunkSize) { return new SimpleStepBuilder<I, O>(this).chunk(chunkSize); } public <I, O> SimpleStepBuilder<I, O> chunk(CompletionPolicy completionPolicy) { return new SimpleStepBuilder<I, O>(this).chunk(completionPolicy); } public PartitionStepBuilder partitioner(String stepName, Partitioner partitioner) { return new PartitionStepBuilder(this).partitioner(stepName, partitioner); } public PartitionStepBuilder partitioner(Step step) { return new PartitionStepBuilder(this).step(step); } public JobStepBuilder job(Job job) { return new JobStepBuilder(this).job(job); } /** * 創建將執行流的新步驟構建器。 */ public FlowStepBuilder flow(Flow flow) { return new FlowStepBuilder(this).flow(flow); } }
以上是“Spring Batch輕量級批處理框架的示例分析”這篇文章的所有內容,感謝各位的閱讀!希望分享的內容對大家有幫助,更多相關知識,歡迎關注億速云行業資訊頻道!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。