R语言 作用域 ssm 金融信贷 d3 constructor pagination electron安装 jquery点击事件 jq选择子元素 jquery获取元素 matlab停止运行命令 js控制台打印 linux启动数据库 python读取数据库 python读取本地文件 java编程环境 java连数据库 java怎么写接口 javac java集合类 java入门基础 java连接数据库代码 java停止线程 java日期格式 金山wps2003 俄罗斯方块java代码 路由器有辐射吗 js删除数组指定元素 pdf拆分工具 超星网课助手 listpreference 无法打开搜索页 skycc组合营销软件 程序卸载 maven项目打包 js代码混淆工具 灰色按钮激活精灵 网页之家 android应用开发入门
当前位置: 首页 > 学习教程  > 编程语言

初识springBatch——demo配置

2020/8/31 15:02:44 文章标签:

全部手写,请尊重版权!

springbatch运行流程

在这里插入图片描述

1、maven依赖

<!--  spring batch -->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-batch</artifactId>
    </dependency>
    <!-- hibernate validator -->
    <dependency>
      <groupId>org.hibernate</groupId>
      <artifactId>hibernate-validator</artifactId>
      <version>6.0.7.Final</version>
    </dependency>
    <!-- alibaba dataSource -->
    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>druid</artifactId>
      <version>1.1.12</version>
    </dependency>

2、创建配置类(使用@EnableBatchProcessing //开启批处理的支持)

(1):reader(读)

​ ItemReader:基础接口

​ FlatFileItemReader:读文件类

@Bean
public ItemReader<User> reader(){   
    FlatFileItemReader<User> reader = new FlatFileItemReader<>();   // 设置文件处在路径   
    reader.setResource(new ClassPathResource("user.csv"));   // entity与csv数据做映射   
    reader.setLineMapper(new DefaultLineMapper<User>() {    // 设置行映射  
        {         
            setLineTokenizer(new DelimitedLineTokenizer() {   // 设置字段         
                {               
                    setNames(new String[]{"id","name","sax"});            
                }        
            });        
            setFieldSetMapper(new BeanWrapperFieldSetMapper<User>() { // 设置映射类       
                {               
                    setTargetType(User.class);            
                }         
            });     
        }   
	});   
	return reader;
}

​ JdbcCursorItemReader:数据库读取

@Bean
public ItemReader<User> reader(DataSource dataSource){   
    JdbcCursorItemReader<User> reader = new JdbcCursorItemReader<User>();   // 设置文件处在路径   
    reader.setDataSource(dataSource);   
    reader.setFetchSize(500);   
    reader.setSql("select id,name,sax from user");   
    reader.setRowMapper(new RowMapper<User>() {      
        @Override      
        public User mapRow(ResultSet resultSet, int i) throws SQLException {        
            User user = new User();
            user.setId(resultSet.getLong("id"));
            user.setName(resultSet.getString("name"));
            user.setSax(resultSet.getString("sax"));         
            return user;      
        }   
    });   
    return reader;
}

(2):processor(数据处理)

​ ItemProcessor:基础接口

​ 自定义processor 实现 ItemProcessor 重写process方法.

//将user对象转为json
public class JsonProcessor implements ItemProcessor<User,String> {
    private Logger logger = LoggerFactory.getLogger(JsonProcessor.class); 
   
    @Override   
    public String process(User user) throws Exception { 
        logger.info("JsonProcessor-----start");     
        logger.info("JsonProcessor-----end");     
        return JacksonJsonUtils.bean2json(user);   
    }
}

(3):writer(写)

​ ItemWriter :基础写接口

​ JdbcBatchItemWriter:数据库批量写入类

@Bean
public ItemWriter<User> writer(DataSource dataSource){  
    JdbcBatchItemWriter<User> writer = new JdbcBatchItemWriter<>();   
    // 设置有参数的sql语句 
  writer.setItemSqlParameterSourceProvider(newBeanPropertyItemSqlParameterSourceProvider<User>());   
    String sql = "insert into user values(:id,:name,:sax)";   
    writer.setSql(sql);   
    writer.setDataSource(dataSource);   
    return writer;
}

​ FlatFileItemWriter:写文件类

@Bean
public ItemWriter<String> writer(){  
    FlatFileItemWriter writer = new FlatFileItemWriter(); 
    writer.setShouldDeleteIfExists(true);  
    DelimitedLineAggregator aggregator = new DelimitedLineAggregator();  
    aggregator.setDelimiter(",");  //设置分隔符
    BeanWrapperFieldExtractor extractor = new BeanWrapperFieldExtractor(); 
    extractor.setNames(new String[]{"id","name"});  //设置字段
    aggregator.setFieldExtractor(extractor);  
    writer.setLineAggregator(aggregator);   
    writer.setResource(new FileSystemResource("user.txt"));  //设置写入的文件
    return writer;
}

​ 如果写的数据是字符串,不是对象则可以设置LineAggregator为PassThroughLineAggregator对象,不用设置分隔符和字段

@Bean
public ItemWriter<String> writer(){   
    JsonWriter writer = new JsonWriter();   
    writer.setShouldDeleteIfExists(true);   
    writer.setLineAggregator(new PassThroughLineAggregator());   
    writer.setResource(new FileSystemResource("user.txt"));   
    return writer;
}

(4):StepBuilder(创建Step)

@Bean
public Step step(StepBuilderFactory stepBuilderFactory,   
                 ItemReader<User> reader,   
                 ItemWriter<String> writer,   
                 ItemProcessor<User, String> processor){ 
    return stepBuilderFactory 
        .get("step")//设置step名称   
        .<User, String>chunk(65000) 
        // Chunk的机制(即每次读取一条数据,再处理一条数据,累积到一定数量后再一次性交给writer进行写入操作)      
        .reader(reader)      
        .processor(processor)   
        .writer(writer)    
        .build();
}

(5):JobBuilder(创建Job)

@Bean
public Job importJob(JobBuilderFactory jobs, Step step){  
    return jobs.get("importJob")    
        .incrementer(new RunIdIncrementer())     
        .flow(step)   //设置step可使用.next多设置几个 
        .end()    
        .listener(csvJobListener)  //设置监听 
        .build();
}

(6):JobRepository(注册Job容器)

​ dataSource 数据库配置,需要在数据库配置类中完成配置

@Bean
public JobRepository cvsJobRepository(DataSource dataSource, PlatformTransactionManager transactionManager) throws Exception{  
    JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean();
    jobRepositoryFactoryBean.setDatabaseType("mysql");   
    jobRepositoryFactoryBean.setTransactionManager(transactionManager); 
    jobRepositoryFactoryBean.setDataSource(dataSource); 
    return jobRepositoryFactoryBean.getObject();
}

(7):JobLauncher(设置执行器)或者 jobOperator

@Bean
public SimpleJobLauncher csvJobLauncher(DataSource dataSource, PlatformTransactionManager transactionManager) throws Exception{   
    SimpleJobLauncher jobLauncher = new SimpleJobLauncher();   // 设置jobRepository   
    jobLauncher.setJobRepository(cvsJobRepository(dataSource, transactionManager)); 
    return jobLauncher;
}

(8):配置Job的监听:listener

​ 自定义监听,实现JobExecutionListener接口

@Component
public class CsvJobListener implements JobExecutionListener {  
    private Logger logger = LoggerFactory.getLogger(CsvJobListener.class); 
    private long startTime;  
    private long endTime;   
    
    /**
     * job开始前执行
     */
    @Override   
    public void beforeJob(JobExecution jobExecution) {   
        startTime = System.currentTimeMillis();    
        logger.info("job process start..."); 
    } 
    
    /**
     * job完成后执行
     */
    @Override
    public void afterJob(JobExecution jobExecution) { 
        endTime = System.currentTimeMillis();   
        logger.info("job process end...");    
        logger.info("elapsed time: " + (endTime - startTime) + "ms"); 
    }
}

注:以上@bean 均在配置类中编写,注入到容器中,随后会附上源码地址,有建议可留言评论,谢谢!


本文链接: http://www.dtmao.cc/news_show_150321.shtml

附件下载

相关教程

    暂无相关的数据...

共有条评论 网友评论

验证码: 看不清楚?