一、SpringBatch 介紹
Spring Batch 是一個(gè)輕量級、全面的批處理框架,旨在支持開發(fā)對企業(yè)系統(tǒng)的日常操作至關(guān)重要的健壯的批處理應(yīng)用程序。Spring Batch 建立在人們期望的 Spring Framework 特性(生產(chǎn)力、基于 POJO 的開發(fā)方法和一般易用性)的基礎(chǔ)上,同時(shí)使開發(fā)人員可以在必要時(shí)輕松訪問和使用更高級的企業(yè)服務(wù)。
Spring Batch 不是一個(gè)調(diào)度框架。在商業(yè)和開源領(lǐng)域都有許多優(yōu)秀的企業(yè)調(diào)度程序(例如 Quartz、Tivoli、Control-M 等)。Spring Batch 旨在與調(diào)度程序結(jié)合使用,而不是替代調(diào)度程序。
二、業(yè)務(wù)場景
我們在業(yè)務(wù)開發(fā)中經(jīng)常遇到這種情況:
Spring Batch 支持以下業(yè)務(wù)場景:
定期提交批處理。
并發(fā)批處理:并行處理作業(yè)。
分階段的企業(yè)消息驅(qū)動處理。
大規(guī)模并行批處理。
失敗后手動或計(jì)劃重啟。
相關(guān)步驟的順序處理(擴(kuò)展到工作流驅(qū)動的批次)。
部分處理:跳過記錄(例如,在回滾時(shí))。
整批交易,適用于批量較小或已有存儲過程或腳本的情況。
三、基礎(chǔ)知識
3.1、整體架構(gòu)
名稱 | 作用 |
---|---|
JobRepository | 為所有的原型(Job、JobInstance、Step)提供持久化的機(jī)制 |
JobLauncher | JobLauncher表示一個(gè)簡單的接口,用于啟動一個(gè)Job給定的集合 JobParameters |
Job | Job是封裝了整個(gè)批處理過程的實(shí)體 |
Step | Step是一個(gè)域?qū)ο?,它封裝了批處理作業(yè)的一個(gè)獨(dú)立的順序階段 |
3.2、核心接口
ItemReader: is an abstraction that represents the output of a Step, one batch or chunk of items at a time
ItemProcessor:an abstraction that represents the business processing of an item.
ItemWriter: is an abstraction that represents the output of a Step, one batch or chunk of items at a time.
大體即為 輸入→數(shù)據(jù)加工→輸出 ,一個(gè)Job定義多個(gè)Step及處理流程,一個(gè)Step通常涵蓋ItemReader、ItemProcessor、ItemWriter
四、基礎(chǔ)實(shí)操
4.0、引入 SpringBatch
pom 文件引入 springboot
?? org.springframework.boot ??spring-boot-starter-parent ??2.2.5.RELEASE ???
pom 文件引入 spring-batch 及相關(guān)依賴
???? ?????? ????org.springframework.boot ??????spring-boot-starter-batch ?????????? ????org.springframework.boot ??????spring-boot-starter-validation ?????????? ????mysql ??????mysql-connector-java ?????????? ??org.springframework.boot ??????spring-boot-starter-jdbc ????
mysql 創(chuàng)建依賴的庫表
sql 腳本的 jar 包路徑:.....maven epositoryorgspringframeworkatchspring-batch-core4.2.1.RELEASEspring-batch-core-4.2.1.RELEASE.jar!orgspringframeworkatchcoreschema-mysql.sql
啟動類標(biāo)志@EnableBatchProcessing
@SpringBootApplication @EnableBatchProcessing public?class?SpringBatchStartApplication { ????public?static?void?main(String[]?args)?{ ????????SpringApplication.run(SpringBatchStartApplication.class,?args); ????} }
FirstJobDemo
@Component public?class?FirstJobDemo?{ ????@Autowired ????private?JobBuilderFactory?jobBuilderFactory; ????@Autowired ????private?StepBuilderFactory?stepBuilderFactory; ????@Bean ????public?Job?firstJob()?{ ????????return?jobBuilderFactory.get("firstJob") ????????????????.start(step()) ????????????????.build(); ????} ????private?Step?step()?{ ????????return?stepBuilderFactory.get("step") ????????????????.tasklet((contribution,?chunkContext)?->?{ ????????????????????System.out.println("執(zhí)行步驟...."); ????????????????????return?RepeatStatus.FINISHED; ????????????????}).build(); ????} }
4.1、流程控制
A、多步驟任務(wù)
@Bean public?Job?multiStepJob()?{ ????return?jobBuilderFactory.get("multiStepJob2") ????????????.start(step1()) ????????????.on(ExitStatus.COMPLETED.getExitCode()).to(step2()) ????????????.from(step2()) ????????????.on(ExitStatus.COMPLETED.getExitCode()).to(step3()) ????????????.from(step3()).end() ????????????.build(); } private?Step?step1()?{ ????return?stepBuilderFactory.get("step1") ????????????.tasklet((stepContribution,?chunkContext)?->?{ ????????????????System.out.println("執(zhí)行步驟一操作。。。"); ????????????????return?RepeatStatus.FINISHED; ????????????}).build(); } private?Step?step2()?{ ????return?stepBuilderFactory.get("step2") ????????????.tasklet((stepContribution,?chunkContext)?->?{ ????????????????System.out.println("執(zhí)行步驟二操作。。。"); ????????????????return?RepeatStatus.FINISHED; ????????????}).build(); } private?Step?step3()?{ ????return?stepBuilderFactory.get("step3") ????????????.tasklet((stepContribution,?chunkContext)?->?{ ????????????????System.out.println("執(zhí)行步驟三操作。。。"); ????????????????return?RepeatStatus.FINISHED; ????????????}).build(); }
B、并行執(zhí)行
創(chuàng)建了兩個(gè) Flow:flow1(包含 step1 和 step2)和 flow2(包含 step3)。然后通過JobBuilderFactory的split方法,指定一個(gè)異步執(zhí)行器,將 flow1 和 flow2 異步執(zhí)行(也就是并行)
@Component public?class?SplitJobDemo?{ ????@Autowired ????private?JobBuilderFactory?jobBuilderFactory; ????@Autowired ????private?StepBuilderFactory?stepBuilderFactory; ????@Bean ????public?Job?splitJob()?{ ????????return?jobBuilderFactory.get("splitJob") ????????????????.start(flow1()) ????????????????.split(new?SimpleAsyncTaskExecutor()).add(flow2()) ????????????????.end() ????????????????.build(); ????} ????private?Step?step1()?{ ????????return?stepBuilderFactory.get("step1") ????????????????.tasklet((stepContribution,?chunkContext)?->?{ ????????????????????System.out.println("執(zhí)行步驟一操作。。。"); ????????????????????return?RepeatStatus.FINISHED; ????????????????}).build(); ????} ????private?Step?step2()?{ ????????return?stepBuilderFactory.get("step2") ????????????????.tasklet((stepContribution,?chunkContext)?->?{ ????????????????????System.out.println("執(zhí)行步驟二操作。。。"); ????????????????????return?RepeatStatus.FINISHED; ????????????????}).build(); ????} ????private?Step?step3()?{ ????????return?stepBuilderFactory.get("step3") ????????????????.tasklet((stepContribution,?chunkContext)?->?{ ????????????????????System.out.println("執(zhí)行步驟三操作。。。"); ????????????????????return?RepeatStatus.FINISHED; ????????????????}).build(); ????} ????private?Flow?flow1()?{ ????????return?new?FlowBuilder("flow1") ????????????????.start(step1()) ????????????????.next(step2()) ????????????????.build(); ????} ????private?Flow?flow2()?{ ????????return?new?FlowBuilder ("flow2") ????????????????.start(step3()) ????????????????.build(); ????} }
C、任務(wù)決策
決策器的作用就是可以指定程序在不同的情況下運(yùn)行不同的任務(wù)流程,比如今天是周末,則讓任務(wù)執(zhí)行 step1 和 step2,如果是工作日,則之心 step1 和 step3。
@Component public?class?MyDecider?implements?JobExecutionDecider?{ ????@Override ????public?FlowExecutionStatus?decide(JobExecution?jobExecution,?StepExecution?stepExecution)?{ ????????LocalDate?now?=?LocalDate.now(); ????????DayOfWeek?dayOfWeek?=?now.getDayOfWeek(); ????????if?(dayOfWeek?==?DayOfWeek.SATURDAY?||?dayOfWeek?==?DayOfWeek.SUNDAY)?{ ????????????return?new?FlowExecutionStatus("weekend"); ????????}?else?{ ????????????return?new?FlowExecutionStatus("workingDay"); ????????} ????} } @Bean public?Job?deciderJob()?{ ?return?jobBuilderFactory.get("deciderJob") ???.start(step1()) ???.next(myDecider) ???.from(myDecider).on("weekend").to(step2()) ???.from(myDecider).on("workingDay").to(step3()) ???.from(step3()).on("*").to(step4()) ???.end() ???.build(); } private?Step?step1()?{ ?return?stepBuilderFactory.get("step1") ???.tasklet((stepContribution,?chunkContext)?->?{ ????System.out.println("執(zhí)行步驟一操作。。。"); ????return?RepeatStatus.FINISHED; ???}).build(); } private?Step?step2()?{ ?return?stepBuilderFactory.get("step2") ???.tasklet((stepContribution,?chunkContext)?->?{ ????System.out.println("執(zhí)行步驟二操作。。。"); ????return?RepeatStatus.FINISHED; ???}).build(); } private?Step?step3()?{ ?return?stepBuilderFactory.get("step3") ???.tasklet((stepContribution,?chunkContext)?->?{ ????System.out.println("執(zhí)行步驟三操作。。。"); ????return?RepeatStatus.FINISHED; ???}).build(); } private?Step?step4()?{ ?return?stepBuilderFactory.get("step4") ???.tasklet((stepContribution,?chunkContext)?->?{ ????System.out.println("執(zhí)行步驟四操作。。。"); ????return?RepeatStatus.FINISHED; ???}).build(); } D、任務(wù)嵌套
任務(wù) Job 除了可以由 Step 或者 Flow 構(gòu)成外,我們還可以將多個(gè)任務(wù) Job 轉(zhuǎn)換為特殊的 Step,然后再賦給另一個(gè)任務(wù) Job,這就是任務(wù)的嵌套。
@Component public?class?NestedJobDemo?{ ????@Autowired ????private?JobBuilderFactory?jobBuilderFactory; ????@Autowired ????private?StepBuilderFactory?stepBuilderFactory; ????@Autowired ????private?JobLauncher?jobLauncher; ????@Autowired ????private?JobRepository?jobRepository; ????@Autowired ????private?PlatformTransactionManager?platformTransactionManager; ????//?父任務(wù) ????@Bean ????public?Job?parentJob()?{ ????????return?jobBuilderFactory.get("parentJob") ????????????????.start(childJobOneStep()) ????????????????.next(childJobTwoStep()) ????????????????.build(); ????} ????//?將任務(wù)轉(zhuǎn)換為特殊的步驟 ????private?Step?childJobOneStep()?{ ????????return?new?JobStepBuilder(new?StepBuilder("childJobOneStep")) ????????????????.job(childJobOne()) ????????????????.launcher(jobLauncher) ????????????????.repository(jobRepository) ????????????????.transactionManager(platformTransactionManager) ????????????????.build(); ????} ????//?將任務(wù)轉(zhuǎn)換為特殊的步驟 ????private?Step?childJobTwoStep()?{ ????????return?new?JobStepBuilder(new?StepBuilder("childJobTwoStep")) ????????????????.job(childJobTwo()) ????????????????.launcher(jobLauncher) ????????????????.repository(jobRepository) ????????????????.transactionManager(platformTransactionManager) ????????????????.build(); ????} ????//?子任務(wù)一 ????private?Job?childJobOne()?{ ????????return?jobBuilderFactory.get("childJobOne") ????????????????.start( ????????????????????stepBuilderFactory.get("childJobOneStep") ????????????????????????????.tasklet((stepContribution,?chunkContext)?->?{ ????????????????????????????????System.out.println("子任務(wù)一執(zhí)行步驟。。。"); ????????????????????????????????return?RepeatStatus.FINISHED; ????????????????????????????}).build() ????????????????).build(); ????} ????//?子任務(wù)二 ????private?Job?childJobTwo()?{ ????????return?jobBuilderFactory.get("childJobTwo") ????????????????.start( ????????????????????stepBuilderFactory.get("childJobTwoStep") ????????????????????????????.tasklet((stepContribution,?chunkContext)?->?{ ????????????????????????????????System.out.println("子任務(wù)二執(zhí)行步驟。。。"); ????????????????????????????????return?RepeatStatus.FINISHED; ????????????????????????????}).build() ????????????????).build(); ????} }
4.2、讀取數(shù)據(jù)
定義 Model TestData,下面同一
@Data public?class?TestData?{ ????private?int?id; ????private?String?field1; ????private?String?field2; ????private?String?field3; }
讀取數(shù)據(jù)包含:文本數(shù)據(jù)讀取、數(shù)據(jù)庫數(shù)據(jù)讀取、XML 數(shù)據(jù)讀取、JSON 數(shù)據(jù)讀取等,具體自己查資料。
文本數(shù)據(jù)讀取 Demo
@Component public?class?FileItemReaderDemo?{ ????//?任務(wù)創(chuàng)建工廠 ????@Autowired ????private?JobBuilderFactory?jobBuilderFactory; ????//?步驟創(chuàng)建工廠 ????@Autowired ????private?StepBuilderFactory?stepBuilderFactory; ????@Bean ????public?Job?fileItemReaderJob()?{ ????????return?jobBuilderFactory.get("fileItemReaderJob2") ????????????????.start(step()) ????????????????.build(); ????} ????private?Step?step()?{ ????????return?stepBuilderFactory.get("step") ????????????????.chunk(2) ????????????????.reader(fileItemReader()) ????????????????.writer(list?->?list.forEach(System.out::println)) ????????????????.build(); ????} ????private?ItemReader ?fileItemReader()?{ ????????FlatFileItemReader ?reader?=?new?FlatFileItemReader<>(); ????????reader.setResource(new?ClassPathResource("reader/file"));?//?設(shè)置文件資源地址 ????????reader.setLinesToSkip(1);?//?忽略第一行 ????????//?AbstractLineTokenizer的三個(gè)實(shí)現(xiàn)類之一,以固定分隔符處理行數(shù)據(jù)讀取, ????????//?使用默認(rèn)構(gòu)造器的時(shí)候,使用逗號作為分隔符,也可以通過有參構(gòu)造器來指定分隔符 ????????DelimitedLineTokenizer?tokenizer?=?new?DelimitedLineTokenizer(); ????????//?設(shè)置屬性名,類似于表頭 ????????tokenizer.setNames("id",?"field1",?"field2",?"field3"); ????????//?將每行數(shù)據(jù)轉(zhuǎn)換為TestData對象 ????????DefaultLineMapper ?mapper?=?new?DefaultLineMapper<>(); ????????//?設(shè)置LineTokenizer ????????mapper.setLineTokenizer(tokenizer); ????????//?設(shè)置映射方式,即讀取到的文本怎么轉(zhuǎn)換為對應(yīng)的POJO ????????mapper.setFieldSetMapper(fieldSet?->?{ ????????????TestData?data?=?new?TestData(); ????????????data.setId(fieldSet.readInt("id")); ????????????data.setField1(fieldSet.readString("field1")); ????????????data.setField2(fieldSet.readString("field2")); ????????????data.setField3(fieldSet.readString("field3")); ????????????return?data; ????????}); ????????reader.setLineMapper(mapper); ????????return?reader; ????} }
4.3、輸出數(shù)據(jù)
輸出數(shù)據(jù)也包含:文本數(shù)據(jù)讀取、數(shù)據(jù)庫數(shù)據(jù)讀取、XML 數(shù)據(jù)讀取、JSON 數(shù)據(jù)讀取等
Component public?class?FileItemWriterDemo?{ ????@Autowired ????private?JobBuilderFactory?jobBuilderFactory; ????@Autowired ????private?StepBuilderFactory?stepBuilderFactory; ????@Resource(name?=?"writerSimpleReader") ????private?ListItemReader?writerSimpleReader; ????@Bean ????public?Job?fileItemWriterJob()?throws?Exception?{ ????????return?jobBuilderFactory.get("fileItemWriterJob") ????????????????.start(step()) ????????????????.build(); ????} ????private?Step?step()?throws?Exception?{ ????????return?stepBuilderFactory.get("step") ????????????????. chunk(2) ????????????????.reader(writerSimpleReader) ????????????????.writer(fileItemWriter()) ????????????????.build(); ????} ????private?FlatFileItemWriter ?fileItemWriter()?throws?Exception?{ ????????FlatFileItemWriter ?writer?=?new?FlatFileItemWriter<>(); ????????FileSystemResource?file?=?new?FileSystemResource("D:/code/spring-batch-demo/src/main/resources/writer/writer-file"); ????????Path?path?=?Paths.get(file.getPath()); ????????if?(!Files.exists(path))?{ ????????????Files.createFile(path); ????????} ????????//?設(shè)置輸出文件路徑 ????????writer.setResource(file); ????????//?把讀到的每個(gè)TestData對象轉(zhuǎn)換為JSON字符串 ????????LineAggregator ?aggregator?=?item?->?{ ????????????try?{ ????????????????ObjectMapper?mapper?=?new?ObjectMapper(); ????????????????return?mapper.writeValueAsString(item); ????????????}?catch?(JsonProcessingException?e)?{ ????????????????e.printStackTrace(); ????????????} ????????????return?""; ????????}; ????????writer.setLineAggregator(aggregator); ????????writer.afterPropertiesSet(); ????????return?writer; ????} }
4.5、處理數(shù)據(jù)
@Component
public?class?ValidatingItemProcessorDemo?{ ????@Autowired ????private?JobBuilderFactory?jobBuilderFactory; ????@Autowired ????private?StepBuilderFactory?stepBuilderFactory; ????@Resource(name?=?"processorSimpleReader") ????private?ListItemReader?processorSimpleReader; ????@Bean ????public?Job?validatingItemProcessorJob()?throws?Exception?{ ????????return?jobBuilderFactory.get("validatingItemProcessorJob3") ????????????????.start(step()) ????????????????.build(); ????} ????private?Step?step()?throws?Exception?{ ????????return?stepBuilderFactory.get("step") ????????????????. chunk(2) ????????????????.reader(processorSimpleReader) ????????????????.processor(beanValidatingItemProcessor()) ????????????????.writer(list?->?list.forEach(System.out::println)) ????????????????.build(); ????} //????private?ValidatingItemProcessor ?validatingItemProcessor()?{ //????????ValidatingItemProcessor ?processor?=?new?ValidatingItemProcessor<>(); //????????processor.setValidator(value?->?{ //????????????//?對每一條數(shù)據(jù)進(jìn)行校驗(yàn) //????????????if?("".equals(value.getField3()))?{ //????????????????//?如果field3的值為空串,則拋異常 //????????????????throw?new?ValidationException("field3的值不合法"); //????????????} //????????}); //????????return?processor; //????} ????private?BeanValidatingItemProcessor ?beanValidatingItemProcessor()?throws?Exception?{ ????????BeanValidatingItemProcessor ?beanValidatingItemProcessor?=?new?BeanValidatingItemProcessor<>(); ????????//?開啟過濾,不符合規(guī)則的數(shù)據(jù)被過濾掉; //????????beanValidatingItemProcessor.setFilter(true); ????????beanValidatingItemProcessor.afterPropertiesSet(); ????????return?beanValidatingItemProcessor; ????} }
4.6、任務(wù)調(diào)度
可以配合 quartz 或者 xxljob 實(shí)現(xiàn)定時(shí)任務(wù)執(zhí)行
@RestController @RequestMapping("job") public?class?JobController?{ ????@Autowired ????private?Job?job; ????@Autowired ????private?JobLauncher?jobLauncher; ????@GetMapping("launcher/{message}") ????public?String?launcher(@PathVariable?String?message)?throws?Exception?{ ????????JobParameters?parameters?=?new?JobParametersBuilder() ????????????????.addString("message",?message) ????????????????.toJobParameters(); ????????//?將參數(shù)傳遞給任務(wù) ????????jobLauncher.run(job,?parameters); ????????return?"success"; ????} } 編輯:黃飛
評論