一、SpringBatch 介紹
Spring Batch 是一個(gè)輕量級、全面的批處理框架,旨在支持開發(fā)對企業(yè)系統(tǒng)的日常操作至關(guān)重要的健壯的批處理應(yīng)用程序。Spring Batch 建立在人們期望的 Spring Framework 特性(生產(chǎn)力、基于 POJO 的開發(fā)方法和一般易用性)的基礎(chǔ)上,同時(shí)使開發(fā)人員可以在必要時(shí)輕松訪問和使用更高級的企業(yè)服務(wù)。
Spring Batch 不是一個(gè)調(diào)度框架。在商業(yè)和開源領(lǐng)域都有許多優(yōu)秀的企業(yè)調(diào)度程序(例如 Quartz、Tivoli、Control-M 等)。Spring Batch 旨在與調(diào)度程序結(jié)合使用,而不是替代調(diào)度程序。

二、業(yè)務(wù)場景
我們在業(yè)務(wù)開發(fā)中經(jīng)常遇到這種情況:

Spring Batch 支持以下業(yè)務(wù)場景:
定期提交批處理。
并發(fā)批處理:并行處理作業(yè)。
分階段的企業(yè)消息驅(qū)動處理。
大規(guī)模并行批處理。
失敗后手動或計(jì)劃重啟。
相關(guān)步驟的順序處理(擴(kuò)展到工作流驅(qū)動的批次)。
部分處理:跳過記錄(例如,在回滾時(shí))。
整批交易,適用于批量較小或已有存儲過程或腳本的情況。
三、基礎(chǔ)知識
3.1、整體架構(gòu)

| 名稱 | 作用 |
|---|---|
| JobRepository | 為所有的原型(Job、JobInstance、Step)提供持久化的機(jī)制 |
| JobLauncher | JobLauncher表示一個(gè)簡單的接口,用于啟動一個(gè)Job給定的集合 JobParameters |
| Job | Job是封裝了整個(gè)批處理過程的實(shí)體 |
| Step | Step是一個(gè)域?qū)ο?,它封裝了批處理作業(yè)的一個(gè)獨(dú)立的順序階段 |
3.2、核心接口
ItemReader: is an abstraction that represents the output of a Step, one batch or chunk of items at a time
ItemProcessor:an abstraction that represents the business processing of an item.
ItemWriter: is an abstraction that represents the output of a Step, one batch or chunk of items at a time.

大體即為 輸入→數(shù)據(jù)加工→輸出 ,一個(gè)Job定義多個(gè)Step及處理流程,一個(gè)Step通常涵蓋ItemReader、ItemProcessor、ItemWriter
四、基礎(chǔ)實(shí)操
4.0、引入 SpringBatch
pom 文件引入 springboot
?? org.springframework.boot ??spring-boot-starter-parent ??2.2.5.RELEASE ???
pom 文件引入 spring-batch 及相關(guān)依賴
???? ?????? ????org.springframework.boot ??????spring-boot-starter-batch ?????????? ????org.springframework.boot ??????spring-boot-starter-validation ?????????? ????mysql ??????mysql-connector-java ?????????? ??org.springframework.boot ??????spring-boot-starter-jdbc ????
mysql 創(chuàng)建依賴的庫表

sql 腳本的 jar 包路徑:.....maven epositoryorgspringframeworkatchspring-batch-core4.2.1.RELEASEspring-batch-core-4.2.1.RELEASE.jar!orgspringframeworkatchcoreschema-mysql.sql
啟動類標(biāo)志@EnableBatchProcessing
@SpringBootApplication @EnableBatchProcessing public?class?SpringBatchStartApplication { ????public?static?void?main(String[]?args)?{ ????????SpringApplication.run(SpringBatchStartApplication.class,?args); ????} }
FirstJobDemo
@Component
public?class?FirstJobDemo?{
????@Autowired
????private?JobBuilderFactory?jobBuilderFactory;
????@Autowired
????private?StepBuilderFactory?stepBuilderFactory;
????@Bean
????public?Job?firstJob()?{
????????return?jobBuilderFactory.get("firstJob")
????????????????.start(step())
????????????????.build();
????}
????private?Step?step()?{
????????return?stepBuilderFactory.get("step")
????????????????.tasklet((contribution,?chunkContext)?->?{
????????????????????System.out.println("執(zhí)行步驟....");
????????????????????return?RepeatStatus.FINISHED;
????????????????}).build();
????}
}
4.1、流程控制
A、多步驟任務(wù)
@Bean
public?Job?multiStepJob()?{
????return?jobBuilderFactory.get("multiStepJob2")
????????????.start(step1())
????????????.on(ExitStatus.COMPLETED.getExitCode()).to(step2())
????????????.from(step2())
????????????.on(ExitStatus.COMPLETED.getExitCode()).to(step3())
????????????.from(step3()).end()
????????????.build();
}
private?Step?step1()?{
????return?stepBuilderFactory.get("step1")
????????????.tasklet((stepContribution,?chunkContext)?->?{
????????????????System.out.println("執(zhí)行步驟一操作。。。");
????????????????return?RepeatStatus.FINISHED;
????????????}).build();
}
private?Step?step2()?{
????return?stepBuilderFactory.get("step2")
????????????.tasklet((stepContribution,?chunkContext)?->?{
????????????????System.out.println("執(zhí)行步驟二操作。。。");
????????????????return?RepeatStatus.FINISHED;
????????????}).build();
}
private?Step?step3()?{
????return?stepBuilderFactory.get("step3")
????????????.tasklet((stepContribution,?chunkContext)?->?{
????????????????System.out.println("執(zhí)行步驟三操作。。。");
????????????????return?RepeatStatus.FINISHED;
????????????}).build();
}
B、并行執(zhí)行
創(chuàng)建了兩個(gè) Flow:flow1(包含 step1 和 step2)和 flow2(包含 step3)。然后通過JobBuilderFactory的split方法,指定一個(gè)異步執(zhí)行器,將 flow1 和 flow2 異步執(zhí)行(也就是并行)
@Component
public?class?SplitJobDemo?{
????@Autowired
????private?JobBuilderFactory?jobBuilderFactory;
????@Autowired
????private?StepBuilderFactory?stepBuilderFactory;
????@Bean
????public?Job?splitJob()?{
????????return?jobBuilderFactory.get("splitJob")
????????????????.start(flow1())
????????????????.split(new?SimpleAsyncTaskExecutor()).add(flow2())
????????????????.end()
????????????????.build();
????}
????private?Step?step1()?{
????????return?stepBuilderFactory.get("step1")
????????????????.tasklet((stepContribution,?chunkContext)?->?{
????????????????????System.out.println("執(zhí)行步驟一操作。。。");
????????????????????return?RepeatStatus.FINISHED;
????????????????}).build();
????}
????private?Step?step2()?{
????????return?stepBuilderFactory.get("step2")
????????????????.tasklet((stepContribution,?chunkContext)?->?{
????????????????????System.out.println("執(zhí)行步驟二操作。。。");
????????????????????return?RepeatStatus.FINISHED;
????????????????}).build();
????}
????private?Step?step3()?{
????????return?stepBuilderFactory.get("step3")
????????????????.tasklet((stepContribution,?chunkContext)?->?{
????????????????????System.out.println("執(zhí)行步驟三操作。。。");
????????????????????return?RepeatStatus.FINISHED;
????????????????}).build();
????}
????private?Flow?flow1()?{
????????return?new?FlowBuilder("flow1")
????????????????.start(step1())
????????????????.next(step2())
????????????????.build();
????}
????private?Flow?flow2()?{
????????return?new?FlowBuilder("flow2")
????????????????.start(step3())
????????????????.build();
????}
}
C、任務(wù)決策
決策器的作用就是可以指定程序在不同的情況下運(yùn)行不同的任務(wù)流程,比如今天是周末,則讓任務(wù)執(zhí)行 step1 和 step2,如果是工作日,則之心 step1 和 step3。
@Component
public?class?MyDecider?implements?JobExecutionDecider?{
????@Override
????public?FlowExecutionStatus?decide(JobExecution?jobExecution,?StepExecution?stepExecution)?{
????????LocalDate?now?=?LocalDate.now();
????????DayOfWeek?dayOfWeek?=?now.getDayOfWeek();
????????if?(dayOfWeek?==?DayOfWeek.SATURDAY?||?dayOfWeek?==?DayOfWeek.SUNDAY)?{
????????????return?new?FlowExecutionStatus("weekend");
????????}?else?{
????????????return?new?FlowExecutionStatus("workingDay");
????????}
????}
}
@Bean
public?Job?deciderJob()?{
?return?jobBuilderFactory.get("deciderJob")
???.start(step1())
???.next(myDecider)
???.from(myDecider).on("weekend").to(step2())
???.from(myDecider).on("workingDay").to(step3())
???.from(step3()).on("*").to(step4())
???.end()
???.build();
}
private?Step?step1()?{
?return?stepBuilderFactory.get("step1")
???.tasklet((stepContribution,?chunkContext)?->?{
????System.out.println("執(zhí)行步驟一操作。。。");
????return?RepeatStatus.FINISHED;
???}).build();
}
private?Step?step2()?{
?return?stepBuilderFactory.get("step2")
???.tasklet((stepContribution,?chunkContext)?->?{
????System.out.println("執(zhí)行步驟二操作。。。");
????return?RepeatStatus.FINISHED;
???}).build();
}
private?Step?step3()?{
?return?stepBuilderFactory.get("step3")
???.tasklet((stepContribution,?chunkContext)?->?{
????System.out.println("執(zhí)行步驟三操作。。。");
????return?RepeatStatus.FINISHED;
???}).build();
}
private?Step?step4()?{
?return?stepBuilderFactory.get("step4")
???.tasklet((stepContribution,?chunkContext)?->?{
????System.out.println("執(zhí)行步驟四操作。。。");
????return?RepeatStatus.FINISHED;
???}).build();
}
D、任務(wù)嵌套
任務(wù) Job 除了可以由 Step 或者 Flow 構(gòu)成外,我們還可以將多個(gè)任務(wù) Job 轉(zhuǎn)換為特殊的 Step,然后再賦給另一個(gè)任務(wù) Job,這就是任務(wù)的嵌套。
@Component
public?class?NestedJobDemo?{
????@Autowired
????private?JobBuilderFactory?jobBuilderFactory;
????@Autowired
????private?StepBuilderFactory?stepBuilderFactory;
????@Autowired
????private?JobLauncher?jobLauncher;
????@Autowired
????private?JobRepository?jobRepository;
????@Autowired
????private?PlatformTransactionManager?platformTransactionManager;
????//?父任務(wù)
????@Bean
????public?Job?parentJob()?{
????????return?jobBuilderFactory.get("parentJob")
????????????????.start(childJobOneStep())
????????????????.next(childJobTwoStep())
????????????????.build();
????}
????//?將任務(wù)轉(zhuǎn)換為特殊的步驟
????private?Step?childJobOneStep()?{
????????return?new?JobStepBuilder(new?StepBuilder("childJobOneStep"))
????????????????.job(childJobOne())
????????????????.launcher(jobLauncher)
????????????????.repository(jobRepository)
????????????????.transactionManager(platformTransactionManager)
????????????????.build();
????}
????//?將任務(wù)轉(zhuǎn)換為特殊的步驟
????private?Step?childJobTwoStep()?{
????????return?new?JobStepBuilder(new?StepBuilder("childJobTwoStep"))
????????????????.job(childJobTwo())
????????????????.launcher(jobLauncher)
????????????????.repository(jobRepository)
????????????????.transactionManager(platformTransactionManager)
????????????????.build();
????}
????//?子任務(wù)一
????private?Job?childJobOne()?{
????????return?jobBuilderFactory.get("childJobOne")
????????????????.start(
????????????????????stepBuilderFactory.get("childJobOneStep")
????????????????????????????.tasklet((stepContribution,?chunkContext)?->?{
????????????????????????????????System.out.println("子任務(wù)一執(zhí)行步驟。。。");
????????????????????????????????return?RepeatStatus.FINISHED;
????????????????????????????}).build()
????????????????).build();
????}
????//?子任務(wù)二
????private?Job?childJobTwo()?{
????????return?jobBuilderFactory.get("childJobTwo")
????????????????.start(
????????????????????stepBuilderFactory.get("childJobTwoStep")
????????????????????????????.tasklet((stepContribution,?chunkContext)?->?{
????????????????????????????????System.out.println("子任務(wù)二執(zhí)行步驟。。。");
????????????????????????????????return?RepeatStatus.FINISHED;
????????????????????????????}).build()
????????????????).build();
????}
}
4.2、讀取數(shù)據(jù)
定義 Model TestData,下面同一
@Data
public?class?TestData?{
????private?int?id;
????private?String?field1;
????private?String?field2;
????private?String?field3;
}
讀取數(shù)據(jù)包含:文本數(shù)據(jù)讀取、數(shù)據(jù)庫數(shù)據(jù)讀取、XML 數(shù)據(jù)讀取、JSON 數(shù)據(jù)讀取等,具體自己查資料。
文本數(shù)據(jù)讀取 Demo
@Component
public?class?FileItemReaderDemo?{
????//?任務(wù)創(chuàng)建工廠
????@Autowired
????private?JobBuilderFactory?jobBuilderFactory;
????//?步驟創(chuàng)建工廠
????@Autowired
????private?StepBuilderFactory?stepBuilderFactory;
????@Bean
????public?Job?fileItemReaderJob()?{
????????return?jobBuilderFactory.get("fileItemReaderJob2")
????????????????.start(step())
????????????????.build();
????}
????private?Step?step()?{
????????return?stepBuilderFactory.get("step")
????????????????.chunk(2)
????????????????.reader(fileItemReader())
????????????????.writer(list?->?list.forEach(System.out::println))
????????????????.build();
????}
????private?ItemReader?fileItemReader()?{
????????FlatFileItemReader?reader?=?new?FlatFileItemReader<>();
????????reader.setResource(new?ClassPathResource("reader/file"));?//?設(shè)置文件資源地址
????????reader.setLinesToSkip(1);?//?忽略第一行
????????//?AbstractLineTokenizer的三個(gè)實(shí)現(xiàn)類之一,以固定分隔符處理行數(shù)據(jù)讀取,
????????//?使用默認(rèn)構(gòu)造器的時(shí)候,使用逗號作為分隔符,也可以通過有參構(gòu)造器來指定分隔符
????????DelimitedLineTokenizer?tokenizer?=?new?DelimitedLineTokenizer();
????????//?設(shè)置屬性名,類似于表頭
????????tokenizer.setNames("id",?"field1",?"field2",?"field3");
????????//?將每行數(shù)據(jù)轉(zhuǎn)換為TestData對象
????????DefaultLineMapper?mapper?=?new?DefaultLineMapper<>();
????????//?設(shè)置LineTokenizer
????????mapper.setLineTokenizer(tokenizer);
????????//?設(shè)置映射方式,即讀取到的文本怎么轉(zhuǎn)換為對應(yīng)的POJO
????????mapper.setFieldSetMapper(fieldSet?->?{
????????????TestData?data?=?new?TestData();
????????????data.setId(fieldSet.readInt("id"));
????????????data.setField1(fieldSet.readString("field1"));
????????????data.setField2(fieldSet.readString("field2"));
????????????data.setField3(fieldSet.readString("field3"));
????????????return?data;
????????});
????????reader.setLineMapper(mapper);
????????return?reader;
????}
}
4.3、輸出數(shù)據(jù)
輸出數(shù)據(jù)也包含:文本數(shù)據(jù)讀取、數(shù)據(jù)庫數(shù)據(jù)讀取、XML 數(shù)據(jù)讀取、JSON 數(shù)據(jù)讀取等
Component
public?class?FileItemWriterDemo?{
????@Autowired
????private?JobBuilderFactory?jobBuilderFactory;
????@Autowired
????private?StepBuilderFactory?stepBuilderFactory;
????@Resource(name?=?"writerSimpleReader")
????private?ListItemReader?writerSimpleReader;
????@Bean
????public?Job?fileItemWriterJob()?throws?Exception?{
????????return?jobBuilderFactory.get("fileItemWriterJob")
????????????????.start(step())
????????????????.build();
????}
????private?Step?step()?throws?Exception?{
????????return?stepBuilderFactory.get("step")
????????????????.chunk(2)
????????????????.reader(writerSimpleReader)
????????????????.writer(fileItemWriter())
????????????????.build();
????}
????private?FlatFileItemWriter?fileItemWriter()?throws?Exception?{
????????FlatFileItemWriter?writer?=?new?FlatFileItemWriter<>();
????????FileSystemResource?file?=?new?FileSystemResource("D:/code/spring-batch-demo/src/main/resources/writer/writer-file");
????????Path?path?=?Paths.get(file.getPath());
????????if?(!Files.exists(path))?{
????????????Files.createFile(path);
????????}
????????//?設(shè)置輸出文件路徑
????????writer.setResource(file);
????????//?把讀到的每個(gè)TestData對象轉(zhuǎn)換為JSON字符串
????????LineAggregator?aggregator?=?item?->?{
????????????try?{
????????????????ObjectMapper?mapper?=?new?ObjectMapper();
????????????????return?mapper.writeValueAsString(item);
????????????}?catch?(JsonProcessingException?e)?{
????????????????e.printStackTrace();
????????????}
????????????return?"";
????????};
????????writer.setLineAggregator(aggregator);
????????writer.afterPropertiesSet();
????????return?writer;
????}
}
4.5、處理數(shù)據(jù)
@Component
public?class?ValidatingItemProcessorDemo?{
????@Autowired
????private?JobBuilderFactory?jobBuilderFactory;
????@Autowired
????private?StepBuilderFactory?stepBuilderFactory;
????@Resource(name?=?"processorSimpleReader")
????private?ListItemReader?processorSimpleReader;
????@Bean
????public?Job?validatingItemProcessorJob()?throws?Exception?{
????????return?jobBuilderFactory.get("validatingItemProcessorJob3")
????????????????.start(step())
????????????????.build();
????}
????private?Step?step()?throws?Exception?{
????????return?stepBuilderFactory.get("step")
????????????????.chunk(2)
????????????????.reader(processorSimpleReader)
????????????????.processor(beanValidatingItemProcessor())
????????????????.writer(list?->?list.forEach(System.out::println))
????????????????.build();
????}
//????private?ValidatingItemProcessor?validatingItemProcessor()?{
//????????ValidatingItemProcessor?processor?=?new?ValidatingItemProcessor<>();
//????????processor.setValidator(value?->?{
//????????????//?對每一條數(shù)據(jù)進(jìn)行校驗(yàn)
//????????????if?("".equals(value.getField3()))?{
//????????????????//?如果field3的值為空串,則拋異常
//????????????????throw?new?ValidationException("field3的值不合法");
//????????????}
//????????});
//????????return?processor;
//????}
????private?BeanValidatingItemProcessor?beanValidatingItemProcessor()?throws?Exception?{
????????BeanValidatingItemProcessor?beanValidatingItemProcessor?=?new?BeanValidatingItemProcessor<>();
????????//?開啟過濾,不符合規(guī)則的數(shù)據(jù)被過濾掉;
//????????beanValidatingItemProcessor.setFilter(true);
????????beanValidatingItemProcessor.afterPropertiesSet();
????????return?beanValidatingItemProcessor;
????}
}
4.6、任務(wù)調(diào)度
可以配合 quartz 或者 xxljob 實(shí)現(xiàn)定時(shí)任務(wù)執(zhí)行
@RestController @RequestMapping("job") public?class?JobController?{ ????@Autowired ????private?Job?job; ????@Autowired ????private?JobLauncher?jobLauncher; ????@GetMapping("launcher/{message}") ????public?String?launcher(@PathVariable?String?message)?throws?Exception?{ ????????JobParameters?parameters?=?new?JobParametersBuilder() ????????????????.addString("message",?message) ????????????????.toJobParameters(); ????????//?將參數(shù)傳遞給任務(wù) ????????jobLauncher.run(job,?parameters); ????????return?"success"; ????} } 編輯:黃飛
電子發(fā)燒友App
























評論