본문 바로가기

Reading Record/스프링 배치 완벽 가이드

7장 ItemReader: JDBC 부터

7장 ItemRader - 데이터베이스 입력

JDBC

스프링 배치는 한 번에 처리할 만큼의 레코드만 로딩하는 기법을 두가지 제공한다.

  • 커서
  • 페이징

커서는 ResultSet으로 구현되고 ResultSet이 open 되면 next() 메서드를 호출 할 때 마다 데이터 베이스에서 배치 레코드를 가져와 반환한다. 이러한 방식을 스트리밍이라고 부른다.

페이징은 청크 사이즈만큼의 레코드를 페이지수 만큼 가져오는 방식이다.

JDBC 커서 처리

JdbcCursorItemReader를 구성하면 되는데 이를 위해 최소 3개의 의존성이 필요하다.

  • 데이터 소스
  • 실행할 쿼리
  • RowMapper
public class CustomerJdbcRowMapper implements RowMapper<CustomerJdbc> {
    @Override
    public CustomerJdbc mapRow(ResultSet rs, int rowNum) throws SQLException {
        return CustomerJdbc.builder()
                .id(rs.getLong("id"))
                .address(rs.getString("address"))
                .city(rs.getString("city"))
                .firstName(rs.getString("firstName"))
                .lastName(rs.getString("lastName"))
                .middleInitial(rs.getString("middleInitial"))
                .state(rs.getString("state"))
                .zipCode(rs.getString("zipCode"))
                .build();
    }
}

RowMapper는 스프링 프레임워크 코어가 제공하는 JDBC 지원 표준 컴포넌트이다. ResultSet에서 로우 하나를 도메인 객체로 맵핑하는 역할을 한다.

단, 모든 데이터 베이스가 ResultSet으로 데이터를 스트리밍 하지는 않기 때문에 사용하는 데이터베이스의 메뉴얼을 숙지해야한다.

쿼리 파라미터가 필요한 쿼리를 작성한 경우 ArgumentPreparedStatementSetter를 이용하여 적용할 수 있다.

@Bean(CURSOR_READER_NAME)
public JdbcCursorItemReader<CustomerJdbc> customerJdbcItemReader(DataSource dataSource) {
    return new JdbcCursorItemReaderBuilder<CustomerJdbc>()
            .name(CURSOR_READER_NAME)
            .dataSource(dataSource)
            .sql("SELECT * FROM customer WHERE city = ?")
            .preparedStatementSetter(citySetter(null))
            .rowMapper(new CustomerJdbcRowMapper())
            .build();
}

@Bean
@StepScope
public ArgumentPreparedStatementSetter citySetter(
    @Value("#{jobParameters['city']}") String city
) {
    return new ArgumentPreparedStatementSetter(new Object[]{city});
}

커서 방식, 스트리밍 처리방식은 많은 양의 레코드를 처리할 때 네트워크 오버헤드의 문제가 발생할 수 있고, ResultSet이 스레드 세이프하지 않기 때문에 멀티 스레드 환경에서 사용하기 에는 적합하지 않다.

JDBC 페이징 처리

페이지라 부르는 청크로 결과 목록을 반환한다. 커서와 같이 레코드 처리 자체에는 차이가 없고 데이터베이스에서 로우를 가져오는 방식의 차이가 있을뿐이다.

페이징 기법을 사용하기 위해선 반환될 레코드의 개수를 나타내는 페이지의 크기, 처리중인 페이지를 나타내는 페이지 번호가 존재하는 쿼리를 작성할 수 있어야 한다.

PagingQueryProvider 인터페이스의 구현체를 사용하면 되고 두가지 선택지가 존재한다.

  • 직접 데이터베이스 전용 구현체를 만든다.
  • SqlPagingQueryProviderFactoryBean을 사용하여 구현체를 주입받는다.

따라서 JdbcPagingItemReader를 구성하고 사용하려면 적어도 네 가지 의존성이 필요하다.

  • 데이터 소스
  • PagingQueryProvider 구현체
  • RowMapper 구현체
  • 페이지 크기
  • 필요에 따라 파라미터 주입
@StepScope
@Bean(PAGING_READER_NAME)
public JdbcPagingItemReader<CustomerJdbc> customerJdbcPagingItemReader(
        DataSource dataSource,
        PagingQueryProvider queryProvider,
        @Value("#{jobParameters['city']}") String city
) {
    Map<String, Object> parameters = new HashMap<>();
    parameters.put("city", city);

    return new JdbcPagingItemReaderBuilder<CustomerJdbc>()
            .name(PAGING_READER_NAME)
            .dataSource(dataSource)
            .queryProvider(queryProvider)
            .parameterValues(parameters)
            .pageSize(10)
            .rowMapper(new CustomerJdbcRowMapper())
            .build();
}

@Bean
public SqlPagingQueryProviderFactoryBean pagingQueryProviderFactoryBean(DataSource dataSource) {
    SqlPagingQueryProviderFactoryBean factoryBean = new SqlPagingQueryProviderFactoryBean();

    factoryBean.setSelectClause("SELECT *");
    factoryBean.setFromClause("FROM customer");
    factoryBean.setWhereClause("WHERE city = :city");
    factoryBean.setSortKey("lastName");
    factoryBean.setDataSource(dataSource);
    //factoryBean.setDatabaseType("H2");

    return factoryBean;
}

페이징을 할 때 레코드 순서의 보장을 위해 항상 정렬을 해줘야한다.

정렬키가 ResultSet에서 중복되면 페이징이 제대로 되지 않는다.

Datasource를 주입하여 데이터 베이스 타입을 추론하도록 할수도 있지만 명시적으로 데이터 베이스 타입을 지정할 수 있다.

하이버네이트

배치처리에서 하이버네이트를 있는 그대로 사용하면 일반적인 스테이트풀 세션 구현체를 사용하고 청크단위 작업이 끝날때까지 세션에 객체를 저장하다가 OOM 이 발생하게 될 수 있다.

그래서 스프링 배치가 제공하는 하이버네이트 기반의 ItemReader는 이러한 문제점을 해결하기 위해 ItemReader가 커밋할 때 세션을 플러시하며 위 문제를 해결한다.

하이버네이트 커서 처리

하이버네이트로 커서를 사용하기 위해서 다음 준비가 필요하다.

  • sessiongFactory
  • Customer 매핑
  • HibernateCursorItemReader
  • 하이버네이트 의존성 추가 (책에서는 JPA 의존성을 추가하였다.)

스프링 배치는 기본 TransactionManager로 DataSourceTransactionManager를 사용한다. 그러나 하이버네이트는 세션을 사용하기 때문에 이러한 세션을 아우르는 HibernateTransactionManager가 필요하다. 이를 위해 DefaultBatchConfigurer를 사용한다.

@Component
public class HibernateBatchConfigurer extends DefaultBatchConfigurer {
    private final DataSource dataSource;
    private final SessionFactory sessionFactory;
    private PlatformTransactionManager transactionManager;

    public HibernateBatchConfigurer(DataSource dataSource, EntityManagerFactory entityManagerFactory) {
        super(dataSource);
        this.dataSource = dataSource;
        this.sessionFactory = entityManagerFactory.unwrap(SessionFactory.class);
        this.transactionManager = new HibernateTransactionManager(this.sessionFactory);
    }

    @Override
    public PlatformTransactionManager getTransactionManager() {
        return this.transactionManager;
    }
}

트랜잭션 매니저와 아래 요소를 토대로 하이버네이트 커서 리더를 구성한다.

  • 이름
  • SessionFactory
  • 쿼리 문자열
  • 파라미터
@StepScope
@Bean(CURSOR_READER_NAME)
public HibernateCursorItemReader<CustomerJdbc> customerJdbcCursorItemReader(
        EntityManagerFactory entityManagerFactory,
        @Value("#{jobParameters['city']}") String city
) {
    return new HibernateCursorItemReaderBuilder<CustomerJdbc>()
            .name(CURSOR_READER_NAME)
            .sessionFactory(entityManagerFactory.unwrap(SessionFactory.class))
            .queryString("* FROM customer_jdbc WHERE city = :city")
            .parameterValues(Collections.singletonMap("city",city))
            .build();
}

하이버네이트 페이징 처리

하이버네이트 커서 처리와 다른점은 ItemReader가 사용할 페이지 크기를 정하는 점 밖에 없다.

@StepScope
@Bean(PAGING_READER_NAME)
public HibernatePagingItemReader<CustomerEntity> customerHibernatePagingItemReader(
        EntityManagerFactory entityManagerFactory,
        @Value("#{jobParameters['city']}") String city
) {
    return new HibernatePagingItemReaderBuilder<CustomerEntity>()
            .name(PAGING_READER_NAME)
            .sessionFactory(entityManagerFactory.unwrap(SessionFactory.class))
            .queryString("FROM customer_entity WHERE city = :city")
            .parameterValues(Collections.singletonMap("city", city))
            .pageSize(10)
            .build();
}

JPA

스프링 부트 스타터 의존성을 하용한다면 하이버네이트를 사용할때 생성한 별도의 TransactionManager를 구현할 필요가 없다. JpaTransactionManager를 제공하기 때문이다.

JPA 커서 처리

책에서 JPA는 커서 처리를 지원하지 않는다고 하지만 스프링 배치4.3(스프링 부트 2.4) 기준 지원하고 있다.

@Bean(CURSOR_READER_NAME)
public JpaCursorItemReader<CustomerEntity> jpaCursorItemReader(
        EntityManagerFactory entityManagerFactory,
        @Value("#{jobParameters['city']}") String city
) {
    return new JpaCursorItemReaderBuilder<CustomerEntity>()
            .name(CURSOR_READER_NAME)
            .entityManagerFactory(entityManagerFactory)
            .queryString("SELECT c FROM customer_entity c")
            .parameterValues(Collections.singletonMap("city", city))
            .build();
}

JPA 페이징 처리

JpaPaingItemReader를 사용하면 되는데 아래 네 가지 의존성이 필요하다

  • 이름
  • EntityManger
  • 쿼리
  • 파라미터
@StepScope
@Bean(Paging_READER_NAME)
public JpaPagingItemReader<CustomerEntity> jpaPagingItemReader(
        EntityManagerFactory entityManagerFactory,
        @Value("#{jobParameters['city']}") String city
) {
    CustomerByCityQueryProvider cityQueryProvider = new CustomerByCityQueryProvider(city);
    return new JpaPagingItemReaderBuilder<CustomerEntity>()
            .name(Paging_READER_NAME)
            .entityManagerFactory(entityManagerFactory)
            .queryProvider(cityQueryProvider)
            .build();
}

쿼리를 String 으로 작성하는 방법이 있는 반면, JpaQueryProvider 구현체를 생성하여 Query 객체를 사용하는 방법도 있다.

public class CustomerByCityQueryProvider extends AbstractJpaQueryProvider {
    private final String city;

    public CustomerByCityQueryProvider(String city) {
        this.city = city;
    }

    @Override
    public Query createQuery() {
        EntityManager entityManager = getEntityManager();

        Query query = entityManager.createQuery("SELECT c FROM customer_entity c WHERE c.city = :city");
        query.setParameter("city", city);

        return query;
    }

    @Override
    public void afterPropertiesSet() {
        Assert.notNull(city, "도시 이름이 비어있음");
    }
}

저장 프로시저

저장 프로시저는 따로 정리하지 않는다.

스프링 데이터

스프링 데이터에서 지금까지 RDS 계열을 다뤄왔다면 이번에는 NoSQL 저장소를 다룬다.

몽고DB

문서(JSON, BSON)기반 데이터베이스이다.

페이징 처리

MongoItemReader는 페이지 기반 ItemReader로 아래와 같은 의존성이 필요하다.

  • MongoOperations 구현체 (MongoTemplate이 필요하다.)
  • name: saveState가 true일 때 스테이트풀한 스프링 배치 컴포넌트 처럼 실행 상태를 저장하기 위해 필요하다.
  • targetType: 읽은 데이터를 어떠한 클래스 타입으로 변환할지 정한다.
  • JSON 기반 쿼리, Query 기반 인스턴스: 실행할 쿼리

그 외에도 정렬, 힌트, 필드목록 질의 대상등이 있다.
그리고 의존성으로 spring-boot-starter-data-mongo 를 추가해야 한다.

스프링 데이터 리포지터리

스프링 데이터 프로젝트의 목적은 일관성 있는 프로그래밍 모델을 제공하는 것이다. 이러한 일관성을 보장하기 위해 중요한 부분이 Repository의 추상화이다. Repository를 추상화하고 이를 정의하면 기본적은 crud를 할수 있다.

즉, 어떠한 스프링 데이터와 연관된 데이터 저장소건 Repository의 메커니즘에 필요한 부분을 구현하면, 스프링 배치에서 직접 지원하지 않는 데이터 저장소도 사용할 수 있게 된다는 것이다.

이렇게 쉽게 스프링 데이터가 지원하는 데이터 저장소를 스프링 배치로 사용할 수 있는것은 RepositoryItemReader가 내부적으로 PagingAndSortingRepsitory 인터페이스를 사용하기 때문인데 스프링 데이터에서 지원하는 데이터 저장소는 대부분 PagingAndSortingRepsitory 인터페이스를 기반으로 구현체가 존재한다.

기존서비스

이미 검증된 서비스 로직을 이용해서 ItemReaderAdapter에서 사용 할 수 있다.

후술할 커스텀 입력에도 해당하는 내용인데, 서비스 로직은 변경이 잦다고 생각한다. 따라서 이를 그대로 사용하면 유지보수성이 떨어질 것 같다. 그리고 기존 서비스 로직은 배치 모듈에 존재하지 않을 가능성이 굉장히 크기 때문에 책에 나온 방식을 그대로 사용하려면 모듈의 의존관계가 이상해지거나 불필요하게 의존성을 추가해줘야할 것이라고 생각한다.

정말 반드시 사용해야 겠다면 직접 기존 서비스에 구현을 하는 방식이 아닌 조합의 방식으로 배치 모듈에서 해당 서비스를 한번 감싸서 사용해야하지 않을까 생각한다.

@Bean
public ItemReaderAdapter<Customer> customerItemReader(CustomerService customerService) {
    ItemReaderAdapter<Customer> adapter = new ItemReaderAdapder<>();

    adapter.setTargetObject(customerService);
    adapter.setTargetMethod("getCustomer");

    return adapter;
}

커스텀 입력

기존 서비스를 사용하는 방식에서 ItemReader를 구현하고 ItemStreamSupport를 구현하도록 하여 바로 ItemReader로 사용 할 수 있다.

ItemReader를 구현할 때는 기존에 호출하던 메서드의 이름을 read로 변경하여 오버라이드하면 바로 사용할 수 있다.

추가로 이전 종료 시점부터 ItemReader를 다시 시작하고 싶다면 ItemStreamSupport를 구현하여 open, update, close를 오버라이드 하도록 한다.

  • open: ItemReader에서 필요한 상태를 초기화 하는 작업
    • ex) 다시 잡을 실행할 때 건너 뛸 레코드를 설정
  • update: 스프링 배치가 잡의 상태를 갱신하는 작업
    • ex) 얼마나 많은 레코드, 청크가 처리됐는지 기록
  • close: 리소스를 닫는 작업
    • ex) 파일을 닫는 작업

에러 처리

레코드 건너뛰기

레코드를 건너뛸때 두 가지를 고려해야한다.

  • 어떠한 에러를 건너뛸 것인가?
    • 반대로 어떠한 에러는 허용할 것인가?
  • 몇번 까지 에러를 허용할 것인가?
@Bean
public Step copyFileStep() {
    return this.stepBuilderFactory.get("copyFileStep")
        .<Customer, Customer>chunk(10)
        .reader(customerItemReader())
        .writer(itemWriter())
        .faultTolernat()
        .skip(Exception.class)
        .noSkip(ParseException.class)
        .skipLimit(10)
        .build()
}

이렇게 분산된 설정을 SkipPolicy 구현체로 표현할 수 있다.

public class FileVerificationSkipper implements SkipPolicy {
    @Override
    public boolean shouldSkip(Throwable exception, int skipCount) throws SkipLimitExceededException {
        if(exception instanceof FileNotFoundException) {
            return false;
        } else if(exception instanceof ParseException && skip <= 10) {
            return true;
        } else {
            return false;
        }
    }
}

잘못된 레코드 로그 남기기

잘못된 로그를 기록하는 방법으로 StepListener + @OnReadError를 이용할 수 있다.

@Slf4j
public class CustomerItemReadErrorListener {
    @OnReadError
    public void onReadError(Exception e) {
        if (e instanceof FlatFileParseException) {
            FlatFileParseException ffpe = (FlatFileParseException) e;

            String errorMessage = ffpe.getLineNumber() + "번째 줄에서 에러 발생" +
                    System.lineSeparator() +
                    "input: " + ffpe.getInput();

            log.error(errorMessage);
        } else {
            log.error("에러 발생");
        }
    }
}

입력이 없을때의 처리

빈 입력을 처리하는 배치처리를 실패처리 하고 부가작업을 실행하고 싶다면 StepListener + @AfterStep를 이용할 수 있다.

public class EmptyInputStepFailer {

    @AfterStep
    public ExitStatus afterStep(StepExecution stepExecution){
        if(stepExecution.getReadCount() > 0){
            return stepExecution.getExitStatus();
        }
        return ExitStatus.FAILED;
    }
}

'Reading Record > 스프링 배치 완벽 가이드' 카테고리의 다른 글

[9장] ItemWriter (1)  (0) 2021.12.20
8장 ItemProcessor  (0) 2021.12.06
7장 ItemReader: Json 까지  (0) 2021.11.22
6장 잡 실행하기  (0) 2021.11.15
5장 JobRepository와 메타데이터  (0) 2021.10.30