Concurrency in Java
All computers have multiple cores now. Even a cheap smart phone would have 2-4 cores to run 4-8 threads. Software needs to use multiple threads to take the advantage of multiple core systems. In this article, I would to introduce two approaches to use multiple threads in Java.
- CompletionStage or CompletableFuture
git clone https://[email protected]/LeSiQuang/completablefuture.git
- ListenableFuture and Producer graph
git clone https://[email protected]/LeSiQuang/producergraph.git
Future and CompletableFuture
Future has been introduced in Java long time ago to help dealing with multiple threads.
Here is an typical example
public class SquareCalculator { private ExecutorService executor = Executors.newSingleThreadExecutor(); public Future<Integer> calculate(Integer input) { return executor.submit(() -> { int computationTime = (rand.nextInt()% 10) + 2; System.out.println(Thread.currentThread().getName() + " Sleep for " + computationTime); Thread.sleep(1000); return input * input; }); } }
and we can consume the result as
Future<Integer> future = squareCalculator.calculate(10); System.out.println(future.get());
Future is good for a simple multiple threads but would make the code hard to follow (and of course to debug) when the business logic becomes more than complicated. Keep in mind that every time you call get, the whole thread is just hang to wait for the results. You can find the disadvantage of Future somewhere.
CompletionStage has been introduced in Java 8 to ease the coding using multiple threads. The advantage of CompletionStage is that CompletionStage allows users (1) to add listeners into CompletionStage objects and (2) to provide a method to combine two CompletionStage objects into a CompletionStage. These two features allow users to inject business login into threads without calling get, which will block threads.
Assume we need to write a service to accept input an userId and produce a object User including name, address, etc. which can be archived by calling REST API.
public class User { String name; String address; Integer age; Integer salary; Integer balance; String title; String status; String incomeStatus; String insuranceScore; }
User is built using builder pattern.
Assume for each information (e.g., name, address), we need to make a call REST API with userId to extract information. We mimic the calls by a method, generateValue, where CompletionStage is generated by using CompletableFuture.supplyAsync with Supplier.
static <T> CompletionStage<T> generateValue(T value) { return CompletableFuture.supplyAsync( new Supplier<T>() { @Override public T get() { int computationTime = rand.nextInt()% 10 + 2; try { //This mimic the REST API call System.out.println(Thread.currentThread().getName() + " needs" + computationTime + " seconds to get the value."); TimeUnit.SECONDS.sleep(computationTime); } catch (Exception e) { } return value; } } ); }
and the process of getting information for userID can be implemented as
CompletionStage<String> name = generateValue("David"); CompletionStage<String> address = generateValue("Southampton"); CompletionStage<Integer> age = generateValue(30); CompletionStage<Integer> salary = generateValue(100000); CompletionStage<Integer> balance = generateValue(200000); User.Builder builder = new User.Builder(); name.thenAccept(s -> builder.setName(s)); address.thenAccept(s -> builder.setAddress(s)); age.thenAccept(i -> builder.setAge(i)); salary.thenAccept(i -> builder.setSalary(i)); balance.thenAccept(i -> builder.setBalanace(i));
Take a close look at one example, we call an API to get name, (equivalent to generateValue("David"), and then inject the consumer of name as builder.setName. So when name is ready, it will be updated into builder.
We then wait for all threads returning results and then return the final users.
CompletionStage<Void> finish = CompletableFuture.allOf(name.toCompletableFuture(), address.toCompletableFuture(), age.toCompletableFuture(), salary.toCompletableFuture(), balance.toCompletableFuture()); try { finish.toCompletableFuture().get(); return builder.build(); } catch (Exception e) { }
That is simple and straight forward.
Assume we want to add title for the user based on age: title = (age >18)?"adult":"children". Then we use apply to age straight forward.
age.thenApply(x -> (x > 18) ? "Adult" : "Child").thenAccept(s -> builder.setTitle(s));
Now, if we want to calculate the finance status of a user based on salary and balance, e.g., (salary >-balance)?"good":"bad".
salary.thenCombine(balance, (sal, bal) -> { System.out.println( Thread.currentThread().getName() + " combine salary and balance"); return (sal > -bal) ? "good" : "bad"; }).thenAccept(s -> builder.setStatus(s));
or age with salary to calculate incomeStatus.
salary.thenCombine(age, (sal, a) -> { System.out.println( Thread.currentThread().getName() + " combine salary and age"); return (sal > 40000 && a < 50) ? "good income" : "normal income"; }).thenAccept(s -> builder.setIncomeStatus(s));
So far so good. It is quite straight forward to inject business logic as long as it is based on one or two variables. How about three variables.
CompletionStage does not provide a combination of more than 2 CompletionStage object. However, we need to implement a simple wrapper to combine 3 or more CompletionStage objects.
public class Promise { static <T, U, R> CompletionStage<R> combine(CompletionStage<T> value1, CompletionStage<U> value2, BiFunction<? super T, ? super U, ? extends R> fn) { return value1.thenCombine(value2, fn); } static <E1, E2, E3, R> CompletionStage<R> combine(CompletionStage<E1> value1, CompletionStage<E2> value2, CompletionStage<E3> value3, TripleFunction<? super E1, ? super E2, ? super E3, ? extends R> fn) { return value1.thenCompose(a -> value2.thenCombine(value3, (b, c) -> fn.apply(a, b, c))); } static <E1, E2, E3, E4, R> CompletionStage<R> combine(CompletionStage<E1> value1, CompletionStage<E2> value2, CompletionStage<E3> value3, CompletionStage<E4> value4, QuadFunction<? super E1, ? super E2, ? super E3, ? super E4, ? extends R> fn) { return value1.thenCompose(a -> value2.thenCompose(b -> value3.thenCombine(value4, (c, d) -> fn.apply(a, b, c, d)))); } }
with two functional interface TripleFunction and QuadFunction.
@FunctionalInterface public interface TripleFunction<E1, E2, E3, R> { R apply(E1 e1, E2 e2, E3 e3); }
@FunctionalInterface public interface QuadFunction<E1, E2, E3, E4, R> { R apply(E1 e1, E2 e2, E3 e3, E4 e); }
We then estimate Insurencescore by combining age, salary, and balance.
Promise.combine( age, salary, balance, (a, s, b) -> { System.out.println( Thread.currentThread().getName() + " combine age, salary and age"); return a*(s + b); } ).thenAccept(i -> builder.setInsuranceScore(i) );
Using CompletionStage with a Promise.combine would help us to implement multiple threads for high complicated logics. However, we still have an issue of race conditions using CompletionStage. For example, we may accidentally implement two methods to estimate the title,
age.thenApply(x -> (x > 18) ? "Adult" : "Child").thenAccept(s -> builder.setTitle(s));
and
salary.thenApply(x -> (x > 1000000) ? "Millionair" : "").thenAccept(s -> builder.setTitle(s));
Then title would not determine as we do not know which one would finish first.
Producer graph
Producer graph (PG) is based on Dagger, dependency system, maintained by Google. The idea of PG is to wire methods by automatically inject arguments when methods required. Users need only to implement their business logic in methods labelled with @Produces and @Provides. Dagger would automatically local instances (i.e., values) to fill argument lists.
We start with a simple example:
- Mimic Rest API or heavy computation using utilities class.
The generateValue returns a ListenableFuture given a callable instance.
- DataProvideModule: provide source data. The data we would obtain by sending REST API calls.
- UserModule: provide business logic to build an user. Values required in UserModule will be injected from DataProvideModule.
- UserComponent: just instruct PG to generate DaggerUserComponent.
public class utilities { static private final Random rand = new Random(); static <T> ListenableFuture<T> generateValue(T value) { ListeningExecutorService executor = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool()); return executor.submit( new Callable<T>() { @Override public T call() throws Exception { int computationTime = Math.abs(rand.nextInt()) % 10 + 2; System.out.println(Thread.currentThread().getName() + " needs " + computationTime + " seconds to get the value."); TimeUnit.SECONDS.sleep(computationTime); return value; } } ); } }
We then produce a module to provide all data (see Dagger for more information)
@ProducerModule final class DataProvideModule { @Produces @Named("Name") static ListenableFuture<String> produceName() { return utilities.generateValue("David"); } @Produces @Named("Address") static ListenableFuture<String> produceAddress() { return utilities.generateValue("Southampton"); } @Produces @Named("Age") static ListenableFuture<Integer> produceAge() { return utilities.generateValue(25); } @Produces @Named("Salary") static ListenableFuture<Integer> produceSalary() { return utilities.generateValue(45000); } @Produces @Named("Balance") static ListenableFuture<Integer> produceBalance() { return utilities.generateValue(25000); } }
Similar to DAGGER for dependency inject, PG would inject an instance (e.g., David) when seeing the @Named("Name") annotation.
Then we can introduce the UserModule to generate an user
@ProducerModule(includes = DataProvideModule.class) final class UserModule { @Produces static User getUser( @Named("Name") String name, @Named("Address") String address, @Named("Age") Integer age, @Named("Salary") Integer salary, @Named("Balance") Integer balance) { return new User.Builder() .setName(name) .setAddress(address) .setAge(age) .setBalanace(balance) .setSalary(salary) .build(); } }
UserComponent to generate a DaggerUserComponent.
@ProductionComponent(modules = {UserModule.class, ExecutorModule.class}) interface UserComponent { ListenableFuture<User> user(); }
and we can start using the PG to generate an user.
UserComponent userComponent = DaggerUserComponent.create(); ListenableFuture<User> user = userComponent.user(); try{ user.get().print(); }
Now we need to implement more complicate business logic for an user. For example, produceTitle based on Age
@Produces @Named("Title") static ListenableFuture<String> produceTitle(@Named("Age") Integer age) { System.out.println("Generate title: in " + Thread.currentThread().getName()); String title = (age > 18) ? "Adult" : "child"; return Futures.immediateFuture(title); }
Similar we can implement logic for Status, IncomsStatus, etc.
@Produces @Named("Title") static ListenableFuture<String> produceTitle(@Named("Age") Integer age) { System.out.println("Generate title: in " + Thread.currentThread().getName()); String title = (age > 18) ? "Adult" : "child"; return Futures.immediateFuture(title); } @Produces @Named("Status") static ListenableFuture<String> produceStatus(@Named("Balance") Integer balance, @Named("Salary") Integer salary) { System.out.println("Generate status : in " + Thread.currentThread().getName()); String status = (salary > -balance) ? "good" : "bad"; return Futures.immediateFuture(status); } @Produces @Named("IncomeStatus") static ListenableFuture<String> produceIncomeStatus(@Named("Age") Integer age, @Named("Salary") Integer salary) { System.out.println("Generate income status : in " + Thread.currentThread().getName()); String incomeStatus = (salary > 40000 && age < 50) ? "good income" : "normal income"; return Futures.immediateFuture(incomeStatus); } @Produces @Named("InsuranceScore") static ListenableFuture<Integer> produceInsuranceScore(@Named("Age") Integer age, @Named("Salary") Integer salary, @Named("Balance") Integer balance) { System.out.println("Generate insurance score : in " + Thread.currentThread().getName()); return Futures.immediateFuture(age * (salary + balance)); } @Produces @Named("RealInsuranceScore") static ListenableFuture<Integer> produceRealInsuranceScore(@Named("InsuranceScore") Integer insuranceScore) { System.out.println("Get Real insurance score : in " + Thread.currentThread().getName()); return (insuranceScore > 10000) ? utilities.generateValue(200000) : Futures.immediateFuture(0); }
Look closely to an example, produceRealInsuranceScore. It requires InsuranceScore to decide whether to call an API to get the real InsuranceScore. To satisfy the argument of produceRealInsuranceScore, PG would look value from @Named("InsuranceScore"), produceInsuranceScore. produceInsuranceScore required age, salary, and balance, that can be provided from DataProvideModule.
The annotation @Named is heavily used in this example to wire methods as we use only two based data type, Integer and String. However, in practice, it is quite common that we do not need to use @Named as a method return a unique data type.
PG does not provides a powerful tool to implement multiple threads in Java but also a dependency injection framework for methods. It helps users to avoid the biggest issues, race condition, in concurrent coding,
Conclusion
CompletableFuture seems to be a good choice for small or medium projects where the business logic would be manageable (e.g., simple data concatenation). Coding with CompletableFuture would lead back to an issue of race conditions when projects become bigger and bigger.
PG would take more time to set up the framework but would be more powerful than CompletableFuture. It helps to avoid the main problem in concurrent coding, race condition. In additions, PG would provide a dependency injections for methods that would be helpful for big projects.