7장
1. 병렬 스트림
병렬 스트림(parallel stream)이란 개별 스레드에서 실행할 수 있도록 여러 청크로 분할한 스트림이다. 컬렉션에 parallelStream을 호출하면 병렬 스트림이 생성된다. 병렬 스트림을 활용하면 멀티코어 프로세서가 각각의 청크를 처리하도록 할당할 수 있다.
1.1. 순차 스트림을 병렬 스트림으로 변환하기
순차 스트림에 parallel 메서드를 호출하면 기존의 연산이 병렬로 처리된다.
public long parallelSum(long n) {
return Stream.iterate(1L, i -> i + 1)
.limit(n)
.parallel() // 스트림을 병렬 스트림으로 변환
.reduce(0L, Long::sum);
}
병렬 스트림으로 변환하면 리듀싱 연산을 여러 청크에 병렬로 수행할 수 있다. 병렬 리듀싱 연산을 그림으로 나타내면 다음과 같다.

순차 스트림에 parallel을 호출해도 스트림 자체는 변하지 않는다. 단지 내부적으로 병렬 수행을 의미하는 boolean 플래그가 설정된다. 반대로 sequential 메서드를 호출하면 병렬 스트림을 순차 스트림으로 바꿀 수 있다.
1.2. 스트림 성능 측정
JMH(Java Microbenchmark Hamess)를 활용하면 벤치마크를 구현해 성능을 측정할 수 있다.
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@Fork(2, jvmArgs={"-Xms4G", "-Xmx4G"})
public class ParallelStreamBenchmark {
private static final long N = 10_000_000L;
@Benchmark
public long sequentialSum() {
return Stream.iterate(1L, i -> i + 1)
.limit(N)
.reduce(0L, Long::sum);
}
@TearDown(Level.Invocation)
public void tearDown() {
System.gc();
}
}
1.3. 병렬 스트림의 올바른 사용법
다음은 n까지의 자연수를 더하면서 공유된 누적자를 바꾸는 코드이다.
public long sideEffectParallelSum(long n) {
Accumulator accumulator = new Accumulator();
LongStream.rangeClosed(1, n)
.parallel()
.forEach(accumulator::add);
return accumulator.total;
}
위 코드는 올바른 결과값을 출력하지 않는다. 왜냐하면 여러 스레드에서 동시에 누적자인 total += value를 실행하기 때문이다. 병렬 스트림이 제대로 동작하기 위해서는 공유된 가변 상태를 피해야 한다. 다음은 병렬 스트림을 올바르게 활용하기 위한 몇 가지 기준이다.
직접 벤치마크 측정하기
limit이나 findFirst처럼 요소의 순서에 의존하는 연산은 순차 스트림이 효율적
스트림에서 수행하는 전체 파이프라인 연산 비용 고려
전체 스트림 파이프라인 처리 비용 = 원소 개수 N * 원소 처리 비용 Q
Q가 높은 경우 병렬 스트림으로 성능 개선 가능
소량의 데이터는 순차 스트림이 효과적
자료구조 확인
분해성이 훌륭함: ArrayList, IntStream.range
분해성이 좋음: HashSet, TreeSet
분해성이 나쁨: LinkedList, Stream.iterate
최종 연산의 병합 과정 비용 살펴보기
2. 포크/조인 프레임워크
포크/조인 프레임워크는 병렬화할 수 있는 작업을 재귀적으로 작은 작업으로 분할한 다음에 서브태스크 각각의 결과를 합쳐서 전체 결과를 만들도록 설계되었다. 포크/조인 프레임워크에서는 서브태스크를 스레드 풀의 작업자 스레드에 분산 할당하는 ExecutorService 인터페이스를 구현한다. 스레드 풀을 이용하려면 RecursiveTask<R>의 서브 클래스를 만들어야 한다. RecursiveTask를 정의하려면 추상 메서드 compute를 구현해야 한다. compute 메서드는 태스크를 서브태스크로 분할하는 로직과 더 이상 분할할 수 없을 때 개별 서브태스크의 결과를 생산할 알고리즘을 정의한다. 따라서 대부분의 compute 메서드 구현 로직은 다음과 같다.
if (태스크가 충분히 작거나 더 이상 분할할 수 없으면) {
순차적으로 태스크 계산
} else {
태스크를 두 서브태스크로 분할
태스크가 다시 서브태스크로 분할되도록 이 메서드를 재귀적으로 호출
모든 서브태스크의 연산이 완료될 때까지 기다림
각 서브태스크의 결과를 합침
}
이 알고리즘은 분할 정복(divide-and-conquer) 알고리즘의 병렬화 버전이다. 로직을 그림으로 나타내면 다음과 같다.

이론적으로는 코어 개수만큼 병렬화된 태스크로 작업 부하를 분할하면 모든 CPU 코어에서 태스크를 실행하고, 각각의 태스크가 같은 시간에 종료될 것이락 생각할 수 있다. 그러나 현실에서는 각각의 서브태스크 작업완료 시간이 크게 달라질 수 있다. 각각의 서브태스크의 작업 완료 시간을 적절하게 분배하기 위해서 작업 훔치기(work stealing) 기법을 활용할 수 있다. 작업 훔치기 알고리즘은 ForkJoinPool의 모든 스레드를 거의 공정하게 분할한다. 각각의 스레드는 자신에게 할당된 태스크를 포함하는 이중 연결 리스트를 참조하면서 작업이 끝날 때마다 큐의 헤드에서 다른 태스크를 가져와서 작업을 처리한다. 만일 한 스레드가 작업을 더 빨리 처리하면 할 일이 없어진 스레드는 다른 스레드 큐의 꼬리에서 작업을 훔쳐온다. 모든 태스크가 작업을 끝낼 때까지 이 과정을 반복한다.

포크/조인 프레임워크를 잘못 사용할 경우 병렬 스트림을 활용할 때보다 성능이 나빠질 수 있다. 다음은 포크/조인 프레임워크를 올바르게 활용하기 위한 몇 가지 기준이다.
서브태스크가 모두 시작된 다음 join 호출
join 메서드를 먼저 호출하면 결과가 준비될 때까지 호출자를 블록시킴
순차 코드에서 병렬 계산을 시작할 때만 invoke 사용
서브태스크에 fork 메서드를 호출해서 ForkJoinPool의 일정 조정
병렬 계산 디버깅의 어려움 고려
3. Spliterator 인터페이스
자바 8은 Spliterator라는 새로운 인터페이스를 제공한다. Spliterator는 분할할 수 있는 반복자(splitable iterator)로 Iterator처럼 소스의 원소 탐색 기능을 제공한다. 다른 점은 Spliterator가 병렬 작업에 특화되어 있다는 것이다. Spliterator 인터페이스는 여러 메서드를 정의한다.
public interface Spliterator<T> {
boolean tryAdvance(Consumer<? super T> action);
Spliterator<T> trySplit();
long estimateSize();
int characteristics();
}
T: Spliterator에서 탐색하는 요소의 형식
tryAdvance 메서드: Spliterator의 원소를 하나씩 순차적으로 소비하면서 탐색해야 할 요소가 남아있으면 참을 반환
trySplit 메서드: Spliterator의 일부 원소를 분할해서 두 번째 Spliterator 생성
estimateSize 메서드: 탐색해야 할 원소 수 정보 제공
Spliterator를 효과적으로 활용하려면 분할 과정을 이해하는게 좋다. 스트림의 분할 과정은 재귀적으로 일어난다.
참고 자료
모던 자바 인 액션 - 한빛미디어
Last updated