본문 바로가기

Reading Record/스프링 배치 완벽 가이드

4장 잡과 스텝 이해하기 : 스텝 알아보기

스텝은 잡의 구성 요소를 담당하며, 독립적이고 순차적으로 배치 처리를 수행한다. 
스텝은 모든 단위 작업의 조각이다.
트랜잭션은 스텝 내에서 이뤄지며, 스텝은 서로 독립되도록 의도적으로 설계됐다.

배치 처리 모델

  1. Tasklet 모델
    • Tasklet.execute 메소드가 RepeatStatus.FINISHED를 반환할 때까지 트랜잭션 범위 내에서 반복적으로 실행되는 코드블록을 만들 수 있음
  2. Chunk 기반 모델
    • 청크 단위로 처리할 모든 레코드를 반복적으로 읽어오는 ItemReader, 필수는 아니지만 읽어들인 아이템을 처리하는 ItemProcessor, 아이템을 한 번에 기록하는 ItemWriter 의 세가지 컴포넌트로 구성
    • ItemWriter의 단일 호출은 물리적 쓰기를 일괄적으로 처리함으로써 IO 최적화를 이룸
    • 각 청크는 자체 트랜잭션으로 실행하며, 처리에 실패했다면 마지막으로 성공한 트랜잭션 이후부터 다시 시작 가능

태스크릿 스텝

태스크릿 스텝을 만드는데에는 두 가지 유형 존재

  • 사용자가 작성한 코드를 마치 태스크릿 스텝처럼 실행되도록 하는 방법
  • Tasklet 인터페이스를 구현하여 사용하는 방법
    • execute 메소드를 구현하여 RepeatStatus 객체를 반환
    • RepeatStatus.CONTINUABLE 은 스프링 배치에게 해당 태스크릿을 다시 실행하라고 말하는 것
    • RepeatStatus.FINISHED 는 처리의 성공 여부 관계없이 태스크릿을 완료하고 다음 처리를 이어서 하겠다는 것

CallableTaskletAdapter

  • Callable<RepeatStatus> 인터페이스의 구현체를 구성할 수 있게 해주는 어댑터
  • 스텝의 특정 로직을 다른 스레드에서 실행하고 싶을 때 사용
  • 별개의 스레드에서 실행되지만 스텝과 병렬로 실행되는 것은 아님, Callable 객체가 RepeatStatus를 반환하기 전까지는 완료된 것으로 간주 안함

MethodInvokingTaskletAdapter

  • 다른 클래스 내의 메소드를 잡 내의 태스크릿처럼 실행 가능
  • 해당 메소드가 ExitStatus 타입을 반환하면 해당 값이 태스크릿에서 반환, 그렇지않으면 ExitStatus.COMPLETED 반환

SystemCommandTasklet

  • 시스템 명령을 실행할 때 사용, 지정한 명령은 비동기로 실행
  • SystemCommandTasklet.setInterruptOnCancel(boolean) 메소드는 잡이 비정상적으로 종료될 때 시스템 프로세스와 관련된 스레드를 강제로 종료할지의 여부를 스프링 배치에 알려주는데 사용

청크 기반 스텝

  • 청크는 커밋 간격에 의해 정의 -> 커밋 간격을 50으로 설정 했다면 50개 아이템을 읽고 처리하고 50개의 아이템을 한번에 기록한다. 
  • 만약 설정한 커밋 간격 만큼의 아이템을 처리하기 전에 오류가 발생하면, 스프링 배치는 현재 청크를 롤백하고 잡이 실패했다고 표시
  • 잡의 상태는 JobRepository에 갱신

청크 크기 구성

  1. 정적인 커밋 개수 설정
    stepBuilderFactory.get("chunkStep")
    	.<String, String>chunk(1000)​
  2. CompletionPolicy 구현체 사용

CompletionPolicy 인터페이스는 청크의 완료 여부를 결정할 수 있는 결정 로직을 구현할 수 있게 해준다.

  • SimpleCompletionPolicy : 처리된 아이템 갯수가 미리 설정한 임계값에 도달하면 청크 완료로 표시
  • TimeoutTerminationPolicy : 청크 내에서 처리시간이 설정한 시간을 넘을 때 해당 청크가 완료된 것으로 간주하고, 모든 트랜잭션 처리가 정상적으로 계속됨
  • CompositeCompletionPolicy : 자신이 포함하고 있는 여러 정책 중 하나라도 청크 완료라고 판단되면 해당 청크가 완료된 것으로 표시

CompletionPolicy 인터페이스의 메소드는 2개의 isComplete(), start(), update() 로 구성

  • start() : 청크의 시작을 알 수 있도록 정책을 초기화
    Completion 인터페이스의 구현체는 내부에 상태를 저장 할 수 있고 해당 상태를 사용해 청크의 완료 여부를 결정할 수 있어야 함
  • update() : 아이템이 처리댈 때 마다 해당 메소드가 호출되면서 내부 상태를 갱신
  • isComplete(RepeatContext) : 내부 상태를 이용해 청크 완료 여부를 판단
  • isComplete(RepeatContext, RepeatStatus) : 청크 완료 여부의 상태를 기반으로 결정 로직을 수행

스텝 리스너

스텝 리스너는 스텝의 시작과 끝에서 특정 로직을 처리할 수 있게해준다.

  • StepExecutionListner : beforeStep(), afterStep() 으로 구성
    • afterStep() 의 경우 반환값이 ExitStatus -> 리스너가 스텝이 반환한 ExitStatus를 잡에 전달하기 전에 수정할 수 있기 때문
    • 파일을 가져온 후 데이터베이스에 올바른 갯수의 레코드가 기록됐는지의 여부를 확인하는 무결성 검사를 수행하여 잡 처리의 성공 여부를 판별하는데 사용할 수 있음
  • ChunkListner : beforeChunk(), afterChunk() 로 구성

위의 4가지 메소드들은 어노테이션으로 제공된다.

@BeforeStep @AfterStep @BeforeChunk @AfterChunk


스텝 플로우

조건 로직

스텝은 StepBuilder의 next 메소드를 사용해 지정한 순서대로 실행 됨.
전이를 구성하면 스텝을 다른 순서로 실행할 수 있다.

@Bean(name = JOB_NAME)
public Job job() {
    return this.jobBuilderFactory.get(JOB_NAME)
            .start(passStep())
            .on("FAILED").to(failStep()) // passStep의 결과가 "FAILED" 일 경우 failStep() 실행
            .from(passStep()).on("*").to(successStep()) // "FAILED" 를 제외한 모든 결과는 successStep() 실행
            .end()
            .build();
}

// from() 메소드는 이전에 등록한 step으로 돌아가서 새로은 스텝을 시작.

on 메소드는 스프링 배치가 스텝의 ExitStatus를 평가해 어떤 일을 수행할지 결정하도록 한다.

스프링 배치는 자동으로 가장 제한적인 전이부터 덜 제한적인 전이 순으로 정렬한 뒤 각 전이를 순서대로 적용

스프링 배치는 기준에 따라 두 개의 와일드 카드를 허용

  • *  0 개 이상의 문자를 일치
  • ? 1개의 문자를 일치

JobExecutionDecider 인터페이스를 구현하면 다음 스텝을 임의 대로 결정할 수 있다.

    @Bean(name = DECIDER_JOB_NAME)
    public Job job2() {
        return this.jobBuilderFactory.get(DECIDER_JOB_NAME)
                .start(passStep())
                .next(decider())
                .from(decider())
                .on("FAILED").to(failStep())
                .from(decider())
                .on("*").to(successStep())
                .end()
                .build();
    }

-------------------------------------------------------

public class CustomDecider implements JobExecutionDecider {
    @Override
    public FlowExecutionStatus decide(final JobExecution jobExecution, final StepExecution stepExecution) {
        String value = jobExecution.getJobParameters()
                .getString("result");

        if (value.equals("success")) {
            return FlowExecutionStatus.COMPLETED;
        }

        return FlowExecutionStatus.FAILED;
    }
}

잡 종료하기

잡의 3가지 종료 상태

  • Completed : 스프링 배치 처리가 성공적으로 종료됐음을 의미. JobInstance 가 해당 상태로 종료되면 동일한 파라미터를 사용해 다시 실행 할 수 없다.
    • end() 메소드를 사용해 ExitStatus 값을 Completed로 설정
  • Failed : 잡이 성공적으로 완료되지 않았음을 의미, 해당 상태로 종료된 잡은 스프링 배치를 사용해 동일한 파라미터로 다시 실행 가능
    • fail() 메소드를 사용해 ExtiStatus 값을 Failed로 설정
  • Stopped : 해당 상태로 종료된 잡은 다시 시작할 수 있다. 잡에 오류가 생기지 않았지만 중단된 위치에서 잡을 다시 시작할 수 있다.
    • stopAndRestart(Step) 메소드 사용
    • 잡은 FAILED로 종료되지만 잡을 재실행하면 처음 스텝이 아닌 설정한 스텝부터 재시작한다.
@Bean(name = STOP_JOB_NAME)
    public Job job3() {
        return this.jobBuilderFactory.get(STOP_JOB_NAME)
                .start(exceptionStep()) //exception throw
                .on("FAILED").stopAndRestart(passStep())
                .from(passStep())
                .on("*").to(successStep())
                .end()
                .build();
    }
    
    @Bean(name = STOP_JOB_NAME)
    public Job job3() {
        return this.jobBuilderFactory.get(STOP_JOB_NAME)
                .start(exceptionStep())
                .on("FAILED").stopAndRestart(passStep())
                .from(passStep())
                .on("*").to(successStep())
                .end()
                .build();
    }

잡을 다시 실행하니 passStep 부터 시작하였다.

BATCH_JOB_EXECUTION 테이블

플로우 외부화하기

잡에서 사용한 스텝의 정의를 추철해서 재사용 가능한 컴포넌트 형태로 만들 수 있다.

1. 스텝의 시퀀스를 플로우로 만드는 방법

    @Bean(name = FLOW_NAME)
    public Flow customFlow() {
        return new FlowBuilder<Flow>(FLOW_NAME).start(helloStep())
                .next(byeStep())
                .build();
    }
    
    @Bean(name = FLOW_JOB_NAME)
    public Job flowJob() {
        return this.jobBuilderFactory.get(FLOW_JOB_NAME)
                .start(customFlow())
                .next(endStep())
                .end()
                .build();
    }

플로우를 잡 빌더로 전달한 잡의 결과

2. 플로우 스텝을 사용하는 방법

    @Bean(name = FLOW_NAME)
    public Flow customFlow() {
        return new FlowBuilder<Flow>(FLOW_NAME).start(helloStep())
                .next(byeStep())
                .build();
    }

    @Bean(name = "initializeStepFlow")
    public Step initializeStepFlow() {
        return this.stepBuilderFactory.get("initializeStepFlow")
                .flow(customFlow())
                .build();
    }

	@Bean(name = FLOW_STEP_JOB_NAME)
    public Job stepFlowJob() {
        return this.jobBuilderFactory.get(FLOW_STEP_JOB_NAME)
                .start(initializeStepFlow())
                .next(endStep())
                .build();
    }

플로우 스텝을 사용한 잡의 결과

플로우를 잡 빌더로 전달하는 것과 플로우 스텝을 사용하는 것에 차이는, 플로우 스텝을 사용하면 해당 플로우가 담긴 스텝을 하나의 스텝처럼 기록한다.

3. 잡내에서 다른 잡을 호출하는 방법

    @Bean(name = JOB_CALL_JOB_NAME)
    public Job jobCallJob() {
        return this.jobBuilderFactory.get(JOB_CALL_JOB_NAME)
                .start(initializeJobCallJob())
                .next(endStep())
                .build();
    }

    @Bean(name = "initializeJobCallJob")
    public Step initializeJobCallJob() {
        return this.stepBuilderFactory.get("initializeJobCallJob")
                .job(flowJob())
                .parametersExtractor(new DefaultJobParametersExtractor())
                .build();
    }

잡내에서 잡을 호출할 경우 jobParametersExtractor를 설정한다.
잡 파라미터를 전달하여 잡을 실행하면 서브잡에 해당 잡 파라미터를 jobParameterExtractor가 전달한다.