본문 바로가기

스터디/자바

Java concurrent 패키지의 동기화 장치

들어가며...

스터디를 진행하면서 스터디원 중 한명이 멀티 스레드 작업을 하면서 테스트 코드 작성에 어려움을 겪었다고 했다. 그 때 CountDownLatch의 도움을 받아 테스트 코드를 작성하였다고 했다. 당시에는 간단하게 사용방법을 듣기만 했었는데, 이펙티브 자바 리뷰를 하면서 스레드 관련 질문이 들어올때 사용할 상황이 생겼었고 한번 정리가 필요하다고 느꼈었다. 그리고 드디어 실천하게 되었다.

 

CountDownLatch와 동기화 장치들

이펙티브 자바의 아이템[81]에서 CountDownLatch를 동기화 장치 클래스라고 얘기하고 있다. 동기화 장치는 무엇일까?
동기화 장치란 스레드가 다른 스레드를 기다릴 수 있게 하여, 서로 작업을 조율할 수 있게 해주는 것이다. 
지금부터 java.util.concurrent 패키지의 몇 가지 동기화 장치들을 알아보자

 

CountDownLatch

CountDownLatch는 하나 이상의 스레드가 다른 스레드에서 수행되는 작업들이 완료될 때까지 대기할 수 있도록 하는 동기화 장치이다.

CountDownLatch

CountDownLatch는 생성할때 1 이상의 count를 인자값으로 받는다. 그리고 await()를 호출한 스레드는 대기 상태에 들어간다. 이후 다른 스레드에서 작업이 완료될 때 countDown() 메소드를 호출하면 count의 값이 1씩 감소한다. await()함수는 countDown() 메소드가 처음 설정한 count 만큼 호출되어 count의 값이 0이 될때 까지 대기했다가 0이 되었을 때 비로소 대기 상태를 해제한다. 또한 0이 된 latch는 재사용이 불가능하다.

public class LatchExampleTest {
    @DisplayName("await 호출 쓰레드는 대기한다.")
    @Test
    public void name() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(4);

        new Thread(job(latch,1)).start();
        new Thread(job(latch,2)).start();
        new Thread(job(latch,3)).start();
        new Thread(job(latch,4)).start();
        System.out.println("=======Main Thread await========");
        latch.await();
        System.out.println("=======Main Thread restart========");
    }
    Runnable job(CountDownLatch latch, int num) {
        return () -> {
            try {
                Thread.sleep(num*500L);
                System.out.println("Thread Num : "+ num + "countDown");
                latch.countDown();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };
    }
}

테스트 결과

 

CyclicBarrier

CyclicBarrier는 CountDownLatch와 비슷하다. 하지만 CountDownLatch가 다른 스레드가 작업을 완료하고 CountDown을 호출했을때 대기상태를 풀어준다면, CyclicBarrier는 다른 스레드가 전부 대기 상태가 되었을 때 모든 스레드의 대기 상태를 해제하고 재사용이 가능하다. 

CyclicBarrier

CyclicBarrier는 생성할때 CountDownLatch와 마찬가지로 1이상의 count값을 인자로 받는다. 그리고 각 스레드에서 await()를 호출하면 그 스레드는 대기상태로 들어간다. 그리고 count만큼 await()가 호출되면 대기상태에 있던 스레드들은 대기 상태를 해제한다.

public class BarrierExampleTest {

    @DisplayName("Barrier는 다른 스레드의 대기 상태를 기다린다.")
    @Test
    void name() throws BrokenBarrierException,InterruptedException{
        CyclicBarrier cyclicBarrier = new CyclicBarrier(6);
        for (int i = 0 ; i < 5 ; i++) {
            new Thread(run(cyclicBarrier,i+1)).start();
        }
        cyclicBarrier.await();
        System.out.println("Main Thread End");
    }

    Runnable run(CyclicBarrier barrier,int num) {
        return () -> {
          try {
              Thread.sleep(num * 1000);
              System.out.println("Thread " + num + "await");
              barrier.await();
              System.out.println("Thread " + num + "end");
          } catch (BrokenBarrierException | InterruptedException e) {
              e.printStackTrace();
          }
        };
    }
}

테스트 결과

 

Semaphore

세마포어는 한 공유 자원 또는 연산을 점유하는 스레드의 개수를 제한할 때 사용한다. 세마포어를 생성할 때 점유할 수 있는 스레드의 최대 갯수를 의미하는 permit을 인자로 받는다. 그리고 boolean 형의 인자를 추가로 받을 수 있는데 이는 fair라는 정책을 의미하는데 이 값이 true라면 FIFO로 permit획득 순서를 지정한다.

acquire() 메소드와 release() 메소드로 lock을 획득하고 해제한다. 만약 acquire() 메소드를 호출했을때 생성할 때 지정한 스레드 갯수만큼 자원 또는 연산을 점유하고 있다면 다른 스레드에서 release 할 때까지 대기한다.

public class SemaphoreExample {
    private final Semaphore semaphore;
    private final int maxAvailable;

    public SemaphoreExample(int maxAvailable) {
        this.maxAvailable = maxAvailable;
        this.semaphore = new Semaphore(maxAvailable);
    }

    public void doSemaphoreWork() {
        System.out.println("Thread[" + Thread.currentThread().getName() + "] start ");
        try {
            Thread.sleep(1000L);
            semaphore.acquire();
            int delay = new Random().nextInt(10) + 1;
            System.out.println("Thread[" + Thread.currentThread().getName() + "] sleep, delay " + delay * 300);
            Thread.sleep(delay * 300L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Thread[" + Thread.currentThread().getName() + "] sleep end ");
        semaphore.release();
    }
}
public class SemaphoreExampleTest {
    @DisplayName("세마포어는 스레드의 점유 개수를 제한한다.")
    @Test
    void name() throws InterruptedException{
        int avaliableThreadCnt = 2;
        SemaphoreExample semaphoreExample = new SemaphoreExample(avaliableThreadCnt);
        CountDownLatch latch = new CountDownLatch(5);
        for (int i = 0 ; i < 5 ; i++) {
            new Thread(run(semaphoreExample,latch),""+(i+1)).start();
        }
        latch.await();
    }

    Runnable run(SemaphoreExample semaphoreExample,CountDownLatch latch) {
        return () -> {
            semaphoreExample.doSemaphoreWork();
            latch.countDown();
        };
    }
}

세마포어 테스트는 5개의 스레드가 최대 2개의 스레드가 점유할 수 있는 SemaphoreExample 클래스의 doSemaphoreWork() 메소드를 호출한다. 테스트 메인 스레드는 CountDownLatch를 사용하여 다른 스레드가 끝날때 까지 대기한다.

테스트 결과

추가로 acquire 메소드와 release 메소드는 int 형의 인자를 받을 수 있는데 이는 인자로 받은 수만큼 permit 갯수를 가져가거나 release한다.

 

Exchanger

Exchanger는 두 스레드 사이에서 객체를 교환하는데 사용한다. 한 쪽의 스레드에서 교환 메소드를 호출 하였을 때 다른 쪽의 스레드가 교환메소드를 호출 한 상태가 아니라면 그 스레드가 메소드를 호출할 때 까지 대기한다. Exchager를 생성할 땐 Exchanger의 generic에 교환할 객체를 지정하고 생성한다.
다음의 코드에서 한 스레드는 리스트에 값을 계속해서 추가하는 작업을 하고, 다른 스레드는 리스트에 값을 계속해서 제거하는 작업을 한다. 만약 두 스레드의 리스트가 각각 가득 차거나 빈다면 교환 메소드를 호출하여 리스트를 교환하고 다시 작업을 진행한다.

@DisplayName("Exchanger는 객체를 교환한다.")
@Test
public void name() throws InterruptedException {
   Exchanger<List<Integer>> exchanger = new Exchanger<>();
   CountDownLatch latch = new CountDownLatch(2);

   Thread addThread = new Thread(adder(exchanger, latch), "adder");
   Thread removeThread = new Thread(remover(exchanger, latch), "remover");

   addThread.start();
   removeThread.start();
   latch.await();
}
Runnable adder(Exchanger<List<Integer>> exchanger,CountDownLatch latch) {
	return () -> {
		int num = 0;
		List<Integer> emptyList = new ArrayList<>();
		while (num != 2) {
			try {
				int size = 0;
				while (emptyList.size() < 8) {
					Thread.sleep(500L);
					emptyList.add(size);
					System.out.println(Thread.currentThread().getName()+" : add " + size);
					size++;
				}
				System.out.println("emptyList is Full");
				emptyList = exchanger.exchange(emptyList);
			}
			catch (InterruptedException e) {
				e.printStackTrace();
			}
			num++;
		}
		System.out.println("emptyList Size : " + emptyList.size());
		latch.countDown();
	};
}
Runnable remover(Exchanger<List<Integer>> exchanger,CountDownLatch latch) {
	return () -> {
		int num = 0;
		List<Integer> fullList = new ArrayList<>();
		while (num != 2) {
			try {
				while (!fullList.isEmpty()) {
					Thread.sleep(500L);
					int removed = fullList.get(0);
					fullList.remove(0);
					System.out.println(Thread.currentThread().getName()+" : remove " + removed);
				}
				System.out.println("fullList is empty");
				fullList = exchanger.exchange(fullList);
			}
			catch (InterruptedException e) {
				e.printStackTrace();
			}
			num++;
		}
		System.out.println("fullList Size : " + fullList.size());
		latch.countDown();
	};
}

1. 처음 시작할때 adder 스레드는 일단 List에 값을 추가하고 exchange 메소드를 호출한다, 반면에 remover는 List의 값이 처음부터 비어있었기 때문에 exchange 메소드를 바로 호출한다.

2. remover가 exchange 메소드를 호출하였지만 adder의 exchage 메소드는 호출되지않았기 때문에 대기한다.

3. adder가 값을 8개 추가하고 exchage 메소드를 호출한다. 그 결과 두 스레드의 List가 교환된다.

4. adder의 리스트는 비어있고, remover의 리스트는 가득 차있기 때문에 각각의 작업을 다시 수행하게된다.

 

Phaser

이펙티브 자바에선 Phaser를 가장 강력한 동기화 장치라고 소개한다. Phaser는 CyclicBarrier와 비슷한 동작을 하지만 동기화에 참여할 스레드의 수가 동적이고 재사용이 가능하다.

Phaser에서 주의깊게 볼 용어는 register, arrive, phase 이다. 먼저 register는 이 동기화 과정에 참여할 스레드의 갯수를 추가하는 과정이다. 처음 Phaser를 생성할 때 동기화에 참여하는 스레드의 수를 parties라 하고 int형으로 인자를 받는데 처음 생성할때 1개로 생성했더라도 register 메소드를 이용해서 동적으로 참여할 스레드의 갯수를 조정할 수 있다.
위에서 Phaser는 CyclicBarrier와 유사한 동작을 한다고 얘기했다. 이는 CyclicBarrier 처럼 이 동기화 과정에 참여한 모든 스레드가 대기 상태에 들어가면 대기 상태를 해제하기 때문이다. 그리고 각 스레드가 대기 상태에 들어가는 지점에 도달하는 것을 arrive 라 한다. 즉 모든 스레드가 arrive 가 되면 대기 상태가 풀리고 모든 스레드가 arrive가 되고 대기상태가 풀리는 과정을 phase 라 한다. 그리고 한 phase가 끝나면 Phaser 내부의 phase값이 하나씩 증가한다. 

Runnable run(Phaser ph, CountDownLatch latch) {
        return () -> {
            ph.register(); //동기화 과정에 참여할 수 있는 스레드 수 를 늘림
            try {
                Thread.sleep(1000L);
                System.out.println(Thread.currentThread().getName() + " arrived");
                ph.arriveAndAwaitAdvance(); // 다른 스레드가 대기 상태가 되길 기다리는 지점
                Thread.sleep(200L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + " sleep end");
            ph.arriveAndDeregister(); // 위에서 늘린 스레드 수를 줄인다.
            latch.countDown();
        };
}

위 코드는 생성될 스레드가 수행하는 run 메소드 이다. Phaser를 인자로 받아 register() 메소드로 참여할 수 있는 스레드 갯수를 동적으로 늘릴 수 있다. arriveAndAwaitAdvance() 메소드로 해당 스레드가 arrive되었음을 알리고 참여하는 다른 스레드가 대기 상태가 되기를 기다린다. arriveAndDeregister() 메소드는 phase가 끝나고 phaser에 동적으로 추가된 참여 갯수를 하나 줄일 수 있다. 즉, 해당 메소드가 없으면 phaser는 증가된 스레드 갯수를 유지한다. 

다음 코드는 실제로 스레드들을 phaser를 이용해 동기화 하는 과정이다. 

CountDownLatch latch = new CountDownLatch(5); // 자식 스레드 종료를 기다리기 위해 추가

Phaser phaser = new Phaser(1); // 초기 phaser에 참여할 수 있는 갯수는 1개로 생성

for(int i = 0; i < 5; i++) { // 스레드 생성
    new Thread(run(phaser, latch)).start();
}
assertEquals(0, phaser.getPhase()); //phase가 시작하고 끝이 안났을 때

System.out.println(Thread.currentThread().getName() + " arrived");
phaser.arriveAndAwaitAdvance(); // arrive되고 대기하는 상태

System.out.println("main await end");
latch.await();

assertEquals(1, phaser.getPhase()); // 첫 phase가 시작하고 끝이 남

첫 phase 결과

처음 phaser를 초기화 했을 때 지정한 parties의 수는 1이다. 하지만 위에서 본 run 메소드에서 register() 메소드로 참여할 스레드 수를 하나씩 늘렸기 때문에 첫 phase의 parties는 6이 되고 6개 만큼의 스레드가 arrive가 되어야 대기상태가 해제된다. 그리고 한 phase가 끝나 phaser의 phase값이 1 증가한다.

System.out.println("======================");

latch = new CountDownLatch(2);

for (int i = 0; i < 2; i++) {
	new Thread(run(phaser, latch)).start();
}

System.out.println(Thread.currentThread().getName() + " arrived");
phaser.arriveAndAwaitAdvance();
System.out.println("main await end");

assertEquals(2, phaser.getPhase());
latch.await();

두 번째 phase결과

위의 첫 번째 phase에서 바로 이어지는 코드이다. 첫 번째 phase와 다른 점은 스레드를 2개 생성했다는 점이다. 첫 번째 phase에서 register로 추가한 parties를 arriveAndDeregister() 메소드로 다시 제거했기 때문에 다음 phase에서 스레드를 생성하기 직전에 초기 상태의 parties의 수로 돌아왔다. 그렇게 두 번째 phase가 정상적으로 진행되고 phaser의 phase값은 2로 증가한다.

 

마치며...

위에서 언급한 CountDownLatch나 CyclicBarrier 등은 추가로 timeout을 인자로 넘겨줘 해당 시간 동안만 대기하도록 할 수 있기도하고, phaser의 경우 arriveAndWaitAdvanced() 메소드 말고도 arrive() 메소드도 있는데 이는 해당 지점에서 대기하지 않고 바로 진행되도록 하는 방식으로 CountDownLatch나 CyclicBarrier 에 비해 유연성을 높였다. 

항상 포스팅 주제를 무엇을 써야하나 고민했었는데, 이번 포스팅을 쓰면서 다음 포스팅은 스레드에 대해서 다시 공부하고 포스팅하기로 자연스럽게 맘속에서 정하게 되었다. 또 항상 부족한 글이 겠지만 계속해서 발전하는 모습을 보일 수 있도록 공부 정말 많이 해야겠다고 느끼는 시간이었다.

'스터디 > 자바' 카테고리의 다른 글

자바의 스레드(Thread)  (0) 2020.04.27
gradle 자바 프로젝트  (0) 2020.03.22
부딪히며 적용하는 자코코(jacoco)  (2) 2020.03.07
크롤링 테스트를 위한 mock server test 구축  (0) 2020.01.31
자바 Garbage Collection  (2) 2020.01.31