동시성 작업을 할 때는 작업 큐를 직접 생성할 수도 있겠지만 복잡한 작업들(안전실패, 응답불가 예방)이 필요하다.
때문에 java.util.concurrent 패키지의 실행자, 태스크, 스트림을 이용하는 편이 더 낫다.
Executors 가 제공하는 정적팩터리 메서드를 사용하면 다양한 작업 큐를 얻을 수 있다.
https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executors.html
하나의 worker 만 가지는 작업큐는 아래 한줄로 생성할 수 있다.
ExecutorService exec = Executors.newSingleThreadExecutor();
@DisplayName("하나의 worker를 가지는 쓰레드풀")
@Test
void single() {
ExecutorService exec = Executors.newSingleThreadExecutor();
exec.execute(() -> {
System.out.println("First");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
exec.execute(() -> System.out.println("Second"));
}
위와같이 코드를 작성하면 아래의 결과가 출력된다.
execute 실행 시 Runnable Task 는 실행대기 상태가되는데 그땐 먼저 실행한 Task 가 실행중이고, 실행가능한 쓰레드는 1개뿐이기 때문에 미처 실행상태가 되지 못한 상태로 끝나는 것이다.
@DisplayName("하나의 worker를 가지는 쓰레드풀")
@Test
void single() throws ExecutionException, InterruptedException {
ExecutorService exec = Executors.newSingleThreadExecutor();
exec.submit(() -> {
System.out.println("First");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).get();
exec.execute(() -> System.out.println("Second"));
}
하지만 submit 메서드를 사용하면 Future 인터페이스를 반환받을 수 있고 get() 메서드를 통해 해당 Runnable (Callable도 가능) 를 실행할 때까지 기다리는 작업이 가능하다. 따라서, 첫번째 Runnable 의 실행이 끝난 후 두번째 Runnable 도 실행이되고, 아래와 같은 출력을한다.
자바봄 블로그의 스레드 관련 포스팅을 참고하면 스레드의 생명주기에 대해 조금 더 이해할 수 있을 것이다.
https://javabom.tistory.com/53?category=835783
직접 작업 큐를 구현하려면 스레드를 안전하게 종료하는데도 큰 노력을 쏟아야한다.
작업중인 쓰레드, 작업 대기중인 스레드를 어떻게 처리해야할까?
ExecutorService 의 shutdown 메서드를 사용하면 "우아하게 종료하는 것"이 가능해진다.
우아한 종료?
남은 작업을 끝내고 안전하게 종료시키는 것을 말한다.
책에나오는 실행자 서비스의 예시
1) 특정 태스크가 완료되기를 기다린다 (위의 코드)
@DisplayName("하나의 worker를 가지는 쓰레드풀")
@Test
void single() throws ExecutionException, InterruptedException {
ExecutorService exec = Executors.newSingleThreadExecutor();
exec.submit(() -> {
System.out.println("First");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).get();
exec.execute(() -> System.out.println("Second"));
}
2) 태스크 모음 중 아무것 하나 혹은 모든 태스크가 완료되기를 기다린다.
@DisplayName("태스크 모음 중 아무것 하나 혹은 모든 태스크가 완료되기를 기다린다.")
@Test
void any() throws InterruptedException, ExecutionException {
ExecutorService exec = Executors.newFixedThreadPool(3);
List<Future<String>> returnStr = exec.invokeAll(tasks());
System.out.println(returnStr.get(0).get()); // FIRST
System.out.println(returnStr.get(1).get()); // SECOND
System.out.println(returnStr.get(2).get()); // THRID
}
List<Callable<String>> tasks() {
return Arrays.asList(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "FIRST";
},
() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "SECOND";
},
() -> "THIRD");
}
@DisplayName("태스크 모음 중 아무것 하나 혹은 모든 태스크가 완료되기를 기다린다.")
@Test
void any() throws InterruptedException, ExecutionException {
ExecutorService exec = Executors.newFixedThreadPool(3);
System.out.println(exec.invokeAny(tasks())); // THRID 출력 후
}
List<Callable<String>> tasks() {
return Arrays.asList(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace(); // Interrupt
}
return "FIRST";
},
() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace(); // Interrupt
}
return "SECOND";
},
() -> "THIRD");
}
3) 실행자 서비스가 종료하기를 기다린다
@DisplayName("실행자 서비스가 종료하기를 기다린다")
@Test
void waitForTerminate() throws InterruptedException {
ExecutorService exec = Executors.newSingleThreadExecutor();
exec.execute(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("TEST"); // 출력안된다
});
exec.awaitTermination(2000, TimeUnit.MILLISECONDS);
}
@DisplayName("실행자 서비스가 종료하기를 기다린다")
@Test
void waitForTerminate() throws InterruptedException {
ExecutorService exec = Executors.newSingleThreadExecutor();
exec.execute(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("TEST"); // 출력된다
});
exec.awaitTermination(3000, TimeUnit.MILLISECONDS);
}
4) 완료된 태스크들의 결과를 차례로 받는다
@DisplayName("완료된 태스크들의 결과를 차례로 받는다")
@Test
void sequence() throws ExecutionException, InterruptedException {
CompletionService<String> cs = new ExecutorCompletionService<>(Executors.newFixedThreadPool(3));
List<Callable<String>> tasks = tasks();
tasks.forEach(cs::submit);
for (int i = tasks.size(); i > 0; i--) {
String r = cs.take().get();
if (r != null)
System.out.println(r); // THRID, SECOND, FISRT 순서로 출력
}
}
5) 테스크를 특정 시간에 혹은 주기적으로 실행하게 한다.
@DisplayName("테스크를 특정 시간에 혹은 주기적으로 실행하게 한다.")
@Test
void schedule() throws InterruptedException {
ScheduledExecutorService exc = Executors.newSingleThreadScheduledExecutor();
exc.schedule(() -> System.out.println("Hello"), 3000, TimeUnit.MILLISECONDS);
exc.scheduleAtFixedRate(() -> System.out.println("Hello!"), 3000, 3000, TimeUnit.MILLISECONDS); /// 3번출력
Thread.sleep(10000);
}
스레드풀의 설정을 조금 더 견고하게 하고 싶다면 ThreadPoolExecutor 클래스를 직접 사용할 수 있다.
Executors.newCachedThreadPool
: 태스크를 요청받으면 바로 가용가능한 스레드에 할당하고 없으면 그 즉시 만든다. 태스크가 자주 쌓이는 시스템이라면 적합하지 않다.
위에서 살펴본 실행자 프레임워크를 사용하면 작업단위(Task)와 실행매커니즘(execute, submit, ..) 을 분리할 수 있다.
여기서 Task는 Runnable(void)과 Callable(allow return Val, throw Exception)에 해당한다.
실행자 프레임워크를 사용하면 원하면 언제든 작업큐를 변경할 수 있다는 장점도 있다.
java 7 부터 실행자 프레임워킄 포크-조인 태스크를 지원하도록 확장되었다.
포크조인 태스크에서는 먼저 일을 끝낸 스레드가 다른 스레드의 남은 태스크를 수행하는 것이 가능해진다. 그렇기 때문에 CPU 활용률이 굉장히 높아진다.
자바 8부터 나온 병렬스트림은 내부적으로 포크조인 프레임워크를 사용하는데 관련 내용은 아래 자바봄 포스팅에서도 확인할 수 있다.
https://javabom.tistory.com/59
'Reading Record > 이펙티브자바' 카테고리의 다른 글
[아이템43~44] 람다 사용법 (0) | 2020.07.04 |
---|---|
[아이템 81] wait와 notify 보다는 동시성 유틸리티를 애용하라 (0) | 2020.07.03 |
[아이템 79] 과도한 동기화는 피하라 (0) | 2020.07.01 |
[아이템69 ~77] 예외 (0) | 2020.06.30 |
[아이템 40] @Override 애너테이션을 일관되게 사용하라 (0) | 2020.06.30 |