본문 바로가기

스터디/자바

자바 인액션으로 보는 병렬스트림

자바 인액션8으로 보는 병렬스트림

병렬스트림이 등장하기 이전에는 컬렉션의 병렬연산이 불편했다.

데이터 분할 -> 분할데이터 스레드 할당 -> 적절한 동기화로 결과합산

위의 과정을 개발자가 순수코딩으로 풀어내야 했다.

자바 7에서는 이 문제를 쉽게 풀 수 있도록 포크/조인 프레임워크기능을 제공한다.

병렬 스트림 사용법

병렬 스트림은 각각의 스레드에서 처리할 수 있도록 스트림 요소를 여러 청크로 분할한 스트림이다.

분할된 스트림은 멀티코어 프로세서가 각각의 청크를 처리하도록 할당 가능하다.

1.기본 for 문

   public long sum(long limit) {
           long sum = 0;
           for (long i = 1; i <= limit; i++) {
               sum += i;
           }

           return sum;
       }

2.순차스트림

   public long sumStream(long limit) {
           return LongStream.rangeClosed(1, limit)
                   .reduce(0L, Long::sum);
       }

3.병렬스트림

    public long sumParallelStream(long limit) {
          return LongStream.rangeClosed(1, limit)
                  .parallel()
                  .reduce(0L, Long::sum);
    }

정확한 성능 테스트는 아니지만 함수 수행 시간을 찍어보면 병렬스트림 쪽이 제일 빠르다.

위 그림과 같이 병렬 스트림청크단위로 분할되어 수행 되고 결과가 합쳐진다.

유의사항

1.스트림의 마지막으로 호출된 순차제어 메서드가 전체 파이프라인의 수행형태를 결정한다.

       stream.parallel()
               .map()
           .sequential()
           .filter()
           .parallel()
           .collect(Collector.toList());

위와 같이 호출된 파이프라인은 병렬로 실행된다. stream의 순차제어를 결정하는 건 내부에 있는 병렬제어 플래그이고,

병렬플래그는 제일 마지막에 등장한 parallel메서드로 인해 true로 변환되어 전체 파이프라인이 병렬로 실행된다.

2.병렬스트림이 사용하는 스레드풀은 공용 풀이다.

병렬스트림은 내부적으로 ForkJoinPool을 사용한다. ForkJoinPool의 공용 스레드 풀은 프로세스 수와 일치한다.

Runtime.getRuntime().availableProcessors()가 반환하는 값과 같다.

   System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism","12")

이 값은 설정 변경이 가능한대, 전체 공용풀에 영향을 미치므로 비추다.

사용하려면 커스텀한 ForkJoinPool을 만들어 사용하 걸 추천한다.

    new ForkJoinPool(20).submit(()->messages.parallelStream().foreach(this::send));

ForkJoinPool은 싱글턴 인스턴스로 띄워 같이 공유해 사용하도록 하는게 좋다

3.병렬스트림은 컬렉션 데이터 양 N, 하나의 연산 시간 Q, 전체 결과 시간 N*Q일때 Q가 비용이 높은경우 사용하는 것이 좋다.

4.병렬스트림에 효과적인 자료구조

-LinkedList보단 ArrayList

LinkedList는 Node로 연결되어 있는 컬렉션으로 전체 데이터를 청크단위로 분할하려면 Node를 다 순회해야 가능하다.

반면 ArrayList는 배열로 되어있어 메모리가 연속되어 저장되어 있어 쉽게 청크단위 분할이 가능하다.

포크/조인 프레임워크

병렬화할 수 있는 태스크를 재귀적으로 분할한 다음 서브태스크 각각의 결과를 합쳐 전체 결과를 만들도록 설계되었다.

포크/조인 프레임워크를 알아야하는 이유는 병렬스트림 내부에서 포크/조인 프레임워크를 활용하고 있기 때문에 정확한 동작방식을 이해한다면 병렬스트림을 쉽게 이해할 수 있을 것이다.

작업을 스레드 풀을 활용해 병렬처리를 하기 위해선 RecursiveTask<V>를 구현하면 된다. 여기서 V는 병렬화를 통해 연산된 결과이다.

RecursiveTask는 ForkJoinTask를 상속하고 있고 실제 포크/조인 프레임워크에서 활용되는 fork()와 join()메서드를 호출할 수 있다.

RecursiveTask를 구현할때 compute() 추상메서드를 구현해야한다.

compute() 메서드는 테스크 분할 로직과 더 이상 분할 불가능한 경우 서브태스크의 결과를 생산할 알고리즘을 정의해야 한다.

if(Task is small) { --> 분할 불가능한가?
    Execute the task -> 작업 수행

} else {
    //Split the task into smaller chunks
    ForkJoinTask first = getFirstHalfTask();
    first.fork(); --> 작업을 반으로 분할
    ForkJoinTask second = getSecondHalfTask(); 
    second.compute();--> 재귀호출로 다시 분할 수행 작업을 호출
    first.join(); --> 결과합산
}

https://java-8-tips.readthedocs.io/en/stable/forkjoin.html

위 코드와 같은 역할로 1~n까지 덧셈을 해주는 Task를 구현하였다.

public class Calculator extends RecursiveTask<Long> {
    private final long[] numbers;//1~8
    private final int start;//0
    private final int end;//8
    public static final long THRESHOLD = 4;

    public Calculator(long[] numbers) {
        this(numbers, 0, numbers.length);
    }

    private Calculator(long[] numbers, int start, int end) {
        this.numbers = numbers;
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        int length = end - start; // 이 태스크에서 더할 배열의 길이
        if(length <= THRESHOLD) {
            return computeSequentially(); // 기준 값과 같거나 작으면 순차적으로 결과를 계산.
        }

          // 작업을 반으로 분할
        Calculator leftTask = new Calculator(numbers, start, start + length / 2);
        leftTask.fork();  //분할된 작업을 반대쪽 스레드가 실행

        Calculator rightTask = new Calculator(numbers, start + length / 2, end);
        long rightResult = rightTask.compute(); // 현재 스레드가 compute를 다시 재귀호출
        long leftResult = leftTask.join();  // 분할된 작업결과를 기다린다
        return rightResult + leftResult;  // 두 서브태스크의 결과를 조합한 값이 이 태스크의 결과
    }

    // 분할된 배열을 계산
    private long computeSequentially() {
        long sum = 0;
        for (int i = start; i < end; i++) {
            sum += numbers[i];
        }
        return sum;
    }
  }

    @Test
    public void name4() {
        //given
        //when
        long[] numbers = LongStream.rangeClosed(1, 400000L).toArray();
        ForkJoinTask<Long> task  = new ForkJoinSumCalculator(numbers);

        //then
        assertThat(new ForkJoinPool().invoke(task)).isEqualTo(80000200000L);
    }

이 코드는 위 병렬스트림에 대한 그림처럼 작업이 분할되고 조인되는 과정으로 진행된다.

포크/조인 프레임 워크에 특이점

-Work stealing

-아무리 공정하게 작업을 청크단위로 분할해 수행해도 유휴스레드가 생기기 마련이다.

-포크/조인 프레임워크에서는 이런 문제를 Work stealing이라는 기법으로 해결가능하다.

> WorkerQueue라는 배열이 있고 이 배열은 모든 작업자 스레드가 공유한다.
>
> 하나의 스레드는 자기 자신의 작업수행 WorkerQueue와 전체가 공유하는 WorkerQueue[]을 가지고 작업을 수행한다.
>
> fork()가 일어나면 자기 자신의 WorkQueue에 푸쉬한다.
>
> 각 스레드는 자기 자신의 WorkerQueue에서 작업을 pop하여 실행하고 자신의 작업이 비어있으면 전체 poll메서드를 통해 전체 작업배열에서 다른 작업자의 배열의 꼬리작업을 가져와 수행합니다.

Spliterator

왜 병렬 스트림은 분할하는 로직이 없이 알아서 청크단위로 분할되어 작업이 수행되었는가?

우리가 병렬스트림을 이용할때 왜 분할작업을 구현하지 않았는데도 알아서 청크단위로 분할되어 병렬로 실행되었는지 궁금할 것이다.

Spliterator라는 인터페이스 덕분이다.

병렬스트림은 각 자료구조에 맞는 Spliterator가 구현되어져 있어서 따로 분할에 관한 구현을 안해도 병렬로 수행이 가능했던 이유다.

Spliterator의 구현을 알아보자.

public interface Spliterator<T>{ -> T는 탐색하는 요소의 형식이다.
  boolean tryAdvance(Consumer<? super T> action); -> 요소 하나하나를 탐색, 아직 탐색 여부에 따라 참/불
  Spliterator<T> trySplit(); -> 요소를 분할에 새로운 Spliterator를 생성
  long estimateSize(); -> 탐색해야 할 요소 수 정보
  int characteristics(); -> 스플리터의 특성이다. 자세한 자료는 자바인액션을 참고(ORDERED등이 있다.)
}

Spliterator는 커스텀한 구현이 가능하다.

자세한 구현 예제를 보고싶다면 자바인액션8 병렬스트림 부분을 참고해보자.



병렬 스트림을 어떻게 활용하고 있는가?


사실 이펙티브 자바에서도 병렬스트림을 효과적으로 사용할 수 있는 케이스는 많지 않다고 이야기 한다.

종단연산(collect,reduce 등)에 따라 비용이 많이 달라지고, 순차적인 연산이라면 병렬 수행의 효과는 제한된다.



병렬수행 연산 중 가장 좋은 효과를 내는건 reduce 종단연산이다.

순서와 상관없이 전체 작업을 하나로 합치는 reduce연산은 병렬로 수행하기 적절하다.



foreach 같은 경우도 연산비용이 정말 높다면 적절히 사용할 수 있다.

스트림 데이터 건건마다 다른 시스템을 요청을 해야하는 상황이라면 foreach를 활용해 병렬 연산을 수행하는 것도 적절하다.

네트워크 통신 비용이 충분히 비싸기 때문에 병렬화해도 효율을 낼 것이다.

foreach는 결과를 join을 할 필요 없기 때문에 좋아보인다.

'스터디 > 자바' 카테고리의 다른 글

JMH 사용해보기  (1) 2020.06.28
자바의 제네릭  (1) 2020.06.14
자바의 스레드(Thread)  (0) 2020.04.27
gradle 자바 프로젝트  (0) 2020.03.22
Java concurrent 패키지의 동기화 장치  (0) 2020.03.15