간단목차
- 스프링 부트 이외에도 다양한 잡 실행방법
- 실행중인 잡 중지하기
- 오류 처리 및 재시작 제어
스프링 부트로 잡 시작
- 스프링 부트는 CommnadLinerRunner와 ApplicationRunner라는 두 가지 매커니즘을 사용해 실행 시 로직을 수행
- JobLauncherCommandLinerRunner는 스프링 배치의 JobLauncer를 사용해 잡을 실행
- 스프링 부트가 ApplicationContext 내에 구성된 모든 CommnadLineRunner를 실행할 때, ClassPath에 spring-boot-starter-batch가 존재하면 JobLauncherCommandLinerRunner는 컨텍스트 내에서 찾아낸 모든 잡을 실행시킴
JobLauncherCommandLinerRunner
Deprecated되어JobLauncherApplicationRunner
를 주로 사용- 해당 메커니즘을 통해 스프링 부트가 잡을 실행시키고 있음
기동 시 잡 실행하지 않도록 설정
- 어플리케이션 기동 시에 잡을 실행하지 않고, REST호출이나 특정 이벤트 등으로 잡을 실행시 아래와 같이 기동 시점에 실행되지 않도록 설정
spring:
batch:
job:
enabled: false
특정한 잡만 지정하여 실행
- 컨텍스트에 여러 잡이 정의돼 있는 상태에서 기동 시에 특정한 잡만 실행
spring.batch.job.names
프로퍼티를 사용해 애플리케이션 기동 시에 실행할 잡을 구성
spring:
batch:
job:
names: {job.name}
REST 방식
- 직접 개발해서 REST API를 만들어 잡을 실행하는 방법
- JobLauncher를 통해 잡을 실행
public interface JobLauncher {
public JobExecution run(Job job, JobParameters jobParameters) throws JobExecutionAlreadyRunningException,
JobRestartException, JobInstanceAlreadyCompleteException, JobParametersInvalidException;
}
- 실행할 잡에 전달할 잡 파라미터를 Argument로 전달받아서 실행
REST 방식으로 잡 실행하기
@RestController
@RequiredArgsConstructor
public class JobLaunchingController {
private final JobLauncher jobLauncher;
private final ApplicationContext context;
@PostMapping("/run")
public ExitStatus runJob(@RequestBody JobLaunchRequest request) throws Exception {
Job job = context.getBean(request.getName(), Job.class);
return jobLauncher.run(job, request.getJobParameters()).getExitStatus();
}
}
- ApplicationContext를 통해 실행할 Job빈을 가져옴
- 요청바디를 통해 JobParameters 객체를 가져옴
- 가져온 Job과 JobParameters를 JobLaucher에 전달
REST 방식으로 잡을 실행한 테스트 코드
...
public class JobLaunchingControllerTest {
@Autowired
private JobLaunchingController jobLaunchingController;
@Test
@DisplayName("REST 방식으로 잡 실행하기 테스트")
public void runJobTest() throws Exception {
JobLaunchRequest request = new JobLaunchRequest();
request.setName(RestJobConfiguration.JOB_NAME);
request.setJobParameters(new Properties());
ExitStatus exitStatus = jobLaunchingController.runJob(request);
assertThat(exitStatus).isEqualTo(ExitStatus.COMPLETED);
}
}
사용자가 REST API를 사용해 잡을 다시 실행 요청할 경우
- 실행시킬 잡에 incrementer를 지정
@Bean(name = JOB_NAME)
public Job job() {
return jobBuilderFactory.get(JOB_NAME)
.incrementer(new RunIdIncrementer())
.start(step1())
.build();
}
JobParmetersIncrementer
를 사용할 때 파라미터의 변경 사항을 적용하는 일은JobLauncher
가 수행하기 때문에 파라미터가 잡에 전달되면 더 이상 변경할 수 없음JobParametersBuilder
의getNextJobParameters
메서드를 활용해 파라미터를 증가시킬 수 있음
@RestController
@RequiredArgsConstructor
public class JobLaunchingController {
private final JobLauncher jobLauncher;
private final ApplicationContext context;
@PostMapping("/run")
public ExitStatus runJob(@RequestBody JobLaunchRequest request) throws Exception {
Job job = context.getBean(request.getName(), Job.class);
JobParameters jobParameters = new JobParametersBuilder(request.getJobParameters(), jobExplorer)
.getNextJobParameters(job)
.toJobParameters();
return jobLauncher.run(job, request.getJobParameters()).getExitStatus();
}
}
- JobParametersBuilder.getNextJobParameters를 호출하면
run.id
라는 파라미터가 추가된 새로운 JobParameters 인스턴스가 생성- getNextJobParameters는 Job이 JobParametersIncrementer를 가지고 있는지 해당 job을 보고 판별 후 마지막 JobExecution에 적용
적용 후 결과 화면
Quartz를 사용해 스케줄링하기
Quartz란
- 오픈소스 스케쥴러로 자바 환경의 규모와 상관없이 사용이 가능하고 잡 실행에 유용한 스프링 부트 지원과 같이 오래전부터 스프링 연동을 지원
구성요소
- Scheduler
- SchedulerFactory를 통해서 가져올 수 있으며 JobDetails 및 트리거의 저장소기능을 함
- Job
- 실행할 작업의 단위
- Trigger
- 작업 실행 시점을 정의
- Trigger가 작동되어 쿼츠에게 잡을 실행하도록 지시하면 잡의 개별 실행을 정의하는 JobDetails 객체가 생성
스프링 배치와 쿼츠 통합처리 과정
- 스프링 배치 잡 작성
- 스프링의 QuartzJobBean을 사용해 스프링 배치 잡을 기동하는 쿼츠 잡 작성
- Quartz JobDetail을 생성하도록 스프링이 제공하는 JobDetailBean 구성
- 잡 실행 시점을 정의하도록 트리거 구성
배치 잡
@Configuration
@RequiredArgsConstructor
public class QuartzJobConfiguration {
public static final String JOB_NAME = "chap5_quartz_job";
public static final String JOB_STEP_NAME = "chap5_quartz_job_step";
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
@Bean(name = JOB_NAME)
public Job job() {
return jobBuilderFactory.get(JOB_NAME)
.incrementer(new RunIdIncrementer())
.start(step1())
.build();
}
@Bean(name = JOB_STEP_NAME)
public Step step1() {
return stepBuilderFactory.get(JOB_STEP_NAME)
.tasklet((stepContribution, chunkContext) -> {
System.out.println(JOB_STEP_NAME + " ran!");
return RepeatStatus.FINISHED;
})
.build();
}
}
Quartz 잡 작성
@RequiredArgsConstructor
public class BatchScheduledJob extends QuartzJobBean {
private final Job chap5_quartz_job;
private final JobExplorer jobExplorer;
private final JobLauncher jobLauncher;
@Override
protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
JobParameters jobParameters = new JobParametersBuilder(jobExplorer)
.getNextJobParameters(chap5_quartz_job)
.toJobParameters();
try {
jobLauncher.run(chap5_quartz_job, jobParameters);
} catch (Exception e) {
e.printStackTrace();
}
}
}
- executeInternal 메서드를 재정의해 목적에 맞게 확장
- 해당 메서드는 스케줄링된 이벤트가 발생할 때마다 한 번씩 호출됨
스케줄링 구성
- Quartz 잡의 빈을 구성 후 트리거를 만들어 스케줄링
- JobDetail은 실행할 Quartz 잡 수행 시에 사용되는 메타데이터
@Configuration
public class QuartzConfiguration {
@Bean
public JobDetail quartzJobDetail() {
return JobBuilder.newJob(BatchScheduledJob.class)
.storeDurably()
.build();
}
@Bean
public Trigger jobTrigger() {
SimpleScheduleBuilder scheduleBuilder = SimpleScheduleBuilder.simpleSchedule()
.withIntervalInHours(5).withRepeatCount(4);
return TriggerBuilder.newTrigger()
.forJob(quartzJobDetail())
.withSchedule(scheduleBuilder)
.build();
}
}
잡 중지하기
프로그래밍적으로 중지
중지 트랜지션 사용
- 4장에 중지 트랜지션을 활용해 잡을 중지
- 아래 3가지 과정을 통해 중지 트갠지션을 사용해 중지하도록 구성된 잡을 만들고 재시작 위치를 지정하도록 구성
- 단순한 거래 파일을 불러온다.
- 거래 정보를 거래 테이블에 저장한 이후 계좌번호와 현재 계좌잔액으로 구성된 별도의 계좌 요약 테이블에 적용
- 각 계좌의 계좌번호와 잔액을 나열하는 요약 파일 생성
해당 과정을 구현하기 위해 필요한 컴포넌트
- 커스텀 ItemReader
- 커스텀 ItemProcessor
- 도메인 객체
- DAO
Transaction 과 AcoountSummary라는 데이터 모델 구성
커스텀 ItemReader
- Footer 레코드에 기록된 수와 실제 읽어들이 레코드 수를 비교해 서로 다르면 잡 실행을 중지하도록 구성
AfterStep
메서드에서 위 조건을 검증해ExitStatus.STOPPED
반환
public class TransactionReader implements ItemStreamReader<Transaction> {
...
@Override
public Transaction read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
return process(fieldSetReader.read());
}
private Transaction process(FieldSet fieldSet) {
Transaction result = null;
if (fieldSet != null) {
if (fieldSet.getFieldCount() > 1) {
result = new Transaction();
result.setAccountNumber(fieldSet.readString(0));;
result.setTimestamp(fieldSet.readDate(1, "yyyy-MM-DD HH:mm:ss"));
result.setAmount(fieldSet.readDouble(2));
recordCount++;
} else {
expectedRecordCount = fieldSet.readInt(0);
}
}
return result;
}
...
@AfterStep
public ExitStatus afterStep(StepExecution execution) {
if (recordCount == expectedRecordCount) {
return execution.getExitStatus();
} else {
return ExitStatus.STOPPED;
}
}
...
}
잡 구성
@Configuration
@RequiredArgsConstructor
public class TransactionProcessingJob {
public static final String JOB_NAME = "chap5_transaction_job";
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
@Bean(name = JOB_NAME)
public Job job() {
return jobBuilderFactory.get(JOB_NAME)
.start(importTransactionFileStep())
.on("STOPPED").stopAndRestart(importTransactionFileStep())
.from(importTransactionFileStep()).on("*").to(applyTransactionsStep())
.from(applyTransactionsStep()).next(generateAccountSummaryStep())
.end()
.build();
}
}
- 커스텀 ItemReader를 사용해 importTransactionFileStep 구성
- 해당 스텝이 중지되면 다시 실행될 수 있도록 구성.
- Reader는 레코드 개수와 푸터의 개수가 일치하지 않다면 가져온 파일이 유효하지 않다고 판단 후 정보 초기화 진행 → TransactionReader코드 참고
applyTransactionsStep
,generateAccountSummaryStep
을 구성해 accountSummary에 데이터를 요약본을 저장 후 csv파일로 생성
StepExecution을 사용해 중지
- 이전 예제는 StepListner의 ExitStatus와 잡의 트랜지션을 구성해 수동으로 잡 중지
- 이 방법은 잡의 트랜지션을 별도로 구성하고 스텝의 ExitStatus를 재정의해야함
AfterStep
대신BeforeStep
을 사용하도록 변경해 StepExecution을 가져온 후 푸터 레코드를 읽을 때StepExecution.setTerminateOnly()
메소드를 호출해 스텝이 완료된 후 배치가 종료되도록 플래그 설정- 잡이 STOPPED 상태를 반환하는 대신 스프링 배치가 JobInterruptedException을 던짐
StepExecution을 사용해 중지한 코드
public class TransactionReader implements ItemStreamReader<Transaction> {
...
private Transaction process(FieldSet fieldSet) {
Transaction result = null;
if (fieldSet != null) {
if (fieldSet.getFieldCount() > 1) {
result = new Transaction();
result.setAccountNumber(fieldSet.readString(0));;
result.setTimestamp(fieldSet.readDate(1, "yyyy-MM-DD HH:mm:ss"));
result.setAmount(fieldSet.readDouble(2));
recordCount++;
} else {
expectedRecordCount = fieldSet.readInt(0);
if (expectedRecordCount != recordCount) {
stepExecution.setTerminateOnly();
}
}
}
return result;
}
@BeforeStep
public void beforeStep(StepExecution execution) {
this.stepExecution = stepExecution;
}
}
StepExecution을 사용해 잡을 중지한 잡 구성
@Configuration
@RequiredArgsConstructor
public class TransactionProcessingJob {
public static final String JOB_NAME = "chap5_transaction_job";
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
@Bean(name = JOB_NAME)
public Job job() {
return jobBuilderFactory.get(JOB_NAME)
.start(importTransactionFileStep())
.next(applyTransactionsStep())
.next(generateAccountSummaryStep())
.build();
}
}
오류 처리
잡 실패
public class TransactionReader implements ItemStreamReader<Transaction> {
private ItemStreamReader<FieldSet> fieldSetReader;
private int recordCount = 0;
private int expectedRecordCount = 0;
private StepExecution stepExecution;
...
@Override
public Transaction read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
if (recordCount == 25) {
throw new ParseException("recordCount = 25 Parsed Exception");
}
return process(fieldSetReader.read());
}
}
- 위와 같이 스프링 배치는 예외가 발생하면 기본적으로 스텝 및 잡이 실패한 것으로 간주해 잡이 중지됨
StepExecution
을 사용해 잡을 중지하는 방식과 예외를 발생시켜 잡을 중지하는 방식의 차이StepExecution
을 사용해 예외를 발생ExitStatus.STOPPED
상태로 스텝이 완료된 후 잡이 중지
- 코드상으로 예외 발생
- 스텝이 완료되지 않고 스텝과 잡에
ExitStatus.FAILED
레이블이 지정됨
- 스텝이 완료되지 않고 스텝과 잡에
- 스텝이
FAILED
로 식별되면 스프링 배치는 해당 스텝을 처음부터 다시 시작하지 않음- 어떤 청크를 처리하고 있는 중인지 기억해 해당 중단 부분을 가져와 다시 시작
- ex) 10개의 청크 중에 2번째 청크의 4번째 아이템에서 에외가 발생된 경우 형재 청크의 1~4번째 아이템 처리는 롤백되고 이후 스프링 배치는 2번째 청크부터 실행
재시작 제어
잡의 재시작 방지하기
- 스프링 배치는 기본적으로 모든 잡은 실패하거나 중지될 때 다시 실행할 수 있음
- 그러나 다시 실행하면 안 되는 잡의 경우
preventRestart()
를 통해 잡을 다시 시작할 수 없도록 구성이 가능함
@Bean(name = JOB_NAME)
public Job job() {
return jobBuilderFactory.get(JOB_NAME)
.preventRestart()
.start(importTransactionFileStep())
.next(applyTransactionsStep())
.next(generateAccountSummaryStep())
.build();
}
재시작 횟수 제한
- 스텝 수준에서 재시작 횟수를 제한시킬 수 있음
@Bean
public Step importTransactionFileStep() {
return stepBuilderFactory.get("importTransactionFileStep")
.startLimit(2)
.<Transaction, Transaction>chunk(100)
.reader(transactionReader())
.writer(transactionWriter(null))
.allowStartIfComplete(true)
.listener(transactionReader())
.build();
}
완료된 스텝 재실행
- 스프링 배치는 동일한 파라미터로 잡을 한 번만 성공적으로 실행할 수 있는데, 스텝은 이 규칙이 반드시 적용되지 않음
- 프레임워크의 기본 구성을 재정의함으로써 완료된 스텝을 두 번 이상 실행할 있음
@Bean
public Step importTransactionFileStep() {
return stepBuilderFactory.get("importTransactionFileStep")
.allowStartIfComplete(true)
.<Transaction, Transaction>chunk(100)
.reader(transactionReader())
.writer(transactionWriter(null))
.allowStartIfComplete(true)
.listener(transactionReader())
.build();
}
'Reading Record > 스프링 배치 완벽 가이드' 카테고리의 다른 글
7장 ItemReader: JDBC 부터 (0) | 2021.11.29 |
---|---|
7장 ItemReader: Json 까지 (0) | 2021.11.22 |
5장 JobRepository와 메타데이터 (0) | 2021.10.30 |
4장 잡과 스텝 이해하기 : 스텝 알아보기 (0) | 2021.10.24 |
4장 잡과 스텝 이해하기 : ExecutionContext 까지 (0) | 2021.10.17 |