并发编程是一个难题,但是一个强大而简单的抽象可以显著的简化并发的编写。出于这样的考虑,Guava定义了ListenableFuture接口并继承了JDK concurrent包下的Future接口。

我们强烈地建议你在代码中多使用ListenableFuture来代替JDK的Future

  • 大多数Futures方法中需要它。
  • 转到ListenableFuture编程比较容易。
  • Guava提供的通用公共类封装了公共的操作方方法,不需要提供Future和ListenableFuture的扩展方法。

接口

传统JDK中的Future通过异步的方式计算返回结果:在多线程运算中可能在没有结束返回结果,Future是运行中的多线程的一个引用句柄,确保在服务执行返回一个Result

ListenableFuture可以允许你注册回调方法(callbacks),在运算(多线程执行)完成的时候进行调用, 或者在运算(多线程执行)完成后立即执行。这样简单的改进,使得可以明显的支持更多的操作,这样的功能在JDK concurrent中的Future是不支持的。

ListenableFuture中的基础方法是addListener(Runnable, Executor), 该方法会在多线程运算完的时候,指定的Runnable参数传入的对象会被指定的Executor执行。

添加回调(Callbacks)

多数用户喜欢使用Futures.addCallback(ListenableFuture, FutureCallback, Executor)的方式, 或者另外一个版本,默认是采用MoreExecutors.sameThreadExecutor()线程池, 为了简化使用,Callback采用轻量级的设计. FutureCallback中实现了两个方法:

  • onSuccess(V),在Future成功的时候执行,根据Future结果来判断。
  • onFailure(Throwable), 在 Future失败的时候执行,根据Future结果来判断。

ListenableFuture的创建

对应JDK中的ExecutorService.submit(Callable)提交多线程异步运算的方式,Guava提供了ListeningExecutorService接口, 该接口返回ListenableFuture而相应的ExecutorService返回普通的Future。将ExecutorService转为ListeningExecutorService,可以使用MoreExecutors.listeningDecorator(ExecutorService)进行装饰。

ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
ListenableFuture<Explosion> explosion = service.submit(new Callable<Explosion>() {
  public Explosion call() {
    return pushBigRedButton();
  }
});
Futures.addCallback(explosion, new FutureCallback<Explosion>() {
  // we want this handler to run immediately after we push the big red button!
  public void onSuccess(Explosion explosion) {
    walkAwayFrom(explosion);
  }
  public void onFailure(Throwable thrown) {
    battleArchNemesis(); // escaped the explosion!
  }
});

另外, 假如你是从FutureTask转换而来的, Guava提供ListenableFutureTask.create(Callable)ListenableFutureTask.create(Runnable, V). 和JDK不同的是, ListenableFutureTask不能随意被继承(ListenableFutureTask中的done方法实现了调用listener的操作)。

假如你喜欢抽象的方式来设置future的值,而不是想实现接口中的方法,可以考虑继承抽象类AbstractFuture或者直接使用SettableFuture

假如你必须将其他API提供的Future转换成ListenableFuture,你没有别的方法只能采用硬编码的方式 JdkFutureAdapters.listenInPoolThread(Future)来将Future转换成ListenableFuture。尽可能地采用修改原生的代码返回ListenableFuture会更好一些。

Application

使用ListenableFuture最重要的理由是它可以进行一系列的复杂链式的异步操作。

ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query);
AsyncFunction<RowKey, QueryResult> queryFunction =
  new AsyncFunction<RowKey, QueryResult>() {
    public ListenableFuture<QueryResult> apply(RowKey rowKey) {
      return dataService.read(rowKey);
    }
  };
ListenableFuture<QueryResult> queryFuture =
    Futures.transformAsync(rowKeyFuture, queryFunction, queryExecutor);

其他更多的操作可以更加有效的支持而JDK中的Future是没法支持的.

不同的操作可以在不同的Executors中执行,单独的ListenableFuture可以有多个操作等待。

当一个操作开始的时候其他的一些操作也会尽快开始执行–“fan-out”–ListenableFuture 能够满足这样的场景:触发所有的回调(callbacks)。反之更简单的工作是,同样可以满足“fan-in”场景,触发ListenableFuture获取(get)计算结果,同时其它的Futures也会尽快执行:可以参考the implementation of Futures.allAsList 。

方法描述参考
transform(ListenableFuture<A\>, AsyncFunction<A, B\>, Executor)*返回一个新的 ListenableFuture ,该ListenableFuture返回的 result 是由传入的 AsyncFunction 参数指派到传入的 ListenableFuture 中.transform(ListenableFuture<A\>, AsyncFunction<A, B\>)
transform(ListenableFuture<A\>, Function<A, B\>, Executor)返回一个新的 ListenableFuture ,该 ListenableFuture 返回的 result 是由传入的Function 参数指派到传入的 ListenableFuture 中.transform(ListenableFuture<A\>, Function<A, B\>)
allAsList(Iterable<ListenableFuture<V\>\>)返回一个ListenableFuture,该ListenableFuture 返回的 result 是一个 List,List 中的值是每个ListenableFuture的返回值,假如传入的其中之一fails或者cancel,这 个Future fails 或者 canceledallAsList(ListenableFuture<V\>...)
successfulAsList(Iterable<ListenableFuture<V\>\>)返回一个 ListenableFuture ,该 Future 的结果包含所有成功的 Future,按照原来的顺序,当其中之一 Failed 或者 cancel,则用 null 替代successfulAsList(ListenableFuture<V\>...)

AsyncFunction<A, B\>中提供一个方法ListenableFuture<B> apply(A input),它可以被用于异步变换值。

List<ListenableFuture<QueryResult>> queries;
// The queries go to all different data centers, but we want to wait until they're all done or failed.

ListenableFuture<List<QueryResult>> successfulQueries = Futures.successfulAsList(queries);

Futures.addCallback(successfulQueries, callbackOnSuccessfulQueries);

避免嵌套Future

在代码调用泛型接口并返回Future的情况下,可能最终使用嵌套的Future。例如:

executorService.submit(new Callable<ListenableFuture<Foo>() {
  @Override
  public ListenableFuture<Foo> call() {
    return otherExecutorService.submit(otherCallable);
  }
});

将返回一个ListenableFuture<ListenableFuture<Foo>>。这段代码是不正确的,因为如果外部future上的cancel与外部future的完成竞争,那么该取消将不会传播到内部future。使用get()或侦听器检查另一个将来的失败也是一个常见的错误,但是除非特别小心,否则会抑制从otherCallable抛出的异常。为了避免这种情况,Guava所有的未来处理方法(有些来自JDK)都有*Async版本,可以安全地打开这个嵌套-

transform(ListenableFuture<A\>, Function<A, B\>, Executor)

transformAsync(ListenableFuture<A\>,AsyncFunction<A, B\>,Executor)

ExecutorService.submit(Callable)

submitAsync(AsyncCallable<A\>, Executor)

【腾讯云】境外1核2G服务器低至2折,半价续费券限量免费领取!
https://cloud.tencent.com/act/cps/redirect?redirect=1068&cps_key=e4b50f6c64a4480367f8a8d16fd07c5a&from=console

标签: Guava, ListenableFuture, Futures, Application

添加新评论