스텝은 잡의 구성 요소를 담당하며, 독립적이고 순차적으로 배치 처리를 수행한다.
스텝은 모든 단위 작업의 조각이다.
트랜잭션은 스텝 내에서 이뤄지며, 스텝은 서로 독립되도록 의도적으로 설계됐다.
배치 처리 모델
- Tasklet 모델
- Tasklet.execute 메소드가 RepeatStatus.FINISHED를 반환할 때까지 트랜잭션 범위 내에서 반복적으로 실행되는 코드블록을 만들 수 있음
- Chunk 기반 모델
- 청크 단위로 처리할 모든 레코드를 반복적으로 읽어오는 ItemReader, 필수는 아니지만 읽어들인 아이템을 처리하는 ItemProcessor, 아이템을 한 번에 기록하는 ItemWriter 의 세가지 컴포넌트로 구성
- ItemWriter의 단일 호출은 물리적 쓰기를 일괄적으로 처리함으로써 IO 최적화를 이룸
- 각 청크는 자체 트랜잭션으로 실행하며, 처리에 실패했다면 마지막으로 성공한 트랜잭션 이후부터 다시 시작 가능
태스크릿 스텝
태스크릿 스텝을 만드는데에는 두 가지 유형 존재
- 사용자가 작성한 코드를 마치 태스크릿 스텝처럼 실행되도록 하는 방법
- Tasklet 인터페이스를 구현하여 사용하는 방법
- execute 메소드를 구현하여 RepeatStatus 객체를 반환
- RepeatStatus.CONTINUABLE 은 스프링 배치에게 해당 태스크릿을 다시 실행하라고 말하는 것
- RepeatStatus.FINISHED 는 처리의 성공 여부 관계없이 태스크릿을 완료하고 다음 처리를 이어서 하겠다는 것
CallableTaskletAdapter
- Callable<RepeatStatus> 인터페이스의 구현체를 구성할 수 있게 해주는 어댑터
- 스텝의 특정 로직을 다른 스레드에서 실행하고 싶을 때 사용
- 별개의 스레드에서 실행되지만 스텝과 병렬로 실행되는 것은 아님, Callable 객체가 RepeatStatus를 반환하기 전까지는 완료된 것으로 간주 안함
MethodInvokingTaskletAdapter
- 다른 클래스 내의 메소드를 잡 내의 태스크릿처럼 실행 가능
- 해당 메소드가 ExitStatus 타입을 반환하면 해당 값이 태스크릿에서 반환, 그렇지않으면 ExitStatus.COMPLETED 반환
SystemCommandTasklet
- 시스템 명령을 실행할 때 사용, 지정한 명령은 비동기로 실행
- SystemCommandTasklet.setInterruptOnCancel(boolean) 메소드는 잡이 비정상적으로 종료될 때 시스템 프로세스와 관련된 스레드를 강제로 종료할지의 여부를 스프링 배치에 알려주는데 사용
청크 기반 스텝
- 청크는 커밋 간격에 의해 정의 -> 커밋 간격을 50으로 설정 했다면 50개 아이템을 읽고 처리하고 50개의 아이템을 한번에 기록한다.
- 만약 설정한 커밋 간격 만큼의 아이템을 처리하기 전에 오류가 발생하면, 스프링 배치는 현재 청크를 롤백하고 잡이 실패했다고 표시
- 잡의 상태는 JobRepository에 갱신
청크 크기 구성
- 정적인 커밋 개수 설정
stepBuilderFactory.get("chunkStep") .<String, String>chunk(1000)
- CompletionPolicy 구현체 사용
CompletionPolicy 인터페이스는 청크의 완료 여부를 결정할 수 있는 결정 로직을 구현할 수 있게 해준다.
- SimpleCompletionPolicy : 처리된 아이템 갯수가 미리 설정한 임계값에 도달하면 청크 완료로 표시
- TimeoutTerminationPolicy : 청크 내에서 처리시간이 설정한 시간을 넘을 때 해당 청크가 완료된 것으로 간주하고, 모든 트랜잭션 처리가 정상적으로 계속됨
- CompositeCompletionPolicy : 자신이 포함하고 있는 여러 정책 중 하나라도 청크 완료라고 판단되면 해당 청크가 완료된 것으로 표시
CompletionPolicy 인터페이스의 메소드는 2개의 isComplete(), start(), update() 로 구성
- start() : 청크의 시작을 알 수 있도록 정책을 초기화
Completion 인터페이스의 구현체는 내부에 상태를 저장 할 수 있고 해당 상태를 사용해 청크의 완료 여부를 결정할 수 있어야 함 - update() : 아이템이 처리댈 때 마다 해당 메소드가 호출되면서 내부 상태를 갱신
- isComplete(RepeatContext) : 내부 상태를 이용해 청크 완료 여부를 판단
- isComplete(RepeatContext, RepeatStatus) : 청크 완료 여부의 상태를 기반으로 결정 로직을 수행
스텝 리스너
스텝 리스너는 스텝의 시작과 끝에서 특정 로직을 처리할 수 있게해준다.
- StepExecutionListner : beforeStep(), afterStep() 으로 구성
- afterStep() 의 경우 반환값이 ExitStatus -> 리스너가 스텝이 반환한 ExitStatus를 잡에 전달하기 전에 수정할 수 있기 때문
- 파일을 가져온 후 데이터베이스에 올바른 갯수의 레코드가 기록됐는지의 여부를 확인하는 무결성 검사를 수행하여 잡 처리의 성공 여부를 판별하는데 사용할 수 있음
- ChunkListner : beforeChunk(), afterChunk() 로 구성
위의 4가지 메소드들은 어노테이션으로 제공된다.
@BeforeStep @AfterStep @BeforeChunk @AfterChunk
스텝 플로우
조건 로직
스텝은 StepBuilder의 next 메소드를 사용해 지정한 순서대로 실행 됨.
전이를 구성하면 스텝을 다른 순서로 실행할 수 있다.
@Bean(name = JOB_NAME)
public Job job() {
return this.jobBuilderFactory.get(JOB_NAME)
.start(passStep())
.on("FAILED").to(failStep()) // passStep의 결과가 "FAILED" 일 경우 failStep() 실행
.from(passStep()).on("*").to(successStep()) // "FAILED" 를 제외한 모든 결과는 successStep() 실행
.end()
.build();
}
// from() 메소드는 이전에 등록한 step으로 돌아가서 새로은 스텝을 시작.
on 메소드는 스프링 배치가 스텝의 ExitStatus를 평가해 어떤 일을 수행할지 결정하도록 한다.
스프링 배치는 자동으로 가장 제한적인 전이부터 덜 제한적인 전이 순으로 정렬한 뒤 각 전이를 순서대로 적용
스프링 배치는 기준에 따라 두 개의 와일드 카드를 허용
- * 0 개 이상의 문자를 일치
- ? 1개의 문자를 일치
JobExecutionDecider 인터페이스를 구현하면 다음 스텝을 임의 대로 결정할 수 있다.
@Bean(name = DECIDER_JOB_NAME)
public Job job2() {
return this.jobBuilderFactory.get(DECIDER_JOB_NAME)
.start(passStep())
.next(decider())
.from(decider())
.on("FAILED").to(failStep())
.from(decider())
.on("*").to(successStep())
.end()
.build();
}
-------------------------------------------------------
public class CustomDecider implements JobExecutionDecider {
@Override
public FlowExecutionStatus decide(final JobExecution jobExecution, final StepExecution stepExecution) {
String value = jobExecution.getJobParameters()
.getString("result");
if (value.equals("success")) {
return FlowExecutionStatus.COMPLETED;
}
return FlowExecutionStatus.FAILED;
}
}
잡 종료하기
잡의 3가지 종료 상태
- Completed : 스프링 배치 처리가 성공적으로 종료됐음을 의미. JobInstance 가 해당 상태로 종료되면 동일한 파라미터를 사용해 다시 실행 할 수 없다.
- end() 메소드를 사용해 ExitStatus 값을 Completed로 설정
- Failed : 잡이 성공적으로 완료되지 않았음을 의미, 해당 상태로 종료된 잡은 스프링 배치를 사용해 동일한 파라미터로 다시 실행 가능
- fail() 메소드를 사용해 ExtiStatus 값을 Failed로 설정
- Stopped : 해당 상태로 종료된 잡은 다시 시작할 수 있다. 잡에 오류가 생기지 않았지만 중단된 위치에서 잡을 다시 시작할 수 있다.
- stopAndRestart(Step) 메소드 사용
- 잡은 FAILED로 종료되지만 잡을 재실행하면 처음 스텝이 아닌 설정한 스텝부터 재시작한다.
@Bean(name = STOP_JOB_NAME)
public Job job3() {
return this.jobBuilderFactory.get(STOP_JOB_NAME)
.start(exceptionStep()) //exception throw
.on("FAILED").stopAndRestart(passStep())
.from(passStep())
.on("*").to(successStep())
.end()
.build();
}
@Bean(name = STOP_JOB_NAME)
public Job job3() {
return this.jobBuilderFactory.get(STOP_JOB_NAME)
.start(exceptionStep())
.on("FAILED").stopAndRestart(passStep())
.from(passStep())
.on("*").to(successStep())
.end()
.build();
}
잡을 다시 실행하니 passStep 부터 시작하였다.
플로우 외부화하기
잡에서 사용한 스텝의 정의를 추철해서 재사용 가능한 컴포넌트 형태로 만들 수 있다.
1. 스텝의 시퀀스를 플로우로 만드는 방법
@Bean(name = FLOW_NAME)
public Flow customFlow() {
return new FlowBuilder<Flow>(FLOW_NAME).start(helloStep())
.next(byeStep())
.build();
}
@Bean(name = FLOW_JOB_NAME)
public Job flowJob() {
return this.jobBuilderFactory.get(FLOW_JOB_NAME)
.start(customFlow())
.next(endStep())
.end()
.build();
}
2. 플로우 스텝을 사용하는 방법
@Bean(name = FLOW_NAME)
public Flow customFlow() {
return new FlowBuilder<Flow>(FLOW_NAME).start(helloStep())
.next(byeStep())
.build();
}
@Bean(name = "initializeStepFlow")
public Step initializeStepFlow() {
return this.stepBuilderFactory.get("initializeStepFlow")
.flow(customFlow())
.build();
}
@Bean(name = FLOW_STEP_JOB_NAME)
public Job stepFlowJob() {
return this.jobBuilderFactory.get(FLOW_STEP_JOB_NAME)
.start(initializeStepFlow())
.next(endStep())
.build();
}
플로우를 잡 빌더로 전달하는 것과 플로우 스텝을 사용하는 것에 차이는, 플로우 스텝을 사용하면 해당 플로우가 담긴 스텝을 하나의 스텝처럼 기록한다.
3. 잡내에서 다른 잡을 호출하는 방법
@Bean(name = JOB_CALL_JOB_NAME)
public Job jobCallJob() {
return this.jobBuilderFactory.get(JOB_CALL_JOB_NAME)
.start(initializeJobCallJob())
.next(endStep())
.build();
}
@Bean(name = "initializeJobCallJob")
public Step initializeJobCallJob() {
return this.stepBuilderFactory.get("initializeJobCallJob")
.job(flowJob())
.parametersExtractor(new DefaultJobParametersExtractor())
.build();
}
잡내에서 잡을 호출할 경우 jobParametersExtractor를 설정한다.
잡 파라미터를 전달하여 잡을 실행하면 서브잡에 해당 잡 파라미터를 jobParameterExtractor가 전달한다.
'Reading Record > 스프링 배치 완벽 가이드' 카테고리의 다른 글
6장 잡 실행하기 (0) | 2021.11.15 |
---|---|
5장 JobRepository와 메타데이터 (0) | 2021.10.30 |
4장 잡과 스텝 이해하기 : ExecutionContext 까지 (0) | 2021.10.17 |
13장 배치 처리 테스트하기 (0) | 2021.10.12 |
2장 스프링 배치 (0) | 2021.10.03 |