티스토리 뷰
1. CompletableFuture란 무엇인가?
CompletableFuture는 비동기 작업을 처리하던 Future의 한계를 극복하고자 Java 8부터 제공된 클래스입니다.
주요 특징
- 비동기 작업의 결과를 담는 컨테이너
- 작업의 완료 시점을 제어 가능
- 후속 작업을 체이닝으로 연결 가능
- 예외 처리를 비동기적으로 수행 가능
2. Future와의 차이점
Future의 한계
1. 결과를 얻으려면 무조건 블로킹 대기
ExecutorService executor = Executors.newSingleThreadExecutor();
// ❌ 문제 1: 결과를 얻으려면 무조건 블로킹 대기
Future<String> future = executor.submit(() -> {
Thread.sleep(1000);
return "USER";
});
String result = future.get(); // 블로킹 - 1초 동안 대기
System.out.println(result);
2. 여러 Future를 조합할 수 없음
Future<User> userFuture = getUserAsync();
Future<Order> orderFuture = getOrderAsync();
// 두 결과를 합치려면? -> 각각 get()으로 기다려야 함
User user = userFuture.get();
Order order = orderFuture.get();
3. 콜백을 등록할 수 없음
Future<String> future = executor.submit(() -> "결과");
// 작업이 완료되면 자동으로 실행되는 콜백을 등록할 방법이 없음
4. 예외 처리가 어려움
Future<String> future = executor.submit(() -> {
throw new RuntimeException("에러 발생");
});
try {
future.get(); // 여기서만 예외를 잡을 수 있음
} catch (ExecutionException e) {
// 예외 처리
}
CompletableFuture의 개선점
1. 비동기 후속 처리 가능
CompletableFuture<String> future =
CompletableFuture.supplyAsync(() -> {
sleep(1000);
return "USER";
})
.thenApply(user -> user + "_DETAIL")
.thenAccept(System.out::println);
System.out.println("비동기"); // 즉시 출력
2. 여러 Future 조합 가능
CompletableFuture<User> userFuture = getUserAsync();
CompletableFuture<Order> orderFuture = getOrderAsync();
userFuture.thenCombine(orderFuture, (user, order) ->
new UserOrderDetail(user, order)
);
3. 예외 처리 간편
CompletableFuture.supplyAsync(() -> {
throw new RuntimeException("에러");
})
.exceptionally(ex -> {
System.out.println("에러 처리: " + ex.getMessage());
return "기본값";
})
.thenAccept(System.out::println);
3. 왜 "Completable" Future인가?
완료시킬 수 있는 Future라고 부르는 이유는 비동기 작업을 수행하는 스레드 외부에서 완료 시점을 제어할 수 있기 때문이다.
Future (제어 불가)
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<String> future = executor.submit(() -> {
// 이 안 = "내부" (작업 스레드)
Thread.sleep(5000);
return "결과"; // 여기서만 완료 가능
});
// 이 밖 = "외부" (호출한 스레드)
// ❌ 외부에서 완료시킬 방법이 없음!
CompletableFuture (제어 가능)
// CompletableFuture의 경우
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// 이 안 = "내부" (작업 스레드)
Thread.sleep(5000);
return "결과";
});
// 이 밖 = "외부" (호출한 스레드, 다른 스레드 등)
// ✅ 외부에서 완료시킬 수 있음!
future.complete("외부에서 완료!");
4. 동작 원리
기본 스레드 풀: ForkJoinPool.commonPool()
명시적으로 작업을 수행할 스레드 풀을 지정하지 않으면 기본 스레드 풀을 사용한다.
CompletableFuture<String> future =
CompletableFuture.supplyAsync(() -> {
System.out.println("Thread: " + Thread.currentThread().getName());
// ForkJoinPool.commonPool-worker-1
return "결과";
});
ForkJoinPool 특징:
- CPU 코어 수만큼 스레드 생성 (기본값)
- 전역 공유 풀 (애플리케이션 전체에서 사용)
커스텀 Executor 사용 (실무 권장)
// 커스텀 Executor 생성
Executor customExecutor = Executors.newFixedThreadPool(10);
// supplyAsync에 Executor 전달
CompletableFuture<String> future =
CompletableFuture.supplyAsync(() -> {
System.out.println("Thread: " + Thread.currentThread().getName());
// pool-1-thread-1
return "결과";
}, customExecutor);
왜 커스텀 Executor를 사용하는가?
- ForkJoinPool은 전역 공유이므로 다른 작업과 스레드를 경쟁
- 작업 유형별로 스레드 풀을 분리 가능
- 스레드 수를 작업 특성에 맞게 조절 가능
5. 자주 사용하는 메서드
5.1 비동기 작업 시작
// supplyAsync - 반환값이 있는 비동기 작업
CompletableFuture<String> future =
CompletableFuture.supplyAsync(() -> {
return "결과";
});
// runAsync - 반환값이 없는 비동기 작업
CompletableFuture<Void> future =
CompletableFuture.runAsync(() -> {
System.out.println("실행");
});
// Executor 지정
CompletableFuture<String> future =
CompletableFuture.supplyAsync(() -> "결과", customExecutor);
5.2 결과 변환 (thenApply)
CompletableFuture<String> future =
CompletableFuture.supplyAsync(() -> "hello")
.thenApply(s -> s.toUpperCase()) // "HELLO"
.thenApply(s -> s + " WORLD"); // "HELLO WORLD"
5.3 결과 소비 (thenAccept, thenRun)
// thenAccept - 결과를 받아서 처리 (반환 없음)
CompletableFuture<Void> future =
CompletableFuture.supplyAsync(() -> "결과")
.thenAccept(result -> {
System.out.println(result);
});
// thenRun - 결과를 받지 않고 실행 (반환 없음)
CompletableFuture<Void> future =
CompletableFuture.supplyAsync(() -> "결과")
.thenRun(() -> {
System.out.println("완료됨");
});
5.4 두 CompletableFuture 결합 (thenCombine)
CompletableFuture<User> userFuture = getUserAsync();
CompletableFuture<Order> orderFuture = getOrderAsync();
CompletableFuture<UserOrderDetail> combinedFuture =
userFuture.thenCombine(orderFuture, (user, order) -> {
return new UserOrderDetail(user, order);
});
5.5 여러 CompletableFuture 처리
// allOf - 모든 작업 완료 대기
CompletableFuture<Product> productFuture = getProductAsync(1L);
CompletableFuture<List<Review>> reviewFuture = getReviewsAsync(1L);
CompletableFuture<Stock> stockFuture = getStockAsync(1L);
CompletableFuture<Void> allFuture =
CompletableFuture.allOf(productFuture, reviewFuture, stockFuture);
allFuture.join(); // 모두 완료될 때까지 대기
// 결과 가져오기
Product product = productFuture.join();
List<Review> reviews = reviewFuture.join();
Stock stock = stockFuture.join();
// anyOf - 하나라도 완료되면 반환
CompletableFuture<Object> anyFuture =
CompletableFuture.anyOf(future1, future2, future3);
Object firstResult = anyFuture.join(); // 가장 먼저 완료된 결과
5.6 예외 처리
// exceptionally - 예외 발생 시 기본값 반환
CompletableFuture<String> future =
CompletableFuture.supplyAsync(() -> {
throw new RuntimeException("에러 발생");
})
.exceptionally(ex -> {
System.err.println("에러: " + ex.getMessage());
return "기본값";
});
// handle - 정상/예외 모두 처리
CompletableFuture<String> future =
CompletableFuture.supplyAsync(() -> {
if (Math.random() > 0.5) {
throw new RuntimeException("에러");
}
return "성공";
})
.handle((result, ex) -> {
if (ex != null) {
return "에러 발생: " + ex.getMessage();
}
return result;
});
// whenComplete - 결과를 변경하지 않고 로깅/모니터링
CompletableFuture<String> future =
CompletableFuture.supplyAsync(() -> "결과")
.whenComplete((result, ex) -> {
if (ex != null) {
System.err.println("실패: " + ex.getMessage());
} else {
System.out.println("성공: " + result);
}
// 결과는 그대로 전달됨
});
5.7 결과 가져오기
// join() - Unchecked Exception (RuntimeException)
try {
String result = future.join();
} catch (CompletionException e) {
// 예외 처리
}
// get() - Checked Exception (try-catch 필수)
try {
String result = future.get();
} catch (InterruptedException | ExecutionException e) {
// 예외 처리
}
// get(timeout) - 타임아웃 설정
try {
String result = future.get(3, TimeUnit.SECONDS);
} catch (TimeoutException e) {
System.out.println("3초 내에 완료되지 않음");
}
// getNow - 즉시 결과 반환 (완료 안됐으면 기본값)
String result = future.getNow("기본값");
5.8 타임아웃 처리 (Java 9+)
// orTimeout - 시간 내에 완료 안 되면 TimeoutException
CompletableFuture<String> future =
slowApiCall()
.orTimeout(3, TimeUnit.SECONDS)
.exceptionally(ex -> "타임아웃 - 기본값");
// completeOnTimeout - 시간 내에 완료 안 되면 기본값으로 완료
CompletableFuture<String> future =
slowApiCall()
.completeOnTimeout("기본값", 3, TimeUnit.SECONDS);
6. 실무 사용 사례
6.1 병렬 API 호출
@RestController
@RequiredArgsConstructor
public class ProductController {
private final ProductService productService;
private final ReviewService reviewService;
private final StockService stockService;
@GetMapping("/products/{productId}")
public ProductDetail getProductDetail(@PathVariable Long productId) {
long startTime = System.currentTimeMillis();
// 3개 API를 병렬로 호출
CompletableFuture<Product> productFuture =
CompletableFuture.supplyAsync(() ->
productService.getProductInfo(productId));
CompletableFuture<List<Review>> reviewFuture =
CompletableFuture.supplyAsync(() ->
reviewService.getReviews(productId));
CompletableFuture<Stock> stockFuture =
CompletableFuture.supplyAsync(() ->
stockService.getStock(productId));
// 모두 완료될 때까지 대기
CompletableFuture.allOf(productFuture, reviewFuture, stockFuture).join();
// 결과 조합
Product product = productFuture.join();
List<Review> reviews = reviewFuture.join();
Stock stock = stockFuture.join();
long responseTime = System.currentTimeMillis() - startTime;
return new ProductDetail(product, reviews, stock, responseTime);
// 동기: ~3초, 비동기: ~1초
}
}
6.2 순차적 의존 작업
public class OrderService {
public CompletableFuture<OrderResult> processOrder(OrderRequest request) {
return CompletableFuture.supplyAsync(() -> {
// 1. 재고 확인
return checkStock(request.getProductId(), request.getQuantity());
})
.thenCompose(stockAvailable -> {
if (!stockAvailable) {
return CompletableFuture.completedFuture(
OrderResult.failed("재고 부족")
);
}
// 2. 결제 처리
return processPayment(request.getPaymentInfo());
})
.thenCompose(paymentResult -> {
if (!paymentResult.isSuccess()) {
return CompletableFuture.completedFuture(
OrderResult.failed("결제 실패")
);
}
// 3. 주문 생성
return createOrder(request);
})
.thenApply(order -> {
// 4. 알림 발송 (비동기)
sendNotificationAsync(order);
return OrderResult.success(order);
})
.exceptionally(ex -> {
return OrderResult.failed("주문 처리 중 오류: " + ex.getMessage());
});
}
}
6.3 @Async와 함께 사용
@Service
public class NotificationService {
@Async("asyncExecutor")
public CompletableFuture<Boolean> sendEmail(String to, String message) {
try {
// 이메일 발송 (2초 소요)
emailClient.send(to, message);
return CompletableFuture.completedFuture(true);
} catch (Exception e) {
return CompletableFuture.completedFuture(false);
}
}
@Async("asyncExecutor")
public CompletableFuture<Boolean> sendSms(String phone, String message) {
try {
// SMS 발송 (2초 소요)
smsClient.send(phone, message);
return CompletableFuture.completedFuture(true);
} catch (Exception e) {
return CompletableFuture.completedFuture(false);
}
}
}
@RestController
@RequiredArgsConstructor
public class NotificationController {
private final NotificationService notificationService;
@PostMapping("/notifications/send")
public NotificationResult sendAll(@RequestBody NotificationRequest request) {
CompletableFuture<Boolean> emailFuture =
notificationService.sendEmail(request.getEmail(), request.getMessage());
CompletableFuture<Boolean> smsFuture =
notificationService.sendSms(request.getPhone(), request.getMessage());
CompletableFuture.allOf(emailFuture, smsFuture).join();
return new NotificationResult(
emailFuture.join(),
smsFuture.join()
);
}
}
7. 실무 사용 빈도
CompletableFuture는
- 외부 API 호출
- 병렬 처리
- 비동기 후처리
- 블로킹 코드의 비동기 래핑
등의 상황에서 자주 사용되며, 특히 Spring MVC 환경에서 비동기 응답 처리를 위해 Controller 또는 Service 계층에서 많이 활용된다.
다만, WebFlux / Reactor 기반 환경에서는 CompletableFuture 대신 Mono / Flux를 사용하는 것이 일반적이다.
8. 주의사항
8.1 예외 처리 필수
// ❌ 나쁜 예 - 예외 처리 없음
CompletableFuture<String> future =
CompletableFuture.supplyAsync(() -> {
throw new RuntimeException("에러");
});
String result = future.join(); // CompletionException 발생!
// ✅ 좋은 예 - 예외 처리
CompletableFuture<String> future =
CompletableFuture.supplyAsync(() -> {
throw new RuntimeException("에러");
})
.exceptionally(ex -> {
log.error("에러 발생", ex);
return "기본값";
});
- 에러 로그가 제대로 남지 않음
- 복구 기회가 없음
- 사용자에게 적절한 에러 메시지를 보여줄 수 없음
8.2 스레드 풀 관리
// 시뮬레이션
public class BadThreadPoolExample {
public static void main(String[] args) {
System.out.println("CPU 코어 수: " + Runtime.getRuntime().availableProcessors());
// 예: CPU 코어 수: 8
System.out.println("ForkJoinPool 크기: " +
ForkJoinPool.commonPool().getParallelism());
// 예: ForkJoinPool 크기: 7 (코어 수 - 1)
// 1000개의 작업 생성
for (int i = 0; i < 1000; i++) {
final int taskNum = i;
CompletableFuture.supplyAsync(() -> {
System.out.println("Task " + taskNum + " - Thread: " +
Thread.currentThread().getName());
heavyTask(); // 5초 걸리는 무거운 작업
return "완료";
});
}
System.out.println("모든 작업 제출 완료");
}
static void heavyTask() {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {}
}
}
문제점:
1. 스레드 풀 크기: 7개
2. 제출된 작업: 1000개
3. 동시 실행: 7개만
4. 나머지 993개: 대기 큐에서 대기
타임라인:
0초: 7개 작업 실행 중
5초: 7개 완료, 다음 7개 시작
10초: 7개 완료, 다음 7개 시작
...
715초: 마지막 7개 완료 (약 12분!)
커스텀 스레드 풀 사용
public class GoodThreadPoolExample {
public static void main(String[] args) {
// 커스텀 스레드 풀 생성
ExecutorService executor = Executors.newFixedThreadPool(10);
System.out.println("커스텀 스레드 풀 크기: 10");
List<CompletableFuture<String>> futures = new ArrayList<>();
for (int i = 0; i < 1000; i++) {
final int taskNum = i;
CompletableFuture<String> future =
CompletableFuture.supplyAsync(() -> {
System.out.println("Task " + taskNum + " - Thread: " +
Thread.currentThread().getName());
heavyTask(); // 5초
return "완료";
}, executor); // 커스텀 Executor 사용!
futures.add(future);
}
// 모든 작업 완료 대기
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
executor.shutdown();
System.out.println("완료!");
}
}
성능 비교:
ForkJoinPool (7개):
- 동시 실행: 7개
- 총 소요 시간: 약 715초 (12분)
커스텀 풀 (10개):
- 동시 실행: 10개
- 총 소요 시간: 약 500초 (8분)
- 약 30% 빠름!
8.3 메모리 누수 주의
// ❌ 나쁜 예 - Future를 계속 생성만 하고 결과를 가져가지 않음
List<CompletableFuture<String>> futures = new ArrayList<>();
for (int i = 0; i < 10000; i++) {
futures.add(CompletableFuture.supplyAsync(() -> "데이터"));
}
// join()을 호출하지 않으면 완료된 Future가 메모리에 계속 쌓임
무슨 문제가 발생하나?
// 메모리 사용량 시뮬레이션
public class MemoryLeakExample {
public static void main(String[] args) throws Exception {
printMemory("시작");
List<CompletableFuture<String>> futures = new ArrayList<>();
for (int i = 0; i < 10000; i++) {
CompletableFuture<String> future =
CompletableFuture.supplyAsync(() -> {
// 1MB 크기의 데이터 생성
return generateLargeData();
});
futures.add(future);
if (i % 1000 == 0) {
printMemory("작업 " + i + "개 생성 후");
}
}
printMemory("모든 작업 생성 완료");
// 이 시점에 10,000개의 Future가 모두 메모리에!
// 각 Future에 1MB 데이터 = 총 10GB 메모리 사용!
Thread.sleep(10000); // 10초 대기
printMemory("10초 대기 후");
// GC가 회수할 수 없음 (futures 리스트가 참조 보유)
}
static String generateLargeData() {
// 1MB 크기의 데이터
byte[] data = new byte[1024 * 1024];
Arrays.fill(data, (byte) 'A');
return new String(data);
}
static void printMemory(String point) {
Runtime runtime = Runtime.getRuntime();
long usedMemory = runtime.totalMemory() - runtime.freeMemory();
long usedMB = usedMemory / (1024 * 1024);
System.out.println(point + " - 사용 메모리: " + usedMB + "MB");
}
}
출력 예:
시작 - 사용 메모리: 10MB
작업 1000개 생성 후 - 사용 메모리: 1010MB
작업 2000개 생성 후 - 사용 메모리: 2010MB
작업 3000개 생성 후 - 사용 메모리: 3010MB
...
모든 작업 생성 완료 - 사용 메모리: 10010MB
10초 대기 후 - 사용 메모리: 10010MB ← GC 안 됨!
왜 메모리가 누적되는가?
// 메모리 구조
futures 리스트
├─ CompletableFuture[0] → 결과: "1MB 데이터" ✅ 완료됨, ❌ GC 안 됨
├─ CompletableFuture[1] → 결과: "1MB 데이터" ✅ 완료됨, ❌ GC 안 됨
├─ CompletableFuture[2] → 결과: "1MB 데이터" ✅ 완료됨, ❌ GC 안 됨
...
└─ CompletableFuture[9999] → 결과: "1MB 데이터" ✅ 완료됨, ❌ GC 안 됨
문제:
1. 모든 Future가 완료됨
2. 하지만 futures 리스트가 계속 참조 보유
3. GC가 회수할 수 없음
4. 메모리 계속 증가
개선
// ✅ 좋은 예 - 배치로 처리
List<CompletableFuture<String>> batch = new ArrayList<>();
for (int i = 0; i < 10000; i++) {
batch.add(CompletableFuture.supplyAsync(() -> "데이터"));
if (batch.size() >= 100) {
CompletableFuture.allOf(batch.toArray(new CompletableFuture[0])).join();
batch.clear();
}
}
개선 효과
public class GoodMemoryManagement {
public static void main(String[] args) {
printMemory("시작");
List<CompletableFuture<String>> batch = new ArrayList<>();
for (int i = 0; i < 10000; i++) {
CompletableFuture<String> future =
CompletableFuture.supplyAsync(() -> {
return generateLargeData(); // 1MB
});
batch.add(future);
// 배치 크기 도달 시
if (batch.size() >= 100) {
// 현재 배치 처리
CompletableFuture.allOf(
batch.toArray(new CompletableFuture[0])
).join();
// 결과 처리
for (CompletableFuture<String> f : batch) {
String result = f.join();
processResult(result); // 결과 사용
}
// 배치 클리어 - 중요!
batch.clear();
if (i % 1000 == 0) {
printMemory("작업 " + i + "개 처리 후");
System.gc(); // GC 힌트
Thread.sleep(100);
}
}
}
printMemory("모든 작업 완료");
}
static void processResult(String result) {
// 결과를 DB에 저장하거나 다른 처리
// result는 이후 필요 없음 → GC 대상
}
}
출력 예:
시작 - 사용 메모리: 10MB
작업 1000개 처리 후 - 사용 메모리: 110MB ← 100MB만 유지
작업 2000개 처리 후 - 사용 메모리: 110MB ← 안정적!
작업 3000개 처리 후 - 사용 메모리: 110MB
...
모든 작업 완료 - 사용 메모리: 110MB
개선 원리
// 배치 1 (100개)
for (0 ~ 99) {
batch에 추가
}
→ allOf().join() // 완료 대기
→ 결과 처리
→ batch.clear() // 참조 제거! ← GC 가능해짐
// 배치 2 (100개)
for (100 ~ 199) {
batch에 추가
}
→ allOf().join()
→ 결과 처리
→ batch.clear() // 참조 제거! ← GC 가능해짐
// 최대 메모리: 100개 * 1MB = 100MB만 유지
9. 한 문장 요약
CompletableFuture는
“미래의 결과를 담아두고,
그 결과의 완료 시점과 후속 흐름을
외부에서 선언적으로 제어할 수 있는 비동기 도구”다.
'언어 > Java' 카테고리의 다른 글
| VO, 언제 쓰고 언제 쓰지 말아야 할까 (0) | 2026.02.08 |
|---|---|
| TDD 알아보기 (0) | 2026.02.06 |
| 애플리케이션 로그 분석 가이드 (0) | 2025.12.28 |
| Record 살펴보기 (0) | 2025.09.14 |
| Optional 바르게 사용하기 (0) | 2025.08.26 |
