MyEnigma

とある自律移動システムエンジニアのブログです。#Robotics #Programing #C++ #Python #MATLAB #Vim #Mathematics #Book #Movie #Traveling #Mac #iPhone

JavaのFutureとCompleteFutureの違い


プロになるJava―仕事で必要なプログラミングの知識がゼロから身につく最高の指南書

目次

はじめに

最近様々な並列処理を学んでいるのですが、

Javaで説明されていることが多いため、

Javaにおける並列処理やスレッドを学んでいます。

 

そこで気になったのは、Future/promiseパターンによる並列パターンは

ja.wikipedia.org

Pythonにも、Juliaにもあり、

myenigma.hatenablog.com

Javaの標準機能にももちろん存在しているのですが、

FutureとCompleteFutureという2つの実装があり、

違いがよくわからなかったので、まとめてみました。

 

Java8のCompleteFuture

JavaにおけるFutureはJava5 (2004年リリース)から存在していますが、

docs.oracle.com

CompletableFutureはJava8 (2014年リリース)から入った

Future APIの進化系であり、

並列プログラミングのデザインパターンの一つであるFuture/promiseを実現するためのAPIです。

docs.oracle.com

 

なぜこのCompletableFutureが実装されたかというと、

Java5のFutureには、下記の問題点がありました。

  • 手動で並列処理を完了したり、停止することができない。

  • Futureの完了時にコールバック関数を呼ぶことなどができない

  • 複数のFutureをつないで処理することができない

  • 例外処理をすることができない

CompletableFutureでは以上の問題が解決されています。

 

下記では、それぞれの欠点を解決する使い方を説明します。

利点1: 手動での処理停止

CompleteFutureでは、

下記のようにcomplete関数で、手動でfutureの処理を完了させることができます。

(計算結果をキャッシュしているときなどに便利そうです)

package complete_future;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class Pros1 {
    // ~~~~ stdout ~~~~
    // Future's Result
    // Process finished with exit code 0
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> completableFuture = new CompletableFuture<>();
        // Futureの処理を外部から手動でcompleteで止めることができる。
        completableFuture.complete("Future's Result");
        String result = completableFuture.get();
        System.out.println(result);
    }
}

 

また、cancel関数を使うことで実行中の処理をキャンセルすることもできます。

package complete_future;

import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class Pros1_1 {
    static void sleep(int millis){
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
            sleep(4000);
            return "Hello";
        });

        int count = 0;
        int waitTimeCounter = new Random().nextInt(7);
        System.out.println(waitTimeCounter);

        while(!completableFuture.isDone()) {
            sleep(2000);
            System.out.println("Waiting...");
            count++;
            if(count > waitTimeCounter) completableFuture.cancel(true);
        }

        if(!completableFuture.isCancelled()){
            String result = completableFuture.get();
            System.out.println("Retrieved result from the task - " + result);
        }else {
            System.out.println("The future was cancelled");
        }
    }
}

yohhoy.hatenadiary.jp

 

利点2: Futureの処理を完了したときに呼ばれるCallback関数を設定できる

CompleteFutureでは、

supplayAsync()で非同期に関数を実行することができ、

続いて、

  • thenApply: Futureの結果を受け取って、何かを返す場合 (関数インターフェイスがFunction)
  • thenAccept: Futureの結果を受け取って、何もしない場合 (関数インターフェイスがConsumer)
  • thenRun: Futureの結果も使わずに、処理を実行する場合 (関数インターフェイスがRunnable)

を使って、Futureを実行した後に、Callback関数を設定することができます。

package complete_future;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class Pros2 {
    // ~~~~ stdout ~~~~
    // Hello Tom
    // Process finished with exit code 0
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> whatsYourNameFuture = CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
            return "Tom";
        });

        // thenApplyを使って、futureの処理後にcallback関数を設定した、新しいfutureを作る。
        CompletableFuture<String> greetingFuture = whatsYourNameFuture.thenApply(name -> {
            return "Hello " + name;
        });

        System.out.println(greetingFuture.get());
    }
}

また、これらの関数には、

  • thenApplyAsync

  • thenAcceptAsync

のように、末尾にAsyncがつくものがあり、

これはCallback関数が非同期(別スレッド)で実行されます。

また、メソッドチェーンを使えば、

2つ以上の複数のCallbackを数珠つなぎにすることも可能です。

 

利点3 複数のFutureを繋いで処理することができる。

先程のthenApply関数を使うことで、

複数の関数を繋いで、非同期処理が可能ですが、

ComputableFutureを返す複数の関数を繋ぎたい場合は、

Futureが入れ子になってしまうので、thenApplyは使えません。

そのような場合は、thenComposeを使うことで、

docs.oracle.com

複数のFutureの結果を直列に組み合わせて、

処理することが可能になります。

 

一方、2つのFutureを並列に同時に走らせて、

その2つの結果を使って何かをしたい場合は、

thenCombine()が使えます。

下記の例では、2つのFutreを同時に走らせて、

その結果を使って平均値を計算しています。

public class Pros3 {
    //~~~~ stdout ~~~~
    //Mean: 121.4
    //Elapsed time:3007 ms
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Double> future1 = CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
            return 65.0;
        });

        CompletableFuture<Double> future2 = CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
            return 177.8;
        });

        long startTime = System.currentTimeMillis();
        CompletableFuture<Double> combinedFuture = future1
                .thenCombine(future2, (score1, score2) -> (score1+score2)/2.0);
        System.out.println("Mean: " + combinedFuture.get());
        long endTime = System.currentTimeMillis();
        System.out.println("Elapsed time:" + (endTime - startTime) + " ms");
    }
}

以上の例は、2つのfutureの処理が終わるのを待ちますが、

どちらか結果の出力が早い方を使いたい場合は、acceptEitherAsyncが使えます。

package complete_future;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class Pros3_2 {
    //~~~~ stdout ~~~~
    //177.8
    //Elapsed time:2004 ms
    //Process finished with exit code 0
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Double> future1 = CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
            return 65.0;
        });

        CompletableFuture<Double> future2 = CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
            return 177.8;
        });

        long startTime = System.currentTimeMillis();
        CompletableFuture<Void> combinedFuture = future1.acceptEitherAsync(
                future2, score -> System.out.println(score));
        combinedFuture.get();
        long endTime = System.currentTimeMillis();
        System.out.println("Elapsed time:" + (endTime - startTime) + " ms");
    }
}

また、3つ以上のfutureの結果を使って処理したい場合は、

  • allof: すべてのfutureの処理を待つ
package complete_future;

import java.util.ArrayList;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

public class Pros3_3 {

    static CompletableFuture<Integer> getFuture(int i){
        return CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(i);
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
            return i;
        });
    }

    //~~~~ stdout ~~~~
    //[4, 5, 6]
    //Elapsed time:6008 ms
    //Process finished with exit code 0
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ArrayList<CompletableFuture<Integer>> taskFutures = new ArrayList<>();
        for (int i = 4; i <= 6; i++) {
            taskFutures.add(getFuture(i));
        }
        System.out.println(taskFutures);

        CompletableFuture<?> allCompletableFuture = CompletableFuture
                .allOf(taskFutures.toArray(new CompletableFuture[0]))
                .thenApply(v -> taskFutures.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList())
                );

        long startTime = System.currentTimeMillis();
        System.out.println(allCompletableFuture.get());
        long endTime = System.currentTimeMillis();
        System.out.println("Elapsed time:" + (endTime - startTime) + " ms");
    }
}
  • anyof: どれかの一番早いfutureの結果を受け取る
package complete_future;

import java.util.ArrayList;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class Pros3_4 {

    static CompletableFuture<Integer> getFuture(int i){
        return CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(i);
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
            return i;
        });
    }

    //~~~~ stdout ~~~~
    // 4
    //Elapsed time:4008 ms
    //Process finished with exit code 0
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ArrayList<CompletableFuture<Integer>> taskFutures = new ArrayList<>();
        for (int i = 3; i <= 6; i++) {
            taskFutures.add(getFuture(i));
        }
        CompletableFuture<?> allCompletableFuture = CompletableFuture
                .anyOf(taskFutures.toArray(new CompletableFuture[0]));

        long startTime = System.currentTimeMillis();
        System.out.println(allCompletableFuture.get());
        long endTime = System.currentTimeMillis();
        System.out.println("Elapsed time:" + (endTime - startTime) + " ms");
    }
}

を使って、3つ以上のfutureの結果を使うことができます。

 

利点4: Exception処理が実装できる。

Futureのメソッドチェーンの中に、

exceptionally callbackを設定することで、

メソッドチェーンの中でexceptionが発生したときの処理を実装することができます。

package complete_future;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;

public class Pros4 {
    // ~~~~ stdout ~~~~
    // Oops! We have an exception - java.lang.IllegalArgumentException: Age can not be negative
    // Unknown!
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Supplier<Integer> ageSupplier = () -> -1;

        CompletableFuture<String> maturityFuture = CompletableFuture
                .supplyAsync(ageSupplier)
                .thenApply(age -> {
                if(age < 0) {
                    throw new IllegalArgumentException("Age can not be negative");
                }
                if(age > 18) {
                    return "Adult";
                } else {
                    return "Child";
                }
                }).exceptionally(ex -> {
                    System.out.println("Oops! We have an exception - " + ex.getMessage());
                    return "Unknown!";
                });
        System.out.printf(maturityFuture.get());
    }
}

Java9でのCompleteFutureの改善

前述の通り、Java8でCompletableFutureは導入されましたが、

Java9でいくつかの改善が実施されています

www.baeldung.com

 

timeout関連のAPIの追加

下記のようにtimeout関連のAPIが追加されています。

www.codingame.com

crondev.blog

 

元々のFutureから、get()で時間を下記のように指定すると

ある一定以上の時間がかかる時は、TimeoutExceptionを発生させることが可能でしたが、

System.out.println(allCompletableFuture.get(10, TimeUnit.SECONDS));

これは同期的なタイムアウトなので、何か別の作業をしながら、

タイムアウトをチェックすることができませんでした。

 

java9では、

  • orTimeout(): ある一定時間以上立つと、TimeoutExceptionをスロー
  • completeOnTimeout: ある一定時間以上立つと、指定した値を返す

ようなAPIが追加されています。

 

CompletableFutureの最大スレッド数の設定

こちらを参照

qiita.com

 

参考資料

www.callicoder.com

codechacha.com

www.baeldung.com

qiita.com

www.linkedin.com

qiita.com

pppurple.hatenablog.com

qiita.com

myenigma.hatenablog.com

myenigma.hatenablog.com

myenigma.hatenablog.com

myenigma.hatenablog.com


プロになるJava―仕事で必要なプログラミングの知識がゼロから身につく最高の指南書

 

MyEnigma Supporters

もしこの記事が参考になり、

ブログをサポートしたいと思われた方は、

こちらからよろしくお願いします。

myenigma.hatenablog.com