자바에서 스레드 처리시에 Future 객체를 통해서 스레드의 실행 결과를 얻어올 수 있다. 하지만 Future 를 사용하는 경우 단일 스레드 처리가 완료될 때까지 blocking 이 일어나게 된다.
Future 의 한계점
Future 는 기본적으로 isDone, isCanceled 와 같은 기본사항을 체크할 수 있는 메소드를 제공하지만 각기 다른 실행시간을 가지는 Future 들을 조합해서 계산을 한다든지 다른 질의의 결과와 같이 계산을 한다든지 하는 복잡한 로직을 다루기가 힘들다.
CompletableFuture 는 Future 와 CompletionStage 를 상속하는데 CompletionStage 인터페이스를 통해서 다양한 연쇄 작업을 수행할 수 있도록 한다.
CompletionStage
CompletionStage 에서 제공하는 대표적인 메소드는 아래와 같다.
xxxx vs xxxxAsync
각 메소드명에 Async 가 붙은것과 붙지 않은것의 차이는 호출되는 CompletableFuture 에 의해 정의된 스레드에서 실행하느냐 CompletableFuture 에 지정된 thread pool executor 에서 실행하느냐의 차이라고 볼 수 있다.
thenApply(fn) 의 경우 CompletableFuture 에 의해 정의된 스레드에서 fn 을 실행하므로 어느 스레드에서 실행되는지 알 수 없지만 thenApplyAsync(fn) 의 경우 일반적으로 ForkJoinPool.commonPool() 내의 스레드에서 fn 이 실행된다.
https://stackoverflow.com/questions/47489338/what-is-the-difference-between-thenapply-and-thenapplyasync-of-java-completablef 이 링크를 참고하면 도움이 될 것 같다.
thenApply
CompletableFuture 가 동작을 종료한 다음에 결과를 이어받아 정의된 Function (functional interface) 를 실행한다. 이전 비지니스 로직 수행결과를 기반으로 새로운 비지니스 로직의 수행결과를 얻어야 하는 경우에 사용한다.
thenAccept
수행된 연산의 결과를 기반으로 Consumer (functional interface) 를 실행한다. 이전 비지니스 로직 수행결과를 기반으로 새로운 비지니스 로직을 수행해야 하는 경우에 사용한다.
thenRun
수행된 연산이 종료된 이후에 Runnable (functional interface) 를 실행한다. 이전 비지니스 로직 수행결과를 받을 수 없다.
thenCompose
thenApply 와 동일하게 Function (functional interface) 를 파라미터로 전달 받지만 Function 의 리턴 타입이 U 타입이 아닌 CompletionStage<U> 타입이라는 점이 다르다. 즉 Function 에서 정의하는 리턴 타입이 CompletableFuture 타입으로 리턴되어야 한다. stream 의 flatmap 과 비슷하다고 생각하면 좋다.
thenCombine
서로 독립적인 두개의 CompletableFuture 를 결합하여 하나의 결과를 도출하는 경우에 사용한다.
exceptionally
처리도중 발생한 예외 처리에 대한 핸들링을 할 수 있다. Function<Throwable, ? extends T> 로 정의된 functional interface 를 정의하여 사용한다.
join
CompletableFuture 의 동작이 완료될 때까지 대기 후 완료되면 결과값을 리턴한다.
supplyAsnyc
Supplier (functional interface) 를 파라미터로 전달 받아 Supplier 를 실행하고 Supplier 가 리턴하는 타입을 generic 타입으로 하는 CompletableFuture 를 리턴한다. 정의는 아래와 같다.
샘플 코드
먼저 CompletableFuture 를 사용하여 병렬로 처리하는 경우와 순차적으로 처리하는 경우에 대한 샘플 코드이다.
package com.example.completablefuture;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
import java.util.concurrent.CompletableFuture;
@Slf4j
public class CompletableFutureExample {
public Integer plus(Integer operand1, Integer operand2) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return operand1 + operand2;
}
public Integer multiple(Integer operand1, Integer operand2) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return operand1 * operand2;
}
public List<Integer> taskParallel(List<List<Integer>> list) {
List<CompletableFuture<Integer>> completableFutureList =
list.stream().map(subList ->
CompletableFuture
.supplyAsync(() -> this.plus(subList.get(0), subList.get(1)))
.thenApply(plusValue -> this.multiple(plusValue, subList.get(2)))
).toList();
return completableFutureList.stream().map(CompletableFuture::join).toList();
}
public List<Integer> taskSequential(List<List<Integer>> list) {
return list.stream().map(subList -> {
Integer plus = this.plus(subList.get(0), subList.get(1));
return this.multiple(plus, subList.get(2));
}
).toList();
}
}
package com.example.completablefuture;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import java.util.Arrays;
import java.util.List;
@SpringBootApplication
@Slf4j
public class CompletableFutureApplication implements CommandLineRunner {
public static void main(String[] args) {
SpringApplication.run(CompletableFutureApplication.class, args);
}
@Override
public void run(String... args) {
List<List<Integer>> list = Arrays.asList(
Arrays.asList(1, 2, 3),
Arrays.asList(4, 5, 6),
Arrays.asList(7, 8, 9)
);
CompletableFutureExample completableFutureExample = new CompletableFutureExample();
log.info("================== parallel start ===================");
List<Integer> parallel = completableFutureExample.taskParallel(list);
log.info("result: {}", parallel);
log.info("================== parallel end ===================");
log.info("================== sequential start ===================");
List<Integer> sequential = completableFutureExample.taskSequential(list);
log.info("result: {}", sequential);
log.info("================== sequential end ===================");
}
}
간단히 설명하자면 3개의 원소를 갖는 리스트의 리스트가 있고 각 리스트에 있는 3개의 원소의 (첫번째 + 두번째) * 세번째 연산을 한 결과를 얻는 코드이다. 차이를 확인하기 위해서 plus, multiple 메소드에는 각 1초간의 sleep 을 주었다.
CompletableFuture 를 통해서 실행한 결과는 2초가 소요되었지만 순차적으로 실행한 결과는 6초가 소용됨을 알 수 있다. 위 예시에서는 thenApply 를 통해서 plus -> multiple 로 로직이 흘러간다.
thenAccept
public void taskParallelThenAccept() {
List<List<Integer>> list = Arrays.asList(
Arrays.asList(1, 2, 3),
Arrays.asList(4, 5, 6),
Arrays.asList(7, 8, 9)
);
List<CompletableFuture<Void>> completableFutureList =
list.stream().map(subList ->
CompletableFuture
.supplyAsync(() -> subList.get(0) + subList.get(1))
.thenApplyAsync(plusValue -> plusValue * subList.get(2))
.thenAcceptAsync(result -> log.info("result: {}", result))
).toList();
completableFutureList.forEach(CompletableFuture::join);
}
thenAccept 는 Consumer 를 수행하기 때문에 리턴 타입은 Void 가 된다.
실행결과는 아래와 같다.
thenCompose
public void taskParallelThenCompose() {
List<List<Integer>> list = Arrays.asList(
Arrays.asList(1, 2, 3),
Arrays.asList(4, 5, 6),
Arrays.asList(7, 8, 9)
);
List<CompletableFuture<Void>> completableFutureList =
list.stream().map(subList ->
CompletableFuture
.supplyAsync(() -> subList.get(0) + subList.get(1))
.thenCompose(plusValue -> CompletableFuture.supplyAsync(() -> plusValue * subList.get(2)))
.thenAcceptAsync(result -> log.info("result: {}", result))
).toList();
completableFutureList.forEach(CompletableFuture::join);
}
thenCombine
public void taskParallelThenCombine() {
List<List<Integer>> list = Arrays.asList(
Arrays.asList(1, 2, 3),
Arrays.asList(4, 5, 6),
Arrays.asList(7, 8, 9)
);
//각각의 리스트의 (첫번째요소 + 두번째요소) * 세번째요소 의 연산을 병렬적으로 수행하고
//각 세개의 연산 결과에 대해서 합산하는 연산을 수행한다.
Optional<CompletableFuture<Integer>> optionalCombined = list.stream().map(subList ->
CompletableFuture
.supplyAsync(() -> subList.get(0) + subList.get(1))
.thenApply(plusValue -> plusValue * subList.get(2))
).reduce((future1, future2) -> future1.thenCombine(future2, Integer::sum));
CompletableFuture<Integer> totalCompletableFuture = optionalCombined.get();
Integer totalSum = totalCompletableFuture.join();
log.info("all operation result sum: {}", totalSum);
}
도움이 되는 사이트
'자바' 카테고리의 다른 글
jdk pattern matching for instanceof (0) | 2023.08.26 |
---|---|
java switch expression - from jdk 14 (0) | 2023.08.20 |
java Array vs ArrayList (0) | 2023.08.19 |
java record 용법 - from jdk 14 (0) | 2023.08.19 |
byte 배열에서 charset 정보 detecting 하기 (0) | 2023.08.04 |
댓글