プロになるJava―仕事で必要なプログラミングの知識がゼロから身につく最高の指南書
目次
- 目次
- はじめに
- Java8のCompleteFuture
- 利点1: 手動での処理停止
- 利点2: Futureの処理を完了したときに呼ばれるCallback関数を設定できる
- 利点3 複数のFutureを繋いで処理することができる。
- 利点4: Exception処理が実装できる。
- Java9でのCompleteFutureの改善
- CompletableFutureの最大スレッド数の設定
- 参考資料
- MyEnigma Supporters
はじめに
最近様々な並列処理を学んでいるのですが、
Javaで説明されていることが多いため、
Javaにおける並列処理やスレッドを学んでいます。
そこで気になったのは、Future/promiseパターンによる並列パターンは
Pythonにも、Juliaにもあり、
Javaの標準機能にももちろん存在しているのですが、
FutureとCompleteFutureという2つの実装があり、
違いがよくわからなかったので、まとめてみました。
Java8のCompleteFuture
JavaにおけるFutureはJava5 (2004年リリース)から存在していますが、
CompletableFutureはJava8 (2014年リリース)から入った
Future APIの進化系であり、
並列プログラミングのデザインパターンの一つであるFuture/promiseを実現するためのAPIです。
なぜこのCompletableFutureが実装されたかというと、
Java5のFutureには、下記の問題点がありました。
手動で並列処理を完了したり、停止することができない。
Futureの完了時にコールバック関数を呼ぶことなどができない
複数のFutureをつないで処理することができない
例外処理をすることができない
CompletableFutureでは以上の問題が解決されています。
下記では、それぞれの欠点を解決する使い方を説明します。
利点1: 手動での処理停止
CompleteFutureでは、
下記のようにcomplete関数で、手動でfutureの処理を完了させることができます。
(計算結果をキャッシュしているときなどに便利そうです)
package complete_future; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; public class Pros1 { // ~~~~ stdout ~~~~ // Future's Result // Process finished with exit code 0 public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<String> completableFuture = new CompletableFuture<>(); // Futureの処理を外部から手動でcompleteで止めることができる。 completableFuture.complete("Future's Result"); String result = completableFuture.get(); System.out.println(result); } }
また、cancel関数を使うことで実行中の処理をキャンセルすることもできます。
package complete_future; import java.util.Random; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; public class Pros1_1 { static void sleep(int millis){ try { Thread.sleep(millis); } catch (InterruptedException e) { e.printStackTrace(); } } public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> { sleep(4000); return "Hello"; }); int count = 0; int waitTimeCounter = new Random().nextInt(7); System.out.println(waitTimeCounter); while(!completableFuture.isDone()) { sleep(2000); System.out.println("Waiting..."); count++; if(count > waitTimeCounter) completableFuture.cancel(true); } if(!completableFuture.isCancelled()){ String result = completableFuture.get(); System.out.println("Retrieved result from the task - " + result); }else { System.out.println("The future was cancelled"); } } }
利点2: Futureの処理を完了したときに呼ばれるCallback関数を設定できる
CompleteFutureでは、
supplayAsync()で非同期に関数を実行することができ、
続いて、
- thenApply: Futureの結果を受け取って、何かを返す場合 (関数インターフェイスがFunction)
- thenAccept: Futureの結果を受け取って、何もしない場合 (関数インターフェイスがConsumer)
- thenRun: Futureの結果も使わずに、処理を実行する場合 (関数インターフェイスがRunnable)
を使って、Futureを実行した後に、Callback関数を設定することができます。
package complete_future; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; public class Pros2 { // ~~~~ stdout ~~~~ // Hello Tom // Process finished with exit code 0 public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<String> whatsYourNameFuture = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { throw new IllegalStateException(e); } return "Tom"; }); // thenApplyを使って、futureの処理後にcallback関数を設定した、新しいfutureを作る。 CompletableFuture<String> greetingFuture = whatsYourNameFuture.thenApply(name -> { return "Hello " + name; }); System.out.println(greetingFuture.get()); } }
また、これらの関数には、
thenApplyAsync
thenAcceptAsync
のように、末尾にAsyncがつくものがあり、
これはCallback関数が非同期(別スレッド)で実行されます。
また、メソッドチェーンを使えば、
2つ以上の複数のCallbackを数珠つなぎにすることも可能です。
利点3 複数のFutureを繋いで処理することができる。
先程のthenApply関数を使うことで、
複数の関数を繋いで、非同期処理が可能ですが、
ComputableFutureを返す複数の関数を繋ぎたい場合は、
Futureが入れ子になってしまうので、thenApplyは使えません。
そのような場合は、thenComposeを使うことで、
複数のFutureの結果を直列に組み合わせて、
処理することが可能になります。
一方、2つのFutureを並列に同時に走らせて、
その2つの結果を使って何かをしたい場合は、
thenCombine()が使えます。
下記の例では、2つのFutreを同時に走らせて、
その結果を使って平均値を計算しています。
public class Pros3 { //~~~~ stdout ~~~~ //Mean: 121.4 //Elapsed time:3007 ms public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<Double> future1 = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { throw new IllegalStateException(e); } return 65.0; }); CompletableFuture<Double> future2 = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { throw new IllegalStateException(e); } return 177.8; }); long startTime = System.currentTimeMillis(); CompletableFuture<Double> combinedFuture = future1 .thenCombine(future2, (score1, score2) -> (score1+score2)/2.0); System.out.println("Mean: " + combinedFuture.get()); long endTime = System.currentTimeMillis(); System.out.println("Elapsed time:" + (endTime - startTime) + " ms"); } }
以上の例は、2つのfutureの処理が終わるのを待ちますが、
どちらか結果の出力が早い方を使いたい場合は、acceptEitherAsyncが使えます。
package complete_future; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; public class Pros3_2 { //~~~~ stdout ~~~~ //177.8 //Elapsed time:2004 ms //Process finished with exit code 0 public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<Double> future1 = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { throw new IllegalStateException(e); } return 65.0; }); CompletableFuture<Double> future2 = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { throw new IllegalStateException(e); } return 177.8; }); long startTime = System.currentTimeMillis(); CompletableFuture<Void> combinedFuture = future1.acceptEitherAsync( future2, score -> System.out.println(score)); combinedFuture.get(); long endTime = System.currentTimeMillis(); System.out.println("Elapsed time:" + (endTime - startTime) + " ms"); } }
また、3つ以上のfutureの結果を使って処理したい場合は、
- allof: すべてのfutureの処理を待つ
package complete_future; import java.util.ArrayList; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; public class Pros3_3 { static CompletableFuture<Integer> getFuture(int i){ return CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(i); } catch (InterruptedException e) { throw new IllegalStateException(e); } return i; }); } //~~~~ stdout ~~~~ //[4, 5, 6] //Elapsed time:6008 ms //Process finished with exit code 0 public static void main(String[] args) throws ExecutionException, InterruptedException { ArrayList<CompletableFuture<Integer>> taskFutures = new ArrayList<>(); for (int i = 4; i <= 6; i++) { taskFutures.add(getFuture(i)); } System.out.println(taskFutures); CompletableFuture<?> allCompletableFuture = CompletableFuture .allOf(taskFutures.toArray(new CompletableFuture[0])) .thenApply(v -> taskFutures.stream() .map(CompletableFuture::join) .collect(Collectors.toList()) ); long startTime = System.currentTimeMillis(); System.out.println(allCompletableFuture.get()); long endTime = System.currentTimeMillis(); System.out.println("Elapsed time:" + (endTime - startTime) + " ms"); } }
- anyof: どれかの一番早いfutureの結果を受け取る
package complete_future; import java.util.ArrayList; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; public class Pros3_4 { static CompletableFuture<Integer> getFuture(int i){ return CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(i); } catch (InterruptedException e) { throw new IllegalStateException(e); } return i; }); } //~~~~ stdout ~~~~ // 4 //Elapsed time:4008 ms //Process finished with exit code 0 public static void main(String[] args) throws ExecutionException, InterruptedException { ArrayList<CompletableFuture<Integer>> taskFutures = new ArrayList<>(); for (int i = 3; i <= 6; i++) { taskFutures.add(getFuture(i)); } CompletableFuture<?> allCompletableFuture = CompletableFuture .anyOf(taskFutures.toArray(new CompletableFuture[0])); long startTime = System.currentTimeMillis(); System.out.println(allCompletableFuture.get()); long endTime = System.currentTimeMillis(); System.out.println("Elapsed time:" + (endTime - startTime) + " ms"); } }
を使って、3つ以上のfutureの結果を使うことができます。
利点4: Exception処理が実装できる。
Futureのメソッドチェーンの中に、
exceptionally callbackを設定することで、
メソッドチェーンの中でexceptionが発生したときの処理を実装することができます。
package complete_future; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.function.Supplier; public class Pros4 { // ~~~~ stdout ~~~~ // Oops! We have an exception - java.lang.IllegalArgumentException: Age can not be negative // Unknown! public static void main(String[] args) throws ExecutionException, InterruptedException { Supplier<Integer> ageSupplier = () -> -1; CompletableFuture<String> maturityFuture = CompletableFuture .supplyAsync(ageSupplier) .thenApply(age -> { if(age < 0) { throw new IllegalArgumentException("Age can not be negative"); } if(age > 18) { return "Adult"; } else { return "Child"; } }).exceptionally(ex -> { System.out.println("Oops! We have an exception - " + ex.getMessage()); return "Unknown!"; }); System.out.printf(maturityFuture.get()); } }
Java9でのCompleteFutureの改善
前述の通り、Java8でCompletableFutureは導入されましたが、
Java9でいくつかの改善が実施されています
timeout関連のAPIの追加
下記のようにtimeout関連のAPIが追加されています。
元々のFutureから、get()で時間を下記のように指定すると
ある一定以上の時間がかかる時は、TimeoutExceptionを発生させることが可能でしたが、
System.out.println(allCompletableFuture.get(10, TimeUnit.SECONDS));
これは同期的なタイムアウトなので、何か別の作業をしながら、
タイムアウトをチェックすることができませんでした。
java9では、
- orTimeout(): ある一定時間以上立つと、TimeoutExceptionをスロー
- completeOnTimeout: ある一定時間以上立つと、指定した値を返す
ようなAPIが追加されています。
CompletableFutureの最大スレッド数の設定
こちらを参照
参考資料
プロになるJava―仕事で必要なプログラミングの知識がゼロから身につく最高の指南書
MyEnigma Supporters
もしこの記事が参考になり、
ブログをサポートしたいと思われた方は、
こちらからよろしくお願いします。