본문 바로가기

스터디/스프링

Spring Scheduler Cluster 기능 추가해보기

Spring Scheduler는 여러 인스턴스가 떠 있을때 각 인스턴스의 job을 클러스터링하는 기능이 없다.
각 인스턴스 중 하나의 인스턴스에서만 job을 실행하고 싶은 경우 Scheduler를 커스텀화 해야한다.
Scheduler에 Cluster 기능을 추가하는 방법을 알아보자.

코드예제  https://github.com/Java-Bom/bomscheduler/tree/team1


특정 시간마다 반복해서 돌아야하는 작업은 주로 Spring batch + jenkins 조합으로 많이 해결한다.
하지만 반복되는 주기가 1분 혹은 1분보다 짧아야 하는 경우 jenkins가 jar 파일을 실행하는 방법은 무거울 수 있다.
이런 경우는 Spring Scheduler를 사용하면 application이 계속 실행된 상태에서 job을 실행할 수 있으므로 위 문제가 해결된다.
Spring Scheduler를 사용할 때도 한가지 문제가 발생하는데 블루/그린 배포 혹은 가용서버를 확보하기 위한 용도로 둘 이상의 인스턴스가 유지될 때 하나의 인스턴스에서만 실행되어야하는 작업이 두개의 인스턴스에서 실행될 수 있다.
위 문제를 AOP와 RDB를 이용해 하나의 인스턴스에서만 작업이 실행되도록 만들어보자.
Scheduler를 하나의 인스턴스에서만 실행되게 하기 위한 컨셉으로 DB의 Row lock을 사용해 해당 row를 점유한 인스턴스만 작업을 실행할 수 있도록 한다.
인스턴스간 작업 할당 코드를 살펴보자
@Entity
class JobAlloc(
    val jobName: String,
    var allocId: String,
    var startDateTime: LocalDateTime,
    var endDateTime: LocalDateTime,
) {
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    var id: Long? = null
....
}

===================================

class SingleJpaJobCoordinator(
    private val jobAllocRepository: JobAllocRepository,
    private val jobManager: JobManager
) : JobCoordinator {
...

fun alloc(allocId: String, jobName: String) : Boolean {
        val startDateTime = LocalDateTime.now()
        val endDateTime = startDateTime.plusSeconds(30)

        val alloc: JobAlloc? = jobAllocRepository.findByName(jobName)
        when {
            //1. 스케줄러 작업을 실행하고 있는 인스턴스가 없는 경우 alloc을 생성한다
            alloc == null -> {
                jobAllocRepository.save(JobAlloc(
                        jobName = jobName,
                        allocId = allocId,
                        startDateTime = startDateTime,
                        endDateTime = endDateTime))
                return true
            }
            //2. allocId는 인스턴스 별 유니크하게 생성되는 키로 같은 인스턴스가 요청한 경우 만료시간을 연장한다. 
            alloc.allocId == allocId -> {
                alloc.endDateTime = endDateTime //30초 연장
                return true
            }
            //3. 다른 인스턴스가 요청한 경우 만료시간이 새로 시작한 인스턴스의 작업시간보다 전이면 alloc을 교체한다.
            alloc.endDateTime.isBefore(startDateTime) -> { //인스턴스 교체
                alloc.updateJobAlloc(allocId, startDateTime, endDateTime)
                jobAllocRepository.save(alloc)
                return true
            }
            //4. 현재 작업을 수행하고 있는 인스턴스의 만료시간이 지나지 않았을 경우 
            else -> {
                log.info { "job alloc wait - $request" }
                return false
            }
        }
 }
...
}
위 코드는 인스턴스간 작업을 제어하는  코드이다 . 
JobAlloc은 수행되어야 할 작업이고,JobCoordinator는 작업을 할당할 인스턴스를 관리해주는 작업 할당 관리자이다.
우선 JobAlloc의 필드를 살펴보자.
  • allocId -  작업을 수행하는 인스턴스 별 유니크 id
  • jobName - 수행할 작업 명
  • startDateTime - 작업 시작시간
  • endDateTime - 종료시간
scheduler에서 실행될 작업을 JobAlloc으로 정의하고 JobAlloc row의 소유권을 가진 인스턴스만 해당 작업을 실행한다.
처음 스케줄러가 배포가 될때는 JobAlloc에 아무런 작업도 없는 상태로 배포가 되고, 가장 먼저 실행되는 인스턴스가 JobAlloc을 생성한다.
해당 인스턴스의 AllocId로 설정된 JobAlloc은 작업 할당 모니터링이 반복해서 돌면서 만료시간을 지속적으로 연장시킨다.
다른 인스턴스들도 작업 할당 모니터링을 반복해서 돌고 있지만, 현재 작업을 실행하고 있는 인스턴스가 계속 만료시간을 30초씩 연장하고 있으므로 allocId를 교체하지 않고 작업 할당 상태를 false로 리턴한다.
위 코드에서 중요한 부분중 하나는 findByName이다.
Mysql에서 JobAlloc을 조회하는 코드는 SELECT FOR UPDATE 을 사용해서 row lock을 걸어 가장 먼저 lock을 획득한 세션만 수정할 수 있도록 제약한다.
 

이제 작업을 실행하고 있는 인스턴스가 비정상 종료될때  다른 인스턴스가 JobAlloc을 할당 받는 경우를 살펴보자.

현재 작업을 수행하고 있는 인스턴스가 종료가 되면 해당 JobAlloc의 만료시간(endDateTime)이 더 이상 연장되지 않는다.
만료시간이 연장되지 않는다면, 다른 정상 인스턴스의 작업 할당 모니터링이 수행되면서 해당 작업 할당 시작 시간 > 비정상 종료된 인스턴스의 만료시간 조건을 만족하면서 allocId가 신규 인스턴스로 교체된다.
지금까지 JobCoordinator와 JobAlloc을 통해 작업을 여러 인스턴스중 하나의 인스턴스에게 동시성 이슈없이  할당하는 방법을 살펴봤다.
이제 JobCoordinator에게 작업 할당 대상(JobName)을 주입해주는 JobAllocTaskBroker와 작업 할당을 주기적으로 호출해주는 JobAllocProcessor를 보자.
class SingleJobAllocTaskSupplier : JobAllocTaskSupplier {

//스프링 스케줄러에서 단일 스케줄러로 돌아야하는 작업은 DEFAULT_SCHEDULER로 정의한다.
//해당 작업이 JobAlloc의 row로 정의된다.
    override fun createJobAllocTasks(): List<JobAllocTask> {
        return listOf(
            JobAllocTask(
                jobName = jobName = BomScheduleJob.DEFAULT_JOB.jobName,
                delayInMilliseconds = 10000 //작업 할당이 반복되는 주기 (10초)
            )
        )
    }
}

==================================

class JobAllocTaskBroker(private val allocTaskSuppliers: List<JobAllocTaskSupplier>) {

    private val waitingJobs: MutableSet<JobAllocTask> = mutableSetOf()

//DelayQueue를 활용해 생성된 작업을 지정된 delayInMilliseconds만큼 지연 수신될 수 있도록 한다.

    private val jobAllocTaskQueue: DelayQueue<JobAllocTask> = DelayQueue()

    fun getJobAllocTask(): JobAllocTask {
        generateTasks()
        val task = jobAllocTaskQueue.take()//작업이 노출될때까지 대기한다.
        waitingJobs.remove(task)
        return task
    }

    private fun generateTasks() {
//아직 작업이 처리되지 않았을때 작업 생성 요청이 들어와 중복 작업이 생성될 수 있다. 중복작업을 생성하지 않도록 하기 위해 set으로 작업의 유일성을 관리한다.
        val jobAllocTasks = allocTaskSuppliers
            .flatMap { it.createJobAllocTasks() }
            .filterNot { waitingJobs.contains(it) }

        waitingJobs.addAll(jobAllocTasks)
        jobAllocTaskQueue.addAll(jobAllocTasks)
    }
}


==================================

class JobAllocProcessor(
    private val allocId: String = UUID.randomUUID().toString(),
    private val jobManager: JobManager,
    private val jobCoordinator: JobCoordinator,
    private val jobAllocTaskBroker: JobAllocTaskBroker
) : SmartLifecycle {
// SmartLifecycle를 구현해 스프링 빈 사이클 이용 (가장 나중에 실행되고, 가장 먼저 종료된다)

    private val log = logger()
    private val lock: Lock = ReentrantLock() // 공유변수의 상태변경을 동기화 관리하기 위해 lock을 사용
    private val countDownLatch: CountDownLatch = CountDownLatch(1) // 어플리케이션 종료 명령이 들어올때 작업 할당이 마무리될떄까지 대기할때 사용
    private var running: Boolean = false // 프로세스 실행 여부
    private var pleaseStop: Boolean = true // 작업 할당 종료 명령

    override fun start() {
        log.info { "start job alloc processor" }
        lock.withLock {
            check(!running) { "already running" }
            running = true
            pleaseStop = false
        }

//모니터링 작업을 수행할 별도 스레드를 생성한다.
        Thread {
            try {
                this.process()
            } catch (e: InterruptedException) {
                log.error("thread interrupted error.", e)
                Thread.currentThread().interrupt()
            } finally {
                running = false
                countDownLatch.countDown()
            }
        }.start()
    }

    private fun process() {
        while (!pleaseStop) {
            try { 
               // 작업이 나올때까지 해당 위치에서 대기한다.
               // 작업 지연 시간을 10초로 설정했으므로 10초에 한번씩 process가 수행된다고 보면 된다.
                val jobAllocTask: JobAllocTask = jobAllocTaskBroker.getJobAllocTask()  
                val request = jobAllocTask.toRequest(allocId)
                jobManager.alloc = jobCoordinator.alloc(request)
                log.info { "alloc job-$request" }
            } catch (e: RuntimeException) {
                log.error("job alloc execute error.", e)
            }
        }
    }

    override fun stop() {
        lock.withLock {
            while (isRunning) {
                pleaseStop = true
                //어플리케이션 종료 명령이 들어오면 pleaseStop의 상태를 변경하고 대기한다.
                //작업할당 스레드가 작업할당을 다 마무리한 후 countDownLatch의 락을 풀어주면 종료된다.
                countDownLatch.await()
            }
        }
        log.info { "stop job alloc processor" }
    }

    override fun isRunning(): Boolean {
        return running
    }
}
JobAllocTaskBroker는 JobAllocTaskSupplier에서 생성하는 작업 할당 업무를 중계해주는 큐 역할을 한다.
Supplier에서 지연 시간이 설정된 작업 할당 업무를 JobAllocTaskBroker의 deleayQueue에 수신받아 지연시간이 지나면 작업을 공급해준다.
JobAllocProcessor는 빈이 처음 생성될때 스레드를 새로 생성해  JobAllocTaskBroker에서 작업을 가져와 jobCoordinator에 작업 할당 업무를 넘겨 작업을 할당하도록 명령한다.
작업 할당 업무의 결과를 JobManager에 반영해 해당 상태를 스케줄러 작업 수행여부를 결정하는 역할을 한다.
위 프로세스를  어플리케이션이 종료될때까지 계속 반복 수행된다.
* lifeCycle관리와 graceful shutdown 관련 자세한 구현내용은 코드에 주석을 참고

 

 
지금까지 스케줄러 작업을 수행할 인스턴스를 클러스터링 하는 작업 코디네이터와 할당 작업을 반복 수행하는 프로세스 코드를 살펴봤다.
이제 스케줄러 작업 수행 여부를 조절하는 부분을 살펴보자.
스케줄러 작업은  MethodInterceptor와 Aop를 통해 조절된다.
annotation class BomScheduleJob(val jobName:String = "DEFAULT_SCHEDULER"){
    companion object{
        val DEFAULT_JOB = BomScheduleJob()
    }
}
=======

class SingleJobScheduleInterceptor(
    private val jobManager: JobManager
) : MethodInterceptor {

    override fun invoke(invocation: MethodInvocation): Any? {
        if(jobManager.alloc){
            return null
        }
        return invocation.proceed()
    }
}
=====
...
  @Bean
    fun singleJobScheduleInterceptor(): Advisor {
        val interceptor = SingleJobScheduleInterceptor(jobManager())
        val pointcut: Pointcut = AnnotationMatchingPointcut(null, SingleJob::class.java)
        val pointcutAdvisor = DefaultPointcutAdvisor(pointcut, interceptor)
        pointcutAdvisor.order = Ordered.HIGHEST_PRECEDENCE + 1
        return pointcutAdvisor
    }
...
====

@Component
class TestJob(
    private val testService: TestService
) {

..
    @Scheduled(fixedRate = 1000)
    @BomScheduleJob
    fun job() {
        log.info { "end" }
    }
스케줄러 어노테이션이 붙은 함수는 설정된 시간마다 반복 실행된다.
해당 함수에 @BomScheduleJob 어노테이션과 해당 어노테이션이 붙은 함수에 포인트 컷을 걸어 해당 함수가 실행되기 전에  MethodInterceptor가 수행되도록 한다.
인터셉터에서는 JobAllocProcessor에서 작업의 할당 여부를 수신받은 JobManager의 alloc 상태를 보고 해당 함수의 실행 여부를 결정한다.
클래스 다이어그램

 

어플리케이션 실행

이제 실제 수행여부를 확인해보자. 로컬에서 mysql을 실행하고 8080/8081 포트로 두개의 어플리케이션을 띄운다.
8080 서버
8080포트의 서버를 먼저 실행해서 JobAlloc에 먼저 할당되고 testJob의 스케줄러가 정상적으로 돌아가는걸 확인할 수 있다.
  • execute job log는 스케줄러 작업
  • thread-4에서 실행되는 alloc job 로그는 스케줄러 작업 할당 업무 수행중

jobAlloc은 해당 인스턴스에서 생성한 uuid로 alloc_id가 생성된다.
8081포트로 두번째 스케줄러를 실행해보자. JobAlloc의 만료시간이 계속 실행되고 있으므로 해당 스케줄러에서는 testJob이 수행되지 않는다.

8081 서버

그 다음 8080포트의 서버를 종료하고 마지막 만료시간인 21:00:36초가 지난 이후에 8081포트의 서버로 JobAlloc이 교체되고 8081 서버의 testJob도 정상 수행된다.

808서버 종료

마지막 만료 시간

8081 서버 스케줄 작업 실행
8081 서버로 alloc 교체

 
 
위 방법을 사용하면 항상 하나의 인스턴스에서만 실행하고 싶은 작업을 설정해 단일 실행을 보장한다.
하지만 인스턴스간 작업 할당자를 교체할때 작업이 실행되지 않는 타이밍이 발생할 수 있으므로 짧은 시간 주기적으로 반복되는 작업에만 스케줄러를 거는게 좋다.
(하루에 한번 특정 시간에 돌아야 하는 작업은 Batch + Jenkins 조합이 적절)