google-guava ListeningExecutorService 試してみた
私もここ数年 Java 開発してて、パラメータチェックとか、アプリケーション内での簡単なキャッシュ処理とかのユーティリティは guava よく使うんだけど、スレッド処理で guava の concurrent パッケージにある ListeningExecutorService ってクラスを使ってみたからまとめてみる. ちなみにGoogle guavaライブラリの説明は以下のようにされていてgoogleのJavaプロジェクトにおけるコアライブラリとして発展していて. 最新のバージョンは、The latest release is 17.0, released April 22, 2014
とある.
The Guava project contains several of Google's core libraries that we rely on in our Java-based projects: collections, caching, primitives support, concurrency libraries, common annotations, string processing, I/O, and so forth.
まずは、ListeningExecutorService
の生成はこんな感じ. java で支援されているのはjava.util.concurrent.Executors
を使ってExecutorService
を生成するがguavaの拡張版は、guavaが提供するcom.google.common.util.concurrent.MoreExecutors
を使ってListeningExecutorService
を生成する.
protected static ListeningExecutorService es = MoreExecutors.listeningDecorator( Executors.newCachedThreadPool(new ThreadFactory() { public Thread newThread(Runnable r) { return new Thread(r, "task-worker#" + RandomUtils.nextInt(100)); }}));
そしてListeningExecutorService
スレッドの実行方法は以下のように
ListenableFuture<String> future = es.submit(new Callable<String>() { /* (non-Javadoc) * @see java.util.concurrent.Callable#call() */ public String call() throws Exception { // do anything... }});
このcom.google.common.util.concurrent.ListenableFuture
が、既存のjava.util.concurrent.Future
の拡張で使いやすかった。
シンプルな例を少し作ったのでまとめておきます。
- リスナー
ListenableFuture<?>
にスレッドの処理が終了されたときに呼び出されるリスナーを登録する方式
public class CallableWithListenerListenableFutureExcample { private static Logger log = LoggerFactory .getLogger(CallableWithListenerListenableFutureExcample.class); protected static ListeningExecutorService es = MoreExecutors.listeningDecorator( Executors.newCachedThreadPool(new ThreadFactory() { public Thread newThread(Runnable r) { return new Thread(r, "task-worker#" + RandomUtils.nextInt(100)); }})); protected static ExecutorService th_pool = Executors.newCachedThreadPool(new ThreadFactory() { public Thread newThread(Runnable r) { return new Thread(r, "thrd-worker#" + RandomUtils.nextInt(100)); } }); public void test() throws Exception { log.debug(">>> test-task/start"); final CountDownLatch latch = new CountDownLatch(1); final ListenableFuture<String> future = es.submit(new Callable<String>() { /* (non-Javadoc) * @see java.util.concurrent.Callable#call() */ public String call() throws Exception { try { long waitMillis = Long.parseLong(RandomStringUtils.randomNumeric(5)); log.debug("process waiting.... / waitMillis={}ms", waitMillis); if (waitMillis > 30000) throw new IllegalStateException("system busy..."); log.debug("processing..."); Thread.sleep(waitMillis); // do anything log.debug("process completed."); return "callback-task finished/" + Thread.currentThread().getName(); } finally { latch.countDown(); } } }); future.addListener(new Thread() { public void run() { try { log.debug("--- start Listener: isDone={}, isCancelled={}", future.isDone(), future.isCancelled()); log.debug("Listener# future.get()={}", future.get()); // if task failed future.get returned 'java.util.concurrent.ExecutionException' log.debug("--- end Listener"); } catch (Exception e) { log.error("Failed", e); } } }, th_pool); log.debug(">>> test-task/end"); } public static void main(String[] args) { try { new CallableWithListenerListenableFutureExcample().test(); } catch (Exception e) { e.printStackTrace(); } } }
実行結果
18:34:41.532 DEBUG [main] o.h.s.g.e.c.CallableWithListenerListenableFutureExcample[37] - >>> test-task/start 18:34:41.547 DEBUG [main] o.h.s.g.e.c.CallableWithListenerListenableFutureExcample[79] - >>> test-task/end 18:34:41.547 DEBUG [task-worker#28] o.h.s.g.e.c.CallableWithListenerListenableFutureExcample[50] - process waiting.... / waitMillis=29422ms 18:34:41.547 DEBUG [task-worker#28] o.h.s.g.e.c.CallableWithListenerListenableFutureExcample[55] - processing... 18:35:10.970 DEBUG [task-worker#28] o.h.s.g.e.c.CallableWithListenerListenableFutureExcample[57] - process completed. 18:35:10.971 DEBUG [thrd-worker#6] o.h.s.g.e.c.CallableWithListenerListenableFutureExcample[70] - --- start Listener: isDone=true, isCancelled=false 18:35:10.972 DEBUG [thrd-worker#6] o.h.s.g.e.c.CallableWithListenerListenableFutureExcample[71] - Listener# future.get()=callback-task finished/task-worker#28 18:35:10.972 DEBUG [thrd-worker#6] o.h.s.g.e.c.CallableWithListenerListenableFutureExcample[72] - --- end Listener
もし、スレッド内で失敗(例外処理が発生)するとfuture.get()
でjava.util.concurrent.ExecutionException
が返却された。
18:36:12.139 ERROR [thrd-worker#22] o.h.s.g.e.c.CallableWithListenerListenableFutureExcample[74] - Failed java.util.concurrent.ExecutionException: java.lang.IllegalStateException: system busy... at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:252) ~[na:1.7.0_11] at java.util.concurrent.FutureTask.get(FutureTask.java:111) ~[na:1.7.0_11]
- コールバック
com.google.common.util.concurrent.Futures.addCallback(ListenableFuture<T>,FutureCallback<? super T>, Executor)
を使ってスレッド終了時のコールバックを登録する方式。サンプルは、同時に5個のスレッドを実行して、コールバックでは全てのスレッドが終了された時に全てが完了したことをログに出力する コールバックの場合は、com.google.common.util.concurrent.FutureCallback<T>
インタフェースを実装することになり、onSuccess(T result)
とonFailure(Throwable cause)
を分けて実装することになるからエラーハンドリングなどの実装は明確に行える
public class MultiCallableWithCallbackHandlerListenableFutureExcample { private static Logger log = LoggerFactory .getLogger(MultiCallableWithCallbackHandlerListenableFutureExcample.class); protected static ListeningExecutorService les = MoreExecutors .listeningDecorator(Executors .newCachedThreadPool(new ThreadFactory() { public Thread newThread(Runnable r) { return new Thread(r, "task-worker#" + RandomUtils.nextInt(100)); } })); protected static ExecutorService callbackHandlers = Executors .newCachedThreadPool(new ThreadFactory() { public Thread newThread(Runnable r) { return new Thread(r, "cab-handler#" + RandomUtils.nextInt(100)); } }); public void test() throws Exception { log.debug(">>> test-task/start"); try { final int taskCount = 5; final CountDownLatch latch = new CountDownLatch(taskCount); final CountDownLatch startGate = new CountDownLatch(1); for (int i = 0; i < taskCount; i++) { // invoked task Futures.addCallback(les.submit(new Callable<String>() { public String call() throws Exception { try { startGate.await(1000L, TimeUnit.MILLISECONDS); long waitMillis = Long.parseLong(RandomStringUtils.randomNumeric(5)); log.debug("process waiting.... / waitMillis={}ms", waitMillis); if (waitMillis > 50000) throw new IllegalStateException("system busy..."); log.debug("processing..."); Thread.sleep(waitMillis); // do anything log.debug("process completed."); return "callback-task finished/" + Thread.currentThread().getName(); } finally { latch.countDown(); } } }), new FutureCallback<String>() { public void onSuccess(String result) { log.info("callback process success. Unresolved task count={}, {}", latch.getCount(), result); if (latch.getCount() <= 0) { log.info("#### ALL TASK FINISHED ####"); } } public void onFailure(Throwable t) { log.warn("callback process failed. Unresolved task count={} : {}", latch.getCount(), t.getMessage()); if (latch.getCount() <= 0) { log.info("#### ALL TASK FINISHED ####"); } } }, callbackHandlers); } // start all tasks startGate.countDown(); } catch (Exception e) { log.error("Failed", e); } log.debug(">>> test-task/end"); } public static void main(String[] args) { try { new MultiCallableWithCallbackHandlerListenableFutureExcample() .test(); } catch (Exception e) { e.printStackTrace(); } } }
実行結果
18:46:29.142 DEBUG [main] o.h.s.g.e.c.MultiCallableWithCallbackHandlerListenableFutureExcample[43] - >>> test-task/start 18:46:29.169 DEBUG [main] o.h.s.g.e.c.MultiCallableWithCallbackHandlerListenableFutureExcample[99] - >>> test-task/end 18:46:29.171 DEBUG [task-worker#12] o.h.s.g.e.c.MultiCallableWithCallbackHandlerListenableFutureExcample[60] - process waiting.... / waitMillis=3868ms 18:46:29.171 DEBUG [task-worker#87] o.h.s.g.e.c.MultiCallableWithCallbackHandlerListenableFutureExcample[60] - process waiting.... / waitMillis=36163ms 18:46:29.171 DEBUG [task-worker#12] o.h.s.g.e.c.MultiCallableWithCallbackHandlerListenableFutureExcample[65] - processing... 18:46:29.171 DEBUG [task-worker#14] o.h.s.g.e.c.MultiCallableWithCallbackHandlerListenableFutureExcample[60] - process waiting.... / waitMillis=54401ms 18:46:29.171 DEBUG [task-worker#12] o.h.s.g.e.c.MultiCallableWithCallbackHandlerListenableFutureExcample[60] - process waiting.... / waitMillis=71526ms 18:46:29.171 DEBUG [task-worker#96] o.h.s.g.e.c.MultiCallableWithCallbackHandlerListenableFutureExcample[60] - process waiting.... / waitMillis=63505ms 18:46:29.171 DEBUG [task-worker#87] o.h.s.g.e.c.MultiCallableWithCallbackHandlerListenableFutureExcample[65] - processing... 18:46:29.174 WARN [cab-handler#47] o.h.s.g.e.c.MultiCallableWithCallbackHandlerListenableFutureExcample[84] - callback process failed. Unresolved task count=2 : system busy... 18:46:29.174 WARN [cab-handler#65] o.h.s.g.e.c.MultiCallableWithCallbackHandlerListenableFutureExcample[84] - callback process failed. Unresolved task count=2 : system busy... 18:46:29.174 WARN [cab-handler#95] o.h.s.g.e.c.MultiCallableWithCallbackHandlerListenableFutureExcample[84] - callback process failed. Unresolved task count=2 : system busy... 18:46:33.041 DEBUG [task-worker#12] o.h.s.g.e.c.MultiCallableWithCallbackHandlerListenableFutureExcample[67] - process completed. 18:46:33.042 INFO [cab-handler#95] o.h.s.g.e.c.MultiCallableWithCallbackHandlerListenableFutureExcample[78] - callback process success. Unresolved task count=1, callback-task finished/task-worker#12 18:47:05.337 DEBUG [task-worker#87] o.h.s.g.e.c.MultiCallableWithCallbackHandlerListenableFutureExcample[67] - process completed. 18:47:05.337 INFO [cab-handler#95] o.h.s.g.e.c.MultiCallableWithCallbackHandlerListenableFutureExcample[78] - callback process success. Unresolved task count=0, callback-task finished/task-worker#87 18:47:05.338 INFO [cab-handler#95] o.h.s.g.e.c.MultiCallableWithCallbackHandlerListenableFutureExcample[80] - #### ALL TASK FINISHED ####
guavaさんのお陰で簡単に非同期実装ができました。
今回のコードはこちら