자바

CompletableFuture 를 알아보자

알쓸개잡 2023. 8. 6.

자바에서 스레드 처리시에 Future 객체를 통해서 스레드의 실행 결과를 얻어올 수 있다. 하지만 Future 를 사용하는 경우 단일 스레드 처리가 완료될 때까지 blocking 이 일어나게 된다.

 

Future 의 한계점

Future 는 기본적으로 isDone, isCanceled 와 같은 기본사항을 체크할 수 있는 메소드를 제공하지만 각기 다른 실행시간을 가지는 Future 들을 조합해서 계산을 한다든지 다른 질의의 결과와 같이 계산을 한다든지 하는 복잡한 로직을 다루기가 힘들다.

 

CompletableFuture 는 Future 와 CompletionStage 를 상속하는데 CompletionStage 인터페이스를 통해서 다양한 연쇄 작업을 수행할 수 있도록 한다.

 

CompletionStage

CompletionStage 에서 제공하는 대표적인 메소드는 아래와 같다.

 

xxxx vs xxxxAsync

각 메소드명에 Async 가 붙은것과 붙지 않은것의 차이는 호출되는 CompletableFuture 에 의해 정의된 스레드에서 실행하느냐 CompletableFuture 에 지정된 thread pool executor 에서 실행하느냐의 차이라고 볼 수 있다.

thenApply(fn) 의 경우 CompletableFuture 에 의해 정의된 스레드에서 fn 을 실행하므로 어느 스레드에서 실행되는지 알 수 없지만 thenApplyAsync(fn) 의 경우 일반적으로 ForkJoinPool.commonPool() 내의 스레드에서 fn 이 실행된다.

https://stackoverflow.com/questions/47489338/what-is-the-difference-between-thenapply-and-thenapplyasync-of-java-completablef 이 링크를 참고하면 도움이 될 것 같다.

 

thenApply

CompletableFuture 가 동작을 종료한 다음에 결과를 이어받아 정의된 Function (functional interface) 를 실행한다. 이전 비지니스 로직 수행결과를 기반으로 새로운 비지니스 로직의 수행결과를 얻어야 하는 경우에 사용한다.

 

thenAccept

수행된 연산의 결과를 기반으로 Consumer (functional interface) 를 실행한다. 이전 비지니스 로직 수행결과를 기반으로 새로운 비지니스 로직을 수행해야 하는 경우에 사용한다.

 

thenRun

수행된 연산이 종료된 이후에 Runnable (functional interface) 를 실행한다. 이전 비지니스 로직 수행결과를 받을 수 없다.

 

thenCompose

thenApply 와 동일하게 Function (functional interface) 를 파라미터로 전달 받지만 Function 의 리턴 타입이 U 타입이 아닌 CompletionStage<U> 타입이라는 점이 다르다. 즉 Function 에서 정의하는 리턴 타입이 CompletableFuture 타입으로 리턴되어야 한다. stream 의 flatmap 과 비슷하다고 생각하면 좋다.

 

thenCombine

서로 독립적인 두개의 CompletableFuture 를 결합하여 하나의 결과를 도출하는 경우에 사용한다.

 

exceptionally

처리도중 발생한 예외 처리에 대한 핸들링을 할 수 있다. Function<Throwable, ? extends T> 로 정의된 functional interface 를 정의하여 사용한다. 

 

join

CompletableFuture 의 동작이 완료될 때까지 대기 후 완료되면 결과값을 리턴한다.

 

supplyAsnyc

Supplier (functional interface) 를 파라미터로 전달 받아 Supplier 를 실행하고 Supplier 가 리턴하는 타입을 generic 타입으로 하는 CompletableFuture 를 리턴한다. 정의는 아래와 같다.

 

샘플 코드

먼저 CompletableFuture 를 사용하여 병렬로 처리하는 경우와 순차적으로 처리하는 경우에 대한 샘플 코드이다.

package com.example.completablefuture;

import lombok.extern.slf4j.Slf4j;

import java.util.List;
import java.util.concurrent.CompletableFuture;

@Slf4j
public class CompletableFutureExample {

	public Integer plus(Integer operand1, Integer operand2) {
		try {
			Thread.sleep(1000);
		} catch (InterruptedException e) {
			throw new RuntimeException(e);
		}
		return operand1 + operand2;
	}

	public Integer multiple(Integer operand1, Integer operand2) {
		try {
			Thread.sleep(1000);
		} catch (InterruptedException e) {
			throw new RuntimeException(e);
		}
		return operand1 * operand2;
	}

	public List<Integer> taskParallel(List<List<Integer>> list) {
		List<CompletableFuture<Integer>> completableFutureList =
			list.stream().map(subList ->
				CompletableFuture
					.supplyAsync(() -> this.plus(subList.get(0), subList.get(1)))
					.thenApply(plusValue -> this.multiple(plusValue, subList.get(2)))
			).toList();

		return completableFutureList.stream().map(CompletableFuture::join).toList();
	}

	public List<Integer> taskSequential(List<List<Integer>> list) {

		return list.stream().map(subList -> {
				Integer plus = this.plus(subList.get(0), subList.get(1));
				return this.multiple(plus, subList.get(2));
			}
		).toList();
	}
}
package com.example.completablefuture;

import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import java.util.Arrays;
import java.util.List;

@SpringBootApplication
@Slf4j
public class CompletableFutureApplication implements CommandLineRunner {

	public static void main(String[] args) {
		SpringApplication.run(CompletableFutureApplication.class, args);
	}

	@Override
	public void run(String... args) {
		List<List<Integer>> list = Arrays.asList(
			Arrays.asList(1, 2, 3),
			Arrays.asList(4, 5, 6),
			Arrays.asList(7, 8, 9)
		);

		CompletableFutureExample completableFutureExample = new CompletableFutureExample();
		log.info("================== parallel start ===================");
		List<Integer> parallel = completableFutureExample.taskParallel(list);
		log.info("result: {}", parallel);
		log.info("================== parallel end ===================");

		log.info("================== sequential start ===================");
		List<Integer> sequential = completableFutureExample.taskSequential(list);
		log.info("result: {}", sequential);
		log.info("================== sequential end ===================");
	}
}

간단히 설명하자면 3개의 원소를 갖는 리스트의 리스트가 있고 각 리스트에 있는 3개의 원소의 (첫번째 + 두번째) * 세번째 연산을 한 결과를 얻는 코드이다. 차이를 확인하기 위해서 plus, multiple 메소드에는 각 1초간의 sleep 을 주었다.

CompletableFuture 를 통해서 실행한 결과는 2초가 소요되었지만 순차적으로 실행한 결과는 6초가 소용됨을 알 수 있다. 위 예시에서는 thenApply 를 통해서 plus -> multiple 로 로직이 흘러간다.

 

 

thenAccept

public void taskParallelThenAccept() {
    List<List<Integer>> list = Arrays.asList(
        Arrays.asList(1, 2, 3),
        Arrays.asList(4, 5, 6),
        Arrays.asList(7, 8, 9)
    );

    List<CompletableFuture<Void>> completableFutureList =
        list.stream().map(subList ->
            CompletableFuture
                .supplyAsync(() -> subList.get(0) + subList.get(1))
                .thenApplyAsync(plusValue -> plusValue * subList.get(2))
                .thenAcceptAsync(result -> log.info("result: {}", result))
        ).toList();

    completableFutureList.forEach(CompletableFuture::join);
}

thenAccept 는 Consumer 를 수행하기 때문에 리턴 타입은 Void 가 된다.

실행결과는 아래와 같다.

 

thenCompose

public void taskParallelThenCompose() {
    List<List<Integer>> list = Arrays.asList(
        Arrays.asList(1, 2, 3),
        Arrays.asList(4, 5, 6),
        Arrays.asList(7, 8, 9)
    );

    List<CompletableFuture<Void>> completableFutureList =
        list.stream().map(subList ->
            CompletableFuture
                .supplyAsync(() -> subList.get(0) + subList.get(1))
                .thenCompose(plusValue -> CompletableFuture.supplyAsync(() -> plusValue * subList.get(2)))
                .thenAcceptAsync(result -> log.info("result: {}", result))
        ).toList();

    completableFutureList.forEach(CompletableFuture::join);
}

 

thenCombine

public void taskParallelThenCombine() {
    List<List<Integer>> list = Arrays.asList(
        Arrays.asList(1, 2, 3),
        Arrays.asList(4, 5, 6),
        Arrays.asList(7, 8, 9)
    );

    //각각의 리스트의 (첫번째요소 + 두번째요소) * 세번째요소 의 연산을 병렬적으로 수행하고
    //각 세개의 연산 결과에 대해서 합산하는 연산을 수행한다.
    Optional<CompletableFuture<Integer>> optionalCombined = list.stream().map(subList ->
        CompletableFuture
            .supplyAsync(() -> subList.get(0) + subList.get(1))
            .thenApply(plusValue -> plusValue * subList.get(2))
    ).reduce((future1, future2) -> future1.thenCombine(future2, Integer::sum));

    CompletableFuture<Integer> totalCompletableFuture = optionalCombined.get();
    Integer totalSum = totalCompletableFuture.join();
    log.info("all operation result sum: {}", totalSum);
}

 

도움이 되는 사이트

https://www.baeldung.com/java-completablefuture

'자바' 카테고리의 다른 글

jdk pattern matching for instanceof  (0) 2023.08.26
java switch expression - from jdk 14  (0) 2023.08.20
java Array vs ArrayList  (0) 2023.08.19
java record 용법 - from jdk 14  (0) 2023.08.19
byte 배열에서 charset 정보 detecting 하기  (0) 2023.08.04

댓글

💲 추천 글