- 7.1 병렬 스트림 : 송민진
- 7.2 포크, 조인 프레임 워크 : 김대현
- 7.3 Spliterator 인터페이스 : 어정윤
- 2023년 5월 18일 (목요일) -> 5월 25일 (목요일)
- stream 인터페이스를 통해 데이터 collection을 선언형으로 제어하는 방법들
- 또, 외부 반복을 내부 반복으로 바꾸면 Native Java Library가 stream 요소의 처리를 제어할 수 있음
👉🏻 Java 개발자는 collection 데이터 처리 속도를 높이려고 따로 고민할 필요가 없음!
- 컴퓨터의 멀티코어를 활용해서 파이프라인 연산을 실행할 수 있음⭐⭐ (가장 중요!)
: 데이터 컬렉션을 병렬로 처리하기 어려웠음
- step 1. 데이터를 서브파트로 분할함
- step 2. 분할된 서브파트를 각각의 스레드로 할당함
- step 3. 의도치 않은 race condition이 발생하지 않도록 적절한 동기화를 추가함
- step 4. 부분 결과를 합침
- fork/join framework 제공
- 더 쉽게 병렬화를 수행하면서 에러를 최소화할 수 있는 기능
- stream → 순차 스트림을 병렬 스트림으로 자연스럽게 바꿀 수 있음!
- 병렬 스트림의 내부적 처리 과정을 알아야만 → 스트림을 잘못 사용하는 상황을 피할 수 있음
- 내부적 원리 : 병렬 스트림이 요소를 여러 청크로 분할하는 것부터 시작
- Custom Spliterator를 직접 구현 → 분할 과정을 직접 제어 가능
- Stream 인터페이스 : 간단히 요소를 병렬로 처리할 수 있음
- Collection에 parallelStream을 호출 → 병렬 스트림(parallel stream) 생성됨
: 병렬 스트림
- 각각의 스레드에서 처리할 수 있도록 스트림 요소를 여러 청크로 분할한 스트림
- 모든 멀티코어 프로세서가 각각의 청크를 처리하도록 할당할 수 있음
예시 : 1 ~n 까지의 모든 숫자의 합계를 반환하는 메서드
- 스트림으로 투박하게 구현하는 방법
- step 1) 무한 스트림을 만듦
- step 2) 주어진 크기로 스트림을 제한함
- step 3) 두 숫자를 더하는 BinaryOperator로 리듀싱 작업 수행
public long sequentialSum(long n) {
return Stream.iterate(1L, i -> i + 1)
.limit(n)
.reduce(0Lz Long::sum);
- 전통적인 자바에서 반복문으로 구현하는 방법
public long iterativeSum(long n) {
long result = 0;
for (long i = 1L; i <= n; i++) {
result += i;
}
return result;
}
이럴 경우, n이 커진다면 이 연산을 병렬로 처리하는 것이 더욱 좋다.
- 결과 변수는 어떻게 동기화할 것인가?
- 몇 개의 스레드를 사용해야 할 것인가?
- 숫자는 어떻게 생성할 것인가?
- 생성된 숫자는 누가 더할 것인가?
👉🏻 병렬 스트림을 사용하면 위의 문제를 모두 쉽게 해결할 수 있다.
- 순차 스트림 내에 parallel 호출하면, 기존의 함수형 리듀싱 연산(숫자 합계 계산)이 병렬로 처리됨
public long parallelSum(long n) {
return Stream.iterate(1L, i -> i + 1)
.limit(n)
.parallel() // 스트림을 병렬 스트림으로 변환
.reduce(0L, Long::sum);
}
-
스트림이 여러 청크로 분할되어 처리됨 → 여러 청크에 병렬로 수행 → 리듀싱 연산으로 생성된 부분 결과를 다시 리듀싱 연산으로 합침 → 전체 스트림의 리듀싱 결과를 도출
-
parallel을 호출해도 스트림 자체에는 아무런 변화도 일어나지 않음
-
원리
- 내부적으로, 이후 연산이 병렬로 수행해야 함을 의미하는 boolean 플래그가 설정됨
💡 병렬 스트림에서 사용하는 스레드 풀 설정
Q. 스트림의 parallel 메서드로 병렬 작업 수행하는 스레드는 어디서, 어떻게, 몇 개나 생성되는가?
A.
- 병렬 스트림은 내부적으로
ForkJoinPool<br>
을 사용한다(포크/조인 프레임워크는 7.2절에서 자세히 설명한다).- 기본적으로
ForkJoinPool
은 프로세서 수, 즉Runtime.getRuntime().availableProcessors()
가 반환하는 값에 상응하는 스레드를 가진다.- 만약
ForkJoinPool
의property
값을 바꿔 설정하고 싶다면?System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "12");
- 위의 예시는 전역 설정 코드이므로 이후의 모든 병렬 스트림 연산에 영향을 준다. 즉, 현재는 하나의 병렬 스트림에 사용할 수 있는 특정한 값을 지정할 수 없다. 일반적으로 기기의 프로세서 수와 같으므로 특별한 이유가 없다면
ForkJoinPool
의 기본값을 그대로 사용할 것을 권장한다.
- sequential을 사용하면 병렬스트림 → 순차스트림으로 전환 가능
👉🏻 두 메서드를 이용하면, 어떤 연산을 병렬로 실행하고 어떤 연산을 순차로 실행할지 제어할 수 있음!
- 동시에 쓰면?
- 두 메서드 중 최종적으로 호출된 메서드가 전체 파이프라인에 영향을 미친다.
stream.para1lel() .filter(...) .sequential() .map() .parallel() .reduce();
- 이 예제에서 파이프라인의 마지막 호출은 parallel이므로 파이프라인은 전체적으로 병렬로 실행된다.
- 두 메서드 중 최종적으로 호출된 메서드가 전체 파이프라인에 영향을 미친다.
(JMH, 자바 마이크로벤치마크 하니스)
- Java Library
- 간단하고, 어노테이션 기반 방식을 지원함
- 안정적으로 자바 프로그램이나 JVM을 대상으로 하는 다른 언어용 벤치마크를 구현할 수 있음
Benchmark
- 전자기기의 연산성능을 시험하여 수치화하는 것
- 특히 전산용어로써 벤치마크는 여러 가지 전자기기의 성능을 비교 평가하는 의미
HotSpot
- 데스크톱과 서버 컴퓨터를 위한 JVM
환경 설정
- Maven : pom.xml 파일(빌드 과정 정의)에 몇 가지 dependency를 추가하여 사용 가능
<!-- 핵심 JMH 구현 --> <dependency> <groupId>org.openjdk.jmh</groupld> <artifactId>jmh-core</artifactId> <version>1.17.4</version> </dependency> <!-- JAR 파일을 만드는 데 도움을 주는 어노테이션 프로세서 --> <dependency> <groupId>org.openjdk.jmh</groupld> <artifactId>jmh-generator-annprocess</artifactId> <version>1.17.4</version> </dependency>
- 다음 플러그인도 추가하면 자바 아카이브 파일을 이용해서 벤치마크를 편리하게 실행할 수 있다.
<build> <plugins> <plugins> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <executions> <execution> <phase>package</phase> <goals><goal>shade</goal></goals> <configuration> <finalName>benchmarks</finalName> <transformers> <transformer implementation= "org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> <mainClass>org.openjdk.jmh.Main</mainClass> </transformer> </transformers> </configuration> </execution> </executions> <plugin> </plugins> <build>
@BenchmarkMode(Mode.AverageTime) // 벤치마크 대상 메서드를 실행하는 데 걸린 평균 시간 측정
@OutputTimeUnit(TimeUnit.MILLISECONDS) // 벤치마크 결과를 밀리초 단위로 출력
@Fork(2, jvmArgs={"-Xms4G", "-Xmx4G"}) // 4Gb의 힙공간을 제공한 환경에서 두 번 벤치마크를 수행해 신뢰성 확보
public class ParallelStreamBenchmark {
private static final long N = 10_000_000L;
@Benchmark // 벤치마크 대상 메서드
public long sequentialSum() {
return Stream.iterate(1L, i -> i + 1).limit(N)
.reduce(0L, Long::sum);
}
@TearDown(Level.Invocation) // 매 번 벤치마크를 실행한 다음에는 가비지컬렉터 동작 시도
public void tearDown() {
System.gc();
}
}
-
벤치마크가 가능한 한 가비지 컬렉터의 영향을 받지 않도록 힙의 크기를 충분하게 설정
-
벤치마크가 끝날 때마다 가비지 컬렉터가 실행되도록 강제
💡 이렇게 주의를 들여 설정했지만, 여전히 결과는 정확하지 않을 수 있다.
기계가 지원하는 코어의 갯수 등이 실행 시간에 영향을 미칠 수 있기 때문! -
위의 클래스를 컴파일하면, maven plugin이
benchmarks.jar
라는 두 번째 파일을 만듦- jar 파일 실행 방법
java -jar ./target/benchmarks.jar ParallelStreamBenchmark
- JMH 명령이 계산하는 과정
- 핫스팟이 코드를 최적화 할 수 있도록 20 번을 실행 → 벤치 마크를 준비한 → 20번을 더 실행 → 최종 결과를 계산
-
for 루프
@Benchmark public long iterativeSum() { long result = 0; for (long i = 1L; i <= N; i++) { result += i; } return result; }
💡 병렬 처리가 무조건 빠르진 않다!
- 병렬 버전이 쿼드 코어 CPU를 활용하지 못하고 순차 버전에 비해 다섯 배나 느린 실망스러운 결과가 나왔다. 두 가지 문제를 발견할 수 있다.
- 반복 결과로 박싱된 객체가 만들어지므로 숫자를 더하려면 언박싱을 해야 한다.
- 반복 작업은 병렬로 수행할 수 있는 독립 단위로 나누기가 어렵다.
-
리듀싱 연산이 수행되지 않음 - 리듀싱 과정을 시작하는 시점에 전체 숫자 리스트가 준비되지 않았기 때문에 → 청크로 분할할 수 없음
-
오히려 순차처리 방식과 크게 다른 점이 없는데 스레드를 할당하는 오버헤드만 증가하게 됨
- 병렬 프로그래밍을 오용(예를 들어 병렬과 거리가 먼 반복 작업)하면 오히려 전체 프로그램의 성능 이 더 나빠질 수도 있다.
- 따라서
parallel
메서드를 호출했을 때 내부적으로 어떤 일이 일어나는지 꼭 이해해야 함!
-
기본형 long을 직접 사용 → 박싱과 언박싱 오버헤드가 사라짐
-
쉽게 청크로 분할할 수 있는 숫자 범위를 생산 (ex: 1-20을 각각 1-5, 6-10,11-15,16-20 범위의 숫자로 분할)
-
순차 스트림 측정
@Benchmark public long rangedSum() { return LongStream.rangeClosed(1, N) .reduce(0L, Long::sum); }
-
이렇게 특화된 메서드를 활용한 처리 속도가 더 빠름!
- 특화되지 않은 스트림 : 오토박싱, 언박싱 등의 오버헤드를 수반하기 때문
상황에 따라서는 어떤 알고리즘을 병렬화하는 것보다 적절한 자료구조를 선택하는 것이 더 중요하다는 사실을 단적으로 보여줌
-
병렬 스트림을 적용하면? → 순차 실행보다 빠른 성능!
@Benchmark
public long parallelRangedSum() {
return LongStream.rangeClosed(1, N)
.parallel()
.reduce(0L, Long::sum);
}
올바른 자료구조를 선택해야 병렬 실행도 최적의 성능을 발휘할 수 있다 는 사실!
병렬화가 완전 공짜는 아니다.
병렬화를 이용하려면?
- 스트림을 재귀적으로 분할해야 함
- 각 서브스트림을 서로 다른 스레드의 리듀싱 연산으로 할당하고, 이들 결과를 하나의 값으로 합쳐야 함
- 멀티코어 간의 데이터 이동은 우리 생각보다 비싸다.
따라서 코어 간에 데이터 전송 시간보다 훨씬 오래 걸리는 작업만 병렬로 다른 코어에서 수행하는 것이 바람직! - 또한 상황에 따라 쉽게 병렬화를 이용할 수 있거나 아니면 아예 병렬화를 이용할 수 없는 때도 있음
- 그리고 스트림을 병렬화해서 코드 실행 속도를 빠르게 하고 싶으면 항상 병렬화를 올바르게 사용하고 있는지 확인해야 함!!
- 병렬 스트림을 사용하며, 많은 실수는 공유된 상태를 바꾸는 알고리즘을 사용하기 때문에 일어난다.
- ex) n까지의 자연수를 더하면서 공유된 누적자를 바꾸는 프로그램
public long sideEffeetSum(long n) { Accumulator accumulator = new Accumulator() LongStream.rangeClosed(1, n) .forEach(accumulator::add); return accumulator.total; } public class Accumulator { public long total = 0; public void add(long value) { total += value; } }
- 이 코드는 순차 실행하도록 구현되어있음
- 따라서, 병렬로 실행하면 참사가 일어남
- total을 접근할 때마다 (다수의 스레드에서 동시에 데이터에 접근하는) 데이터 레이스 문제가 일어남
- ⇒ 동기화로 문제를 해결 → 결국 병렬화라는 특성이 없어져 버림
- 위의 스트림을 병렬로 만들어보면??
public long sideEffectParallelSum(long n) { Accumulator accumulator = new Accumulator(); LongStream.rangeClosed(1, n) .parallel() .forEach(accumulator::add); return accumulator.total; }
- 위에서 소개한 하니스로 실행한 결과를 출력하기
System.out.println("SideEffeet parallel sum done in: " + measurePerf(Parallelstreams::sideEffectParallelSum, 10_000_000L) + " msecs" );
💡 Tip!
상태 변화를 피하는 방법은 18, 19장에서 설명한다.
우선은 병렬 스트림이 올바로 동작하려면 공유된 가변 상태를 피해야 한다는 사실만 기억하자.
- 양을 기준으로 병렬 스트림 사용을 결정하는 것은 바람직하지 못함 (ex: 1,000개 이상의 요소일 경우 등)
- 확신이 서지 않으면 직접 성능을 측정하라!
- 순차 스트림을 병렬스트림으로 쉽게 바꿀 수는 있지만, 병렬 스트림으로 바꾸는 것만이 능사는 아님
- 언제나 병렬 스트림이 순차 스트림보다 빠른 것은 아님
- 또, 병렬 스트림의 수행 과정은 투명하지 않을 때가 많음
- 박싱을 주의하라
- 자동 박싱과 언박싱은 성능을 크게 저하시킬 수 있는 요소
- Java 8 : 박싱 동작을 피할 수 있도록 기본형 특화 스트림 (IntStream,LongStream, DoubleStream)을 제공함
- 따라서 되도록이면 기본형 특화 스트림을 사용하는 것이 좋다.
- 순차 스트림보다 병렬 스트림에서 성능이 떨어지는 연산이 있다.
- 특히
limit
나findFirst
처럼 요소의 순서에 의존하는 연산을 병렬 스트림에서 수행하려면 비싼 비용을 치러야 함- ex)
findAny
는 요소의 순서와 상관없이 연산하므로findFirst
보다 성능이 좋음
- ex)
- 정렬된 스트림에
unordered
를 호출하면 비정렬된 스트림을 얻을 수 있음 - 스트림에 N개 요소가 있을 때 요소의 순서가 상관없다면(ex: List) 비정렬된 스트림에
limit
를 호출하는 것이 더 효율적이다 - 스트림에서 수행하는 전체 파이프라인 연산 비용을 고려하라.
- 처리해야 할 요소 수가
N
이고 하나의 요소를 처리하는 데 드는 비용을Q
라 하면 → 전체 스트림 파이프라인 처리 비용을N*Q
로 예상할 수 있다. Q
가 높아진다는 것은 병렬 스트림으로 성능을 개선할 수 있는 가능성이 있음을 의미한다.
- 처리해야 할 요소 수가
- 소량의 데이터에서는 병렬 스트림이 도움 되지 않는다.
- 소량의 데이터를 처리하는 상황에서는 병렬화 과정에서 생기는 부가 비용을 상쇄할 수 있을 만큼의 이득을 얻지 못하기 때문이다.
- 소량의 데이터를 처리하는 상황에서는 병렬화 과정에서 생기는 부가 비용을 상쇄할 수 있을 만큼의 이득을 얻지 못하기 때문이다.
- 특히
- 스트림을 구성하는 자료구조가 적절한지 확인하라.
- ex)
ArrayList
를LinkedList
보다 효율적으로 분할할 수 있다.LinkedList
를 분할하려면 모든 요소를 탐색해야 하지만ArrayList
는 요소를 탐색하지 않고도 리스트를 분할할 수 있기 때문이다.- 또한
range
팩토리 메서드로 만든 기본형 스트림도 쉽게 분해할 수 있다. - 마지막으로 7.3절에서 설명 하는 것처럼 커스텀
Spliterator
를 구현해서 분해 과정을 완벽하게 제어할 수 있다.
- ex)
- 스트림의 특성과 파이프라인의 중간 연산이 스트림의 특성을 어떻게 바꾸는지에 따라 분해 과정의 성능이 달라질 수 있다.
- 예를 들어 SIZED 스트림은 정확히 같은 크기의 두 스트림으로 분할할 수 있으므로 효과적으로 스트림을 병렬 처리할 수 있다.
- 반면 필터 연산이 있으면 스트림의 길이를 예측할 수 없으므로 효과적으로 스트림을 병렬 처리할 수 있을지 알 수 없게 된다.
- 최종 연산의 병합 과정 비용을 살펴보라.
- 병렬 스트림이 수행되는 내부 인프라구조도 살펴봐야 한다.
- Java 7에서 추가된 fork/join Framework로 병렬 스트림이 처리된다.
- 병렬 스트림을 제대로 사용하려면 병렬 스트림의 내부 구조를 잘 알아야 한다.
병렬화 할 수 있는 작업을 재귀적으로 작은 작업으로 분할한 다음, 서브 태스크 각각의 결과를 합쳐서 전체 결과를 만들도록 설계되었음.
기본적인 구조: 작업자(Worker)들을 작업장(Thread pool)에 등록한다.
이 프레임워크에서는 서브태스크를 스레드 풀 ForkJoinPool의 작업자 스레드에 분산 할당하는 ExecutorService 인터페이스를 구현한다.
ForkJoinPool 은 AbstractExecutorService를 상속하고, AES는 ES를 상속한다. ES는 Executor를 상속한다.
무언가 쓰레드 단위의 작업을 실행해주는 역할을 하는 듯 하다.
ForkJoinPool은 invoke를 통해 ForkJoinTask를 실행할 수 있다.
RecursiveTask는 ForkJoinTask를 상속한다.
즉, ForkJoinPool은 RecursiveTask를 invoke 할 수 있다.
return FORK_JOIN_POOL.invoke(task);
이런 형식이다.
protected abstract R compute();
if task is not divisible {
순차적 태스크 계산
} else {
divide into two
rA = compute(A)
rB = compute(B)
merge(rA, rB)
}
@Override
protected Long compute() {
int length = end - start;
if (length <= THRESHOLD) {
return computeSequentially();
}
ForkJoinSumCalculator leftTask =
new ForkjoinSumCalculator(numbers, start, start + length / 2);
// 중요!!!
leftTask.fork(); // 병렬 실행
ForkJoinSumCalculator rightTask =
new ForkJoinSumCalculator(numbers, start + length / 2, end);
Long rightResult = rightTask.compute();
Long leftResult = leftTask.join();
return leftResult + rightResult;
}
public static long forkJoinSum(long n){
long[] numbers = LongStream.rangeClosed(1, n).toArray();
ForkJoinTask<Long> task = new ForkJoinSumCalculator(numbers);
return new ForkJoinPool().invoke(task);
}
질문: 그냥 fork fork join join 해도 되나요?
답: 밑에서 책에서 설명해줍니다.
질문: ForkJoinPool().invoke(task) 하지 말고 그냥 task.compute() 하면 안될까요?
되긴 하네요? 차이가 뭘지...
답: https://stackoverflow.com/questions/34132326/forkjoinpool-invoke-and-forkjointask-invoke-or-compute
풀에 등록하고 쓰면 이미 만들어져 있는 풀을 재사용할 수 있지만, compute로 하면 프레임워크가 알아서 새로운 풀을 만듬. pool.invoke(task) 가 정석.
일반적으로 ForkJoinPool은 애플리케이션에서 단 한 번만 인스턴스화 해서 정적 필드에 싱글턴으로 저장한다.
Result: 50000005000000
Result: 50000005000000
Result: 50000005000000
Result: 50000005000000
Result: 50000005000000
Result: 50000005000000
Result: 50000005000000
Result: 50000005000000
Result: 50000005000000
Result: 50000005000000
Iterative Sum done in: 7 msecs
Result: 50000005000000
Result: 50000005000000
Result: 50000005000000
Result: 50000005000000
Result: 50000005000000
Result: 50000005000000
Result: 50000005000000
Result: 50000005000000
Result: 50000005000000
Result: 50000005000000
Sequential Sum done in: 205 msecs
Result: 50000005000000
Result: 50000005000000
Result: 50000005000000
Result: 50000005000000
Result: 50000005000000
Result: 50000005000000
Result: 50000005000000
Result: 50000005000000
Result: 50000005000000
Result: 50000005000000
Parallel forkJoinSum done in: 405 msecs
Result: 50000005000000
Result: 50000005000000
Result: 50000005000000
Result: 50000005000000
Result: 50000005000000
Result: 50000005000000
Result: 50000005000000
Result: 50000005000000
Result: 50000005000000
Result: 50000005000000
Range forkJoinSum done in: 6 msecs
Result: 50000005000000
Result: 50000005000000
Result: 50000005000000
Result: 50000005000000
Result: 50000005000000
Result: 50000005000000
Result: 50000005000000
Result: 50000005000000
Result: 50000005000000
Result: 50000005000000
Parallel range forkJoinSum done in: 1 msecs
Result: 50000005000000
Result: 50000005000000
Result: 50000005000000
Result: 50000005000000
Result: 50000005000000
Result: 50000005000000
Result: 50000005000000
Result: 50000005000000
Result: 50000005000000
Result: 50000005000000
ForkJoin sum done in: 20 msecs
Result: 50000005000000
Result: 50000005000000
Result: 50000005000000
Result: 50000005000000
Result: 50000005000000
Result: 50000005000000
Result: 50000005000000
Result: 50000005000000
Result: 50000005000000
Result: 50000005000000
ForkJoin sum done in: 22 msecs
Result: 1
Result: 1
Result: 1
Result: 1
Result: 1
Result: 1
Result: 1
Result: 1
Result: 1
Result: 1
ForkJoin sum done in: 16 msecs
Result: 50000005000000
Result: 50000005000000
Result: 50000005000000
Result: 50000005000000
Result: 50000005000000
Result: 50000005000000
Result: 50000005000000
Result: 50000005000000
Result: 50000005000000
Result: 50000005000000
SideEffect sum done in: 56 msecs
Result: 10305945166823
Result: 6607953176395
Result: 7791133171914
Result: 8657678393018
Result: 13548949593583
Result: 8075795039826
Result: 13872708668531
Result: 16710882018292
Result: 11759395261769
Result: 13221808214408
SideEffect parallel sum done in: 93 msecs
Process finished with exit code 0
ForkJoin sum의 성능은 책에서는 41, 여기서는 20msec 인데, 이는 stream을 long[]으로 변환했기 때문에 발생하는 오버헤드일 뿐이지, ForkJoin의 성능이 느린 것은 아니다.
실제로 ForkJoin을 사용하지 않고 무조건 1을 반환하도록 코드를 수정한 결과, 16ms의 결과가 나온다. ForkJoin sum은 4msec + alpha 정도만 사용하는 것이다.
RecursiveTask 내에서는 ForkJoinPool의 invoke를 사용하면 안된다. 대신 compute나 fork를 호출할 수 있다. 계산을 시작할 때만 invoke를 사용한다.
왼쪽 작업과 오른쪽 작업에 모두 fork를 호출하는 것이 자연스러울 것 같지만, 한쪽 작업에는 compute를 하는 것이 더 효율적이다. (질문에 대한 답변!)
같은 스레드를 재사용할 수 있다.
책 내용 x 질문: 이때, fork를 먼저 부르고 사용해야 하는가?
fork라 불리는 다른 스레드에서 compute를 호출하므로 스택 트레이스가 의미 없다.
- 태스크를 여러 독립 서브태스크로 분할할 수 있어야 한다.
- 서브태스크의 실행시간이 새로운 태스크를 포킹하는 시간보다 길어야 한다.
- 예를 들면 I/O와 계산을 병렬로 실행
- 준비 과정을 거쳐야 할 수 있다. 따라서 여러 번 프로그램을 실행해 봐야 한다.
- branch prediction
- 컴파일러 최적화는 순차 버전에 집중될 수도 있다.
문제: 서브 태스크를 어디까지 분할할 것인지, 결정은 어떻게 내리는가???
다음 절에서 계속
Work Stealing
ForkJoinCalculator에서 숫자가 만 개 이하면 서브태스크 분할을 중단했다.
1000만개의 숫자라면 1000+개의 서브태스크를 포크할 것이다.
하지만 코어는 4개밖에 안되는데? 천 개로 나눠봤자...?
하지만 적절한 크기로 많은 태스크를 포킹하는 것이 더 낫다.
질문: 코어가 4개인 CPU에서 쓰레드를 4개 이상 나누는 것이 의미가 있는가? 1000개씩 쓰레드를 만드는 게 어떤 의미가 있을까?
답: 잘게 자르는 것이 의미가 있다. 현실에서는 각각의 태스크가 다른 시간에 종료될 수 있다. 노는 코어는 큐의 헤드에서 다른 작업을 가져와서 처리한다. 모든 작업을 끝낸 스레드는 다른 스레드의 작업 큐의 꼬리에서 작업을 훔쳐와서 계속 일한다. 때문에 각각의 스레드 간의 작업 부하를 비슷한 수준으로 유지할 수 있다.
이번 절에서는 숫자 배열을 여러 태스크로 분할하는 로직을 직접 개발했다.
다음 절에서는 자동으로 분할해주는 Spliterator를 사용해 더 편하게 병렬 작업을 수행한다.
자바 8은 Spliterator(Splitable Iterator, 분할할 수 있는 반복자)
라는 새로운 인터페이스를 제공한다.
Iterator처럼 소스의 요소 탐색 기능을 제공하지만, 병렬 작업에 특화되어 있다.
자바 8은 컬렉션 프레임워크에 포함된 모든 자료구조에 사용할 수 있는 디폴트 Spliterator 구현을 제공한다.
컬렉션은 spliterator
라는 메서드를 제공하는 Spliterator 인터페이스를 구현한다.
public interface Spliterator<T> {
boolean tryAdvance(Consumer<? super T> action); // Spliterator의 요소를 하나씩 순차적으로 소비하면서 탐색해야 할 요소가 남아있으면 참을 반환
Spliterator<T> trySplit(); // Spliterator의 일부 요소(자신이 반환한 요소)를 분할해서 두 번째 Spliterator를 생성
long estimateSize(); // 탐색해야 할 요소 수 정보 제공
int characteristics(); // Spliterato 자체의 특성 집합을 포함하는 int를 반환
}
스트림을 여러 스트림으로 분할하는 과정은 재귀적으로 일어난다.
- 첫 번째 Spliterator에 trySplit을 호출 시 두 번째 Spliterator 생성
- 두 개의 Spliterator에 trySplit를 다시 호출 시 네 개의 Spliterator 생성(trySplit의 결과가 null이 될 때까지 이 과정 반복)
- trySplit이 null 반환 시 더 이상 자료구조 분할할 수 없음
- Spliterator에 호출한 모든 trySplit의 결과가 null이면 재귀 분할 과정 종료
이 분할 과정은 characteristics
메서드로 정의하는 Spliterator의 특성에 영향을 받는다.
문자열의 단어 수를 계산하는 메서드를 구현해보자.
| 반복형으로 단어 수를 세는 메소드
public int countWordsIteratively(String s) {
int counter = 0;
boolean lastSpace = true;
for (char c : s.toCharArray()) {
if (Character.isWhitespace(c)) {
lastSpace = true;
} else {
if (lastSpace) {
counter++;
} else {
lastSpace = false;
}
}
}
return counter;
}
단어 사이에 공백이 여러 개일 때도 반복 구현이 제대로 작동된다.
class WordCounter {
private final int counter;
private final boolean lastSpace;
public WordCounter(int counter, boolean lastSpace) {
this.counter = counter;
this.lastSpace = lastSpace;
}
/**
* 반복 알고리즘처럼 accumulate 메소드는 문자열의 문자를 하나씩 탐색한다.
*
* @param c
* @return
*/
public WordCounter accumulate(Character c) {
if (Character.isWhitespace(c)) {
return lastSpace ? this : new WordCounter(counter, true);
} else {
return lastSpace ? new WordCounter(counter + 1, false) : this; // 문자를 하나씩 탐색하다 공백 문자 만나면 지금까지 탐색한 문자를 단어로 간주하여(공백 문자 제외) 단어 수를 증가시킨다.
}
}
/**
* 두 WordCounter의 counter 값을 더하고, counter 값만 더할 것이므로 마지막 공백은 신경 쓰지 않고 새로운 WordCounter를 반환한다.
*
* @param wordCounter
* @return
*/
public WordCounter combine(WordCounter wordCounter) {
return new WordCounter(counter + wordCounter.getCounter(), wordCounter.lastSpace);
}
public int getCounter() {
return counter;
}
}
새로운 문자 c를 탐색했을 때 WordCounter의 상태 변화
단어 수를 계산하는 연산을 병렬 스트림으로 처리하면 원하는 결과가 나오지 않는다.
원래 문자열을 임의의 위치에서 둘로 나누다보니 하나의 단어를 둘로 계산하는 상황이 발생할 수 있기 때문이다.
즉, 순차 스트림을 병렬 스트림으로 바꿀 때 스트림 분할 위치에 따라 잘못된 결과가 나올 수 있다.
public class WordCounterSpliterator implements Spliterator<Character> {
private final String string;
private int currentChar = 0;
public WordCounterSpliterator(String string) {
this.string = string;
}
/**
* 문자열에서 현재 인덱스에 해당하는 문자를 Consumer에 제공한 다음, 인데스를 증가시킨다.
*
* @param action 소비한 문자를 전달
* @return 소비할 문자가 남아있으면 true를 반환 (반복해야 할 문자가 남아있음을 의미)
*/
@Override
public boolean tryAdvance(Consumer<? super Character> action) {
action.accept(string.charAt(currentChar++)); // 현재 문자를 소비
return currentChar < string.length();
}
/**
* 반복될 자료구조를 분할하는 로직을 포함한다.
* 분할이 필요한 상황에서는 파싱해야 할 문자열 청크의 중간 위치를 기준으로 분할하도록 지시한다.
*
* @return 남은 문자 수가 한계값 이하면 null 반환 -> 분할을 중지하도록 지시
*/
@Override
public Spliterator<Character> trySplit() {
int currentSize = string.length() - currentChar;
if (currentSize < 10) {
return null; // 파싱할 문자열이 순차 처리할 수 있을 만큼 충분히 작아졌음을 알림
}
// 1. 파싱할 문자열의 중간을 분할 위치로 설정
for (int splitPos = currentSize / 2 + currentChar; splitPos < string.length(); splitPos++) {
// 2. 다음 공백이 나올 때까지 분할 위치를 뒤로 이동시킴
if (Character.isWhitespace(string.charAt(splitPos))) {
// 3. 처음부터 분할위치까지 문자열을 파싱할 새로운 WordCounterSpliterator를 생성
Spliterator<Character> spliterator = new WordCounterSpliterator(string.substring(currentChar, splitPos));
// 4. 이 WordCounterSpliterator의 시작 위치를 분할 위치로 설정
currentChar = splitPos;
// 5. 공백을 찾았고 문자열을 분리했으므로 루프를 종료
return spliterator;
}
}
return null;
}
/**
* @return 탐색해야 할 요소의 개수
*/
@Override
public long estimateSize() {
return string.length() - currentChar;
}
/**
* @return 특성들
*/
@Override
public int characteristics() {
return ORDERED // 문자열의 문자 등장 순서가 유의미함
+ SIZED // estimatedSize의 메서드의 반환값이 정확함
+ SUBSIZED // trySplit으로 생성된 Spliterator도 정확한 크기를 가짐
+ NONNULL // 문자열에는 null 문자가 존재하지 않음
+ IMMUTABLE // 문자열 자체가 불변 클래스이므로 문자열을 파싱하면서 속성이 추가되지 않음
;
}
}
문자열을 단어가 끝나는 위치에서만 분할하는 방법으로 위의 문제를 해결할 수 있다.
Spliterator는 첫 번째 탐색 시점, 첫 번째 분할 시점, 또는 첫 번째 예상 크기(estimatedSize) 요청 시점에 요소의 소스를 바인딩할 수 있다.
이와 같은 동작을 늦은 바인딩 Spliterator
라고 부른다.