ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • 4. FlatFileItemReader로 단순 파일 읽고, FlatFileItemWriter로 파일에 쓰기
    Spring/Spring Batch 2024. 11. 4. 17:49

    CSV 파일을 읽고 내용을 집계하여 CSV 파일로 쓰도록 FlatFileItemReaderFlatFileItemWriter에 대해 알아보자

     

    1.FlatFileItemReader

    • FlatFileItemReader는 Spring Batch에서 제공하는 기본적인 ItemReader로, 텍스트 파일로부터 데이터를 읽습니다.
    • 고정 길이, 구분자 기반, 멀티라인 등 다양한 형식의 텍스트 파일을 지원하며, 다음과 같은 장점을 가집니다.
    • 간단하고 효율적인 구현: 설정 및 사용이 간편하며, 대규모 데이터 처리에도 효율적입니다.
    • 다양한 텍스트 파일 형식 지원: 고정 길이, 구분자 기반, 멀티라인 등 다양한 형식의 텍스트 파일을 읽을 수 있습니다.
    • 확장 가능성: 토크나이저, 필터 등을 통해 기능을 확장할 수 있습니다.
    • 사용처: 고정 길이, 구분자 기반, 멀티라인 등 다양한 형식의 텍스트 파일 데이터 처리

    FlatFileItemReader 장점

    • 간단하고 효율적인 구현, 다양한 텍스트 파일 형식 지원

    FlatFileItemReader 단점

    • 복잡한 데이터 구조 처리에는 적합하지 않음

    2.FlatFileItemWriter

    • FlatFileItemWriter는 Spring Batch에서 제공하는 ItemWriter 인터페이스를 구현하는 클래스
    • 데이터를 텍스트 파일로 출력하는 데 사용

    FlatFileItemWriter 장점

    • 간편성: 텍스트 파일로 데이터를 출력하는 간편한 방법을 제공한다.
    • 유연성: 다양한 설정을 통해 원하는 형식으로 출력 파일을 만들 수 있다.
    • 성능: 대량의 데이터를 빠르게 출력할 수 있다.

    FlatFileItemWriter

    • 형식 제약: 텍스트 파일 형식만 지원한다.
    • 복잡한 구조: 복잡한 구조의 데이터를 출력할 경우 설정이 복잡해질 수 있다.
    • 오류 가능성: 설정 오류 시 출력 파일이 손상될 수 있다.

    3. CSV로 부터 정보를 읽기 위한 객체 정의

    @Setter
    @Getter
    public class Customer {
    
    	private String name;
    	private int age;
    	private String gender;
    
    	public Customer() {
    	}
    	
    }

     

    ※ Customer 객체에 Setter와 기본 생성자가 필요한 이유

     

    1. 객체 생성 및 필드 값 할당을 위한 기본 생성자

    FlatFileItemReader는 리플렉션(reflection)을 사용하여 각 레코드를 읽고 매핑할 객체의 인스턴스를 생성합니다.

    이때, 기본 생성자가 없으면 객체를 생성할 수 없으므로 FlatFileItemReader는 기본 생성자가 반드시 있어야 합니다. 기본 생성자가 없으면 예외가 발생하게 됩니다.

     

    2. CSV 데이터 값을 객체 필드에 할당하기 위한 Setter 메서드

     

    FlatFileItemReader는 각 CSV 데이터 필드를 매핑할 객체의 필드에 할당하기 위해 Setter 메서드를 사용합니다.

    예를 들어, CSV 파일의 name, age, gender 데이터를 Customer 객체에 할당하려면 Customer 클래스에 setName(), setAge(), setGender() 메서드가 필요합니다. 이 방법으로 FlatFileItemReader는 필드 값을 각 필드에 쉽게 할당할 수 있습니다.

    만약 setter가 없다면 필드에 접근할 방법이 없기 때문에 값이 제대로 할당되지 않습니다.

     

    요약

     

    기본 생성자는 객체 인스턴스를 만들기 위해 필수입니다.

    Setter 메서드는 리플렉션을 통해 CSV의 각 필드 값을 객체의 필드에 할당하기 위해 필요합니다.


    4.FlatFileItemReader 작성

     

    @Bean
    public FlatFileItemReader<Customer> flatFileItemReader() {
    
    		return new FlatFileItemReaderBuilder<Customer>()
    			.name("FlatFileItemReader")
    			.resource(new ClassPathResource("./customer.csv"))
    			.encoding(ENCODING)
    			.delimited().delimiter(",")
    			.names("name", "age", "gender")
    			.targetType(Customer.class)
    			.build();
    	}
    • name : FlatFileItemReader 빈의 이름을 지정한다.
    • resource : 읽어들일 파일의 위치를 지정한다.
    • encoding : 파일의 인코딩을 지정한다.
    • delimited().delimiter() : 읽어들일 정보간 구분자를 지정한다.
    • names : 구분자로 구분된 데이터의 이름을 지정한다.
    • targerType : 매핑할 객체 타입을 지정한다.

    5. FlatFileItemWriter

    @Bean
    public FlatFileItemWriter<Customer> flatFileItemWriter() {
    
    		return new FlatFileItemWriterBuilder<Customer>()
    			.name("flatFileItemWriter")
    			.resource(new FileSystemResource("./customer_new.csv"))
    			.encoding(ENCODING)
    			.delimited().delimiter("\t")
    			.names("Name", "Age", "Gender")
    			.append(false)
    			.lineAggregator(new CustomerLineAggregator())
    			.headerCallback(new CustomerHeader())
    			.footerCallback(new CustomerFooter(aggregateInfos))
    			.build();
    	}
    • name : FlatFileItemWriter 빈의 이름을 지정한다.
    • resource : 읽어들일 파일의 위치를 지정한다.
    • encoding : 파일의 인코딩을 지정한다.
    • delimited().delimiter() : 읽어들일 정보간 구분자를 지정한다.
    • names : 구분자로 구분된 데이터의 이름을 지정한다.
    • append : 기존 파일에 추가 할지 true or false로 지정한다.
    • lineAggergator : 라인 구분자를 지정한다.
    • headerCallback : 출력 파일의 헤더를 지정할 수 있도록 한다.
    • footerCallback : 출력 파일의 푸터를 지정할 수 있도록 한다.
    • targerType : 매핑할 객체 타입을 지정한다.

    6. CustomerLineAggregator 작성

    public class CustomerLineAggregator implements LineAggregator<Customer> {
    	@Override
    	public String aggregate(Customer item) {
    		return item.getName() + "," + item.getAge();
    	}
    }
    • FlatFile에 저장할 아이템들을 스트링으로 변환하는 방법을 지정

    7. CustomerHeader 작성

    public class CustomerHeader implements FlatFileHeaderCallback {
    	@Override
    	public void writeHeader(Writer writer) throws IOException {
    		writer.write("ID,AGE");
    	}
    }
    • FlatFileHeaderCallback은 writeHeader를 구현하고, 출력될 파일의 헤더를 달아주는 역할을 한다.

    8. CustomerFooter 작성

    @Slf4j
    public class CustomerFooter implements FlatFileFooterCallback {
    	ConcurrentHashMap<String, Integer> aggregateCustomers;
    
    	public CustomerFooter(ConcurrentHashMap<String, Integer> aggregateCustomers) {
    		this.aggregateCustomers = aggregateCustomers;
    	}
    
    	@Override
    	public void writeFooter(Writer writer) throws IOException {
    		writer.write("총 고객 수: " + aggregateCustomers.get("TOTAL_CUSTOMERS"));
    		writer.write(System.lineSeparator());
    		writer.write("총 나이: " + aggregateCustomers.get("TOTAL_AGES"));
    	}
    }
    • FlatFileFooterCallback 은 푸터를 작성할때 사용한다.
    • 결과를 집계하여 총 고객수와 총 나이를 출력하고 있다.
    • 이때 전달된 HashMap을 전달하여, 결과를 출력하고 있음을 확인하자.

    9. AggregateCustomerProcessor 작성

    csv로 부터 읽은 데이터를 객체에 매핑한 결과를 writer를 쓰기 전에 별도의 처리를 하는 클래스이다.

    @Slf4j
    public class AggregateCustomerProcessor implements ItemProcessor<Customer, Customer> {
    
    	ConcurrentHashMap<String, Integer> aggregateCustomers;
    
    	public AggregateCustomerProcessor(ConcurrentHashMap<String, Integer> aggregateCustomers) {
    		this.aggregateCustomers = aggregateCustomers;
    	}
    
    	@Override
    	public Customer process(Customer item) throws Exception {
    		aggregateCustomers.putIfAbsent("TOTAL_CUSTOMERS", 0);
    		aggregateCustomers.putIfAbsent("TOTAL_AGES", 0);
    
    		aggregateCustomers.put("TOTAL_CUSTOMERS", aggregateCustomers.get("TOTAL_CUSTOMERS") + 1);
    		aggregateCustomers.put("TOTAL_AGES", aggregateCustomers.get("TOTAL_AGES") + item.getAge());
    		return item;
    	}
    }
    • ItemProcessor은 process 메소드를 구현하였으며, 각 아이템을 하나씩 읽고 아이템의 내용을 집계

    전체소스

    @Slf4j
    @Configuration
    public class FlatFileItemJobConfig {
    
    	/**
    	 * CHUNK 크기를 지정한다.
    	 */
    	public static final int CHUNK_SIZE = 100;
    	public static final String ENCODING = "UTF-8";
    	public static final String FLAT_FILE_WRITER_CHUNK_JOB = "FLAT_FILE_WRITER_CHUNK_JOB";
    
    	private ConcurrentHashMap<String, Integer> aggregateInfos = new ConcurrentHashMap<>();
    
    	private final ItemProcessor<Customer, Customer> itemProcessor = new AggregateCustomerProcessor(aggregateInfos);
    
    	@Bean
    	public FlatFileItemReader<Customer> flatFileItemReader() {
    
    		return new FlatFileItemReaderBuilder<Customer>()
    			.name("FlatFileItemReader")
    			.resource(new ClassPathResource("./customer.csv"))
    			.encoding(ENCODING)
    			.delimited().delimiter(",")
    			.names("name", "age", "gender")
    			.targetType(Customer.class)
    			.build();
    	}
    
    	@Bean
    	public FlatFileItemWriter<Customer> flatFileItemWriter() {
    
    		return new FlatFileItemWriterBuilder<Customer>()
    			.name("flatFileItemWriter")
    			.resource(new FileSystemResource("./customer_new.csv"))
    			.encoding(ENCODING)
    			.delimited().delimiter("\t")
    			.names("Name", "Age", "Gender")
    			.append(false)
    			.lineAggregator(new CustomerLineAggregator())
    			.headerCallback(new CustomerHeader())
    			.footerCallback(new CustomerFooter(aggregateInfos))
    			.build();
    	}
    
    
    	@Bean
    	public Step flatFileStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
    		log.info("------------------ Init flatFileStep -----------------");
    
    		return new StepBuilder("flatFileStep", jobRepository)
    			.<Customer, Customer>chunk(CHUNK_SIZE, transactionManager)
    			.reader(flatFileItemReader())
    			.processor(itemProcessor)
    			.writer(flatFileItemWriter())
    			.build();
    	}
    
    	@Bean
    	public Job flatFileJob(@Qualifier("flatFileStep") Step flatFileStep, JobRepository jobRepository) {
    		log.info("------------------ Init flatFileJob -----------------");
    
    		return new JobBuilder(FLAT_FILE_WRITER_CHUNK_JOB, jobRepository)
    			.incrementer(new RunIdIncrementer())
    			.start(flatFileStep)
    			.build();
    	}
    }

     


    ※ 이전 3장에서 작성 한 batch가 있다면, step 및 job 구성 시에 @Qulifier를 이용하여 각 Job이 필요한 Bean을 넣도록 수정하자

    Spring Batch는 초기 실행 시 Job, Step 인터페이스를 구현한 모든 Bean을 등록하려고 하기 때문에 이름 지정을 통해 필요한

    Bean을 주입하는 것이 필요하다.

     

    Caused by: java.lang.IllegalArgumentException: Job name must be specified in case of multiple jobs 예외 발생

    해당 문제는 Spring Boot 3 버전에서부터 Spring Batch 사용 시 업데이트 된 내용 때문이었다.

     

    기존에는 스프링 부트가 구동될 때 모든 Job 빈들을 읽어 실행하는 구조였다면, 새로운 버전에서부터는 복수 개의 Job이 컨텍스트 내에 정의되어 있다면 부트 구동 시점에 가동시킬 Job을 프로프티에 명시해야한다. (하나만 등록 가능)

     

    만약 컨텍스트 내에 하나의 Job 존재한다면, 해당 Job 별도의 명시 없이 부트 구동 시점에 실행된다.

     

    해결방법

    # application.yml
    
    spring:
      batch:
        job:
          name: myJob

     

    초기 구동 시 실행 할 Job을 명시해준다.

    댓글

Designed by Tistory.