ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • 5. JdbcPagingItemReader로 DB내용을 읽고, JdbcBatchItemWriter로 DB에 쓰기
    Spring/Spring Batch 2024. 11. 5. 17:38

    이전에는 csv 파일을 읽고 쓰는 것에 배웠습니다. 이번에는 DB에서 데이터를 읽고 CSV 파일로 쓰거나 CSV 파일을 읽고 DB에 저장하는 방법에 대해 알아보자.

     

    1. JdbcPagingItemReader

    • JdbcPagingItemReader는 Spring Batch에서 제공하는 ItemReader로, 데이터베이스로부터 데이터를 페이지 단위로 읽는다.
    • 대규모 데이터 처리 효율성: 메모리 사용량을 최소화하고 커밋 간격을 설정하여 대규모 데이터를 효율적으로 처리할 수 있다.
    • 쿼리 최적화: SQL 쿼리를 직접 작성하여 최적화된 데이터 읽기가 가능하다.
    • 커서 제어: 데이터베이스 커서를 사용하여 데이터 순회를 제어할 수 있다.

    2. 쿼리 Provider 생성하기

    @Bean
    public PagingQueryProvider queryProvider() throws Exception {
            SqlPagingQueryProviderFactoryBean queryProvider = new SqlPagingQueryProviderFactoryBean();
            queryProvider.setDataSource(dataSource);  // DB 에 맞는 PagingQueryProvider 를 선택하기 위함
            queryProvider.setSelectClause("id, name, age, gender");
            queryProvider.setFromClause("from customer");
            queryProvider.setWhereClause("where age >= :age");
    
            Map<String, Order> sortKeys = new HashMap<>(1);
            sortKeys.put("id", Order.DESCENDING);
    
            queryProvider.setSortKeys(sortKeys);
    
            return queryProvider.getObject();
        }
    • SqlPagingQueryProviderFactoryBean: 쿼리 프로파이더 팩토리
    • setDataSource: 데이터소스를 설정한다.
    • setSelectClause: select에서 프로젝션할 필드 이름을 지정한다.
    • setFromClause: 조회할 테이블
    • setWhereClause: 조건절
    • setSortKeys: 소트 키를 지정한다.

    3. JdbcPagingItemReader 작성

    @Bean
     public JdbcPagingItemReader<Customer> jdbcPagingItemReader() throws Exception {
            Map<String, Object> parameterValue = new HashMap<>();
            parameterValue.put("age", 20);
    
            return new JdbcPagingItemReaderBuilder<Customer>()
                    .name("jdbcPagingItemReader")
                    .fetchSize(CHUNK_SIZE)
                    .dataSource(dataSource)
                    .rowMapper(new BeanPropertyRowMapper<>(Customer.class))
                    .queryProvider(queryProvider())
                    .parameterValues(parameterValue)
                    .build();
        }

    ※ queryProvider를 이용해서 만든 쿼리에 조건을 넣고 CHUNK_SIZE 만큼 읽는다.


    4. 전체 코드

    @Slf4j
    @Configuration
    @RequiredArgsConstructor
    public class JdbcPagingReaderJobConfig {
    
    	public static final int CHUNK_SIZE = 2;
    	public static final String ENCODING = "UTF-8";
    	public static final String JDBC_PAGING_CHUNK_JOB = "JDBC_PAGING_CHUNK_JOB";
    
    	private final DataSource dataSource;
    
    	@Bean
    	public JdbcPagingItemReader<Customer> jdbcPagingItemReader() throws Exception {
    		Map<String, Object> parameterValue = new HashMap<>();
    		parameterValue.put("age", 20);
    
    		return new JdbcPagingItemReaderBuilder<Customer>()
    			.name("jdbcPagingItemReader")
    			.fetchSize(CHUNK_SIZE)
    			.dataSource(dataSource)
    			.rowMapper(new BeanPropertyRowMapper<>(Customer.class))
    			.queryProvider(queryProvider())
    			.parameterValues(parameterValue)
    			.build();
    	}
    
    	@Bean
    	public PagingQueryProvider queryProvider() throws Exception {
    		SqlPagingQueryProviderFactoryBean queryProvider = new SqlPagingQueryProviderFactoryBean();
    		queryProvider.setDataSource(dataSource);
    		queryProvider.setSelectClause("id, name, age, gender");
    		queryProvider.setFromClause("from customer2");
    		queryProvider.setWhereClause("where age >= :age");
    
    		Map<String, Order> sortKeys = new HashMap<>(1);
    		sortKeys.put("id", Order.DESCENDING);
    
    		queryProvider.setSortKeys(sortKeys);
    
    		return queryProvider.getObject();
    	}
    
    	@Bean
    	public FlatFileItemWriter<Customer> customerFlatFileItemWriter() {
    		return new FlatFileItemWriterBuilder<Customer>()
    			.name("customerFlatFileItemWriter")
    			.resource(new FileSystemResource("./customer_new_v1.csv"))
    			.encoding(ENCODING)
    			.delimited().delimiter("\t")
    			.names("Name", "Age", "Gender")
    			.build();
    	}
    
    	@Bean
    	public Step customerJdbcPagingStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) throws Exception {
    		log.info("------------------ Init customerJdbcPagingStep -----------------");
    
    		return new StepBuilder("customerJdbcPagingStep", jobRepository)
    			.<Customer, Customer>chunk(CHUNK_SIZE, transactionManager)
    			.reader(jdbcPagingItemReader())
    			.writer(customerFlatFileItemWriter())
    			.build();
    	}
    
    	@Bean
    	public Job customerJdbcPagingJob(@Qualifier("customerJdbcPagingStep") Step customerJdbcPagingStep, JobRepository jobRepository) {
    		log.info("------------------ Init customerJdbcPagingJob -----------------");
    		return new JobBuilder(JDBC_PAGING_CHUNK_JOB, jobRepository)
    			.incrementer(new RunIdIncrementer())
    			.start(customerJdbcPagingStep)
    			.build();
    	}
    
    }

    ※ DB에서 데이터를 읽고 CSV 파일에 읽은 데이터를 쓰고 저장한다.

     


    6. DB 설정

      datasource:
        driver-class-name: com.mysql.cj.jdbc.Driver
        url: jdbc:mysql://localhost:3306/testdb?useUnicode=true&characterEncoding=utf8&clusterInstanceHostPattern=?&zeroDateTimeBehavior=CONVERT_TO_NULL&allowMultiQueries=true
        username: root
        password: root1234

    * DB에 데이터를 데이터를 넣어놔야 읽고 CSV 파일로 만들 수 있기 때문에, docker를 이용해서 테이블 및 데이터를 준비 해둔다.


    7.  JdbcBatchItemWriter

    • JdbcBatchItemWriter Spring Batch에서 제공하는 ItemWriter 인터페이스를 구현하는 클래스이다.
    • 데이터를 JDBC를 통해 데이터베이스에 저장하는 데 사용된다.
    • DataSource: 데이터베이스 연결 정보를 지정한다.
    • SqlStatementCreator: INSERT 쿼리를 생성하는 역할을 한다.
    • PreparedStatementSetter: INSERT 쿼리의 파라미터 값을 설정하는 역할을 한다.
    • ItemSqlParameterSourceProvider: Item 객체를 기반으로 PreparedStatementSetter에 전달할 파라미터 값을 생성하는 역할을 한다.

    장점

    • 데이터베이스 연동: JDBC를 통해 다양한 데이터베이스에 데이터를 저장할 수 있다.
    • 성능: 대량의 데이터를 빠르게 저장할 수 있다.
    • 유연성: 다양한 설정을 통해 원하는 방식으로 데이터를 저장할 수 있다.

    단점

    • 설정 복잡성: JDBC 설정 및 쿼리 작성이 복잡할 수 있다.
    • 데이터베이스 종속: 특정 데이터베이스에 종속적이다.
    • 오류 가능성: 설정 오류 시 데이터 손상 가능성이 있다.

    8 . 테이블 생성하기

    create table testdb.customer2
    (
        id     int auto_increment primary key,
        name   varchar(100) null,
        age    int          null,
        gender varchar(10)  null
    );

     


    9. SqlPatameterSourceProvider 작성

    public class CustomerItemSqlParameterSourceProvider implements ItemSqlParameterSourceProvider<Customer> {
        @Override
        public SqlParameterSource createSqlParameterSource(Customer item) {
            return new BeanPropertySqlParameterSource(item);
        }
    }

     


    10. 전체코드

    @Slf4j
    @Configuration
    @RequiredArgsConstructor
    public class JdbcBatchItemJobConfig {
    
    	/**
    	 * CHUNK 크기를 지정한다.
    	 */
    	public static final int CHUNK_SIZE = 100;
    	public static final String ENCODING = "UTF-8";
    	public static final String JDBC_BATCH_WRITER_CHUNK_JOB = "JDBC_BATCH_WRITER_CHUNK_JOB";
    
    	private final DataSource dataSource;
    
    	@Bean
    	public FlatFileItemReader<Customer> jdbcFlatFileItemReader() {
    
    		return new FlatFileItemReaderBuilder<Customer>()
    			.name("FlatFileItemReader")
    			.resource(new ClassPathResource("./customer.csv"))
    			.encoding(ENCODING)
    			.delimited().delimiter(",")
    			.names("name", "age", "gender")
    			.targetType(Customer.class)
    			.build();
    	}
    
    	@Bean
    	public JdbcBatchItemWriter<Customer> jdbcFlatFileItemWriter() {
    
    		return new JdbcBatchItemWriterBuilder<Customer>()
    			.dataSource(dataSource)
    			.sql("INSERT INTO customer2 (name, age, gender) VALUES (:name, :age, :gender)")
    			.itemSqlParameterSourceProvider(new CustomerItemSqlParameterSourceProvider())
    			.build();
    	}
    
    
    	@Bean
    	public Step jdbcFlatFileStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
    		log.info("------------------ Init flatFileStep -----------------");
    
    		return new StepBuilder("jdbcFlatFileStep", jobRepository)
    			.<Customer, Customer>chunk(CHUNK_SIZE, transactionManager)
    			.reader(jdbcFlatFileItemReader())
    			.writer(jdbcFlatFileItemWriter())
    			.build();
    	}
    
    	@Bean
    	public Job jdbcBatchWriterChunkJob(@Qualifier("jdbcFlatFileStep") Step flatFileStep, JobRepository jobRepository) {
    		log.info("------------------ Init flatFileJob -----------------");
    		return new JobBuilder(JDBC_BATCH_WRITER_CHUNK_JOB, jobRepository)
    			.incrementer(new RunIdIncrementer())
    			.start(flatFileStep)
    			.build();
    	}
    }

     

    * CSV 파일을 읽고 객체와 매핑 후 DB에 데이터를 저장한다.

    댓글

Designed by Tistory.