본문 바로가기

Reading Record/이펙티브자바

[아이템 81] wait와 notify 보다는 동시성 유틸리티를 애용하라

스레드를 다루는 메서드중에 wait() 과 notify()가 있다. 이 메서드는 Object의 메서드이다.

 

wait()는 스레드가 일시정지 상태로 돌아가도록 하고, notify()는 일시정지 상태인 스레드 중 하나를 실행대기 상태로, notifyAll() 은 일시정지 상태인 스레드를 모두 실행대기 상태로 변화시켜준다.

 

자바봄의 아래 포스팅에 위 내용이 짧게 포함되어있다.

https://javabom.tistory.com/53?category=835783

 

자바의 스레드(Thread)

저번 java.concurrent 패키지의 동기화 장치들을 살펴보면서 스레드에 대해 다시 한번 봐야겠다는 생각이 들었다. 스레드에 대해 간단하게 알아보자. 1. 프로세스와 스레드 프로세스란 운영체제에서

javabom.tistory.com

 

 

하지만 자바5부터 도입된 동시성 유틸리티가 이 작업들을 대신 해주기 때문에 사실 사용할 일이 거의없다.

 

까다로운 wait()과 notify() 대신 동시성 유틸리티를 사용하자.

 

아이템 80 에서는 고수준 동시성 유틸리티 중 실행자 프레임워크(ExecutorService)에 대해 알아보았다.

이번 아이템에서는 동시성 컬렉션과 동기화 장치에 대해 설명한다.

 

 

동시성 컬렉션 - Map

기본 컬렉션에 동시성을 만족하도록 구현한 컬렉션을 말하며, 아래 중 java.util.concurrent 클래스들이 해당한다.

 

Map 인터페이스를 살펴보자

  • HashMap - 동시성 고려 X

  • HashTable - 동시성을 고려한다. 하지만 메서드단위이기 때문에 느릴 수 있다.

public synchronized V put(K key, V value) {
        // Make sure the value is not null
        if (value == null) {
            throw new NullPointerException();
        }

        // Makes sure the key is not already in the hashtable.
        Entry<?,?> tab[] = table;
        int hash = key.hashCode();
        int index = (hash & 0x7FFFFFFF) % tab.length;
        @SuppressWarnings("unchecked")
  • ConcurrentHashMap - 동시성고려. 메서드를 통으로 동기화처리한 HashTable 보다 빠름. 아래는 put 메서드의 일부분
synchronized (f) { // 블럭으로 synchronized
                    if (tabAt(tab, i) == f) {
                        if (fh >= 0) {
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }

 

 

이 사실을 기반으로 아래 테스트코드를 살펴보자.

세개의 스레드를 가진 실행자를 가지고 1000번의 멀티스레드 요청을 시도했다.

그리고 태스크는 0부터 1000까지의 정수를 Key 로 put 한다.

 @DisplayName("동시성 컬렉션")
    @Test
    void concurrency() throws InterruptedException {
        ExecutorService exec = Executors.newFixedThreadPool(3);
        Map<Integer, Integer> hashMap = new HashMap<>();
        Map<Integer, Integer> hashTable = new Hashtable<>();
        Map<Integer, Integer> concurrentHashMap = new ConcurrentHashMap<>();

        for (int i = 0; i < 1000; i++) {
            exec.execute(() -> {
                for (int j = 0; j < 1000; j++) {
                    hashMap.put(j, 1);
                    hashTable.put(j, 1);
                    concurrentHashMap.put(j, 1);
                }
            });
        }
        exec.shutdown();

        while (!exec.isTerminated()) {
            Thread.sleep(1000);
        }

        System.out.println("HashMap: " + hashMap.size());
        System.out.println("HashTable: " + hashTable.size());
        System.out.println("ConcurrentHashMap: " + concurrentHashMap.size());
    }

 

HashTable과 ConcurrentHashMap 은 put 메서드에 대해 동시성을 지원한다. 따라서 실행자가 1000번의 동시수행을 시도해도 key 값이 1-1000으로 고정되어있기 때문에 항상 size는 1000 임을 보장한다.

 

하지만 HashMap 은 스레드세이프하지 않기 때문에 size가 1000이넘는 경우가 발생한다.

물론 내부를 수행해보면 key 는 1-1000까지로 동일하다. 하지만 실제 Map에 key 를 put 하기 전에 size 를 증가시키기 때문에 이와 같은 현상이 발생하는 것이다.

 

 

 

동시성 컬렉션은 동기화를 무력화하지 못한다. 동기화가 필요 없다면 기존 컬렉션을 사용하면 된다.

또, 만약 동시성 컬렉션에 동기화를 또 건다면 속도가 느려진다. - 간단한 예시로는 검증하기 어려움 'ㅁ' ㅠ

 

동시성 컬렉션에서 동시성을 무력화하지 못하기 때문에 여러 메서드를 원자적으로 호출하는 것이 불가능한데, 이런 것들을 지원하는 기본 메서드들이 생겼다.

image

 

일명 상태 의존적 메서드라고도하는데, putIfAbsent같은 경우 key 값이 없을 때에만 put을 한다.

get() -> null check -> put 을 하나의 원자적 동작으로 묶은 것이다.

 

이 메서드는 현재 일반 컬렉션 인터페이스에도 추가되어있다.

 

ConcurrentMap을 활용하는 예시를 한번 살펴보자

 

String.intern 메서드는 String 풀에 해당 문자열이 존재하면 반환하고 없다면 등록 후 반환하는 메서드이다.

 

이 메서드는 ConcurrentMap 으로 구현해보는 예시가 책에 나와있다.

    private static final ConcurrentMap<String, String> map = new ConcurrentHashMap<>();

    public static String intern(String s){
        String previousValue = map.putIfAbsent(s, s);
        return previousValue == null ? s : previousValue; // 원래 없었다면 null
    }

 

 

책에의하면 ConcurrentHashMap 은 get 메서등 성능최적화되어있다고 한다.

따라서 putIfAbsent 전에 get 으로 먼저 체크하는 것이 더 빠르다.

        private static final ConcurrentMap<String, String> map = new ConcurrentHashMap<>();

    public static String intern(String s){
                String result = map.get(s); // 이미 있으면 바로 return 한다.
                if(result == null){
          result = map.putIfAbsent(s, s);
          if(result == null){
            result = s;
          }
        }
      return result;
    }

 

 

HashMap 을 Collections.synchronizedMap을 통해 동시성을 지원하도록 만들 수도 있지만, 성능을 극대화하기 위해 동시성 맵(ConcurrentHashMap) 을 사용하는 편이 좋다.

 

 

동시성 컬렉션 - Queue

작업이 완료되기 전까지 Blocking 되도록 구현된 컬렉션도 있다.

    @DisplayName("동시성 컬렉션 - Queue")
    @Test
    void queue() throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();

        String take = queue.take();

        System.out.println("출력?");
    }

 

위 테스트코드는 끝나지 않는다.

BlokcingQueue 인터페이스 take 는 가져올 원소가 없다면 기다리게 구현되어있는데, 위 코드는 어떤 클라이언트도 원소를 넣어주고 있지 않기 때문이다. 즉, 생산자가 없다.

 

 

다시 구현해보자

    BlockingQueue<String> workingQueue = new LinkedBlockingDeque<>();

    @Test
    void work() {
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        while(true){
            executorService.execute(() -> { // 생산자 태스크
                try {
                    workingQueue.put("작업입니다: " + System.currentTimeMillis());
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
            executorService.execute(() -> { // 소비자 태스크
                try {
                    String take = workingQueue.take();
                    System.out.println("get: " + take);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
    }

 

편의상 같은 스레드 풀을 사용하기는 하지만 생산 태스크과 소비 태스크를 수행하고 있고, 실행하면 소비자 태스크에 의해 결과가 출력된다.

 

 

동기화 장치

서로 다른 스레드가 작업을 조율할 수 있도록 지원하는 장치를 동기화 장치라고 한다.

자바봄 스터디 중 스레드를 사용하는 테스트를 작성할 때 CountDownLatch를 자주 쓰는데, 포스팅 중에도 동기화 장치에 대한 포스팅이 있어 남긴다.

https://javabom.tistory.com/35?category=835783

 

Java concurrent 패키지의 동기화 장치

들어가며... 스터디를 진행하면서 스터디원 중 한명이 멀티 스레드 작업을 하면서 테스트 코드 작성에 어려움을 겪었다고 했다. 그 때 CountDownLatch의 도움을 받아 테스트 코드를 작성하였다고 했�

javabom.tistory.com

 

그래도 가장 자주 쓰이는 장치인 CountDownLatch 를 간단히 살펴보자.

이름에서 유추할 수 있듯이 CountDownLatch 는 초기에 설정한 횟수만큼의 countDown 후 대기중인 스레드를 깨운다.

 

책의 예시로 나온 "어떤 동작들을 동시에 시작해서 모두 완료할 때까지 걸리는 시간을 측정하는 프레임워크"를 작성해보자.

    @DisplayName("동시 실행 시간을 재는 프레임워크")
    @Test
    void countdown() throws InterruptedException {
        final int concurrency = 3;
        ExecutorService executorService = Executors.newFixedThreadPool(3); // 스레드풀에 스레드가 동시성 수준 이상만큼 있어야한다. 

        CountDownLatch ready = new CountDownLatch(concurrency); // 동시성 수준
        CountDownLatch start = new CountDownLatch(1); // 시작 방아쇠
        CountDownLatch done = new CountDownLatch(concurrency);

        for (int i = 0; i < concurrency; i++) {
            executorService.execute(() -> {
                ready.countDown(); // 준비
                try {
                    start.await(); // 아래에서 countDown 하면 수행
                    System.out.println("작업을 수행한다");
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    done.countDown(); // 작업 끝나면 done Countdown --> 작업이 총 세개가 동시니까 수를 일치시켜
                }
            });
        }

        ready.await(); // 동시성 수준만큼 준비될 때까지 기다림
        long startNanos = System.nanoTime(); // 준비되고나서
        start.countDown(); // 준비됐으니까 start CountDown줌
        done.await();

        System.out.println(System.nanoTime() - startNanos);
    }

 

주석으로도 달아두었지만 간단히 설명을 덧붙이면,

이 프레임워크는 System.out.println("작업을 수행한다"); 라는 작업을 세개의 스레드가 동시에 수행한다.

 

ready CountdownLatch 가 countDown 을 세번 하고나면(세개의 스레드가 거치고 나면) for문 아래의 await 을 지나칠 수 있고 그때의 startNanos 를 측정한다. 세개의 스레드가 작업을 수행할 준비가 된 시점인 것이다.

 

세개의 이하의 스레드는 start.await(); 에서 기다리고 있는데, 시작 시점을 측정한 뒤 start.countDown 을 호출하면 이제 System.out.println("작업을 수행한다"); 작업을 수행할 수 있다.

 

그리고 모든 작업을 끝내면 finally 블럭에서 done.countDown() 을 호출하게되고 역시나 세개의 스레드가 모두 끝나고 나면 done.await() 을 지나쳐 최종 소모 시간을 출력한다.

 

나도 실수했던 부분인데 ExecutorService executorService = Executors.newFixedThreadPool(3); // 스레드풀에 스레드가 동시성 수준 이상만큼 있어야한다. 동시성 수준 이상의 스레드를 생성할 수 있어야한다.

그렇지 않으면 await 상태에서 깨워줄 스레드가 존재하지 않기 때문에 이 프레임워크는 영원히 끝나지 않는다.

 

다시 돌아와서 wait 과 notify

어쩔 수 없이 남아있는 레거시 코드를 위해 wait 과 notify 를 살펴본다.

 

wait를 사용하는 표준 방식은 아래와 같다.

        void check() throws InterruptedException {
            synchronized (lock) { 
                while (!flag) { // 조건검사, wait이후에도 한번 더 검사하게된다.
                    System.out.println("Wait");
                    lock.wait();
                }
                System.out.println("Flag is True");
            }
        }

 

wait 메서드는 반드시 반복문 안에서 사용하자.

반복문으로 실행가능 조건을 검사함으로써 불필요한 wait 의 호출을 막을 수 있다.

 

만약 반복문 밖에 wait 이 있다면 wait 한 스레드를 언제 notify 할지 보장할 수 없다.

 

대기 후 깨어난 스레드가 다시 조건을 검사하게 하는 것은 안전실패를 방지하는 방법이다.

대기 이후 남은 동작을 이어가면 다른 스레드에 의해 synchronized 블럭 안의 불변식을 깨트릴 수 있다.

 

예를들어,

  1. wait 이후 깨어나는 사이, 다른 스레드가 락을 얻고 동기화 블럭 안의 상태를 변경할 수 있다.
  2. 조건이 만족되지 않았는데 악의적인 스레드가 notifyAll을 호출한다.
  3. 대기중인 스레드가 notify 없이도 깨어나는 경우가 드물게 있다.

 

즉, 개발자의 의도와 상관 없이 wait 이후에 다루는 공유자원이 손상받을 수 있기 때문에 반드시 한번 더 조건을 검사해야한다. 그러니 반복문을 사용하자.

 

notify보다 notifyAll을 사용하는 것이 더 안전하다.

의도치 않은 스레드가 대기상태로 간다고 해도 괜찮다. 그 스레드 역시 조건을 다시한번 검사하고 만족하지 않는다면 다시 일시정지될 것이다.

 

또, 이런 문제가 발생할 수도 있다.

누군가 악의적으로 "notify"를 삼키기 위해 wait 을 무한히 호출한다고 가정하자.

개발자가 깨어나길 원하는 스레드는 이 악의적인 스레드 때문에 깨어나지 못할 것이다. 그러니 notifyAll 을 사용하는 편이 더 낫다.