-
9. 입맛에 맞는 배치 처리를 위한 Custom ItemReader/ItemWriter 구현Spring/Spring Batch 2024. 12. 10. 17:07
이전 글에서는 ItemReader와 ItemWriter를 구현하는 방법에 대해 다뤘습니다. 기본 제공되는 ItemReader와 ItemWriter로 충분하지 않거나 복합한 쿼리를 수행하는 경우, 비지니스 로직에 맞게 Custom ItemReader와 ItemWriter 구현이 필요합니다.
이럴 땐, QueryDsl을 이용하여 Custom하게 작성 할 수 있습니다. 허나 문제가 있습니다.
Spring Batch는 공식적으로 QuerydslItemReader를 지원하지 않습니다. 이를 위해서는 AbstractPagingItemReader를 상속하여 QuerydslPagingItemReader를 생성해야 합니다.
Spring Batch ItemReader/ItemWriter 지원 목록
QuerydslPagingItemReader
public class QuerydslPagingItemReader<T> extends AbstractPagingItemReader<T> { private EntityManager em; private final Function<JPAQueryFactory, JPAQuery<T>> querySupplier; private final Boolean alwaysReadFromZero; public QuerydslPagingItemReader(EntityManagerFactory entityManagerFactory, Function<JPAQueryFactory, JPAQuery<T>> querySupplier, int chunkSize) { this(ClassUtils.getShortName(QuerydslPagingItemReader.class), entityManagerFactory, querySupplier, chunkSize, false); } public QuerydslPagingItemReader(String name, EntityManagerFactory entityManagerFactory, Function<JPAQueryFactory, JPAQuery<T>> querySupplier, int chunkSize, Boolean alwaysReadFromZero) { super.setPageSize(chunkSize); setName(name); this.querySupplier = querySupplier; this.em = entityManagerFactory.createEntityManager(); this.alwaysReadFromZero = alwaysReadFromZero; } @Override protected void doClose() throws Exception { if (em != null) em.close(); super.doClose(); } @Override protected void doReadPage() { initQueryResult(); JPAQueryFactory jpaQueryFactory = new JPAQueryFactory(em); long offset = 0; if (!alwaysReadFromZero) { offset = (long) getPage() * getPageSize(); } JPAQuery<T> query = querySupplier.apply(jpaQueryFactory).offset(offset).limit(getPageSize()); List<T> queryResult = query.fetch(); for (T entity: queryResult) { em.detach(entity); results.add(entity); } } private void initQueryResult() { if (CollectionUtils.isEmpty(results)) { results = new CopyOnWriteArrayList<>(); } else { results.clear(); } } }
- alwaysReadFromZero: 항상 0부터 페이징을 읽을지 여부를 지정한다. 만약 paging 처리된 데이터 자체를 수정하는경우 배치처리 누락이 발생할 수 있으므로 이를 해결하기 위한 방안으로 사용된다.
- AbstractPagingItemReader은 어댑터 패턴으로, 상속받는 쪽은 doReadPage만 구현하여 페이징 처리하여 데이터를 가져올 수 있도록 한다.
- doClose는 기본적으로 AbstractPagingItemReader를 자체 구현되어 있지만 EntityManager자원을 해제하기 위해서 em.close() 를 수행한다.
QuerydslPagingItemReader를 편하게 작성하기 위해 빌더 코드를 추가 해보자
public class QuerydslPagingItemReaderBuilder<T> { private EntityManagerFactory entityManagerFactory; private Function<JPAQueryFactory, JPAQuery<T>> querySupplier; private int chunkSize = 10; private String name; private Boolean alwaysReadFromZero; public QuerydslPagingItemReaderBuilder<T> entityManagerFactory(EntityManagerFactory entityManagerFactory) { this.entityManagerFactory = entityManagerFactory; return this; } public QuerydslPagingItemReaderBuilder<T> querySupplier(Function<JPAQueryFactory, JPAQuery<T>> querySupplier) { this.querySupplier = querySupplier; return this; } public QuerydslPagingItemReaderBuilder<T> chunkSize(int chunkSize) { this.chunkSize = chunkSize; return this; } public QuerydslPagingItemReaderBuilder<T> name(String name) { this.name = name; return this; } public QuerydslPagingItemReaderBuilder<T> alwaysReadFromZero(Boolean alwaysReadFromZero) { this.alwaysReadFromZero = alwaysReadFromZero; return this; } public QuerydslPagingItemReader<T> build() { if (name == null) { this.name = ClassUtils.getShortName(QuerydslPagingItemReader.class); } if (this.entityManagerFactory == null) { throw new IllegalArgumentException("EntityManagerFactory can not be null.!"); } if (this.querySupplier == null) { throw new IllegalArgumentException("Function<JPAQueryFactory, JPAQuery<T>> can not be null.!"); } if (this.alwaysReadFromZero == null) { alwaysReadFromZero = false; } return new QuerydslPagingItemReader<>(this.name, entityManagerFactory, querySupplier, chunkSize, alwaysReadFromZero); } }
소스 샘플
@Slf4j @Configuration @RequiredArgsConstructor public class QueryDSLPagingReaderJobConfig { /** * CHUNK 크기를 지정한다. */ public static final int CHUNK_SIZE = 2; public static final String ENCODING = "UTF-8"; public static final String QUERYDSL_PAGING_CHUNK_JOB = "QUERYDSL_PAGING_CHUNK_JOB"; private final DataSource dataSource; private final EntityManagerFactory entityManagerFactory; // @Bean // public QuerydslPagingItemReader<Customer> customerQuerydslPagingItemReader() throws Exception { // // Function<JPAQueryFactory, JPAQuery<Customer>> query = jpaQueryFactory -> jpaQueryFactory.select(QCustomer.customer).from(QCustomer.customer); // // return new QuerydslPagingItemReader<>("customerQuerydslPagingItemReader", entityManagerFactory, query, CHUNK_SIZE, false); // } @Bean public QuerydslPagingItemReader<Customer> customerQuerydslPagingItemReader() { return new QuerydslPagingItemReaderBuilder<Customer>() .name("customerQuerydslPagingItemReader") .entityManagerFactory(entityManagerFactory) .chunkSize(2) .querySupplier(jpaQueryFactory -> jpaQueryFactory.select(QCustomer.customer).from(QCustomer.customer).where(QCustomer.customer.age.gt(20))) .build(); } @Bean public FlatFileItemWriter<Customer> customerQuerydslFlatFileItemWriter() { return new FlatFileItemWriterBuilder<Customer>() .name("customerQuerydslFlatFileItemWriter") .resource(new FileSystemResource("./customer_new_v2.csv")) .encoding(ENCODING) .delimited().delimiter("\t") .names("Name", "Age", "Gender") .build(); } @Bean public Step customerQuerydslPagingStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) throws Exception { log.info("------------------ Init customerQuerydslPagingStep -----------------"); return new StepBuilder("customerJpaPagingStep", jobRepository) .<Customer, Customer>chunk(CHUNK_SIZE, transactionManager) .reader(customerQuerydslPagingItemReader()) .processor(new CustomerItemProcessor()) .writer(customerQuerydslFlatFileItemWriter()) .build(); } @Bean public Job customerQueryDslJpaPagingJob(@Qualifier("customerQuerydslPagingStep") Step customerJdbcPagingStep, JobRepository jobRepository) { log.info("------------------ Init customerQueryDslJpaPagingJob -----------------"); return new JobBuilder(QUERYDSL_PAGING_CHUNK_JOB, jobRepository) .incrementer(new RunIdIncrementer()) .start(customerJdbcPagingStep) .build(); } }
CustomItemWriter
- CustomItemWriter는 Spring Batch에서 제공하는 기본 ItemWriter 인터페이스를 구현하여 직접 작성한 ItemWriter 클래스이다.
- 기본 ItemWriter 클래스로는 제공되지 않는 특정 기능을 구현할 때 사용된다
구성 요소
- ItemWriter 인터페이스 구현: write() 메소드를 구현하여 원하는 처리를 수행한다.
- 필요한 라이브러리 및 객체 선언: 사용할 라이브러리 및 객체를 선언한다.
- 데이터 처리 로직 구현: write() 메소드에서 데이터 처리 로직을 구현한다.
장점
- 유연성: 기본 ItemWriter 클래스로는 제공되지 않는 특정 기능을 구현할 수 있다.
- 확장성: 다양한 방식으로 데이터 처리를 확장할 수 있다.
- 제어 가능성: 데이터 처리 과정을 완벽하게 제어할 수 있다.
단점
- 개발 복잡성: 기본 ItemWriter 클래스보다 개발 과정이 더 복잡하다.
- 테스트 어려움: 테스트 작성이 더 어려울 수 있다.
- 디버깅 어려움: 문제 발생 시 디버깅이 더 어려울 수 있다.
CustomService
- 청크 아이템을 받아서 호출하는 서비스를 작성한다.
@Slf4j @Component public class CustomItemWriter implements ItemWriter<CustomerDto> { private final CustomService customService; public CustomItemWriter(CustomService customService) { this.customService = customService; } @Override public void write(Chunk<? extends CustomerDto> chunk) throws Exception { for (CustomerDto customer: chunk) { log.info("Call Porcess in CustomItemWriter..."); customService.processToOtherService(customer); } } }
전체코드
@Slf4j @Configuration @RequiredArgsConstructor public class MybatisItemWriterJobConfig { /** * CHUNK 크기를 지정한다. */ public static final int CHUNK_SIZE = 100; public static final String ENCODING = "UTF-8"; public static final String MY_BATIS_ITEM_WRITER = "MY_BATIS_ITEM_CUSTOM_WRITER"; private final DataSource dataSource; private final SqlSessionFactory sqlSessionFactory; private final CustomItemWriter customItemWriter; @Bean public FlatFileItemReader<CustomerDto> flatFileItemReader9() { return new FlatFileItemReaderBuilder<CustomerDto>() .name("flatFileItemReader9") .resource(new ClassPathResource("./customer.csv")) .encoding(ENCODING) .delimited().delimiter(",") .names("name", "age", "gender") .targetType(CustomerDto.class) .build(); } @Bean public Step flatFileStep9(JobRepository jobRepository, PlatformTransactionManager transactionManager) { log.info("------------------ Init flatFileStep -----------------"); return new StepBuilder("flatFileStep", jobRepository) .<CustomerDto, CustomerDto>chunk(CHUNK_SIZE, transactionManager) .reader(flatFileItemReader9()) .writer(customItemWriter) .build(); } @Bean public Job flatFileJob9(@Qualifier("flatFileStep9") Step flatFileStep, JobRepository jobRepository) { log.info("------------------ Init flatFileJob -----------------"); return new JobBuilder(MY_BATIS_ITEM_WRITER, jobRepository) .incrementer(new RunIdIncrementer()) .start(flatFileStep) .build(); } }
'Spring > Spring Batch' 카테고리의 다른 글
10. 스프링배치 플로우 컨트롤 하기 (0) 2024.12.10 8. CompositeItemProcessor 으로 여러단계에 걸쳐 데이터 Transform하기 (0) 2024.12.02 7. MyBatisPagingItemReader로 DB내용을 읽고, MyBatisItemWriter로 DB에 쓰기 (2) 2024.11.18 5. JdbcPagingItemReader로 DB내용을 읽고, JdbcBatchItemWriter로 DB에 쓰기 (0) 2024.11.05 4. FlatFileItemReader로 단순 파일 읽고, FlatFileItemWriter로 파일에 쓰기 (0) 2024.11.04