=================
== Time Stream ==
=================
一个小学生

Java中的DelayQueue延时队列

Java

DelayQueue使用和原理学习。

简介

DelayQueue是一个无界阻塞队列,支持延时获取元素。内部使用PriorityQueue优先级队列来存储元素,存储的元素需要实现Delayed接口。

也就是在往DelayQueue中放元素的时候,该元素必须实现Delayed接口,并且可以指定一个需要延迟的时间,等过了一段时间到了我们指定的延迟时间后,可以获取到该元素,继续针对该元素处理。

示例

模拟延迟发送短信功能,发短信的时候可以指定延时多久进行发送。

/**
 * 延迟短信
 */
public class DelayedSms implements Delayed {

    /**
     * 短信内容
     */
    private String sms;

    /**
     * 短信创建时间
     */
    private long createTime;

    /**
     * 短信发送时间
     */
    private long sendTime;

    /**
     * 返回剩余的时间
     * @param unit
     * @return
     */
    @Override
    public long getDelay(TimeUnit unit) {
        long remaining = unit.convert(sendTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        System.out.println("剩余时间:" + remaining + "ms");
        return remaining;
    }

    /**
     * 延时队列内部比较排序
     * @param other
     * @return
     */
    @Override
    public int compareTo(Delayed other) {
        return Long.compare(this.getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
    }

    // ... getter and setter...
    @Override
    public String toString() {...}
}
public class SmsSender {

    private DelayQueue<DelayedSms> queue = new DelayQueue<>();

    public SmsSender() {
        new Thread(new SmsSendTask()).start();
    }

    /**
     * 发送延时短信
     * @param sms 短信内容
     * @param createTime 短信创建时间
     * @param delay 要延长的时间,毫秒
     */
    public void sendDelaySms(String sms, long createTime, long delay) {
        DelayedSms delayedSms = new DelayedSms();
        delayedSms.setSms(sms);
        delayedSms.setCreateTime(createTime);
        delayedSms.setSendTime(createTime + delay);

        queue.offer(delayedSms);
    }

    private class SmsSendTask implements Runnable {

        @Override
        public void run() {
            while (true) {
                try {
                    DelayedSms sms = queue.take();
                    sendSms(sms);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    private void sendSms(DelayedSms sms) {
        System.out.println("发送短信:" + sms.getSms() + ",createTime:" + new Date(sms.getCreateTime()) + ",sendTime: " + new Date(sms.getSendTime()) + ",发送时间:" + new Date(System.currentTimeMillis()));
    }
}
public class Client {

    public static void main(String[] args) throws InterruptedException {
        SmsSender smsSender = new SmsSender();

        long createTime = System.currentTimeMillis();
        // 延时20s发送
        smsSender.sendDelaySms("短信1", createTime, 20 * 1000);

        // 延时30秒发送
        smsSender.sendDelaySms("短信2", createTime, 30 * 1000);

        // 延时10秒发送
        smsSender.sendDelaySms("短信3", createTime, 10 * 1000);

    }
}

解析

延时队列重点:阻塞队列、PriorityQueue、Delayed。延时队列是使用优先级队列来实现的一个阻塞队列,队列中存放的是实现接口Delayed的元素,优先队列比较是根据指定的延时时间。

阻塞队列

延时队列是阻塞队列,获取元素的时候如果没有元素到期,获取元素的线程会被阻塞。

Delayed

延时队列中存放的元素必须实现Delayed接口,该接口定义了一个getDelay方法,是我们必须要实现的方法,该方法用来返回元素到过期时间还剩余多少时间。

Delayed还继承了接口Comparable,compareTo方法也是我们必须要实现的,该方法用来确定元素在PriorityQueue中的顺序。

PriorityQueue

延时队列内部使用PriorityQueue实现,PriorityQueue中存放的元素必须实现Delayed接口,根据元素的compareTo方法来确定元素顺序

源码

public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {

    /**
     * 重入锁,保证队列操作的安全
     */
    private final transient ReentrantLock lock = new ReentrantLock();

    /**
     * 优先级队列,用来存储元素,可以根据我们在compareTo方法中指定的算法来排序我们的元素
     */
    private final PriorityQueue<E> q = new PriorityQueue<E>();

    /**
     * leader指向第一个从队列获取元素时阻塞等待的线程,用来优化内部阻塞通知,
     * 减少其他线程不必要的等待时间
     *
     * 采用的是Leader-Follower模式
     */
    private Thread leader = null;

    /**
     * 队列头部有可用新元素或者新线程需要成为新的leader时需要被通知
     */
    private final Condition available = lock.newCondition();

   
    public boolean offer(E e) {
        // 添加元素进队列,先加锁
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            // 使用优先级队列,添加元素进队列,优先级队列会对元素按照指定规则排序
            q.offer(e);
            // 添加完元素后,获取优先级队列的队头元素
            // 如果刚添加的元素在队列头部,说明刚添加的元素就是要到期的元素
            if (q.peek() == e) {
                // leader置为null
                leader = null;
                // 唤醒阻塞在等待队列的线程
                available.signal();
            }
            return true;
        } finally {
            lock.unlock();
        }
    }

    public E take() throws InterruptedException {
        // 出队列前先加锁
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                // 获取队列头的元素
                E first = q.peek();
                //队列中没有元素,
                if (first == null)
                    // 当前线程等待
                    available.await();
                else {
                    // 获取到了队列头元素
                    // 查看下我们定义元素时设置的延时时间规则,看是否到期
                    long delay = first.getDelay(NANOSECONDS);
                    // 队列头元素已经过期,直接返回队列头元素
                    if (delay <= 0)
                        return q.poll();
                    first = null; // don't retain ref while waiting
                    // 走到这里说明队列中有元素,但都还没到过期时间
                    // 如果leader存在,说明有其他的线程已经调用了take获取
                    if (leader != null)
                        // 当前线程挂起等待
                        available.await();
                    else {
                        // leader为空,将当前线程变成leader
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            // 上面已经有了剩余的时间,当前线程就可以直接挂起等待这剩余的一段时间
                            available.awaitNanos(delay);
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            // leader处理完,返回了需要的元素,这里要唤醒其他的follower
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }
}