본문 바로가기

Reading Record/스프링 배치 완벽 가이드

6장 잡 실행하기

간단목차

  • 스프링 부트 이외에도 다양한 잡 실행방법
  • 실행중인 잡 중지하기
  • 오류 처리 및 재시작 제어

스프링 부트로 잡 시작

  • 스프링 부트는 CommnadLinerRunner와 ApplicationRunner라는 두 가지 매커니즘을 사용해 실행 시 로직을 수행
  • JobLauncherCommandLinerRunner는 스프링 배치의 JobLauncer를 사용해 잡을 실행
  • 스프링 부트가 ApplicationContext 내에 구성된 모든 CommnadLineRunner를 실행할 때, ClassPath에 spring-boot-starter-batch가 존재하면 JobLauncherCommandLinerRunner는 컨텍스트 내에서 찾아낸 모든 잡을 실행시킴
  • JobLauncherCommandLinerRunner Deprecated되어 JobLauncherApplicationRunner를 주로 사용
  • 해당 메커니즘을 통해 스프링 부트가 잡을 실행시키고 있음

기동 시 잡 실행하지 않도록 설정

  • 어플리케이션 기동 시에 잡을 실행하지 않고, REST호출이나 특정 이벤트 등으로 잡을 실행시 아래와 같이 기동 시점에 실행되지 않도록 설정
spring:
  batch:
    job:
      enabled: false

특정한 잡만 지정하여 실행

  • 컨텍스트에 여러 잡이 정의돼 있는 상태에서 기동 시에 특정한 잡만 실행
  • spring.batch.job.names 프로퍼티를 사용해 애플리케이션 기동 시에 실행할 잡을 구성
spring:
  batch:
    job:
      names: {job.name}

REST 방식

  • 직접 개발해서 REST API를 만들어 잡을 실행하는 방법
  • JobLauncher를 통해 잡을 실행
public interface JobLauncher {
    public JobExecution run(Job job, JobParameters jobParameters) throws JobExecutionAlreadyRunningException,
            JobRestartException, JobInstanceAlreadyCompleteException, JobParametersInvalidException;

}
  • 실행할 잡에 전달할 잡 파라미터를 Argument로 전달받아서 실행

REST 방식으로 잡 실행하기

@RestController
@RequiredArgsConstructor
public class JobLaunchingController {

    private final JobLauncher jobLauncher;

    private final ApplicationContext context;

    @PostMapping("/run")
    public ExitStatus runJob(@RequestBody JobLaunchRequest request) throws Exception {
        Job job = context.getBean(request.getName(), Job.class);
        return jobLauncher.run(job, request.getJobParameters()).getExitStatus();
    }
}
  1. ApplicationContext를 통해 실행할 Job빈을 가져옴
  2. 요청바디를 통해 JobParameters 객체를 가져옴
  3. 가져온 Job과 JobParameters를 JobLaucher에 전달

REST 방식으로 잡을 실행한 테스트 코드

...
public class JobLaunchingControllerTest {

    @Autowired
    private JobLaunchingController jobLaunchingController;

    @Test
    @DisplayName("REST 방식으로 잡 실행하기 테스트")
    public void runJobTest() throws Exception {
        JobLaunchRequest request = new JobLaunchRequest();
        request.setName(RestJobConfiguration.JOB_NAME);
        request.setJobParameters(new Properties());

        ExitStatus exitStatus = jobLaunchingController.runJob(request);

        assertThat(exitStatus).isEqualTo(ExitStatus.COMPLETED);
    }
}

사용자가 REST API를 사용해 잡을 다시 실행 요청할 경우

  • 실행시킬 잡에 incrementer를 지정
@Bean(name = JOB_NAME)
public Job job() {
        return jobBuilderFactory.get(JOB_NAME)
                .incrementer(new RunIdIncrementer())
                .start(step1())
                .build();
}
  • JobParmetersIncrementer를 사용할 때 파라미터의 변경 사항을 적용하는 일은 JobLauncher가 수행하기 때문에 파라미터가 잡에 전달되면 더 이상 변경할 수 없음
  • JobParametersBuildergetNextJobParameters 메서드를 활용해 파라미터를 증가시킬 수 있음
@RestController
@RequiredArgsConstructor
public class JobLaunchingController {

    private final JobLauncher jobLauncher;

    private final ApplicationContext context;

    @PostMapping("/run")
    public ExitStatus runJob(@RequestBody JobLaunchRequest request) throws Exception {
                Job job = context.getBean(request.getName(), Job.class);
        JobParameters jobParameters = new JobParametersBuilder(request.getJobParameters(), jobExplorer)
                .getNextJobParameters(job)
                .toJobParameters();
        return jobLauncher.run(job, request.getJobParameters()).getExitStatus();
    }
}
  • JobParametersBuilder.getNextJobParameters를 호출하면 run.id라는 파라미터가 추가된 새로운 JobParameters 인스턴스가 생성
    • getNextJobParameters는 Job이 JobParametersIncrementer를 가지고 있는지 해당 job을 보고 판별 후 마지막 JobExecution에 적용

적용 후 결과 화면

image

Quartz를 사용해 스케줄링하기

Quartz란

  • 오픈소스 스케쥴러로 자바 환경의 규모와 상관없이 사용이 가능하고 잡 실행에 유용한 스프링 부트 지원과 같이 오래전부터 스프링 연동을 지원

구성요소

  • Scheduler
    • SchedulerFactory를 통해서 가져올 수 있으며 JobDetails 및 트리거의 저장소기능을 함
  • Job
    • 실행할 작업의 단위
  • Trigger
    • 작업 실행 시점을 정의
    • Trigger가 작동되어 쿼츠에게 잡을 실행하도록 지시하면 잡의 개별 실행을 정의하는 JobDetails 객체가 생성

스프링 배치와 쿼츠 통합처리 과정

  1. 스프링 배치 잡 작성
  2. 스프링의 QuartzJobBean을 사용해 스프링 배치 잡을 기동하는 쿼츠 잡 작성
  3. Quartz JobDetail을 생성하도록 스프링이 제공하는 JobDetailBean 구성
  4. 잡 실행 시점을 정의하도록 트리거 구성

배치 잡

@Configuration
@RequiredArgsConstructor
public class QuartzJobConfiguration {
    public static final String JOB_NAME = "chap5_quartz_job";
    public static final String JOB_STEP_NAME = "chap5_quartz_job_step";
    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;

    @Bean(name = JOB_NAME)
    public Job job() {
        return jobBuilderFactory.get(JOB_NAME)
                .incrementer(new RunIdIncrementer())
                .start(step1())
                .build();
    }

    @Bean(name = JOB_STEP_NAME)
    public Step step1() {
        return stepBuilderFactory.get(JOB_STEP_NAME)
                .tasklet((stepContribution, chunkContext) -> {
                    System.out.println(JOB_STEP_NAME + " ran!");
                    return RepeatStatus.FINISHED;
                })
                .build();
    }
}

Quartz 잡 작성

@RequiredArgsConstructor
public class BatchScheduledJob extends QuartzJobBean {
    private final Job chap5_quartz_job;
    private final JobExplorer jobExplorer;
    private final JobLauncher jobLauncher;

    @Override
    protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
        JobParameters jobParameters = new JobParametersBuilder(jobExplorer)
                .getNextJobParameters(chap5_quartz_job)
                .toJobParameters();
        try {
            jobLauncher.run(chap5_quartz_job, jobParameters);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
  • executeInternal 메서드를 재정의해 목적에 맞게 확장
  • 해당 메서드는 스케줄링된 이벤트가 발생할 때마다 한 번씩 호출됨

스케줄링 구성

  • Quartz 잡의 빈을 구성 후 트리거를 만들어 스케줄링
  • JobDetail은 실행할 Quartz 잡 수행 시에 사용되는 메타데이터
@Configuration
public class QuartzConfiguration {

    @Bean
    public JobDetail quartzJobDetail() {
        return JobBuilder.newJob(BatchScheduledJob.class)
                .storeDurably()
                .build();
    }

    @Bean
    public Trigger jobTrigger() {
        SimpleScheduleBuilder scheduleBuilder = SimpleScheduleBuilder.simpleSchedule()
                .withIntervalInHours(5).withRepeatCount(4);

        return TriggerBuilder.newTrigger()
                .forJob(quartzJobDetail())
                .withSchedule(scheduleBuilder)
                .build();
    }
}

잡 중지하기

프로그래밍적으로 중지

중지 트랜지션 사용

  • 4장에 중지 트랜지션을 활용해 잡을 중지
  • 아래 3가지 과정을 통해 중지 트갠지션을 사용해 중지하도록 구성된 잡을 만들고 재시작 위치를 지정하도록 구성
    1. 단순한 거래 파일을 불러온다.
    2. 거래 정보를 거래 테이블에 저장한 이후 계좌번호와 현재 계좌잔액으로 구성된 별도의 계좌 요약 테이블에 적용
    3. 각 계좌의 계좌번호와 잔액을 나열하는 요약 파일 생성

해당 과정을 구현하기 위해 필요한 컴포넌트

  • 커스텀 ItemReader
  • 커스텀 ItemProcessor
  • 도메인 객체
  • DAO

Transaction 과 AcoountSummary라는 데이터 모델 구성

image

커스텀 ItemReader

  • Footer 레코드에 기록된 수와 실제 읽어들이 레코드 수를 비교해 서로 다르면 잡 실행을 중지하도록 구성
  • AfterStep 메서드에서 위 조건을 검증해 ExitStatus.STOPPED 반환
public class TransactionReader implements ItemStreamReader<Transaction> {
    ...

    @Override
    public Transaction read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
        return process(fieldSetReader.read());
    }

    private Transaction process(FieldSet fieldSet) {
        Transaction result = null;

        if (fieldSet != null) {
            if (fieldSet.getFieldCount() > 1) {
                result = new Transaction();
                result.setAccountNumber(fieldSet.readString(0));;
                result.setTimestamp(fieldSet.readDate(1, "yyyy-MM-DD HH:mm:ss"));
                result.setAmount(fieldSet.readDouble(2));
                recordCount++;
            } else {
                expectedRecordCount = fieldSet.readInt(0);
            }
        }
        return result;
    }

    ...

    @AfterStep
    public ExitStatus afterStep(StepExecution execution) {
        if (recordCount == expectedRecordCount) {
            return execution.getExitStatus();
        } else {
            return ExitStatus.STOPPED;
        }
    }
    ...
}

잡 구성


@Configuration
@RequiredArgsConstructor
public class TransactionProcessingJob {
    public static final String JOB_NAME = "chap5_transaction_job";
    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;

    @Bean(name = JOB_NAME)
    public Job job() {
        return jobBuilderFactory.get(JOB_NAME)
                .start(importTransactionFileStep())
                .on("STOPPED").stopAndRestart(importTransactionFileStep())
                .from(importTransactionFileStep()).on("*").to(applyTransactionsStep())
                .from(applyTransactionsStep()).next(generateAccountSummaryStep())
                .end()
                .build();
    }
}
  • 커스텀 ItemReader를 사용해 importTransactionFileStep 구성
  • 해당 스텝이 중지되면 다시 실행될 수 있도록 구성.
    • Reader는 레코드 개수와 푸터의 개수가 일치하지 않다면 가져온 파일이 유효하지 않다고 판단 후 정보 초기화 진행 → TransactionReader코드 참고
  • applyTransactionsStep, generateAccountSummaryStep을 구성해 accountSummary에 데이터를 요약본을 저장 후 csv파일로 생성

StepExecution을 사용해 중지

  • 이전 예제는 StepListner의 ExitStatus와 잡의 트랜지션을 구성해 수동으로 잡 중지
    • 이 방법은 잡의 트랜지션을 별도로 구성하고 스텝의 ExitStatus를 재정의해야함
  • AfterStep 대신 BeforeStep을 사용하도록 변경해 StepExecution을 가져온 후 푸터 레코드를 읽을 때 StepExecution.setTerminateOnly() 메소드를 호출해 스텝이 완료된 후 배치가 종료되도록 플래그 설정
  • 잡이 STOPPED 상태를 반환하는 대신 스프링 배치가 JobInterruptedException을 던짐

StepExecution을 사용해 중지한 코드

public class TransactionReader implements ItemStreamReader<Transaction> {

    ...
    private Transaction process(FieldSet fieldSet) {
        Transaction result = null;

        if (fieldSet != null) {
            if (fieldSet.getFieldCount() > 1) {
                result = new Transaction();
                result.setAccountNumber(fieldSet.readString(0));;
                result.setTimestamp(fieldSet.readDate(1, "yyyy-MM-DD HH:mm:ss"));
                result.setAmount(fieldSet.readDouble(2));
                recordCount++;
            } else {
                expectedRecordCount = fieldSet.readInt(0);

                if (expectedRecordCount != recordCount) {
                    stepExecution.setTerminateOnly();
                }
            }
        }
        return result;
    }

    @BeforeStep
    public void beforeStep(StepExecution execution) {
        this.stepExecution = stepExecution;
    }
}

StepExecution을 사용해 잡을 중지한 잡 구성


@Configuration
@RequiredArgsConstructor
public class TransactionProcessingJob {
    public static final String JOB_NAME = "chap5_transaction_job";
    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;

    @Bean(name = JOB_NAME)
    public Job job() {
        return jobBuilderFactory.get(JOB_NAME)
                .start(importTransactionFileStep())
                .next(applyTransactionsStep())
                .next(generateAccountSummaryStep())
                .build();
    }
}

오류 처리

잡 실패

public class TransactionReader implements ItemStreamReader<Transaction> {
    private ItemStreamReader<FieldSet> fieldSetReader;
    private int recordCount = 0;
    private int expectedRecordCount = 0;
    private StepExecution stepExecution;

    ...
        @Override
    public Transaction read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
        if (recordCount == 25) {
            throw new ParseException("recordCount = 25 Parsed Exception");
        }
        return process(fieldSetReader.read());
    }
}
  • 위와 같이 스프링 배치는 예외가 발생하면 기본적으로 스텝 및 잡이 실패한 것으로 간주해 잡이 중지됨
  • StepExecution을 사용해 잡을 중지하는 방식과 예외를 발생시켜 잡을 중지하는 방식의 차이
    • StepExecution을 사용해 예외를 발생
      • ExitStatus.STOPPED 상태로 스텝이 완료된 후 잡이 중지
    • 코드상으로 예외 발생
      • 스텝이 완료되지 않고 스텝과 잡에 ExitStatus.FAILED 레이블이 지정됨
  • 스텝이 FAILED로 식별되면 스프링 배치는 해당 스텝을 처음부터 다시 시작하지 않음
    • 어떤 청크를 처리하고 있는 중인지 기억해 해당 중단 부분을 가져와 다시 시작
    • ex) 10개의 청크 중에 2번째 청크의 4번째 아이템에서 에외가 발생된 경우 형재 청크의 1~4번째 아이템 처리는 롤백되고 이후 스프링 배치는 2번째 청크부터 실행

image

재시작 제어

잡의 재시작 방지하기

  • 스프링 배치는 기본적으로 모든 잡은 실패하거나 중지될 때 다시 실행할 수 있음
  • 그러나 다시 실행하면 안 되는 잡의 경우 preventRestart() 를 통해 잡을 다시 시작할 수 없도록 구성이 가능함
    @Bean(name = JOB_NAME)
    public Job job() {
        return jobBuilderFactory.get(JOB_NAME)
                .preventRestart()
                .start(importTransactionFileStep())
                .next(applyTransactionsStep())
                .next(generateAccountSummaryStep())
                .build();
    }

image

재시작 횟수 제한

  • 스텝 수준에서 재시작 횟수를 제한시킬 수 있음

    @Bean
    public Step importTransactionFileStep() {
        return stepBuilderFactory.get("importTransactionFileStep")
                .startLimit(2)
                .<Transaction, Transaction>chunk(100)
                .reader(transactionReader())
                .writer(transactionWriter(null))
                .allowStartIfComplete(true)
                .listener(transactionReader())
                .build();
    }

완료된 스텝 재실행

  • 스프링 배치는 동일한 파라미터로 잡을 한 번만 성공적으로 실행할 수 있는데, 스텝은 이 규칙이 반드시 적용되지 않음
  • 프레임워크의 기본 구성을 재정의함으로써 완료된 스텝을 두 번 이상 실행할 있음

    @Bean
    public Step importTransactionFileStep() {
        return stepBuilderFactory.get("importTransactionFileStep")
                .allowStartIfComplete(true)
                .<Transaction, Transaction>chunk(100)
                .reader(transactionReader())
                .writer(transactionWriter(null))
                .allowStartIfComplete(true)
                .listener(transactionReader())
                .build();
    }