当前位置:首页 » JAVA技术教程

读书笔记(java并发编程实战——CompletionService)

2018-11-03 08:01 本站整理 浏览(1)

原文请参考微信公众号(欢迎关注公众号:coding_song):https://mp.weixin.qq.com/s/R50Eh4kTDtA031i-yMUZAw 

 

Callable&Future

Callbale描述的是抽象的计算任务,有明确的起点,并且最终会结束;

 

@FunctionalInterface

public interface Callable<V>{

    V call()throwsException;

}

 

 

Future表示一个任务的生命周期 ,并提供了相应的方法来判断是否已经完成或取消,以及获取任务的结果和取消任务等。Future的get方法取决于任务的状态(尚未开始、正在运行、已完成),如果任务已经完成,get会立即返回结果或抛出一个异常;如果任务没有完成,则get将阻塞直到任务完成返回结果;如果任务被取消,则get抛出CancellationException;

Future的ge(long var1, TimeUnit var3)方法,可以设置超时时间,超时后可以做一些默认的处理,比如页面上展示广告信息,当获取某个广告时获取超时了,超时异常处理时可以设置一个默认广告位,而不至于什么都不显示

public interface Future<V>{

    boolean cancel(boolean var1);

    boolean isCancelled();

    boolean isDone();

    V get() throws InterruptedException,ExecutionException;

    V get(long var1,TimeUnit var3) throws InterruptedException,ExecutionException,TimeoutException;

}

CompletionService

CompletionService将Executor和BlockingQueue的功能融合在一起,将Callable任务提交给CompletionService来执行,然后使用类似于队列操作的take和poll等方法来获得已完成的结果,而这些结果会在完成是被封装为Future

 

public interface CompletionService<V>{

    Future<V> submit(Callable<V> var1);

    Future<V> submit(Runnable var1, V var2);

    Future<V> take() throws InterruptedException;

    Future<V> poll();

    Future<V> poll(long var1,TimeUnit var3) throws InterruptedException;

}
 

 

ExcutorCompletionService实现了CompletionService,在构造函数中创建一个BlockingQueue来保存计算完成的结果,当计算完成时,调用FutureTask的done方法,将完成的结果添加到BlockingQueue中;队列的take和poll方法在得出结果之前是阻塞的

public class ExecutorCompletionService<V> implements CompletionService<V>{

    private final Executor executor;

    private final AbstractExecutorService aes;

    private final BlockingQueue<Future<V>> completionQueue;


    /**

     * FutureTask extension to enqueue upon completion

     */

    private class QueueingFuture extends FutureTask<Void>{

        QueueingFuture(RunnableFuture<V> task){

            super(task,null);

            this.task = task;

        }

        protected void done(){ completionQueue.add(task);}

        private final Future<V> task;

    }

    public Future<V> take()throws InterruptedException{

        return completionQueue.take();

    }

    public Future<V> poll(){

        return completionQueue.poll();

    }

    public Future<V> poll(long timeout,TimeUnit unit)

            throws InterruptedException{

        return completionQueue.poll(timeout, unit);

    }

    // 省略其他方法

}

 

CompletionService的使用:创建n个任务,将其提交到一个线程池,保留n个Future,可使用限时的get方法通过Future串行地获取每个结果;

 

public class CompletionServiceTest{


    private final ExecutorService executorService;


    private final static Integer COUNT =10;


    public CompletionServiceTest(ExecutorService executorService){

        this.executorService = executorService;

    }


    public void test()throws InterruptedException,ExecutionException{

        CompletionService<Object> completionService = new ExecutorCompletionService<>(executorService);

        for(int i =0; i < COUNT; i++){

            int finalI = i;

            completionService.submit(newCallable(){

                @Override

                public Object call()throws Exception{

                    return"done"+ finalI;

                }

            });

        }

        for(int i =0; i < COUNT; i++){

            Future<Object> future = completionService.take();

            Object object = future.get();

        }
 

    }

}

 

上述列子描述多个ExecutorCompletionService共享一个executorService,CompletionService的作用就相当于一组计算的句柄,与Future作为单个计算的句柄是非常类似的。通过记录提交给CompletionService的任务数量,并计算出已经获取的已完成结果的数量,及时使用一个共享的ExecutorService,也能知道已经获得了所有任务结果的时间

CompletionService应用场景:

(1)动态加载数据、下载图片等,一旦队列中有了数据,就可以陆续返回加载到的数据,不需要等到所有数据都加载完成才返回;滚动网页显示加载图片可以用其实现。

(2)从不同数据源加载数据,一个ExecutorCompletionService从一个数据源中获取数据,然后通过各个ExecutorCompletionService返回的结果,再做数据整合

(3)多线程并行处理数据,可以大大提高程序处理时间,提高性能