Spring Batch 架构介绍
- 
从数据库,文件或队列中读取大量记录。
 - 
以某种方式处理数据。
 - 
以修改之后的形式写回数据。
 


Spring Batch 核心概念介绍
什么是 Job
    
    
    
 
     
     
     /**
 * Batch domain object representing a job. Job is an explicit abstraction
 * representing the configuration of a job specified by a developer. It should
 * be noted that restart policy is applied to the job as a whole and not to a
 * step.
 */
public interface Job {
 
 String getName();
 
 boolean isRestartable();
 
 void execute(JobExecution execution);
 
 JobParametersIncrementer getJobParametersIncrementer();
 
 JobParametersValidator getJobParametersValidator();
 
}
    
    
     
    
    
    
 
     
     
     @Bean
public Job footballJob() {
    return this.jobBuilderFactory.get("footballJob")
                     .start(playerLoad())
                     .next(gameLoad())
                     .next(playerSummarization())
                     .end()
                     .build();
}
    
    
     
什么是 JobInstance
    
    
    
 
     
     
     public interface JobInstance {
 /**
  * Get unique id for this JobInstance.
  * @return instance id
  */
 public long getInstanceId();
 /**
  * Get job name.
  * @return value of 'id' attribute from <job>
  */
 public String getJobName(); 
}
    
    
     
什么是 JobParameters

什么是 JobExecution
    
    
    
 
     
     
     public interface JobExecution {
 /**
  * Get unique id for this JobExecution.
  * @return execution id
  */
 public long getExecutionId();
 /**
  * Get job name.
  * @return value of 'id' attribute from <job>
  */
 public String getJobName(); 
 /**
  * Get batch status of this execution.
  * @return batch status value.
  */
 public BatchStatus getBatchStatus();
 /**
  * Get time execution entered STARTED status. 
  * @return date (time)
  */
 public Date getStartTime();
 /**
  * Get time execution entered end status: COMPLETED, STOPPED, FAILED 
  * @return date (time)
  */
 public Date getEndTime();
 /**
  * Get execution exit status.
  * @return exit status.
  */
 public String getExitStatus();
 /**
  * Get time execution was created.
  * @return date (time)
  */
 public Date getCreateTime();
 /**
  * Get time execution was last updated updated.
  * @return date (time)
  */
 public Date getLastUpdatedTime();
 /**
  * Get job parameters for this execution.
  * @return job parameters 
  */
 public Properties getJobParameters();
 
}
    
    
     
    
    
    
 
     
     
     public enum BatchStatus {STARTING, STARTED, STOPPING, 
   STOPPED, FAILED, COMPLETED, ABANDONED }
    
    
     
batch_job_execution, 下面是一个从数据库当中截图的实例:

什么是 Step

什么是 StepExecution

什么是 ExecutionContext
    
    
    
 
     
     
     ExecutionContext ecStep = stepExecution.getExecutionContext();
ExecutionContext ecJob = jobExecution.getExecutionContext();
    
    
     
什么是 JobRepository
@EnableBatchProcessing注解可以为 JobRepository 提供自动配置。
什么是 JobLauncher
    
    
    
 
     
     
     public interface JobLauncher {
 
public JobExecution run(Job job, JobParameters jobParameters)
            throws JobExecutionAlreadyRunningException, JobRestartException,
                   JobInstanceAlreadyCompleteException, JobParametersInvalidException;
}
    
    
     
什么是 Item Reader
    
    
    
 
     
     
     @Bean
public JdbcPagingItemReader itemReader(DataSource dataSource, PagingQueryProvider queryProvider) {
        Map<String, Object> parameterValues = new HashMap<>();
        parameterValues.put("status", "NEW");
 
        return new JdbcPagingItemReaderBuilder<CustomerCredit>()
                                           .name("creditReader")
                                           .dataSource(dataSource)
                                           .queryProvider(queryProvider)
                                           .parameterValues(parameterValues)
                                           .rowMapper(customerCreditMapper())
                                           .pageSize(1000)
                                           .build();
}
 
@Bean
public SqlPagingQueryProviderFactoryBean queryProvider() {
        SqlPagingQueryProviderFactoryBean provider = new SqlPagingQueryProviderFactoryBean();
 
        provider.setSelectClause("select id, name, credit");
        provider.setFromClause("from customer");
        provider.setWhereClause("where status=:status");
        provider.setSortKey("id");
 
        return provider;
}
    
    
     
    
    
    
 
     
     
     private JdbcCursorItemReader<Map<String, Object>> buildItemReader(final DataSource dataSource, String tableName,
            String tenant) {
 
        JdbcCursorItemReader<Map<String, Object>> itemReader = new JdbcCursorItemReader<>();
        itemReader.setDataSource(dataSource);
        itemReader.setSql("sql here");
        itemReader.setRowMapper(new RowMapper());
        return itemReader;
    }
    
    
     
什么是 Item Writer
什么是 Item Processor
chunk 处理流程


skip 策略和失败处理

批处理操作指南
批处理原则
理解决方案时,应考虑以下关键原则和注意事项。
- 
批处理体系结构通常会影响体系结构 
 - 
尽可能简化并避免在单批应用程序中构建复杂的逻辑结构 
 - 
保持数据的处理和存储在物理上靠得很近(换句话说,将数据保存在处理过程中)。 
 - 
最大限度地减少系统资源的使用,尤其是 I / O. 在 internal memory 中执行尽可能多的操作。 
 
- 
当数据可以被读取一次并缓存或保存在工作存储中时,读取每个事务的数据。 
 - 
重新读取先前在同一事务中读取数据的事务的数据。 
 - 
导致不必要的表或索引扫描。 
 - 
未在 SQL 语句的 WHERE 子句中指定键值。 
 
在批处理运行中不要做两次一样的事情。例如,如果需要数据汇总以用于报告目的,则应该(如果可能)在最初处理数据时递增存储的总计,因此您的报告应用程序不必重新处理相同的数据。
总是假设数据完整性最差。插入适当的检查和记录验证以维护数据完整性。
尽可能实施校验和以进行内部验证。例如,对于一个文件里的数据应该有一个数据条数纪录,告诉文件中的记录总数以及关键字段的汇总。
在具有真实数据量的类似生产环境中尽早计划和执行压力测试。
如何默认不启动 job
spring.batch.job.enabled=false
在读数据时内存不够

Resource exhaustion event:the JVM was unable to allocate memory from the heap.- 
调整 reader 读数据逻辑,按分页读取,但实现上会麻烦一些,且运行效率会下降 
 - 
增大 service 内存 
 

推荐阅读
1. GitHub 上有什么好玩的项目?
2. Linux 运维必备 150 个命令汇总
3. SpringSecurity + JWT 实现单点登录

本文分享自微信公众号 - Java后端(web_resource)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。
相关文章
暂无评论...
