Concurrent Utilities

Composing CompletableFutures

15 min Lesson 6 of 13

In the previous lesson you saw the basics of CompletableFuture: how to start an async computation with supplyAsync and transform its result with thenApply. Real programs, however, rarely have a single isolated async step. You almost always need to chain one async call after another, combine results from two independent calls, or wait for a group of calls to finish before moving on. That is what composition is about — and doing it correctly is what separates async code that is clean and correct from async code that silently deadlocks or swallows exceptions.

thenCompose — Flat-Mapping Async Steps

thenCompose is the async equivalent of flatMap on a stream: it takes the result of one CompletableFuture, passes it to a function that returns another CompletableFuture, and flattens the two stages into one.

Why not just use thenApply? If your mapping function itself returns a CompletableFuture, then thenApply gives you a CompletableFuture<CompletableFuture<T>> — a nested future that is useless until you unwrap it manually. thenCompose does that unwrapping for you.

    import java.util.concurrent.CompletableFuture;

    // Simulate fetching a user ID from a remote service
    CompletableFuture<String> fetchUserId() {
        return CompletableFuture.supplyAsync(() -> {
            // e.g. HTTP call
            return "user-42";
        });
    }

    // Simulate fetching a user profile given an ID
    CompletableFuture<String> fetchProfile(String userId) {
        return CompletableFuture.supplyAsync(() -> {
            return "Profile of " + userId;
        });
    }

    // WRONG: gives CompletableFuture<CompletableFuture<String>>
    // var nested = fetchUserId().thenApply(id -> fetchProfile(id));

    // CORRECT: flat pipeline
    CompletableFuture<String> profile = fetchUserId()
        .thenCompose(id -> fetchProfile(id));

    System.out.println(profile.join()); // Profile of user-42

Execution context matters. When you call thenCompose (without the Async suffix), the function runs on the thread that completes the previous stage, often a thread from the common fork-join pool, or on the calling thread if that stage has already completed by the time you attach the continuation. Use thenComposeAsync with an explicit Executor when you want to control which thread pool handles the next step, for example to keep blocking I/O off the common pool.
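As a minimal sketch of that advice, the example below routes both the composing function and the inner call to a dedicated pool. The pool size, the "io-worker" thread name, and the loadProfileBlocking helper are assumptions made for illustration; they are not part of the lesson's earlier code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ComposeOnExecutorDemo {

    // Hypothetical blocking lookup; a real one would call a database or an HTTP API.
    static String loadProfileBlocking(String userId) {
        return "Profile of " + userId + " (loaded on " + Thread.currentThread().getName() + ")";
    }

    static String run() {
        // Dedicated pool for blocking I/O; size and thread name are arbitrary choices.
        ExecutorService ioPool = Executors.newFixedThreadPool(2, r -> {
            Thread t = new Thread(r, "io-worker");
            t.setDaemon(true);
            return t;
        });
        try {
            return CompletableFuture
                .supplyAsync(() -> "user-42")
                // The composing function is dispatched to ioPool, and the inner
                // supplyAsync is told explicitly to use ioPool as well.
                .thenComposeAsync(
                    id -> CompletableFuture.supplyAsync(() -> loadProfileBlocking(id), ioPool),
                    ioPool)
                .join();
        } finally {
            ioPool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Note that passing the executor to thenComposeAsync only controls where the lambda runs. The future returned by the lambda runs wherever it was started, which is why the inner supplyAsync receives ioPool too.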

thenCombine — Merging Two Independent Futures

When two async operations do not depend on each other you should run them in parallel and combine their results only after both finish. thenCombine does exactly this: it takes a second CompletableFuture and a BiFunction, waits for both to complete, then merges the two results.

    import java.util.concurrent.CompletableFuture;

    CompletableFuture<Double> fetchUsdRate = CompletableFuture.supplyAsync(() -> {
        // simulate remote FX call
        return 3.75; // 1 USD = 3.75 SAR (example)
    });

    CompletableFuture<Integer> fetchQuantity = CompletableFuture.supplyAsync(() -> {
        // simulate DB query
        return 120; // units in stock
    });

    // Both fetches run concurrently; thenCombine waits for both
    CompletableFuture<String> summary = fetchUsdRate
        .thenCombine(fetchQuantity, (rate, qty) -> "Total SAR value: " + (rate * qty));

    System.out.println(summary.join()); // Total SAR value: 450.0

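Do not chain sequentially what can run in parallel. If you started the second call inside a thenCompose lambda, for example fetchUsdRate.thenCompose(rate -> CompletableFuture.supplyAsync(...)), the quantity lookup would not begin until the rate had arrived: you would serialize two independent calls and waste wall-clock time. Use thenCombine (or allOf) so both start immediately and you pay only the cost of the slower one.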
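To make the cost visible, here is a small timing sketch. The slowCall helper and its sleep-based delay are stand-ins invented for this illustration; real latencies will vary, so treat the measured numbers as indicative only.

```java
import java.util.concurrent.CompletableFuture;

class SerialVsParallelDemo {

    // Stand-in for a slow remote call: sleeps, then returns the given value.
    static <T> CompletableFuture<T> slowCall(T value, long millis) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(millis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
            return value;
        });
    }

    // The second call is started inside the lambda, so it begins only after
    // the first one finishes: roughly two delays in total.
    static long serializedMillis(long delay) {
        long start = System.nanoTime();
        slowCall(3.75, delay)
            .thenCompose(rate -> slowCall(120, delay).thenApply(qty -> rate * qty))
            .join();
        return (System.nanoTime() - start) / 1_000_000;
    }

    // Both calls are started up front, so the total is roughly one delay.
    static long parallelMillis(long delay) {
        long start = System.nanoTime();
        CompletableFuture<Double> rate = slowCall(3.75, delay);
        CompletableFuture<Integer> qty = slowCall(120, delay);
        rate.thenCombine(qty, (r, q) -> r * q).join();
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) {
        System.out.println("serialized: " + serializedMillis(200) + " ms");
        System.out.println("parallel:   " + parallelMillis(200) + " ms");
    }
}
```

The only structural difference between the two methods is where the second future is created, and that alone decides whether the calls overlap.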

allOf — Waiting for Many Futures

CompletableFuture.allOf(futures...) returns a CompletableFuture<Void> that completes when every supplied future has completed. Because the result type is Void, you typically retrieve the individual results yourself after allOf finishes.

    import java.util.List;
    import java.util.concurrent.CompletableFuture;
    import java.util.stream.Collectors;

    List<String> urls = List.of(
        "https://api.example.com/a",
        "https://api.example.com/b",
        "https://api.example.com/c"
    );

    // Start all fetches concurrently
    // (fetch(url) is assumed to be a blocking helper defined elsewhere that returns a String)
    List<CompletableFuture<String>> fetches = urls.stream()
        .map(url -> CompletableFuture.supplyAsync(() -> fetch(url)))
        .collect(Collectors.toList());

    // Wait for all of them
    CompletableFuture<Void> allDone =
        CompletableFuture.allOf(fetches.toArray(new CompletableFuture[0]));

    // After allOf completes, collect results (join() does not block here: every future is already done)
    List<String> results = allDone.thenApply(__ ->
        fetches.stream()
            .map(CompletableFuture::join)
            .collect(Collectors.toList())
    ).join();

    results.forEach(System.out::println);

allOf does not tell you which future failed. If any of the futures fails, the future returned by allOf also completes exceptionally, with a CompletionException whose cause is one of the underlying exceptions, but it carries no per-task detail and reports only a single failure even when several tasks failed. Call join() on each individual future, or attach a handler to each one, if you need per-task error reporting.
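One way to get per-task reporting is sketched below: wait on allOf with the combined failure neutralized, then inspect every future on its own. The task helper and its "bad" naming rule are hypothetical, chosen only so that one task fails predictably.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class AllOfFailureDemo {

    // Hypothetical task: fails for any name that starts with "bad".
    static CompletableFuture<String> task(String name) {
        return CompletableFuture.supplyAsync(() -> {
            if (name.startsWith("bad")) {
                throw new IllegalStateException(name + " failed");
            }
            return name + " ok";
        });
    }

    static List<String> report(List<String> names) {
        List<CompletableFuture<String>> futures = new ArrayList<>();
        for (String name : names) {
            futures.add(task(name));
        }

        // handle() converts the combined outcome into a normal completion,
        // so this join() waits for every task without throwing.
        CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
            .handle((ignored, ex) -> null)
            .join();

        // Every future is done now; inspect each one for its own outcome.
        List<String> lines = new ArrayList<>();
        for (CompletableFuture<String> f : futures) {
            try {
                lines.add(f.join());
            } catch (CompletionException e) {
                lines.add("FAILED: " + e.getCause().getMessage());
            }
        }
        return lines;
    }

    public static void main(String[] args) {
        report(List.of("a", "bad-b", "c")).forEach(System.out::println);
    }
}
```

Because the report is built from the original list of futures, the output order matches the input order regardless of which task finished first.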

Exception Handling in Composed Pipelines

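Exceptions travel through a CompletableFuture chain as a failed stage. Dependent stages that only process normal results (thenApply, thenCompose, thenCombine, and so on) do not run their functions and simply pass the failure along, and the exception surfaces at the terminal call: join() throws an unchecked CompletionException, while get() throws a checked ExecutionException, each carrying the original exception as its cause. You have three hooks for dealing with a failure; the first two can recover from it, and the third only observes it: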

  • exceptionally(fn) — runs only when the stage failed; replaces the exception with a default value.
  • handle(BiFunction) — runs whether the stage succeeded or failed; lets you inspect both the result and the exception in one place.
  • whenComplete(BiConsumer) — like handle but does not transform the result; useful for logging or cleanup.

    CompletableFuture<String> riskyFetch = CompletableFuture
        .supplyAsync(() -> {
            if (Math.random() < 0.5) throw new RuntimeException("Network timeout");
            return "fresh data";
        });

    // Strategy 1: recover with a fallback
    String result1 = riskyFetch
        .exceptionally(ex -> "cached fallback")
        .join();

    // Strategy 2: inspect both sides
    String result2 = riskyFetch
        .handle((data, ex) -> {
            if (ex != null) {
                System.err.println("Failed: " + ex.getMessage());
                return "default";
            }
            return data;
        })
        .join();

    System.out.println(result1);
    System.out.println(result2);

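The third hook, whenComplete, can be sketched in the same spirit. The in-memory LOG list below is a stand-in for a real logger, and the fail flag replaces the random failure so that the behavior is repeatable; both are assumptions made for this illustration.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

class WhenCompleteDemo {

    // Stand-in for a real logger so the side effect can be inspected.
    static final List<String> LOG = new CopyOnWriteArrayList<>();

    static CompletableFuture<String> observedFetch(boolean fail) {
        return CompletableFuture
            .supplyAsync(() -> {
                if (fail) throw new IllegalStateException("Network timeout");
                return "fresh data";
            })
            // Observe only: the result or the exception passes through unchanged.
            .whenComplete((data, ex) -> {
                if (ex != null) {
                    // ex is usually a CompletionException wrapping the original exception.
                    Throwable cause = ex.getCause() != null ? ex.getCause() : ex;
                    LOG.add("error: " + cause.getMessage());
                } else {
                    LOG.add("ok: " + data);
                }
            });
    }

    public static void main(String[] args) {
        System.out.println(observedFetch(false).join());
        // The failure survives whenComplete, so recovery needs its own stage.
        System.out.println(observedFetch(true).exceptionally(ex -> "cached fallback").join());
        System.out.println(LOG);
    }
}
```

Since whenComplete leaves the failure in place, calling join() directly on observedFetch(true) would still throw; the exceptionally stage is what turns it into a value.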
Compose + exception handling together. When you have a thenCompose chain and one step can fail, attach exceptionally or handle at the point where you can meaningfully recover — typically right after the stage that might fail, not just at the very end. A handler at the end catches everything but gives you less context about which step failed.
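A sketch of recovering close to the failing stage might look like the following. The boolean flags that force a failure and the fallback strings are hypothetical; they exist only to show that each handler knows which step it is covering.

```java
import java.util.concurrent.CompletableFuture;

class LocalRecoveryDemo {

    // Hypothetical steps; the flags simulate a failing remote service.
    static CompletableFuture<String> fetchUserId(boolean fail) {
        return CompletableFuture.supplyAsync(() -> {
            if (fail) throw new IllegalStateException("id service down");
            return "user-42";
        });
    }

    static CompletableFuture<String> fetchProfile(String userId, boolean fail) {
        return CompletableFuture.supplyAsync(() -> {
            if (fail) throw new IllegalStateException("profile service down");
            return "Profile of " + userId;
        });
    }

    static String load(boolean idFails, boolean profileFails) {
        return fetchUserId(idFails)
            // Recover right after step 1: only the ID lookup can have failed here.
            .exceptionally(ex -> "anonymous")
            .thenCompose(id -> fetchProfile(id, profileFails)
                // Recover right after step 2: the ID is still in scope for the fallback.
                .exceptionally(ex -> "Cached profile of " + id))
            .join();
    }

    public static void main(String[] args) {
        System.out.println(load(false, false));
        System.out.println(load(true, false));
        System.out.println(load(false, true));
    }
}
```

Placing the second handler inside the thenCompose lambda is a deliberate choice: a handler attached after thenCompose would also recover, but it would no longer have access to id.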

Putting It All Together — A Realistic Pipeline

The following example combines thenCompose, thenCombine, and exceptionally in a single pipeline that (a) fetches a product ID asynchronously, (b) concurrently loads the product details and the live price, and (c) falls back gracefully on any failure:

    import java.util.concurrent.CompletableFuture;

    // resolveProductId, fetchDetails, and fetchPrice are assumed to be blocking helpers defined elsewhere
    CompletableFuture<String> pipeline = CompletableFuture
        // Step 1: resolve a product slug to a numeric ID
        .supplyAsync(() -> resolveProductId("java-book"))
        // Step 2: fetch details (sequential dependency on step 1)
        .thenCompose(id ->
            CompletableFuture.supplyAsync(() -> fetchDetails(id))
                // Combine with a parallel price lookup
                .thenCombine(
                    CompletableFuture.supplyAsync(() -> fetchPrice(id)),
                    (details, price) -> details + " | price: $" + price
                )
        )
        // Step 3: recover if anything above failed
        .exceptionally(ex -> "Product unavailable: " + ex.getMessage());

    System.out.println(pipeline.join());
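For experimenting with this pipeline, the sketch below supplies stub versions of the three helpers. Their bodies, the ID 7, and the price 39.5 are invented for illustration. It also adds a small unwrap helper, because the exception handed to a handler at the end of a chain is typically a CompletionException whose own message includes the class name of the cause.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class PipelineDemo {

    // Stub helpers standing in for real remote calls (hypothetical values).
    static int resolveProductId(String slug) {
        if (!slug.equals("java-book")) {
            throw new IllegalArgumentException("unknown slug: " + slug);
        }
        return 7;
    }

    static String fetchDetails(int id) {
        return "Product #" + id;
    }

    static double fetchPrice(int id) {
        return 39.5;
    }

    // Strip the CompletionException wrapper to reach the original exception.
    static Throwable unwrap(Throwable ex) {
        return (ex instanceof CompletionException && ex.getCause() != null) ? ex.getCause() : ex;
    }

    static String describe(String slug) {
        return CompletableFuture
            .supplyAsync(() -> resolveProductId(slug))
            // Both lookups are started inside the lambda, so they run concurrently.
            .thenCompose(id -> CompletableFuture.supplyAsync(() -> fetchDetails(id))
                .thenCombine(
                    CompletableFuture.supplyAsync(() -> fetchPrice(id)),
                    (details, price) -> details + " | price: $" + price))
            .exceptionally(ex -> "Product unavailable: " + unwrap(ex).getMessage())
            .join();
    }

    public static void main(String[] args) {
        System.out.println(describe("java-book"));
        System.out.println(describe("no-such-book"));
    }
}
```

Without the unwrap step, the fallback text for the failing slug would start with the fully qualified exception class name, which is rarely what you want to show a user.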

Key Takeaways

  • Use thenCompose when the next step is itself async (avoid double-wrapping).
  • Use thenCombine to merge two independent futures without serializing them.
  • Use allOf to fan out over a dynamic collection and collect all results afterward.
  • Attach exceptionally or handle close to the stage that can fail for precise recovery.
  • Pick the Async variant with an explicit Executor whenever blocking I/O must stay off the common pool.