Spring Batch簡單入門(一)- 簡介
Spring Batch根植於Spring Framework大家庭,是一個輕量級的批處理框架,在企業級應用中,我們常見一些批處理業務場景,藉助Spring Batch我們可以很方便的開發出健壯、易用的批處理應用。因為,Spring Framework框架可以說滿足了開發人員對於理想框架的所有期望:高效的開發效率、基於POJO的開發方法、簡單易用。另外,與市面上我們常見的調度框架: Quartz, Tivoli, Control-M等等不同,Spring Batch並不是一個調度框架,Spring Batch設計的初衷是與調度框架完美協作,而非作為一個潛在的調度框架選項。
基於代碼簡潔性與管理便捷性的考慮,在日常大數據處理業務場景中,我們應儘可能的發揮功能重用的優勢。作為一個設計優良的批處理框架,SpringBatch提供了許多可重用的功能:日誌跟蹤、事務管理、任務處理統計、任務重啟、跳過與資源管理等。此外,通過更為高級的優化及分區技術,Spring Batch提供支持大容量、高性能的批處理特性。Spring Batch可謂是“老少咸宜”,即可用作簡單的文件讀取或執行存儲過程,也可用作複雜的、大容量的數據庫與數據庫之間的數據遷移、轉換等場景。大容量批處理作業可以充分利用框架的可擴展特性來處理重要的業務信息。
![Spring Batch簡單教程,可操作性強,入門必看](http://p2.ttnews.xyz/loading.gif)
![Spring Batch簡單教程,可操作性強,入門必看](http://p2.ttnews.xyz/loading.gif)
Spring Batch簡單入門(二)- Job配置
上圖中一個Job對應N個Step(N>=1),一個Step包含一個Reader,一個Processor,一個Writer。我們的代碼組織如下
定義ItemReader
@Bean
@Qualifier("repositoryItemReader")
public RepositoryItemReader<User> repositoryItemReader() {
Map<string>Sort.Direction> map = new HashMap<>();/<string>
map.put("id", Sort.Direction.DESC);
RepositoryItemReader<User> repositoryItemReader = new RepositoryItemReader<>(); repositoryItemReader.setRepository(userReaderRepository);
repositoryItemReader.setPageSize(5);
repositoryItemReader.setMethodName("findAll");
repositoryItemReader.setSort(map);
return repositoryItemReader;
}
定義ItemProcessor
@Override
public People process(User user) throws Exception {
final String firstName = user.getFirstName().toUpperCase();
final String lastName = user.getLastName().toUpperCase();
final People transformedPeople = People.builder()
.firstName(firstName)
.lastName(lastName)
.personId(UUID.randomUUID().toString())
.build();
log.info("converting (" + user + ") into (" + transformedPeople + ")");
return transformedPeople;
}
定義ItemWriter
@Bean
@Qualifier("repositoryItemWriter")
public RepositoryItemWriter<People> repositoryItemWriter() {
RepositoryItemWriter<People> peopleRepositoryItemWriter = new RepositoryItemWriter<>();
peopleRepositoryItemWriter.setRepository(peopleCrudRepository);
peopleRepositoryItemWriter.setMethodName("save");
return peopleRepositoryItemWriter;
}
定義Step如下:
@Bean
public Step stepWithRepository() {
return stepBuilderFactory.get("stepWithRepository")
.<User, People>chunk(10)
.reader(repositoryItemReader)
.processor(userPeopleItemProcessor)
.writer(repositoryItemWriter)
.build();
}
定義Job
@Configuration
public class RepositoryJob {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private Step stepWithRepositoryReaderAndWriter;
@Autowired
private Step stepWithRepository;
@Bean
public Job jobWithRepositoryStep(@Autowired CommonJobListener commonJobListener) {
return jobBuilderFactory.get("jobWithRepositoryStep")
.incrementer(new RunIdIncrementer())
.listener(commonJobListener)
.flow(stepWithRepository)
.end()
.build();
}
}
暴露Rest 接口,以JobLauncher啟動指定Job
@Autowired
private JobLaunchService jobLaunchService;
@Autowired
private Job jobWithRepositoryStep;
@GetMapping
public JobResult launchJobWithRepositoryStep() {
return jobLaunchService.launchJob(jobWithRepositoryStep);
}
其中JobLaunchService定義如下
@Service
@Slf4j
public class JobLaunchService {
@Autowired
private JobLauncher jobLauncher;
public JobResult launchJob(Job job) {
try {
JobParameters jobParameters = new JobParametersBuilder()
.addDate("timestamp", Calendar.getInstance().getTime())
.toJobParameters();
JobExecution jobExecution = jobLauncher.run(job, jobParameters);
return JobResult.builder()
.jobName(job.getName())
.jobId(jobExecution.getJobId())
.jobExitStatus(jobExecution.getExitStatus())
.timestamp(Calendar.getInstance().getTimeInMillis())
.build();
} catch (Exception e) {
log.error(e.getMessage());
throw new RuntimeException("launch job exception ", e);
}
}
}
這樣,一個完整的Job就配置完成了,你可以通過簡單的HTTP接口調用啟動指定Job。
閱讀更多 陝西文都智鏈 的文章