Converting a Java application from synchronous to asynchronous using CompletableFuture

Synchronous and asynchronous applications

In a synchronous application, each line of code depends on the complete execution of the lines above it. In a small program with in-memory data structures and low-resource operations, the effects of this are negligible; run time can feel almost instantaneous. Once external dependencies or compute-intensive operations are introduced, the behavior changes, because many lines of code may now be waiting for the work above them to complete. This is where asynchronous programming can help improve an application’s performance.

Asynchronous calls avoid blocking and allow the program to continue executing. Although the result of a given asynchronous API call may not be available yet, it can be acted upon later, once the call has completed. This will be demonstrated in an example below. The main advantage of this paradigm is that it can increase the performance of the application and provide a better user experience.

Creating asynchronous Java applications with CompletableFuture

In Java, you can use the CompletableFuture API, which was introduced in Java 8, to help create an asynchronous application. This class is extensive, so this post will only focus on a small portion of it. The advantage of this API over the Future API is that you are not required to block while waiting for a response before acting on it; you can register follow-up steps that run once the result is available. It also allows for exception handling, which you can use to control the behavior of the application should something unexpected occur. The API may be intimidating at first, but introduce it slowly into your own code and you’ll quickly become more comfortable with it.
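To make the difference from Future concrete, here is a minimal sketch. The class name FutureComparison and the slowLookup method are hypothetical stand-ins for a slow remote call and are not part of the Lou’s Plumbing code.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class FutureComparison {

    // Hypothetical stand-in for a slow remote call.
    static int slowLookup() {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return 42;
    }

    // Plain Future: the caller has to block on get() before it can use the value.
    static int withFuture() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Callable<Integer> task = FutureComparison::slowLookup;
            Future<Integer> future = executor.submit(task);
            return future.get() + 1; // the calling thread waits here
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    // CompletableFuture: the follow-up step is registered up front
    // and runs once the value is available.
    static CompletableFuture<Integer> withCompletableFuture() {
        return CompletableFuture.supplyAsync(FutureComparison::slowLookup)
                .thenApply(value -> value + 1);
    }

    public static void main(String[] args) {
        System.out.println(withFuture());                   // prints 43
        System.out.println(withCompletableFuture().join()); // prints 43
    }
}
```

Both versions produce the same value. The difference is that the second one returns immediately with a future that already knows what to do with the result.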

Lou’s Plumbing CompletableFuture example

To demonstrate this API, it will be used to refactor a portion of a codebase from a fictional company called Lou’s Plumbing. Lou’s uses an e-commerce application that synchronously queries multiple data sources to complete purchases. It requests inventory counts from the warehouse (WarehouseDao), prices from the catalog (CatalogDao), and finally creates the purchase order (PurchaseDao). The code can be found on GitHub.

Existing application structure

When a purchase request is made by a user, the WarehouseDao is used to check if there is enough product to complete the order. If not, the process ends. With enough product in the warehouse, the catalog is queried via the CatalogDao in order to obtain the price of the item. When the price is returned, it is used, along with the desired quantity, to create a purchase order using the PurchaseDao. To do this, the application uses an interface called CreateOrder and an implementing class CreateOrderImpl. There are two methods in the interface: createOrder creates a purchase order synchronously and createOrderAsync asynchronously creates the purchase order. The Main class and its main method use a CreateOrder object in order to create a purchase order for this demonstration.

Purchase order creation

// CreateOrderImpl
@Override
public int createOrder() {
    LOGGER.info("Beginning to create order synchronously");

    LOGGER.info("Beginning to query warehouse");
    int quantityAvailable = warehouseDao.getQuantity(PRODUCT_ID);
    LOGGER.info("Finished querying warehouse.");

    if (quantityAvailable < QUANTITY_DESIRED) {
        LOGGER.error("There isn't enough quantity available");
        throw new QuantityUnavailableException();
    }

    LOGGER.info("Beginning to query catalog for price.");
    int itemPrice = catalogDao.getPrice(PRODUCT_ID);
    LOGGER.info("Finished querying catalog.");

    // Charge for the quantity ordered rather than the full warehouse stock
    int totalPrice = itemPrice * QUANTITY_DESIRED;

    LOGGER.info("Adding purchase order.");
    return purchaseDao.addPurchase(CUSTOMER_ID, totalPrice);
}

// Main
public static void main(String[] args) {
    CreateOrder createOrder = Guice.createInjector(new StoreModule()).getInstance(CreateOrder.class);
    createOrder.createOrder();
}

The drawback to this design is that the application blocks in createOrder until each request is complete. The catalog and warehouse requests each take at least 5 seconds so a complete purchase takes at least 10 seconds.

Below are the application logs when the requests are made synchronously. The important points to note here are that the total running time is 10 s (16:02:07 - 16:02:17), that all operations are performed in the main thread, and that all requests are performed sequentially.

16:02:07.807 [main] INFO  com.influentialcode.CreateOrder - Beginning to create order synchronously
16:02:07.810 [main] INFO  com.influentialcode.CreateOrder - Beginning to query warehouse
16:02:07.811 [main] INFO  c.i.persistence.WarehouseDaoImpl - Querying the quantity of item 105.
16:02:12.811 [main] INFO  c.i.persistence.WarehouseDaoImpl - Retrieved quantity for item 105.
16:02:12.812 [main] INFO  c.i.persistence.WarehouseDaoImpl - Took 5 seconds to query for item 105.
16:02:12.812 [main] INFO  com.influentialcode.CreateOrder - Finished querying warehouse.
16:02:12.812 [main] INFO  com.influentialcode.CreateOrder - Beginning to query catalog for price.
16:02:12.812 [main] INFO  c.i.persistence.CatalogDaoImpl - Querying for price of item 105.
16:02:17.814 [main] INFO  c.i.persistence.CatalogDaoImpl - Retrieved price for item 105.
16:02:17.814 [main] INFO  c.i.persistence.CatalogDaoImpl - Took 5 seconds to get price of item 105.
16:02:17.814 [main] INFO  com.influentialcode.CreateOrder - Finished querying catalog.
16:02:17.814 [main] INFO  com.influentialcode.CreateOrder - Adding purchase order.
16:02:17.814 [main] INFO  c.i.persistence.PurchasesDaoImpl - Creating order for customer 115 of cost 50.
16:02:17.814 [main] INFO  c.i.persistence.PurchasesDaoImpl - Order has been placed.

Asynchronous purchase order creation

// CreateOrderImpl
@Override
public CompletableFuture<Integer> createOrderAsync() throws PurchaseException {
    LOGGER.info("Beginning to create order asynchronously");

    LOGGER.info("Beginning to query warehouse");
    CompletableFuture<Integer> futureQuantity = CompletableFuture.supplyAsync(() ->
            warehouseDao.getQuantity(PRODUCT_ID));

    LOGGER.info("Beginning to query catalog for price");
    CompletableFuture<Integer> futurePrice = CompletableFuture.supplyAsync(() -> catalogDao.getPrice(PRODUCT_ID));

    CompletableFuture<Integer> futureTotal = futureQuantity.thenCombine(futurePrice,
            (quantity, price) -> {
                LOGGER.info("Finished querying warehouse");
                LOGGER.info("Finished querying catalog");
                LOGGER.info("Creating total price.");

                if (quantity < QUANTITY_DESIRED) {
                    LOGGER.error("There isn't enough quantity available");
                    throw new QuantityUnavailableException();
                }

                // Charge for the quantity ordered rather than the full warehouse stock
                return price * QUANTITY_DESIRED;
            });

    return futureTotal.thenApply(total -> purchaseDao.addPurchase(CUSTOMER_ID, total));
}

The createOrderAsync method follows a similar flow, with important changes to how the requests are made. At the start, the warehouse and catalog queries are each wrapped in a supplyAsync call; this is one way to start an asynchronous workflow, but there are others. By default, each supplyAsync call runs on a thread from the common ForkJoinPool, which lets the main thread continue without waiting. The futureQuantity is then combined with futurePrice using the thenCombine method. Its second parameter is a BiFunction that receives the completed results of the warehouse and catalog requests, in that order. In this function, the total price is returned if the necessary quantity is available; otherwise a QuantityUnavailableException is thrown, which completes the resulting future exceptionally. With the total price calculated, the thenApply method is used to create the purchase order. The CompletableFuture<Integer> that thenApply returns is in turn returned by createOrderAsync, where the Integer is the purchase order number.
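The same supplyAsync, thenCombine, and thenApply chain can be tried in isolation with the sketch below. It is not the Lou’s Plumbing code: the class name CombineSketch, the value of QUANTITY_DESIRED, and the use of IllegalStateException are assumptions made to keep the example self-contained. It also assumes the customer is charged for the desired quantity.

```java
import java.util.concurrent.CompletableFuture;

class CombineSketch {

    // Hypothetical order size; the article does not show the real value.
    static final int QUANTITY_DESIRED = 5;

    static CompletableFuture<Integer> totalPrice(int quantityAvailable, int itemPrice) {
        // Both suppliers are submitted right away and run independently of each other.
        CompletableFuture<Integer> futureQuantity =
                CompletableFuture.supplyAsync(() -> quantityAvailable);
        CompletableFuture<Integer> futurePrice =
                CompletableFuture.supplyAsync(() -> itemPrice);

        // The BiFunction runs only after both futures have completed.
        return futureQuantity.thenCombine(futurePrice, (quantity, price) -> {
            if (quantity < QUANTITY_DESIRED) {
                throw new IllegalStateException("not enough stock");
            }
            return price * QUANTITY_DESIRED;
        });
    }

    public static void main(String[] args) {
        String summary = totalPrice(20, 10)
                .thenApply(total -> "Order total: " + total)
                .join();
        System.out.println(summary); // prints "Order total: 50"
    }
}
```

If the available quantity is too low, the exception thrown inside the BiFunction completes the combined future exceptionally, and any later thenApply step is skipped.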

// Main
public static void main(String[] args) {
    CreateOrder createOrder = Guice.createInjector(new StoreModule()).getInstance(CreateOrder.class);
    createOrderUsingAsyncMethod(createOrder);
}

private static void createOrderUsingAsyncMethod(CreateOrder createOrder) {
    CompletableFuture<Integer> orderNumberFuture = createOrder.createOrderAsync();

    while (!orderNumberFuture.isDone()) {
        // just waiting to log the below
    }

    orderNumberFuture.whenComplete((purchaseOrderNumber, throwable) -> {
        if (throwable != null) {
            LOGGER.error("Failed to create purchase order.");
            throw new PurchaseException();
        }

        LOGGER.info("Created purchase order number {}.", purchaseOrderNumber);
    });
}

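In Main, after calling createOrderAsync, the result of the purchase order is evaluated in the whenComplete method. Note that the empty while loop in the code sample exists only so that this demonstration runs to completion before main exits; it spins the CPU while it waits, so real code would normally rely on callbacks or block with join() or get() instead. The whenComplete callback has two parameters: the first is the purchase order number and the second is a Throwable, which is non-null only if the future completed exceptionally. If throwable is not null, the error is logged and an exception is thrown. Otherwise, the new purchase order number is logged. Keep in mind that an exception thrown inside the callback is not propagated to the caller of whenComplete; the outcome is only visible through the future that whenComplete returns, which this example discards.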
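As an alternative to the empty while loop, a caller that has to wait can block with join(). This is only a sketch: the class name WaitWithJoin and the describe method are hypothetical, and the futures are built by hand rather than coming from createOrderAsync.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class WaitWithJoin {

    // Blocks until the future completes, without spinning in a loop.
    static String describe(CompletableFuture<Integer> orderNumberFuture) {
        try {
            int orderNumber = orderNumberFuture.join();
            return "Created purchase order number " + orderNumber + ".";
        } catch (CompletionException e) {
            // join() wraps the original failure; the cause holds the real exception.
            return "Failed to create purchase order: " + e.getCause().getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(describe(CompletableFuture.supplyAsync(() -> 1002)));

        CompletableFuture<Integer> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("warehouse unavailable"));
        System.out.println(describe(failed));
    }
}
```

Unlike a callback registered with whenComplete, the failure surfaces here as an exception in the calling thread, so the caller can decide what to do with it.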

Alternatively, the exceptionally method can be used to supply a fallback value when an earlier stage fails. In this case, a failure anywhere in the chain means the transaction shouldn’t be completed, so exceptionally isn’t used.
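For a case where a fallback does make sense, exceptionally might be used roughly as follows. The class name, the catalogDown flag, and the fallback price are all hypothetical and exist only to trigger both paths.

```java
import java.util.concurrent.CompletableFuture;

class ExceptionallySketch {

    static CompletableFuture<Integer> priceWithFallback(boolean catalogDown, int fallbackPrice) {
        CompletableFuture<Integer> price = CompletableFuture.supplyAsync(() -> {
            if (catalogDown) {
                throw new IllegalStateException("catalog unavailable");
            }
            return 10;
        });

        // Runs only if the stage above failed; otherwise the original value passes through.
        return price.exceptionally(throwable -> fallbackPrice);
    }

    public static void main(String[] args) {
        System.out.println(priceWithFallback(false, 0).join()); // prints 10
        System.out.println(priceWithFallback(true, 0).join());  // prints 0
    }
}
```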

Below are the logs for the asynchronous application. More than just the main thread is involved now: ForkJoinPool worker threads 2 and 9 query the catalog and the warehouse, respectively. Also, notice that it takes about 5 s (16:20:47 - 16:20:52) to complete the purchase order. The total time to create a purchase has dropped from 10 s to 5 s without any changes to the back-end database services, because the two 5-second requests to the warehouse and catalog now run concurrently. Five seconds may seem inconsequential, but in a large application with many users the savings add up quickly.
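The effect can be reproduced on a small scale with the following sketch, which uses two 200 ms sleeps in place of the 5-second DAO calls. The class and method names are hypothetical, and exact timings will vary from machine to machine.

```java
import java.util.concurrent.CompletableFuture;

class TimingSketch {

    // Hypothetical stand-in for a slow DAO call.
    static int slowCall(int result) {
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return result;
    }

    // Runs the two calls one after the other on the calling thread.
    static long sequentialMillis() {
        long start = System.nanoTime();
        int quantity = slowCall(20);
        int price = slowCall(10);
        int combined = quantity + price;
        return (System.nanoTime() - start) / 1_000_000;
    }

    // Starts both calls at once and waits for the combined result.
    static long concurrentMillis() {
        long start = System.nanoTime();
        CompletableFuture<Integer> quantity = CompletableFuture.supplyAsync(() -> slowCall(20));
        CompletableFuture<Integer> price = CompletableFuture.supplyAsync(() -> slowCall(10));
        quantity.thenCombine(price, Integer::sum).join();
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) {
        System.out.println("Sequential: " + sequentialMillis() + " ms");
        System.out.println("Concurrent: " + concurrentMillis() + " ms");
    }
}
```

The sequential run should take roughly twice as long as the concurrent one, mirroring the drop from 10 s to 5 s in the logs.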

16:20:47.057 [main] INFO  com.influentialcode.CreateOrder - Beginning to create order asynchronously
16:20:47.060 [main] INFO  com.influentialcode.CreateOrder - Beginning to query warehouse
16:20:47.065 [main] INFO  com.influentialcode.CreateOrder - Beginning to query catalog for price
16:20:47.067 [ForkJoinPool.commonPool-worker-9] INFO  c.i.persistence.WarehouseDaoImpl - Querying the quantity of item 105.
16:20:47.067 [ForkJoinPool.commonPool-worker-2] INFO  c.i.persistence.CatalogDaoImpl - Querying for price of item 105.
16:20:52.072 [ForkJoinPool.commonPool-worker-2] INFO  c.i.persistence.CatalogDaoImpl - Retrieved price for item 105.
16:20:52.072 [ForkJoinPool.commonPool-worker-9] INFO  c.i.persistence.WarehouseDaoImpl - Retrieved quantity for item 105.
16:20:52.073 [ForkJoinPool.commonPool-worker-2] INFO  c.i.persistence.CatalogDaoImpl - Took 5 seconds to get price of item 105.
16:20:52.073 [ForkJoinPool.commonPool-worker-9] INFO  c.i.persistence.WarehouseDaoImpl - Took 5 seconds to query for item 105.
16:20:52.073 [ForkJoinPool.commonPool-worker-9] INFO  com.influentialcode.CreateOrder - Finished querying warehouse
16:20:52.073 [ForkJoinPool.commonPool-worker-9] INFO  com.influentialcode.CreateOrder - Finished querying catalog
16:20:52.073 [ForkJoinPool.commonPool-worker-9] INFO  com.influentialcode.CreateOrder - Creating total price.
16:20:52.073 [ForkJoinPool.commonPool-worker-9] INFO  c.i.persistence.PurchasesDaoImpl - Creating order for customer 115 of cost 50.
16:20:52.073 [ForkJoinPool.commonPool-worker-9] INFO  c.i.persistence.PurchasesDaoImpl - Order has been placed.
16:20:52.074 [main] INFO  com.influentialcode.Main - Created purchase order number 1002.

Summary

In summary, you’ve been introduced to the Java CompletableFuture API and have been shown how it can dramatically increase the performance of a previously synchronous application. I encourage you to review the API and try to implement it in some of your code to gain a better understanding. If you have questions or comments about this post, please leave them below. If you’d like to know when other posts are available, please follow us on Twitter.