Java併發(二十二):定時任務ScheduledThreadPoolExecutor

需要在理解線程池原理的基礎上學習定時任務:Java併發(二十一):線程池實現原理

一、先做總結

通過一個簡單示例總結:

    public static void main(String[] args) {
        ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(3);
        scheduled.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                System.out.println();
            }
            
        }, 10, 30, TimeUnit.MILLISECONDS);
    }

1、概述

new一個線程池,等待隊列是DelayedWorkQueue,將Runable放入隊列中,到時間會被線程池取出執行

2、如何實現任務到時間被自動取出?

延時隊列DelayedWorkQueue:

  DelayedWorkQueue為ScheduledThreadPoolExecutor中的內部類(類似DelayQueue)

  DelayedWorkQueue中的任務是按照延遲時間從短到長來進行排序的(插入時排序)

  只有在延遲期滿時才能從中提取元素,其列頭是延遲期滿後保存時間最長的Delayed元素

3、週期任務如何實現?

   任務被取出來run之後,將time+period又放入DelayedWorkQueue隊列

4、四個定時任務及區別:

(1)schedule(Callable callable, long delay, TimeUnit unit) :創建並執行在給定延遲後啟用的 ScheduledFuture。
(2)schedule(Runnable command, long delay, TimeUnit unit) :創建並執行在給定延遲後啟用的一次性操作。
(3)scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) :創建並執行一個在給定初始延遲後首次啟用的定期操作,後續操作具有給定的週期;也就是將在 initialDelay 後開始執行,然後在 initialDelay+period 後執行,接着在 initialDelay + 2 * period 後執行,依此類推。
(4)scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) :創建並執行一個在給定初始延遲後首次啟用的定期操作,隨後,在每一次執行終止和下一次執行開始之間都存在給定的延遲。

區別:

第三個方法(scheduleAtFixedRate)是週期固定,也就説它是不會受到這個延遲的影響的,每個線程的調度週期在初始化時就已經絕對了,是什麼時候調度就是什麼時候調度,它不會因為上一個線程的調度失效延遲而受到影響。
但是第四個方法(scheduleWithFixedDelay),則不一樣,它是每個線程的調度間隔固定,也就是説第一個線程與第二線程之間間隔delay,第二個與第三個間隔delay,以此類推。如果第二線程推遲了那麼後面所有的線程調度都會推遲。

scheduleAtFixedRate與scheduleWithFixedDelay區別原理:

  任務被取出來run之後,將time+period又放入DelayedWorkQueue隊列

  細節一:構造ScheduledFutureTask時,scheduleAtFixedRate傳入period(>0),scheduleWithFixedDelay傳入-delay(<0)

  細節二:setNextRunTime時,scheduleAtFixedRate.time=time+period;scheduleWithFixedDelay.time=now()+period

細節一:

    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        if (period <= 0)
            throw new IllegalArgumentException();
        ScheduledFutureTask<Void> sft =
            new ScheduledFutureTask<Void>(command,
                                          null,
                                          triggerTime(initialDelay, unit),
                                          unit.toNanos(period));
        RunnableScheduledFuture<Void> t = decorateTask(command, sft);
        sft.outerTask = t;
        delayedExecute(t);
        return t;
    }

  public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        if (delay <= 0)
            throw new IllegalArgumentException();
        ScheduledFutureTask<Void> sft =
            new ScheduledFutureTask<Void>(command,
                                          null,
                                          triggerTime(initialDelay, unit),
                                          unit.toNanos(-delay));
        RunnableScheduledFuture<Void> t = decorateTask(command, sft);
        sft.outerTask = t;
        delayedExecute(t);
        return t;
    }

細節二:

    private void setNextRunTime() {
        long p = period;
        if (p > 0)
            time += p;// scheduleAtFixedRate:在上次開始執行的時間+週期時間
        else
            time = triggerTime(-p);// scheduleWithFixedDelay:執行完上一個線程的時間+週期時間
    }

    long triggerTime(long delay) {
        return now()
                + ((delay < (Long.MAX_VALUE >> 1)) ? delay: overflowFree(delay));
    }

二、四個定時任務方法

ScheduledThreadPoolExecutor提供瞭如下四個方法,也就是四個調度器:

  1. schedule(Callable callable, long delay, TimeUnit unit) :創建並執行在給定延遲後啟用的 ScheduledFuture。
  2. schedule(Runnable command, long delay, TimeUnit unit) :創建並執行在給定延遲後啟用的一次性操作。
  3. scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) :創建並執行一個在給定初始延遲後首次啟用的定期操作,後續操作具有給定的週期;也就是將在 initialDelay 後開始執行,然後在 initialDelay+period 後執行,接着在 initialDelay + 2 * period 後執行,依此類推。
  4. scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) :創建並執行一個在給定初始延遲後首次啟用的定期操作,隨後,在每一次執行終止和下一次執行開始之間都存在給定的延遲。

第一、二個方法差不多,都是一次性操作,只不過參數一個是Callable,一個是Runnable。

稍微分析下第三(scheduleAtFixedRate)、四個(scheduleWithFixedDelay)方法,加入initialDelay = 5,period/delay = 3,unit為秒。

如果每個線程都是都運行非常良好不存在延遲的問題,那麼這兩個方法線程運行週期是5、8、11、14、17…….,但是如果存在延遲呢?比如第三個線程用了5秒鐘,那麼這兩個方法的處理策略是怎樣的?第三個方法(scheduleAtFixedRate)是週期固定,也就説它是不會受到這個延遲的影響的,每個線程的調度週期在初始化時就已經絕對了,是什麼時候調度就是什麼時候調度,它不會因為上一個線程的調度失效延遲而受到影響。但是第四個方法(scheduleWithFixedDelay),則不一樣,它是每個線程的調度間隔固定,也就是説第一個線程與第二線程之間間隔delay,第二個與第三個間隔delay,以此類推。如果第二線程推遲了那麼後面所有的線程調度都會推遲,例如,上面第二線程推遲了2秒,那麼第三個就不再是11秒執行了,而是13秒執行。

三、ScheduledFutureTask

ScheduledFutureTask是ScheduledThreadPoolExecutor的內部類,線程池將Runable任務封裝成ScheduledFutureTask來提交

ScheduledFutureTask內部繼承FutureTask,實現RunnableScheduledFuture接口,它內部定義了三個比較重要的變量:

        /** 任務被添加到ScheduledThreadPoolExecutor中的序號 */
        private final long sequenceNumber;

        /** 任務要執行的具體時間 */
        private long time;

        /**  任務的間隔週期 /
        private final long period;

構造函數:

        ScheduledFutureTask(Runnable r, V result, long ns) {
            super(r, result);
            this.time = ns;
            this.period = 0;
            this.sequenceNumber = sequencer.getAndIncrement();
        }

        ScheduledFutureTask(Runnable r, V result, long ns, long period) {
            super(r, result);
            this.time = ns;
            this.period = period;
            this.sequenceNumber = sequencer.getAndIncrement();
        }

        ScheduledFutureTask(Callable<V> callable, long ns) {
            super(callable);
            this.time = ns;
            this.period = 0;
            this.sequenceNumber = sequencer.getAndIncrement();
        }

        ScheduledFutureTask(Callable<V> callable, long ns) {
            super(callable);
            this.time = ns;
            this.period = 0;
            this.sequenceNumber = sequencer.getAndIncrement();
        }

compareTo()方法:

提供一個排序算法,該算法規則是:首先按照time排序,time小的排在前面,大的排在後面,如果time相同,則使用sequenceNumber排序,小的排在前面,大的排在後面。

為什麼在這個類裏面提供compareTo()方法呢?

在前面就介紹過ScheduledThreadPoolExecutor在構造方法中提供的是DelayedWorkQueue()隊列中,也就是説ScheduledThreadPoolExecutor是把任務添加到DelayedWorkQueue中的,而DelayedWorkQueue則是類似於DelayQueue,內部維護着一個以時間為先後順序的隊列,所以compareTo()方法使用與DelayedWorkQueue隊列對其元素ScheduledThreadPoolExecutor task進行排序的算法。

 public int compareTo(Delayed other) {
        if (other == this) // compare zero if same object
            return 0;
        if (other instanceof ScheduledFutureTask) {
            ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
            long diff = time - x.time;
            if (diff < 0)
                return -1;
            else if (diff > 0)
                return 1;
            else if (sequenceNumber < x.sequenceNumber)
                return -1;
            else
                return 1;
        }
        long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
        return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
    }

run()方法:

ScheduledThreadPoolExecutor通過run()方法對task任務進行調度和延遲

        public void run() {
            boolean periodic = isPeriodic();
            if (!canRunInCurrentRunState(periodic))
                cancel(false);
            else if (!periodic)
                ScheduledFutureTask.super.run();
            else if (ScheduledFutureTask.super.runAndReset()) {
                setNextRunTime();
                reExecutePeriodic(outerTask);
            }
        }

(1)調用isPeriodic()獲取該線程是否為週期性任務標誌,然後調用canRunInCurrentRunState()方法判斷該線程是否可以執行,如果不可以執行則調用cancel()取消任務。

(2)如果當線程已經到達了執行點,則調用run()方法執行task,該run()方法是在FutureTask中定義的。

(3)否則調用runAndReset()方法運行並充值,調用setNextRunTime()方法計算任務下次的執行時間,重新把任務添加到隊列中,讓該任務可以重複執行。

四、延時隊列DelayedWorkQueue

使用優先級隊列DelayedWorkQueue,保證添加到隊列中的任務會按照任務的延時時間進行排序,延時時間少的任務首先被獲取。

重要屬性:

     // 初始時,數組長度大小。
        private static final int INITIAL_CAPACITY = 16;
        // 使用數組來儲存隊列中的元素。
        private RunnableScheduledFuture<?>[] queue =
            new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
        // 使用lock來保證多線程併發安全問題。
        private final ReentrantLock lock = new ReentrantLock();
        // 隊列中儲存元素的大小
        private int size = 0;

        //特指隊列頭任務所在線程
        private Thread leader = null;
        
        // 當隊列頭的任務延時時間到了,或者有新的任務變成隊列頭時,用來喚醒等待線程
        private final Condition available = lock.newCondition();

offer()方法插入元素:

    public boolean offer(Runnable x) {
        if (x == null)
            throw new NullPointerException();
        RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>) x;
        // 使用lock保證併發操作安全
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int i = size;
            // 如果要超過數組長度,就要進行數組擴容
            if (i >= queue.length)
                // 數組擴容
                grow();
            // 將隊列中元素個數加一
            size = i + 1;
            // 如果是第一個元素,那麼就不需要排序,直接賦值就行了
            if (i == 0) {
                queue[0] = e;
                setIndex(e, 0);
            } else {
                // 調用siftUp方法,使插入的元素變得有序。
                siftUp(i, e);
            }
            // 表示新插入的元素是隊列頭,更換了隊列頭,
            // 那麼就要喚醒正在等待獲取任務的線程。
            if (queue[0] == e) {
                leader = null;
                // 喚醒正在等待等待獲取任務的線程
                available.signal();
            }
        } finally {
            lock.unlock();
        }
        return true;
    }

主要是三步:

(1)元素個數超過數組長度,就會調用grow()方法,進行數組擴容。

(2)將新元素e添加到優先級隊列中對應的位置,通過siftUp方法,保證按照元素的優先級排序。

(3)如果新插入的元素是隊列頭,即更換了隊列頭,那麼就要喚醒正在等待獲取任務的線程。這些線程可能是因為原隊列頭元素的延時時間沒到,而等待的。

siftUp方法:按照元素的優先級插入元素

通過循環,來查找元素key應該插入在堆二叉樹那個節點位置,並交互父節點的位置。
    private void siftUp(int k, RunnableScheduledFuture<?> key) {
            // 當k==0時,就到了堆二叉樹的根節點了,跳出循環
            while (k > 0) {
                // 父節點位置座標, 相當於(k - 1) / 2
                int parent = (k - 1) >>> 1;
                // 獲取父節點位置元素
                RunnableScheduledFuture<?> e = queue[parent];
                // 如果key元素大於父節點位置元素,滿足條件,那麼跳出循環
                // 因為是從小到大排序的。
                if (key.compareTo(e) >= 0)
                    break;
                // 否則就將父節點元素存放到k位置
                queue[k] = e;
                // 這個只有當元素是ScheduledFutureTask對象實例才有用,用來快速取消任務。
                setIndex(e, k);
                // 重新賦值k,尋找元素key應該插入到堆二叉樹的那個節點
                k = parent;
            }
            // 循環結束,k就是元素key應該插入的節點位置
            queue[k] = key;
            setIndex(key, k);
        }

take()方法取元素:

    public RunnableScheduledFuture<?> take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                RunnableScheduledFuture<?> first = queue[0];
                // 如果沒有任務,就讓線程在available條件下等待。
                if (first == null)
                    available.await();
                else {
                    // 獲取任務的剩餘延時時間
                    long delay = first.getDelay(NANOSECONDS);
                    // 如果延時時間到了,就返回這個任務,用來執行。
                    if (delay <= 0)
                        return finishPoll(first);
                    // 將first設置為null,當線程等待時,不持有first的引用
                    first = null; // don't retain ref while waiting

                    // 如果還是原來那個等待隊列頭任務的線程,
                    // 説明隊列頭任務正在執行。
                    if (leader != null)
                        available.await();
                    else {
                        // 記錄一下當前等待隊列頭任務的線程
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            // 當任務的延時時間到了時,能夠自動超時喚醒。
                            available.awaitNanos(delay);
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader == null && queue[0] != null)
                // 喚醒等待任務的線程
                available.signal();
            lock.unlock();
        }
    }

如果隊列中沒有任務,那麼就讓當前線程在available條件下等待。如果隊列頭任務的剩餘延時時間delay大於0,那麼就讓當前線程在available條件下等待delay時間。

五、源碼解析定時任務過程

以一個簡單的示例來分析:

    public static void main(String[] args) {
        ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(3);
        scheduled.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                System.out.println();
            }

        }, 10, 30, TimeUnit.MILLISECONDS);
    }

new線程池:

ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(3); // new一個等待隊列是DelayedWorkQueue的線程池

    // Executors.newScheduledThreadPool(3);
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }
    
    // super父類即線程池類ThreadPoolExecutor
    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }
    
    public ThreadPoolExecutor(int corePoolSize,
            int maximumPoolSize,
            long keepAliveTime,
            TimeUnit unit,
            BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                Executors.defaultThreadFactory(), defaultHandler);
    }

任務提交:

    // ScheduledThreadPoolExecutor.scheduleAtFixedRate
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
            long initialDelay, long period, TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        if (period <= 0)
            throw new IllegalArgumentException();
        
        // 封裝成ScheduledFutureTask提交
        ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command,
                null, triggerTime(initialDelay, unit), unit.toNanos(period));
        RunnableScheduledFuture<Void> t = decorateTask(command, sft);
        sft.outerTask = t;
        delayedExecute(t); // 提交
        return t;
    }
    
    // ScheduledThreadPoolExecutor.delayedExecute(RunnableScheduledFuture<?>)
    private void delayedExecute(RunnableScheduledFuture<?> task) {
        if (isShutdown())
            reject(task);
        else {
            super.getQueue().add(task); // 任務插入到延時隊列DelayedWorkQueue中
            if (isShutdown() &&
                !canRunInCurrentRunState(task.isPeriodic()) &&
                remove(task))
                task.cancel(false);
            else
                ensurePrestart(); // 啟動一個線程 
        }
    }
    
    // ScheduledThreadPoolExecutor.DelayedWorkQueue
    public boolean add(Runnable e) {
        return offer(e); // 按時間排序,插入延時隊列(上文分析過了)
    }
    
    // ThreadPoolExecutor.ensurePrestart()
    void ensurePrestart() {
        int wc = workerCountOf(ctl.get());
        if (wc < corePoolSize)
            // 線程池啟動一個沒有任務的線程,while循環到延時隊列中取任務,調用DelayedWorkQueue.take()取
            // addWorker(null, true)方法不做詳細介紹,前一篇線程池文章中分析過了
            addWorker(null, true); 
        else if (wc == 0)
            addWorker(null, false);
    }

    // DelayedWorkQueue.take()
    public RunnableScheduledFuture<?> take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                RunnableScheduledFuture<?> first = queue[0];
                if (first == null)
                    available.await(); // 如果沒有任務,就讓線程在available條件下等待。
                else {
                    long delay = first.getDelay(NANOSECONDS);
                    if (delay <= 0)
                        return finishPoll(first); // 如果延時時間到了,就返回這個任務,用來執行。
                    first = null; // don't retain ref while waiting
                    if (leader != null)
                        available.await();
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            // 如果第一個任務延時時間沒到,就掛起delay時間,到延時時間自動喚醒
                            // 此處是循環,自動喚醒之後再取出任務去執行
                            available.awaitNanos(delay);
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader == null && queue[0] != null)
                available.signal();
            lock.unlock();
        }
    }
    
    // 任務是封裝成ScheduledFutureTask的,任務執行會調用ScheduledFutureTask的 run方法
    public void run() {
        boolean periodic = isPeriodic();
        if (!canRunInCurrentRunState(periodic))
            cancel(false);
        else if (!periodic)
            ScheduledFutureTask.super.run(); // 執行任務
        else if (ScheduledFutureTask.super.runAndReset()) { // 設置下一次循環的任務 
            setNextRunTime();
            reExecutePeriodic(outerTask);
        }
    }
    
    // 循環
    void reExecutePeriodic(RunnableScheduledFuture<?> task) {
        if (canRunInCurrentRunState(true)) {
            super.getQueue().add(task);
            if (!canRunInCurrentRunState(true) && remove(task))
                task.cancel(false);
            else
                ensurePrestart();
        }
    }

 

 

參考資料 / 相關推薦:

【死磕Java併發】—–J.U.C之線程池:ScheduledThreadPoolExecutor

Java優先級隊列DelayedWorkQueue原理分析

 

關鍵詞:任務 if 執行 long 隊列 方法 delay null scheduledfuturetask period

相關推薦:

Java併發(十八):阻塞隊列BlockingQueue

【Java併發】Executor框架

Java併發編程筆記之Timer源碼分析

Java中的阻塞隊列

WARN conf.FlumeConfiguration: Could not configure sink sink1 due to: No channel configured for sink: sink1 org.apache.flume.conf.ConfigurationException: No channel configured for sink: sink1

ScheduledThreadPoolExecutor

Getting NullPointerException when running Spark Code in Zeppelin

(四)juc線程高級特性——線程池 / 線程調度 / ForkJoinPool

如何將Flume kafka source 插件從0.9升級到0.10.1

io.netty.resolver.dns.DnsNameResolverContext