当前位置:首页 » JAVA技术教程

java定时任务之ScheduledThreadPoolExecutor

2018-01-12 08:00 本站整理 浏览(1)

Java定时任务

 

定时任务:在日常开发中,经常会遇到延迟任务和周期性任务问题。解决这类问题,一般有以下几种途径:spring+Quartz、java Timer、spring3.0以后的scheduled task。

 

首先来看java Timer和java ScheduledThreadPoolExecutor,在jdk1.5推出ScheduledThreadPoolExecutor后,Timer几乎被完全替代。主要原因是Timer有三个大的缺陷:

1、不能捕获任务执行中的异常,如果出现一次异常整个定时任务执行将停止执行;

2、Timer是基于Date时钟的,在分布式环境下由于机器的时钟不一致肯定导致一些致命的问题;

3、Timer只会创建一个线程,存在任务丢失问题(任务执行时间超过一个周期时)。而ScheduledThreadPoolExecutor巧好能解决上述3个问题:

1、ScheduledThreadPoolExecutor可以捕获异常,当然如果不捕获异常 该线程会中止;

2、ScheduledThreadPoolExecutor是基于相对时间;

3、ScheduledThreadPoolExecutor本质上是线程池,可以创建多个线程,当前一次任务没有执行完成时,可以在新线程中继续执行新周期任务。

ScheduledThreadPoolExecutor也是本次分享的主角。

 

再来看下跟spring相关的两种方式:spring+ Quartz、spring scheduled task。其实在这两种方式中,spring只是起到整合作用,真实做事的是Quartz框架、以及java 的ScheduledThreadPoolExecutor。与spring整合Quartz一样,spring scheduled task只是对ScheduledThreadPoolExecutor进行了包装,简单的说就是可以实现配置化或注解方式 透明的使用ScheduledThreadPoolExecutor。

 

对于Quartz框架,个人觉得有点鸡肋,相对于ScheduledThreadPoolExecutor 它的优势是能实现分布式集群下的定时任务调度。但使用起来确实不是很方便,需要创建10多张数据库表。建议分布式环境下可以直接使用一些开源的分布式调度框架,比如当当的elastic-job,当然也可以选择更轻量的实现SkySchedule(关于SkySchedule相关原理以及使用方法,可以点击这里)。

 

如果不需要分布式调度,只是简单的实现定时任务,使用spring scheduled task会轻量很多,它本质上是基于java ScheduledThreadPoolExecutor实现无需引入第三方包。而且还支持注解使用,非常简单方便:

Spring xml配置:

    <task:executor id="testExecutor" pool-size="5" /> //创建核心线程数为5的线程池
<task:annotation-driven executor="testExecutor" />//启动线程池
 

 

Java注解:

@Component
public class TestBussiness {
    //@Scheduled(cron = "* * 1 * * *")
    @Scheduled(fixedRate=10000)//每隔10秒
    public void doJob(){
        System.out.println("1111111");
    }
}
 

 

综上所述:java Timer几乎被遗弃,spring+ Quartz略显鸡肋;

使用ScheduledThreadPoolExecutor相对来说是最好的选择,如果项目中使用的spring 3.0以上建议直接使用spring scheduled task,但它本质上只是对ScheduledThreadPoolExecutor进行了简单的包装。所以要了解java中定时任务的实现原理,还得从ScheduledThreadPoolExecutor说起。

 

ScheduledThreadPoolExecutor简单介绍

 

首先来看ScheduledThreadPoolExecutor的类定义:

public class ScheduledThreadPoolExecutor
        extends ThreadPoolExecutor
        implements ScheduledExecutorService {
//省略实现代码
}

 

可以看到它继承了ThreadPoolExecutor,说明它本质上也是一个线程池(想了解ThreadPoolExecutor详细介绍,请点击这里)。另它还实现了ScheduledExecutorService接口,这个接口中的方法 其实就是ScheduledThreadPoolExecutor相对于父类ScheduledExecutorService来说,要自己定制的个性化方法,也就是ScheduledThreadPoolExecutor实现“延迟任务和周期性任务”的核心方法,一共4个方法:

//延迟任务 执行方法,参数为Runnable类型对象
    public ScheduledFuture<?> schedule(Runnable command,
                                       long delay, TimeUnit unit);
 
    //延迟任务 执行方法,参数为Callable类型对象
    public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay, TimeUnit unit);
    //固定频率 执行方法,时间到了以后就执行
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit);
 
    //固定延迟周期 执行方法,上次执行完成后固定等待延迟时间后 再执行
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit);

 

这4个方法都是提交任务方法,相当于ThreadPoolExecutor的execute方法;并且都有返回值,类型为ScheduledFuture,相当于普通线程池的Future,可以用于控制任务生命周期。

 

第1、2个是延迟任务,即延迟固定period时间后,执行任务。区别是参数不同;

第3、4个是周期性任务,scheduleAtFixedRate是固定频率,任务时间到后就立即执行,但等待周期分两种情况:如果程序的执行时间大于间隔时间,等待周期为执行时间,如果程序的执行时间小于间隔时间等待周期为间隔时间;scheduleWithFixedDelay是固定周期,不过执行任务需要花多长时间,下次执行必须等待固定delay时间。

 

了解了这个方法的作用,使用ScheduledThreadPoolExecutor就已经不成问题了。下面来看具体实现原理。

 

ScheduledThreadPoolExecutor实现原理

 

构造方法

ScheduledThreadPoolExecutor有4个构造方法,本质上都是调用父类ThreadPoolExecutor的构造方法创建线程池,只是使用的任务队列是DelayedWorkQueue而已:

 

//指定核心线程数,最大线程数不做限制
public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
}
 
//指定核心线程数 和拒绝策略
public ScheduledThreadPoolExecutor(int corePoolSize,
                                       RejectedExecutionHandler handler) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue(), handler);
}
 
//指定核心线程数,创建线程工厂类
public ScheduledThreadPoolExecutor(int corePoolSize,
                                       ThreadFactory threadFactory) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue(), threadFactory);
}
 
//指定核心线程数,创建线程工厂类,拒绝策略
public ScheduledThreadPoolExecutor(int corePoolSize,
                                       ThreadFactory threadFactory,
                                       RejectedExecutionHandler handler) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue(), threadFactory, handler);
}

 

构造方法都很简单,只是需要注意的是最大线程数都没有做线程。

 

提交Callable任务schedule方法

public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay,
                                           TimeUnit unit) {
        if (callable == null || unit == null)
            throw new NullPointerException();
        //把Callable对象包装成ScheduledFutureTask对象
        RunnableScheduledFuture<V> t = decorateTask(callable,
            new ScheduledFutureTask<V>(callable,
                                       triggerTime(delay, unit)));
        //提交任务
        delayedExecute(t);
        return t;
    }
 

 

这个方法与上次总结FutureTask(点击这里http://moon-walker.iteye.com/blog/2407394)中,AbstractExecutorService的submit方法类似。首先把callable对象包装成ScheduledFutureTask对象,ScheduledFutureTask继承自FutureTask,同时实现了RunnableScheduledFuture接口。所以这里可以把ScheduledFutureTask对象作为一个任务对象提交(Runnable),同时也可以作为返回值(ScheduledFuture),关系有点复杂 上个类图有助于理解:


 

 

关系有点复杂,但我们只需要知道ScheduledFutureTask从根本上实现了三个接口:Future、Runnable、Delayed,即上图标黄的部分。另外还持有一个Callable的成员变量对象,即上面说的把Callable对象包装成ScheduledFutureTask对象。

 

ScheduledFutureTask相对于FutureTask而言,多实现了一个Delayed接口,这个是关键。因为ScheduledThreadPoolExecutor 的任务队列是DelayedWorkQueue,该队列要求放入的对象必须是实现了Delayed接口的RunnableScheduledFuture类型,而ScheduledFutureTask刚好实现了RunnableScheduledFuture。放入队列的入口就是delayedExecute方法。

 

delayedExecute方法

schedule方法包装好任务对象后,会调用delayedExecute方法,把任务放入任务队列,下面看下这个方法实现:

private void delayedExecute(RunnableScheduledFuture<?> task) {
        if (isShutdown())//线程池已关闭,执行拒绝策略
            reject(task);
        else {
            super.getQueue().add(task);//调用任务队列的add方法加入队列
            if (isShutdown() &&
                !canRunInCurrentRunState(task.isPeriodic()) &&
                remove(task))
                task.cancel(false); //取消任务
            else
                ensurePrestart();//是否需要创建新线程
        }
    }

 

这里主要的操作就是super.getQueue().add(task),调用“任务队列”的add方法把任务放入队列。这个队列在构造方法中,已经可以看出来了,是DelayedWorkQueue。如何实现延迟任务,核心操作都在DelayedWorkQueue中实现。

 

内部类DelayedWorkQueue

DelayedWorkQueue主要维护了一个由二叉堆算法实现的数组,关于二叉堆算法可以参考这篇文章https://www.cnblogs.com/skywang12345/p/3610390.html(最小堆),这里就不细讲。简单的理解就是在调用add方法插入队列时,采用二叉堆算法保证第一个元素是最小值。在这里其实就是Delay时间最短的值。

 

线程池获取任务执行时,是调用的take方法,这是一个阻塞方法,由于二叉堆算法的缘故,每次取头结点即可,该任务是延时最小的任务。具体要阻塞多久呢,调用的是任务对象的getDelay方法。来看下take方法实现:

public RunnableScheduledFuture<?> take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                for (;;) {
                    //取出头结点
                    RunnableScheduledFuture<?> first = queue[0];
                    if (first == null)
                        available.await();//队列为空就阻塞
                    else {
                        //获取头结点的,延时时长
                        long delay = first.getDelay(NANOSECONDS);
                        if (delay <= 0)
                            return finishPoll(first);//取出任务,并重写计算二叉堆
                        first = null; // don't retain ref while waiting
                        if (leader != null)
                            available.await();//阻塞等待其他线程take完成
                        else {
                            Thread thisThread = Thread.currentThread();
                            leader = thisThread;
                            try {
                                available.awaitNanos(delay); //阻塞等待delay时间
                            } finally {
                                if (leader == thisThread)
                                    leader = null;
                            }
                        }
                    }
                }
            } finally {
                if (leader == null && queue[0] != null)
                    available.signal();//继续唤醒其他线程take
                lock.unlock();
            }
        }

 

结合给出的注释,应该很好理解了。任务对象的getDelay方法又是在什么地方定义呢?任务类型为RunnableScheduledFuture,前面提到过具体的实现是ScheduledFutureTask,getDelay实际就是Delayed接口定义的方法,Delayed又继承了Comparable接口,该接口中定义了compareTo方法,二叉堆算法就是利用这个方法比较进行排序的。也就是说ScheduledFutureTask应该同时实现了getDelay方法和compareTo方法。

 

ScheduledFutureTask实现原理

 

ScheduledFutureTask有几个重载的构造方法,这里就不列出来了,主要作用就是初始化两个重要的成员变量:

//任务下一次执行间隔时间
//初始延迟时间 和 period计算得到
private long time;
//传入间隔时间
private final long period;
 

 

getDelay方法

上面已经讲过,主要用于计算执行下次任务阻塞时间:

public long getDelay(TimeUnit unit) {
            return unit.convert(time - now(), NANOSECONDS);
        }
 

 

compareTo方法

上面讲过,用于二叉堆算法计算时进行排序:

public int compareTo(Delayed other) {
            //省略其他代码
 
            //这句是关键,比较的其实就是getDelay方法返回的大小
            long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
            return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
        }

 

compareTo本质上比较的就是getDelay方法的返回值。

 

run方法

线程池中执行任务时,最终会调用任务的run方法,看下ScheduledFutureTask的run方法:

public void run() {
            boolean periodic = isPeriodic();//判断是不是 周期性任务
            if (!canRunInCurrentRunState(periodic))
                cancel(false);
            else if (!periodic)//如果只是延时任务,只需执行一次
                ScheduledFutureTask.super.run();
            else if (ScheduledFutureTask.super.runAndReset()) {
                setNextRunTime();//周期性任务,需要计算下个周期执行时间
                reExecutePeriodic(outerTask);//把任务重新放入队列
            }
        }
 
private void setNextRunTime() {
            long p = period;
            if (p > 0)
                time += p;
            else
                time = triggerTime(-p);
        }
 

 

run方法主要的逻辑就是执行延时任务,或者执行周期性任务。如果是周期性任务,需要调用setNextRunTime方法设置下次执行时间,并且把任务重新放入队列。至此周期性任务就可以一直循环执行下去了。

 

ScheduleThreadPoolExecutor的另外三个提交任务方法

前面只讲了schedule的Callable参数类型方法,另外schedule的Runnable参数类型方法只是包装成ScheduledFutureTask对象有点细微的区别,这里就不列出来了。

 

另外scheduleAtFixedRate和scheduleWithFixedDelay方法,的区别也只是在包装ScheduledFutureTask对象时,构造period成员变量有区别:scheduleAtFixedRate使用的是unit.toNanos(period),scheduleWithFixedDelay使用的是unit.toNanos(-delay)。在执行setNextRunTime方法获取下一次执行间隔时间时,就有差别了。

 

另外 延时任务和周期性任务的区别其实就在run方法中,前面已经讲过。至此ScheduleThreadPoolExecutor的核心实现代码分析完毕。

 

总结

 

 

ScheduleThreadPoolExecutor本质上还是线程池,只是使用了DelayedWorkQueue作为任务队列。对应周期性任务而已,只是每次执行完成后,重新计算下一次执行时间,然后再重新放入队列而已。如果出现异常,没有捕获的话,就无法再次放入队列,周期性任务也就无法继续执行了。

 

另外我们可以通过Executors框架提供的静态方法快速的创建ScheduleThreadPoolExecutor定时任务线程池:Executors.newScheduledThreadPool(int coresize);