본문 바로가기

Reading Record/이펙티브자바

[아이템 80] 스레드보다는 실행자, 태스크, 스트림을 애용하라

동시성 작업을 할 때는 작업 큐를 직접 생성할 수도 있겠지만 복잡한 작업들(안전실패, 응답불가 예방)이 필요하다.

때문에 java.util.concurrent 패키지의 실행자, 태스크, 스트림을 이용하는 편이 더 낫다.

 

Executors 가 제공하는 정적팩터리 메서드를 사용하면 다양한 작업 큐를 얻을 수 있다.

https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executors.html

 

Executors (Java Platform SE 7 )

Returns a thread factory used to create new threads that have the same permissions as the current thread. This factory creates threads with the same settings as defaultThreadFactory(), additionally setting the AccessControlContext and contextClassLoader of

docs.oracle.com

 

 

하나의 worker 만 가지는 작업큐는 아래 한줄로 생성할 수 있다.

ExecutorService exec = Executors.newSingleThreadExecutor();
    @DisplayName("하나의 worker를 가지는 쓰레드풀")
    @Test
    void single() {
        ExecutorService exec = Executors.newSingleThreadExecutor();

        exec.execute(() -> {
            System.out.println("First");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        exec.execute(() -> System.out.println("Second"));
    }

 

위와같이 코드를 작성하면 아래의 결과가 출력된다.

image

 

execute 실행 시 Runnable Task 는 실행대기 상태가되는데 그땐 먼저 실행한 Task 가 실행중이고, 실행가능한 쓰레드는 1개뿐이기 때문에 미처 실행상태가 되지 못한 상태로 끝나는 것이다.

 

 

 

    @DisplayName("하나의 worker를 가지는 쓰레드풀")
    @Test
    void single() throws ExecutionException, InterruptedException {
        ExecutorService exec = Executors.newSingleThreadExecutor();

        exec.submit(() -> {
            System.out.println("First");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).get();
        exec.execute(() -> System.out.println("Second"));
    }

 

하지만 submit 메서드를 사용하면 Future 인터페이스를 반환받을 수 있고 get() 메서드를 통해 해당 Runnable (Callable도 가능) 를 실행할 때까지 기다리는 작업이 가능하다. 따라서, 첫번째 Runnable 의 실행이 끝난 후 두번째 Runnable 도 실행이되고, 아래와 같은 출력을한다.

image

 

자바봄 블로그의 스레드 관련 포스팅을 참고하면 스레드의 생명주기에 대해 조금 더 이해할 수 있을 것이다.

https://javabom.tistory.com/53?category=835783

 

자바의 스레드(Thread)

저번 java.concurrent 패키지의 동기화 장치들을 살펴보면서 스레드에 대해 다시 한번 봐야겠다는 생각이 들었다. 스레드에 대해 간단하게 알아보자. 1. 프로세스와 스레드 프로세스란 운영체제에서

javabom.tistory.com

 

 

직접 작업 큐를 구현하려면 스레드를 안전하게 종료하는데도 큰 노력을 쏟아야한다.

작업중인 쓰레드, 작업 대기중인 스레드를 어떻게 처리해야할까?

ExecutorService 의 shutdown 메서드를 사용하면 "우아하게 종료하는 것"이 가능해진다.

우아한 종료?

남은 작업을 끝내고 안전하게 종료시키는 것을 말한다.

 

 

 

책에나오는 실행자 서비스의 예시

1) 특정 태스크가 완료되기를 기다린다 (위의 코드)

 @DisplayName("하나의 worker를 가지는 쓰레드풀")
    @Test
    void single() throws ExecutionException, InterruptedException {
        ExecutorService exec = Executors.newSingleThreadExecutor();

        exec.submit(() -> {
            System.out.println("First");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).get();
        exec.execute(() -> System.out.println("Second"));
    }

 

 

2) 태스크 모음 중 아무것 하나 혹은 모든 태스크가 완료되기를 기다린다.

@DisplayName("태스크 모음 중 아무것 하나 혹은 모든 태스크가 완료되기를 기다린다.")
    @Test
    void any() throws InterruptedException, ExecutionException {
        ExecutorService exec = Executors.newFixedThreadPool(3);

        List<Future<String>> returnStr = exec.invokeAll(tasks());
        System.out.println(returnStr.get(0).get()); // FIRST
        System.out.println(returnStr.get(1).get()); // SECOND
        System.out.println(returnStr.get(2).get()); // THRID
    }

    List<Callable<String>> tasks() {
        return Arrays.asList(() -> {
                    try {
                        Thread.sleep(3000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    return "FIRST";
                },
                () -> {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    return "SECOND";
                },
                () -> "THIRD");
    }
@DisplayName("태스크 모음 중 아무것 하나 혹은 모든 태스크가 완료되기를 기다린다.")
    @Test
    void any() throws InterruptedException, ExecutionException {
        ExecutorService exec = Executors.newFixedThreadPool(3);

        System.out.println(exec.invokeAny(tasks())); // THRID 출력 후 
    }

    List<Callable<String>> tasks() {
        return Arrays.asList(() -> {
                    try {
                        Thread.sleep(3000);
                    } catch (InterruptedException e) {
                        e.printStackTrace(); // Interrupt
                    }
                    return "FIRST";
                },
                () -> {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace(); // Interrupt
                    }
                    return "SECOND";
                },
                () -> "THIRD");
    }

 

 

3) 실행자 서비스가 종료하기를 기다린다

@DisplayName("실행자 서비스가 종료하기를 기다린다")
    @Test
    void waitForTerminate() throws InterruptedException {
        ExecutorService exec = Executors.newSingleThreadExecutor();

        exec.execute(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("TEST"); // 출력안된다
        });

        exec.awaitTermination(2000, TimeUnit.MILLISECONDS);
    }

@DisplayName("실행자 서비스가 종료하기를 기다린다")
    @Test
    void waitForTerminate() throws InterruptedException {
        ExecutorService exec = Executors.newSingleThreadExecutor();

        exec.execute(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("TEST"); // 출력된다
        });

        exec.awaitTermination(3000, TimeUnit.MILLISECONDS);
    }

 

 

4) 완료된 태스크들의 결과를 차례로 받는다

    @DisplayName("완료된 태스크들의 결과를 차례로 받는다")
    @Test
    void sequence() throws ExecutionException, InterruptedException {
        CompletionService<String> cs = new ExecutorCompletionService<>(Executors.newFixedThreadPool(3));
        List<Callable<String>> tasks = tasks();
        tasks.forEach(cs::submit);
        for (int i = tasks.size(); i > 0; i--) {
            String r = cs.take().get();
            if (r != null)
                System.out.println(r); // THRID, SECOND, FISRT 순서로 출력
        }
    }

 

 

5) 테스크를 특정 시간에 혹은 주기적으로 실행하게 한다.

@DisplayName("테스크를 특정 시간에 혹은 주기적으로 실행하게 한다.")
    @Test
    void schedule() throws InterruptedException {
        ScheduledExecutorService exc = Executors.newSingleThreadScheduledExecutor();

        exc.schedule(() -> System.out.println("Hello"), 3000, TimeUnit.MILLISECONDS);
        exc.scheduleAtFixedRate(() -> System.out.println("Hello!"), 3000, 3000, TimeUnit.MILLISECONDS); /// 3번출력
        Thread.sleep(10000);
    }

 

스레드풀의 설정을 조금 더 견고하게 하고 싶다면 ThreadPoolExecutor 클래스를 직접 사용할 수 있다.

image

 

 

Executors.newCachedThreadPool

: 태스크를 요청받으면 바로 가용가능한 스레드에 할당하고 없으면 그 즉시 만든다. 태스크가 자주 쌓이는 시스템이라면 적합하지 않다.

 

 

 

위에서 살펴본 실행자 프레임워크를 사용하면 작업단위(Task)와 실행매커니즘(execute, submit, ..) 을 분리할 수 있다.

여기서 Task는 Runnable(void)과 Callable(allow return Val, throw Exception)에 해당한다.

실행자 프레임워크를 사용하면 원하면 언제든 작업큐를 변경할 수 있다는 장점도 있다.

 

 

 

java 7 부터 실행자 프레임워킄 포크-조인 태스크를 지원하도록 확장되었다.

image

 

포크조인 태스크에서는 먼저 일을 끝낸 스레드가 다른 스레드의 남은 태스크를 수행하는 것이 가능해진다. 그렇기 때문에 CPU 활용률이 굉장히 높아진다.

 

 

자바 8부터 나온 병렬스트림은 내부적으로 포크조인 프레임워크를 사용하는데 관련 내용은 아래 자바봄 포스팅에서도 확인할 수 있다.

https://javabom.tistory.com/59

 

자바 인액션으로 보는 병렬스트림

자바 인액션8으로 보는 병렬스트림 병렬스트림이 등장하기 이전에는 컬렉션의 병렬연산이 불편했다. 데이터 분할 -> 분할데이터 스레드 할당 -> 적절한 동기화로 결과합산 위의 과정을 개발자가

javabom.tistory.com