-
5. JdbcPagingItemReader로 DB내용을 읽고, JdbcBatchItemWriter로 DB에 쓰기Spring/Spring Batch 2024. 11. 5. 17:38
이전에는 csv 파일을 읽고 쓰는 것에 배웠습니다. 이번에는 DB에서 데이터를 읽고 CSV 파일로 쓰거나 CSV 파일을 읽고 DB에 저장하는 방법에 대해 알아보자.
1. JdbcPagingItemReader
- JdbcPagingItemReader는 Spring Batch에서 제공하는 ItemReader로, 데이터베이스로부터 데이터를 페이지 단위로 읽는다.
- 대규모 데이터 처리 효율성: 메모리 사용량을 최소화하고 커밋 간격을 설정하여 대규모 데이터를 효율적으로 처리할 수 있다.
- 쿼리 최적화: SQL 쿼리를 직접 작성하여 최적화된 데이터 읽기가 가능하다.
- 커서 제어: 데이터베이스 커서를 사용하여 데이터 순회를 제어할 수 있다.
2. 쿼리 Provider 생성하기
@Bean public PagingQueryProvider queryProvider() throws Exception { SqlPagingQueryProviderFactoryBean queryProvider = new SqlPagingQueryProviderFactoryBean(); queryProvider.setDataSource(dataSource); // DB 에 맞는 PagingQueryProvider 를 선택하기 위함 queryProvider.setSelectClause("id, name, age, gender"); queryProvider.setFromClause("from customer"); queryProvider.setWhereClause("where age >= :age"); Map<String, Order> sortKeys = new HashMap<>(1); sortKeys.put("id", Order.DESCENDING); queryProvider.setSortKeys(sortKeys); return queryProvider.getObject(); }
- SqlPagingQueryProviderFactoryBean: 쿼리 프로파이더 팩토리
- setDataSource: 데이터소스를 설정한다.
- setSelectClause: select에서 프로젝션할 필드 이름을 지정한다.
- setFromClause: 조회할 테이블
- setWhereClause: 조건절
- setSortKeys: 소트 키를 지정한다.
3. JdbcPagingItemReader 작성
@Bean public JdbcPagingItemReader<Customer> jdbcPagingItemReader() throws Exception { Map<String, Object> parameterValue = new HashMap<>(); parameterValue.put("age", 20); return new JdbcPagingItemReaderBuilder<Customer>() .name("jdbcPagingItemReader") .fetchSize(CHUNK_SIZE) .dataSource(dataSource) .rowMapper(new BeanPropertyRowMapper<>(Customer.class)) .queryProvider(queryProvider()) .parameterValues(parameterValue) .build(); }
※ queryProvider를 이용해서 만든 쿼리에 조건을 넣고 CHUNK_SIZE 만큼 읽는다.
4. 전체 코드
@Slf4j @Configuration @RequiredArgsConstructor public class JdbcPagingReaderJobConfig { public static final int CHUNK_SIZE = 2; public static final String ENCODING = "UTF-8"; public static final String JDBC_PAGING_CHUNK_JOB = "JDBC_PAGING_CHUNK_JOB"; private final DataSource dataSource; @Bean public JdbcPagingItemReader<Customer> jdbcPagingItemReader() throws Exception { Map<String, Object> parameterValue = new HashMap<>(); parameterValue.put("age", 20); return new JdbcPagingItemReaderBuilder<Customer>() .name("jdbcPagingItemReader") .fetchSize(CHUNK_SIZE) .dataSource(dataSource) .rowMapper(new BeanPropertyRowMapper<>(Customer.class)) .queryProvider(queryProvider()) .parameterValues(parameterValue) .build(); } @Bean public PagingQueryProvider queryProvider() throws Exception { SqlPagingQueryProviderFactoryBean queryProvider = new SqlPagingQueryProviderFactoryBean(); queryProvider.setDataSource(dataSource); queryProvider.setSelectClause("id, name, age, gender"); queryProvider.setFromClause("from customer2"); queryProvider.setWhereClause("where age >= :age"); Map<String, Order> sortKeys = new HashMap<>(1); sortKeys.put("id", Order.DESCENDING); queryProvider.setSortKeys(sortKeys); return queryProvider.getObject(); } @Bean public FlatFileItemWriter<Customer> customerFlatFileItemWriter() { return new FlatFileItemWriterBuilder<Customer>() .name("customerFlatFileItemWriter") .resource(new FileSystemResource("./customer_new_v1.csv")) .encoding(ENCODING) .delimited().delimiter("\t") .names("Name", "Age", "Gender") .build(); } @Bean public Step customerJdbcPagingStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) throws Exception { log.info("------------------ Init customerJdbcPagingStep -----------------"); return new StepBuilder("customerJdbcPagingStep", jobRepository) .<Customer, Customer>chunk(CHUNK_SIZE, transactionManager) .reader(jdbcPagingItemReader()) .writer(customerFlatFileItemWriter()) .build(); } @Bean public Job customerJdbcPagingJob(@Qualifier("customerJdbcPagingStep") Step customerJdbcPagingStep, JobRepository jobRepository) { log.info("------------------ Init customerJdbcPagingJob -----------------"); return new JobBuilder(JDBC_PAGING_CHUNK_JOB, jobRepository) .incrementer(new RunIdIncrementer()) .start(customerJdbcPagingStep) .build(); } }
※ DB에서 데이터를 읽고 CSV 파일에 읽은 데이터를 쓰고 저장한다.
6. DB 설정
datasource: driver-class-name: com.mysql.cj.jdbc.Driver url: jdbc:mysql://localhost:3306/testdb?useUnicode=true&characterEncoding=utf8&clusterInstanceHostPattern=?&zeroDateTimeBehavior=CONVERT_TO_NULL&allowMultiQueries=true username: root password: root1234
* DB에 데이터를 데이터를 넣어놔야 읽고 CSV 파일로 만들 수 있기 때문에, docker를 이용해서 테이블 및 데이터를 준비 해둔다.
7. JdbcBatchItemWriter
- JdbcBatchItemWriter Spring Batch에서 제공하는 ItemWriter 인터페이스를 구현하는 클래스이다.
- 데이터를 JDBC를 통해 데이터베이스에 저장하는 데 사용된다.
- DataSource: 데이터베이스 연결 정보를 지정한다.
- SqlStatementCreator: INSERT 쿼리를 생성하는 역할을 한다.
- PreparedStatementSetter: INSERT 쿼리의 파라미터 값을 설정하는 역할을 한다.
- ItemSqlParameterSourceProvider: Item 객체를 기반으로 PreparedStatementSetter에 전달할 파라미터 값을 생성하는 역할을 한다.
장점
- 데이터베이스 연동: JDBC를 통해 다양한 데이터베이스에 데이터를 저장할 수 있다.
- 성능: 대량의 데이터를 빠르게 저장할 수 있다.
- 유연성: 다양한 설정을 통해 원하는 방식으로 데이터를 저장할 수 있다.
단점
- 설정 복잡성: JDBC 설정 및 쿼리 작성이 복잡할 수 있다.
- 데이터베이스 종속: 특정 데이터베이스에 종속적이다.
- 오류 가능성: 설정 오류 시 데이터 손상 가능성이 있다.
8 . 테이블 생성하기
create table testdb.customer2 ( id int auto_increment primary key, name varchar(100) null, age int null, gender varchar(10) null );
9. SqlPatameterSourceProvider 작성
public class CustomerItemSqlParameterSourceProvider implements ItemSqlParameterSourceProvider<Customer> { @Override public SqlParameterSource createSqlParameterSource(Customer item) { return new BeanPropertySqlParameterSource(item); } }
10. 전체코드
@Slf4j @Configuration @RequiredArgsConstructor public class JdbcBatchItemJobConfig { /** * CHUNK 크기를 지정한다. */ public static final int CHUNK_SIZE = 100; public static final String ENCODING = "UTF-8"; public static final String JDBC_BATCH_WRITER_CHUNK_JOB = "JDBC_BATCH_WRITER_CHUNK_JOB"; private final DataSource dataSource; @Bean public FlatFileItemReader<Customer> jdbcFlatFileItemReader() { return new FlatFileItemReaderBuilder<Customer>() .name("FlatFileItemReader") .resource(new ClassPathResource("./customer.csv")) .encoding(ENCODING) .delimited().delimiter(",") .names("name", "age", "gender") .targetType(Customer.class) .build(); } @Bean public JdbcBatchItemWriter<Customer> jdbcFlatFileItemWriter() { return new JdbcBatchItemWriterBuilder<Customer>() .dataSource(dataSource) .sql("INSERT INTO customer2 (name, age, gender) VALUES (:name, :age, :gender)") .itemSqlParameterSourceProvider(new CustomerItemSqlParameterSourceProvider()) .build(); } @Bean public Step jdbcFlatFileStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) { log.info("------------------ Init flatFileStep -----------------"); return new StepBuilder("jdbcFlatFileStep", jobRepository) .<Customer, Customer>chunk(CHUNK_SIZE, transactionManager) .reader(jdbcFlatFileItemReader()) .writer(jdbcFlatFileItemWriter()) .build(); } @Bean public Job jdbcBatchWriterChunkJob(@Qualifier("jdbcFlatFileStep") Step flatFileStep, JobRepository jobRepository) { log.info("------------------ Init flatFileJob -----------------"); return new JobBuilder(JDBC_BATCH_WRITER_CHUNK_JOB, jobRepository) .incrementer(new RunIdIncrementer()) .start(flatFileStep) .build(); } }
* CSV 파일을 읽고 객체와 매핑 후 DB에 데이터를 저장한다.
'Spring > Spring Batch' 카테고리의 다른 글
8. CompositeItemProcessor 으로 여러단계에 걸쳐 데이터 Transform하기 (0) 2024.12.02 7. MyBatisPagingItemReader로 DB내용을 읽고, MyBatisItemWriter로 DB에 쓰기 (2) 2024.11.18 4. FlatFileItemReader로 단순 파일 읽고, FlatFileItemWriter로 파일에 쓰기 (0) 2024.11.04 3.SpringBatch ChunkModel과 TaskletModel (1) 2024.10.21 2.SpringBatch 코드 설명 및 아키텍처 알아보기 (0) 2024.10.20