티스토리 뷰

언어/Java

CompletableFuture 가이드

snvlqkq 2026. 1. 1. 15:26

1. CompletableFuture란 무엇인가?

CompletableFuture는 비동기 작업을 처리하던 Future의 한계를 극복하고자 Java 8부터 제공된 클래스입니다.

주요 특징

  • 비동기 작업의 결과를 담는 컨테이너
  • 작업의 완료 시점을 제어 가능
  • 후속 작업을 체이닝으로 연결 가능
  • 예외 처리를 비동기적으로 수행 가능

2. Future와의 차이점

Future의 한계

1. 결과를 얻으려면 무조건 블로킹 대기

ExecutorService executor = Executors.newSingleThreadExecutor();

// ❌ 문제 1: 결과를 얻으려면 무조건 블로킹 대기
Future<String> future = executor.submit(() -> {
    Thread.sleep(1000);
    return "USER";
});
String result = future.get();   // 블로킹 - 1초 동안 대기
System.out.println(result);

2. 여러 Future를 조합할 수 없음

Future<User> userFuture = getUserAsync();
Future<Order> orderFuture = getOrderAsync();
// 두 결과를 합치려면? -> 각각 get()으로 기다려야 함
User user = userFuture.get();
Order order = orderFuture.get();

3. 콜백을 등록할 수 없음

Future<String> future = executor.submit(() -> "결과");
// 작업이 완료되면 자동으로 실행되는 콜백을 등록할 방법이 없음

4. 예외 처리가 어려움

Future<String> future = executor.submit(() -> {
    throw new RuntimeException("에러 발생");
});
try {
    future.get(); // 여기서만 예외를 잡을 수 있음
} catch (ExecutionException e) {
    // 예외 처리
}

CompletableFuture의 개선점

1. 비동기 후속 처리 가능

CompletableFuture<String> future = 
    CompletableFuture.supplyAsync(() -> {
        sleep(1000);
        return "USER";
    })
    .thenApply(user -> user + "_DETAIL")
    .thenAccept(System.out::println);

System.out.println("비동기"); // 즉시 출력

2. 여러 Future 조합 가능

CompletableFuture<User> userFuture = getUserAsync();
CompletableFuture<Order> orderFuture = getOrderAsync();

userFuture.thenCombine(orderFuture, (user, order) -> 
    new UserOrderDetail(user, order)
);

3. 예외 처리 간편

CompletableFuture.supplyAsync(() -> {
    throw new RuntimeException("에러");
})
.exceptionally(ex -> {
    System.out.println("에러 처리: " + ex.getMessage());
    return "기본값";
})
.thenAccept(System.out::println);

3. 왜 "Completable" Future인가?

완료시킬 수 있는 Future라고 부르는 이유는 비동기 작업을 수행하는 스레드 외부에서 완료 시점을 제어할 수 있기 때문이다.

Future (제어 불가)

ExecutorService executor = Executors.newSingleThreadExecutor();
Future<String> future = executor.submit(() -> {
    // 이 안 = "내부" (작업 스레드)
    Thread.sleep(5000);
    return "결과";  // 여기서만 완료 가능
});

// 이 밖 = "외부" (호출한 스레드)
// ❌ 외부에서 완료시킬 방법이 없음!

CompletableFuture (제어 가능)

// CompletableFuture의 경우
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    // 이 안 = "내부" (작업 스레드)
    Thread.sleep(5000);
    return "결과";
});

// 이 밖 = "외부" (호출한 스레드, 다른 스레드 등)
// ✅ 외부에서 완료시킬 수 있음!
future.complete("외부에서 완료!");

4. 동작 원리

기본 스레드 풀: ForkJoinPool.commonPool()

명시적으로 작업을 수행할 스레드 풀을 지정하지 않으면 기본 스레드 풀을 사용한다.

CompletableFuture<String> future = 
    CompletableFuture.supplyAsync(() -> {
        System.out.println("Thread: " + Thread.currentThread().getName());
        // ForkJoinPool.commonPool-worker-1
        return "결과";
    });

ForkJoinPool 특징:

  • CPU 코어 수만큼 스레드 생성 (기본값)
  • 전역 공유 풀 (애플리케이션 전체에서 사용)

커스텀 Executor 사용 (실무 권장)

// 커스텀 Executor 생성
Executor customExecutor = Executors.newFixedThreadPool(10);

// supplyAsync에 Executor 전달
CompletableFuture<String> future = 
    CompletableFuture.supplyAsync(() -> {
        System.out.println("Thread: " + Thread.currentThread().getName());
        // pool-1-thread-1
        return "결과";
    }, customExecutor);

왜 커스텀 Executor를 사용하는가?

  • ForkJoinPool은 전역 공유이므로 다른 작업과 스레드를 경쟁
  • 작업 유형별로 스레드 풀을 분리 가능
  • 스레드 수를 작업 특성에 맞게 조절 가능

5. 자주 사용하는 메서드

5.1 비동기 작업 시작

// supplyAsync - 반환값이 있는 비동기 작업
CompletableFuture<String> future = 
    CompletableFuture.supplyAsync(() -> {
        return "결과";
    });

// runAsync - 반환값이 없는 비동기 작업
CompletableFuture<Void> future = 
    CompletableFuture.runAsync(() -> {
        System.out.println("실행");
    });

// Executor 지정
CompletableFuture<String> future = 
    CompletableFuture.supplyAsync(() -> "결과", customExecutor);

5.2 결과 변환 (thenApply)

CompletableFuture<String> future = 
    CompletableFuture.supplyAsync(() -> "hello")
        .thenApply(s -> s.toUpperCase())      // "HELLO"
        .thenApply(s -> s + " WORLD");        // "HELLO WORLD"

5.3 결과 소비 (thenAccept, thenRun)

// thenAccept - 결과를 받아서 처리 (반환 없음)
CompletableFuture<Void> future = 
    CompletableFuture.supplyAsync(() -> "결과")
        .thenAccept(result -> {
            System.out.println(result);
        });

// thenRun - 결과를 받지 않고 실행 (반환 없음)
CompletableFuture<Void> future = 
    CompletableFuture.supplyAsync(() -> "결과")
        .thenRun(() -> {
            System.out.println("완료됨");
        });

5.4 두 CompletableFuture 결합 (thenCombine)

CompletableFuture<User> userFuture = getUserAsync();
CompletableFuture<Order> orderFuture = getOrderAsync();

CompletableFuture<UserOrderDetail> combinedFuture = 
    userFuture.thenCombine(orderFuture, (user, order) -> {
        return new UserOrderDetail(user, order);
    });

5.5 여러 CompletableFuture 처리

// allOf - 모든 작업 완료 대기
CompletableFuture<Product> productFuture = getProductAsync(1L);
CompletableFuture<List<Review>> reviewFuture = getReviewsAsync(1L);
CompletableFuture<Stock> stockFuture = getStockAsync(1L);

CompletableFuture<Void> allFuture = 
    CompletableFuture.allOf(productFuture, reviewFuture, stockFuture);

allFuture.join();  // 모두 완료될 때까지 대기

// 결과 가져오기
Product product = productFuture.join();
List<Review> reviews = reviewFuture.join();
Stock stock = stockFuture.join();

// anyOf - 하나라도 완료되면 반환
CompletableFuture<Object> anyFuture = 
    CompletableFuture.anyOf(future1, future2, future3);

Object firstResult = anyFuture.join();  // 가장 먼저 완료된 결과

5.6 예외 처리

// exceptionally - 예외 발생 시 기본값 반환
CompletableFuture<String> future = 
    CompletableFuture.supplyAsync(() -> {
        throw new RuntimeException("에러 발생");
    })
    .exceptionally(ex -> {
        System.err.println("에러: " + ex.getMessage());
        return "기본값";
    });

// handle - 정상/예외 모두 처리
CompletableFuture<String> future = 
    CompletableFuture.supplyAsync(() -> {
        if (Math.random() > 0.5) {
            throw new RuntimeException("에러");
        }
        return "성공";
    })
    .handle((result, ex) -> {
        if (ex != null) {
            return "에러 발생: " + ex.getMessage();
        }
        return result;
    });

// whenComplete - 결과를 변경하지 않고 로깅/모니터링
CompletableFuture<String> future = 
    CompletableFuture.supplyAsync(() -> "결과")
        .whenComplete((result, ex) -> {
            if (ex != null) {
                System.err.println("실패: " + ex.getMessage());
            } else {
                System.out.println("성공: " + result);
            }
            // 결과는 그대로 전달됨
        });

5.7 결과 가져오기

// join() - Unchecked Exception (RuntimeException)
try {
    String result = future.join();
} catch (CompletionException e) {
    // 예외 처리
}

// get() - Checked Exception (try-catch 필수)
try {
    String result = future.get();
} catch (InterruptedException | ExecutionException e) {
    // 예외 처리
}

// get(timeout) - 타임아웃 설정
try {
    String result = future.get(3, TimeUnit.SECONDS);
} catch (TimeoutException e) {
    System.out.println("3초 내에 완료되지 않음");
}

// getNow - 즉시 결과 반환 (완료 안됐으면 기본값)
String result = future.getNow("기본값");

5.8 타임아웃 처리 (Java 9+)

// orTimeout - 시간 내에 완료 안 되면 TimeoutException
CompletableFuture<String> future = 
    slowApiCall()
        .orTimeout(3, TimeUnit.SECONDS)
        .exceptionally(ex -> "타임아웃 - 기본값");

// completeOnTimeout - 시간 내에 완료 안 되면 기본값으로 완료
CompletableFuture<String> future = 
    slowApiCall()
        .completeOnTimeout("기본값", 3, TimeUnit.SECONDS);

6. 실무 사용 사례

6.1 병렬 API 호출

@RestController
@RequiredArgsConstructor
public class ProductController {

    private final ProductService productService;
    private final ReviewService reviewService;
    private final StockService stockService;

    @GetMapping("/products/{productId}")
    public ProductDetail getProductDetail(@PathVariable Long productId) {
        long startTime = System.currentTimeMillis();

        // 3개 API를 병렬로 호출
        CompletableFuture<Product> productFuture = 
            CompletableFuture.supplyAsync(() -> 
                productService.getProductInfo(productId));

        CompletableFuture<List<Review>> reviewFuture = 
            CompletableFuture.supplyAsync(() -> 
                reviewService.getReviews(productId));

        CompletableFuture<Stock> stockFuture = 
            CompletableFuture.supplyAsync(() -> 
                stockService.getStock(productId));

        // 모두 완료될 때까지 대기
        CompletableFuture.allOf(productFuture, reviewFuture, stockFuture).join();

        // 결과 조합
        Product product = productFuture.join();
        List<Review> reviews = reviewFuture.join();
        Stock stock = stockFuture.join();

        long responseTime = System.currentTimeMillis() - startTime;

        return new ProductDetail(product, reviews, stock, responseTime);
        // 동기: ~3초, 비동기: ~1초
    }
}

6.2 순차적 의존 작업

public class OrderService {

    public CompletableFuture<OrderResult> processOrder(OrderRequest request) {
        return CompletableFuture.supplyAsync(() -> {
            // 1. 재고 확인
            return checkStock(request.getProductId(), request.getQuantity());
        })
        .thenCompose(stockAvailable -> {
            if (!stockAvailable) {
                return CompletableFuture.completedFuture(
                    OrderResult.failed("재고 부족")
                );
            }
            // 2. 결제 처리
            return processPayment(request.getPaymentInfo());
        })
        .thenCompose(paymentResult -> {
            if (!paymentResult.isSuccess()) {
                return CompletableFuture.completedFuture(
                    OrderResult.failed("결제 실패")
                );
            }
            // 3. 주문 생성
            return createOrder(request);
        })
        .thenApply(order -> {
            // 4. 알림 발송 (비동기)
            sendNotificationAsync(order);
            return OrderResult.success(order);
        })
        .exceptionally(ex -> {
            return OrderResult.failed("주문 처리 중 오류: " + ex.getMessage());
        });
    }
}

6.3 @Async와 함께 사용

@Service
public class NotificationService {

    @Async("asyncExecutor")
    public CompletableFuture<Boolean> sendEmail(String to, String message) {
        try {
            // 이메일 발송 (2초 소요)
            emailClient.send(to, message);
            return CompletableFuture.completedFuture(true);
        } catch (Exception e) {
            return CompletableFuture.completedFuture(false);
        }
    }

    @Async("asyncExecutor")
    public CompletableFuture<Boolean> sendSms(String phone, String message) {
        try {
            // SMS 발송 (2초 소요)
            smsClient.send(phone, message);
            return CompletableFuture.completedFuture(true);
        } catch (Exception e) {
            return CompletableFuture.completedFuture(false);
        }
    }
}

@RestController
@RequiredArgsConstructor
public class NotificationController {

    private final NotificationService notificationService;

    @PostMapping("/notifications/send")
    public NotificationResult sendAll(@RequestBody NotificationRequest request) {
        CompletableFuture<Boolean> emailFuture = 
            notificationService.sendEmail(request.getEmail(), request.getMessage());

        CompletableFuture<Boolean> smsFuture = 
            notificationService.sendSms(request.getPhone(), request.getMessage());

        CompletableFuture.allOf(emailFuture, smsFuture).join();

        return new NotificationResult(
            emailFuture.join(),
            smsFuture.join()
        );
    }
}

7. 실무 사용 빈도

CompletableFuture는

  • 외부 API 호출
  • 병렬 처리
  • 비동기 후처리
  • 블로킹 코드의 비동기 래핑

등의 상황에서 자주 사용되며, 특히 Spring MVC 환경에서 비동기 응답 처리를 위해 Controller 또는 Service 계층에서 많이 활용된다.

다만, WebFlux / Reactor 기반 환경에서는 CompletableFuture 대신 Mono / Flux를 사용하는 것이 일반적이다.

8. 주의사항

8.1 예외 처리 필수

// ❌ 나쁜 예 - 예외 처리 없음
CompletableFuture<String> future = 
    CompletableFuture.supplyAsync(() -> {
        throw new RuntimeException("에러");
    });

String result = future.join();  // CompletionException 발생!

// ✅ 좋은 예 - 예외 처리
CompletableFuture<String> future = 
    CompletableFuture.supplyAsync(() -> {
        throw new RuntimeException("에러");
    })
    .exceptionally(ex -> {
        log.error("에러 발생", ex);
        return "기본값";
    });
  • 에러 로그가 제대로 남지 않음
  • 복구 기회가 없음
  • 사용자에게 적절한 에러 메시지를 보여줄 수 없음

8.2 스레드 풀 관리

// 시뮬레이션
public class BadThreadPoolExample {

    public static void main(String[] args) {
        System.out.println("CPU 코어 수: " + Runtime.getRuntime().availableProcessors());
        // 예: CPU 코어 수: 8

        System.out.println("ForkJoinPool 크기: " + 
            ForkJoinPool.commonPool().getParallelism());
        // 예: ForkJoinPool 크기: 7 (코어 수 - 1)

        // 1000개의 작업 생성
        for (int i = 0; i < 1000; i++) {
            final int taskNum = i;
            CompletableFuture.supplyAsync(() -> {
                System.out.println("Task " + taskNum + " - Thread: " + 
                    Thread.currentThread().getName());
                heavyTask();  // 5초 걸리는 무거운 작업
                return "완료";
            });
        }

        System.out.println("모든 작업 제출 완료");
    }

    static void heavyTask() {
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {}
    }
}

문제점:

1. 스레드 풀 크기: 7개
2. 제출된 작업: 1000개
3. 동시 실행: 7개만
4. 나머지 993개: 대기 큐에서 대기

타임라인:
0초:   7개 작업 실행 중
5초:   7개 완료, 다음 7개 시작
10초:  7개 완료, 다음 7개 시작
...
715초: 마지막 7개 완료 (약 12분!)

커스텀 스레드 풀 사용

public class GoodThreadPoolExample {

    public static void main(String[] args) {
        // 커스텀 스레드 풀 생성
        ExecutorService executor = Executors.newFixedThreadPool(10);

        System.out.println("커스텀 스레드 풀 크기: 10");

        List<CompletableFuture<String>> futures = new ArrayList<>();

        for (int i = 0; i < 1000; i++) {
            final int taskNum = i;
            CompletableFuture<String> future = 
                CompletableFuture.supplyAsync(() -> {
                    System.out.println("Task " + taskNum + " - Thread: " + 
                        Thread.currentThread().getName());
                    heavyTask();  // 5초
                    return "완료";
                }, executor);  // 커스텀 Executor 사용!

            futures.add(future);
        }

        // 모든 작업 완료 대기
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();

        executor.shutdown();
        System.out.println("완료!");
    }
}

성능 비교:

ForkJoinPool (7개):
- 동시 실행: 7개
- 총 소요 시간: 약 715초 (12분)

커스텀 풀 (10개):
- 동시 실행: 10개
- 총 소요 시간: 약 500초 (8분)
- 약 30% 빠름!

8.3 메모리 누수 주의

// ❌ 나쁜 예 - Future를 계속 생성만 하고 결과를 가져가지 않음
List<CompletableFuture<String>> futures = new ArrayList<>();

for (int i = 0; i < 10000; i++) {
    futures.add(CompletableFuture.supplyAsync(() -> "데이터"));
}
// join()을 호출하지 않으면 완료된 Future가 메모리에 계속 쌓임

무슨 문제가 발생하나?

// 메모리 사용량 시뮬레이션
public class MemoryLeakExample {

    public static void main(String[] args) throws Exception {
        printMemory("시작");

        List<CompletableFuture<String>> futures = new ArrayList<>();

        for (int i = 0; i < 10000; i++) {
            CompletableFuture<String> future = 
                CompletableFuture.supplyAsync(() -> {
                    // 1MB 크기의 데이터 생성
                    return generateLargeData();
                });

            futures.add(future);

            if (i % 1000 == 0) {
                printMemory("작업 " + i + "개 생성 후");
            }
        }

        printMemory("모든 작업 생성 완료");

        // 이 시점에 10,000개의 Future가 모두 메모리에!
        // 각 Future에 1MB 데이터 = 총 10GB 메모리 사용!

        Thread.sleep(10000);  // 10초 대기
        printMemory("10초 대기 후");

        // GC가 회수할 수 없음 (futures 리스트가 참조 보유)
    }

    static String generateLargeData() {
        // 1MB 크기의 데이터
        byte[] data = new byte[1024 * 1024];
        Arrays.fill(data, (byte) 'A');
        return new String(data);
    }

    static void printMemory(String point) {
        Runtime runtime = Runtime.getRuntime();
        long usedMemory = runtime.totalMemory() - runtime.freeMemory();
        long usedMB = usedMemory / (1024 * 1024);
        System.out.println(point + " - 사용 메모리: " + usedMB + "MB");
    }
}

출력 예:

시작 - 사용 메모리: 10MB
작업 1000개 생성 후 - 사용 메모리: 1010MB
작업 2000개 생성 후 - 사용 메모리: 2010MB
작업 3000개 생성 후 - 사용 메모리: 3010MB
...
모든 작업 생성 완료 - 사용 메모리: 10010MB
10초 대기 후 - 사용 메모리: 10010MB  ← GC 안 됨!

왜 메모리가 누적되는가?

// 메모리 구조
futures 리스트
├─ CompletableFuture[0] → 결과: "1MB 데이터" ✅ 완료됨, ❌ GC 안 됨
├─ CompletableFuture[1] → 결과: "1MB 데이터" ✅ 완료됨, ❌ GC 안 됨
├─ CompletableFuture[2] → 결과: "1MB 데이터" ✅ 완료됨, ❌ GC 안 됨
...
└─ CompletableFuture[9999] → 결과: "1MB 데이터" ✅ 완료됨, ❌ GC 안 됨

문제:
1. 모든 Future가 완료됨
2. 하지만 futures 리스트가 계속 참조 보유
3. GC가 회수할 수 없음
4. 메모리 계속 증가

개선

// ✅ 좋은 예 - 배치로 처리
List<CompletableFuture<String>> batch = new ArrayList<>();

for (int i = 0; i < 10000; i++) {
    batch.add(CompletableFuture.supplyAsync(() -> "데이터"));

    if (batch.size() >= 100) {
        CompletableFuture.allOf(batch.toArray(new CompletableFuture[0])).join();
        batch.clear();
    }
}

개선 효과

public class GoodMemoryManagement {

    public static void main(String[] args) {
        printMemory("시작");

        List<CompletableFuture<String>> batch = new ArrayList<>();

        for (int i = 0; i < 10000; i++) {
            CompletableFuture<String> future = 
                CompletableFuture.supplyAsync(() -> {
                    return generateLargeData();  // 1MB
                });

            batch.add(future);

            // 배치 크기 도달 시
            if (batch.size() >= 100) {
                // 현재 배치 처리
                CompletableFuture.allOf(
                    batch.toArray(new CompletableFuture[0])
                ).join();

                // 결과 처리
                for (CompletableFuture<String> f : batch) {
                    String result = f.join();
                    processResult(result);  // 결과 사용
                }

                // 배치 클리어 - 중요!
                batch.clear();

                if (i % 1000 == 0) {
                    printMemory("작업 " + i + "개 처리 후");
                    System.gc();  // GC 힌트
                    Thread.sleep(100);
                }
            }
        }

        printMemory("모든 작업 완료");
    }

    static void processResult(String result) {
        // 결과를 DB에 저장하거나 다른 처리
        // result는 이후 필요 없음 → GC 대상
    }
}

출력 예:

시작 - 사용 메모리: 10MB
작업 1000개 처리 후 - 사용 메모리: 110MB  ← 100MB만 유지
작업 2000개 처리 후 - 사용 메모리: 110MB  ← 안정적!
작업 3000개 처리 후 - 사용 메모리: 110MB
...
모든 작업 완료 - 사용 메모리: 110MB

개선 원리

// 배치 1 (100개)
for (0 ~ 99) {
    batch에 추가
}
→ allOf().join()  // 완료 대기
→ 결과 처리
→ batch.clear()  // 참조 제거! ← GC 가능해짐

// 배치 2 (100개)
for (100 ~ 199) {
    batch에 추가
}
→ allOf().join()
→ 결과 처리
→ batch.clear()  // 참조 제거! ← GC 가능해짐

// 최대 메모리: 100개 * 1MB = 100MB만 유지

9. 한 문장 요약

CompletableFuture는
“미래의 결과를 담아두고,
그 결과의 완료 시점과 후속 흐름을
외부에서 선언적으로 제어할 수 있는 비동기 도구”다.

'언어 > Java' 카테고리의 다른 글

VO, 언제 쓰고 언제 쓰지 말아야 할까  (0) 2026.02.08
TDD 알아보기  (0) 2026.02.06
애플리케이션 로그 분석 가이드  (0) 2025.12.28
Record 살펴보기  (0) 2025.09.14
Optional 바르게 사용하기  (0) 2025.08.26
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
TAG
more
«   2026/05   »
1 2
3 4 5 6 7 8 9
10 11 12 13 14 15 16
17 18 19 20 21 22 23
24 25 26 27 28 29 30
31
글 보관함