Concurrency in Java

Nearly all computers have multiple cores now. Even a cheap smartphone has 2-4 cores that can run 4-8 threads. Software needs to use multiple threads to take advantage of multi-core systems. In this article, I would like to introduce two approaches to using multiple threads in Java.

  • CompletionStage or CompletableFuture

git clone https://[email protected]/LeSiQuang/completablefuture.git

  • ListenableFuture and Producer graph

git clone https://[email protected]/LeSiQuang/producergraph.git

Future and CompletableFuture

Future was introduced in Java 5 to help deal with results that are computed on other threads.

Here is a typical example

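import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class SquareCalculator {
    private final Random rand = new Random();
    private final ExecutorService executor
            = Executors.newSingleThreadExecutor();

    public Future<Integer> calculate(Integer input) {
        return executor.submit(() -> {
            // nextInt(10) is never negative, so the simulated work takes 2-11 seconds.
            int computationTime = rand.nextInt(10) + 2;
            System.out.println(Thread.currentThread().getName() + " Sleep for " + computationTime);
            TimeUnit.SECONDS.sleep(computationTime);
            return input * input;
        });
    }
}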

and we can consume the result as

Future<Integer> future = squareCalculator.calculate(10);
System.out.println(future.get());

Future works well for simple multithreaded code, but it makes the code hard to follow (and, of course, to debug) once the business logic becomes complicated. Keep in mind that every call to get blocks the calling thread until the result is available. Future also offers no way to register a callback or to chain a follow-up computation onto the result, so every dependent step has to sit behind a blocking get.
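
To make the blocking behaviour concrete, here is a minimal sketch. The class name BlockingGetSketch, the method squareOrNull, and the 200 ms delay are assumptions made for illustration and are not part of the article's repository. It uses the timed overload of get so that the caller gives up after a bound instead of waiting indefinitely.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class BlockingGetSketch {

    // Returns input squared, or null if the result is not ready within timeoutMillis.
    static Integer squareOrNull(int input, long timeoutMillis) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<Integer> future = executor.submit(() -> {
                Thread.sleep(200); // hypothetical slow computation
                return input * input;
            });
            // The calling thread waits here until the value arrives or the timeout expires.
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            executor.shutdownNow(); // also interrupts the task if it is still sleeping
        }
    }

    public static void main(String[] args) {
        System.out.println(squareOrNull(10, 5_000));
        System.out.println(squareOrNull(10, 1));
    }
}
```

Even with a timeout, the caller still does nothing useful while it waits, which is the limitation that CompletionStage addresses.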

CompletionStage was introduced in Java 8 to make multithreaded code easier to write. Its advantage is that it lets users (1) attach listeners to a CompletionStage object and (2) combine two CompletionStage objects into a new CompletionStage. These two features allow users to inject business logic into the pipeline without calling get, which would block the calling thread.

Assume we need to write a service that accepts a userId as input and produces a User object containing the name, address, etc., each of which can be retrieved by calling a REST API.

public class User {
    String name;
    String address;
    Integer age;
    Integer salary;
    Integer balance;
    String title;
    String status;
    String incomeStatus;
    String insuranceScore;
}

User is built using the builder pattern.
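
The builder itself is not shown in this article. As a rough sketch only, it might look like the following, cut down to three of the fields. The setter names follow the calls used later in the article, while the synchronized modifiers and the private constructor are my assumptions, added because the setters are later invoked from callback threads.

```java
final class User {
    final String name;
    final Integer age;
    final String title;

    private User(Builder builder) {
        this.name = builder.name;
        this.age = builder.age;
        this.title = builder.title;
    }

    static final class Builder {
        private String name;
        private Integer age;
        private String title;

        // Setters are synchronized (an assumption) because callbacks may
        // invoke them from different threads.
        synchronized Builder setName(String name) {
            this.name = name;
            return this;
        }

        synchronized Builder setAge(Integer age) {
            this.age = age;
            return this;
        }

        synchronized Builder setTitle(String title) {
            this.title = title;
            return this;
        }

        synchronized User build() {
            return new User(this);
        }
    }

    public static void main(String[] args) {
        User user = new User.Builder().setName("David").setAge(30).setTitle("Adult").build();
        System.out.println(user.name + ", " + user.age + ", " + user.title);
    }
}
```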

Assume that for each piece of information (e.g., name, address) we need to call a REST API with the userId. We mimic these calls with a method, generateValue, which creates a CompletionStage by passing a Supplier to CompletableFuture.supplyAsync.

static <T> CompletionStage<T> generateValue(T value) {
    return
            CompletableFuture.supplyAsync(
                    new Supplier<T>() {
                        @Override
                        public T get() {
                            // nextInt(10) is never negative, so the delay is 2-11 seconds.
                            int computationTime = rand.nextInt(10) + 2;
                            try {
                                // This mimics the REST API call.
                                System.out.println(Thread.currentThread().getName() + " needs " + computationTime + " seconds to get the value.");
                                TimeUnit.SECONDS.sleep(computationTime);
                            } catch (InterruptedException e) {
                                // Restore the interrupt flag instead of swallowing it.
                                Thread.currentThread().interrupt();
                            }
                            return value;
                        }
                    }
            );
}

The process of gathering the information for a userId can then be implemented as

CompletionStage<String> name = generateValue("David");
CompletionStage<String> address = generateValue("Southampton");
CompletionStage<Integer> age = generateValue(30);
CompletionStage<Integer> salary = generateValue(100000);
CompletionStage<Integer> balance = generateValue(200000);
User.Builder builder = new User.Builder();

name.thenAccept(s -> builder.setName(s));
address.thenAccept(s -> builder.setAddress(s));
age.thenAccept(i -> builder.setAge(i));
salary.thenAccept(i -> builder.setSalary(i));
balance.thenAccept(i -> builder.setBalanace(i));

Take a close look at one example: we call an API to get the name (equivalent to generateValue("David")) and then register builder.setName as the consumer of that name. When the name is ready, it is written into the builder.

We then wait for all threads to return their results and then return the final user.

CompletionStage<Void> finish = CompletableFuture.allOf(name.toCompletableFuture(),
        address.toCompletableFuture(),
        age.toCompletableFuture(),
        salary.toCompletableFuture(),
        balance.toCompletableFuture());

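try {
    finish.toCompletableFuture().get();
    return builder.build();
} catch (Exception e) {
    // Surface the failure instead of silently falling through without a result.
    throw new IllegalStateException("Could not build the user", e);
}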

That is simple and straightforward.
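
One caveat worth noting: allOf over the source stages completes once the values are available, and the CompletableFuture documentation does not promise that callbacks registered with thenAccept have already run by then. A more defensive sketch waits on the stages returned by thenAccept instead. The class AwaitCallbacksSketch and the map that stands in for the builder are assumptions made for illustration.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

final class AwaitCallbacksSketch {

    static Map<String, Object> collect() {
        // A thread-safe map stands in for the User.Builder of the article.
        Map<String, Object> fields = new ConcurrentHashMap<>();

        CompletableFuture<String> name = CompletableFuture.supplyAsync(() -> "David");
        CompletableFuture<Integer> age = CompletableFuture.supplyAsync(() -> 30);

        // Keep the stages returned by thenAccept: each one completes only
        // after its callback has finished running.
        CompletableFuture<Void> nameStored = name.thenAccept(s -> fields.put("name", s));
        CompletableFuture<Void> ageStored = age.thenAccept(i -> fields.put("age", i));

        // Wait for the callbacks, not just for the source values.
        CompletableFuture.allOf(nameStored, ageStored).join();
        return fields;
    }

    public static void main(String[] args) {
        System.out.println(collect());
    }
}
```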

Assume we want to derive a title for the user from the age: title = (age > 18) ? "Adult" : "Child". We simply call thenApply on age.

age.thenApply(x ->
        (x > 18) ? "Adult" : "Child").thenAccept(s -> builder.setTitle(s));

Now suppose we want to calculate the financial status of a user from salary and balance, e.g., (salary > -balance) ? "good" : "bad". We can combine the two stages with thenCombine.

salary.thenCombine(balance,
        (sal, bal) -> {
            System.out.println(
                    Thread.currentThread().getName() + " combine salary and balance");
            return (sal > -bal) ? "good" : "bad";
        }).thenAccept(s -> builder.setStatus(s));

We can likewise combine salary with age to calculate incomeStatus.

salary.thenCombine(age,
        (sal, a) -> {
            System.out.println(
                    Thread.currentThread().getName() + " combine salary and age");
            return (sal > 40000 && a < 50) ? "good income" : "normal income";
        }).thenAccept(s -> builder.setIncomeStatus(s));


So far so good. It is quite straightforward to inject business logic as long as it depends on one or two variables. How about three variables?

CompletionStage does not provide a way to combine more than two CompletionStage objects. However, we can implement a simple wrapper that combines three or more CompletionStage objects.

public class Promise {
    static <T, U, R>
    CompletionStage<R> combine(CompletionStage<T> value1,
                               CompletionStage<U> value2,
                               BiFunction<? super T, ? super U, ? extends R> fn) {
        return value1.thenCombine(value2, fn);
    }


    static <E1, E2, E3, R>
    CompletionStage<R> combine(CompletionStage<E1> value1,
                               CompletionStage<E2> value2,
                               CompletionStage<E3> value3,
                               TripleFunction<? super E1, ? super E2, ? super E3, ? extends R> fn) {
        return value1.thenCompose(a ->
                value2.thenCombine(value3,
                        (b, c) -> fn.apply(a, b, c)));
    }


    static <E1, E2, E3, E4, R>
    CompletionStage<R> combine(CompletionStage<E1> value1,
                               CompletionStage<E2> value2,
                               CompletionStage<E3> value3,
                               CompletionStage<E4> value4,
                               QuadFunction<? super E1, ? super E2, ? super E3, ? super E4, ? extends R> fn) {
        return
                value1.thenCompose(a ->
                        value2.thenCompose(b ->
                                value3.thenCombine(value4,
                                        (c, d) -> fn.apply(a, b, c, d))));
    }

}


together with two functional interfaces, TripleFunction and QuadFunction.

@FunctionalInterface
public interface TripleFunction<E1, E2, E3, R> {
    R apply(E1 e1, E2 e2, E3 e3);
}


@FunctionalInterface
public interface QuadFunction<E1, E2, E3, E4, R> {
    R apply(E1 e1, E2 e2, E3 e3, E4 e4);
}

We then estimate insuranceScore by combining age, salary, and balance.

Promise.combine(
        age, salary, balance,
        (a, s, b) -> {
            System.out.println(
                    Thread.currentThread().getName() + " combine age, salary and balance");
            return a * (s + b);
        }
).thenAccept(i -> builder.setInsuranceScore(i));
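
When many inputs share one type, an alternative to writing one combine overload per arity is to gather the futures into a list. The following is only a sketch of that idea using the standard CompletableFuture.allOf. The names AllAsListSketch, allAsList, and insuranceScore are hypothetical and do not come from the article's repository.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class AllAsListSketch {

    // Completes with all values, in input order, once every future has completed.
    static <T> CompletableFuture<List<T>> allAsList(List<CompletableFuture<T>> futures) {
        return CompletableFuture
                .allOf(futures.toArray(new CompletableFuture<?>[0]))
                .thenApply(ignored -> {
                    List<T> values = new ArrayList<>();
                    for (CompletableFuture<T> future : futures) {
                        // join() does not block here: allOf has already completed.
                        values.add(future.join());
                    }
                    return values;
                });
    }

    // Same formula as in the article: age * (salary + balance).
    static int insuranceScore(CompletableFuture<Integer> age,
                              CompletableFuture<Integer> salary,
                              CompletableFuture<Integer> balance) {
        return allAsList(Arrays.asList(age, salary, balance))
                .thenApply(v -> v.get(0) * (v.get(1) + v.get(2)))
                .join();
    }

    public static void main(String[] args) {
        System.out.println(insuranceScore(
                CompletableFuture.supplyAsync(() -> 30),
                CompletableFuture.supplyAsync(() -> 100000),
                CompletableFuture.supplyAsync(() -> 200000)));
    }
}
```

The trade-off is that positional access into a list loses the per-argument typing that TripleFunction and QuadFunction keep.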


Using CompletionStage with Promise.combine helps us implement multithreaded code for highly complicated logic. However, CompletionStage still leaves us exposed to race conditions. For example, we may accidentally implement two methods that both set the title,

age.thenApply(x ->
        (x > 18) ? "Adult" : "Child").thenAccept(s -> builder.setTitle(s));

and

salary.thenApply(x ->
        (x > 1000000) ? "Millionaire" : "").thenAccept(s -> builder.setTitle(s));

Then the title is not deterministic, because we do not know which of the two will finish first.
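
The effect can be reproduced in a small sketch by completing the futures by hand, which fixes the order for each run. In real code the order depends on timing. TitleRaceSketch and titleWhen are hypothetical names, and an AtomicReference stands in for the builder's title field.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

final class TitleRaceSketch {

    // Returns the title that "wins" for a given completion order.
    static String titleWhen(boolean ageCompletesFirst) {
        CompletableFuture<Integer> age = new CompletableFuture<>();
        CompletableFuture<Integer> salary = new CompletableFuture<>();
        AtomicReference<String> title = new AtomicReference<>();

        // Two independent callbacks write to the same field.
        age.thenApply(x -> (x > 18) ? "Adult" : "Child").thenAccept(title::set);
        salary.thenApply(x -> (x > 1000000) ? "Millionaire" : "").thenAccept(title::set);

        // Completing by hand runs the callbacks on this thread, in this order.
        if (ageCompletesFirst) {
            age.complete(30);
            salary.complete(2000000);
        } else {
            salary.complete(2000000);
            age.complete(30);
        }
        return title.get(); // whichever callback ran last
    }

    public static void main(String[] args) {
        System.out.println(titleWhen(true));
        System.out.println(titleWhen(false));
    }
}
```

The same inputs yield two different titles, and nothing in the types or at compile time warns about it.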

Producer graph

Producer graph (PG) is based on Dagger, a dependency injection framework maintained by Google. The idea of PG is to wire methods together by automatically injecting arguments wherever methods require them. Users only need to implement their business logic in methods annotated with @Produces and @Provides. Dagger then automatically locates the instances (i.e., values) needed to fill the argument lists.

We start with a simple example made of four parts:

  • utilities: mimics a REST API call or a heavy computation. Its generateValue method returns a ListenableFuture for the given value.
  • DataProvideModule: provides the source data, i.e., the data we would obtain by sending REST API calls.
  • UserModule: provides the business logic to build a user. Values required in UserModule are injected from DataProvideModule.
  • UserComponent: just instructs PG to generate DaggerUserComponent.

The utilities class is

public class utilities {
    static private final Random rand = new Random();

    // One shared pool: creating a new pool on every call would leak threads.
    static private final ListeningExecutorService executor =
            MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());

    static <T> ListenableFuture<T> generateValue(T value) {
        return executor.submit(
                new Callable<T>() {
                    @Override
                    public T call() throws Exception {
                        // nextInt(10) is never negative, so the delay is 2-11 seconds.
                        int computationTime = rand.nextInt(10) + 2;
                        System.out.println(Thread.currentThread().getName() + " needs " + computationTime + " seconds to get the value.");
                        TimeUnit.SECONDS.sleep(computationTime);
                        return value;
                    }
                }
        );
    }
}



We then write a module that provides all the data (see the Dagger documentation for more information)

@ProducerModule
final class DataProvideModule {
    @Produces
    @Named("Name")
    static ListenableFuture<String> produceName() {
        return utilities.generateValue("David");
    }

    @Produces
    @Named("Address")
    static ListenableFuture<String> produceAddress() {
        return utilities.generateValue("Southampton");
    }

    @Produces
    @Named("Age")
    static ListenableFuture<Integer> produceAge() {
        return utilities.generateValue(25);
    }


    @Produces
    @Named("Salary")
    static ListenableFuture<Integer> produceSalary() {
        return utilities.generateValue(45000);
    }

    @Produces
    @Named("Balance")
    static ListenableFuture<Integer> produceBalance() {
        return utilities.generateValue(25000);
    }
}


Similar to Dagger's dependency injection, PG injects an instance (e.g., David) wherever it sees the @Named("Name") annotation.

Then we can introduce the UserModule to generate a user

@ProducerModule(includes = DataProvideModule.class)
final class UserModule {

    @Produces
    static User getUser(
            @Named("Name") String name,
            @Named("Address") String address,
            @Named("Age") Integer age,
            @Named("Salary") Integer salary,
            @Named("Balance") Integer balance) {

        return new User.Builder()
                .setName(name)
                .setAddress(address)
                .setAge(age)
                .setBalanace(balance)
                .setSalary(salary)
                .build();
    }

}

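UserComponent instructs PG to generate DaggerUserComponent. ExecutorModule is not shown here; presumably it supplies the @Production Executor that Dagger Producers needs in order to run the producer methods.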

@ProductionComponent(modules = {UserModule.class,  ExecutorModule.class})
interface UserComponent {
    ListenableFuture<User> user();
}


and we can start using PG to generate a user.

UserComponent userComponent = DaggerUserComponent.create();

ListenableFuture<User> user = userComponent.user();

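try {
    // get() blocks until the whole graph has produced the User.
    user.get().print();
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}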


Now we need to implement more complicated business logic for a user. For example, produceTitle derives the title from Age

@Produces
@Named("Title")
static ListenableFuture<String> produceTitle(@Named("Age") Integer age) {
    System.out.println("Generate title: in " + Thread.currentThread().getName());
    String title = (age > 18) ? "Adult" : "child";
    return Futures.immediateFuture(title);
}

Similarly, we can implement the logic for Status, IncomeStatus, etc.

@Produces
@Named("Status")
static ListenableFuture<String> produceStatus(@Named("Balance") Integer balance,
                                              @Named("Salary") Integer salary) {
    System.out.println("Generate status : in " + Thread.currentThread().getName());
    String status = (salary > -balance) ? "good" : "bad";
    return Futures.immediateFuture(status);
}

@Produces
@Named("IncomeStatus")
static ListenableFuture<String> produceIncomeStatus(@Named("Age") Integer age,
                                                    @Named("Salary") Integer salary) {
    System.out.println("Generate income status : in " + Thread.currentThread().getName());
    String incomeStatus = (salary > 40000 && age < 50) ? "good income" : "normal income";
    return Futures.immediateFuture(incomeStatus);
}

@Produces
@Named("InsuranceScore")
static ListenableFuture<Integer> produceInsuranceScore(@Named("Age") Integer age,
                                                       @Named("Salary") Integer salary,
                                                       @Named("Balance") Integer balance) {
    System.out.println("Generate insurance score : in " + Thread.currentThread().getName());
    return Futures.immediateFuture(age * (salary + balance));
}

@Produces
@Named("RealInsuranceScore")
static ListenableFuture<Integer> produceRealInsuranceScore(@Named("InsuranceScore") Integer insuranceScore) {
    System.out.println("Get Real insurance score : in " + Thread.currentThread().getName());
    return (insuranceScore > 10000) ? utilities.generateValue(200000) : Futures.immediateFuture(0);
}

Look closely at one example, produceRealInsuranceScore. It requires InsuranceScore to decide whether to call an API to get the real insurance score. To satisfy the argument of produceRealInsuranceScore, PG looks for the value annotated with @Named("InsuranceScore"), which is produced by produceInsuranceScore. produceInsuranceScore in turn requires age, salary, and balance, which are provided by DataProvideModule.
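
For comparison, the same dependency chain can be wired by hand with CompletableFuture. This is only an analogy in plain Java and not the code that Dagger generates. HandWiredGraphSketch, fetchRealScore, resolveRealScore, and demo are hypothetical names, and fetchRealScore stands in for the remote call.

```java
import java.util.concurrent.CompletableFuture;

final class HandWiredGraphSketch {

    // Hypothetical stand-in for the API call behind utilities.generateValue(200000).
    static CompletableFuture<Integer> fetchRealScore() {
        return CompletableFuture.supplyAsync(() -> 200000);
    }

    // Mirrors produceRealInsuranceScore: call the "API" only for large scores.
    static CompletableFuture<Integer> resolveRealScore(int insuranceScore) {
        if (insuranceScore > 10000) {
            return fetchRealScore();
        }
        return CompletableFuture.completedFuture(0);
    }

    static CompletableFuture<Integer> realInsuranceScore(CompletableFuture<Integer> age,
                                                         CompletableFuture<Integer> salary,
                                                         CompletableFuture<Integer> balance) {
        // Step 1: InsuranceScore depends on age, salary, and balance.
        CompletableFuture<Integer> insuranceScore =
                age.thenCompose(a -> salary.thenCombine(balance, (s, b) -> a * (s + b)));
        // Step 2: RealInsuranceScore depends on InsuranceScore.
        return insuranceScore.thenCompose(HandWiredGraphSketch::resolveRealScore);
    }

    static int demo(int age, int salary, int balance) {
        return realInsuranceScore(
                CompletableFuture.completedFuture(age),
                CompletableFuture.completedFuture(salary),
                CompletableFuture.completedFuture(balance)).join();
    }

    public static void main(String[] args) {
        System.out.println(demo(25, 45000, 25000));
    }
}
```

In the hand-wired version the author must spell out every edge of the graph, whereas PG derives the edges from the method signatures.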

The @Named annotation is used heavily in this example to wire methods because we use only two basic data types, Integer and String. In practice, however, we often do not need @Named because each method returns a unique data type.

PG not only provides a powerful tool for multithreading in Java but also serves as a dependency injection framework for methods. It helps users avoid one of the biggest issues in concurrent coding, race conditions.


Conclusion


CompletableFuture seems to be a good choice for small or medium projects where the business logic is manageable (e.g., simple data concatenation). As projects grow bigger, coding with CompletableFuture leads back to the problem of race conditions.


PG takes more time to set up but is more powerful than CompletableFuture. It helps avoid the main problem in concurrent coding, race conditions. In addition, PG provides dependency injection for methods, which is helpful for big projects.







