• 티스토리 홈
  • 프로필사진
    알쓸개잡
  • 방명록
  • 공지사항
  • 태그
  • 블로그 관리
  • 글 작성
알쓸개잡
  • 프로필사진
    알쓸개잡
    • 분류 전체보기 (92)
      • 스프링부트 (52)
      • AWS (5)
      • 쿠버네티스 (7)
      • 자바 (19)
      • 인프라 (0)
      • ETC (8)
  • 방문자 수
    • 전체:
    • 오늘:
    • 어제:
  • 최근 댓글
      등록된 댓글이 없습니다.
    • 최근 공지
        등록된 공지가 없습니다.
      • 반응형
      # Home
      # 공지사항
      #
      # 태그
      # 검색결과
      # 방명록
      • CompletableFuture 를 알아보자
        2023년 08월 06일
        • 알쓸개잡
        • 작성자
        • 2023.08.06.:20

        자바에서 스레드 처리시에 Future 객체를 통해서 스레드의 실행 결과를 얻어올 수 있다. 하지만 Future 를 사용하는 경우 단일 스레드 처리가 완료될 때까지 blocking 이 일어나게 된다.

         

        Future 의 한계점

        Future 는 기본적으로 isDone, isCanceled 와 같은 기본사항을 체크할 수 있는 메소드를 제공하지만 각기 다른 실행시간을 가지는 Future 들을 조합해서 계산을 한다든지 다른 질의의 결과와 같이 계산을 한다든지 하는 복잡한 로직을 다루기가 힘들다.

         

        CompletableFuture 는 Future 와 CompletionStage 를 상속하는데 CompletionStage 인터페이스를 통해서 다양한 연쇄 작업을 수행할 수 있도록 한다.

         

        CompletionStage

        CompletionStage 에서 제공하는 대표적인 메소드는 아래와 같다.

         

        xxxx vs xxxxAsync

        각 메소드명에 Async 가 붙은것과 붙지 않은것의 차이는 호출되는 CompletableFuture 에 의해 정의된 스레드에서 실행하느냐 CompletableFuture 에 지정된 thread pool executor 에서 실행하느냐의 차이라고 볼 수 있다.

        thenApply(fn) 의 경우 CompletableFuture 에 의해 정의된 스레드에서 fn 을 실행하므로 어느 스레드에서 실행되는지 알 수 없지만 thenApplyAsync(fn) 의 경우 일반적으로 ForkJoinPool.commonPool() 내의 스레드에서 fn 이 실행된다.

        https://stackoverflow.com/questions/47489338/what-is-the-difference-between-thenapply-and-thenapplyasync-of-java-completablef 이 링크를 참고하면 도움이 될 것 같다.

         

        thenApply

        CompletableFuture 가 동작을 종료한 다음에 결과를 이어받아 정의된 Function (functional interface) 를 실행한다. 이전 비지니스 로직 수행결과를 기반으로 새로운 비지니스 로직의 수행결과를 얻어야 하는 경우에 사용한다.

         

        thenAccept

        수행된 연산의 결과를 기반으로 Consumer (functional interface) 를 실행한다. 이전 비지니스 로직 수행결과를 기반으로 새로운 비지니스 로직을 수행해야 하는 경우에 사용한다.

         

        thenRun

        수행된 연산이 종료된 이후에 Runnable (functional interface) 를 실행한다. 이전 비지니스 로직 수행결과를 받을 수 없다.

         

        thenCompose

        thenApply 와 동일하게 Function (functional interface) 를 파라미터로 전달 받지만 Function 의 리턴 타입이 U 타입이 아닌 CompletionStage<U> 타입이라는 점이 다르다. 즉 Function 에서 정의하는 리턴 타입이 CompletableFuture 타입으로 리턴되어야 한다. stream 의 flatmap 과 비슷하다고 생각하면 좋다.

         

        thenCombine

        서로 독립적인 두개의 CompletableFuture 를 결합하여 하나의 결과를 도출하는 경우에 사용한다.

         

        exceptionally

        처리도중 발생한 예외 처리에 대한 핸들링을 할 수 있다. Function<Throwable, ? extends T> 로 정의된 functional interface 를 정의하여 사용한다. 

         

        join

        CompletableFuture 의 동작이 완료될 때까지 대기 후 완료되면 결과값을 리턴한다.

         

        supplyAsnyc

        Supplier (functional interface) 를 파라미터로 전달 받아 Supplier 를 실행하고 Supplier 가 리턴하는 타입을 generic 타입으로 하는 CompletableFuture 를 리턴한다. 정의는 아래와 같다.

         

        샘플 코드

        먼저 CompletableFuture 를 사용하여 병렬로 처리하는 경우와 순차적으로 처리하는 경우에 대한 샘플 코드이다.

        package com.example.completablefuture;
        
        import lombok.extern.slf4j.Slf4j;
        
        import java.util.List;
        import java.util.concurrent.CompletableFuture;
        
        @Slf4j
        public class CompletableFutureExample {
        
        	public Integer plus(Integer operand1, Integer operand2) {
        		try {
        			Thread.sleep(1000);
        		} catch (InterruptedException e) {
        			throw new RuntimeException(e);
        		}
        		return operand1 + operand2;
        	}
        
        	public Integer multiple(Integer operand1, Integer operand2) {
        		try {
        			Thread.sleep(1000);
        		} catch (InterruptedException e) {
        			throw new RuntimeException(e);
        		}
        		return operand1 * operand2;
        	}
        
        	public List<Integer> taskParallel(List<List<Integer>> list) {
        		List<CompletableFuture<Integer>> completableFutureList =
        			list.stream().map(subList ->
        				CompletableFuture
        					.supplyAsync(() -> this.plus(subList.get(0), subList.get(1)))
        					.thenApply(plusValue -> this.multiple(plusValue, subList.get(2)))
        			).toList();
        
        		return completableFutureList.stream().map(CompletableFuture::join).toList();
        	}
        
        	public List<Integer> taskSequential(List<List<Integer>> list) {
        
        		return list.stream().map(subList -> {
        				Integer plus = this.plus(subList.get(0), subList.get(1));
        				return this.multiple(plus, subList.get(2));
        			}
        		).toList();
        	}
        }
        package com.example.completablefuture;
        
        import lombok.extern.slf4j.Slf4j;
        import org.springframework.boot.CommandLineRunner;
        import org.springframework.boot.SpringApplication;
        import org.springframework.boot.autoconfigure.SpringBootApplication;
        
        import java.util.Arrays;
        import java.util.List;
        
        @SpringBootApplication
        @Slf4j
        public class CompletableFutureApplication implements CommandLineRunner {
        
        	public static void main(String[] args) {
        		SpringApplication.run(CompletableFutureApplication.class, args);
        	}
        
        	@Override
        	public void run(String... args) {
        		List<List<Integer>> list = Arrays.asList(
        			Arrays.asList(1, 2, 3),
        			Arrays.asList(4, 5, 6),
        			Arrays.asList(7, 8, 9)
        		);
        
        		CompletableFutureExample completableFutureExample = new CompletableFutureExample();
        		log.info("================== parallel start ===================");
        		List<Integer> parallel = completableFutureExample.taskParallel(list);
        		log.info("result: {}", parallel);
        		log.info("================== parallel end ===================");
        
        		log.info("================== sequential start ===================");
        		List<Integer> sequential = completableFutureExample.taskSequential(list);
        		log.info("result: {}", sequential);
        		log.info("================== sequential end ===================");
        	}
        }

        간단히 설명하자면 3개의 원소를 갖는 리스트의 리스트가 있고 각 리스트에 있는 3개의 원소의 (첫번째 + 두번째) * 세번째 연산을 한 결과를 얻는 코드이다. 차이를 확인하기 위해서 plus, multiple 메소드에는 각 1초간의 sleep 을 주었다.

        CompletableFuture 를 통해서 실행한 결과는 2초가 소요되었지만 순차적으로 실행한 결과는 6초가 소용됨을 알 수 있다. 위 예시에서는 thenApply 를 통해서 plus -> multiple 로 로직이 흘러간다.

         

         

        thenAccept

        public void taskParallelThenAccept() {
            List<List<Integer>> list = Arrays.asList(
                Arrays.asList(1, 2, 3),
                Arrays.asList(4, 5, 6),
                Arrays.asList(7, 8, 9)
            );
        
            List<CompletableFuture<Void>> completableFutureList =
                list.stream().map(subList ->
                    CompletableFuture
                        .supplyAsync(() -> subList.get(0) + subList.get(1))
                        .thenApplyAsync(plusValue -> plusValue * subList.get(2))
                        .thenAcceptAsync(result -> log.info("result: {}", result))
                ).toList();
        
            completableFutureList.forEach(CompletableFuture::join);
        }

        thenAccept 는 Consumer 를 수행하기 때문에 리턴 타입은 Void 가 된다.

        실행결과는 아래와 같다.

         

        thenCompose

        public void taskParallelThenCompose() {
            List<List<Integer>> list = Arrays.asList(
                Arrays.asList(1, 2, 3),
                Arrays.asList(4, 5, 6),
                Arrays.asList(7, 8, 9)
            );
        
            List<CompletableFuture<Void>> completableFutureList =
                list.stream().map(subList ->
                    CompletableFuture
                        .supplyAsync(() -> subList.get(0) + subList.get(1))
                        .thenCompose(plusValue -> CompletableFuture.supplyAsync(() -> plusValue * subList.get(2)))
                        .thenAcceptAsync(result -> log.info("result: {}", result))
                ).toList();
        
            completableFutureList.forEach(CompletableFuture::join);
        }

         

        thenCombine

        public void taskParallelThenCombine() {
            List<List<Integer>> list = Arrays.asList(
                Arrays.asList(1, 2, 3),
                Arrays.asList(4, 5, 6),
                Arrays.asList(7, 8, 9)
            );
        
            //각각의 리스트의 (첫번째요소 + 두번째요소) * 세번째요소 의 연산을 병렬적으로 수행하고
            //각 세개의 연산 결과에 대해서 합산하는 연산을 수행한다.
            Optional<CompletableFuture<Integer>> optionalCombined = list.stream().map(subList ->
                CompletableFuture
                    .supplyAsync(() -> subList.get(0) + subList.get(1))
                    .thenApply(plusValue -> plusValue * subList.get(2))
            ).reduce((future1, future2) -> future1.thenCombine(future2, Integer::sum));
        
            CompletableFuture<Integer> totalCompletableFuture = optionalCombined.get();
            Integer totalSum = totalCompletableFuture.join();
            log.info("all operation result sum: {}", totalSum);
        }

         

        도움이 되는 사이트

        https://www.baeldung.com/java-completablefuture

        반응형
        저작자표시 비영리 변경금지 (새창열림)

        '자바' 카테고리의 다른 글

        jdk pattern matching for instanceof  (0) 2023.08.26
        java switch expression - from jdk 14  (0) 2023.08.20
        java Array vs ArrayList  (0) 2023.08.19
        java record 용법 - from jdk 14  (0) 2023.08.19
        byte 배열에서 charset 정보 detecting 하기  (0) 2023.08.04
        다음글
        다음 글이 없습니다.
        이전글
        이전 글이 없습니다.
        댓글
      조회된 결과가 없습니다.
      스킨 업데이트 안내
      현재 이용하고 계신 스킨의 버전보다 더 높은 최신 버전이 감지 되었습니다. 최신버전 스킨 파일을 다운로드 받을 수 있는 페이지로 이동하시겠습니까?
      ("아니오" 를 선택할 시 30일 동안 최신 버전이 감지되어도 모달 창이 표시되지 않습니다.)
      목차
      표시할 목차가 없습니다.
        • 안녕하세요
        • 감사해요
        • 잘있어요

        티스토리툴바