horiga blog

Notes from an engineer

Trying out google-guava's ListeningExecutorService

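I have been doing Java development for the past few years, and I often use guava for utilities such as parameter checks and simple in-application caching. This time I used the ListeningExecutorService class from guava's concurrent package for thread processing, so here is a summary. As for the latest version, the project page says "The latest release is 17.0, released April 22, 2014". Incidentally, the Google guava library has grown as the core library of Google's Java projects and is described as follows: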

The Guava project contains several of Google's core libraries that we rely on in our Java-based projects: collections, caching, primitives support, concurrency libraries, common annotations, string processing, I/O, and so forth.

First, creating a ListeningExecutorService looks like this. With what Java itself provides, you use java.util.concurrent.Executors to create an ExecutorService. With guava's extension, you use com.google.common.util.concurrent.MoreExecutors, which guava provides, to create a ListeningExecutorService.

protected static ListeningExecutorService es = MoreExecutors.listeningDecorator(
                        Executors.newCachedThreadPool(new ThreadFactory() {
                                    public Thread newThread(Runnable r) {
                                        return new Thread(r, "task-worker#" + RandomUtils.nextInt(100));
                                    }}));
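
One side note on the thread names above: a random suffix between 0 and 99 can repeat, so two worker threads may end up with the same name in the logs. A minimal sketch of a counter-based alternative, using only the JDK, could look like this. The class name NumberedThreadFactory is hypothetical and is not part of guava or of the original sample.

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper (not from the original post): names threads
// "<prefix>1", "<prefix>2", ... so that names never collide.
public class NumberedThreadFactory implements ThreadFactory {

    private final String prefix;
    private final AtomicInteger counter = new AtomicInteger();

    public NumberedThreadFactory(String prefix) {
        this.prefix = prefix;
    }

    public Thread newThread(Runnable r) {
        // incrementAndGet() is atomic, so concurrent callers get distinct numbers
        return new Thread(r, prefix + counter.incrementAndGet());
    }
}
```

It can be passed to Executors.newCachedThreadPool(...) in place of the anonymous ThreadFactory, for example new NumberedThreadFactory("task-worker#").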

A task is then submitted to the ListeningExecutorService like this:

ListenableFuture<String> future = es.submit(new Callable<String>() {
            /* (non-Javadoc)
            * @see java.util.concurrent.Callable#call()
            */
            public String call() throws Exception {
                            // do anything...
                            return "result"; // placeholder so that call() compiles
                        }});

This com.google.common.util.concurrent.ListenableFuture extends the existing java.util.concurrent.Future and was easy to use. I put together a few simple examples, so here they are.

  • Listener: register a listener on the ListenableFuture<?> that is called when the task finishes.
public class CallableWithListenerListenableFutureExcample {
    
    private static Logger log = LoggerFactory
            .getLogger(CallableWithListenerListenableFutureExcample.class);
    
    protected static ListeningExecutorService es = MoreExecutors.listeningDecorator(
                        Executors.newCachedThreadPool(new ThreadFactory() {
                                    public Thread newThread(Runnable r) {
                                        return new Thread(r, "task-worker#" + RandomUtils.nextInt(100));
                                    }}));
    
    protected static ExecutorService th_pool = Executors.newCachedThreadPool(new ThreadFactory() {
                public Thread newThread(Runnable r) {
                    return new Thread(r, "thrd-worker#" + RandomUtils.nextInt(100));
                }
            });
    
    public void test() throws Exception {
        
        log.debug(">>> test-task/start");
        
        final CountDownLatch latch = new CountDownLatch(1);
        
        final ListenableFuture<String> future = es.submit(new Callable<String>() {
            /* (non-Javadoc)
            * @see java.util.concurrent.Callable#call()
            */
            public String call() throws Exception {
                
                try {
                    long waitMillis = Long.parseLong(RandomStringUtils.randomNumeric(5));
                    
                    log.debug("process waiting.... / waitMillis={}ms", waitMillis);
                    
                    if (waitMillis > 30000)
                        throw new IllegalStateException("system busy...");
                    
                    log.debug("processing...");
                    Thread.sleep(waitMillis); // do anything
                    log.debug("process completed.");
                    
                    return "callback-task finished/" + Thread.currentThread().getName();
                    
                } finally {
                    latch.countDown();
                }
            }
        });
        
        future.addListener(new Runnable() { // addListener takes a Runnable; th_pool runs it
            public void run() {
                try {
                    log.debug("--- start Listener: isDone={}, isCancelled={}", future.isDone(), future.isCancelled());
                    log.debug("Listener# future.get()={}", future.get()); // if the task failed, future.get() throws 'java.util.concurrent.ExecutionException'
                    log.debug("--- end Listener");
                } catch (Exception e) {
                    log.error("Failed", e);
                }
            }
        }, th_pool);
        
        log.debug(">>> test-task/end");
    }
    
    public static void main(String[] args) {
        try {
            new CallableWithListenerListenableFutureExcample().test();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Execution result

18:34:41.532 DEBUG [main] o.h.s.g.e.c.CallableWithListenerListenableFutureExcample[37] - >>> test-task/start
18:34:41.547 DEBUG [main] o.h.s.g.e.c.CallableWithListenerListenableFutureExcample[79] - >>> test-task/end
18:34:41.547 DEBUG [task-worker#28] o.h.s.g.e.c.CallableWithListenerListenableFutureExcample[50] - process waiting.... / waitMillis=29422ms
18:34:41.547 DEBUG [task-worker#28] o.h.s.g.e.c.CallableWithListenerListenableFutureExcample[55] - processing...
18:35:10.970 DEBUG [task-worker#28] o.h.s.g.e.c.CallableWithListenerListenableFutureExcample[57] - process completed.
18:35:10.971 DEBUG [thrd-worker#6] o.h.s.g.e.c.CallableWithListenerListenableFutureExcample[70] - --- start Listener: isDone=true, isCancelled=false
18:35:10.972 DEBUG [thrd-worker#6] o.h.s.g.e.c.CallableWithListenerListenableFutureExcample[71] - Listener# future.get()=callback-task finished/task-worker#28
18:35:10.972 DEBUG [thrd-worker#6] o.h.s.g.e.c.CallableWithListenerListenableFutureExcample[72] - --- end Listener

If the task fails inside the thread (an exception is thrown), future.get() throws java.util.concurrent.ExecutionException.

18:36:12.139 ERROR [thrd-worker#22] o.h.s.g.e.c.CallableWithListenerListenableFutureExcample[74] - Failed
java.util.concurrent.ExecutionException: java.lang.IllegalStateException: system busy...
    at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:252) ~[na:1.7.0_11]
    at java.util.concurrent.FutureTask.get(FutureTask.java:111) ~[na:1.7.0_11]
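
As the stack trace shows, the original IllegalStateException is wrapped inside the ExecutionException. A minimal sketch of how a caller could unwrap it with getCause() follows. It uses only the JDK's Future, and the class and method names are made up for illustration. The same get() contract applies to ListenableFuture, since it extends Future.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch: shows that Future.get() wraps the task's exception
// in an ExecutionException and that getCause() returns the original one.
public class ExecutionExceptionCauseSketch {

    /** Submits a task that always fails and describes the unwrapped cause. */
    static String describeFailure() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<String> future = pool.submit(new Callable<String>() {
                public String call() {
                    throw new IllegalStateException("system busy...");
                }
            });
            try {
                return future.get();
            } catch (ExecutionException e) {
                // the exception thrown inside call() is available as the cause
                Throwable cause = e.getCause();
                return cause.getClass().getSimpleName() + ": " + cause.getMessage();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the interrupt flag
                return "interrupted";
            }
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(describeFailure()); // prints "IllegalStateException: system busy..."
    }
}
```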
  • Callback: register a callback for task completion using com.google.common.util.concurrent.Futures.addCallback(ListenableFuture<T>,FutureCallback<? super T>, Executor). The sample runs five tasks at the same time, and once all of them have finished the callback logs that everything is complete. With a callback you implement the com.google.common.util.concurrent.FutureCallback<T> interface and write onSuccess(T result) and onFailure(Throwable cause) separately, so error handling and the like can be implemented clearly.
public class MultiCallableWithCallbackHandlerListenableFutureExcample {

    private static Logger log = LoggerFactory
            .getLogger(MultiCallableWithCallbackHandlerListenableFutureExcample.class);

    protected static ListeningExecutorService les = MoreExecutors
            .listeningDecorator(Executors
                    .newCachedThreadPool(new ThreadFactory() {
                        public Thread newThread(Runnable r) {
                            return new Thread(r, "task-worker#"
                                    + RandomUtils.nextInt(100));
                        }
                    }));

    protected static ExecutorService callbackHandlers = Executors
            .newCachedThreadPool(new ThreadFactory() {
                public Thread newThread(Runnable r) {
                    return new Thread(r, "cab-handler#"
                            + RandomUtils.nextInt(100));
                }
            });

    public void test() throws Exception {
        log.debug(">>> test-task/start");
        try {
            final int taskCount = 5;
            final CountDownLatch latch = new CountDownLatch(taskCount);
            final CountDownLatch startGate = new CountDownLatch(1);
            
            for (int i = 0; i < taskCount; i++) {
                // invoked task
                Futures.addCallback(les.submit(new Callable<String>() {
                    
                    public String call() throws Exception {
                        
                        try {
                            startGate.await(1000L, TimeUnit.MILLISECONDS);
                            
                            long waitMillis = Long.parseLong(RandomStringUtils.randomNumeric(5));
                            
                            log.debug("process waiting.... / waitMillis={}ms", waitMillis);
                            
                            if (waitMillis > 50000)
                                throw new IllegalStateException("system busy...");
                            
                            log.debug("processing...");
                            Thread.sleep(waitMillis); // do anything
                            log.debug("process completed.");
                            
                            return "callback-task finished/" + Thread.currentThread().getName();
                            
                        } finally {
                            latch.countDown();
                        }
                    }
                }),
                new FutureCallback<String>() {
                            public void onSuccess(String result) {
                                log.info("callback process success. Unresolved task count={},  {}", latch.getCount(), result);
                                if (latch.getCount() <= 0) {
                                    log.info("#### ALL TASK FINISHED ####");
                                }
                            }
                            public void onFailure(Throwable t) {
                                log.warn("callback process failed. Unresolved task count={} : {}", latch.getCount(), t.getMessage());
                                if (latch.getCount() <= 0) {
                                    log.info("#### ALL TASK FINISHED ####");
                                }
                            }
                        }, 
                callbackHandlers);
            }
            
            // start all tasks
            startGate.countDown();

        } catch (Exception e) {
            log.error("Failed", e);
        }
        log.debug(">>> test-task/end");
    }

    public static void main(String[] args) {
        try {
            new MultiCallableWithCallbackHandlerListenableFutureExcample()
                    .test();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Execution result

18:46:29.142 DEBUG [main] o.h.s.g.e.c.MultiCallableWithCallbackHandlerListenableFutureExcample[43] - >>> test-task/start
18:46:29.169 DEBUG [main] o.h.s.g.e.c.MultiCallableWithCallbackHandlerListenableFutureExcample[99] - >>> test-task/end
18:46:29.171 DEBUG [task-worker#12] o.h.s.g.e.c.MultiCallableWithCallbackHandlerListenableFutureExcample[60] - process waiting.... / waitMillis=3868ms
18:46:29.171 DEBUG [task-worker#87] o.h.s.g.e.c.MultiCallableWithCallbackHandlerListenableFutureExcample[60] - process waiting.... / waitMillis=36163ms
18:46:29.171 DEBUG [task-worker#12] o.h.s.g.e.c.MultiCallableWithCallbackHandlerListenableFutureExcample[65] - processing...
18:46:29.171 DEBUG [task-worker#14] o.h.s.g.e.c.MultiCallableWithCallbackHandlerListenableFutureExcample[60] - process waiting.... / waitMillis=54401ms
18:46:29.171 DEBUG [task-worker#12] o.h.s.g.e.c.MultiCallableWithCallbackHandlerListenableFutureExcample[60] - process waiting.... / waitMillis=71526ms
18:46:29.171 DEBUG [task-worker#96] o.h.s.g.e.c.MultiCallableWithCallbackHandlerListenableFutureExcample[60] - process waiting.... / waitMillis=63505ms
18:46:29.171 DEBUG [task-worker#87] o.h.s.g.e.c.MultiCallableWithCallbackHandlerListenableFutureExcample[65] - processing...
18:46:29.174 WARN  [cab-handler#47] o.h.s.g.e.c.MultiCallableWithCallbackHandlerListenableFutureExcample[84] - callback process failed. Unresolved task count=2 : system busy...
18:46:29.174 WARN  [cab-handler#65] o.h.s.g.e.c.MultiCallableWithCallbackHandlerListenableFutureExcample[84] - callback process failed. Unresolved task count=2 : system busy...
18:46:29.174 WARN  [cab-handler#95] o.h.s.g.e.c.MultiCallableWithCallbackHandlerListenableFutureExcample[84] - callback process failed. Unresolved task count=2 : system busy...
18:46:33.041 DEBUG [task-worker#12] o.h.s.g.e.c.MultiCallableWithCallbackHandlerListenableFutureExcample[67] - process completed.
18:46:33.042 INFO  [cab-handler#95] o.h.s.g.e.c.MultiCallableWithCallbackHandlerListenableFutureExcample[78] - callback process success. Unresolved task count=1,  callback-task finished/task-worker#12
18:47:05.337 DEBUG [task-worker#87] o.h.s.g.e.c.MultiCallableWithCallbackHandlerListenableFutureExcample[67] - process completed.
18:47:05.337 INFO  [cab-handler#95] o.h.s.g.e.c.MultiCallableWithCallbackHandlerListenableFutureExcample[78] - callback process success. Unresolved task count=0,  callback-task finished/task-worker#87
18:47:05.338 INFO  [cab-handler#95] o.h.s.g.e.c.MultiCallableWithCallbackHandlerListenableFutureExcample[80] - #### ALL TASK FINISHED ####
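
In the log above, three failure callbacks all report the same remaining count, because each callback reads the latch some time after its own task has counted down. If the goal is only to know when every task is done, guava's Futures.successfulAsList may be a simpler fit. It combines the futures into one whose result list has null in place of each failed task. The following is a rough sketch under that assumption. The class name, the method runAll and the rule for which task fails are hypothetical, and it blocks on get() only to keep the example short.

```java
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;

// Hypothetical sketch: waits for all tasks via one combined future
// instead of checking a CountDownLatch inside every callback.
public class AllTasksFinishedSketch {

    /** Runs taskCount tasks; the task at failingIndex throws (made-up failure rule). */
    static List<String> runAll(int taskCount, final int failingIndex) {
        ListeningExecutorService pool =
                MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
        try {
            List<ListenableFuture<String>> futures = new ArrayList<ListenableFuture<String>>();
            for (int i = 0; i < taskCount; i++) {
                final int index = i;
                futures.add(pool.submit(new Callable<String>() {
                    public String call() {
                        if (index == failingIndex) {
                            throw new IllegalStateException("system busy...");
                        }
                        return "task-" + index;
                    }
                }));
            }
            // Completes when every input future is done; failed inputs become null.
            ListenableFuture<List<String>> all = Futures.successfulAsList(futures);
            return all.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runAll(5, 2)); // prints "[task-0, task-1, null, task-3, task-4]"
    }
}
```

In a fully asynchronous setup, the combined future could be given its own callback with Futures.addCallback(all, callback, executor) instead of calling get(), and the "all tasks finished" message would then be logged exactly once from there.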

Thanks to guava, the asynchronous implementation was easy.

The code for this post is here